I am trying to connect to Kafka using JMS. I followed this guide to use the Payara Kafka connector. This works on WildFly, but I cannot get it to work on OpenLiberty.
server.xml:
<resourceAdapter id="kafkajmsra" location="${shared.resource.dir}kafka-rar-0.5.0.rar"/>
<jmsTopicConnectionFactory jndiName="JMSTopicFactory">
    <properties.kafkajmsra bootstrapServerConfig="kafka:9092"/>
</jmsTopicConnectionFactory>
<jmsTopic id="kafkaTopic" jndiName="JmsTopic">
    <properties.kafkajmsra topicName="demoTopic" />
</jmsTopic>
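With this configuration, I get a NullPointerException when I try to inject these components. The JNDI name can be found, but it cannot be used with these parameters.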
@Resource(lookup = "JMSTopicFactory")
private TopicConnectionFactory jmsTopicFactory;
@Resource(lookup = "JMSTopic")
private Topic jmsTopic;
Am I missing something in server.xml?
I also tried the default JMS connector. It does reach Kafka, but the connection is refused, and the Kafka broker logs the following:
[2020-05-31 20:05:27,134] WARN [SocketServer brokerId=1] Unexpected error from /172.20.0.4; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = -1091633152)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:103)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:448)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:398)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580)
at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
at kafka.network.Processor.poll(SocketServer.scala:893)
at kafka.network.Processor.run(SocketServer.scala:792)
at java.lang.Thread.run(Thread.java:748)
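For context on the log above: Kafka frames each request with a 4-byte big-endian length, so the reported size is simply the first four bytes the broker received. The sketch below converts the logged value back into those bytes; the class and method names are made up for illustration. A negative size would be consistent with a client that is not speaking the Kafka protocol (for example, a JMS client using its own wire format) connecting to the Kafka port, though that is my reading of the log rather than something I have confirmed.

```java
import java.nio.ByteBuffer;

public class KafkaFrameSize {
    // Returns the four big-endian bytes that the broker interpreted as the frame size.
    static byte[] sizeToBytes(int size) {
        // ByteBuffer is big-endian by default, matching network byte order.
        return ByteBuffer.allocate(4).putInt(size).array();
    }

    // Renders bytes as upper-case hex, two digits per byte.
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02X", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        int size = -1091633152; // value taken from the broker log above
        System.out.println(toHex(sizeToBytes(size))); // prints BEEF0000
    }
}
```

The first bytes on the wire were therefore BE EF 00 00, which does not look like a plausible Kafka request length.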
Edit:
I changed server.xml to the following:
<resourceAdapter id="kafkajmsra" location="${shared.resource.dir}/kafka-rar-0.4.0.rar"/>
<connectionFactory jndi="java:app/KafkaConnectionFactory"
                   interfaceName="fish.payara.cloud.connectors.kafka.api.KafkaConnectionFactory"
                   resourceAdapter="liberty/wlp/usr/shared/resources/kafka-rar-0.4.0.rar">
</connectionFactory>
The Java code looks like this:
@ApplicationScoped
public class TopicProducer {
    private static final Logger LOG = LoggerFactory.getLogger(TopicProducer.class);

    public TopicProducer() throws Exception {
        LOG.info("Starting TopicProducer");
    }

    @Resource(lookup = "java:app/KafkaConnectionFactory")
    KafkaConnectionFactory kafkaConnectionFactory;

    public void send(final String msg) {
        try (KafkaConnection connection = kafkaConnectionFactory.createConnection()) {
            LOG.info("Send message: {}",msg);
            connection.send(new ProducerRecord("demoTopic",msg));
        } catch (Exception e) {
            LOG.error(e.getMessage(),e);
        }
    }
}
But now I get a NullPointerException on the @Resource field. My guess is that the resource adapter cannot be found.
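To narrow down whether the name is bound at all, one option is an explicit JNDI lookup instead of relying on injection. This is only a diagnostic sketch: `JndiProbe` and `describe` are hypothetical names, and it assumes the code runs inside the server where an `InitialContext` is available.

```java
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class JndiProbe {
    // Hypothetical helper: reports what is bound under jndiName, or why the lookup failed.
    static String describe(String jndiName) {
        try {
            Object bound = new InitialContext().lookup(jndiName);
            return bound == null
                    ? "bound to null"
                    : "bound to " + bound.getClass().getName();
        } catch (NamingException e) {
            // Covers a missing name as well as a missing naming provider.
            return "lookup failed: " + e.getClass().getSimpleName();
        }
    }
}
```

Logging `JndiProbe.describe("java:app/KafkaConnectionFactory")` from within the application should show whether the lookup fails outright or returns an object of an unexpected type.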