Spring反应式流-意外关闭

我们正在将Spring Cloud Reactive Streams与RabbitMQ结合使用。

Spring Reactive Stream似乎在将消息从队列中拉出后立即对其进行确认。因此,在消息处理期间发生的任何错误未处理的异常都需要在应用程序中进行处理(这与非响应流不同,在非响应流中,可能会抛出未处理的异常并拒绝一条消息,从而将其发送到死信队列中)。

当消息传播时,我们应该如何处理应用程序中的突然关闭?

例如:

  • 应用程序将消息从队列中拉出
  • 应用程序将邮件标记为已确认
  • 应用程序开始消息处理
  • 在消息处理完成之前关闭应用程序

发生这种情况时,该消息似乎已完全丢失,因为它不在队列中,但应用程序已停止。我们如何恢复这些消息?

happynpuzy 回答:Spring反应式流-意外关闭

您需要使用手动确认并推迟确认,直到异步完成处理。为此,您需要使用整个消息:

    @Bean
    public Consumer<Flux<Message<String>>> async() {
        return inbound -> inbound

                ...

                .map(msg -> {
                    try {
                        msg.getHeaders().get(AmqpHeaders.CHANNEL,Channel.class)
                                .basicAck(msg.getHeaders().get(AmqpHeaders.DELIVERY_TAG,Long.class),false);
                    }
                    catch (IOException e) {
                        e.printStackTrace();
                    }
                    return msg.getPayload();
                })
                .subscribe(System.out::println);
    }
spring:
  cloud:
    stream:
      function.definition: async
      bindings:
        async-in-0:
          destination: testtock
          group: async
      rabbit:
        bindings:
          async-in-0:
            consumer:
              acknowledge-mode: MANUAL
              prefetch: 10

使用basicReject重新排队或发送到DLQ。

本文链接:https://www.f2er.com/2475947.html

大家都在问