asyncio同步原语与线程(threading)模块同步原语基本类似,但有两点重要区别:
- asyncio同步原语
非线程安全
,因此不应被用作系统线程同步(可以使用threading
代替); - asyncio同步原语不允许使用
timeout
参数;可以使用asyncio.wait_for()
方法执行有超时设置的操作。 asyncio有以下5个基本的同步原语: - Lock
- Event
- Condition
- Semaphore
- BoundedSemaphore
Lock
class asyncio.Lock(*,loop=None)
- 为asyncio任务提供一个
互斥锁
。非线程安全。 - asyncio锁可以用来保证对共享资源的独占访问。
asyncio锁的首选用法是同
async with
语句一起使用:1
2
3
4lock = asyncio.Lock()
# ... later
async with lock:
# 访问共享资源此代码段和以下代码是等价的:
1
2
3
4
5
6
7
8lock = asyncio.Lock()
# ... later
await lock.acquire()
try:
# 访问共享资源
finally:
lock.release()coroutine acquire()
- 获取asyncio同步锁。
- 该方法等待
锁
的状态变为unlocked
,之后设置其为locked
,并返回True
。
- release()
- 释放asyncio同步锁。
- 如果
锁
的状态是locked
,则将其重置为unlocked
并返回。 - 如果
锁
的状态是unlocked
,会引发RuntimeError
异常。
- locked()
- 如果
锁
的状态是locked
,则返回True
。
- 如果
- 为asyncio任务提供一个
Event
class asyncio.Event(*,loop=None)
- 事件对象,非线程安全。
- 用于向asyncio任务通知某些事件已发生。
- 事件对象用于管理
内部标志
。此标志可以通过set()
方法设置为True
,或通过clear()
方法复位为False
。wait()
方法在该标志设置为True
前一直保持阻塞。初始状态下,该标志为False
。 例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20async def waiter(event):
print('waiting for it ...')
await event.wait()
print('... got it!')
async def main():
# Create an Event object.
event = asyncio.Event()
# Spawn a Task to wait until 'event' is set.
waiter_task = asyncio.create_task(waiter(event))
# Sleep for 1 second and set the event.
await asyncio.sleep(1)
event.set()
# Wait until the waiter task is finished.
await waiter_task
asyncio.run(main())coroutine wait()
- 等待事件
内部标志
被设置为True
。 - 如果事件的内部
内部标志
已设置,则立即返回True
。否则,一直阻塞,直到另外的任务调用set()
。
- 等待事件
- set()
- 设置事件
内部标志
为True
。 - 所有等待事件的任务将会立即被触发。
- 设置事件
- clear()
- 清除事件
内部标志
(即设置为False
)。 - 等待事件的任务将会阻塞,直到
set()
方法被再次调用。
- 清除事件
- is_set()
- 如果事件
内部标志
被设置为True
,则返回True
。
- 如果事件
Condition
class asyncio.Condition(lock=None,*,loop=None)
- 条件对象,非线程安全。
- 异步条件原语用于在某些事件发生后,获得共享资源的独占访问权限。
- 本质上,条件对象结合了事件和锁的功能。可以让多个Condition对象共享一个Lock,这允许在对共享资源的特定状态感兴趣的不同任务之间协调对共享资源的独占访问。
- 可选参数
lock
必须为Lock
对象或None
。如果为None
,会自动创建一个Lock
对象。 使用条件对象的首选方法是
async with
方式:1
2
3
4
5cond = asyncio.Condition()
# ... later
async with cond:
await cond.wait()等价于:
1
2
3
4
5
6
7
8cond = asyncio.Condition()
# ... later
await lock.acquire()
try:
await cond.wait()
finally:
lock.release()coroutine acquire()
- 获取底层锁。
- 该方法一直等待,直到底层锁处于未锁定状态,然后设置其为锁定状态,并且返回
True
。
- notify(n=1)
- 唤醒至多
n
个等待条件的任务。如果没有正在等待的任务,则该方法无操作。 - 在调用该方法之前,必须先调用
acquire()
获取锁,并在调用该方法之后释放锁。如果在锁为锁定的情况下调用此方法,会引发RuntimeError
异常。
- 唤醒至多
- locked()
- 如果底层锁已获取,则返回
True
。
- 如果底层锁已获取,则返回
- notify_all()
- 唤醒所有正在等待该条件的任务。
- 该方法与
notify()
类似,区别只在它会唤醒所有正在等待的任务。 - 在调用该方法前,必须首先获取底层锁,并在执行完该方法后释放锁。如果在底层锁未锁定的情况下执行该方法,会引发
RuntimeError
异常。
- release()
- 释放底层锁。
- 在未锁定的锁上调用时,会引发
RuntimeError
异常。
- coroutine wait()
- 等待通知。
- 如果调用此方法的任务没有获取到锁,则引发
RuntimeError
异常。 - 此方法释放底层锁,然后保持阻塞,直至被
notify()
或notify_all()
唤醒。被唤醒之后,条件对象重新申请锁,该方法返回True
。
- coroutine wait_for(predicate)
- 等待
predicate
变为True
。 predicate
必须可调用,它的执行结果会被解释为布尔值,并作为最终结果返回。
- 等待
Semaphore
class asyncio.Semaphore(value=1,*,loop=None)
- 信号量(Semaphore)对象。非线程安全。
- 信号量用于管理一个内部计数器,此计数器逢
acquire()
递减,逢release()
递增。计数器的值不能小于0
,如果acquire()
被调用时计数器为0
,则阻塞,直到某一任务调用release()
。 value
为可选参数,用于设定内部计数器的初始值。如果给定的值小于0,则引发ValueError
异常。使用
信号量
的最佳方法是async with
声明:1
2
3
4
5sem = asyncio.Semaphore(10)
# ... later
async with sem:
# work with shared resource等价于:
1
2
3
4
5
6
7
8sem = asyncio.Semaphore(10)
# ... later
await sem.acquire()
try:
# work with shared resource
finally:
sem.release()
coroutine acquire()
- 申请一个信号量
- 如果内部计数器的值大于0,则减1并立即返回
True
。如果内部计数器的值为0,则等待release()
被调用,然后返回True
。
- locked()
- 如果信号量不能被立即申请,则返回
True
。
- 如果信号量不能被立即申请,则返回
- release()
- 释放信号量,内部计数器加1。
- 与
BoundedSemaphore
不同,Semaphore
允许release()
的调用次数大于acquire()
的调用次数。
BoundedSemaphore
- class asyncio.BoundedSemaphore(value=1,*,loop=None)
- 有界信号量,非线程安全。
- 有界信号量是一种特殊的信号量——如果
release()
后内部计数器的值大于初始值,则引发ValueError
异常。
从python3.7开始:通过await lock
、yield from lock
或通过声明with await lock
、with(yield from lock)
获取锁的用法被废弃。可使用async with lock
代替。