将CDI事件桥接到Microprofile Reactive Message Broker

我跟随Quarkus - Using Apache Kafka with Reactive Messaging创建了一个品尝它的示例,我这样更改了消息流:

  1. 保存帖子后,通过CDI触发事件。
  2. 收到CDI并发送给Kafka主题。
  3. 从Kafka主题中读取数据,并将其作为SSE公开给客户端。

Kafka消息传递的配置,java.time的一部分。

application.properties

CDI事件和响应消息的事件处理类。

# Consume data from Kafka
mp.messaging.incoming.activities.connector=smallrye-kafka
mp.messaging.incoming.activities.value.deserializer=io.vertx.kafka.client.serialization.JsonObjectDeserializer

# Produce data to Kafka
mp.messaging.outgoing.activitiesOut.connector=smallrye-kafka
mp.messaging.outgoing.activitiesOut.topic=activities
mp.messaging.outgoing.activitiesOut.value.serializer=io.vertx.kafka.client.serialization.JsonObjectSerializer

当我尝试将其公开为SSE时,它无法按预期工作。

@ApplicationScoped
public class activityStreams {

    ReplaySubject<JsonObject> replaySubject;
    Flowable<JsonObject> flowable;

    @PostConstruct public void init() {
        replaySubject = ReplaySubject.create();
        flowable = replaySubject.share().toFlowable(BackpressureStrategy.BUFFER);
    }

    public void onactivityCreated(@ObservesAsync activity activity) {
        replaySubject.onNext(JsonObject.mapFrom(activity));
    }

    @Outgoing("activitiesOut")
    public Publisher<JsonObject> onReceivedactivityCreated() {
        return flowable;
    }

    @Incoming("activities")
    @Outgoing("my-data-stream")
    @Broadcast
    public activity onactivityReceived(JsonObject data) {
        activity activity = data.mapTo(activity.class);
        activity.setOccurred(LocalDateTime.now());
        return activity;
    }

}

在控制台日志记录中,我看到了发送到 activities 队列的消息,但是没有进一步的SSE步骤。当我通过@Path("/activities") @ApplicationScoped public class activityResource { @Inject @Channel("my-data-stream") public Publisher<activity> stream; @GET @Produces(MediaType.SERVER_SENT_EVENTS) @SseElementType(MediaType.APPLICATION_JSON) Publisher<activity> eventStream(){ return stream; } } 访问sse端点时,它总是返回未找到状态。

curl

完整的示例代码为here

dd8816139 回答:将CDI事件桥接到Microprofile Reactive Message Broker

暂时没有好的解决方案,如果你有好的解决方案,请发邮件至:iooj@foxmail.com
本文链接:https://www.f2er.com/2867118.html

大家都在问