我刚开始使用Python =)
我想实现下一个:
- 在主程序中以foerver循环启动mqtt客户端
-
on_connect: 该代码始终调用一个工作线程。线程做 下一个:
- 如果(值==“ 0”)在表(标志列)中搜索
- 然后从(Topic列)订阅同一表中flag == 0的主题。
-
主程序包含发布和 其他功能。
表中的行将随着时间增加。例如新话题 将在线订阅。
问题在于如何使线程或子例程能够并行运行。当我尝试使用相同的client_id从其他文件订阅主题时,客户端再次启动。但是我需要将新主题订阅到已经订阅的其他主题上。
有什么想法吗? 感谢所有帮助
以下代码显示了我到目前为止的工作方式...
主程序 foo1.py
import paho.mqtt.client as mqtt
import mysql.connector as my_sql
def on_connect(client,userdata,flags,rc):
print("Connected with result code " + str(rc))
# here the thread prsj() called and starts to work all the time in parrallel
prsj() # <--
def on_message(client,msg):
print(msg.topic + " " + format(msg.payload.decode("utf-8")))
def on_disconnect(client,rc):
print("Unexpected MQTT disconnection. Will auto-reconnect")
def on_subscribe(client,obj,mid,granted_qos):
print("Subscribed: " + str(mid))
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
client.on_subscribe = on_subscribe
client.connect("mqtt.eclipse.org",1883,60)
client.loop_forever()
这是mysql搜索和订阅新主题的功能 foo2.py
def prsj():
mydb = my_sql.connect(
host="localhost",user="root",passwd="",database="db")
cur = mydb.cursor()
cur.execute("SELECT * FROM topics")
result = cur.fetchall()
for t in result:
if (t[2] == "0"):
client.subscribe(t[1]) # <-- subscribe to the same client
i = t[0]
cur.execute("UPDATE topics SET flag=1 WHERE id='%s'" % (i))
mydb.commit()