Terminating a Spring Boot application that uses Kafka Streams and MongoDB

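I have a Spring Boot application that uses Kafka Streams. Specifically, one stream filters incoming messages based on the result of a query executed against MongoDB. The code looks similar to the following.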

// KStream#to returns void, so the chain is not assigned to a variable
kStreamBuilder.stream(Serdes.String(), Serdes.String(), inputTopic)
              .filter((key, message) -> service.hasSomeProperty(message))
              .to(Serdes.String(), Serdes.String(), outputTopic);

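The method service.hasSomeProperty(message) calls a Mongo repository, which runs a query against a dedicated collection.

If a problem occurs while communicating with Mongo, the exception is intercepted in the thread that Spring uses to handle the stream. The stream stops working, but the application does not shut down properly.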
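
The behaviour described above is consistent with how plain Java threads behave: an uncaught exception ends only the thread that threw it, and the JVM keeps running while other non-daemon threads are alive. Below is a minimal sketch in plain Java, with no Kafka or Mongo involved. The class name WorkerFailureDemo, the thread name and the exception message are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class WorkerFailureDemo {

    private WorkerFailureDemo() {}

    /**
     * Starts a worker thread that fails immediately and returns the
     * exception that reached the thread's uncaught exception handler.
     */
    static Throwable runFailingWorker() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CountDownLatch handled = new CountDownLatch(1);

        // Hypothetical stand-in for a stream thread whose filter call fails
        Thread worker = new Thread(() -> {
            throw new IllegalStateException("simulated Mongo failure");
        }, "stream-thread-demo");

        // The handler runs in the dying worker thread, not in the caller
        worker.setUncaughtExceptionHandler((thread, error) -> {
            seen.set(error);
            handled.countDown();
        });

        worker.start();
        try {
            handled.await(5, TimeUnit.SECONDS);
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        Throwable error = runFailingWorker();
        System.out.println("worker died with: " + error);
        // The calling thread is unaffected by the worker's failure
        System.out.println("main thread still running");
    }
}
```

Nothing stops the process in this sketch unless the handler does so explicitly, which is why a dead stream thread alone does not bring the application down.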

In detail, the error we are facing is the following.

Caused by: com.mongodb.MongoSocketReadException: Prematurely reached end of stream
at com.mongodb.connection.SocketStream.read(SocketStream.java:88)
at com.mongodb.connection.InternalStreamConnection.receiveResponseBuffers(InternalStreamConnection.java:494)
at com.mongodb.connection.InternalStreamConnection.receiveMessage(InternalStreamConnection.java:224)
at com.mongodb.connection.UsageTrackingInternalConnection.receiveMessage(UsageTrackingInternalConnection.java:96)
at com.mongodb.connection.DefaultConnectionPool$PooledConnection.receiveMessage(DefaultConnectionPool.java:440)
at com.mongodb.connection.CommandProtocol.execute(CommandProtocol.java:112)
at com.mongodb.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:168)
at com.mongodb.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:289)
at com.mongodb.connection.DefaultServerConnection.command(DefaultServerConnection.java:176)
at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:216)
at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:207)
at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:113)
at com.mongodb.operation.FindOperation$1.call(FindOperation.java:516)
at com.mongodb.operation.FindOperation$1.call(FindOperation.java:510)
at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:435)
at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:408)
at com.mongodb.operation.FindOperation.execute(FindOperation.java:510)
at com.mongodb.operation.FindOperation.execute(FindOperation.java:81)
at com.mongodb.Mongo.execute(Mongo.java:836)
at com.mongodb.Mongo$2.execute(Mongo.java:823)
at com.mongodb.DBCursor.initializeCursor(DBCursor.java:870)
at com.mongodb.DBCursor.hasNext(DBCursor.java:142)
at com.mongodb.DBCursor.one(DBCursor.java:679)
at com.mongodb.DBCollection.findOne(DBCollection.java:833)
at com.mongodb.DBCollection.findOne(DBCollection.java:796)
at com.mongodb.DBCollection.findOne(DBCollection.java:743)
at org.springframework.data.mongodb.core.MongoTemplate$FindOneCallback.doInCollection(MongoTemplate.java:2179)
at org.springframework.data.mongodb.core.MongoTemplate$FindOneCallback.doInCollection(MongoTemplate.java:2163)
at org.springframework.data.mongodb.core.MongoTemplate.executeFindOneInternal(MongoTemplate.java:1907)
... 31 more

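What I would like to know is how to configure the Spring Boot application so that it stops when a communication error with Mongo occurs inside a Kafka Streams filter.

I know this is not the best approach, but I cannot refactor much of the code to use a GlobalKTable instead of the Mongo collection.

Thanks to all.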

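cry1018 answered: Terminating a Spring Boot application that uses Kafka Streams and MongoDB

In one of my applications, I manage the KafkaStreams instance in a Spring Component that implements the InitializingBean and DisposableBean interfaces: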

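@Autowired
private ApplicationContext appContext;

private KafkaStreams streams;

// Called by Spring when the application context shuts down
@Override
public void destroy() throws Exception {
    // streams is still null if afterPropertiesSet() failed before creating it
    if (streams != null && streams.state().isRunning()) {
        streams.close();
    }
}

// Called by Spring once the bean's dependencies have been injected
@Override
public void afterPropertiesSet() throws Exception {

    ... // streams setup

    // Invoked when a stream thread dies from an exception it did not handle
    streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
        LOG.error("Unexpected error in stream processing for thread: " + thread, throwable);
        closeApp();
    });

    ... // streams start

}

// Closing the Spring context in turn calls destroy() above
private void closeApp() {
    ((ConfigurableApplicationContext) appContext).close();
}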

So far this has worked fine for me. You could choose to stop the application only when the exception comes from the Mongo connection.
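
One possible way to restrict the shutdown to Mongo failures is to walk the cause chain of the throwable, since the stack trace in the question shows the Mongo exception arriving as a "Caused by" entry rather than as the top-level exception. The sketch below matches on the package prefix of the exception class name so that it compiles without the Mongo driver on the classpath. FatalCauseFilter, hasCauseFrom and stopOn are hypothetical names introduced here, not part of Kafka Streams or Spring.

```java
import java.io.IOException;

final class FatalCauseFilter {

    private FatalCauseFilter() {}

    /**
     * Returns true if the throwable itself, or any exception in its cause
     * chain, belongs to a class whose fully qualified name starts with the
     * given prefix (for example "com.mongodb.").
     */
    static boolean hasCauseFrom(Throwable error, String packagePrefix) {
        int depth = 0;
        // The depth bound guards against pathological cyclic cause chains
        for (Throwable t = error; t != null && depth < 50; t = t.getCause(), depth++) {
            if (t.getClass().getName().startsWith(packagePrefix)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Builds a handler that runs stopApplication only for matching failures.
     * Other failures are ignored by this handler.
     */
    static Thread.UncaughtExceptionHandler stopOn(String packagePrefix, Runnable stopApplication) {
        return (thread, error) -> {
            if (hasCauseFrom(error, packagePrefix)) {
                stopApplication.run();
            }
        };
    }

    public static void main(String[] args) {
        // java.io.IOException stands in for a driver exception in this demo
        Throwable wrapped = new RuntimeException("stream failed", new IOException("socket closed"));
        System.out.println(hasCauseFrom(wrapped, "java.io."));      // prints "true"
        System.out.println(hasCauseFrom(wrapped, "com.mongodb."));  // prints "false"
    }
}
```

With the handler shown in the answer, this could be wired in as streams.setUncaughtExceptionHandler(FatalCauseFilter.stopOn("com.mongodb.", this::closeApp)), assuming a Kafka Streams version whose setUncaughtExceptionHandler accepts a Thread.UncaughtExceptionHandler. Keep in mind that for non-matching exceptions the stream thread still dies, so those cases should at least be logged.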

Article link: https://www.f2er.com/3118799.html
