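How you synchronize with `reconfigure_event_processing` when multiple instances of `process_events` are running in parallel depends on code you haven't shown in the question: namely, how are you spawning the `process_events` tasks?

Here's a short example I put together that uses an `asyncio.Condition` object to coordinate things. This is actually a runnable example: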
import asyncio
import random
from itertools import count


class System:
    def __init__(self):
        self.condition = asyncio.Condition()
        self.flag = False  # True while a reconfigure is pending or running

    async def process_events(self, event):
        print('start process event', event)
        await asyncio.sleep(random.randint(1, 5))
        print('end process event', event)

    async def reconfigure_event_processing(self, config):
        self.flag = True
        async with self.condition:
            # wait until mainloop() reports that all running tasks are done
            await self.condition.wait()

            print('start reconfigure')
            await asyncio.sleep(2)
            print('end reconfigure')
            self.flag = False
            # let mainloop() go back to starting tasks
            self.condition.notify()

    async def mainloop(self):
        tasks = set()
        for i in count():
            if self.flag:
                print('wait for tasks to complete')
                if tasks:  # asyncio.wait() raises ValueError on an empty set
                    done, pending = await asyncio.wait(tasks)
                    tasks = pending
                print('done waiting for tasks')
                async with self.condition:
                    self.condition.notify()
                    await self.condition.wait()

            tasks.add(asyncio.create_task(self.process_events(i)))

            # keep at most 4 tasks in flight
            if len(tasks) >= 4:
                done, pending = await asyncio.wait(
                    tasks, return_when=asyncio.FIRST_COMPLETED)
                tasks = pending

    async def reconfigure(self):
        while True:
            await asyncio.sleep(random.randint(1, 5))
            await self.reconfigure_event_processing('foo')


async def main():
    # create System inside the running loop so the Condition binds to it
    system = System()
    # run mainloop() and reconfigure() concurrently forever
    # (gather() takes awaitables as separate arguments, not a list)
    await asyncio.gather(system.mainloop(), system.reconfigure())


if __name__ == '__main__':
    asyncio.run(main())
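The logic here is essentially:

- `reconfigure_event_processing` raises a flag when it needs to run, and then waits on `self.condition`.
- When the main loop sees the flag, it waits for all running tasks to complete and then notifies the condition, which causes `reconfigure_event_processing` to run.
- Meanwhile, the main loop itself waits on the condition.
- When `reconfigure_event_processing` is done, it notifies the condition and unsets the flag.

If you run this, you'll see that (a) all running tasks complete before you see the `start reconfigure` output, and (b) no more tasks are started until you see the `end reconfigure` output.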
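For what it's worth, the flag-plus-condition handshake described above can be checked with a bounded variant that records what happens instead of printing it. This is only a sketch: the `demo` function, the `log` list, the short sleep times and the limit of two tasks in flight are assumptions made for illustration, not part of the example above.

```python
import asyncio


async def demo():
    condition = asyncio.Condition()
    state = {"flag": False}   # True while a reconfigure is pending or running
    log = []                  # records the order of events for inspection

    async def process_event(n):
        log.append(f"start event {n}")
        await asyncio.sleep(0.01)
        log.append(f"end event {n}")

    async def reconfigure():
        # the request arrives while events 0 and 1 are still running
        await asyncio.sleep(0.005)
        state["flag"] = True
        async with condition:
            # sleep until the main loop has drained its tasks
            await condition.wait()
            log.append("start reconfigure")
            await asyncio.sleep(0.01)
            log.append("end reconfigure")
            state["flag"] = False
            # hand control back to the main loop
            condition.notify()

    async def mainloop():
        tasks = set()
        for n in range(4):
            if state["flag"]:
                if tasks:  # asyncio.wait() rejects an empty set
                    await asyncio.wait(tasks)
                    tasks = set()
                async with condition:
                    condition.notify()      # wake reconfigure()
                    await condition.wait()  # block until it has finished
            tasks.add(asyncio.create_task(process_event(n)))
            if len(tasks) >= 2:
                _, tasks = await asyncio.wait(
                    tasks, return_when=asyncio.FIRST_COMPLETED)
        if tasks:
            await asyncio.wait(tasks)

    await asyncio.gather(mainloop(), reconfigure())
    return log


if __name__ == "__main__":
    for line in asyncio.run(demo()):
        print(line)
```

In the recorded log, both `end event` entries for events 0 and 1 should come before `start reconfigure`, and no event should start between `start reconfigure` and `end reconfigure`.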
This is what I came up with, based on the very helpful answer from @larsks (which may well serve you better), so go upvote that one!
import asyncio
from contextlib import asynccontextmanager


class MultiLock:
    """
    Some kind of "infinite semaphore" thing for asyncio. Not thread-safe.

    Ensures that either:
    - any number of coroutines can have a Partial lock
    OR
    - one coroutine can have the Total lock
    (or neither)

    By acquiring a Total lock, incoming requests to use a Partial lock must wait.
    Requests that were already in the middle of using a Partial lock must finish before
    the Total lock can be acquired.

    To acquire a Partial lock, you wait until the Total lock is released.

    You must not attempt to acquire a Total lock while a Partial lock is held in the
    same coroutine, or vice versa, as this will cause a deadlock.

    For the scenario outlined in https://stackoverflow.com/q/63657086, this can be used
    like::

        class System:
            def __init__(self):
                self.multilock = MultiLock()

            # this will be running multiple times concurrently
            async def process_events(self, event):
                # this line must not be run while `reconfigure_event_processing` is running
                async with self.multilock.partial_lock():
                    handler = await lookup_event_handler(self.handler_lookup_config, event)
                await handler.handle_event(event)

            async def reconfigure_event_processing(self, config):
                # this code must wait until nothing is using `lookup_event_handler`
                async with self.multilock.total_lock():
                    self.handler_lookup_config = config
    """

    def __init__(self):
        # represents "can I start doing something with a Partial lock?"
        self._event = asyncio.Event()
        # start with "yes"
        self._event.set()
        # used to wake up a Total lock to tell it to proceed
        self._event2 = asyncio.Event()
        # used to check if the final partial lock has been released
        self._counter = 0
        # ensures we atomically manage the counter (maybe unnecessary? I don't fully understand asyncio enough to be sure)
        self._counter_lock = asyncio.Lock()

    @asynccontextmanager
    async def partial_lock(self):
        # wait until the total lock is released
        await self._event.wait()
        # atomic increment
        async with self._counter_lock:
            self._counter += 1
        try:
            # serve the lock
            yield
        finally:
            # atomic decrement
            async with self._counter_lock:
                self._counter -= 1
                if self._counter == 0:
                    # tell a Total lock
                    self._event2.set()

    @asynccontextmanager
    async def total_lock(self):
        # tell new partial locks to wait until we've finished
        self._event.clear()
        # only wait if a partial lock is actually held; otherwise nothing
        # would ever set _event2 and we would block forever
        if self._counter > 0:
            # drop any stale wake-up left over from an earlier release
            self._event2.clear()
            # wait for a partial lock to tell us it was the last one
            await self._event2.wait()
        try:
            # serve the lock
            yield
        finally:
            # tell partial locks to proceed
            self._event.set()
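As an aside, the same partial/total idea can also be sketched with a single `asyncio.Condition` and `wait_for()`, which rechecks its predicate every time a waiter is woken. This is not the `MultiLock` above but a hypothetical alternative: the `ConditionGate` class, its attribute names and the `demo` function are all made up for illustration.

```python
import asyncio
from contextlib import asynccontextmanager


class ConditionGate:
    """Hypothetical alternative: many partial holders OR one total holder."""

    def __init__(self):
        self._cond = asyncio.Condition()
        self._partial_holders = 0   # number of partial locks currently held
        self._total_held = False    # True while a total lock is pending or held

    @asynccontextmanager
    async def partial_lock(self):
        async with self._cond:
            # new partial locks wait while a total lock is pending or held
            await self._cond.wait_for(lambda: not self._total_held)
            self._partial_holders += 1
        try:
            yield
        finally:
            async with self._cond:
                self._partial_holders -= 1
                self._cond.notify_all()

    @asynccontextmanager
    async def total_lock(self):
        async with self._cond:
            # only one total lock at a time
            await self._cond.wait_for(lambda: not self._total_held)
            self._total_held = True
            try:
                # wait for partial locks that are already in progress
                await self._cond.wait_for(lambda: self._partial_holders == 0)
            except BaseException:
                # cancelled while draining: give the gate back
                self._total_held = False
                self._cond.notify_all()
                raise
        try:
            yield
        finally:
            async with self._cond:
                self._total_held = False
                self._cond.notify_all()


async def demo():
    gate = ConditionGate()  # created inside the running loop
    log = []

    async def process_event(n, delay=0.0):
        await asyncio.sleep(delay)
        async with gate.partial_lock():
            log.append(f"start event {n}")
            await asyncio.sleep(0.01)
            log.append(f"end event {n}")

    async def reconfigure():
        await asyncio.sleep(0.005)  # arrives while events 0 and 1 are running
        async with gate.total_lock():
            log.append("start reconfigure")
            await asyncio.sleep(0.01)
            log.append("end reconfigure")

    await asyncio.gather(
        process_event(0),
        process_event(1),
        reconfigure(),
        process_event(2, delay=0.007),  # arrives while reconfigure is waiting
    )
    return log


if __name__ == "__main__":
    for line in asyncio.run(demo()):
        print(line)
```

The trade-off, as far as I can tell, is that one condition plus two plain attributes replaces the two events and the counter lock, at the cost of every waiter rechecking its predicate on each `notify_all()`.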