如何将MQTT“消息”事件发送回REST主体?

我目前在解决如何将MQTT message事件捕获回用NodeJS编写的REST API主体方面遇到问题。我当前的设置是App-> NodeJS REST API-> MQTT broker inside RPi 3

这是我的MQTTHandler.js类,在其中放置了所有可重复使用的MQTT函数

const mqtt = require('mqtt')

class MQTTHandler {
  constructor (host) {
    this.client = null
    this.host = host
  }

  connect () {
    this.client = mqtt.connect(this.host)

    this.client.on('error',function (err) {
      console.log(err)
      this.client.end()
    })

    this.client.on('connect',function () {
      console.log('MQTT client connected...')
    })

    // I need this to send message back to app.js
    this.client.on('message',function (topic,message) {
      if (!message.toString()) message = 'null'

      console.log(JSON.parse(message.toString()))
    })

    this.client.on('close',function () {
      console.log('MQTT client disconnected...')
    })
  }

  subscribeTopic (topic) {
    this.client.subscribe(topic)
  }

  unsubscribeTopic (topic) {
    this.client.unsubscribe(topic)
  }

  sendMessage (topic,message) {
    this.client.publish(topic,message)
  }
}

module.exports = MQTTHandler

下面是我的app.js

的一小段
const MQTTHandler = require('./mqtt.handler')
...

var mqttClient = new MQTTHandler('mqtt://127.0.0.1')
mqttClient.connect()

app.get('/hello',function (req,res) {
  mqttClient.subscribeTopic('topic')
  mqttClient.sendMessage('topic','hello world')

  // I need to return the MQTT message event here
  // res.json(<mqtt message here>)

  res.end()
})

我已经尝试使用NodeJS的事件发射器,但是它似乎不起作用。任何帮助或建议,将不胜感激,谢谢!

baorry 回答:如何将MQTT“消息”事件发送回REST主体?

您正在尝试将同步协议(HTTP)与异步协议(MQTT)混合使用。这两种范式很难混合。

发布MQTT消息时,您不知道有多少客户可以订阅该主题,可以是零,也可以是很多。也不能保证它们中的任何一个都会发送答复,因此您需要包括超时。 (您还需要在有效负载中包含一个请求ID,以便您可以协调与该请求的任何响应,因为您无法说出响应的顺序。)

您的示例代码仅使用1个主题,这非常糟糕,因为您最终将需要从响应消息中过滤出请求消息。最好使用2个不同的主题(MQTT v5甚至还有一个msg标头,用于指定应发送响应的主题)。

已经说了一切,有可能构建出可行的东西(我将使用requestreply主题。

var inflightRequests = {};

// interval to clear out requests waiting for a response
// after 3 seconds
var timer = setInterval(function() {
  var now = new Date.now();
  var keys = Object.keys(inflightRequests);
  for (var key in keys) {
    var waiting = inflightRequests[keys[key]];
    var diff = now = waiting.timestamp;
    // 3 second timeout
    if (diff > 3000) {
      waiting.resp.status(408).send({});
      delete(inflightRequests[keys[key]]);
    }
  }
},500);

// on message handler to reply to the HTTP request
client.on('message',function(topic,msg){
  if (topic.equals('reply')) {
    var payload = JSON.parse(msg);
    var waiting = inflightRequest[payload.requestId];
    if (waiting) {
      waiting.res.send(payload.body);
      delete(inflightRequests[payload.requestId]);
    } else {
      // response arrived too late
    }
  }
});

// HTTP route handler.
app.get('/hello',function(req,res) {
  //using timestamp as request Id as don't have anything better in this example.
  var reqId = Date.now();
  var waiting = { 
    timeStamp: reqId,res: res
  }
  inflightRequests[reqId] = waiting;
  var message = {
    requestId: reqId,payload: 'hello world'
  }
  client.publish('request',JSON.stringify(message));
});
本文链接:https://www.f2er.com/3161972.html

大家都在问