How do I connect to Kafka via JMS on OpenLiberty?

I'm trying to connect to Kafka using JMS. I followed this guide, which uses the Payara Kafka connector. That setup works on Wildfly, but I can't get it to work on OpenLiberty.

server.xml

<resourceAdapter id="kafkajmsra" location="${shared.resource.dir}kafka-rar-0.5.0.rar"/>

<jmsTopicConnectionFactory jndiName="JMSTopicFactory">
     <properties.kafkajmsra
                bootstrapServerConfig="kafka:9092"/>
</jmsTopicConnectionFactory>

<jmsTopic id="kafkaTopic" jndiName="JmsTopic">
     <properties.kafkajmsra topicName="demoTopic" />
</jmsTopic>

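With this configuration I get a NullPointerException when I try to inject those components. The JNDI names can be found, but the configured parameters are not applied.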

@Resource(lookup = "JMSTopicFactory")
private TopicConnectionFactory jmsTopicFactory;

@Resource(lookup = "JMSTopic")
private Topic jmsTopic;

Am I missing something in server.xml?
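
One detail can be checked directly from the snippets above: server.xml declares jndiName="JmsTopic", while the field uses lookup = "JMSTopic". Assuming JNDI names are matched case-sensitively, that lookup would not resolve to the configured topic. A minimal sketch of a check for this kind of mismatch follows; the class JndiNameCheck and its method are hypothetical helpers for illustration, not part of OpenLiberty or the Payara connector.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper: compares the names used in @Resource(lookup = ...)
// with the jndiName values declared in server.xml.
final class JndiNameCheck {

    // Returns every lookup name that has no exact (case-sensitive) match
    // among the configured JNDI names.
    static List<String> unmatched(Set<String> configured, List<String> lookups) {
        return lookups.stream()
                .filter(name -> !configured.contains(name))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // jndiName values copied from the server.xml above
        Set<String> configured = new HashSet<>(Arrays.asList("JMSTopicFactory", "JmsTopic"));
        // lookup values copied from the @Resource annotations above
        List<String> lookups = Arrays.asList("JMSTopicFactory", "JMSTopic");

        System.out.println(unmatched(configured, lookups)); // prints [JMSTopic]
    }
}
```

If the mismatch is only a typo in the post, this check changes nothing; otherwise aligning the two spellings is a cheap first step before looking at the resource adapter itself.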


I also tried the default JMS connector. It does reach Kafka, but the connection is rejected, and the Kafka side logs:

[2020-05-31 20:05:27,134] WARN [SocketServer brokerId=1] Unexpected error from /172.20.0.4; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = -1091633152)
 at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:103)
 at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:448)
 at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:398)
 at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678)
 at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580)
 at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
 at kafka.network.Processor.poll(SocketServer.scala:893)
 at kafka.network.Processor.run(SocketServer.scala:792)
 at java.lang.Thread.run(Thread.java:748)
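
The negative size in that log can be reproduced by hand. Kafka reads the first four bytes of each request as a big-endian signed 32-bit length, and NetworkReceive rejects a negative value. The sketch below converts the reported size back into its four bytes and decodes them again; the class name SizePrefix is made up for illustration.

```java
import java.nio.ByteBuffer;

// Hypothetical helper showing how a 4-byte length prefix is interpreted.
final class SizePrefix {

    // Decode four bytes as a big-endian signed 32-bit integer
    // (ByteBuffer uses big-endian byte order by default).
    static int decode(byte[] firstFour) {
        return ByteBuffer.wrap(firstFour).getInt();
    }

    // Show the raw bytes behind a size value as hexadecimal.
    static String toHex(int size) {
        return String.format("0x%08X", size);
    }

    public static void main(String[] args) {
        // The size reported in the broker log above
        System.out.println(toHex(-1091633152)); // prints 0xBEEF0000

        // The same four bytes, read the way a length prefix is read
        byte[] bytes = {(byte) 0xBE, (byte) 0xEF, 0x00, 0x00};
        System.out.println(decode(bytes)); // prints -1091633152
    }
}
```

The bytes BE EF 00 00 do not look like the start of a Kafka request. That would be consistent with the default JMS connector speaking its own wire protocol to the Kafka port rather than the Kafka protocol, although this reading is an assumption and not something the log states.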

Edit:

I changed server.xml so that it now looks like this:

    <resourceAdapter id="kafkajmsra" location="${shared.resource.dir}/kafka-rar-0.4.0.rar"/>

    <connectionFactory jndi="java:app/KafkaConnectionFactory"
                       interfaceName="fish.payara.cloud.connectors.kafka.api.KafkaConnectionFactory"
                       resourceAdapter="liberty/wlp/usr/shared/resources/kafka-rar-0.4.0.rar">
    </connectionFactory>

and the Java code looks like this:

@ApplicationScoped
public class TopicProducer {

  private static final Logger LOG = LoggerFactory.getLogger(TopicProducer.class);

  public TopicProducer() throws Exception {
    LOG.info("Starting TopicProducer");
  }

  @Resource(lookup = "java:app/KafkaConnectionFactory")
  KafkaConnectionFactory kafkaConnectionFactory;

  public void send(final String msg) {
    try (KafkaConnection connection = kafkaConnectionFactory.createConnection()) {

      LOG.info("Send message: {}",msg);

      connection.send(new ProducerRecord<>("demoTopic", msg));
    } catch (Exception e) {
      LOG.error(e.getMessage(),e);
    }
  }
}

But now I get a NullPointerException on the @Resource field. My guess is that the resource adapter cannot be found.
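
To tell "the name is not bound" apart from "injection did not run", the same name can be looked up programmatically. Below is a minimal sketch using only the JDK's javax.naming API. JndiProbe is a hypothetical helper, and it assumes it is called from code running inside the application on the server, so that the java:app namespace is available.

```java
import javax.naming.InitialContext;
import javax.naming.NamingException;

// Hypothetical diagnostic helper: reports whether a JNDI name resolves
// and, if it does, which class the bound object has.
final class JndiProbe {

    static String probe(String name) {
        try {
            Object bound = new InitialContext().lookup(name);
            return "bound: " + (bound == null ? "null" : bound.getClass().getName());
        } catch (NamingException e) {
            // Covers unbound names and the case where no JNDI provider is
            // available, for example when run outside the server.
            return "lookup failed: " + e;
        }
    }

    public static void main(String[] args) {
        System.out.println(probe("java:app/KafkaConnectionFactory"));
    }
}
```

If the probe reports a failed lookup from inside the application, the connection factory definition in server.xml is the place to look. If it reports a bound object, the problem is more likely in how TopicProducer is instantiated or injected.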
