如何在 Spring Boot 中从 ActiveMQ 队列中读取挂起的消息
•浏览 1
How to read pending messages from an ActiveMQ queue in Spring Boot
我想使用 Spring Boot 读取 ActiveMQ 队列中的待处理(未确认)消息。该怎么做?
到目前为止,我可以在消息发送到队列的那一刻读取消息:
@JmsListener(destination ="LOCAL.TEST",
containerFactory ="myJmsListenerContainerFactory")
public void receiveMessage(final Message jsonMessage) throws JMSException {
String messageData = null;
// jsonMessage.acknowledge(); // dont consume message (for testing)
LOGGER.info("=== Received message {}", jsonMessage);
}@Bean
public ActiveMQConnectionFactory getActiveMQConnectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(BROKER_URL +":" + BROKER_PORT);
return activeMQConnectionFactory;
}@Bean
public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(getActiveMQConnectionFactory());
factory.setConcurrency("1-1");
return factory;
}@Autowired
private JmsTemplate jmsTemplate;
public void send(String destination, String message) {
LOGGER.info("sending message='{}' to destination='{}'", message, destination);
jmsTemplate.convertAndSend(destination, message);
}@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(getActiveMQConnectionFactory());
return template;
} ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");
Connection connection = connectionFactory.createConnection("admin", "admin");
try {
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queue = session.createQueue("listenerQueue");
    // A QueueBrowser lets us look at pending messages without removing them from the queue.
    QueueBrowser browser = session.createBrowser(queue);
    Enumeration<?> elems = browser.getEnumeration();
    while (elems.hasMoreElements()) {
        // Take the message from the enumeration itself so the loop advances and terminates;
        // a separate blocking consumer.receive() would consume messages and could wait forever.
        Object message = elems.nextElement();
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("Incoming Message: '" + textMessage.getText() + "'");
        }
    }
} finally {
    // Always release the connection, even if browsing fails part-way through.
    connection.close();
}
<!-- Dependencies to setup JMS and active mq environment -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
</dependency>@Bean
// Creates the listener container factory: installs an error handler, then lets the
// supplied configurer configure the factory with the given connection factory.
public JmsListenerContainerFactory<?> myFactory(
        ConnectionFactory connectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    logger.info("configuring jms connection factory....");
    // Register a single handler: a second setErrorHandler call would simply replace the first.
    // The Throwable is passed to the logger so the cause and stack trace are not lost.
    factory.setErrorHandler(t -> logger.error("An error has occurred in the transaction", t));
    configurer.configure(factory, connectionFactory);
    return factory;
}
// Serialize message content to json using TextMessage
@Bean
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.broker-url=tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1@Autowired
private JmsTemplate jmsTemplate;jmsTemplate.convertAndSend("anyQueueName","value1");
jmsTemplate.convertAndSend("anyQueueName","value2");
...@JmsListener(destination ="anyQueueName", containerFactory ="myFactory")
public void receiveMessage(String user) {
System.out.println("Received <" + user +">");
}import javax.jms.TextMessage;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
 * Prints the text of each message found while browsing "anyQueueName".
 * The messages are read through a QueueBrowser rather than a consumer.
 */
public void readMessageFromQueue() {
    jmsTemplate.browse("anyQueueName", new BrowserCallback<TextMessage>() {
        @Override
        public TextMessage doInJms(Session session, QueueBrowser browser) throws JMSException {
            Enumeration<?> messages = browser.getEnumeration();
            while (messages.hasMoreElements()) {
                Object next = messages.nextElement();
                // Only text messages expose getText(); skip anything else instead of failing on a cast.
                if (next instanceof TextMessage) {
                    System.out.println("message found : -" + ((TextMessage) next).getText());
                }
            }
            // The callback must return a value; nothing is collected here, so return null.
            return null;
        }
    });
}
为 mq-connection 使用标准配置:
@JmsListener(destination ="LOCAL.TEST",
containerFactory ="myJmsListenerContainerFactory")
public void receiveMessage(final Message jsonMessage) throws JMSException {
String messageData = null;
// jsonMessage.acknowledge(); // dont consume message (for testing)
LOGGER.info("=== Received message {}", jsonMessage);
}@Bean
public ActiveMQConnectionFactory getActiveMQConnectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(BROKER_URL +":" + BROKER_PORT);
return activeMQConnectionFactory;
}@Bean
public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(getActiveMQConnectionFactory());
factory.setConcurrency("1-1");
return factory;
}@Autowired
private JmsTemplate jmsTemplate;
public void send(String destination, String message) {
LOGGER.info("sending message='{}' to destination='{}'", message, destination);
jmsTemplate.convertAndSend(destination, message);
}@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(getActiveMQConnectionFactory());
return template;
} ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");
Connection connection = connectionFactory.createConnection("admin", "admin");
try {
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queue = session.createQueue("listenerQueue");
    // A QueueBrowser lets us look at pending messages without removing them from the queue.
    QueueBrowser browser = session.createBrowser(queue);
    Enumeration<?> elems = browser.getEnumeration();
    while (elems.hasMoreElements()) {
        // Take the message from the enumeration itself so the loop advances and terminates;
        // a separate blocking consumer.receive() would consume messages and could wait forever.
        Object message = elems.nextElement();
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("Incoming Message: '" + textMessage.getText() + "'");
        }
    }
} finally {
    // Always release the connection, even if browsing fails part-way through.
    connection.close();
}
<!-- Dependencies to setup JMS and active mq environment -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
</dependency>@Bean
// Creates the listener container factory: installs an error handler, then lets the
// supplied configurer configure the factory with the given connection factory.
public JmsListenerContainerFactory<?> myFactory(
        ConnectionFactory connectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    logger.info("configuring jms connection factory....");
    // Register a single handler: a second setErrorHandler call would simply replace the first.
    // The Throwable is passed to the logger so the cause and stack trace are not lost.
    factory.setErrorHandler(t -> logger.error("An error has occurred in the transaction", t));
    configurer.configure(factory, connectionFactory);
    return factory;
}
// Serialize message content to json using TextMessage
@Bean
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.broker-url=tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1@Autowired
private JmsTemplate jmsTemplate;jmsTemplate.convertAndSend("anyQueueName","value1");
jmsTemplate.convertAndSend("anyQueueName","value2");
...@JmsListener(destination ="anyQueueName", containerFactory ="myFactory")
public void receiveMessage(String user) {
System.out.println("Received <" + user +">");
}import javax.jms.TextMessage;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
 * Prints the text of each message found while browsing "anyQueueName".
 * The messages are read through a QueueBrowser rather than a consumer.
 */
public void readMessageFromQueue() {
    jmsTemplate.browse("anyQueueName", new BrowserCallback<TextMessage>() {
        @Override
        public TextMessage doInJms(Session session, QueueBrowser browser) throws JMSException {
            Enumeration<?> messages = browser.getEnumeration();
            while (messages.hasMoreElements()) {
                Object next = messages.nextElement();
                // Only text messages expose getText(); skip anything else instead of failing on a cast.
                if (next instanceof TextMessage) {
                    System.out.println("message found : -" + ((TextMessage) next).getText());
                }
            }
            // The callback must return a value; nothing is collected here, so return null.
            return null;
        }
    });
}
和一个标准的 ListenerContainerFactory:
@JmsListener(destination ="LOCAL.TEST",
containerFactory ="myJmsListenerContainerFactory")
public void receiveMessage(final Message jsonMessage) throws JMSException {
String messageData = null;
// jsonMessage.acknowledge(); // dont consume message (for testing)
LOGGER.info("=== Received message {}", jsonMessage);
}@Bean
public ActiveMQConnectionFactory getActiveMQConnectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(BROKER_URL +":" + BROKER_PORT);
return activeMQConnectionFactory;
}@Bean
public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(getActiveMQConnectionFactory());
factory.setConcurrency("1-1");
return factory;
}@Autowired
private JmsTemplate jmsTemplate;
public void send(String destination, String message) {
LOGGER.info("sending message='{}' to destination='{}'", message, destination);
jmsTemplate.convertAndSend(destination, message);
}@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(getActiveMQConnectionFactory());
return template;
} ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");
Connection connection = connectionFactory.createConnection("admin", "admin");
try {
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queue = session.createQueue("listenerQueue");
    // A QueueBrowser lets us look at pending messages without removing them from the queue.
    QueueBrowser browser = session.createBrowser(queue);
    Enumeration<?> elems = browser.getEnumeration();
    while (elems.hasMoreElements()) {
        // Take the message from the enumeration itself so the loop advances and terminates;
        // a separate blocking consumer.receive() would consume messages and could wait forever.
        Object message = elems.nextElement();
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("Incoming Message: '" + textMessage.getText() + "'");
        }
    }
} finally {
    // Always release the connection, even if browsing fails part-way through.
    connection.close();
}
<!-- Dependencies to setup JMS and active mq environment -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
</dependency>@Bean
// Creates the listener container factory: installs an error handler, then lets the
// supplied configurer configure the factory with the given connection factory.
public JmsListenerContainerFactory<?> myFactory(
        ConnectionFactory connectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    logger.info("configuring jms connection factory....");
    // Register a single handler: a second setErrorHandler call would simply replace the first.
    // The Throwable is passed to the logger so the cause and stack trace are not lost.
    factory.setErrorHandler(t -> logger.error("An error has occurred in the transaction", t));
    configurer.configure(factory, connectionFactory);
    return factory;
}
// Serialize message content to json using TextMessage
@Bean
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.broker-url=tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1@Autowired
private JmsTemplate jmsTemplate;jmsTemplate.convertAndSend("anyQueueName","value1");
jmsTemplate.convertAndSend("anyQueueName","value2");
...@JmsListener(destination ="anyQueueName", containerFactory ="myFactory")
public void receiveMessage(String user) {
System.out.println("Received <" + user +">");
}import javax.jms.TextMessage;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
 * Prints the text of each message found while browsing "anyQueueName".
 * The messages are read through a QueueBrowser rather than a consumer.
 */
public void readMessageFromQueue() {
    jmsTemplate.browse("anyQueueName", new BrowserCallback<TextMessage>() {
        @Override
        public TextMessage doInJms(Session session, QueueBrowser browser) throws JMSException {
            Enumeration<?> messages = browser.getEnumeration();
            while (messages.hasMoreElements()) {
                Object next = messages.nextElement();
                // Only text messages expose getText(); skip anything else instead of failing on a cast.
                if (next instanceof TextMessage) {
                    System.out.println("message found : -" + ((TextMessage) next).getText());
                }
            }
            // The callback must return a value; nothing is collected here, so return null.
            return null;
        }
    });
}
但是,只有在我使用下面的代码手动发送一条消息时,才会记录这条消息:
@JmsListener(destination ="LOCAL.TEST",
containerFactory ="myJmsListenerContainerFactory")
public void receiveMessage(final Message jsonMessage) throws JMSException {
String messageData = null;
// jsonMessage.acknowledge(); // dont consume message (for testing)
LOGGER.info("=== Received message {}", jsonMessage);
}@Bean
public ActiveMQConnectionFactory getActiveMQConnectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(BROKER_URL +":" + BROKER_PORT);
return activeMQConnectionFactory;
}@Bean
public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(getActiveMQConnectionFactory());
factory.setConcurrency("1-1");
return factory;
}@Autowired
private JmsTemplate jmsTemplate;
public void send(String destination, String message) {
LOGGER.info("sending message='{}' to destination='{}'", message, destination);
jmsTemplate.convertAndSend(destination, message);
}@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(getActiveMQConnectionFactory());
return template;
} ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");
Connection connection = connectionFactory.createConnection("admin", "admin");
try {
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queue = session.createQueue("listenerQueue");
    // A QueueBrowser lets us look at pending messages without removing them from the queue.
    QueueBrowser browser = session.createBrowser(queue);
    Enumeration<?> elems = browser.getEnumeration();
    while (elems.hasMoreElements()) {
        // Take the message from the enumeration itself so the loop advances and terminates;
        // a separate blocking consumer.receive() would consume messages and could wait forever.
        Object message = elems.nextElement();
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("Incoming Message: '" + textMessage.getText() + "'");
        }
    }
} finally {
    // Always release the connection, even if browsing fails part-way through.
    connection.close();
}
<!-- Dependencies to setup JMS and active mq environment -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
</dependency>@Bean
// Creates the listener container factory: installs an error handler, then lets the
// supplied configurer configure the factory with the given connection factory.
public JmsListenerContainerFactory<?> myFactory(
        ConnectionFactory connectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    logger.info("configuring jms connection factory....");
    // Register a single handler: a second setErrorHandler call would simply replace the first.
    // The Throwable is passed to the logger so the cause and stack trace are not lost.
    factory.setErrorHandler(t -> logger.error("An error has occurred in the transaction", t));
    configurer.configure(factory, connectionFactory);
    return factory;
}
// Serialize message content to json using TextMessage
@Bean
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.broker-url=tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1@Autowired
private JmsTemplate jmsTemplate;jmsTemplate.convertAndSend("anyQueueName","value1");
jmsTemplate.convertAndSend("anyQueueName","value2");
...@JmsListener(destination ="anyQueueName", containerFactory ="myFactory")
public void receiveMessage(String user) {
System.out.println("Received <" + user +">");
}import javax.jms.TextMessage;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
 * Prints the text of each message found while browsing "anyQueueName".
 * The messages are read through a QueueBrowser rather than a consumer.
 */
public void readMessageFromQueue() {
    jmsTemplate.browse("anyQueueName", new BrowserCallback<TextMessage>() {
        @Override
        public TextMessage doInJms(Session session, QueueBrowser browser) throws JMSException {
            Enumeration<?> messages = browser.getEnumeration();
            while (messages.hasMoreElements()) {
                Object next = messages.nextElement();
                // Only text messages expose getText(); skip anything else instead of failing on a cast.
                if (next instanceof TextMessage) {
                    System.out.println("message found : -" + ((TextMessage) next).getText());
                }
            }
            // The callback must return a value; nothing is collected here, so return null.
            return null;
        }
    });
}
使用标准模板
@JmsListener(destination ="LOCAL.TEST",
containerFactory ="myJmsListenerContainerFactory")
public void receiveMessage(final Message jsonMessage) throws JMSException {
String messageData = null;
// jsonMessage.acknowledge(); // dont consume message (for testing)
LOGGER.info("=== Received message {}", jsonMessage);
}@Bean
public ActiveMQConnectionFactory getActiveMQConnectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(BROKER_URL +":" + BROKER_PORT);
return activeMQConnectionFactory;
}@Bean
public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(getActiveMQConnectionFactory());
factory.setConcurrency("1-1");
return factory;
}@Autowired
private JmsTemplate jmsTemplate;
public void send(String destination, String message) {
LOGGER.info("sending message='{}' to destination='{}'", message, destination);
jmsTemplate.convertAndSend(destination, message);
}@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(getActiveMQConnectionFactory());
return template;
} ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");
Connection connection = connectionFactory.createConnection("admin", "admin");
try {
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queue = session.createQueue("listenerQueue");
    // A QueueBrowser lets us look at pending messages without removing them from the queue.
    QueueBrowser browser = session.createBrowser(queue);
    Enumeration<?> elems = browser.getEnumeration();
    while (elems.hasMoreElements()) {
        // Take the message from the enumeration itself so the loop advances and terminates;
        // a separate blocking consumer.receive() would consume messages and could wait forever.
        Object message = elems.nextElement();
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("Incoming Message: '" + textMessage.getText() + "'");
        }
    }
} finally {
    // Always release the connection, even if browsing fails part-way through.
    connection.close();
}
<!-- Dependencies to setup JMS and active mq environment -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
</dependency>@Bean
// Creates the listener container factory: installs an error handler, then lets the
// supplied configurer configure the factory with the given connection factory.
public JmsListenerContainerFactory<?> myFactory(
        ConnectionFactory connectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    logger.info("configuring jms connection factory....");
    // Register a single handler: a second setErrorHandler call would simply replace the first.
    // The Throwable is passed to the logger so the cause and stack trace are not lost.
    factory.setErrorHandler(t -> logger.error("An error has occurred in the transaction", t));
    configurer.configure(factory, connectionFactory);
    return factory;
}
// Serialize message content to json using TextMessage
@Bean
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.broker-url=tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1@Autowired
private JmsTemplate jmsTemplate;jmsTemplate.convertAndSend("anyQueueName","value1");
jmsTemplate.convertAndSend("anyQueueName","value2");
...@JmsListener(destination ="anyQueueName", containerFactory ="myFactory")
public void receiveMessage(String user) {
System.out.println("Received <" + user +">");
}import javax.jms.TextMessage;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
 * Prints the text of each message found while browsing "anyQueueName".
 * The messages are read through a QueueBrowser rather than a consumer.
 */
public void readMessageFromQueue() {
    jmsTemplate.browse("anyQueueName", new BrowserCallback<TextMessage>() {
        @Override
        public TextMessage doInJms(Session session, QueueBrowser browser) throws JMSException {
            Enumeration<?> messages = browser.getEnumeration();
            while (messages.hasMoreElements()) {
                Object next = messages.nextElement();
                // Only text messages expose getText(); skip anything else instead of failing on a cast.
                if (next instanceof TextMessage) {
                    System.out.println("message found : -" + ((TextMessage) next).getText());
                }
            }
            // The callback must return a value; nothing is collected here, so return null.
            return null;
        }
    });
}
我无法读取之前发送的仍在队列中的消息(因为我没有对它们调用 .acknowledge())...
JMS 支持"浏览"消息,这似乎正是您想要的功能。因此,您应该修改 Spring 应用程序,改用 QueueBrowser,而不是实际消费消息。
要阅读所有待处理的消息,你可以这样做
@JmsListener(destination ="LOCAL.TEST",
containerFactory ="myJmsListenerContainerFactory")
public void receiveMessage(final Message jsonMessage) throws JMSException {
String messageData = null;
// jsonMessage.acknowledge(); // dont consume message (for testing)
LOGGER.info("=== Received message {}", jsonMessage);
}@Bean
public ActiveMQConnectionFactory getActiveMQConnectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(BROKER_URL +":" + BROKER_PORT);
return activeMQConnectionFactory;
}@Bean
public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(getActiveMQConnectionFactory());
factory.setConcurrency("1-1");
return factory;
}@Autowired
private JmsTemplate jmsTemplate;
public void send(String destination, String message) {
LOGGER.info("sending message='{}' to destination='{}'", message, destination);
jmsTemplate.convertAndSend(destination, message);
}@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(getActiveMQConnectionFactory());
return template;
} ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");
Connection connection = connectionFactory.createConnection("admin", "admin");
try {
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queue = session.createQueue("listenerQueue");
    // A QueueBrowser lets us look at pending messages without removing them from the queue.
    QueueBrowser browser = session.createBrowser(queue);
    Enumeration<?> elems = browser.getEnumeration();
    while (elems.hasMoreElements()) {
        // Take the message from the enumeration itself so the loop advances and terminates;
        // a separate blocking consumer.receive() would consume messages and could wait forever.
        Object message = elems.nextElement();
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("Incoming Message: '" + textMessage.getText() + "'");
        }
    }
} finally {
    // Always release the connection, even if browsing fails part-way through.
    connection.close();
}
<!-- Dependencies to setup JMS and active mq environment -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
</dependency>@Bean
// Creates the listener container factory: installs an error handler, then lets the
// supplied configurer configure the factory with the given connection factory.
public JmsListenerContainerFactory<?> myFactory(
        ConnectionFactory connectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    logger.info("configuring jms connection factory....");
    // Register a single handler: a second setErrorHandler call would simply replace the first.
    // The Throwable is passed to the logger so the cause and stack trace are not lost.
    factory.setErrorHandler(t -> logger.error("An error has occurred in the transaction", t));
    configurer.configure(factory, connectionFactory);
    return factory;
}
// Serialize message content to json using TextMessage
@Bean
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.broker-url=tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1@Autowired
private JmsTemplate jmsTemplate;jmsTemplate.convertAndSend("anyQueueName","value1");
jmsTemplate.convertAndSend("anyQueueName","value2");
...@JmsListener(destination ="anyQueueName", containerFactory ="myFactory")
public void receiveMessage(String user) {
System.out.println("Received <" + user +">");
}import javax.jms.TextMessage;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
 * Prints the text of each message found while browsing "anyQueueName".
 * The messages are read through a QueueBrowser rather than a consumer.
 */
public void readMessageFromQueue() {
    jmsTemplate.browse("anyQueueName", new BrowserCallback<TextMessage>() {
        @Override
        public TextMessage doInJms(Session session, QueueBrowser browser) throws JMSException {
            Enumeration<?> messages = browser.getEnumeration();
            while (messages.hasMoreElements()) {
                Object next = messages.nextElement();
                // Only text messages expose getText(); skip anything else instead of failing on a cast.
                if (next instanceof TextMessage) {
                    System.out.println("message found : -" + ((TextMessage) next).getText());
                }
            }
            // The callback must return a value; nothing is collected here, so return null.
            return null;
        }
    });
}
一步一步实现Spring boot ActiveMQ。让我们编写一些代码使其更清晰。这将有助于仅读取当前会话中的所有待处理消息。
- 在 pom.xml 文件中添加这些依赖项。
@JmsListener(destination ="LOCAL.TEST",
containerFactory ="myJmsListenerContainerFactory")
public void receiveMessage(final Message jsonMessage) throws JMSException {
String messageData = null;
// jsonMessage.acknowledge(); // dont consume message (for testing)
LOGGER.info("=== Received message {}", jsonMessage);
}@Bean
public ActiveMQConnectionFactory getActiveMQConnectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(BROKER_URL +":" + BROKER_PORT);
return activeMQConnectionFactory;
}@Bean
public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(getActiveMQConnectionFactory());
factory.setConcurrency("1-1");
return factory;
}@Autowired
private JmsTemplate jmsTemplate;
public void send(String destination, String message) {
LOGGER.info("sending message='{}' to destination='{}'", message, destination);
jmsTemplate.convertAndSend(destination, message);
}@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(getActiveMQConnectionFactory());
return template;
} ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");
Connection connection = connectionFactory.createConnection("admin", "admin");
try {
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queue = session.createQueue("listenerQueue");
    // A QueueBrowser lets us look at pending messages without removing them from the queue.
    QueueBrowser browser = session.createBrowser(queue);
    Enumeration<?> elems = browser.getEnumeration();
    while (elems.hasMoreElements()) {
        // Take the message from the enumeration itself so the loop advances and terminates;
        // a separate blocking consumer.receive() would consume messages and could wait forever.
        Object message = elems.nextElement();
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("Incoming Message: '" + textMessage.getText() + "'");
        }
    }
} finally {
    // Always release the connection, even if browsing fails part-way through.
    connection.close();
}
<!-- Dependencies to setup JMS and active mq environment -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
</dependency>@Bean
// Creates the listener container factory: installs an error handler, then lets the
// supplied configurer configure the factory with the given connection factory.
public JmsListenerContainerFactory<?> myFactory(
        ConnectionFactory connectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    logger.info("configuring jms connection factory....");
    // Register a single handler: a second setErrorHandler call would simply replace the first.
    // The Throwable is passed to the logger so the cause and stack trace are not lost.
    factory.setErrorHandler(t -> logger.error("An error has occurred in the transaction", t));
    configurer.configure(factory, connectionFactory);
    return factory;
}
// Serialize message content to json using TextMessage
@Bean
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.broker-url=tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1@Autowired
private JmsTemplate jmsTemplate;jmsTemplate.convertAndSend("anyQueueName","value1");
jmsTemplate.convertAndSend("anyQueueName","value2");
...@JmsListener(destination ="anyQueueName", containerFactory ="myFactory")
public void receiveMessage(String user) {
System.out.println("Received <" + user +">");
}import javax.jms.TextMessage;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
 * Prints the text of each message found while browsing "anyQueueName".
 * The messages are read through a QueueBrowser rather than a consumer.
 */
public void readMessageFromQueue() {
    jmsTemplate.browse("anyQueueName", new BrowserCallback<TextMessage>() {
        @Override
        public TextMessage doInJms(Session session, QueueBrowser browser) throws JMSException {
            Enumeration<?> messages = browser.getEnumeration();
            while (messages.hasMoreElements()) {
                Object next = messages.nextElement();
                // Only text messages expose getText(); skip anything else instead of failing on a cast.
                if (next instanceof TextMessage) {
                    System.out.println("message found : -" + ((TextMessage) next).getText());
                }
            }
            // The callback must return a value; nothing is collected here, so return null.
            return null;
        }
    });
}
@JmsListener(destination ="LOCAL.TEST",
containerFactory ="myJmsListenerContainerFactory")
public void receiveMessage(final Message jsonMessage) throws JMSException {
String messageData = null;
// jsonMessage.acknowledge(); // dont consume message (for testing)
LOGGER.info("=== Received message {}", jsonMessage);
}@Bean
public ActiveMQConnectionFactory getActiveMQConnectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(BROKER_URL +":" + BROKER_PORT);
return activeMQConnectionFactory;
}@Bean
public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(getActiveMQConnectionFactory());
factory.setConcurrency("1-1");
return factory;
}@Autowired
private JmsTemplate jmsTemplate;
public void send(String destination, String message) {
LOGGER.info("sending message='{}' to destination='{}'", message, destination);
jmsTemplate.convertAndSend(destination, message);
}@Bean
// Exposes a JmsTemplate wired to the connection factory from getActiveMQConnectionFactory().
public JmsTemplate jmsTemplate() {
    final JmsTemplate jms = new JmsTemplate();
    jms.setConnectionFactory(getActiveMQConnectionFactory());
    return jms;
}
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");
// Lists the text messages pending on "listenerQueue" without removing them from the queue.
Connection connection = connectionFactory.createConnection("admin","admin");
try {
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queue = session.createQueue("listenerQueue");
    // Only a QueueBrowser is used: no MessageConsumer is created, so nothing is
    // received or acknowledged and the messages stay pending.
    QueueBrowser browser = session.createBrowser(queue);
    Enumeration<?> elems = browser.getEnumeration();
    while (elems.hasMoreElements()) {
        // nextElement() advances the enumeration, so the loop ends after the
        // last browsed message instead of blocking on a receive().
        Object message = elems.nextElement();
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("Incoming Message: '" + textMessage.getText() +"'");
        }
    }
} finally {
    // Close the connection even when browsing throws.
    connection.close();
}
<!-- Dependencies to setup JMS and active mq environment -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
</dependency>@Bean
// Builds the listener container factory referenced by
// @JmsListener(containerFactory = "myFactory").
//
// connectionFactory: the JMS connection factory passed on to the configurer
// configurer:        applies its settings to the new factory
// returns:           the configured DefaultJmsListenerContainerFactory
public JmsListenerContainerFactory<?> myFactory(
        ConnectionFactory connectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    logger.info("configuring jms connection factory....");
    // Each setErrorHandler call replaces the previous handler, so exactly one is
    // registered. It logs at ERROR and passes the Throwable so the stack trace is kept.
    factory.setErrorHandler(t -> logger.error("An error has occurred in the transaction", t));
    configurer.configure(factory, connectionFactory);
    return factory;
}
// Message converter that serializes message content to JSON carried in a TextMessage.
@Bean
public MessageConverter jacksonJmsMessageConverter() {
    final MappingJackson2MessageConverter jsonConverter = new MappingJackson2MessageConverter();
    // "_type" is the name of the property that carries the type id.
    jsonConverter.setTypeIdPropertyName("_type");
    jsonConverter.setTargetType(MessageType.TEXT);
    return jsonConverter;
}
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.broker-url=tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1@Autowired
private JmsTemplate jmsTemplate;jmsTemplate.convertAndSend("anyQueueName","value1");
jmsTemplate.convertAndSend("anyQueueName","value2");
...
// Prints each payload delivered from "anyQueueName" to standard output.
@JmsListener(destination = "anyQueueName", containerFactory = "myFactory")
public void receiveMessage(String user) {
    final String line = "Received <" + user + ">";
    System.out.println(line);
}
import javax.jms.TextMessage;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
 * Prints every message currently sitting on "anyQueueName".
 *
 * <p>The queue is inspected through {@code jmsTemplate.browse}, i.e. with a
 * {@code QueueBrowser}, so the messages are only looked at and stay on the queue.
 */
public void readMessageFromQueue() {
    jmsTemplate.browse("anyQueueName", (session, browser) -> {
        Enumeration<?> pending = browser.getEnumeration();
        while (pending.hasMoreElements()) {
            Object element = pending.nextElement();
            // Not every queued message has to be a TextMessage; fall back to the
            // message's own string form instead of failing with a ClassCastException.
            String text = (element instanceof TextMessage)
                    ? ((TextMessage) element).getText()
                    : String.valueOf(element);
            System.out.println("message found : -" + text);
        }
        // The callback must return a value; there is no result to hand back.
        return null;
    });
}
@JmsListener(destination ="LOCAL.TEST",
containerFactory ="myJmsListenerContainerFactory")
public void receiveMessage(final Message jsonMessage) throws JMSException {
String messageData = null;
// jsonMessage.acknowledge(); // dont consume message (for testing)
LOGGER.info("=== Received message {}", jsonMessage);
}@Bean
public ActiveMQConnectionFactory getActiveMQConnectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(BROKER_URL +":" + BROKER_PORT);
return activeMQConnectionFactory;
}@Bean
public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(getActiveMQConnectionFactory());
factory.setConcurrency("1-1");
return factory;
}@Autowired
private JmsTemplate jmsTemplate;
public void send(String destination, String message) {
LOGGER.info("sending message='{}' to destination='{}'", message, destination);
jmsTemplate.convertAndSend(destination, message);
}@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(getActiveMQConnectionFactory());
return template;
} ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");
// Lists the text messages pending on "listenerQueue" without removing them from the queue.
Connection connection = connectionFactory.createConnection("admin","admin");
try {
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queue = session.createQueue("listenerQueue");
    // Only a QueueBrowser is used: no MessageConsumer is created, so nothing is
    // received or acknowledged and the messages stay pending.
    QueueBrowser browser = session.createBrowser(queue);
    Enumeration<?> elems = browser.getEnumeration();
    while (elems.hasMoreElements()) {
        // nextElement() advances the enumeration, so the loop ends after the
        // last browsed message instead of blocking on a receive().
        Object message = elems.nextElement();
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("Incoming Message: '" + textMessage.getText() +"'");
        }
    }
} finally {
    // Close the connection even when browsing throws.
    connection.close();
}
<!-- Dependencies to setup JMS and active mq environment -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
</dependency>@Bean
public JmsListenerContainerFactory< ? > myFactory(
ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
logger.info("configuring jms connection factory....");
// anonymous class
factory.setErrorHandler(
new ErrorHandler() {
@Override
public void handleError(Throwable t) {
logger.error("An error has occurred in the transaction", t);
}
});
// lambda function
factory.setErrorHandler(t -> logger.info("An error has occurred in the transaction"));
configurer.configure(factory, connectionFactory);
return factory;
}
// Serialize message content to json using TextMessage
@Bean
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.broker-url=tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1@Autowired
private JmsTemplate jmsTemplate;jmsTemplate.convertAndSend("anyQueueName","value1");
jmsTemplate.convertAndSend("anyQueueName","value2");
...@JmsListener(destination ="anyQueueName", containerFactory ="myFactory")
public void receiveMessage(String user) {
System.out.println("Received <" + user +">");
}import javax.jms.TextMessage;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
 * Prints every message currently sitting on "anyQueueName".
 *
 * <p>The queue is inspected through {@code jmsTemplate.browse}, i.e. with a
 * {@code QueueBrowser}, so the messages are only looked at and stay on the queue.
 */
public void readMessageFromQueue() {
    jmsTemplate.browse("anyQueueName", (session, browser) -> {
        Enumeration<?> pending = browser.getEnumeration();
        while (pending.hasMoreElements()) {
            Object element = pending.nextElement();
            // Not every queued message has to be a TextMessage; fall back to the
            // message's own string form instead of failing with a ClassCastException.
            String text = (element instanceof TextMessage)
                    ? ((TextMessage) element).getText()
                    : String.valueOf(element);
            System.out.println("message found : -" + text);
        }
        // The callback must return a value; there is no result to hand back.
        return null;
    });
}
@JmsListener(destination ="LOCAL.TEST",
containerFactory ="myJmsListenerContainerFactory")
public void receiveMessage(final Message jsonMessage) throws JMSException {
String messageData = null;
// jsonMessage.acknowledge(); // dont consume message (for testing)
LOGGER.info("=== Received message {}", jsonMessage);
}@Bean
public ActiveMQConnectionFactory getActiveMQConnectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(BROKER_URL +":" + BROKER_PORT);
return activeMQConnectionFactory;
}@Bean
public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(getActiveMQConnectionFactory());
factory.setConcurrency("1-1");
return factory;
}@Autowired
private JmsTemplate jmsTemplate;
public void send(String destination, String message) {
LOGGER.info("sending message='{}' to destination='{}'", message, destination);
jmsTemplate.convertAndSend(destination, message);
}@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(getActiveMQConnectionFactory());
return template;
} ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");
// Lists the text messages pending on "listenerQueue" without removing them from the queue.
Connection connection = connectionFactory.createConnection("admin","admin");
try {
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queue = session.createQueue("listenerQueue");
    // Only a QueueBrowser is used: no MessageConsumer is created, so nothing is
    // received or acknowledged and the messages stay pending.
    QueueBrowser browser = session.createBrowser(queue);
    Enumeration<?> elems = browser.getEnumeration();
    while (elems.hasMoreElements()) {
        // nextElement() advances the enumeration, so the loop ends after the
        // last browsed message instead of blocking on a receive().
        Object message = elems.nextElement();
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("Incoming Message: '" + textMessage.getText() +"'");
        }
    }
} finally {
    // Close the connection even when browsing throws.
    connection.close();
}
<!-- Dependencies to setup JMS and active mq environment -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
</dependency>@Bean
public JmsListenerContainerFactory< ? > myFactory(
ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
logger.info("configuring jms connection factory....");
// anonymous class
factory.setErrorHandler(
new ErrorHandler() {
@Override
public void handleError(Throwable t) {
logger.error("An error has occurred in the transaction", t);
}
});
// lambda function
factory.setErrorHandler(t -> logger.info("An error has occurred in the transaction"));
configurer.configure(factory, connectionFactory);
return factory;
}
// Serialize message content to json using TextMessage
@Bean
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.broker-url=tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1@Autowired
private JmsTemplate jmsTemplate;jmsTemplate.convertAndSend("anyQueueName","value1");
jmsTemplate.convertAndSend("anyQueueName","value2");
...@JmsListener(destination ="anyQueueName", containerFactory ="myFactory")
public void receiveMessage(String user) {
System.out.println("Received <" + user +">");
}import javax.jms.TextMessage;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
 * Prints every message currently sitting on "anyQueueName".
 *
 * <p>The queue is inspected through {@code jmsTemplate.browse}, i.e. with a
 * {@code QueueBrowser}, so the messages are only looked at and stay on the queue.
 */
public void readMessageFromQueue() {
    jmsTemplate.browse("anyQueueName", (session, browser) -> {
        Enumeration<?> pending = browser.getEnumeration();
        while (pending.hasMoreElements()) {
            Object element = pending.nextElement();
            // Not every queued message has to be a TextMessage; fall back to the
            // message's own string form instead of failing with a ClassCastException.
            String text = (element instanceof TextMessage)
                    ? ((TextMessage) element).getText()
                    : String.valueOf(element);
            System.out.println("message found : -" + text);
        }
        // The callback must return a value; there is no result to hand back.
        return null;
    });
}
@JmsListener(destination ="LOCAL.TEST",
containerFactory ="myJmsListenerContainerFactory")
public void receiveMessage(final Message jsonMessage) throws JMSException {
String messageData = null;
// jsonMessage.acknowledge(); // dont consume message (for testing)
LOGGER.info("=== Received message {}", jsonMessage);
}@Bean
public ActiveMQConnectionFactory getActiveMQConnectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(BROKER_URL +":" + BROKER_PORT);
return activeMQConnectionFactory;
}@Bean
public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(getActiveMQConnectionFactory());
factory.setConcurrency("1-1");
return factory;
}@Autowired
private JmsTemplate jmsTemplate;
public void send(String destination, String message) {
LOGGER.info("sending message='{}' to destination='{}'", message, destination);
jmsTemplate.convertAndSend(destination, message);
}@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(getActiveMQConnectionFactory());
return template;
} ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");
// Lists the text messages pending on "listenerQueue" without removing them from the queue.
Connection connection = connectionFactory.createConnection("admin","admin");
try {
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queue = session.createQueue("listenerQueue");
    // Only a QueueBrowser is used: no MessageConsumer is created, so nothing is
    // received or acknowledged and the messages stay pending.
    QueueBrowser browser = session.createBrowser(queue);
    Enumeration<?> elems = browser.getEnumeration();
    while (elems.hasMoreElements()) {
        // nextElement() advances the enumeration, so the loop ends after the
        // last browsed message instead of blocking on a receive().
        Object message = elems.nextElement();
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("Incoming Message: '" + textMessage.getText() +"'");
        }
    }
} finally {
    // Close the connection even when browsing throws.
    connection.close();
}
<!-- Dependencies to setup JMS and active mq environment -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
</dependency>@Bean
public JmsListenerContainerFactory< ? > myFactory(
ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
logger.info("configuring jms connection factory....");
// anonymous class
factory.setErrorHandler(
new ErrorHandler() {
@Override
public void handleError(Throwable t) {
logger.error("An error has occurred in the transaction", t);
}
});
// lambda function
factory.setErrorHandler(t -> logger.info("An error has occurred in the transaction"));
configurer.configure(factory, connectionFactory);
return factory;
}
// Serialize message content to json using TextMessage
@Bean
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.broker-url=tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1@Autowired
private JmsTemplate jmsTemplate;jmsTemplate.convertAndSend("anyQueueName","value1");
jmsTemplate.convertAndSend("anyQueueName","value2");
...@JmsListener(destination ="anyQueueName", containerFactory ="myFactory")
public void receiveMessage(String user) {
System.out.println("Received <" + user +">");
}import javax.jms.TextMessage;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
 * Prints every message currently sitting on "anyQueueName".
 *
 * <p>The queue is inspected through {@code jmsTemplate.browse}, i.e. with a
 * {@code QueueBrowser}, so the messages are only looked at and stay on the queue.
 */
public void readMessageFromQueue() {
    jmsTemplate.browse("anyQueueName", (session, browser) -> {
        Enumeration<?> pending = browser.getEnumeration();
        while (pending.hasMoreElements()) {
            Object element = pending.nextElement();
            // Not every queued message has to be a TextMessage; fall back to the
            // message's own string form instead of failing with a ClassCastException.
            String text = (element instanceof TextMessage)
                    ? ((TextMessage) element).getText()
                    : String.valueOf(element);
            System.out.println("message found : -" + text);
        }
        // The callback must return a value; there is no result to hand back.
        return null;
    });
}
@JmsListener(destination ="LOCAL.TEST",
containerFactory ="myJmsListenerContainerFactory")
public void receiveMessage(final Message jsonMessage) throws JMSException {
String messageData = null;
// jsonMessage.acknowledge(); // dont consume message (for testing)
LOGGER.info("=== Received message {}", jsonMessage);
}@Bean
public ActiveMQConnectionFactory getActiveMQConnectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(BROKER_URL +":" + BROKER_PORT);
return activeMQConnectionFactory;
}@Bean
public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(getActiveMQConnectionFactory());
factory.setConcurrency("1-1");
return factory;
}@Autowired
private JmsTemplate jmsTemplate;
public void send(String destination, String message) {
LOGGER.info("sending message='{}' to destination='{}'", message, destination);
jmsTemplate.convertAndSend(destination, message);
}@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(getActiveMQConnectionFactory());
return template;
} ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");
// Lists the text messages pending on "listenerQueue" without removing them from the queue.
Connection connection = connectionFactory.createConnection("admin","admin");
try {
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queue = session.createQueue("listenerQueue");
    // Only a QueueBrowser is used: no MessageConsumer is created, so nothing is
    // received or acknowledged and the messages stay pending.
    QueueBrowser browser = session.createBrowser(queue);
    Enumeration<?> elems = browser.getEnumeration();
    while (elems.hasMoreElements()) {
        // nextElement() advances the enumeration, so the loop ends after the
        // last browsed message instead of blocking on a receive().
        Object message = elems.nextElement();
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("Incoming Message: '" + textMessage.getText() +"'");
        }
    }
} finally {
    // Close the connection even when browsing throws.
    connection.close();
}
<!-- Dependencies to setup JMS and active mq environment -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
</dependency>@Bean
public JmsListenerContainerFactory< ? > myFactory(
ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
logger.info("configuring jms connection factory....");
// anonymous class
factory.setErrorHandler(
new ErrorHandler() {
@Override
public void handleError(Throwable t) {
logger.error("An error has occurred in the transaction", t);
}
});
// lambda function
factory.setErrorHandler(t -> logger.info("An error has occurred in the transaction"));
configurer.configure(factory, connectionFactory);
return factory;
}
// Serialize message content to json using TextMessage
@Bean
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.broker-url=tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1@Autowired
private JmsTemplate jmsTemplate;jmsTemplate.convertAndSend("anyQueueName","value1");
jmsTemplate.convertAndSend("anyQueueName","value2");
...@JmsListener(destination ="anyQueueName", containerFactory ="myFactory")
public void receiveMessage(String user) {
System.out.println("Received <" + user +">");
}import javax.jms.TextMessage;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
 * Prints every message currently sitting on "anyQueueName".
 *
 * <p>The queue is inspected through {@code jmsTemplate.browse}, i.e. with a
 * {@code QueueBrowser}, so the messages are only looked at and stay on the queue.
 */
public void readMessageFromQueue() {
    jmsTemplate.browse("anyQueueName", (session, browser) -> {
        Enumeration<?> pending = browser.getEnumeration();
        while (pending.hasMoreElements()) {
            Object element = pending.nextElement();
            // Not every queued message has to be a TextMessage; fall back to the
            // message's own string form instead of failing with a ClassCastException.
            String text = (element instanceof TextMessage)
                    ? ((TextMessage) element).getText()
                    : String.valueOf(element);
            System.out.println("message found : -" + text);
        }
        // The callback must return a value; there is no result to hand back.
        return null;
    });
}
@JmsListener(destination ="LOCAL.TEST",
containerFactory ="myJmsListenerContainerFactory")
public void receiveMessage(final Message jsonMessage) throws JMSException {
String messageData = null;
// jsonMessage.acknowledge(); // dont consume message (for testing)
LOGGER.info("=== Received message {}", jsonMessage);
}@Bean
public ActiveMQConnectionFactory getActiveMQConnectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(BROKER_URL +":" + BROKER_PORT);
return activeMQConnectionFactory;
}@Bean
public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(getActiveMQConnectionFactory());
factory.setConcurrency("1-1");
return factory;
}@Autowired
private JmsTemplate jmsTemplate;
public void send(String destination, String message) {
LOGGER.info("sending message='{}' to destination='{}'", message, destination);
jmsTemplate.convertAndSend(destination, message);
}@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(getActiveMQConnectionFactory());
return template;
} ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");
// Lists the text messages pending on "listenerQueue" without removing them from the queue.
Connection connection = connectionFactory.createConnection("admin","admin");
try {
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queue = session.createQueue("listenerQueue");
    // Only a QueueBrowser is used: no MessageConsumer is created, so nothing is
    // received or acknowledged and the messages stay pending.
    QueueBrowser browser = session.createBrowser(queue);
    Enumeration<?> elems = browser.getEnumeration();
    while (elems.hasMoreElements()) {
        // nextElement() advances the enumeration, so the loop ends after the
        // last browsed message instead of blocking on a receive().
        Object message = elems.nextElement();
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("Incoming Message: '" + textMessage.getText() +"'");
        }
    }
} finally {
    // Close the connection even when browsing throws.
    connection.close();
}
<!-- Dependencies to setup JMS and active mq environment -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
</dependency>@Bean
public JmsListenerContainerFactory< ? > myFactory(
ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
logger.info("configuring jms connection factory....");
// anonymous class
factory.setErrorHandler(
new ErrorHandler() {
@Override
public void handleError(Throwable t) {
logger.error("An error has occurred in the transaction", t);
}
});
// lambda function
factory.setErrorHandler(t -> logger.info("An error has occurred in the transaction"));
configurer.configure(factory, connectionFactory);
return factory;
}
// Serialize message content to json using TextMessage
@Bean
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.broker-url=tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1@Autowired
private JmsTemplate jmsTemplate;jmsTemplate.convertAndSend("anyQueueName","value1");
jmsTemplate.convertAndSend("anyQueueName","value2");
...@JmsListener(destination ="anyQueueName", containerFactory ="myFactory")
public void receiveMessage(String user) {
System.out.println("Received <" + user +">");
}import javax.jms.TextMessage;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
 * Prints every message currently sitting on "anyQueueName".
 *
 * <p>The queue is inspected through {@code jmsTemplate.browse}, i.e. with a
 * {@code QueueBrowser}, so the messages are only looked at and stay on the queue.
 */
public void readMessageFromQueue() {
    jmsTemplate.browse("anyQueueName", (session, browser) -> {
        Enumeration<?> pending = browser.getEnumeration();
        while (pending.hasMoreElements()) {
            Object element = pending.nextElement();
            // Not every queued message has to be a TextMessage; fall back to the
            // message's own string form instead of failing with a ClassCastException.
            String text = (element instanceof TextMessage)
                    ? ((TextMessage) element).getText()
                    : String.valueOf(element);
            System.out.println("message found : -" + text);
        }
        // The callback must return a value; there is no result to hand back.
        return null;
    });
}
输出:-
message found : -value1
message found : -value2
-快乐编码
如果未确认消息将不会重新发送。在