通过Node.js使用RabbitMQ消息时,我可以等待流程完成吗?

我对Node.js和ES6很陌生,这让我有些困惑。我试图让进程继续运行,使用RabbitMQ队列中的消息。它需要能够处理消息(大约需要30-60秒),然后才能获取下一条消息。目前,我拥有的代码将捕获所有可能的消息,然后尝试分叉进程。当队列中有3-5条消息时,这很好,但是对于20、50或100条消息,这将导致服务器内存不足。

我尝试使.consume()回调函数异步并将await添加到消息处理函数。我尝试在await new Promise的{​​{1}}回调中包装.consume()。我尝试将processMessage添加到调用await的行中。没有任何改变。

channel.consume

作为一个旁注,如果我创建一个字符串数组并遍历字符串,则当我在#!/usr/bin/env node const amqp = require('amqplib'); const consumeFromqueue = async (queue,isnoAck = false,durable = false,prefetch = null) => { const conn_str = "amqp://" + process.env.RABBITMQ_username + ":" + process.env.RABBITMQ_PASSWORD + "@" + process.env.RABBITMQ_HOST + "/development?heartbeat=60" const cluster = await amqp.connect(conn_str); const channel = await cluster.createChannel(); await channel.assertQueue(queue,{ durable: durable,autoDelete: true }); if (prefetch) { channel.prefetch(prefetch); } console.log(` [x] Waiting for messages in ${queue}. To exit press CTRL+C`) try { channel.consume(queue,message => { if (message !== null) { console.log(' [x] Received',message.content.toString()); processMessage(message.content.toString()); channel.ack(message); return null; } else { console.log(error,'Queue is empty!') channel.reject(message); } },{noAck: isnoAck}); } catch (error) { console.log(error,'Failed to consume messages from Queue!') cluster.close(); } } exports.consumeFromqueue = consumeFromqueue; 行中添加await时,它将等待执行处理(30-60秒),然后处理下一个字符串。

processMessage

因此,我基本上需要具有类似功能的东西,但是需要监听RabbitMQ中的队列。

daiandy001 回答:通过Node.js使用RabbitMQ消息时,我可以等待流程完成吗?

如果您想限制使用者在任何给定时间处理的消息数,请使用channel.prefetch()

给出的计数是通过 可以等待确认的渠道;一旦计数 未完成的消息,服务器将不会在此发送更多消息 频道,直到一个或多个已被确认。

也就是说,如果您只希望一次只能处理一条消息,然后再转到下一条,则设置channel.prefetch(1)

,

移动到下一个,设置channel.prefetch(1)

channel.prefetch(1)
本文链接:https://www.f2er.com/2675363.html

大家都在问