MQTT从主线程发送消息

我在MqttHelper类中实现了简单的MQTT订阅服务器,该订阅服务器可以正常工作并接收订阅。但是,当我需要从主程序向服务器发送消息时应该如何处理。我有方法publishIMqttactionListener上工作正常,但是如何在按钮按下事件时从主程序发送文本?

package com.kkk.mqtt.helpers;

import android.content.Context;
import android.util.Log;

import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions;
import org.eclipse.paho.client.mqttv3.IMqttactionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.io.UnsupportedEncodingException;

public class MqttHelper {
    public MqttAndroidClient mqttAndroidClient;

    final String serverUri = "tcp://tailor.cloudmqtt.com:16424";

    final String clientId = "ExampleAndroidClient";
    public final String subscriptionTopic = "sensor";

    final String username = "xxx";
    final String password = "yyy";



    public MqttHelper(Context context){
        mqttAndroidClient = new MqttAndroidClient(context,serverUri,clientId);

        mqttAndroidClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean b,String s) {
                Log.w("mqtt",s);
            }

            @Override
            public void connectionLost(Throwable throwable) {

            }

            @Override
            public void messageArrived(String topic,MqttMessage mqttMessage) throws Exception {
                Log.w("Mqtt",mqttMessage.toString());
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

            }
        });
        connect();
    }

    public void setCallback(MqttCallbackExtended callback) {
        mqttAndroidClient.setCallback(callback);
    }


    public void publish(String topic,String info)
    {


        byte[] encodedInfo = new byte[0];
        try {
            encodedInfo = info.getBytes("UTF-8");
            MqttMessage message = new MqttMessage(encodedInfo);
            mqttAndroidClient.publish(topic,message);
            Log.e ("Mqtt","publish done");
        } catch (UnsupportedEncodingException | MqttException e) {
            e.printStackTrace();
            Log.e ("Mqtt",e.getMessage());
        }catch (Exception e) {
            Log.e ("Mqtt","general exception "+e.getMessage());
        }

    }

    private void connect(){
        Log.w("Mqtt","connect start " );
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setautomaticReconnect(true);
        mqttConnectOptions.setCleanSession(false);
        mqttConnectOptions.setusername(username);
        mqttConnectOptions.setPassword(password.toCharArray());

        try {

            mqttAndroidClient.connect(mqttConnectOptions,null,new IMqttactionListener()
            {
                @Override
                public void onSuccess(IMqttToken asyncactionToken) {
                    Log.w("Mqtt","onSuccess " );
                    DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();
                    disconnectedBufferOptions.setBufferEnabled(true);
                    disconnectedBufferOptions.setBufferSize(100);
                    disconnectedBufferOptions.setPersistBuffer(false);
                    disconnectedBufferOptions.setDeleteOldestMessages(false);
                    mqttAndroidClient.setBufferOpts(disconnectedBufferOptions);
                    subscribeToTopic();
                    publish(MqttHelper.this.subscriptionTopic,"information");
                }

                @Override
                public void onFailure(IMqttToken asyncactionToken,Throwable exception) {
                    Log.w("Mqtt","Failed to connect to: " + serverUri + exception.toString());
                }
            });


        } catch (MqttException ex){
            ex.printStackTrace();
        }
    }


    private void subscribeToTopic() {
        try {
            mqttAndroidClient.subscribe(subscriptionTopic,new IMqttactionListener() {
                @Override
                public void onSuccess(IMqttToken asyncactionToken) {
                    Log.w("Mqtt","Subscribed!");
                }

                @Override
                public void onFailure(IMqttToken asyncactionToken,"Subscribed fail!");
                }
            });

        } catch (MqttException ex) {
            System.err.println("Exception whilst subscribing");
            ex.printStackTrace();
        }
    }
}

启动MQTT订阅者的代码:

private void startMqtt() {
    mqttHelper = new MqttHelper(getapplicationContext());
    mqttHelper.setCallback(new MqttCallbackExtended()
    {
        @Override
        public void connectComplete(boolean b,String s) {
            Log.w("Mqtt","Connect complete"+ s );
        }

        @Override
        public void connectionLost(Throwable throwable) {
            Log.w("Mqtt","Connection lost" );
        }

        @Override
        public void messageArrived(String topic,MqttMessage mqttMessage) throws Exception {
            Log.w("Mqtt",mqttMessage.toString());
            dataReceived.setText(mqttMessage.toString());
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            Log.w("Mqtt","Delivery complete" );

        }
    });
    Log.w("Mqtt","will publish");


}
xiangjiale 回答:MQTT从主线程发送消息

Paho不在UI线程上运行,但可以异步地回调到UI线程。

只需让ActivityFragment实现MqttCallbackExtended接口:

public class SomeActivity extends AppCompatActivity implements MqttCallbackExtended { 

    ...

    @Override
    public void connectComplete(boolean reconnect,String serverURI) {
        Log.d("Mqtt","Connect complete > " + serverURI);
    }

    @Override
    public void connectionLost(Throwable cause) {
        Log.d("Mqtt","Connection lost");
    }

    @Override
    public void messageArrived(String topic,MqttMessage message) throws Exception {
        Log.d("Mqtt","Received > " + topic + " > " + message.toString());
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        Log.d("Mqtt","Delivery complete");
    }
}

然后将MqttHelperSomeActivity一起构建MqttCallbackExtended listener

public MqttHelper(Context context,MqttCallbackExtended listener) {
    this.mqttAndroidClient = new MqttAndroidClient(context,serverUri,clientId);
    this.mqttAndroidClient.setCallback(listener);
}

例如:

this.mqttHelper = new MqttHelper(this);
this.mqttHelper.setCallback(this);

this.mqttHelper.publish("Java","SomeActivity will handle the callbacks.");

Application中处理它们是有问题的,因为Application没有UI,而Context没有Theme。但是对于扩展ActivityFragmentDialogFragmentRecyclerView.Adapter等的类,当要与interface是有意义的>他们的用户界面。


MqttCallbackExtended extends MqttCallback供参考。

,

另一种解决方案:

  1. 创建一个扩展MQTTService的{​​{1}}类。
    Android服务类在主线程中工作。因此,如果您想使用另一个线程,则可以简单地使用android.app.Service

  2. 您将使用回调方法在MqttAsyncClient另一个线程(不是主线程)中自动从代理接收消息。

  3. 将数据/命令从应用程序UI(活动片段,...)通过EventBus库简单地传递到messageArrived()

  4. 再次使用MQTTService回调方法中的 EventBus ,将从代理接收的数据传递到应用程序的所需部分。
    请注意,在此步骤中,如果目标是应用程序UI,则必须在目标中使用messageArrived()才能在主线程中获取数据。

示例代码:

@Subscribe(threadMode = ThreadMode.MAIN)

现在,您只需在应用程序的每个部分中接收从public class MQTTService extends Service { private MqttAsyncClient mqttClient; private String serverURI; @Override public void onCreate() { //do your initialization here serverURI = "tcp://yourBrokerUrlOrIP:yourBrokerPort"; EventBus.getDefault().register(this); } @Override public int onStartCommand(Intent intent,int flags,int startId) { init(); connect(); } private void init() { mqttClient = new MqttAsyncClient(serverURI,yourClientId,new MemoryPersistence()) mqttClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { } @Override public void messageArrived(String topic,MqttMessage message) throws Exception { //now you will receive messages from the broker in another thread automatically (not UI Thread). //You can do your logic here. for example pass the received data to the different sections of the application: EventBus.getDefault().post(new YourPOJO(topic,message,...)); } @Override public void deliveryComplete(IMqttDeliveryToken token) { } }); } private MqttConnectOptions getOptions(){ MqttConnectOptions options = new MqttConnectOptions(); options.setKeepAliveInterval(...); options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); options.setAutomaticReconnect(true); options.setCleanSession(false); options.setUserName(...); options.setPassword(...); //options.setWill(...); //your other configurations return options; } private void connect() { try { IMqttToken token = mqttClient.connect(getOptions(),null,new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { //do works after successful connection } @Override public void onFailure(IMqttToken asyncActionToken,Throwable exception) { } }); } catch (MqttException e) { e.printStackTrace(); } } @Override public void onDestroy() { EventBus.getDefault().unregister(this); mqttClient.close(); mqttClient.disconnect(); } //this method receives your command from the different application sections //you can simply create different "MqttCommandPOJO" classes for different purposes @Subscribe public void receiveFromApp1(MqttCommandPOJO1 pojo1) { //do your logic(1). For example: //publish or subscribe something to the broker (QOS=1 is a good choice). } @Subscribe public void receiveFromApp2(MqttCommandPOJO2 pojo2) { //do your logic(2). For example: //publish or subscribe something to the broker (QOS=1 is a good choice). } } 传递来的数据即可。

例如:

MQTTService

另一个链接:
General instructions


祝福

本文链接:https://www.f2er.com/3066792.html

大家都在问