我对Node.js和ES6很陌生,这让我有些困惑。我试图让进程继续运行,使用RabbitMQ队列中的消息。它需要能够处理消息(大约需要30-60秒),然后才能获取下一条消息。目前,我拥有的代码将捕获所有可能的消息,然后尝试分叉进程。当队列中有3-5条消息时,这很好,但是对于20、50或100条消息,这将导致服务器内存不足。
我尝试使.consume()
回调函数异步并将await
添加到消息处理函数。我尝试在await new Promise
的{{1}}回调中包装.consume()
。我尝试将processMessage
添加到调用await
的行中。没有任何改变。
channel.consume
作为一个旁注,如果我创建一个字符串数组并遍历字符串,则当我在#!/usr/bin/env node
const amqp = require('amqplib');
const consumeFromqueue = async (queue,isnoAck = false,durable = false,prefetch = null) => {
const conn_str = "amqp://" + process.env.RABBITMQ_username + ":" + process.env.RABBITMQ_PASSWORD + "@" + process.env.RABBITMQ_HOST + "/development?heartbeat=60"
const cluster = await amqp.connect(conn_str);
const channel = await cluster.createChannel();
await channel.assertQueue(queue,{ durable: durable,autoDelete: true });
if (prefetch) {
channel.prefetch(prefetch);
}
console.log(` [x] Waiting for messages in ${queue}. To exit press CTRL+C`)
try {
channel.consume(queue,message => {
if (message !== null) {
console.log(' [x] Received',message.content.toString());
processMessage(message.content.toString());
channel.ack(message);
return null;
} else {
console.log(error,'Queue is empty!')
channel.reject(message);
}
},{noAck: isnoAck});
} catch (error) {
console.log(error,'Failed to consume messages from Queue!')
cluster.close();
}
}
exports.consumeFromqueue = consumeFromqueue;
行中添加await时,它将等待执行处理(30-60秒),然后处理下一个字符串。
processMessage
因此,我基本上需要具有类似功能的东西,但是需要监听RabbitMQ中的队列。