我们通过提供createDurableSubscriber
和订户名称,以编程方式创建了clientId
的IBM MQ AMQP TOPIC订户。
我们启动了程序,因此它订阅了TOPIC并停止了该程序。然后将消息发送到主题,然后再次启动接收器程序,但是我们无法接收发送的消息并释放消息,这对于持久订阅是不应该发生的。
当使用mqsc命令DISPLAY TOPIC
,DISPLAY TPSTATUS
,DISPLAY TPSTATUS SUB
,DISPLAY SUB SUBID
连接订户时,但未看到订户程序停止时,我们可以看到amqp主题及其持久订阅。我们已经定义了属性DEFPSIST(YES)
,并且客户端(生产者到主题)正在发送持久消息。
消息消失了,因为我们无法在订阅者的持久队列中看到消息?是否取决于到期属性?
DISPLAY SUB SUBID
为订户连接后的输出。
AMQ8096: WebSphere MQ subscription inquired.
SUBID("hex sub id")
SUB(:private:CLINET01:TOPIC01) TOPICSTR(TOPIC01)
TOPICOBJ(SYSTEM.BASE.TOPIC) DISTYPE(RESOLVED)
DEST(SYSTEM.MANAGED.DUrablE.5F6B5C2524FB9AED)
DESTQMGR(qm.name) PUBAPPID( )
SELECTOR( ) SELTYPE(NONE)
USERDATA(010)
PUBaccT(***************************************************)
DESTCORL(***************************************************)
DESTCLAS(MANAGED) DUrablE(YES)
EXPIRY(0) PSPROP(MSGPROP)
PUBPRTY(ASPUB) REQONLY(NO)
SUBSCOPE(ALL) SUBLEVEL(1)
SUBTYPE(API) VARUSER(FIXED)
WSCHEMA(TOPIC) SUBUSER(mqm)
CRDATE(2020-09-28) CRTIME(04:14:09)
ALTDATE(2020-09-28) ALTTIME(04:14:09)
订户ID具有私有(不确定原因)和客户端ID,但没有订户名称Sub4。
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Topic;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.lang.String;
import javax.jms.Destination;
import javax.naming.Context;
import org.apache.qpid.jms.JmsConnectionFactory;
import javax.jms.DeliveryMode;
import javax.naming.InitialContext;
import javax.jms.Message;
public class AMQPQueueExample1 implements Runnable {
private static final int DELIVERY_MODE = DeliveryMode.PERSISTENT;
public void run(){
try{
Connection connection = null;
Context context = new InitialContext();
ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("myFactoryLookup");
connection = connectionFactory.createConnection();
connection.setClientID("123");//("WHATS_MY_PURPOSE3"); // Why do we need clientID while publishing the TOPIC from consumer / publisher
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Topic priceTopic = (Topic) context.lookup("myTopicLookup1");
MessageConsumer subscriber1 = session.createDurableSubscriber(priceTopic,"sub420"); //"sub3");
System.out.println("TOPIC "+priceTopic);
connection.start();
while(true){
TextMessage message1 = (TextMessage) subscriber1.receive(1000);
if(message1!=null)
System.out.println("Subscriber 1 received : " + message1.getText());
}
}catch(Exception e){
e.printStackTrace();
}
}
public static void main(String[] args) {
AMQPQueueExample1 amp=new AMQPQueueExample1();
Thread thread = new Thread(amp);
thread.start();
}
}
值从jndi.properties文件中获取上下文工厂和提供者URL。