线程内的异步函数

cryptofeed 是一个 Python 库,它使用 asyncio 库来获取不同加密货币交易所的实时价格。 在这个简短的程序中,我们尝试在一个独立线程中运行 cryptofeed FeedHandler。代码示例如下所示:

import functools as fct
from cryptofeed import FeedHandler
from cryptofeed.defines import BID,ASK,L2_BOOK
from cryptofeed.exchanges import Kraken
from datetime import datetime
import threading


async def bookfunc(params,orderbooks,feed,symbol,book,timestamp,receipt_timestamp):

    print(f'Timestamp: {timestamp} Cryptofeed Receipt: {receipt_timestamp} Feed: {feed} Symbol: {symbol}'
          f' Book Bid Size is {len(book[BID])} Ask Size is {len(book[ASK])}')
    orderbooks = filter_orderbook(orderbooks,params['orderbook']['depth'])


def func():

    # Parameters
    params = {'orderbook': {'depth': 2},'price_model':{},'trade_model': {}}
    config = {'log': {'filename': 'logs/demo.log','level': 'INFO'}}

    orderbooks = {}

    f = FeedHandler(config=config)
    f.add_feed(Kraken(checksum_validation=True,subscription={L2_BOOK: ['BTC-USD','ETH-USD','LINK-USD','LTC-USD','ADA-USD']},callbacks={L2_BOOK: fct.partial(bookfunc,params,orderbooks)})) # This way passes the orderbooks inside the callback

    f.run()


if __name__ == '__main__':
    thread = threading.Thread(target=func,args=())
    thread.start()

代码执行时,得到如下错误:

raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'Thread-1'.

知道如何解决这个问题吗?

编辑: 这是stackoverflow中不同问题的解决方案。一个例子是以下问题:

Running cryptofeed (asyncio library) in a new thread

wahaha1021 回答:线程内的异步函数

您必须通过 asyncio 为您的线程手动设置一个事件循环。您可以通过将您的函数包装在一种循环设置器中来实现:

def create_loop(func):
   loop = asyncio.new_event_loop()
   asyncio.set_event_loop(loop)
   loop.run_until_complete(func())
   
if __name__ == "__main__":
    thread = threading.Thread(target=create_loop,args=(func,))
    thread.start()
,

(从 cryptofeed==1.9.2 开始)有两件重要的事情可以让它从一个线程运行:

  1. FeedHandler.run 默认设置信号处理程序,must be done from the main thread。为了避免这种情况,有 install_signal_handlers 方法的参数。
  2. FeedHandler set's uvloop's policy 的初始化器,但是 不调用 uvloop.install()(假设它是应用程序的 责任我猜)。没有它asyncio.set_event_loop 没有效果。或者,您可以设置 'uvloop': False 在馈送处理程序配置中(如下所示),或者只是卸载 uvloop
import asyncio
import threading

from cryptofeed import FeedHandler
from cryptofeed.defines import BID,ASK,L2_BOOK
from cryptofeed.exchanges import Kraken


async def bookfunc(**kwargs):
    print('bookfunc',kwargs)
    

def run_feed_handler_forever():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    
    config = {
        'uvloop': False,'log': {'filename': 'log.log','level': 'DEBUG'},}
    l2_book = ['BTC-USD','ETH-USD','LINK-USD','LTC-USD','ADA-USD']
    feed = Kraken(
        subscription={L2_BOOK: l2_book},callbacks={L2_BOOK: bookfunc}
    )
    fh = FeedHandler(config)
    fh.add_feed(feed)
    fh.run(install_signal_handlers=False)


if __name__ == '__main__':
    thread = threading.Thread(target=run_feed_handler_forever)
    thread.start()
    thread.join()
本文链接:https://www.f2er.com/75598.html

大家都在问