来自asyncio docs:
asyncio.as_completed(aws,*,loop=None,timeout=None)
Run awaitable objects in the aws set concurrently. Return an iterator
of Future objects. Each Future object returned represents the earliest
result from the set of the remaining awaitables.
我假设这些Future对象中的每一个都具有@L_404_1@中描述的方法:.cancelled(),. exception()和.result().但似乎所产生的元素只是协程,而不是Future对象.我错过了什么?
这似乎打败了.as_completed()的描述.如果我需要等待它,协程如何“完成”?
>>> import asyncio
>>> import aiohttp
>>>
>>> async def get(session,url):
... async with session.request('GET',url) as resp:
... t = await resp.text()
... return t
...
>>> async def bulk_as_completed(urls):
... async with aiohttp.ClientSession() as session:
... aws = [get(session,url) for url in urls]
... for future in asyncio.as_completed(aws):
... for i in ('cancelled','exception','result'):
... print(hasattr(future,i))
... print(type(future))
... try:
... result = await future
... except:
... pass
... else:
... print(type(result))
... print()
...
>>>
>>> urls = (
... 'https://docs.python.org/3/library/asyncio-task.html',... 'https://docs.python.org/3/library/select.html',... 'https://docs.python.org/3/library/this-page-will-404.html',... )
>>>
>>> asyncio.run(bulk_as_completed(urls))
False
False
False
最终,我关心这个的原因是因为我想让异常像asyncio.gather(…,return_exceptions = True)那样冒泡.考虑在调用session.request()时添加一个伪造的URL:
urls = (
'https://docs.python.org/3/library/asyncio-task.html','https://docs.python.org/3/library/select.html','https://docs.python.org/3/library/this-page-will-404.html',# This URL will raise on session.request(). How can I propagate
# that exception to the iterator of results?
'https://asdfasdfasdf-does-not-exist-asdfasdfasdf.com'
)
我希望能够做的是这样的事情(使用Future对象的方法,但这些都不是Future对象,这就是问题):
async def bulk_as_completed(urls):
async with aiohttp.ClientSession() as session:
aws = [get(session,url) for url in urls]
for future in asyncio.as_completed(aws):
if future.cancelled():
res = futures.CancelledError()
else:
exc = future.exception()
if exc is not None:
res = exc
else:
res = future.result()
# ...
# [Do something with `res`]
What I would like to be able to do is something like this […]
async def bulk_as_completed(urls):
async with aiohttp.ClientSession() as session:
aws = [get(session,url) for url in urls]
for future in asyncio.as_completed(aws):
try:
res = await future
except Exception as e:
res = e
# ...
# [Do something with `res`]
This [yielding coroutines rather than futures] seems to defeat the description of
.as_completed()
. How is the coroutine “completed” if I need to await it?
不是.首次实现asyncio.as_completed时,异步迭代器不存在.没有异步迭代就没有办法在它们完成时返回期货,所以as_completed类型通过屈服(立即)虚拟等待来伪造它,人们必须等待获得实际结果.
即使as_completed产生了实际的期货,也不会对你的用例有所帮助,因为如果没有等待他们的人,这些期货就无法完成.为了提供as_completed让步完成的期货的预期语义,as_completed需要实现异步迭代,其等价的__next__可以等待.
as_completed的惊人行为之前已经提出,我已经通过提供异步迭代提交了an issue来修复它.一旦实现,您的原始代码将只用于更改为async for.