如何在基于消息的调用中在Python中使用异步/等待

在NodeJS开发人员多年之后,我决定尝试一下Python。到目前为止还算不错,但是我碰到了一堵墙,我真的很想帮忙。

我正在研究一个使用MQTT与远程计算机通信的库。在该库上调用功能时,将发布一条消息,供在该远程计算机上进行处理。处理完成后,它会在总线上发布一条新消息,供我的库接听,并将结果返回给调用代码(调用库函数的代码)。

在Javascript中,这是通过返回具有resolvereject函数的Promise来完成的,该Promise可存储在库中,直到远程消息通过代理返回并带有结果(截获库中其他地方的其他函数),此时,我可以简单地调用先前存储的“解析”函数,以将控制权返回给调用代码(调用我的库的异步函数的代码)。只需使用async关键字即可调用此库函数。

现在在Python中,异步/等待不使用可以方便地存储起来以供以后使用的resolvereject函数,因此我认为逻辑必须以不同的方式实现。使用简单的回调函数而不是异步/等待工作流是可行的,但由于每个结果处理回调都是单独的函数,因此在为类似的来回通信依次调用多次时会带来不便。

这是Javascript的基本示例(仅供参考):

let TASKS = {};

....

mqttClient.on('message',(topic,message) => {
    if (topic == "RESULT_OK/123") {
        TASKS["123"].resolve(message);
    } else if (topic == "RESULT_KO/123") {
        TASKS["123"].reject(message);
    }
});

...

let myAsynclibraryFunction = (someCommand) => {
    return new Promise((res,rej) => {
        TASKS["123"] = {
            resolve: res,reject: rej
        };
        mqttClient.publish("REQUEST/123",someCommand);
    });
}

要调用它,我只需要做:

try{
    let response1 = await myAsynclibraryFunction("do this");
    let response2 = await myAsynclibraryFunction("now do that");
    ...
} catch(e) {
    ...
}

NodeJS是一种基于事件循环的语言,这就是为什么它非常适用于这些类型的用例。但是,这种类型的应用程序逻辑在处理基于消息的不同后端时很常见,因此我相信也有很好的方法可以在Python中解决此问题。

这是我正在研究的测试Python代码段,尝试使用future对象来实现类似的目的:

import paho.mqtt.client as mqtt
import asyncio
import threading

# Init a new asyncio event loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# define a global future placeholder
_future = None

# Create MQTT client
mqttClient = mqtt.Client()

# MQTT Event - on connect
def on_connect(client,userdata,flags,rc):
    print("Connected")
    client.subscribe("YAVA/#")

    # We start a new thread to test our workflow.
    #
    # If I had done this on the current thread,then the MQTT event loop 
    # would get stuck (does not process incomming and outgoint messages anymore) when 
    # calling "await" on the future object later on.
    taskThread = threading.Thread(target=_simulateclient,args=())
    taskThread.start()

# MQTT Event - on incomming message
def on_message(client,msg):

    global _future
    if msg.topic.startswith("YAVA/API/TASK_DONE/") == True:
        payload = str(msg.payload.decode("utf-8","ignore"))
        # Resolve the future object
        _future.set_result(payload)

mqttClient.on_connect = on_connect
mqttClient.on_message = on_message




# Use asyncio to call a async function and test the workflow
def _simulateclient():
    asyncio.run(performAsyncTask())

# This async function will ask for a task to be performed on a remote machine,# and wait for the response to be sent back
async def performAsyncTask():
    result = await pubAndWhaitForResponse("YAVA/API/TASK_START","")
    print(result)

# perform the actual MQTT async logic
async def pubAndWhaitForResponse(topic,message):
    # Create a future object that can be resolved in the MQTT event "on_message"
    global _future
    _future = asyncio.get_running_loop().create_future()

    # Publish message that will start the task execution remotely somewhere
    global mqttClient
    mqttClient.publish(topic,message)

    # Now block the thread until the future get's resolved
    result = await _future

    # Return the result
    return result


# Start the broker and loop^forever in the main thread
mqttClient.connect("192.168.1.70",1883,60)
# The MQTT library will start a new thread that will continuously 
# process outgoing and incoming messages through that separate thread.
# The main thread will be blocked so that the program does not exit
mqttClient.loop_forever()

一切正常,但是_future.set_result(payload)行似乎并不能解决未来。我从来没有看到结果打印出来。

感觉没有多少东西可以得到这种排序了。任何建议都会很棒。

谢谢

zas135421 回答:如何在基于消息的调用中在Python中使用异步/等待

我首先看到的是,您在YAVA/API/TASK_START回调中检查该主题是否为YAVA/API/TASK_DONE/时,在on_message主题中发布。因此,您的_future永远不会得到结果,await _future永远不会返回...

我建议您添加日志。在代码的开头添加以下行:

import logging

logging.basicConfig(level=logging.DEBUG,format="%(asctime)s ## %(thread)d ## %(funcName)s ## %(message)s")

然后使用logging.info(...)来跟踪您的执行顺序。 我在您的代码中添加了一些内容(以及更改了on_message中的条件),这是输出。

2019-11-19 11:37:10,440 ## 140178907485888 ## __init__ ## Using selector: EpollSelector
2019-11-19 11:37:10,478 ## 140178907485888 ## on_connect ## Connected
2019-11-19 11:37:10,478 ## 140178887976704 ## _simulateClient ## Enter simulate client
2019-11-19 11:37:10,479 ## 140178887976704 ## __init__ ## Using selector: EpollSelector
2019-11-19 11:37:10,480 ## 140178887976704 ## performAsyncTask ## Perform async task
2019-11-19 11:37:10,480 ## 140178887976704 ## pubAndWhaitForResponse ## Pub and wait
2019-11-19 11:37:10,481 ## 140178887976704 ## pubAndWhaitForResponse ## Publish
2019-11-19 11:37:10,481 ## 140178887976704 ## pubAndWhaitForResponse ## Await future: <Future pending created at /usr/lib/python3.7/asyncio/base_events.py:391>
2019-11-19 11:37:10,499 ## 140178907485888 ## on_message ## New message
2019-11-19 11:37:10,499 ## 140178907485888 ## on_message ## Topic: YAVA/API/TASK_DONE
2019-11-19 11:37:10,499 ## 140178907485888 ## on_message ## Filling future: <Future pending cb=[<TaskWakeupMethWrapper object at 0x7f7df0f5fd10>()] created at /usr/lib/python3.7/asyncio/base_events.py:391>

我还在_future.set_result(payload)行之后添加了一个日志,但是它从未出现。因此set_result似乎挂起或类似的东西……

您可能必须在其中进行挖掘,以了解其原因/悬挂位置。


编辑

顺便说一句,您混合了许多概念:asyncio,线程和mqtt(具有自己的循环)。 此外,asyncio.Future也不是线程安全的,我认为像您一样使用它是危险的。使用调试器时,进入set_result方法内部时,我在mqtt客户端类中遇到异常:

Non-thread-safe operation invoked on an event loop other than the current one

它从未在stdout / stderr上报告过,但是您可以在客户端的on_log回调中捕获它。


编辑2

这是代码的更多Pythonic示例。在此脚本中,set_result不会挂起(显示刚刚显示的日志),但是它是await中的main

import asyncio
import time

import paho.mqtt.client as mqtt
import logging

logging.basicConfig(level=logging.DEBUG,format="%(asctime)s ## %(thread)d ## %(funcName)s ## %(message)s")


def remote_on_connect(client,*_):
    logging.info("Connected")
    client.subscribe("YAVA/API/TASK_START")

def remote_on_message(client,_,_1):
    logging.info("Remotely processing your data")
    time.sleep(1)
    logging.info("Publishing result")
    client.publish("YAVA/API/TASK_DONE",42)


class Lib:
    def __init__(self):
        self.client = mqtt.Client()
        self.client.on_connect = Lib.on_connect
        self.client.on_log = lambda x: logging.info("Log: %s",x)
        self.client.connect("test.mosquitto.org")
        self.client.loop_start()

    def stop(self):
        self.client.loop_stop()

    def execute(self):
        self.client.publish("YAVA/API/TASK_START","foo")

        cb,fut = Lib.get_cb()
        self.client.on_message = cb
        return fut

    @staticmethod
    def on_connect(client,*_):
        logging.info("Connected")
        client.subscribe("YAVA/API/TASK_DONE")

    @staticmethod
    def get_cb():
        fut = asyncio.get_event_loop().create_future()
        def cb(_0,_1,msg):
            logging.info("Fetching back the result")
            logging.info(str(msg.payload.decode("utf-8","ignore")))
            fut.set_result(42)
            logging.info("Future updated")
        return cb,fut


async def main():
    remote_client = mqtt.Client()
    remote_client.on_connect = remote_on_connect
    remote_client.on_message = remote_on_message
    remote_client.connect("test.mosquitto.org")
    remote_client.loop_start()

    lib = Lib()
    future = lib.execute()

    logging.info("Result is:")
    await future
    logging.info(future.result())

    remote_client.loop_stop()
    lib.stop()

    logging.info("Exiting")


if __name__ == '__main__':
    asyncio.run(main())
,

我认为我们使用TYPES: BEGIN OF ty_belege,splkz(01) TYPE x,field(10) TYPE c,ftype(04) TYPE c,fleng(03) TYPE n,decim(02) TYPE n,olddt(50) TYPE c,newdt(50) TYPE c,END OF ty_belege. DATA: belege_00 TYPE STANDARD TABLE OF ty_belege. 库的方式很糟糕,将其与多进程/多线程并行性混合在一起。

这是基于multiprocessing模块的实现。在为远程提交任务时,您的库可以返回一个Queue,调用方可以将其与asyncio方法一起使用:它返回值(如果有),否则将挂起线程,等待该值。因此,get()充当Scala's FutureJS Promise

Queue
import multiprocessing
import time
from concurrent.futures.thread import ThreadPoolExecutor

import paho.mqtt.client as mqtt
import logging

logging.basicConfig(level=logging.DEBUG,format="%(asctime)s ## %(thread)d ## %(funcName)s ## %(message)s")


def remote_on_connect(client,42)


class Lib:
    def __init__(self):
        self.client = mqtt.Client()
        self.executor = ThreadPoolExecutor(max_workers=1)

        self.client.on_connect = Lib.on_connect
        self.client.connect("test.mosquitto.org")
        self.client.loop_start()

    def stop(self):
        self.client.loop_stop()

    def execute(self):
        cb,queue = self.get_cb()
        self.client.on_message = cb
        self.client.publish("YAVA/API/TASK_START","foo")
        return queue

    @staticmethod
    def on_connect(client,*_):
        logging.info("Connected")
        client.subscribe("YAVA/API/TASK_DONE")

    def get_cb(self):
        queue = multiprocessing.Queue(maxsize=1)
        def cb(_0,msg):
            self.client.on_message = None
            logging.info("Fetching back the result")
            logging.info(str(msg.payload.decode("utf-8","ignore")))
            queue.put(42)
            logging.info("Queue filled")
        return cb,queue


def main():
    remote_client = mqtt.Client()
    remote_client.on_connect = remote_on_connect
    remote_client.on_message = remote_on_message
    remote_client.connect("test.mosquitto.org")
    remote_client.loop_start()

    lib = Lib()
    future = lib.execute()

    logging.info("Result is:")
    logging.info(future.get())

    remote_client.loop_stop()
    lib.stop()

    logging.info("Exiting")


if __name__ == '__main__':
    main()

从输出中可以看到,2019-11-19 15:08:34,433 ## 139852611577600 ## remote_on_connect ## Connected 2019-11-19 15:08:34,450 ## 139852603184896 ## on_connect ## Connected 2019-11-19 15:08:34,452 ## 139852632065728 ## main ## Result is: 2019-11-19 15:08:34,467 ## 139852611577600 ## remote_on_message ## Remotely processing your data 2019-11-19 15:08:35,469 ## 139852611577600 ## remote_on_message ## Publishing result 2019-11-19 15:08:35,479 ## 139852603184896 ## cb ## Fetching back the result 2019-11-19 15:08:35,479 ## 139852603184896 ## cb ## 42 2019-11-19 15:08:35,480 ## 139852603184896 ## cb ## Queue filled 2019-11-19 15:08:35,480 ## 139852632065728 ## main ## 42 2019-11-19 15:08:36,481 ## 139852632065728 ## main ## Exiting 方法执行到main方法为止(如日志前面的future.get行所示)。然后,处理在另一个线程中进行,直到将值放入共享的Result is:中。现在Queue返回(因为该值可用),并且future.get方法结束。

希望这可以帮助您实现所需的目标,但是,欢迎通过main或使用比asyncio更小的数据结构来实现此目标的任何见解。

本文链接:https://www.f2er.com/3101746.html

大家都在问