原文出自:Asyncio Synchronization Primitives Tutorial - Queues and Locks


在本教程中,我们将查看Asyncio编程冒险中可用的各种同步原语。我们将简要介绍为什么这些同步原语很重要,以及在简单的基于Asyncio的程序中使用它们的各种方法。


为什么这些很重要?

在编写并发系统时,您必须尝试确保您的程序没有称为 Race Condition 的小东西。当多个并发工作者同时尝试修改共享变量,数组等时,会发生竞争条件,并且由于时序问题,它们产生错误的结果。

由于存在这些竞争条件,我们必须利用称为同步原语的东西。当谈到Asyncio中的同步原语时,我们有一个数字可供选择。这些都基于线程模块等价物,并且往往具有与我们一起使用的相同API。

Locks

描述锁如何工作的最好类比是想象有一群人试图进入浴室。一个人进去并锁上门,这样做可以防止他人在你正在洗澡时进来。

在计算术语中我们锁定某些东西的时候,我们基本上阻止了其他人操作修改我们锁定的资源。

一个简单的locks示例

在这个例子中,我们将创建一个 asyncio.Lock() 实例,我们将尝试使用 with await lock 获取此锁。一旦我们的 Worker 获得了这个锁,我们就会执行我们的关键代码部分,然后继续释放我们刚刚获得的锁。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import asyncio
import time


async def my_worker(lock):
    print("试图获得lock")
    # 获得lock
    with await lock:
        # 运行代码的关键部分
        print("目前已锁定")
        time.sleep(2)
    print("解锁的关键部分")


async def main():
    # 实例化一个锁
    lock = asyncio.Lock()
    # 等待执行2个Worker协程,每个协程都传入相同的锁实例
    await asyncio.wait([my_worker(lock), my_worker(lock)])

# 启动一个简单的循环并运行我们的main函数直到它完成
lock = asyncio.Lock()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print("所有任务都完成了")
loop.close()
输出:

当我们运行这个时,你应该看到我们只有一个同时运行的Works能够一次运行.

Queues

在以同步方式进行通信时,asyncio提供了自己的基于队列的实现. 我们可以让生产者以同步的方式将事物推送到我们的队列中,并让消费者同时轮询这个队列以获取推送到它上面的任何东西。

简单的示例:

在这个例子中,我们将创建一个 newsProducer() 协程和一个 newsConsumer() 协程。

newsProducer() 协程 将会推送新的新闻项目到我们的同步队列,newsConsumer() 协程将尝试检索已被推入到所述队列的任何项目,然后在获得消息是打印出来。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import asyncio
import random


async def news_producer(my_queue):
    while True:
        await asyncio.sleep(1)
        print("将新闻项目放入队列")
        await my_queue.put(random.randint(1,5))


async def news_consumer(_id, my_queue):
    print(my_queue)
    while True:
        print("消费者:{} 试图从队列中获取".format(_id))
        item = await my_queue.get()
        if item is None:
            # 没有生产者发送的数据是,表面她已经完成了
            break
        print("消费者:{} 使用ID为 {} 的文章".format(_id, item))


loop = asyncio.get_event_loop()
my_queue = asyncio.Queue(loop=loop, maxsize=10)
try:
    loop.run_until_complete(asyncio.gather(news_producer(my_queue),
                                           news_consumer(1, my_queue), news_consumer(2, my_queue)))
except KeyboardInterrupt:
    pass
finally:
    loop.close()
输出

当我们尝试运行它时,您应该看到我们的生产者将项目推送到我们的队列中,然后我们的消费者彼此竞争以便将任何内容取出队列。

结论

如果您发现本教程有用或需要进一步的帮助,请在下面的评论部分告诉我们!