如何确保一个特定任务或其余任务可以一次使用asyncio运行?

我有一个处理事件的系统。有时可能会发生配置更改,这意味着在重新配置系统时必须停止处理事件。例如(简化和抽象):

class System:
    # this will be running multiple times concurrently
    async def process_events(self,event):
        # this line must not be run while `reconfigure_event_processing` is running
        handler = await lookup_event_handler(self.handler_lookup_config,event)

        await handler.handle_event(event)

    async def reconfigure_event_processing(self,config):
        # this code must wait until nothing is using `lookup_event_processing_configuration`
        self.handler_lookup_config = config

asyncio synchronisation primitives的文档对我来说很混乱,所以我想知道其中的哪一个(如果有的话)可以解决此问题。

iCMS 回答:如何确保一个特定任务或其余任务可以一次使用asyncio运行?

process_events的多个实例并行运行的情况下,如何处理与reconfigure_event_processing的同步取决于您未在问题中显示的代码外观:即,如何生成{ {1}}个任务?

这是我整理的一个简短示例,它使用process_events对象来协调事物。这实际上是一个可运行的示例:

asyncio.Condition

这里的逻辑本质上是:

  1. import asyncio import random from itertools import count class System(object): def __init__(self): self.condition = asyncio.Condition() self.flag = False async def process_events(self,event): print('start process event',event) await asyncio.sleep(random.randint(1,5)) print('end process event',event) async def reconfigure_event_processing(self,config): self.flag = True async with self.condition: await self.condition.wait() print('start reconfigure') await asyncio.sleep(2) print('end reconfigure') self.flag = False self.condition.notify() async def mainloop(self): tasks = set() for i in count(): if self.flag: print('wait for tasks to complete') done,pending = await asyncio.wait(tasks) tasks = pending print('done waiting for tasks') async with self.condition: self.condition.notify() await self.condition.wait() tasks.add(asyncio.create_task(self.process_events(i))) if len(tasks) >= 4: done,pending = await asyncio.wait( tasks,return_when=asyncio.FIRST_COMPLETED) tasks = pending async def reconfigure(self): while True: await asyncio.sleep(random.randint(1,5)) await self.reconfigure_event_processing('foo') if __name__ == '__main__': system = System() # run mainloop() and reconfigure() concurrently forever asyncio.run(asyncio.gather([system.mainloop(),system.reconfigure()])) 在需要运行时引发一个标志,然后等待reconfigure_event_processing
  2. 当主循环看到该标志时,它将等待所有正在运行的任务完成,然后通知该状况,从而导致self.condition运行。
  3. 同时,主循环本身会等待条件。
  4. 完成reconfigure_event_processing后,它会通知条件并取消设置标志。

如果运行此命令,您将看到(a)所有正在运行的任务都已完成,直到看到reconfigure_event_processing输出,并且(b)在看到start reconfigure输出之前不再启动任何任务

,

这就是我想出的,它基于@larsks的非常有用的答案(可能会帮助您更好)-继续投票吧!

import asyncio
from contextlib import asynccontextmanager


class MultiLock:
    """
    Some kind of "infinite semaphore" thing for asyncio. Not thread-safe.
    Ensures that either:
    - any number of coroutines can have a Partial lock
    OR
    - one coroutine can have the Total lock
    (or neither)

    By acquiring a Total lock,incoming requests to use a Partial lock must wait.
    Requests that were already in the middle of using a Partial lock must finish before
    the Total lock can be acquired.

    To acquire a Partial lock,you wait until the Total lock is released.

    You must not attempt to acquire a Total lock while a Partial lock is held in the
    same coroutine,or vice versa,as this will cause a deadlock.

    For the scenario outlined in https://stackoverflow.com/q/63657086,this can be used
    like::

        class System:
            def __init__(self):
                self.multilock = MultiLock()

            # this will be running multiple times concurrently
            async def process_events(self,event):
                # this line must not be run while `reconfigure_event_processing` is running
                async with multilock.partial_lock():
                    handler = await lookup_event_handler(self.handler_lookup_config,event)

                await handler.handle_event(event)

            async def reconfigure_event_processing(self,config):
                # this code must wait until nothing is using `lookup_event_handler`
                async with multilock.total_lock():
                    self.handler_lookup_config = config
    """

    def __init__(self):
        # represents "can I start doing something with a Partial lock?"
        self._event = asyncio.Event()
        # start with "yes"
        self._event.set()

        # used to wake up a Total lock to tell it to proceed
        self._event2 = asyncio.Event()

        # used to check if the final partial lock has been released
        self._counter = 0

        # ensures we atomically manage the counter (maybe unnecessary? I don't fully understand asyncio enough to be sure)
        self._counter_lock = asyncio.Lock()

    @asynccontextmanager
    async def partial_lock(self):
        # wait until the total lock is released
        await self._event.wait()

        # atomic increment
        async with self._counter_lock:
            self._counter += 1

        try:
            # serve the lock
            yield
        finally:
            # atomic decrement
            async with self._counter_lock:
                self._counter -= 1
                if self._counter == 0:
                    # tell a Total lock
                    self._event2.set()

    @asynccontextmanager
    async def total_lock(self):
        # tell new partial locks to wait until we've finished
        self._event.clear()

        # wait for a partial lock to tell us it was the last one
        await self._event2.wait()
        self._event2.clear()

        try:
            # serve the lock
            yield
        finally:
            # tell partial locks to proceed
            self._event.set()
本文链接:https://www.f2er.com/1655086.html

大家都在问