asyncio异步IO--同步原语

asyncio同步原语与线程(threading)模块同步原语基本类似,但有两点重要区别:

  • asyncio同步原语非线程安全,因此不应被用作系统线程同步(可以使用threading代替);
  • asyncio同步原语不允许使用timeout参数;可以使用asyncio.wait_for()方法执行有超时设置的操作。 asyncio有以下5个基本的同步原语:
  • Lock
  • Event
  • Condition
  • Semaphore
  • BoundedSemaphore

Lock

  • class asyncio.Lock(*,loop=None)

    • 为asyncio任务提供一个互斥锁。非线程安全。
    • asyncio锁可以用来保证对共享资源的独占访问。
    • asyncio锁的首选用法是同async with语句一起使用:

      1
      2
      3
      4
      lock = asyncio.Lock()
      # ... later
      async with lock:
      # 访问共享资源

      此代码段和以下代码是等价的:

      1
      2
      3
      4
      5
      6
      7
      8
      lock = asyncio.Lock()

      # ... later
      await lock.acquire()
      try:
      # 访问共享资源
      finally:
      lock.release()
    • coroutine acquire()

      • 获取asyncio同步锁。
      • 该方法等待的状态变为unlocked,之后设置其为locked,并返回True
    • release()
      • 释放asyncio同步锁。
      • 如果的状态是locked,则将其重置为unlocked并返回。
      • 如果的状态是unlocked,会引发RuntimeError异常。
    • locked()
      • 如果的状态是locked,则返回True

Event

  • class asyncio.Event(*,loop=None)

    • 事件对象,非线程安全。
    • 用于向asyncio任务通知某些事件已发生。
    • 事件对象用于管理内部标志。此标志可以通过set()方法设置为True,或通过clear()方法复位为Falsewait()方法在该标志设置为True前一直保持阻塞。初始状态下,该标志为False
    • 例如:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      async def waiter(event):
      print('waiting for it ...')
      await event.wait()
      print('... got it!')

      async def main():
      # Create an Event object.
      event = asyncio.Event()

      # Spawn a Task to wait until 'event' is set.
      waiter_task = asyncio.create_task(waiter(event))

      # Sleep for 1 second and set the event.
      await asyncio.sleep(1)
      event.set()

      # Wait until the waiter task is finished.
      await waiter_task

      asyncio.run(main())
    • coroutine wait()

      • 等待事件内部标志被设置为True
      • 如果事件的内部内部标志已设置,则立即返回True。否则,一直阻塞,直到另外的任务调用set()
    • set()
      • 设置事件内部标志True
      • 所有等待事件的任务将会立即被触发。
    • clear()
      • 清除事件内部标志(即设置为False)。
      • 等待事件的任务将会阻塞,直到set()方法被再次调用。
    • is_set()
      • 如果事件内部标志被设置为True,则返回True

Condition

  • class asyncio.Condition(lock=None,*,loop=None)

    • 条件对象,非线程安全。
    • 异步条件原语用于在某些事件发生后,获得共享资源的独占访问权限。
    • 本质上,条件对象结合了事件和锁的功能。可以让多个Condition对象共享一个Lock,这允许在对共享资源的特定状态感兴趣的不同任务之间协调对共享资源的独占访问。
    • 可选参数lock必须为Lock对象或None。如果为None,会自动创建一个Lock对象。
    • 使用条件对象的首选方法是async with方式:

      1
      2
      3
      4
      5
      cond = asyncio.Condition()

      # ... later
      async with cond:
      await cond.wait()

      等价于:

      1
      2
      3
      4
      5
      6
      7
      8
      cond = asyncio.Condition()

      # ... later
      await lock.acquire()
      try:
      await cond.wait()
      finally:
      lock.release()
    • coroutine acquire()

      • 获取底层锁。
      • 该方法一直等待,直到底层锁处于未锁定状态,然后设置其为锁定状态,并且返回True
    • notify(n=1)
      • 唤醒至多n个等待条件的任务。如果没有正在等待的任务,则该方法无操作。
      • 在调用该方法之前,必须先调用acquire()获取锁,并在调用该方法之后释放锁。如果在锁为锁定的情况下调用此方法,会引发RuntimeError异常。
    • locked()
      • 如果底层锁已获取,则返回True
    • notify_all()
      • 唤醒所有正在等待该条件的任务。
      • 该方法与notify()类似,区别只在它会唤醒所有正在等待的任务。
      • 在调用该方法前,必须首先获取底层锁,并在执行完该方法后释放锁。如果在底层锁未锁定的情况下执行该方法,会引发RuntimeError异常。
    • release()
      • 释放底层锁。
      • 在未锁定的锁上调用时,会引发RuntimeError异常。
    • coroutine wait()
      • 等待通知。
      • 如果调用此方法的任务没有获取到锁,则引发RuntimeError异常。
      • 此方法释放底层锁,然后保持阻塞,直至被notify()notify_all()唤醒。被唤醒之后,条件对象重新申请锁,该方法返回True
    • coroutine wait_for(predicate)
      • 等待predicate变为True
      • predicate必须可调用,它的执行结果会被解释为布尔值,并作为最终结果返回。

Semaphore

  • class asyncio.Semaphore(value=1,*,loop=None)

    • 信号量(Semaphore)对象。非线程安全。
    • 信号量用于管理一个内部计数器,此计数器逢acquire()递减,逢release()递增。计数器的值不能小于0,如果acquire()被调用时计数器为0,则阻塞,直到某一任务调用release()
    • value为可选参数,用于设定内部计数器的初始值。如果给定的值小于0,则引发ValueError异常。
    • 使用信号量的最佳方法是async with声明:

      1
      2
      3
      4
      5
      sem = asyncio.Semaphore(10)

      # ... later
      async with sem:
      # work with shared resource

      等价于:

      1
      2
      3
      4
      5
      6
      7
      8
      sem = asyncio.Semaphore(10)

      # ... later
      await sem.acquire()
      try:
      # work with shared resource
      finally:
      sem.release()
  • coroutine acquire()

    • 申请一个信号量
    • 如果内部计数器的值大于0,则减1并立即返回True。如果内部计数器的值为0,则等待release()被调用,然后返回True
  • locked()
    • 如果信号量不能被立即申请,则返回True
  • release()
    • 释放信号量,内部计数器加1。
    • BoundedSemaphore不同,Semaphore允许release()的调用次数大于acquire()的调用次数。

BoundedSemaphore

  • class asyncio.BoundedSemaphore(value=1,*,loop=None)
    • 有界信号量,非线程安全。
    • 有界信号量是一种特殊的信号量——如果release()后内部计数器的值大于初始值,则引发ValueError异常。

从python3.7开始:通过await lockyield from lock或通过声明with await lockwith(yield from lock)获取锁的用法被废弃。可使用async with lock代替。