跳转至

本节代码片段需导入以下模块:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
from qgis.core import (
    Qgis,
    QgsApplication,
    QgsMessageLog,
    QgsProcessingAlgRunnerTask,
    QgsProcessingContext,
    QgsProcessingFeedback,
    QgsProject,
    QgsTask,
    QgsTaskManager,
)

15 任务 - 在后台做繁重的工作⚓︎

15.1 引言⚓︎

使用线程的后台处理,是在进行繁重处理时保持用户界面响应的一种方式。任务可用于在QGIS中实现线程。

任务(QgsTask)是在后台执行代码的容器,任务管理(QgsTaskManager)用于控制任务的运行。这些类通过提供信号传递机制、进度报告和后台进程状态访问机制,简化了QGIS中的后台处理。可以使用子任务对任务进行分组。

全局任务管理器(QgsApplication.taskManager())通常被使用。这意味着你的任务可能不是由任务管理器控制的唯一任务。

有几种方法可以创建QGIS任务:

  • 通过扩展QgsTask创建自己的任务
1
2
class SpecialisedTask(QgsTask):
    pass
  • 从函数创建任务
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
def heavyFunction():
    # 一些CPU密集型处理 ...
    pass

def workdone():
    # ... 使用结果做一些有用的事情
    pass

task = QgsTask.fromFunction('heavy function', heavyFunction,
                     on_finished=workdone)
  • 从处理算法创建任务
1
2
3
4
5
6
7
params = dict()
context = QgsProcessingContext()
context.setProject(QgsProject.instance())
feedback = QgsProcessingFeedback()

buffer_alg = QgsApplication.instance().processingRegistry().algorithmById('native:buffer')
task = QgsProcessingAlgRunnerTask(buffer_alg, params, context,feedback)

Warning

任何后台任务(无论如何创建)决不能使用任何主线程上的QObject,比如访问QgsVectorLayer, QgsProject或者执行任何GUI操作——比如创建新的部件或者与现有部件交互。只能从主线程访问或修改Qt控件。在任务启动之前,必须复制任务中使用的数据。试图从后台线程使用它们将导致崩溃。

此外,请始终确保并 contextfeedback 至少与使用它们的任务一样长。如果在完成任务后,QGSTaskManager无法访问计划任务的上下文和反馈,则QGIS将崩溃。

Info

在调用 QgsProcessingContext 后不久调用 setProject() 是一种常见的模式。这允许任务及其回调函数使用大多数项目范围的设置。这在回调函数中使用空间图层时特别有价值。

可以使用QgsTask中的addSubTask() 函数来描述任务之间的依赖关系。当声明依赖关系时,任务管理器将自动确定如何执行这些依赖关系。只要有可能,依赖项将并行执行,以便尽快满足它们。如果取消了一个任务所依赖的任务,则相关任务也将被取消。循环依赖可能造成死锁,所以要小心。

如果任务依赖于可用的图层,则可以使用QgsTask中的setDependentLayers 函数来声明。如果任务所依赖的图层不可用,则该任务将被取消。

创建任务后,可以使用任务管理器的addTask()函数调度任务运行。向管理器添加任务会自动将该任务的所有权转移给管理员,管理员将在执行完后清理和删除任务。任务的调度受任务优先级的影响,任务优先级在addTask()中设置。

任务的状态可以使用QgsTaskQgsTaskManager的信号和函数进行监控。

15.2 示例⚓︎

15.2.1 扩展QgsTask⚓︎

在此示例中,RandomIntegerSumTask扩展了QgsTask,它将在指定的时间段内生成0到500之间的100个随机整数。如果随机数为42,则中止任务并引发异常。RandomIntegerSumTask(带子任务)生成了几个实例并将其添加到任务管理器,展示两种类型的依赖项:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import random
from time import sleep

from qgis.core import (
    QgsApplication, QgsTask, QgsMessageLog, Qgis
    )

MESSAGE_CATEGORY = 'RandomIntegerSumTask'

class RandomIntegerSumTask(QgsTask):
    """展示如何子类化QgsTask"""
    def __init__(self, description, duration):
        super().__init__(description, QgsTask.CanCancel)
        self.duration = duration
        self.total = 0
        self.iterations = 0
        self.exception = None

    def run(self):
        """在这里你要实现你的任务。
        应该定期测试isCanceled(),以便优雅地终止。
        此方法必须返回True或False。
        引发异常将使QGIS崩溃,因此我们在内部处理这些异常,并在self.finished中抛出。
        """
        QgsMessageLog.logMessage('Started task "{}"'.format(
                                     self.description()),
                                 MESSAGE_CATEGORY, Qgis.Info)
        wait_time = self.duration / 100
        for i in range(100):
            sleep(wait_time)
            # 使用setProgress报告进度
            self.setProgress(i)
            arandominteger = random.randint(0, 500)
            self.total += arandominteger
            self.iterations += 1
            # 检查isCanceled()处理取消
            if self.isCanceled():
                return False
            # 模拟异常情况
            if arandominteger == 42:
                # 不要raise Exception('bad value!'),否则将使QGIS崩溃
                self.exception = Exception('bad value!')
                return False
        return True

    def finished(self, result):
        """
        当任务完成(无论成功与否)时,这个函数会被自动调用。
        你可以通过实现 finished() 来执行任务完成后的后续事情。
        finished总是从主线程中调用的,因此在这里进行GUI操作和引发 Python 异常是安全的。
        result是self.run的返回值。
        """
        if result:
            QgsMessageLog.logMessage(
                'Task "{name}" completed\n' \
                'Total: {total} (with {iterations} '\
              'iterations)'.format(
                  name=self.description(),
                  total=self.total,
                  iterations=self.iterations),
              MESSAGE_CATEGORY, Qgis.Success)
        else:
            if self.exception is None:
                QgsMessageLog.logMessage(
                    'Task "{name}" not successful but without '\
                    'exception (probably the task was manually '\
                    'canceled by the user)'.format(
                        name=self.description()),
                    MESSAGE_CATEGORY, Qgis.Warning)
            else:
                QgsMessageLog.logMessage(
                    'Task "{name}" Exception: {exception}'.format(
                        name=self.description(),
                        exception=self.exception),
                    MESSAGE_CATEGORY, Qgis.Critical)
                raise self.exception

    def cancel(self):
        QgsMessageLog.logMessage(
            'Task "{name}" was canceled'.format(
                name=self.description()),
            MESSAGE_CATEGORY, Qgis.Info)
        super().cancel()


longtask = RandomIntegerSumTask('waste cpu long', 20)
shorttask = RandomIntegerSumTask('waste cpu short', 10)
minitask = RandomIntegerSumTask('waste cpu mini', 5)
shortsubtask = RandomIntegerSumTask('waste cpu subtask short', 5)
longsubtask = RandomIntegerSumTask('waste cpu subtask long', 10)
shortestsubtask = RandomIntegerSumTask('waste cpu subtask shortest', 4)

# 添加子任务(shortsubtask)到shorttask——必须在minitask和longtask完成后执行
shorttask.addSubTask(shortsubtask, [minitask, longtask])
# 添加子任务(longsubtask)到longtask——必须父级任务之前运行
longtask.addSubTask(longsubtask, [], QgsTask.ParentDependsOnSubTask)
# 添加子任务(shortestsubtask)到longtask
longtask.addSubTask(shortestsubtask)

QgsApplication.taskManager().addTask(longtask)
QgsApplication.taskManager().addTask(shorttask)
QgsApplication.taskManager().addTask(minitask)




# RandomIntegerSumTask(0): Started task "waste cpu subtask shortest"
# RandomIntegerSumTask(0): Started task "waste cpu short"
# RandomIntegerSumTask(0): Started task "waste cpu mini"
# RandomIntegerSumTask(0): Started task "waste cpu subtask long"
# RandomIntegerSumTask(3): Task "waste cpu subtask shortest" completed
# RandomTotal: 25452 (with 100 iterations)
# RandomIntegerSumTask(3): Task "waste cpu mini" completed
# RandomTotal: 23810 (with 100 iterations)
# RandomIntegerSumTask(3): Task "waste cpu subtask long" completed
# RandomTotal: 26308 (with 100 iterations)
# RandomIntegerSumTask(0): Started task "waste cpu long"
# RandomIntegerSumTask(3): Task "waste cpu long" completed
# RandomTotal: 22534 (with 100 iterations)

15.2.2 从函数创建任务⚓︎

从函数创建任务(本示例中的doSomething)。该函数的第一个参数为QgsTask 。一个重要的参数是on_finished,它是在任务完成时被调用的函数。示例中的doSomething函数有另一个参数wait_time

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import random
from time import sleep

MESSAGE_CATEGORY = 'TaskFromFunction'

def doSomething(task, wait_time):
    """
    抛出一个异常终止任务
    成功则返回结果
    结果将和异常一起传递给 (成功则为空)on_finished函数.
    如果存在异常,结果为空
    """
    QgsMessageLog.logMessage('Started task {}'.format(task.description()),
                             MESSAGE_CATEGORY, Qgis.Info)
    wait_time = wait_time / 100
    total = 0
    iterations = 0
    for i in range(100):
        sleep(wait_time)
        # 使用task.setProgress报告进度
        task.setProgress(i)
        arandominteger = random.randint(0, 500)
        total += arandominteger
        iterations += 1
        # 检查task.isCanceled()处理取消
        if task.isCanceled():
            stopped(task)
            return None
        # 抛出异常终止任务
        if arandominteger == 42:
            raise Exception('bad value!')
    return {'total': total, 'iterations': iterations,
            'task': task.description()}

def stopped(task):
    QgsMessageLog.logMessage(
        'Task "{name}" was canceled'.format(
            name=task.description()),
        MESSAGE_CATEGORY, Qgis.Info)

def completed(exception, result=None):
    """当doSomething完成时呗调佣
    如果抛出异常则异常信息不是空
    结果是doSomething返回的结果"""
    if exception is None:
        if result is None:
            QgsMessageLog.logMessage(
                'Completed with no exception and no result '\
                '(probably manually canceled by the user)',
                MESSAGE_CATEGORY, Qgis.Warning)
        else:
            QgsMessageLog.logMessage(
                'Task {name} completed\n'
                'Total: {total} ( with {iterations} '
                'iterations)'.format(
                    name=result['task'],
                    total=result['total'],
                    iterations=result['iterations']),
                MESSAGE_CATEGORY, Qgis.Info)
    else:
        QgsMessageLog.logMessage("Exception: {}".format(exception),
                                 MESSAGE_CATEGORY, Qgis.Critical)
        raise exception

# 创建一些任务
task1 = QgsTask.fromFunction(u'Waste cpu 1', doSomething,
                             on_finished=completed, wait_time=4)
task2 = QgsTask.fromFunction(u'Waste cpu 2', dosomething,
                             on_finished=completed, wait_time=3)
QgsApplication.taskManager().addTask(task1)
QgsApplication.taskManager().addTask(task2)


# RandomIntegerSumTask(0): Started task "waste cpu subtask short"
# RandomTaskFromFunction(0): Started task Waste cpu 1
# RandomTaskFromFunction(0): Started task Waste cpu 2
# RandomTaskFromFunction(0): Task Waste cpu 2 completed
# RandomTotal: 23263 ( with 100 iterations)
# RandomTaskFromFunction(0): Task Waste cpu 1 completed
# RandomTotal: 25044 ( with 100 iterations)

14.2.3 处理算法任务⚓︎

创建一个使用算法qgis:randompointsinextent的任务,在指定范围内生成50000个随机点。结果以安全的方式添加到项目中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from functools import partial
from qgis.core import (QgsTaskManager, QgsMessageLog,
                       QgsProcessingAlgRunnerTask, QgsApplication,
                       QgsProcessingContext, QgsProcessingFeedback,
                       QgsProject)

MESSAGE_CATEGORY = 'AlgRunnerTask'

def task_finished(context, successful, results):
    if not successful:
        QgsMessageLog.logMessage('Task finished unsucessfully',
                                 MESSAGE_CATEGORY, Qgis.Warning)
    output_layer = context.getMapLayer(results['OUTPUT'])
    # 因为getMapLayer没有移交所有权, 当上下文超出范围时图层将被删除,你会遇到崩溃
    # takeMapLayer移交所有权,因此它将安全地添加到QgsProject中,并赋予QgsProject所有权
    if output_layer and output_layer.isValid():
        QgsProject.instance().addMapLayer(
             context.takeResultLayer(output_layer.id()))

alg = QgsApplication.processingRegistry().algorithmById(
                                      u'qgis:randompointsinextent')
context = QgsProcessingContext()
feedback = QgsProcessingFeedback()
params = {
    'EXTENT': '0.0,10.0,40,50 [EPSG:4326]',
    'MIN_DISTANCE': 0.0,
    'POINTS_NUMBER': 50000,
    'TARGET_CRS': 'EPSG:4326',
    'OUTPUT': 'memory:My random points'
}
task = QgsProcessingAlgRunnerTask(alg, params, context, feedback)
task.executed.connect(partial(task_finished, context))
QgsApplication.taskManager().addTask(task)

也可以查看博客:https://www.opengis.ch/2018/06/22/threads-in-pyqgis3/