获取启动MQTT订阅服务器之前发出的消息

我在MqttHelper类中实现了简单的MQTT订阅服务器,该订阅服务器可以正常工作并接收订阅。我发现订户在启动后仅接收发布的消息。是否可以以某种方式要求MQTT订户检索未启动时发出的所有消息?

package com.kkk.mqtt.helpers;

import android.content.Context;
import android.util.Log;

import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions;
import org.eclipse.paho.client.mqttv3.IMqttactionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.io.UnsupportedEncodingException;

public class MqttHelper {
    public MqttAndroidClient mqttAndroidClient;

    final String serverUri = "tcp://tailor.cloudmqtt.com:16424";

    final String clientId = "ExampleAndroidClient";
    public final String subscriptionTopic = "sensor";

    final String username = "xxx";
    final String password = "yyy";



    public MqttHelper(Context context){
        mqttAndroidClient = new MqttAndroidClient(context,serverUri,clientId);

        mqttAndroidClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean b,String s) {
                Log.w("mqtt",s);
            }

            @Override
            public void connectionLost(Throwable throwable) {

            }

            @Override
            public void messageArrived(String topic,MqttMessage mqttMessage) throws Exception {
                Log.w("Mqtt",mqttMessage.toString());
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

            }
        });
        connect();
    }

    public void setCallback(MqttCallbackExtended callback) {
        mqttAndroidClient.setCallback(callback);
    }


    public void publish(String topic,String info)
    {


        byte[] encodedInfo = new byte[0];
        try {
            encodedInfo = info.getBytes("UTF-8");
            MqttMessage message = new MqttMessage(encodedInfo);
            mqttAndroidClient.publish(topic,message);
            Log.e ("Mqtt","publish done");
        } catch (UnsupportedEncodingException | MqttException e) {
            e.printStackTrace();
            Log.e ("Mqtt",e.getMessage());
        }catch (Exception e) {
            Log.e ("Mqtt","general exception "+e.getMessage());
        }

    }

    private void connect(){
        Log.w("Mqtt","connect start " );
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setautomaticReconnect(true);
        mqttConnectOptions.setCleanSession(false);
        mqttConnectOptions.setusername(username);
        mqttConnectOptions.setPassword(password.toCharArray());

        try {

            mqttAndroidClient.connect(mqttConnectOptions,null,new IMqttactionListener()
            {
                @Override
                public void onSuccess(IMqttToken asyncactionToken) {
                    Log.w("Mqtt","onSuccess " );
                    DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();
                    disconnectedBufferOptions.setBufferEnabled(true);
                    disconnectedBufferOptions.setBufferSize(100);
                    disconnectedBufferOptions.setPersistBuffer(false);
                    disconnectedBufferOptions.setDeleteOldestMessages(false);
                    mqttAndroidClient.setBufferOpts(disconnectedBufferOptions);
                    subscribeToTopic();
                    publish(MqttHelper.this.subscriptionTopic,"information");
                }

                @Override
                public void onFailure(IMqttToken asyncactionToken,Throwable exception) {
                    Log.w("Mqtt","Failed to connect to: " + serverUri + exception.toString());
                }
            });


        } catch (MqttException ex){
            ex.printStackTrace();
        }
    }


    private void subscribeToTopic() {
        try {
            mqttAndroidClient.subscribe(subscriptionTopic,new IMqttactionListener() {
                @Override
                public void onSuccess(IMqttToken asyncactionToken) {
                    Log.w("Mqtt","Subscribed!");
                }

                @Override
                public void onFailure(IMqttToken asyncactionToken,"Subscribed fail!");
                }
            });

        } catch (MqttException ex) {
            System.err.println("Exception whilst subscribing");
            ex.printStackTrace();
        }
    }
}

启动MQTT订阅者的代码:

private void startMqtt() {
    mqttHelper = new MqttHelper(getapplicationContext());
    mqttHelper.setCallback(new MqttCallbackExtended()
    {
        @Override
        public void connectComplete(boolean b,String s) {
            Log.w("Mqtt","Connect complete"+ s );
        }

        @Override
        public void connectionLost(Throwable throwable) {
            Log.w("Mqtt","Connection lost" );
        }

        @Override
        public void messageArrived(String topic,MqttMessage mqttMessage) throws Exception {
            Log.w("Mqtt",mqttMessage.toString());
            dataReceived.setText(mqttMessage.toString());
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            Log.w("Mqtt","Delivery complete" );

        }
    });
    Log.w("Mqtt","will publish");


}
yanglibaoqi 回答:获取启动MQTT订阅服务器之前发出的消息

暂时没有好的解决方案,如果你有好的解决方案,请发邮件至:iooj@foxmail.com
本文链接:https://www.f2er.com/3066762.html

大家都在问