在NodeJS开发人员多年之后,我决定尝试一下Python。到目前为止还算不错,但是我碰到了一堵墙,我真的很想帮忙。
我正在研究一个使用MQTT与远程计算机通信的库。在该库上调用功能时,将发布一条消息,供在该远程计算机上进行处理。处理完成后,它会在总线上发布一条新消息,供我的库接听,并将结果返回给调用代码(调用库函数的代码)。
在Javascript中,这是通过返回具有resolve
和reject
函数的Promise来完成的,该Promise可存储在库中,直到远程消息通过代理返回并带有结果(截获库中其他地方的其他函数),此时,我可以简单地调用先前存储的“解析”函数,以将控制权返回给调用代码(调用我的库的异步函数的代码)。只需使用async
关键字即可调用此库函数。
现在在Python中,异步/等待不使用可以方便地存储起来以供以后使用的resolve
和reject
函数,因此我认为逻辑必须以不同的方式实现。使用简单的回调函数而不是异步/等待工作流是可行的,但由于每个结果处理回调都是单独的函数,因此在为类似的来回通信依次调用多次时会带来不便。
这是Javascript的基本示例(仅供参考):
let TASKS = {};
....
mqttClient.on('message',(topic,message) => {
if (topic == "RESULT_OK/123") {
TASKS["123"].resolve(message);
} else if (topic == "RESULT_KO/123") {
TASKS["123"].reject(message);
}
});
...
let myAsynclibraryFunction = (someCommand) => {
return new Promise((res,rej) => {
TASKS["123"] = {
resolve: res,reject: rej
};
mqttClient.publish("REQUEST/123",someCommand);
});
}
要调用它,我只需要做:
try{
let response1 = await myAsynclibraryFunction("do this");
let response2 = await myAsynclibraryFunction("now do that");
...
} catch(e) {
...
}
NodeJS是一种基于事件循环的语言,这就是为什么它非常适用于这些类型的用例。但是,这种类型的应用程序逻辑在处理基于消息的不同后端时很常见,因此我相信也有很好的方法可以在Python中解决此问题。
这是我正在研究的测试Python代码段,尝试使用future
对象来实现类似的目的:
import paho.mqtt.client as mqtt
import asyncio
import threading
# Init a new asyncio event loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# define a global future placeholder
_future = None
# Create MQTT client
mqttClient = mqtt.Client()
# MQTT Event - on connect
def on_connect(client,userdata,flags,rc):
print("Connected")
client.subscribe("YAVA/#")
# We start a new thread to test our workflow.
#
# If I had done this on the current thread,then the MQTT event loop
# would get stuck (does not process incomming and outgoint messages anymore) when
# calling "await" on the future object later on.
taskThread = threading.Thread(target=_simulateclient,args=())
taskThread.start()
# MQTT Event - on incomming message
def on_message(client,msg):
global _future
if msg.topic.startswith("YAVA/API/TASK_DONE/") == True:
payload = str(msg.payload.decode("utf-8","ignore"))
# Resolve the future object
_future.set_result(payload)
mqttClient.on_connect = on_connect
mqttClient.on_message = on_message
# Use asyncio to call a async function and test the workflow
def _simulateclient():
asyncio.run(performAsyncTask())
# This async function will ask for a task to be performed on a remote machine,# and wait for the response to be sent back
async def performAsyncTask():
result = await pubAndWhaitForResponse("YAVA/API/TASK_START","")
print(result)
# perform the actual MQTT async logic
async def pubAndWhaitForResponse(topic,message):
# Create a future object that can be resolved in the MQTT event "on_message"
global _future
_future = asyncio.get_running_loop().create_future()
# Publish message that will start the task execution remotely somewhere
global mqttClient
mqttClient.publish(topic,message)
# Now block the thread until the future get's resolved
result = await _future
# Return the result
return result
# Start the broker and loop^forever in the main thread
mqttClient.connect("192.168.1.70",1883,60)
# The MQTT library will start a new thread that will continuously
# process outgoing and incoming messages through that separate thread.
# The main thread will be blocked so that the program does not exit
mqttClient.loop_forever()
一切正常,但是_future.set_result(payload)
行似乎并不能解决未来。我从来没有看到结果打印出来。
感觉没有多少东西可以得到这种排序了。任何建议都会很棒。
谢谢