如何在 Spring Boot 中从 ActiveMQ 队列中读取挂起的消息

How to read pending messages from an ActiveMQ queue in Spring Boot

我喜欢使用 Spring 引导读取 ActiveMQ 队列中的待处理(未确认)消息。怎么做?

到目前为止,我可以在消息发送到队列的那一刻读取消息:

@JmsListener(destination ="LOCAL.TEST", 

 containerFactory ="myJmsListenerContainerFactory")

public void receiveMessage(final Message jsonMessage) throws JMSException {

  String messageData = null;

  // jsonMessage.acknowledge(); // dont consume message (for testing)

  LOGGER.info("=== Received message {}", jsonMessage);

}@Bean

public ActiveMQConnectionFactory getActiveMQConnectionFactory() {

  ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();

  activeMQConnectionFactory.setBrokerURL(BROKER_URL +":" + BROKER_PORT);

  return activeMQConnectionFactory;

}@Bean

public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {

 DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

 factory.setConnectionFactory(getActiveMQConnectionFactory());

 factory.setConcurrency("1-1");

 return factory;

}@Autowired

private JmsTemplate jmsTemplate;



public void send(String destination, String message) {

  LOGGER.info("sending message='{}' to destination='{}'", message, destination);

  jmsTemplate.convertAndSend(destination, message);

}@Bean

public JmsTemplate jmsTemplate() {

 JmsTemplate template = new JmsTemplate();

 template.setConnectionFactory(getActiveMQConnectionFactory());

 return template;

}  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");

  Connection connection = connectionFactory.createConnection("admin","admin");

  connection.start();

  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

  Destination destination = session.createQueue("listenerQueue");

  MessageConsumer consumer = session.createConsumer(destination);



  QueueBrowser browser = session.createBrowser((Queue) destination);

  Enumeration elems = browser.getEnumeration();

  while (elems.hasMoreElements()) {

    Message message = (Message) consumer.receive();



    if (message instanceof TextMessage) {

      TextMessage textMessage = (TextMessage) message;

      System.out.println("Incoming Message: '" + textMessage.getText() +"'");

      message.acknowledge();

    }

  }

  connection.close();<!-- Dependencies to setup JMS and active mq environment -->

    <dependency>

      <groupId>org.springframework.boot</groupId>

      spring-boot-starter-activemq</artifactId>

    </dependency>

    <dependency>

      <groupId>org.apache.activemq</groupId>

      activemq-broker</artifactId>

    </dependency>@Bean

  public JmsListenerContainerFactory< ? > myFactory(

    ConnectionFactory connectionFactory,

    DefaultJmsListenerContainerFactoryConfigurer configurer) {

   DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

   logger.info("configuring jms connection factory....");

   // anonymous class

   factory.setErrorHandler(

       new ErrorHandler() {

         @Override

         public void handleError(Throwable t) {

           logger.error("An error has occurred in the transaction", t);

         }

       });

   // lambda function

   factory.setErrorHandler(t -> logger.info("An error has occurred in the transaction"));

   configurer.configure(factory, connectionFactory);



   return factory;

  }



  // Serialize message content to json using TextMessage

  @Bean

  public MessageConverter jacksonJmsMessageConverter() {

   MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();

   converter.setTargetType(MessageType.TEXT);

   converter.setTypeIdPropertyName("_type");

   return converter;

  }spring.activemq.user=admin

spring.activemq.password=admin

spring.activemq.broker-url=tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1@Autowired

private JmsTemplate jmsTemplate;jmsTemplate.convertAndSend("anyQueueName","value1");

jmsTemplate.convertAndSend("anyQueueName","value2");

...@JmsListener(destination ="anyQueueName", containerFactory ="myFactory")

  public void receiveMessage(String user) {

    System.out.println("Received <" + user +">");

  }import javax.jms.TextMessage;

import javax.jms.QueueBrowser;

import javax.jms.Session;

import javax.jms.TextMessage;



public void readMessageFromQueue(){

jmsTemplate.browse("anyQueueName", new BrowserCallback<TextMessage>() {

      @Override

      public TextMessage doInJms(Session session, QueueBrowser browser) throws JMSException {

        Enumeration<TextMessage> messages = browser.getEnumeration();

        while (messages.hasMoreElements()) {

          System.out.println("message found : -"+ messages.nextElement().getText());

        }

      }

    });

}

为 mq-connection 使用标准配置:

@JmsListener(destination ="LOCAL.TEST", 

 containerFactory ="myJmsListenerContainerFactory")

public void receiveMessage(final Message jsonMessage) throws JMSException {

  String messageData = null;

  // jsonMessage.acknowledge(); // dont consume message (for testing)

  LOGGER.info("=== Received message {}", jsonMessage);

}@Bean

public ActiveMQConnectionFactory getActiveMQConnectionFactory() {

  ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();

  activeMQConnectionFactory.setBrokerURL(BROKER_URL +":" + BROKER_PORT);

  return activeMQConnectionFactory;

}@Bean

public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {

 DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

 factory.setConnectionFactory(getActiveMQConnectionFactory());

 factory.setConcurrency("1-1");

 return factory;

}@Autowired

private JmsTemplate jmsTemplate;



public void send(String destination, String message) {

  LOGGER.info("sending message='{}' to destination='{}'", message, destination);

  jmsTemplate.convertAndSend(destination, message);

}@Bean

public JmsTemplate jmsTemplate() {

 JmsTemplate template = new JmsTemplate();

 template.setConnectionFactory(getActiveMQConnectionFactory());

 return template;

}  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");

  Connection connection = connectionFactory.createConnection("admin","admin");

  connection.start();

  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

  Destination destination = session.createQueue("listenerQueue");

  MessageConsumer consumer = session.createConsumer(destination);



  QueueBrowser browser = session.createBrowser((Queue) destination);

  Enumeration elems = browser.getEnumeration();

  while (elems.hasMoreElements()) {

    Message message = (Message) consumer.receive();



    if (message instanceof TextMessage) {

      TextMessage textMessage = (TextMessage) message;

      System.out.println("Incoming Message: '" + textMessage.getText() +"'");

      message.acknowledge();

    }

  }

  connection.close();<!-- Dependencies to setup JMS and active mq environment -->

    <dependency>

      <groupId>org.springframework.boot</groupId>

      spring-boot-starter-activemq</artifactId>

    </dependency>

    <dependency>

      <groupId>org.apache.activemq</groupId>

      activemq-broker</artifactId>

    </dependency>@Bean

  public JmsListenerContainerFactory< ? > myFactory(

    ConnectionFactory connectionFactory,

    DefaultJmsListenerContainerFactoryConfigurer configurer) {

   DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

   logger.info("configuring jms connection factory....");

   // anonymous class

   factory.setErrorHandler(

       new ErrorHandler() {

         @Override

         public void handleError(Throwable t) {

           logger.error("An error has occurred in the transaction", t);

         }

       });

   // lambda function

   factory.setErrorHandler(t -> logger.info("An error has occurred in the transaction"));

   configurer.configure(factory, connectionFactory);



   return factory;

  }



  // Serialize message content to json using TextMessage

  @Bean

  public MessageConverter jacksonJmsMessageConverter() {

   MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();

   converter.setTargetType(MessageType.TEXT);

   converter.setTypeIdPropertyName("_type");

   return converter;

  }spring.activemq.user=admin

spring.activemq.password=admin

spring.activemq.broker-url=tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1@Autowired

private JmsTemplate jmsTemplate;jmsTemplate.convertAndSend("anyQueueName","value1");

jmsTemplate.convertAndSend("anyQueueName","value2");

...@JmsListener(destination ="anyQueueName", containerFactory ="myFactory")

  public void receiveMessage(String user) {

    System.out.println("Received <" + user +">");

  }import javax.jms.TextMessage;

import javax.jms.QueueBrowser;

import javax.jms.Session;

import javax.jms.TextMessage;



public void readMessageFromQueue(){

jmsTemplate.browse("anyQueueName", new BrowserCallback<TextMessage>() {

      @Override

      public TextMessage doInJms(Session session, QueueBrowser browser) throws JMSException {

        Enumeration<TextMessage> messages = browser.getEnumeration();

        while (messages.hasMoreElements()) {

          System.out.println("message found : -"+ messages.nextElement().getText());

        }

      }

    });

}

和一个标准的 ListenerContainerFactory:

@JmsListener(destination ="LOCAL.TEST", 

 containerFactory ="myJmsListenerContainerFactory")

public void receiveMessage(final Message jsonMessage) throws JMSException {

  String messageData = null;

  // jsonMessage.acknowledge(); // dont consume message (for testing)

  LOGGER.info("=== Received message {}", jsonMessage);

}@Bean

public ActiveMQConnectionFactory getActiveMQConnectionFactory() {

  ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();

  activeMQConnectionFactory.setBrokerURL(BROKER_URL +":" + BROKER_PORT);

  return activeMQConnectionFactory;

}@Bean

public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {

 DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

 factory.setConnectionFactory(getActiveMQConnectionFactory());

 factory.setConcurrency("1-1");

 return factory;

}@Autowired

private JmsTemplate jmsTemplate;



public void send(String destination, String message) {

  LOGGER.info("sending message='{}' to destination='{}'", message, destination);

  jmsTemplate.convertAndSend(destination, message);

}@Bean

public JmsTemplate jmsTemplate() {

 JmsTemplate template = new JmsTemplate();

 template.setConnectionFactory(getActiveMQConnectionFactory());

 return template;

}  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");

  Connection connection = connectionFactory.createConnection("admin","admin");

  connection.start();

  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

  Destination destination = session.createQueue("listenerQueue");

  MessageConsumer consumer = session.createConsumer(destination);



  QueueBrowser browser = session.createBrowser((Queue) destination);

  Enumeration elems = browser.getEnumeration();

  while (elems.hasMoreElements()) {

    Message message = (Message) consumer.receive();



    if (message instanceof TextMessage) {

      TextMessage textMessage = (TextMessage) message;

      System.out.println("Incoming Message: '" + textMessage.getText() +"'");

      message.acknowledge();

    }

  }

  connection.close();<!-- Dependencies to setup JMS and active mq environment -->

    <dependency>

      <groupId>org.springframework.boot</groupId>

      spring-boot-starter-activemq</artifactId>

    </dependency>

    <dependency>

      <groupId>org.apache.activemq</groupId>

      activemq-broker</artifactId>

    </dependency>@Bean

  public JmsListenerContainerFactory< ? > myFactory(

    ConnectionFactory connectionFactory,

    DefaultJmsListenerContainerFactoryConfigurer configurer) {

   DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

   logger.info("configuring jms connection factory....");

   // anonymous class

   factory.setErrorHandler(

       new ErrorHandler() {

         @Override

         public void handleError(Throwable t) {

           logger.error("An error has occurred in the transaction", t);

         }

       });

   // lambda function

   factory.setErrorHandler(t -> logger.info("An error has occurred in the transaction"));

   configurer.configure(factory, connectionFactory);



   return factory;

  }



  // Serialize message content to json using TextMessage

  @Bean

  public MessageConverter jacksonJmsMessageConverter() {

   MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();

   converter.setTargetType(MessageType.TEXT);

   converter.setTypeIdPropertyName("_type");

   return converter;

  }spring.activemq.user=admin

spring.activemq.password=admin

spring.activemq.broker-url=tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1@Autowired

private JmsTemplate jmsTemplate;jmsTemplate.convertAndSend("anyQueueName","value1");

jmsTemplate.convertAndSend("anyQueueName","value2");

...@JmsListener(destination ="anyQueueName", containerFactory ="myFactory")

  public void receiveMessage(String user) {

    System.out.println("Received <" + user +">");

  }import javax.jms.TextMessage;

import javax.jms.QueueBrowser;

import javax.jms.Session;

import javax.jms.TextMessage;



public void readMessageFromQueue(){

jmsTemplate.browse("anyQueueName", new BrowserCallback<TextMessage>() {

      @Override

      public TextMessage doInJms(Session session, QueueBrowser browser) throws JMSException {

        Enumeration<TextMessage> messages = browser.getEnumeration();

        while (messages.hasMoreElements()) {

          System.out.println("message found : -"+ messages.nextElement().getText());

        }

      }

    });

}

但是,如果我使用

手动发送一条消息,这只会记录一条消息

@JmsListener(destination ="LOCAL.TEST", 

 containerFactory ="myJmsListenerContainerFactory")

public void receiveMessage(final Message jsonMessage) throws JMSException {

  String messageData = null;

  // jsonMessage.acknowledge(); // dont consume message (for testing)

  LOGGER.info("=== Received message {}", jsonMessage);

}@Bean

public ActiveMQConnectionFactory getActiveMQConnectionFactory() {

  ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();

  activeMQConnectionFactory.setBrokerURL(BROKER_URL +":" + BROKER_PORT);

  return activeMQConnectionFactory;

}@Bean

public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {

 DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

 factory.setConnectionFactory(getActiveMQConnectionFactory());

 factory.setConcurrency("1-1");

 return factory;

}@Autowired

private JmsTemplate jmsTemplate;



public void send(String destination, String message) {

  LOGGER.info("sending message='{}' to destination='{}'", message, destination);

  jmsTemplate.convertAndSend(destination, message);

}@Bean

public JmsTemplate jmsTemplate() {

 JmsTemplate template = new JmsTemplate();

 template.setConnectionFactory(getActiveMQConnectionFactory());

 return template;

}  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");

  Connection connection = connectionFactory.createConnection("admin","admin");

  connection.start();

  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

  Destination destination = session.createQueue("listenerQueue");

  MessageConsumer consumer = session.createConsumer(destination);



  QueueBrowser browser = session.createBrowser((Queue) destination);

  Enumeration elems = browser.getEnumeration();

  while (elems.hasMoreElements()) {

    Message message = (Message) consumer.receive();



    if (message instanceof TextMessage) {

      TextMessage textMessage = (TextMessage) message;

      System.out.println("Incoming Message: '" + textMessage.getText() +"'");

      message.acknowledge();

    }

  }

  connection.close();<!-- Dependencies to setup JMS and active mq environment -->

    <dependency>

      <groupId>org.springframework.boot</groupId>

      spring-boot-starter-activemq</artifactId>

    </dependency>

    <dependency>

      <groupId>org.apache.activemq</groupId>

      activemq-broker</artifactId>

    </dependency>@Bean

  public JmsListenerContainerFactory< ? > myFactory(

    ConnectionFactory connectionFactory,

    DefaultJmsListenerContainerFactoryConfigurer configurer) {

   DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

   logger.info("configuring jms connection factory....");

   // anonymous class

   factory.setErrorHandler(

       new ErrorHandler() {

         @Override

         public void handleError(Throwable t) {

           logger.error("An error has occurred in the transaction", t);

         }

       });

   // lambda function

   factory.setErrorHandler(t -> logger.info("An error has occurred in the transaction"));

   configurer.configure(factory, connectionFactory);



   return factory;

  }



  // Serialize message content to json using TextMessage

  @Bean

  public MessageConverter jacksonJmsMessageConverter() {

   MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();

   converter.setTargetType(MessageType.TEXT);

   converter.setTypeIdPropertyName("_type");

   return converter;

  }spring.activemq.user=admin

spring.activemq.password=admin

spring.activemq.broker-url=tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1@Autowired

private JmsTemplate jmsTemplate;jmsTemplate.convertAndSend("anyQueueName","value1");

jmsTemplate.convertAndSend("anyQueueName","value2");

...@JmsListener(destination ="anyQueueName", containerFactory ="myFactory")

  public void receiveMessage(String user) {

    System.out.println("Received <" + user +">");

  }import javax.jms.TextMessage;

import javax.jms.QueueBrowser;

import javax.jms.Session;

import javax.jms.TextMessage;



public void readMessageFromQueue(){

jmsTemplate.browse("anyQueueName", new BrowserCallback<TextMessage>() {

      @Override

      public TextMessage doInJms(Session session, QueueBrowser browser) throws JMSException {

        Enumeration<TextMessage> messages = browser.getEnumeration();

        while (messages.hasMoreElements()) {

          System.out.println("message found : -"+ messages.nextElement().getText());

        }

      }

    });

}

使用标准模板

@JmsListener(destination ="LOCAL.TEST", 

 containerFactory ="myJmsListenerContainerFactory")

public void receiveMessage(final Message jsonMessage) throws JMSException {

  String messageData = null;

  // jsonMessage.acknowledge(); // dont consume message (for testing)

  LOGGER.info("=== Received message {}", jsonMessage);

}@Bean

public ActiveMQConnectionFactory getActiveMQConnectionFactory() {

  ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();

  activeMQConnectionFactory.setBrokerURL(BROKER_URL +":" + BROKER_PORT);

  return activeMQConnectionFactory;

}@Bean

public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {

 DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

 factory.setConnectionFactory(getActiveMQConnectionFactory());

 factory.setConcurrency("1-1");

 return factory;

}@Autowired

private JmsTemplate jmsTemplate;



public void send(String destination, String message) {

  LOGGER.info("sending message='{}' to destination='{}'", message, destination);

  jmsTemplate.convertAndSend(destination, message);

}@Bean

public JmsTemplate jmsTemplate() {

 JmsTemplate template = new JmsTemplate();

 template.setConnectionFactory(getActiveMQConnectionFactory());

 return template;

}  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");

  Connection connection = connectionFactory.createConnection("admin","admin");

  connection.start();

  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

  Destination destination = session.createQueue("listenerQueue");

  MessageConsumer consumer = session.createConsumer(destination);



  QueueBrowser browser = session.createBrowser((Queue) destination);

  Enumeration elems = browser.getEnumeration();

  while (elems.hasMoreElements()) {

    Message message = (Message) consumer.receive();



    if (message instanceof TextMessage) {

      TextMessage textMessage = (TextMessage) message;

      System.out.println("Incoming Message: '" + textMessage.getText() +"'");

      message.acknowledge();

    }

  }

  connection.close();<!-- Dependencies to setup JMS and active mq environment -->

    <dependency>

      <groupId>org.springframework.boot</groupId>

      spring-boot-starter-activemq</artifactId>

    </dependency>

    <dependency>

      <groupId>org.apache.activemq</groupId>

      activemq-broker</artifactId>

    </dependency>@Bean

  public JmsListenerContainerFactory< ? > myFactory(

    ConnectionFactory connectionFactory,

    DefaultJmsListenerContainerFactoryConfigurer configurer) {

   DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

   logger.info("configuring jms connection factory....");

   // anonymous class

   factory.setErrorHandler(

       new ErrorHandler() {

         @Override

         public void handleError(Throwable t) {

           logger.error("An error has occurred in the transaction", t);

         }

       });

   // lambda function

   factory.setErrorHandler(t -> logger.info("An error has occurred in the transaction"));

   configurer.configure(factory, connectionFactory);



   return factory;

  }



  // Serialize message content to json using TextMessage

  @Bean

  public MessageConverter jacksonJmsMessageConverter() {

   MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();

   converter.setTargetType(MessageType.TEXT);

   converter.setTypeIdPropertyName("_type");

   return converter;

  }spring.activemq.user=admin

spring.activemq.password=admin

spring.activemq.broker-url=tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1@Autowired

private JmsTemplate jmsTemplate;jmsTemplate.convertAndSend("anyQueueName","value1");

jmsTemplate.convertAndSend("anyQueueName","value2");

...@JmsListener(destination ="anyQueueName", containerFactory ="myFactory")

  public void receiveMessage(String user) {

    System.out.println("Received <" + user +">");

  }import javax.jms.TextMessage;

import javax.jms.QueueBrowser;

import javax.jms.Session;

import javax.jms.TextMessage;



public void readMessageFromQueue(){

jmsTemplate.browse("anyQueueName", new BrowserCallback<TextMessage>() {

      @Override

      public TextMessage doInJms(Session session, QueueBrowser browser) throws JMSException {

        Enumeration<TextMessage> messages = browser.getEnumeration();

        while (messages.hasMoreElements()) {

          System.out.println("message found : -"+ messages.nextElement().getText());

        }

      }

    });

}

我无法读取之前发送的仍在队列中的消息(因为我没有.acknowledge()它??们)...


JMS 支持"浏览"消息,这似乎是您想要的功能。因此,您应该更改 Spring 应用程序以使用 QueueBrowser 而不是实际使用消息。


要阅读所有待处理的消息,你可以这样做

@JmsListener(destination ="LOCAL.TEST", 

 containerFactory ="myJmsListenerContainerFactory")

public void receiveMessage(final Message jsonMessage) throws JMSException {

  String messageData = null;

  // jsonMessage.acknowledge(); // dont consume message (for testing)

  LOGGER.info("=== Received message {}", jsonMessage);

}@Bean

public ActiveMQConnectionFactory getActiveMQConnectionFactory() {

  ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();

  activeMQConnectionFactory.setBrokerURL(BROKER_URL +":" + BROKER_PORT);

  return activeMQConnectionFactory;

}@Bean

public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {

 DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

 factory.setConnectionFactory(getActiveMQConnectionFactory());

 factory.setConcurrency("1-1");

 return factory;

}@Autowired

private JmsTemplate jmsTemplate;



public void send(String destination, String message) {

  LOGGER.info("sending message='{}' to destination='{}'", message, destination);

  jmsTemplate.convertAndSend(destination, message);

}@Bean

public JmsTemplate jmsTemplate() {

 JmsTemplate template = new JmsTemplate();

 template.setConnectionFactory(getActiveMQConnectionFactory());

 return template;

}  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");

  Connection connection = connectionFactory.createConnection("admin","admin");

  connection.start();

  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

  Destination destination = session.createQueue("listenerQueue");

  MessageConsumer consumer = session.createConsumer(destination);



  QueueBrowser browser = session.createBrowser((Queue) destination);

  Enumeration elems = browser.getEnumeration();

  while (elems.hasMoreElements()) {

    Message message = (Message) consumer.receive();



    if (message instanceof TextMessage) {

      TextMessage textMessage = (TextMessage) message;

      System.out.println("Incoming Message: '" + textMessage.getText() +"'");

      message.acknowledge();

    }

  }

  connection.close();<!-- Dependencies to setup JMS and active mq environment -->

    <dependency>

      <groupId>org.springframework.boot</groupId>

      spring-boot-starter-activemq</artifactId>

    </dependency>

    <dependency>

      <groupId>org.apache.activemq</groupId>

      activemq-broker</artifactId>

    </dependency>@Bean

  public JmsListenerContainerFactory< ? > myFactory(

    ConnectionFactory connectionFactory,

    DefaultJmsListenerContainerFactoryConfigurer configurer) {

   DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

   logger.info("configuring jms connection factory....");

   // anonymous class

   factory.setErrorHandler(

       new ErrorHandler() {

         @Override

         public void handleError(Throwable t) {

           logger.error("An error has occurred in the transaction", t);

         }

       });

   // lambda function

   factory.setErrorHandler(t -> logger.info("An error has occurred in the transaction"));

   configurer.configure(factory, connectionFactory);



   return factory;

  }



  // Serialize message content to json using TextMessage

  @Bean

  public MessageConverter jacksonJmsMessageConverter() {

   MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();

   converter.setTargetType(MessageType.TEXT);

   converter.setTypeIdPropertyName("_type");

   return converter;

  }spring.activemq.user=admin

spring.activemq.password=admin

spring.activemq.broker-url=tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1@Autowired

private JmsTemplate jmsTemplate;jmsTemplate.convertAndSend("anyQueueName","value1");

jmsTemplate.convertAndSend("anyQueueName","value2");

...@JmsListener(destination ="anyQueueName", containerFactory ="myFactory")

  public void receiveMessage(String user) {

    System.out.println("Received <" + user +">");

  }import javax.jms.TextMessage;

import javax.jms.QueueBrowser;

import javax.jms.Session;

import javax.jms.TextMessage;



public void readMessageFromQueue(){

jmsTemplate.browse("anyQueueName", new BrowserCallback<TextMessage>() {

      @Override

      public TextMessage doInJms(Session session, QueueBrowser browser) throws JMSException {

        Enumeration<TextMessage> messages = browser.getEnumeration();

        while (messages.hasMoreElements()) {

          System.out.println("message found : -"+ messages.nextElement().getText());

        }

      }

    });

}

一步一步实现Spring boot ActiveMQ。让我们编写一些代码使其更清晰。这将有助于仅读取当前会话中的所有待处理消息。

  • 在 pom.xml 文件中添加这些依赖项。
  • @JmsListener(destination ="LOCAL.TEST", 
    
     containerFactory ="myJmsListenerContainerFactory")
    
    public void receiveMessage(final Message jsonMessage) throws JMSException {
    
      String messageData = null;
    
      // jsonMessage.acknowledge(); // dont consume message (for testing)
    
      LOGGER.info("=== Received message {}", jsonMessage);
    
    }@Bean
    
    public ActiveMQConnectionFactory getActiveMQConnectionFactory() {
    
      ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    
      activeMQConnectionFactory.setBrokerURL(BROKER_URL +":" + BROKER_PORT);
    
      return activeMQConnectionFactory;
    
    }@Bean
    
    public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {
    
     DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    
     factory.setConnectionFactory(getActiveMQConnectionFactory());
    
     factory.setConcurrency("1-1");
    
     return factory;
    
    }@Autowired
    
    private JmsTemplate jmsTemplate;
    
    
    
    public void send(String destination, String message) {
    
      LOGGER.info("sending message='{}' to destination='{}'", message, destination);
    
      jmsTemplate.convertAndSend(destination, message);
    
    }@Bean
    
    public JmsTemplate jmsTemplate() {
    
     JmsTemplate template = new JmsTemplate();
    
     template.setConnectionFactory(getActiveMQConnectionFactory());
    
     return template;
    
    }  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");
    
      Connection connection = connectionFactory.createConnection("admin","admin");
    
      connection.start();
    
      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
      Destination destination = session.createQueue("listenerQueue");
    
      MessageConsumer consumer = session.createConsumer(destination);
    
    
    
      QueueBrowser browser = session.createBrowser((Queue) destination);
    
      Enumeration elems = browser.getEnumeration();
    
      while (elems.hasMoreElements()) {
    
        Message message = (Message) consumer.receive();
    
    
    
        if (message instanceof TextMessage) {
    
          TextMessage textMessage = (TextMessage) message;
    
          System.out.println("Incoming Message: '" + textMessage.getText() +"'");
    
          message.acknowledge();
    
        }
    
      }
    
      connection.close();<!-- Dependencies to setup JMS and active mq environment -->
    
        <dependency>
    
          <groupId>org.springframework.boot</groupId>
    
          spring-boot-starter-activemq</artifactId>
    
        </dependency>
    
        <dependency>
    
          <groupId>org.apache.activemq</groupId>
    
          activemq-broker</artifactId>
    
        </dependency>@Bean
    
      public JmsListenerContainerFactory< ? > myFactory(
    
        ConnectionFactory connectionFactory,
    
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
    
       DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    
       logger.info("configuring jms connection factory....");
    
       // anonymous class
    
       factory.setErrorHandler(
    
           new ErrorHandler() {
    
             @Override
    
             public void handleError(Throwable t) {
    
               logger.error("An error has occurred in the transaction", t);
    
             }
    
           });
    
       // lambda function
    
       factory.setErrorHandler(t -> logger.info("An error has occurred in the transaction"));
    
       configurer.configure(factory, connectionFactory);
    
    
    
       return factory;
    
      }
    
    
    
      // Serialize message content to json using TextMessage
    
      @Bean
    
      public MessageConverter jacksonJmsMessageConverter() {
    
       MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
    
       converter.setTargetType(MessageType.TEXT);
    
       converter.setTypeIdPropertyName("_type");
    
       return converter;
    
      }spring.activemq.user=admin
    
    spring.activemq.password=admin
    
    spring.activemq.broker-url=tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1@Autowired
    
    private JmsTemplate jmsTemplate;jmsTemplate.convertAndSend("anyQueueName","value1");
    
    jmsTemplate.convertAndSend("anyQueueName","value2");
    
    ...@JmsListener(destination ="anyQueueName", containerFactory ="myFactory")
    
      public void receiveMessage(String user) {
    
        System.out.println("Received <" + user +">");
    
      }import javax.jms.TextMessage;
    
    import javax.jms.QueueBrowser;
    
    import javax.jms.Session;
    
    import javax.jms.TextMessage;
    
    
    
    public void readMessageFromQueue(){
    
    jmsTemplate.browse("anyQueueName", new BrowserCallback<TextMessage>() {
    
          @Override
    
          public TextMessage doInJms(Session session, QueueBrowser browser) throws JMSException {
    
            Enumeration<TextMessage> messages = browser.getEnumeration();
    
            while (messages.hasMoreElements()) {
    
              System.out.println("message found : -"+ messages.nextElement().getText());
    
            }
    
          }
    
        });
    
    }
  • 将 @EnableJms 添加到 main() 方法所在的主控制器中。
  • 仅通过在应用程序控制器中添加这两种方法来创建连接工厂。
  • @JmsListener(destination ="LOCAL.TEST", 
    
     containerFactory ="myJmsListenerContainerFactory")
    
    public void receiveMessage(final Message jsonMessage) throws JMSException {
    
      String messageData = null;
    
      // jsonMessage.acknowledge(); // dont consume message (for testing)
    
      LOGGER.info("=== Received message {}", jsonMessage);
    
    }@Bean
    
    public ActiveMQConnectionFactory getActiveMQConnectionFactory() {
    
      ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    
      activeMQConnectionFactory.setBrokerURL(BROKER_URL +":" + BROKER_PORT);
    
      return activeMQConnectionFactory;
    
    }@Bean
    
    public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {
    
     DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    
     factory.setConnectionFactory(getActiveMQConnectionFactory());
    
     factory.setConcurrency("1-1");
    
     return factory;
    
    }@Autowired
    
    private JmsTemplate jmsTemplate;
    
    
    
    public void send(String destination, String message) {
    
      LOGGER.info("sending message='{}' to destination='{}'", message, destination);
    
      jmsTemplate.convertAndSend(destination, message);
    
    }@Bean
    
    public JmsTemplate jmsTemplate() {
    
     JmsTemplate template = new JmsTemplate();
    
     template.setConnectionFactory(getActiveMQConnectionFactory());
    
     return template;
    
    }  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");
    
      Connection connection = connectionFactory.createConnection("admin","admin");
    
      connection.start();
    
      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
      Destination destination = session.createQueue("listenerQueue");
    
      MessageConsumer consumer = session.createConsumer(destination);
    
    
    
      QueueBrowser browser = session.createBrowser((Queue) destination);
    
      Enumeration elems = browser.getEnumeration();
    
      while (elems.hasMoreElements()) {
    
        Message message = (Message) consumer.receive();
    
    
    
        if (message instanceof TextMessage) {
    
          TextMessage textMessage = (TextMessage) message;
    
          System.out.println("Incoming Message: '" + textMessage.getText() +"'");
    
          message.acknowledge();
    
        }
    
      }
    
      connection.close();<!-- Dependencies to setup JMS and active mq environment -->
    
        <dependency>
    
          <groupId>org.springframework.boot</groupId>
    
          spring-boot-starter-activemq</artifactId>
    
        </dependency>
    
        <dependency>
    
          <groupId>org.apache.activemq</groupId>
    
          activemq-broker</artifactId>
    
        </dependency>@Bean
    
      public JmsListenerContainerFactory< ? > myFactory(
    
        ConnectionFactory connectionFactory,
    
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
    
       DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    
       logger.info("configuring jms connection factory....");
    
       // anonymous class
    
       factory.setErrorHandler(
    
           new ErrorHandler() {
    
             @Override
    
             public void handleError(Throwable t) {
    
               logger.error("An error has occurred in the transaction", t);
    
             }
    
           });
    
       // lambda function
    
       factory.setErrorHandler(t -> logger.info("An error has occurred in the transaction"));
    
       configurer.configure(factory, connectionFactory);
    
    
    
       return factory;
    
      }
    
    
    
      // Serialize message content to json using TextMessage
    
      @Bean
    
      public MessageConverter jacksonJmsMessageConverter() {
    
       MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
    
       converter.setTargetType(MessageType.TEXT);
    
       converter.setTypeIdPropertyName("_type");
    
       return converter;
    
      }spring.activemq.user=admin
    
    spring.activemq.password=admin
    
    spring.activemq.broker-url=tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1@Autowired
    
    private JmsTemplate jmsTemplate;jmsTemplate.convertAndSend("anyQueueName","value1");
    
    jmsTemplate.convertAndSend("anyQueueName","value2");
    
    ...@JmsListener(destination ="anyQueueName", containerFactory ="myFactory")
    
      public void receiveMessage(String user) {
    
        System.out.println("Received <" + user +">");
    
      }import javax.jms.TextMessage;
    
    import javax.jms.QueueBrowser;
    
    import javax.jms.Session;
    
    import javax.jms.TextMessage;
    
    
    
    public void readMessageFromQueue(){
    
    jmsTemplate.browse("anyQueueName", new BrowserCallback<TextMessage>() {
    
          @Override
    
          public TextMessage doInJms(Session session, QueueBrowser browser) throws JMSException {
    
            Enumeration<TextMessage> messages = browser.getEnumeration();
    
            while (messages.hasMoreElements()) {
    
              System.out.println("message found : -"+ messages.nextElement().getText());
    
            }
    
          }
    
        });
    
    }
  • 在 application.yml 文件中提及凭据为
  • @JmsListener(destination ="LOCAL.TEST", 
    
     containerFactory ="myJmsListenerContainerFactory")
    
    public void receiveMessage(final Message jsonMessage) throws JMSException {
    
      String messageData = null;
    
      // jsonMessage.acknowledge(); // dont consume message (for testing)
    
      LOGGER.info("=== Received message {}", jsonMessage);
    
    }@Bean
    
    public ActiveMQConnectionFactory getActiveMQConnectionFactory() {
    
      ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    
      activeMQConnectionFactory.setBrokerURL(BROKER_URL +":" + BROKER_PORT);
    
      return activeMQConnectionFactory;
    
    }@Bean
    
    public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {
    
     DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    
     factory.setConnectionFactory(getActiveMQConnectionFactory());
    
     factory.setConcurrency("1-1");
    
     return factory;
    
    }@Autowired
    
    private JmsTemplate jmsTemplate;
    
    
    
    public void send(String destination, String message) {
    
      LOGGER.info("sending message='{}' to destination='{}'", message, destination);
    
      jmsTemplate.convertAndSend(destination, message);
    
    }@Bean
    
    public JmsTemplate jmsTemplate() {
    
     JmsTemplate template = new JmsTemplate();
    
     template.setConnectionFactory(getActiveMQConnectionFactory());
    
     return template;
    
    }  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");
    
      Connection connection = connectionFactory.createConnection("admin","admin");
    
      connection.start();
    
      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
      Destination destination = session.createQueue("listenerQueue");
    
      MessageConsumer consumer = session.createConsumer(destination);
    
    
    
      QueueBrowser browser = session.createBrowser((Queue) destination);
    
      Enumeration elems = browser.getEnumeration();
    
      while (elems.hasMoreElements()) {
    
        Message message = (Message) consumer.receive();
    
    
    
        if (message instanceof TextMessage) {
    
          TextMessage textMessage = (TextMessage) message;
    
          System.out.println("Incoming Message: '" + textMessage.getText() +"'");
    
          message.acknowledge();
    
        }
    
      }
    
      connection.close();<!-- Dependencies to setup JMS and active mq environment -->
    
        <dependency>
    
          <groupId>org.springframework.boot</groupId>
    
          spring-boot-starter-activemq</artifactId>
    
        </dependency>
    
        <dependency>
    
          <groupId>org.apache.activemq</groupId>
    
          activemq-broker</artifactId>
    
        </dependency>@Bean
    
      public JmsListenerContainerFactory< ? > myFactory(
    
        ConnectionFactory connectionFactory,
    
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
    
       DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    
       logger.info("configuring jms connection factory....");
    
       // anonymous class
    
       factory.setErrorHandler(
    
           new ErrorHandler() {
    
             @Override
    
             public void handleError(Throwable t) {
    
               logger.error("An error has occurred in the transaction", t);
    
             }
    
           });
    
       // lambda function
    
       factory.setErrorHandler(t -> logger.info("An error has occurred in the transaction"));
    
       configurer.configure(factory, connectionFactory);
    
    
    
       return factory;
    
      }
    
    
    
      // Serialize message content to json using TextMessage
    
      @Bean
    
      public MessageConverter jacksonJmsMessageConverter() {
    
       MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
    
       converter.setTargetType(MessageType.TEXT);
    
       converter.setTypeIdPropertyName("_type");
    
       return converter;
    
      }spring.activemq.user=admin
    
    spring.activemq.password=admin
    
    spring.activemq.broker-url=tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1@Autowired
    
    private JmsTemplate jmsTemplate;jmsTemplate.convertAndSend("anyQueueName","value1");
    
    jmsTemplate.convertAndSend("anyQueueName","value2");
    
    ...@JmsListener(destination ="anyQueueName", containerFactory ="myFactory")
    
      public void receiveMessage(String user) {
    
        System.out.println("Received <" + user +">");
    
      }import javax.jms.TextMessage;
    
    import javax.jms.QueueBrowser;
    
    import javax.jms.Session;
    
    import javax.jms.TextMessage;
    
    
    
    public void readMessageFromQueue(){
    
    jmsTemplate.browse("anyQueueName", new BrowserCallback<TextMessage>() {
    
          @Override
    
          public TextMessage doInJms(Session session, QueueBrowser browser) throws JMSException {
    
            Enumeration<TextMessage> messages = browser.getEnumeration();
    
            while (messages.hasMoreElements()) {
    
              System.out.println("message found : -"+ messages.nextElement().getText());
    
            }
    
          }
    
        });
    
    }
  • 在任何 spring bean 类中自动装配 jmsTemplate。
  • @JmsListener(destination ="LOCAL.TEST", 
    
     containerFactory ="myJmsListenerContainerFactory")
    
    public void receiveMessage(final Message jsonMessage) throws JMSException {
    
      String messageData = null;
    
      // jsonMessage.acknowledge(); // dont consume message (for testing)
    
      LOGGER.info("=== Received message {}", jsonMessage);
    
    }@Bean
    
    public ActiveMQConnectionFactory getActiveMQConnectionFactory() {
    
      ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    
      activeMQConnectionFactory.setBrokerURL(BROKER_URL +":" + BROKER_PORT);
    
      return activeMQConnectionFactory;
    
    }@Bean
    
    public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {
    
     DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    
     factory.setConnectionFactory(getActiveMQConnectionFactory());
    
     factory.setConcurrency("1-1");
    
     return factory;
    
    }@Autowired
    
    private JmsTemplate jmsTemplate;
    
    
    
    public void send(String destination, String message) {
    
      LOGGER.info("sending message='{}' to destination='{}'", message, destination);
    
      jmsTemplate.convertAndSend(destination, message);
    
    }@Bean
    
    public JmsTemplate jmsTemplate() {
    
     JmsTemplate template = new JmsTemplate();
    
     template.setConnectionFactory(getActiveMQConnectionFactory());
    
     return template;
    
    }  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");
    
      Connection connection = connectionFactory.createConnection("admin","admin");
    
      connection.start();
    
      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
      Destination destination = session.createQueue("listenerQueue");
    
      MessageConsumer consumer = session.createConsumer(destination);
    
    
    
      QueueBrowser browser = session.createBrowser((Queue) destination);
    
      Enumeration elems = browser.getEnumeration();
    
      while (elems.hasMoreElements()) {
    
        Message message = (Message) consumer.receive();
    
    
    
        if (message instanceof TextMessage) {
    
          TextMessage textMessage = (TextMessage) message;
    
          System.out.println("Incoming Message: '" + textMessage.getText() +"'");
    
          message.acknowledge();
    
        }
    
      }
    
      connection.close();<!-- Dependencies to setup JMS and active mq environment -->
    
        <dependency>
    
          <groupId>org.springframework.boot</groupId>
    
          spring-boot-starter-activemq</artifactId>
    
        </dependency>
    
        <dependency>
    
          <groupId>org.apache.activemq</groupId>
    
          activemq-broker</artifactId>
    
        </dependency>@Bean
    
      public JmsListenerContainerFactory< ? > myFactory(
    
        ConnectionFactory connectionFactory,
    
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
    
       DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    
       logger.info("configuring jms connection factory....");
    
       // anonymous class
    
       factory.setErrorHandler(
    
           new ErrorHandler() {
    
             @Override
    
             public void handleError(Throwable t) {
    
               logger.error("An error has occurred in the transaction", t);
    
             }
    
           });
    
       // lambda function
    
       factory.setErrorHandler(t -> logger.info("An error has occurred in the transaction"));
    
       configurer.configure(factory, connectionFactory);
    
    
    
       return factory;
    
      }
    
    
    
      // Serialize message content to json using TextMessage
    
      @Bean
    
      public MessageConverter jacksonJmsMessageConverter() {
    
       MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
    
       converter.setTargetType(MessageType.TEXT);
    
       converter.setTypeIdPropertyName("_type");
    
       return converter;
    
      }spring.activemq.user=admin
    
    spring.activemq.password=admin
    
    spring.activemq.broker-url=tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1@Autowired
    
    private JmsTemplate jmsTemplate;jmsTemplate.convertAndSend("anyQueueName","value1");
    
    jmsTemplate.convertAndSend("anyQueueName","value2");
    
    ...@JmsListener(destination ="anyQueueName", containerFactory ="myFactory")
    
      public void receiveMessage(String user) {
    
        System.out.println("Received <" + user +">");
    
      }import javax.jms.TextMessage;
    
    import javax.jms.QueueBrowser;
    
    import javax.jms.Session;
    
    import javax.jms.TextMessage;
    
    
    
    public void readMessageFromQueue(){
    
    jmsTemplate.browse("anyQueueName", new BrowserCallback<TextMessage>() {
    
          @Override
    
          public TextMessage doInJms(Session session, QueueBrowser browser) throws JMSException {
    
            Enumeration<TextMessage> messages = browser.getEnumeration();
    
            while (messages.hasMoreElements()) {
    
              System.out.println("message found : -"+ messages.nextElement().getText());
    
            }
    
          }
    
        });
    
    }
  • 现在是时候将消息发送到队列了。
  • @JmsListener(destination ="LOCAL.TEST", 
    
     containerFactory ="myJmsListenerContainerFactory")
    
    public void receiveMessage(final Message jsonMessage) throws JMSException {
    
      String messageData = null;
    
      // jsonMessage.acknowledge(); // dont consume message (for testing)
    
      LOGGER.info("=== Received message {}", jsonMessage);
    
    }@Bean
    
    public ActiveMQConnectionFactory getActiveMQConnectionFactory() {
    
      ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    
      activeMQConnectionFactory.setBrokerURL(BROKER_URL +":" + BROKER_PORT);
    
      return activeMQConnectionFactory;
    
    }@Bean
    
    public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {
    
     DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    
     factory.setConnectionFactory(getActiveMQConnectionFactory());
    
     factory.setConcurrency("1-1");
    
     return factory;
    
    }@Autowired
    
    private JmsTemplate jmsTemplate;
    
    
    
    public void send(String destination, String message) {
    
      LOGGER.info("sending message='{}' to destination='{}'", message, destination);
    
      jmsTemplate.convertAndSend(destination, message);
    
    }@Bean
    
    public JmsTemplate jmsTemplate() {
    
     JmsTemplate template = new JmsTemplate();
    
     template.setConnectionFactory(getActiveMQConnectionFactory());
    
     return template;
    
    }  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");
    
      Connection connection = connectionFactory.createConnection("admin","admin");
    
      connection.start();
    
      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
      Destination destination = session.createQueue("listenerQueue");
    
      MessageConsumer consumer = session.createConsumer(destination);
    
    
    
      QueueBrowser browser = session.createBrowser((Queue) destination);
    
      Enumeration elems = browser.getEnumeration();
    
      while (elems.hasMoreElements()) {
    
        Message message = (Message) consumer.receive();
    
    
    
        if (message instanceof TextMessage) {
    
          TextMessage textMessage = (TextMessage) message;
    
          System.out.println("Incoming Message: '" + textMessage.getText() +"'");
    
          message.acknowledge();
    
        }
    
      }
    
      connection.close();<!-- Dependencies to setup JMS and active mq environment -->
    
        <dependency>
    
          <groupId>org.springframework.boot</groupId>
    
          spring-boot-starter-activemq</artifactId>
    
        </dependency>
    
        <dependency>
    
          <groupId>org.apache.activemq</groupId>
    
          activemq-broker</artifactId>
    
        </dependency>@Bean
    
      public JmsListenerContainerFactory< ? > myFactory(
    
        ConnectionFactory connectionFactory,
    
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
    
       DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    
       logger.info("configuring jms connection factory....");
    
       // anonymous class
    
       factory.setErrorHandler(
    
           new ErrorHandler() {
    
             @Override
    
             public void handleError(Throwable t) {
    
               logger.error("An error has occurred in the transaction", t);
    
             }
    
           });
    
       // lambda function
    
       factory.setErrorHandler(t -> logger.info("An error has occurred in the transaction"));
    
       configurer.configure(factory, connectionFactory);
    
    
    
       return factory;
    
      }
    
    
    
      // Serialize message content to json using TextMessage
    
      @Bean
    
      public MessageConverter jacksonJmsMessageConverter() {
    
       MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
    
       converter.setTargetType(MessageType.TEXT);
    
       converter.setTypeIdPropertyName("_type");
    
       return converter;
    
      }spring.activemq.user=admin
    
    spring.activemq.password=admin
    
    spring.activemq.broker-url=tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1@Autowired
    
    private JmsTemplate jmsTemplate;jmsTemplate.convertAndSend("anyQueueName","value1");
    
    jmsTemplate.convertAndSend("anyQueueName","value2");
    
    ...@JmsListener(destination ="anyQueueName", containerFactory ="myFactory")
    
      public void receiveMessage(String user) {
    
        System.out.println("Received <" + user +">");
    
      }import javax.jms.TextMessage;
    
    import javax.jms.QueueBrowser;
    
    import javax.jms.Session;
    
    import javax.jms.TextMessage;
    
    
    
    public void readMessageFromQueue(){
    
    jmsTemplate.browse("anyQueueName", new BrowserCallback<TextMessage>() {
    
          @Override
    
          public TextMessage doInJms(Session session, QueueBrowser browser) throws JMSException {
    
            Enumeration<TextMessage> messages = browser.getEnumeration();
    
            while (messages.hasMoreElements()) {
    
              System.out.println("message found : -"+ messages.nextElement().getText());
    
            }
    
          }
    
        });
    
    }
  • 添加一个 jmslistener。当任何消息被推送到队列时,JMS 将自动调用此方法。
  • @JmsListener(destination ="LOCAL.TEST", 
    
     containerFactory ="myJmsListenerContainerFactory")
    
    public void receiveMessage(final Message jsonMessage) throws JMSException {
    
      String messageData = null;
    
      // jsonMessage.acknowledge(); // dont consume message (for testing)
    
      LOGGER.info("=== Received message {}", jsonMessage);
    
    }@Bean
    
    public ActiveMQConnectionFactory getActiveMQConnectionFactory() {
    
      ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    
      activeMQConnectionFactory.setBrokerURL(BROKER_URL +":" + BROKER_PORT);
    
      return activeMQConnectionFactory;
    
    }@Bean
    
    public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {
    
     DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    
     factory.setConnectionFactory(getActiveMQConnectionFactory());
    
     factory.setConcurrency("1-1");
    
     return factory;
    
    }@Autowired
    
    private JmsTemplate jmsTemplate;
    
    
    
    public void send(String destination, String message) {
    
      LOGGER.info("sending message='{}' to destination='{}'", message, destination);
    
      jmsTemplate.convertAndSend(destination, message);
    
    }@Bean
    
    public JmsTemplate jmsTemplate() {
    
     JmsTemplate template = new JmsTemplate();
    
     template.setConnectionFactory(getActiveMQConnectionFactory());
    
     return template;
    
    }  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");
    
      Connection connection = connectionFactory.createConnection("admin","admin");
    
      connection.start();
    
      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
      Destination destination = session.createQueue("listenerQueue");
    
      MessageConsumer consumer = session.createConsumer(destination);
    
    
    
      QueueBrowser browser = session.createBrowser((Queue) destination);
    
      Enumeration elems = browser.getEnumeration();
    
      while (elems.hasMoreElements()) {
    
        Message message = (Message) consumer.receive();
    
    
    
        if (message instanceof TextMessage) {
    
          TextMessage textMessage = (TextMessage) message;
    
          System.out.println("Incoming Message: '" + textMessage.getText() +"'");
    
          message.acknowledge();
    
        }
    
      }
    
      connection.close();<!-- Dependencies to setup JMS and active mq environment -->
    
        <dependency>
    
          <groupId>org.springframework.boot</groupId>
    
          spring-boot-starter-activemq</artifactId>
    
        </dependency>
    
        <dependency>
    
          <groupId>org.apache.activemq</groupId>
    
          activemq-broker</artifactId>
    
        </dependency>@Bean
    
      public JmsListenerContainerFactory< ? > myFactory(
    
        ConnectionFactory connectionFactory,
    
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
    
       DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    
       logger.info("configuring jms connection factory....");
    
       // anonymous class
    
       factory.setErrorHandler(
    
           new ErrorHandler() {
    
             @Override
    
             public void handleError(Throwable t) {
    
               logger.error("An error has occurred in the transaction", t);
    
             }
    
           });
    
       // lambda function
    
       factory.setErrorHandler(t -> logger.info("An error has occurred in the transaction"));
    
       configurer.configure(factory, connectionFactory);
    
    
    
       return factory;
    
      }
    
    
    
      // Serialize message content to json using TextMessage
    
      @Bean
    
      public MessageConverter jacksonJmsMessageConverter() {
    
       MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
    
       converter.setTargetType(MessageType.TEXT);
    
       converter.setTypeIdPropertyName("_type");
    
       return converter;
    
      }spring.activemq.user=admin
    
    spring.activemq.password=admin
    
    spring.activemq.broker-url=tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1@Autowired
    
    private JmsTemplate jmsTemplate;jmsTemplate.convertAndSend("anyQueueName","value1");
    
    jmsTemplate.convertAndSend("anyQueueName","value2");
    
    ...@JmsListener(destination ="anyQueueName", containerFactory ="myFactory")
    
      public void receiveMessage(String user) {
    
        System.out.println("Received <" + user +">");
    
      }import javax.jms.TextMessage;
    
    import javax.jms.QueueBrowser;
    
    import javax.jms.Session;
    
    import javax.jms.TextMessage;
    
    
    
    public void readMessageFromQueue(){
    
    jmsTemplate.browse("anyQueueName", new BrowserCallback<TextMessage>() {
    
          @Override
    
          public TextMessage doInJms(Session session, QueueBrowser browser) throws JMSException {
    
            Enumeration<TextMessage> messages = browser.getEnumeration();
    
            while (messages.hasMoreElements()) {
    
              System.out.println("message found : -"+ messages.nextElement().getText());
    
            }
    
          }
    
        });
    
    }
  • 您可以手动阅读队列中可用的消息:-
  • @JmsListener(destination ="LOCAL.TEST", 
    
     containerFactory ="myJmsListenerContainerFactory")
    
    public void receiveMessage(final Message jsonMessage) throws JMSException {
    
      String messageData = null;
    
      // jsonMessage.acknowledge(); // dont consume message (for testing)
    
      LOGGER.info("=== Received message {}", jsonMessage);
    
    }@Bean
    
    public ActiveMQConnectionFactory getActiveMQConnectionFactory() {
    
      ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    
      activeMQConnectionFactory.setBrokerURL(BROKER_URL +":" + BROKER_PORT);
    
      return activeMQConnectionFactory;
    
    }@Bean
    
    public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {
    
     DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    
     factory.setConnectionFactory(getActiveMQConnectionFactory());
    
     factory.setConcurrency("1-1");
    
     return factory;
    
    }@Autowired
    
    private JmsTemplate jmsTemplate;
    
    
    
    public void send(String destination, String message) {
    
      LOGGER.info("sending message='{}' to destination='{}'", message, destination);
    
      jmsTemplate.convertAndSend(destination, message);
    
    }@Bean
    
    public JmsTemplate jmsTemplate() {
    
     JmsTemplate template = new JmsTemplate();
    
     template.setConnectionFactory(getActiveMQConnectionFactory());
    
     return template;
    
    }  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");
    
      Connection connection = connectionFactory.createConnection("admin","admin");
    
      connection.start();
    
      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
      Destination destination = session.createQueue("listenerQueue");
    
      MessageConsumer consumer = session.createConsumer(destination);
    
    
    
      QueueBrowser browser = session.createBrowser((Queue) destination);
    
      Enumeration elems = browser.getEnumeration();
    
      while (elems.hasMoreElements()) {
    
        Message message = (Message) consumer.receive();
    
    
    
        if (message instanceof TextMessage) {
    
          TextMessage textMessage = (TextMessage) message;
    
          System.out.println("Incoming Message: '" + textMessage.getText() +"'");
    
          message.acknowledge();
    
        }
    
      }
    
      connection.close();<!-- Dependencies to setup JMS and active mq environment -->
    
        <dependency>
    
          <groupId>org.springframework.boot</groupId>
    
          spring-boot-starter-activemq</artifactId>
    
        </dependency>
    
        <dependency>
    
          <groupId>org.apache.activemq</groupId>
    
          activemq-broker</artifactId>
    
        </dependency>@Bean
    
      public JmsListenerContainerFactory< ? > myFactory(
    
        ConnectionFactory connectionFactory,
    
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
    
       DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    
       logger.info("configuring jms connection factory....");
    
       // anonymous class
    
       factory.setErrorHandler(
    
           new ErrorHandler() {
    
             @Override
    
             public void handleError(Throwable t) {
    
               logger.error("An error has occurred in the transaction", t);
    
             }
    
           });
    
       // lambda function
    
       factory.setErrorHandler(t -> logger.info("An error has occurred in the transaction"));
    
       configurer.configure(factory, connectionFactory);
    
    
    
       return factory;
    
      }
    
    
    
      // Serialize message content to json using TextMessage
    
      @Bean
    
      public MessageConverter jacksonJmsMessageConverter() {
    
       MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
    
       converter.setTargetType(MessageType.TEXT);
    
       converter.setTypeIdPropertyName("_type");
    
       return converter;
    
      }spring.activemq.user=admin
    
    spring.activemq.password=admin
    
    spring.activemq.broker-url=tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1@Autowired
    
    private JmsTemplate jmsTemplate;jmsTemplate.convertAndSend("anyQueueName","value1");
    
    jmsTemplate.convertAndSend("anyQueueName","value2");
    
    ...@JmsListener(destination ="anyQueueName", containerFactory ="myFactory")
    
      public void receiveMessage(String user) {
    
        System.out.println("Received <" + user +">");
    
      }import javax.jms.TextMessage;
    
    import javax.jms.QueueBrowser;
    
    import javax.jms.Session;
    
    import javax.jms.TextMessage;
    
    
    
    public void readMessageFromQueue(){
    
    jmsTemplate.browse("anyQueueName", new BrowserCallback<TextMessage>() {
    
          @Override
    
          public TextMessage doInJms(Session session, QueueBrowser browser) throws JMSException {
    
            Enumeration<TextMessage> messages = browser.getEnumeration();
    
            while (messages.hasMoreElements()) {
    
              System.out.println("message found : -"+ messages.nextElement().getText());
    
            }
    
          }
    
        });
    
    }

    输出:-

    找到消息:- value1

    找到消息:- value2

    -快乐编码


    如果未确认消息将不会重新发送。在

相关推荐

  • Spring部署设置openshift

    Springdeploymentsettingsopenshift我有一个问题让我抓狂了三天。我根据OpenShift帐户上的教程部署了spring-eap6-quickstart代码。我已配置调试选项,并且已将Eclipse工作区与OpehShift服务器同步-服务器上的一切工作正常,但在Eclipse中出现无法消除的错误。我有这个错误:cvc-complex-type.2.4.a:Invali…
    2025-04-161
  • 检查Java中正则表达式中模式的第n次出现

    CheckfornthoccurrenceofpatterninregularexpressioninJava本问题已经有最佳答案,请猛点这里访问。我想使用Java正则表达式检查输入字符串中特定模式的第n次出现。你能建议怎么做吗?这应该可以工作:MatchResultfindNthOccurance(intn,Patternp,CharSequencesrc){Matcherm=p.matcher…
    2025-04-161
  • 如何让 JTable 停留在已编辑的单元格上

    HowtohaveJTablestayingontheeditedcell如果有人编辑JTable的单元格内容并按Enter,则内容会被修改并且表格选择会移动到下一行。是否可以禁止JTable在单元格编辑后转到下一行?原因是我的程序使用ListSelectionListener在单元格选择上同步了其他一些小部件,并且我不想在编辑当前单元格后选择下一行。Enter的默认绑定是名为selectNext…
    2025-04-161
  • Weblogic 12c 部署

    Weblogic12cdeploy我正在尝试将我的应用程序从Tomcat迁移到Weblogic12.2.1.3.0。我能够毫无错误地部署应用程序,但我遇到了与持久性提供程序相关的运行时错误。这是堆栈跟踪:javax.validation.ValidationException:CalltoTraversableResolver.isReachable()threwanexceptionatorg.…
    2025-04-161
  • Resteasy Content-Type 默认值

    ResteasyContent-Typedefaults我正在使用Resteasy编写一个可以返回JSON和XML的应用程序,但可以选择默认为XML。这是我的方法:@GET@Path("/content")@Produces({MediaType.APPLICATION_XML,MediaType.APPLICATION_JSON})publicStringcontentListRequestXm…
    2025-04-161
  • 代码不会停止运行,在 Java 中

    thecodedoesn'tstoprunning,inJava我正在用Java解决项目Euler中的问题10,即"Thesumoftheprimesbelow10is2+3+5+7=17.Findthesumofalltheprimesbelowtwomillion."我的代码是packageprojecteuler_1;importjava.math.BigInteger;importjava…
    2025-04-161
  • Out of memory java heap space

    Outofmemoryjavaheapspace我正在尝试将大量文件从服务器发送到多个客户端。当我尝试发送大小为700mb的文件时,它显示了"OutOfMemoryjavaheapspace"错误。我正在使用Netbeans7.1.2版本。我还在属性中尝试了VMoption。但仍然发生同样的错误。我认为阅读整个文件存在一些问题。下面的代码最多可用于300mb。请给我一些建议。提前致谢publicc…
    2025-04-161
  • Log4j 记录到共享日志文件

    Log4jLoggingtoaSharedLogFile有没有办法将log4j日志记录事件写入也被其他应用程序写入的日志文件。其他应用程序可以是非Java应用程序。有什么缺点?锁定问题?格式化?Log4j有一个SocketAppender,它将向服务发送事件,您可以自己实现或使用与Log4j捆绑的简单实现。它还支持syslogd和Windows事件日志,这对于尝试将日志输出与来自非Java应用程序…
    2025-04-161