本节代码片段需导入以下模块:
from qgis.core import (
Qgis ,
QgsApplication ,
QgsMessageLog ,
QgsProcessingAlgRunnerTask ,
QgsProcessingContext ,
QgsProcessingFeedback ,
QgsProject ,
QgsTask ,
QgsTaskManager ,
)
15 任务 - 在后台做繁重的工作
15.1 引言
使用线程的后台处理,是在进行繁重处理时保持用户界面响应的一种方式。任务可用于在QGIS中实现线程。
任务(QgsTask
)是在后台执行代码的容器,任务管理(QgsTaskManager
)用于控制任务的运行。这些类通过提供信号传递机制、进度报告和后台进程状态访问机制,简化了QGIS中的后台处理。可以使用子任务对任务进行分组。
全局任务管理器(QgsApplication.taskManager()
)通常被使用。这意味着你的任务可能不是由任务管理器控制的唯一任务。
有几种方法可以创建QGIS任务:
class SpecialisedTask ( QgsTask ):
pass
def heavyFunction ():
# 一些CPU密集型处理 ...
pass
def workdone ():
# ... 使用结果做一些有用的事情
pass
task = QgsTask . fromFunction ( 'heavy function' , heavyFunction ,
on_finished = workdone )
params = dict ()
context = QgsProcessingContext ()
context . setProject ( QgsProject . instance ())
feedback = QgsProcessingFeedback ()
buffer_alg = QgsApplication . instance () . processingRegistry () . algorithmById ( 'native:buffer' )
task = QgsProcessingAlgRunnerTask ( buffer_alg , params , context , feedback )
Warning
任何后台任务(无论如何创建)决不能使用任何主线程上的QObject,比如访问QgsVectorLayer, QgsProject或者执行任何GUI操作——比如创建新的部件或者与现有部件交互。只能从主线程访问或修改Qt控件。在任务启动之前,必须复制任务中使用的数据。试图从后台线程使用它们将导致崩溃。
此外,请始终确保并 context 和 feedback 至少与使用它们的任务一样长。如果在完成任务后,QGSTaskManager无法访问计划任务的上下文和反馈,则QGIS将崩溃。
Info
在调用 QgsProcessingContext
后不久调用 setProject() 是一种常见的模式。这允许任务及其回调函数使用大多数项目范围的设置。这在回调函数中使用空间图层时特别有价值。
可以使用QgsTask
中的addSubTask()
函数来描述任务之间的依赖关系。当声明依赖关系时,任务管理器将自动确定如何执行这些依赖关系。只要有可能,依赖项将并行执行,以便尽快满足它们。如果取消了一个任务所依赖的任务,则相关任务也将被取消。循环依赖可能造成死锁,所以要小心。
如果任务依赖于可用的图层,则可以使用QgsTask
中的setDependentLayers
函数来声明。如果任务所依赖的图层不可用,则该任务将被取消。
创建任务后,可以使用任务管理器的addTask()
函数调度任务运行。向管理器添加任务会自动将该任务的所有权转移给管理员,管理员将在执行完后清理和删除任务。任务的调度受任务优先级的影响,任务优先级在addTask()
中设置。
任务的状态可以使用QgsTask
、QgsTaskManager
的信号和函数进行监控。
15.2 示例
15.2.1 扩展QgsTask
在此示例中,RandomIntegerSumTask
扩展了QgsTask
,它将在指定的时间段内生成0到500之间的100个随机整数。如果随机数为42,则中止任务并引发异常。RandomIntegerSumTask
(带子任务)生成了几个实例并将其添加到任务管理器,展示两种类型的依赖项:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119 import random
from time import sleep
from qgis.core import (
QgsApplication , QgsTask , QgsMessageLog , Qgis
)
MESSAGE_CATEGORY = 'RandomIntegerSumTask'
class RandomIntegerSumTask ( QgsTask ):
"""展示如何子类化QgsTask"""
def __init__ ( self , description , duration ):
super () . __init__ ( description , QgsTask . CanCancel )
self . duration = duration
self . total = 0
self . iterations = 0
self . exception = None
def run ( self ):
"""在这里你要实现你的任务。
应该定期测试isCanceled(),以便优雅地终止。
此方法必须返回True或False。
引发异常将使QGIS崩溃,因此我们在内部处理这些异常,并在self.finished中抛出。
"""
QgsMessageLog . logMessage ( 'Started task " {} "' . format (
self . description ()),
MESSAGE_CATEGORY , Qgis . Info )
wait_time = self . duration / 100
for i in range ( 100 ):
sleep ( wait_time )
# 使用setProgress报告进度
self . setProgress ( i )
arandominteger = random . randint ( 0 , 500 )
self . total += arandominteger
self . iterations += 1
# 检查isCanceled()处理取消
if self . isCanceled ():
return False
# 模拟异常情况
if arandominteger == 42 :
# 不要raise Exception('bad value!'),否则将使QGIS崩溃
self . exception = Exception ( 'bad value!' )
return False
return True
def finished ( self , result ):
"""
当任务完成(无论成功与否)时,这个函数会被自动调用。
你可以通过实现 finished() 来执行任务完成后的后续事情。
finished总是从主线程中调用的,因此在这里进行GUI操作和引发 Python 异常是安全的。
result是self.run的返回值。
"""
if result :
QgsMessageLog . logMessage (
'Task " {name} " completed \n ' \
'Total: {total} (with {iterations} ' \
'iterations)' . format (
name = self . description (),
total = self . total ,
iterations = self . iterations ),
MESSAGE_CATEGORY , Qgis . Success )
else :
if self . exception is None :
QgsMessageLog . logMessage (
'Task " {name} " not successful but without ' \
'exception (probably the task was manually ' \
'canceled by the user)' . format (
name = self . description ()),
MESSAGE_CATEGORY , Qgis . Warning )
else :
QgsMessageLog . logMessage (
'Task " {name} " Exception: {exception} ' . format (
name = self . description (),
exception = self . exception ),
MESSAGE_CATEGORY , Qgis . Critical )
raise self . exception
def cancel ( self ):
QgsMessageLog . logMessage (
'Task " {name} " was canceled' . format (
name = self . description ()),
MESSAGE_CATEGORY , Qgis . Info )
super () . cancel ()
longtask = RandomIntegerSumTask ( 'waste cpu long' , 20 )
shorttask = RandomIntegerSumTask ( 'waste cpu short' , 10 )
minitask = RandomIntegerSumTask ( 'waste cpu mini' , 5 )
shortsubtask = RandomIntegerSumTask ( 'waste cpu subtask short' , 5 )
longsubtask = RandomIntegerSumTask ( 'waste cpu subtask long' , 10 )
shortestsubtask = RandomIntegerSumTask ( 'waste cpu subtask shortest' , 4 )
# 添加子任务(shortsubtask)到shorttask——必须在minitask和longtask完成后执行
shorttask . addSubTask ( shortsubtask , [ minitask , longtask ])
# 添加子任务(longsubtask)到longtask——必须父级任务之前运行
longtask . addSubTask ( longsubtask , [], QgsTask . ParentDependsOnSubTask )
# 添加子任务(shortestsubtask)到longtask
longtask . addSubTask ( shortestsubtask )
QgsApplication . taskManager () . addTask ( longtask )
QgsApplication . taskManager () . addTask ( shorttask )
QgsApplication . taskManager () . addTask ( minitask )
# RandomIntegerSumTask(0): Started task "waste cpu subtask shortest"
# RandomIntegerSumTask(0): Started task "waste cpu short"
# RandomIntegerSumTask(0): Started task "waste cpu mini"
# RandomIntegerSumTask(0): Started task "waste cpu subtask long"
# RandomIntegerSumTask(3): Task "waste cpu subtask shortest" completed
# RandomTotal: 25452 (with 100 iterations)
# RandomIntegerSumTask(3): Task "waste cpu mini" completed
# RandomTotal: 23810 (with 100 iterations)
# RandomIntegerSumTask(3): Task "waste cpu subtask long" completed
# RandomTotal: 26308 (with 100 iterations)
# RandomIntegerSumTask(0): Started task "waste cpu long"
# RandomIntegerSumTask(3): Task "waste cpu long" completed
# RandomTotal: 22534 (with 100 iterations)
15.2.2 从函数创建任务
从函数创建任务(本示例中的doSomething
)。该函数的第一个参数为QgsTask
。一个重要的参数是on_finished
,它是在任务完成时被调用的函数。示例中的doSomething
函数有另一个参数wait_time
。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80 import random
from time import sleep
MESSAGE_CATEGORY = 'TaskFromFunction'
def doSomething ( task , wait_time ):
"""
抛出一个异常终止任务
成功则返回结果
结果将和异常一起传递给 (成功则为空)on_finished函数.
如果存在异常,结果为空
"""
QgsMessageLog . logMessage ( 'Started task {} ' . format ( task . description ()),
MESSAGE_CATEGORY , Qgis . Info )
wait_time = wait_time / 100
total = 0
iterations = 0
for i in range ( 100 ):
sleep ( wait_time )
# 使用task.setProgress报告进度
task . setProgress ( i )
arandominteger = random . randint ( 0 , 500 )
total += arandominteger
iterations += 1
# 检查task.isCanceled()处理取消
if task . isCanceled ():
stopped ( task )
return None
# 抛出异常终止任务
if arandominteger == 42 :
raise Exception ( 'bad value!' )
return { 'total' : total , 'iterations' : iterations ,
'task' : task . description ()}
def stopped ( task ):
QgsMessageLog . logMessage (
'Task " {name} " was canceled' . format (
name = task . description ()),
MESSAGE_CATEGORY , Qgis . Info )
def completed ( exception , result = None ):
"""当doSomething完成时呗调佣
如果抛出异常则异常信息不是空
结果是doSomething返回的结果"""
if exception is None :
if result is None :
QgsMessageLog . logMessage (
'Completed with no exception and no result ' \
'(probably manually canceled by the user)' ,
MESSAGE_CATEGORY , Qgis . Warning )
else :
QgsMessageLog . logMessage (
'Task {name} completed \n '
'Total: {total} ( with {iterations} '
'iterations)' . format (
name = result [ 'task' ],
total = result [ 'total' ],
iterations = result [ 'iterations' ]),
MESSAGE_CATEGORY , Qgis . Info )
else :
QgsMessageLog . logMessage ( "Exception: {} " . format ( exception ),
MESSAGE_CATEGORY , Qgis . Critical )
raise exception
# 创建一些任务
task1 = QgsTask . fromFunction ( u 'Waste cpu 1' , doSomething ,
on_finished = completed , wait_time = 4 )
task2 = QgsTask . fromFunction ( u 'Waste cpu 2' , dosomething ,
on_finished = completed , wait_time = 3 )
QgsApplication . taskManager () . addTask ( task1 )
QgsApplication . taskManager () . addTask ( task2 )
# RandomIntegerSumTask(0): Started task "waste cpu subtask short"
# RandomTaskFromFunction(0): Started task Waste cpu 1
# RandomTaskFromFunction(0): Started task Waste cpu 2
# RandomTaskFromFunction(0): Task Waste cpu 2 completed
# RandomTotal: 23263 ( with 100 iterations)
# RandomTaskFromFunction(0): Task Waste cpu 1 completed
# RandomTotal: 25044 ( with 100 iterations)
14.2.3 处理算法任务
创建一个使用算法qgis:randompointsinextent
的任务,在指定范围内生成50000个随机点。结果以安全的方式添加到项目中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 from functools import partial
from qgis.core import ( QgsTaskManager , QgsMessageLog ,
QgsProcessingAlgRunnerTask , QgsApplication ,
QgsProcessingContext , QgsProcessingFeedback ,
QgsProject )
MESSAGE_CATEGORY = 'AlgRunnerTask'
def task_finished ( context , successful , results ):
if not successful :
QgsMessageLog . logMessage ( 'Task finished unsucessfully' ,
MESSAGE_CATEGORY , Qgis . Warning )
output_layer = context . getMapLayer ( results [ 'OUTPUT' ])
# 因为getMapLayer没有移交所有权, 当上下文超出范围时图层将被删除,你会遇到崩溃
# takeMapLayer移交所有权,因此它将安全地添加到QgsProject中,并赋予QgsProject所有权
if output_layer and output_layer . isValid ():
QgsProject . instance () . addMapLayer (
context . takeResultLayer ( output_layer . id ()))
alg = QgsApplication . processingRegistry () . algorithmById (
u 'qgis:randompointsinextent' )
context = QgsProcessingContext ()
feedback = QgsProcessingFeedback ()
params = {
'EXTENT' : '0.0,10.0,40,50 [EPSG:4326]' ,
'MIN_DISTANCE' : 0.0 ,
'POINTS_NUMBER' : 50000 ,
'TARGET_CRS' : 'EPSG:4326' ,
'OUTPUT' : 'memory:My random points'
}
task = QgsProcessingAlgRunnerTask ( alg , params , context , feedback )
task . executed . connect ( partial ( task_finished , context ))
QgsApplication . taskManager () . addTask ( task )
也可以查看博客:https://www.opengis.ch/2018/06/22/threads-in-pyqgis3/