异步在RuntimeError中:没有正在运行的事件循环

我正在编写多进程代码,该代码可在Python 3.7中完美运行。但是,我希望并行进程之一执行IO进程时,要使用AsyncIO i来获得收益,以便获得更好的性能,但还无法使其运行。

Ubuntu 18.04,Python 3.7,AsyncIO,pipenv(已安装所有pip库)

该方法使用多线程可以按预期运行,这是我想用AsyncIO替换的方法。

我已经搜索并尝试了main()函数中的循环,现在仅在预期的例程中进行了循环,看过示例并了解了这种新的异步方法,到目前为止,没有结果。

以下是应执行的app.py代码: python app.py

import sys
import traceback
import logging
import asyncio

from config import DEBUG
from config import log_config
from <some-module> import <some-class>

if DEBUG:
    logging.config.dictConfig(log_config())
else:
    logging.basicConfig(
        level=logging.DEBUG,format='%(relativeCreated)6d %(threadName)s %(message)s')
logger = logging.getLogger(__name__)


def main():
    try:
        <some> = <some-class>([
            'some-data1.csv','some-data2.csv'
            ])
        <some>.run()

    except:

        traceback.print_exc()
        pdb.post_mortem()

    sys.exit(0)


if __name__ == '__main__':

    asyncio.run(main())

这是定义给定类的代码

    _sql_client = SQLServer()
    _blob_client = BlockBlobStore()
    _keys = KeyVault()
    _data_source = _keys.fetch('some-data')
    #  Multiprocessing
    _manager = mp.Manager()
    _ns = _manager.Namespace()

    def __init__(self,list_of_collateral_files: list) -> None:

    @timeit
    def _get_filter_collateral(self,ns: mp.managers.NamespaceProxy) -> None:

    @timeit
    def _get_hours(self,ns: mp.managers.NamespaceProxy) -> None:

    @timeit
    def _load_original_bids(self,ns: mp.managers.NamespaceProxy) -> None:

    @timeit
    def _merge_bids_with_hours(self,ns: mp.managers.NamespaceProxy) -> None:

    @timeit
    def _get_collaterial_per_month(self,ns: mp.managers.NamespaceProxy) -> None:

    @timeit
    def _calc_bid_per_path(self) -> None:

    @timeit
    def run(self) -> None:

包含异步代码的方法在这里:

    def _get_filter_collateral(self,ns: mp.managers.NamespaceProxy) -> None:

        all_files = self._blob_client.download_blobs(self._list_of_blob_files)

        _all_dfs = pd.DataFrame()
        async def read_task(file_: str) -> None:
            nonlocal _all_dfs
            df = pd.read_csv(StringIO(file_.content))
            _all_dfs = _all_dfs.append(df,sort=False)

        tasks = []
        loop = asyncio.new_event_loop()

        for file_ in all_files:
            tasks.append(asyncio.create_task(read_task(file_)))

        loop.run_until_complete(asyncio.wait(tasks))
        loop.close()

        _all_dfs['TOU'] = _all_dfs['TOU'].map(lambda x: 'OFFPEAK' if x == 'OFF' else 'ONPEAK')
        ns.dfs = _all_dfs

调用特定序列的方法和此异步方法是:

    def run(self) -> None:
        extract = []
        extract.append(mp.Process(target=self._get_filter_collateral,args=(self._ns,)))
        extract.append(mp.Process(target=self._get_hours,)))
        extract.append(mp.Process(target=self._load_original_bids,)))

        #  Start the parallel processes
        for process in extract:
            process.start()

        #  Await for database process to end
        extract[1].join()
        extract[2].join()

        #  Merge both database results
        self._merge_bids_with_hours(self._ns)

        extract[0].join()

        self._get_collaterial_per_month(self._ns)
        self._calc_bid_per_path()
        self._save_reports()
        self._upload_data()

这些是我得到的错误:

Process Process-2:
Traceback (most recent call last):
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py",line 297,in _bootstrap
    self.run()
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py",line 99,in run
    self._target(*self._args,**self._kwargs)
  File "<some-path>/src/azure/application/utils/lib.py",line 10,in timed
    result = method(*args,**kwargs)
  File "<some-path>/src/azure/application/caiso/main.py",line 104,in _get_filter_collateral
    tasks.append(asyncio.create_task(read_task(file_)))
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/asyncio/tasks.py",line 350,in create_task
    loop = events.get_running_loop()
RuntimeError: no running event loop
<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py:313: RuntimeWarning: coroutine '<some-class>._get_filter_collateral.<locals>.read_task' was never awaited
  traceback.print_exc()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
DEBUG Calculating monthly collateral...
Traceback (most recent call last):
  File "app.py",line 25,in main
    caiso.run()
  File "<some-path>/src/azure/application/utils/lib.py",line 425,in run
    self._get_collaterial_per_month(self._ns)
  File "<some-path>/src/azure/application/utils/lib.py",line 196,in _get_collaterial_per_month
    credit_margin = ns.dfs
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/managers.py",line 1122,in __getattr__
    return callmethod('__getattribute__',(key,))
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/managers.py",line 834,in _callmethod
    raise convert_to_error(kind,result)
AttributeError: 'Namespace' object has no attribute 'dfs'
> <some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/managers.py(834)_callmethod()
-> raise convert_to_error(kind,result)
(Pdb)

yingq0072 回答:异步在RuntimeError中:没有正在运行的事件循环

暂时没有好的解决方案,如果你有好的解决方案,请发邮件至:iooj@foxmail.com
本文链接:https://www.f2er.com/3134718.html

大家都在问