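I have a small spring-boot application that connects to one or more topics on ActiveMQ. The topics are set in the application's application.properties file at startup, and the messages received from them are then sent on to a database.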
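Everything works, but I'm running into trouble implementing failover. The application does try to reconnect, but after a certain number of retries the process exits on its own, which stops any further retrying (ideally I'd like the application to retry forever, until it is killed manually or ActiveMQ becomes available again). I've tried explicitly setting connection options (such as maxReconnectAttempts) in the connection URL (via url.options in application.properties) to -1/0/99999, but none of these seems to make a difference: the behaviour is the same every time. Going by the advice on Apache's own reference page, I would also expect retrying forever to be the default behaviour.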
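For reference, here is a minimal sketch of how the connection URL ends up being assembled, written as plain Java so it can be run without a broker. The class name FailoverUrlSketch, the method buildFailoverUrl and the host names are made up for illustration. As I read the failover transport reference, maxReconnectAttempts=-1 means "retry forever" and 0 disables reconnection, while startupMaxReconnectAttempts applies to the very first connection attempt; please treat that reading as an assumption rather than a confirmed fact.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the application: shows the URL shape only.
class FailoverUrlSketch {

    // Joins host:port pairs into a failover URI and appends the option string, if any.
    static String buildFailoverUrl(List<String> hostPorts, String options) {
        String brokers = hostPorts.stream()
                .map(hostPort -> "tcp://" + hostPort)
                .collect(Collectors.joining(","));
        String query = (options == null || options.isEmpty()) ? "" : "?" + options;
        return "failover:(" + brokers + ")" + query;
    }

    public static void main(String[] args) {
        // Example broker addresses are placeholders.
        String url = buildFailoverUrl(
                Arrays.asList("broker-a:61616", "broker-b:61616"),
                "maxReconnectAttempts=-1&startupMaxReconnectAttempts=-1");
        // prints failover:(tcp://broker-a:61616,tcp://broker-b:61616)?maxReconnectAttempts=-1&startupMaxReconnectAttempts=-1
        System.out.println(url);
    }
}
```

Note that the options go after the closing parenthesis, so they apply to the failover transport itself rather than to an individual tcp:// broker URI.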
If anyone has any suggestions for forcing the application not to exit, I'd be very grateful! The code I think is relevant is below:
// Imports inferred from usage (assumed: Log4j 2, Guava's Strings, javax.jms, ActiveMQ client).
import com.google.common.base.Strings;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Optional;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AmqConfig {
    private static final Logger LOG = LogManager.getLogger(AmqConfig.class);
    private static final String LOG_PREFIX = "[AmqConfig] ";
    private String clientId;
    private static ArrayList<String> amqUrls = new ArrayList<>();
    private static String amqConnectionUrl;
    private static Integer numSubs;
    private static ArrayList<String> destinations = new ArrayList<>();

    @Autowired
    DatabaseService databaseService;

    public AmqConfig(@Value("${amq.urls}") String[] amqUrl,
                     @Value("${amq.options}") String amqOptions,
                     @Value("${tocCodes}") String[] tocCodes,
                     @Value("${amq.numSubscribers}") Integer numSubs,
                     @Value("${clientId}") String clientId) throws UnknownHostException {
        // Build failover:(tcp://host1,tcp://host2)?options from the configured hosts
        Arrays.asList(amqUrl).forEach(url -> amqUrls.add("tcp://" + url));
        String amqServerAddress = "failover:(" + String.join(",", amqUrls) + ")";
        String options = Strings.isNullOrEmpty(amqOptions) ? "" : "?" + amqOptions;
        amqConnectionUrl = amqServerAddress + options;
        AmqConfig.numSubs = Optional.ofNullable(numSubs).orElse(4);
        // Fall back to the local host name when no client id is configured
        this.clientId = Strings.isNullOrEmpty(clientId) ? InetAddress.getLocalHost().getHostName() : clientId;
        String topic = "Consumer." + this.clientId + ".VirtualTopic.Feed";
        if (tocCodes.length > 0) {
            Arrays.asList(tocCodes).forEach(s -> destinations.add(topic + "_" + s));
        } else { // no TOC codes = connecting to default feed
            destinations.add(topic);
        }
    }

    @Bean
    public ActiveMQConnectionFactory connectionFactory() throws JMSException {
        LOG.info("{}Connecting to AMQ at {}", LOG_PREFIX, amqConnectionUrl);
        LOG.info("{}Using client id {}", LOG_PREFIX, clientId);
        ActiveMQConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory(amqConnectionUrl);
        Connection conn = connectionFactory.createConnection();
        conn.setClientID(clientId);
        conn.setExceptionListener(new AmqExceptionListener());
        conn.start();
        // One session and consumer per subscriber, for every destination
        destinations.forEach(destinationName -> {
            try {
                for (int i = 0; i < numSubs; i++) {
                    Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    Destination destination = session.createQueue(destinationName);
                    MessageConsumer messageConsumer = session.createConsumer(destination);
                    messageConsumer.setMessageListener(new MessageReceiver(databaseService, destinationName));
                }
            } catch (JMSException e) {
                LOG.error("{}Error setting up queue @ {}", LOG_PREFIX, destinationName);
                LOG.error(e.getMessage());
            }
        });
        return connectionFactory;
    }
}
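To make the destination naming in the AmqConfig constructor easier to follow: the queue names use ActiveMQ's virtual-topic consumer convention (Consumer.<clientId>.VirtualTopic.<name>), with an optional _<tocCode> suffix. A small standalone sketch of just that naming rule is below; DestinationNames and forClient are hypothetical names used only for this illustration, and the client id and TOC codes in main are placeholders.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring the naming logic in the AmqConfig constructor.
class DestinationNames {

    // Returns one queue name per TOC code, or the default feed when there are none.
    static List<String> forClient(String clientId, String[] tocCodes) {
        String base = "Consumer." + clientId + ".VirtualTopic.Feed";
        List<String> names = new ArrayList<>();
        if (tocCodes == null || tocCodes.length == 0) {
            names.add(base); // no TOC codes = default feed
        } else {
            for (String code : tocCodes) {
                names.add(base + "_" + code);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // prints [Consumer.host1.VirtualTopic.Feed_AA, Consumer.host1.VirtualTopic.Feed_BB]
        System.out.println(forClient("host1", new String[] {"AA", "BB"}));
        // prints [Consumer.host1.VirtualTopic.Feed]
        System.out.println(forClient("host1", new String[0]));
    }
}
```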
// Imports inferred from usage (assumed: Log4j 2, javax.jms, ActiveMQ client).
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class MessageReceiver implements MessageListener {
    public static final Logger LOG = LogManager.getLogger(MessageReceiver.class);
    private static final String LOG_PREFIX = "[Message Receiver] ";
    private DatabaseService databaseService;

    public MessageReceiver(DatabaseService databaseService, String destinationName) {
        this.databaseService = databaseService;
        LOG.info("{}Creating MessageReceiver for queue with destination: {}", LOG_PREFIX, destinationName);
    }

    @Override
    public void onMessage(Message message) {
        String messageText = null;
        if (message instanceof TextMessage) {
            TextMessage tm = (TextMessage) message;
            try {
                messageText = tm.getText();
            } catch (JMSException e) {
                LOG.error("{}Error getting message from AMQ", LOG_PREFIX, e);
            }
        } else if (message instanceof ActiveMQMessage) {
            messageText = message.toString();
        } else {
            LOG.warn("{}Unrecognised message type, cannot process", LOG_PREFIX);
            LOG.warn(message.toString());
        }
        // Forward whatever text was extracted to the database
        try {
            databaseService.sendMessageNoResponse(messageText);
        } catch (Exception e) {
            LOG.error("{}Unable to acknowledge message from AMQ. Message: {}", LOG_PREFIX, messageText, e);
        }
    }
}
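// Imports inferred from usage (assumed: Log4j 2, javax.jms).
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class AmqExceptionListener implements ExceptionListener {
    public static final Logger LOG = LogManager.getLogger(AmqExceptionListener.class);
    private static final String LOG_PREFIX = "[AmqExceptionListener] ";

    @Override
    public void onException(JMSException e) {
        LOG.error("{}Exception thrown by ActiveMQ", LOG_PREFIX, e);
    }
}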
The console output I get from the application is as follows (apologies, there isn't much to go on):
[2019-12-12 14:43:30.292] [WARN ] Transport (tcp://[address]:61616) failed,attempting to automatically reconnect: java.io.EOFException
[2019-12-12 14:43:51.098] [WARN ] Failed to connect to [tcp://[address]:61616] after: 10 attempt(s) continuing to retry.
Process finished with exit code 0
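Exit code 0 looks like a normal JVM shutdown rather than a crash, and a JVM shuts down normally once no non-daemon threads are left running. I have not confirmed that this is what happens here, so the following is only a sketch under that assumption: it holds one non-daemon thread open on a latch so that the process would stay up regardless of what the broker connection does. KeepAliveSketch and its methods are hypothetical names, not part of my application or of ActiveMQ.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical keep-alive holder: a non-daemon thread parked on a latch.
class KeepAliveSketch {
    private final CountDownLatch shutdown = new CountDownLatch(1);
    private final Thread holder;

    KeepAliveSketch() {
        holder = new Thread(() -> {
            try {
                shutdown.await(); // blocks until release() is called
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "keep-alive");
        holder.setDaemon(false); // non-daemon threads keep the JVM running
    }

    void start() {
        holder.start();
    }

    // Lets the holder thread finish so the JVM is free to exit.
    void release() {
        shutdown.countDown();
    }

    boolean isHolding() {
        return holder.isAlive();
    }

    // Waits up to the given time for the holder to finish; returns true if it has.
    boolean awaitExit(long millis) {
        try {
            holder.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !holder.isAlive();
    }

    public static void main(String[] args) {
        KeepAliveSketch keepAlive = new KeepAliveSketch();
        keepAlive.start();
        System.out.println("holding: " + keepAlive.isHolding());
        // In a real application release() would only be called on deliberate shutdown;
        // it is called here so that the demo terminates.
        keepAlive.release();
        System.out.println("exited: " + keepAlive.awaitExit(2000));
    }
}
```

Without the release() call, the process would stay alive after main returns until it is killed manually, which is the behaviour I am after.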