原文出自:Asyncio Tasks Tutorial

Note: 作者在源代码中使用的time.sleep 正确应该使用asyncio.sleep


在本教程中,我们将介绍Asyncio中的Tasks。我们将在之前的 Asynci事件循环教程 之上构建。


Tasks

Asyncio中的Tasks负责在事件循环中执行协同程序。这些 Tasks 一次只能在一个事件循环中运行,为了实现并行执行,您必须在多个线程上运行多个事件循环。

I like to think of tasks within asyncio in a similar regard to how we’d think of tasks when used in conjunction with executors or pools like we’ve demonstrated in previous chapters.

在本节中,我们将介绍一些关键函数,我们可以使用这些函数来处理基于asyncio的程序中的任务

一个简单的例子

关于Asyncio中Task的关键注意事项之一是您不直接创建它们,而是使用ensure_future函数或AbstractEventLoop.create_task() 方法. 让我们快速浏览一下如何使用任务生成器函数来生成5个不同的任务,以便我们的事件循环进行处理。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import asyncio
import time


async def my_task():
    asyncio.sleep(1)
    print("Task 进程")


async def my_task_generator():
    for i in range(5):
        asyncio.ensure_future(my_task())

loop = asyncio.get_event_loop()
loop.run_until_complete(my_task_generator())
print("所有任务已完成")
loop.close()

现在让我们看一下如何使用 all_tasks() 方法检索所有任务。

all_tasks(loop=None)方法

能够确定当前正在处理的任务对于生产中需要能够预测工作量等事物的系统来说非常重要。

all_taks 方法让我们深入了解在事件循环执行之前哪些任务当前处于挂起状态。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
import asyncio
import time


async def my_task():
    asyncio.sleep(1)
    print("Task 进程")


async def my_task_generator():
    for i in range(5):
        asyncio.ensure_future(my_task())

    pending = asyncio.Task.all_tasks()
    print(pending)

loop = asyncio.get_event_loop()
loop.run_until_complete(my_task_generator())
print("所有任务已完成")
loop.close()

我们打印了一组5个不同的任务,您可以看到它们都处于挂起状态。

cancel() 函数

在您对正在执行的任务数量进行速率限制的情况下,或者您尝试正常关闭应用程序时,能够有效取消挂起的任务非常有用。值得庆幸的是,asyncio API提供了相对容易完成的必要函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio
import time


async def my_task():
    asyncio.sleep(1)
    print("Task 进程")
    for task in asyncio.Task.all_tasks():
        print(task)
        task.cancel()
        print(task)


async def my_task_generator():
    for i in range(5):
        asyncio.ensure_future(my_task())


loop = asyncio.get_event_loop()
loop.run_until_complete(my_task_generator())
print("所有任务已完成")
loop.close()

Noet:


请注意,除了 main() 任务之外的所有任务在我们调用 task.cancel() 后都会从挂起到取消。


Task 函数

因此,我们看看我们如何与各个任务进行交互,但现在让我们退后一步,看看我们如何作为一个集体与他们互动。

as_completed() 函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio


async def my_worker(number):
    return number * 2


async def main(coros):
    for fs in asyncio.as_completed(coros):
        print(await fs)


coros = [my_worker(1) for i in range(5)]

try:
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(coros))
except KeyboardInterrupt:
    pass
finally:
    loop.close()

gather() 函数

gather() 函数返回一个单独的future,它汇总了传递给它的给定协程的所有结果。您应该注意,结果不会按照提交的顺序返回,因此如果您关心顺序,那么您将必须实施一些管理功能来重新排序结果。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import asyncio


async def my_worker():
    print("Hello World")


async def main():
    print("My Main")

try:
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(*[my_worker() for i in range(5)]))
except KeyboardInterrupt:
    pass
finally:
    loop.close()

wait() 函数

wait() 函数只是阻塞,直到传递给它的Future实例完成,完成后,它将返回一个命名的2元组集: 第一组包含已完成的,第二组包含未完成的。这在您必须在给定时间内处理任务的情况下非常有用,例如,您正在进行大量REST API调用或从代理上的队列中提取消息,如果它们未能在给定的超时内完成,则可能尝试以不同的方式处理它们。