AMQP消费者停止在第127条消息上接收

问题 :发布者应用不断向Artemis经纪人发布AMQP消息-但是,消费者应用在第127位之后停止接收消息。

浏览-巧合的是,注意到Smallrye网页的摘录中引用了以下内容:

”可以使用使用microProfile Config设置的smallrye.messaging.emitter.default-buffer-size属性来配置默认缓冲区大小(127)。请注意,仅在不使用OnOverflow时才应用此值。“

(来自https://smallrye.io/smallrye-reactive-messaging/-部分标题“ 14.3。@Channel”)

但是,我尝试使用microprofile-config.properties添加属性更改(以查看是否至少可以对“ 127”数字进行更改),但是没有任何效果。

即,这些似乎都不起作用...

   smallrye.messaging.emitter.default-buffer-size=10000
      or
   mp.messaging.incoming.data.default-buffer-size=10000

发布者(服务器)控制台

2020.01.02 23:46:47 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 119
2020.01.02 23:46:47 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,main]: ....... PUB ....... camelpub - body: 120
2020.01.02 23:46:47 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,main]: ....... PUB ....... camelpub - body: 121
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,main]: ....... PUB ....... camelpub - body: 122
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,main]: ....... PUB ....... camelpub - body: 123
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,main]: ....... PUB ....... camelpub - body: 124
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,main]: ....... PUB ....... camelpub - body: 125
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,main]: ....... PUB ....... camelpub - body: 126
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,main]: ....... PUB ....... camelpub - body: 127
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,main]: ....... PUB ....... camelpub - body: 128
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,main]: ....... PUB ....... camelpub - body: 129
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,main]: ....... PUB ....... camelpub - body: 130
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,main]: ....... PUB ....... camelpub - body: 131
2020.01.02 23:46:49 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,main]: ....... PUB ....... camelpub - body: 132
2020.01.02 23:46:49 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,main]: ....... PUB ....... camelpub - body: 133
(continues to send)

订户(服务器)控制台

2020.01.02 23:46:47 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,main]: ooooooo SUB ooooooo camelsub ==body==> 119
2020.01.02 23:46:47 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,main]: ooooooo SUB ooooooo camelsub ==body==> 120
2020.01.02 23:46:47 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,main]: ooooooo SUB ooooooo camelsub ==body==> 121
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,main]: ooooooo SUB ooooooo camelsub ==body==> 122
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,main]: ooooooo SUB ooooooo camelsub ==body==> 123
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,main]: ooooooo SUB ooooooo camelsub ==body==> 124
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,main]: ooooooo SUB ooooooo camelsub ==body==> 125
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,main]: ooooooo SUB ooooooo camelsub ==body==> 126
2020.01.02 23:46:48 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,main]: ooooooo SUB ooooooo camelsub ==body==> 127
(stops receiving on this message)

尝试在Artemis配置文件:broker.xml ...中专门操作AMQP的“接受者”,但无济于事:

<acceptor name="amqp">tcp://0.0.0.0:5672?protocols=AMQP;autoAcknowledge=true;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpcredits=1000;amqpLowcredits=300</acceptor> 

发布商应用

package aaa.bbb.ccc.jar;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.reactivestreams.Publisher;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;

@ApplicationScoped
public class CamelPub extends RouteBuilder {

    @Inject
    CamelContext ctx;

    CamelReactiveStreamsService crss;
    static int x = 0;

    @Outgoing("data")
    public Publisher<String> source() {
        return crss.from("seda:thesource",String.class);
    }

    @Override
    public void configure() {

    onException(Exception.class)
        .log("--------------------(PUBLISHER: Exception!)-------------------->>> exception=" + this.exceptionmessage().toString());         

    crss = CamelReactiveStreams.get(ctx);
    from("timer://thetimer?period=100")
        .process(new Processor() {
            @Override
            public void process(Exchange msg) throws Exception {
                msg.getIn().setBody(Integer.toString(x++)); 
            }
        })
        .log("....... PUB ....... camelpub - body: ${body}")
        .to("seda:thesource");       
    }
}

src / main / resources / Meta-INF / microprofile-config.properties(发布者)

injected.value=Injected value
value=lookup value
server.port=8084
server.host=0.0.0.0

mp.messaging.outgoing.data.connector=smallrye-amqp
mp.messaging.outgoing.data.host=localhost
mp.messaging.outgoing.data.port=5672
mp.messaging.outgoing.data.username=artuser
mp.messaging.outgoing.data.password=artpassword
mp.messaging.outgoing.data.endpoint-uri:seda:thesource
mp.messaging.outgoing.data.broadcast=true
mp.messaging.outgoing.data.durable=true
mp.messaging.outgoing.data.containerId=my-container-id
mp.messaging.outgoing.data.broadcast=true

订户应用

package aaa.bbb.ccc.jar;

import javax.inject.Inject;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import javax.enterprise.context.ApplicationScoped;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.reactivestreams.Subscriber;

@ApplicationScoped
public class CamelSub extends RouteBuilder {

    public CamelSub() throws Exception {
    }

    @Inject
    CamelContext ctx;

    CamelReactiveStreamsService crss;

    @Incoming("data")
    public Subscriber<String> sink() {
        return crss.subscriber("seda:thesink",String.class);
    }

    @Override
    public void configure() {
    onException(Exception.class)
        .log("--------------------(SUBSCRIber: Exception!)-------------------->>> exception=" + this.exceptionmessage().toString());
    crss = CamelReactiveStreams.get(ctx);
    from("seda:thesink")
        .log("ooooooo SUB ooooooo camelsub ==body==> ${body}");
    }
}

src / main / resources / Meta-INF / microprofile-config.properties(订阅者)

injected.value=Injected value
value=lookup value
server.port=8082
server.host=0.0.0.0

mp.messaging.incoming.data.connector=smallrye-amqp
mp.messaging.incoming.data.host=localhost
mp.messaging.incoming.data.port=5672
mp.messaging.incoming.data.username=artuser
mp.messaging.incoming.data.password=artpassword
mp.messaging.incoming.data.endpoint-uri:seda:thesink
mp.messaging.incoming.data.broadcast=true
mp.messaging.incoming.data.durable=true
mp.messaging.incoming.data.containerId=my-container-id
mp.messaging.incoming.data.retry=true
mp.messaging.incoming.data.retry-attempts=5

pub和sub的pom.xml信息(基本上是重复的-如下所示为pub)

<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <modelVersion>4.0.0</modelVersion>
    <groupId>aaa.bbb.ccc</groupId>
    <artifactId>camelpub</artifactId>
    <version>1.0</version>
    <properties>
        <helidonVersion>1.4.0</helidonVersion>
        <package>aaa.bbb.ccc.jar</package>
        <failOnmissingWebXml>false</failOnmissingWebXml>
        <mpVersion>3.2</mpVersion>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <libs.classpath.prefix>libs</libs.classpath.prefix>
        <mainClass>io.helidon.microprofile.server.Main</mainClass>
        <jersey.version>2.29</jersey.version>
        <copied.libs.dir>${project.build.directory}/${libs.classpath.prefix}</copied.libs.dir>
        <camelversion>3.0.0</camelversion>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.eclipse.microprofile</groupId>
            <artifactId>microprofile</artifactId>
            <version>${mpVersion}</version>
            <type>pom</type>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.eclipse.microprofile.reactive.messaging</groupId>
            <artifactId>microprofile-reactive-messaging-api</artifactId>
            <version>1.0</version>
            <type>jar</type>
        </dependency>
        <dependency>
            <groupId>javax</groupId>
            <artifactId>javaee-api</artifactId>
            <version>8.0</version>
            <type>jar</type>
        </dependency> 
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-core</artifactId>
            <version>${camelversion}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-reactive-streams</artifactId>
            <version>${camelversion}</version>
        </dependency>       
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-cdi</artifactId>
            <version>${camelversion}</version>
        </dependency>                              
        <dependency>
            <groupId>io.smallrye.reactive</groupId>
            <artifactId>smallrye-reactive-messaging-provider</artifactId>
            <version>1.0.8</version>
        </dependency>          
        <dependency>
            <groupId>io.smallrye.reactive</groupId>
            <artifactId>smallrye-reactive-messaging-amqp</artifactId>
            <version>1.0.8</version>
        </dependency>          
        <dependency>
            <groupId>javax.enterprise</groupId>
            <artifactId>cdi-api</artifactId>
            <version>2.0</version>
        </dependency>          
        <dependency>
            <groupId>io.helidon</groupId>
            <artifactId>helidon-bom</artifactId>
            <version>${helidonVersion}</version>
            <type>pom</type>
        </dependency>
        <dependency>
            <groupId>org.jboss</groupId>
            <artifactId>jandex</artifactId>
            <version>2.1.1.Final</version>
            <scope>runtime</scope>
            <optional>true</optional>            
        </dependency>
        <dependency>
            <groupId>javax.activation</groupId>
            <artifactId>javax.activation-api</artifactId>
            <version>1.2.0</version>
            <scope>runtime</scope>            
        </dependency>
        <dependency>
            <groupId>org.glassfish.jersey.media</groupId>
            <artifactId>jersey-media-json-binding</artifactId>
            <version>${jersey.version}</version>
        </dependency>
        <dependency>
            <groupId>io.helidon.microprofile.bundles</groupId>
            <artifactId>helidon-microprofile-3.0</artifactId>
            <version>${helidonVersion}</version>
        </dependency>

    </dependencies>

    <build>
        <finalName>camelpub</finalName>
        <plugins>                             
            <plugin>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.5</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>${libs.classpath.prefix}</classpathPrefix>
                            <mainClass>${mainClass}</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>2.9</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>prepare-package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${copied.libs.dir}</outputDirectory>
                            <overWriteReleases>false</overWriteReleases>
                            <overWritesnapshots>false</overWritesnapshots>
                            <overWriteIfNewer>true</overWriteIfNewer>
                            <overWriteIfNewer>true</overWriteIfNewer>
                            <includeScope>runtime</includeScope>
                            <excludeScope>test</excludeScope>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Apache Artemis broker.xml

<?xml version='1.0'?>
<configuration xmlns="urn:activemq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns:xi="http://www.w3.org/2001/XInclude"
               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

   <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="urn:activemq:core ">

      <name>0.0.0.0</name>
      <persistence-enabled>true</persistence-enabled>
      <journal-type>NIO</journal-type>
      <paging-directory>data/paging</paging-directory>
      <bindings-directory>data/bindings</bindings-directory>
      <journal-directory>data/journal</journal-directory>
      <large-messages-directory>data/large-messages</large-messages-directory>
      <journal-datasync>true</journal-datasync>
      <journal-min-files>2</journal-min-files>
      <journal-pool-files>10</journal-pool-files>
      <journal-device-block-size>4096</journal-device-block-size>
      <journal-file-size>10M</journal-file-size>      
      <journal-buffer-timeout>1316000</journal-buffer-timeout>

      <journal-max-io>1</journal-max-io>
      <disk-scan-period>5000</disk-scan-period>
      <max-disk-usage>90</max-disk-usage>
      <critical-analyzer>true</critical-analyzer>
      <critical-analyzer-timeout>120000</critical-analyzer-timeout>
      <critical-analyzer-check-period>60000</critical-analyzer-check-period>
      <critical-analyzer-policy>HALT</critical-analyzer-policy>

    <!--
      <acceptors>
         <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpcredits=1000;amqpLowcredits=300</acceptor>
         <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpcredits=1000;amqpLowcredits=300</acceptor>
         <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
         <acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
         <acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
      </acceptors>  
    -->

      <acceptors>
         <acceptor name="amqp">tcp://0.0.0.0:5672?amqpIdleTimeout=0;tcpSendBufferSize=1048576;consumerWindowSize=-1;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpcredits=100;amqpLowcredits=20</acceptor>
      </acceptors>

      <security-settings>
         <security-setting match="#">
            <permission type="createNonDurableQueue" roles="amq"/>
            <permission type="deleteNonDurableQueue" roles="amq"/>
            <permission type="createDurableQueue" roles="amq"/>
            <permission type="deleteDurableQueue" roles="amq"/>
            <permission type="createAddress" roles="amq"/>
            <permission type="deleteAddress" roles="amq"/>
            <permission type="consume" roles="amq"/>
            <permission type="browse" roles="amq"/>
            <permission type="send" roles="amq"/>
            <!-- we need this otherwise ./artemis data imp wouldn't work -->
            <permission type="manage" roles="amq"/>
         </security-setting>
      </security-settings>

      <address-settings>
         <!-- if you define auto-create on certain queues,management has to be auto-create -->
         <address-setting match="activemq.management#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
         </address-setting>
         <!--default for catch all-->
         <address-setting match="#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
         </address-setting>
      </address-settings>

      <addresses>
         <address name="DLQ">
            <anycast>
               <queue name="DLQ" />
            </anycast>
         </address>
         <address name="ExpiryQueue">
            <anycast>
               <queue name="ExpiryQueue" />
            </anycast>
         </address>
      </addresses>

      <!-- Uncomment the following if you want to use the Standard LoggingactiveMQServerPlugin pluging to log in events -->
      <broker-plugins>
         <broker-plugin class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingactiveMQServerPlugin">
            <property key="LOG_ALL_EVENTS" value="true"/>
            <property key="LOG_CONNECTION_EVENTS" value="true"/>
            <property key="LOG_SESSION_EVENTS" value="true"/>
            <property key="LOG_CONSUMER_EVENTS" value="true"/>
            <property key="LOG_DELIVERING_EVENTS" value="true"/>
            <property key="LOG_SENDING_EVENTS" value="true"/>
            <property key="LOG_INTERNAL_EVENTS" value="true"/>
         </broker-plugin>
      </broker-plugins>

   </core>
</configuration>

使用的技术:

  • Java 11
  • Apache Camel 3.0
  • Smallrye反应消息传递API
  • Apache Artemis经纪人2.10.1

注意:

  • 我们正在为发布商应用程序和服务器应用程序分别使用单独的Helidon服务器。
  • 使用Qpid代理时会发生相同的问题。
fql920011429 回答:AMQP消费者停止在第127条消息上接收

暂时没有好的解决方案,如果你有好的解决方案,请发邮件至:iooj@foxmail.com
本文链接:https://www.f2er.com/2826881.html

大家都在问