处理两个同名设备

我的软件连接到 Corsair 的 iCUE 软件的 API。每个设备都有一个名称和一个设备索引。使用我的软件,我可以通过 sACN 自己控制每个设备的 LED。我读出哪个设备已连接,如果它已经连接并分配了一个宇宙,则分配相同的宇宙。现在我有 2 个同名的设备(Corsair Vengance RGB Pro),当然还有 2 个不同的设备索引。由于我的软件只检查名称是否已经存在于配置文件中(因为每次重新启动 PC 设备索引都不同)只保存一个设备但将两个设备分配给一个宇宙,这绝对是一个错误,但就是这样我打算的方式。但是现在,当我想通过 mqtt 控制两个设备时,我只能使用设备名称,我只能控制 1 个设备而不是两个。我该如何解决? 下面是一些代码:

from cuesdk import CueSdk
import json
from cuesdk.enums import CorsairLedId
import sacn
import time
import paho.mqtt.client as mqtt
import os
import sys
import math

def get_device_index_by_name(device_name):
    device_count = sdk.get_device_count()
    for device_index in range(device_count):
        device_test_name = sdk.get_device_info(device_index)
        if device_test_name.model == device_name:
            return device_index
def setup_receiver(universe,device_index):
    name = sdk.get_device_info(device_index)
    led_info = sdk.get_led_positions_by_device_index(device_index)
    led_ids = tuple(led_info.keys())
    led_buffer = {led_id: (0,0) for led_id in led_ids}
    universe = universe

    def callback(packet):
        data = packet.dmxData

        for x,led_id in enumerate(led_ids):
            led_buffer[led_id] = (data[3*x],data[3*x+1],data[3*x+2])

        sdk.set_led_colors_buffer_by_device_index(device_index,led_buffer)
        sdk.set_led_colors_flush_buffer()

    receiver.register_listener("universe",callback,universe=universe)
    print(f"Created sacn receiver for {name} on universe {universe},with {len(led_ids)} leds")
def load_config(config_path):
    if not os.path.isfile(config_path):  #Create the file if not present
        open(config_path,"w+")
    if config_path == MQTT_PATH:
         with open(config_path) as f:  #load the config file
            try:
                return json.load(f)
            except json.JSONDecodeError:
                data = {}
                data['enable_MQTT'] = True
                data['ip'] =""
                data['port']= 1883
                data['username'] =""
                data['password'] =""
                data['base_topic'] =""
                with open(config_path,"w",encoding="utf-8") as f:  # Save config
                    json.dump(
                        data,f,ensure_ascii=False,sort_keys=False,indent=4
                        )
                print(f"MQTT Config Created,please edit {MQTT_PATH} and restart this program!")
                print("For Home Assistant Auto Discovery,set base_topic to homeassistant!")
                sys.exit()
                
    with open(config_path) as f:  #load the config file
        try:
            return json.load(f)
        except json.JSONDecodeError:
            return {}   
    
def save_config(config_path):
    with open(config_path,encoding="utf-8") as f:  # Save config
        json.dump(
            conf,sort_keys=True,indent=4
        )

def setup_device_command_topics(device_short,device,base_topic):  #subscribe to device state topic and publish on message dynically
    command_topic = str(str(base_topic) + "/" + str(device_short) + "/command")
    state_topic = str(str(base_topic) + "/" + str(device_short) + "/state")
    print(f"subscribed to {command_topic}")
    print(f"publishing to {state_topic}")
    client.subscribe(command_topic)
    def callback(client,userdata,msg):
        device_index = get_device_index_by_name(device)
        universe = conf[device]
        payload = msg.payload.decode("utf-8")
        payload = json.loads(payload)
        state_payload = {}
        if payload["state"] == "ON" and "effect" in payload:    #effect changed
            print("effect changed")
            if payload['effect'] == "sACN":
                device_index = get_device_index_by_name(device)
                setup_receiver(universe,int(device_index))
                state_payload['effect'] = "sACN"
            if payload['effect'] == "iCUE":
                receiver.remove_listener_from_universe(universe)
                state_payload['effect'] = "iCUE"
        elif  not "effect" and "color" in payload: #effect == none,color
            print("color changed,no effect")
            universe = conf[device]
            receiver.remove_listener_from_universe(universe)
            set_all_device_leds(device_index,payload['color'])
            print(f"setting colors to {payload['color']}")
            state_payload['color'] = payload['color']
        elif payload['state'] == "ON" and not "effect" in payload and "color" in payload: # color changed
            print(f"color changed on {device}")
            save_device_colors(device,payload["color"])
            receiver.remove_listener_from_universe(universe)
            set_all_device_leds(device_index,payload['color'])
            state_payload['color'] = payload['color']
            state_payload['brightness'] = "255" 
            state_payload["state"] = "ON"
        elif payload['state'] == "OFF": #state off
            print("state off")
            receiver.remove_listener_from_universe(universe) 
            color = {}
            color['r'] = 0
            color['g'] = 0
            color['b'] = 0
            set_all_device_leds(device_index,color)
            state_payload = payload
            state_payload['brightness'] = "0"
        elif payload['state'] == "ON" and not "brightness" in payload: #state on
            print("state on")
            receiver.remove_listener_from_universe(universe)
            color = {}
            color['r'] = 255
            color['g'] = 255
            color['b'] = 255
            set_all_device_leds(device_index,color)
            print(f"setting colors to {color}")
            state_payload['state'] = "ON"
            state_payload['color'] = color
            state_payload['brightness'] = "255"
        elif 'brightness' in payload and payload["state"] == "ON": #brightness changed
            print(f"brightness changed: {payload['brightness']}")
            current_color = load_device_colors(device)
            print(f"current color is {current_color}")
            fac = map_brightness_to_percentage(payload['brightness'])
            print(f"fac = {fac}")
            color = {}
            color['r'] = int(current_color["r"] * fac)
            color['g'] = int(current_color["g"] * fac)   
            color['b'] = int(current_color["b"] * fac)
            set_all_device_leds(device_index,color)
            state_payload["state"] = "ON"
            state_payload["brightness"] = payload["brightness"]
            state_payload["color"] = color
            print(state_payload["color"])


            
        state_payload = json.dumps(state_payload)
        client.publish(state_topic,state_payload,qos=0,retain=True)
    
    client.message_callback_add(command_topic,callback)

conf = load_config(DEVICE_PATH)  #load device config config.json

receiver = sacn.sACNreceiver() #sACN receiver  
receiver.start()

device_count = sdk.get_device_count() #setup Corsair devices config
for device_index in range(device_count):
    device_name = sdk.get_device_info(device_index)
    device_type = device_name.type
    if device_name.model not in conf:
        universe = get_free_universe()
        conf.update({device_name.model: universe}) 
        print(f"conf= {conf}")
    else:
        universe = conf[device_name.model] 
    save_config(DEVICE_PATH)
    device_topic_name_short = device_name.model.replace(" ","_")
    setup_receiver(universe,device_index)
    if enable_mqtt == True:
        setup_device_command_topics(device_topic_name_short,device_name.model,light_topic)
        publish_device_info(light_topic,device_topic_name_short)

是的,我知道,setup_device_command_topics() 是一团糟,但它有效

esp281952531 回答:处理两个同名设备

暂时没有好的解决方案,如果你有好的解决方案,请发邮件至:iooj@foxmail.com
本文链接:https://www.f2er.com/327.html

大家都在问