因此,目标队列是由MSG_TYPE从源目录中的.txt文件确定的,该目录是集成流程的源。
如果MSG_TYPE以“ ORU”开头,则发送到队列“ jms / dataqueue”,如果MSG_TYPE为“ MHS”,则发送到“ jms / headersQueue”,类似这样。我在这里使用wls JmsTemplate。所以我如何基于从文本文件中获取的Msgtype解析目标。 参见方法sendToJmsQueue,我希望在运行时确定inboundDataQueue
请参见下面的源代码。...
@EnableJms
@Configuration
@ComponentScan
@EnableAutoConfiguration
public class Integrationconfig {
private final Logger LOGGER = LoggerFactory.getLogger(Integrationconfig.class);
private QueueConnectionFactory factory;
private InitialContext jndi;
//private QueueSession session = null;
@Value("${spring.wls.InboundDataQueue}")
private String inboundDataQueue;
@Value("${spring.wls.InboundSolicitedQueue}")
private String inboundSolicitedQueue;
@Value("${spring.wls.InboundQueryQueue}")
private String inboundQueryQueue;
@Value("${spring.wls.InboundAckQueue}")
private String inboundAckQueue;
@Autowired
public FileProcessor fileProcessor;
@Autowired
public OutboundGatewayConfig outboundGatewayConfig;
@Bean
public MessageChannel fileInputChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel jmsOutboundChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel customErrorChannel() {
return new DirectChannel();
}
@Bean
public FileToStringTransformer fileToStringTransformer() {
return new FileToStringTransformer();
}
@Bean
public JmsTemplate getJmsTemplate(@Value("${spring.wls.jndiContext}") String context,@Value("${spring.wls.InboundServerURL}") String serverName,@Value("${spring.wls.inboundConnfactory}") String factoryName) {
if (jndi == null) {
// Create a context
Hashtable data = new Hashtable();
data.put(Context.INITIAL_CONTEXT_FactORY,context);
data.put(Context.PROVIDER_URL,serverName);
try {
jndi = new InitialContext(data);
} catch (NamingException e) {
LOGGER.error("Failed to initialize JNDI context:: ",e);
}
}
if (factory == null) {
// Look up JMS connection factory
/*
* factory = (QueueConnectionFactory)PortableRemoteObject.narrow( jndi.lookup(
* factoryName ),QueueConnectionFactory.class );
*/
try {
factory = (QueueConnectionFactory) (jndi.lookup(factoryName));
} catch (NamingException e) {
LOGGER.error("Error in Creating weblogic JNDI Connection factory:: ",e);
//throw new BeanCreationException("JmsTemplate","Failed to create a JmsTemplate",e);
}
}
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(factory);
JmsTemplate wlsJmsTemplate = new JmsTemplate();
wlsJmsTemplate.setConnectionFactory(cachingConnectionFactory);
return wlsJmsTemplate;
}
@Bean
public IntegrationFlow processFile() {
return IntegrationFlows
.from("fileInputChannel")
.transform(fileToStringTransformer())
.handle("fileProcessor","process")
.log(LoggingHandler.Level.INFO,"process file",m -> m.getHeaders().get("Message_Type"))
.channel(this.jmsOutboundChannel())
.get();
}
@Bean
public IntegrationFlow sendToJmsQueue(JmsTemplate wlsJmsTemplate) {
return IntegrationFlows.from(this.jmsOutboundChannel())
.log(LoggingHandler.Level.INFO,"sending to queue",m ->
m.getHeaders().get("Message_Type"))
//.channel(this.jmsOutboundChannel())
// want inboundDataQueue to be determined at runtime
.handle(Jms.outboundAdapter(wlsJmsTemplate).destination(inboundDataQueue),e -> e.advice(expressionAdvice()))
// .handleWithAdapter(adapters ->
// adapters.jms(wlsJmsTemplate).destination("outQueue"))
.get();
}
/**
* @param path
* @param fileExt
* @return watch for files created in configured directory and load then send to
* processing
*/
@Bean
@InboundChannelAdapter(value = "fileInputChannel",poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource(@Value("${file.poller.path}") final String path,@Value("${file.poller.fileName-pattern}") final String fileExt) {
CompositeFileListFilter<File> filters = new CompositeFileListFilter<>();
filters.addFilter(new SimplePatternFileListFilter(fileExt));
// filters.addFilter(new acceptOnceFileListFilter<File>());
FileReadingMessageSource source = new FileReadingMessageSource();
source.setautoCreateDirectory(false);
source.setDirectory(new File(path));
source.setfilter(filters);
source.setUseWatchService(true);
source.setWatchEvents(WatchEventType.CREATE);
System.out.println(path);
return source;
}
/**
* this method will evaluate the further action on successful or failure delivery to JMS Queue.
* Spel will rename the the file in Source Directory accordingly
*/
@Bean
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setSuccessChannelName("success.input");
advice.setOnSuccessExpressionString("headers['file_originalFile'].renameTo(new java.io.File(headers['file_originalFile'].absolutePath + '.done'))");
advice.setfailureChannelName("failure.input");
advice.setOnFailureExpressionString("headers['file_originalFile'].renameTo(new java.io.File(headers['file_originalFile'].absolutePath + '.failed.to.send'))");
advice.setTrapException(true);
return advice;
}
}