JMS 2.0 中的新内容,第 2 部分 — 新增的消息传递特性

作者:Nigel Deakin

了解如何利用 JMS 2.0 中新增的消息传递特性。

2013 年 6 月发布

本文是由两部分组成的系列的第二篇文章,介绍 Java 消息服务 (JMS) 2.0 中引入的一些消息传递新特性。本文假设您基本熟悉 JMS 1.1。

第 1 部分中,我们介绍了 JMS 2.0 中新引入的易用性特性。下面介绍重要的消息传递新特性。

JMS 2.0 于 2013 年 4 月发布,是自 2002 年发布 1.1 版之后对 JMS 规范的第一次更新。有人可能会认为 API 这么长时间没有变化,一定已变得陈旧、渐渐废弃。但如果按不同实现的数量来判断一个 API 标准是否成功的话,JMS 是现有最成功的 API 之一。

在 JMS 2.0 中,我们将重点放在近些年与其他企业 Java 技术中具有同样的易用性改进上,并利用此机会介绍了一些消息传递新特性。

JMS 2.0 是 Java EE 7 平台的一部分,可用于 Java EE Web 或 EJB 应用程序。它也可以独立用于 Java SE 环境。下面我将解释,有些特性只能用于独立环境,其他的则只在 Java EE Web 或 EJB 应用程序中可用。

下面讨论 JMS 2.0 中五个重要的消息传递新特性。

同一主题订阅允许多个使用者

在 JMS 1.1 中,主题订阅不允许一次有多个使用者。这意味着一个主题订阅上的消息处理工作不能在多个线程、连接或 Java 虚拟机 (JVM) 之间共享,从而限制了应用程序的可扩展性。在 JMS 2.0 中,通过引入一个被称作共享订阅 的新型主题订阅消除了这种限制。

我们来看看主题订阅在 JMS 1.1 中的工作方式。在清单 1 中,Session 上的 createConsumer 方法用于创建对指定主题的非持久订阅(稍后将讨论持久订阅):

private void createUnsharedConsumer(ConnectionFactory connectionFactory, Topic topic) 
      throws JMSException {
   Connection connection = connectionFactory.createConnection();
   Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   MessageConsumer messageConsumer = session.createConsumer(topic);
   connection.start();
   Message message = messageConsumer.receive(10000);
   while (message != null) {
      System.out.println("Message received: " + ((TextMessage) message).getText());
      message = messageConsumer.receive(10000);
   }
   connection.close();
}

清单 1

在清单 1 中,使用者将收到发送到该主题的每个消息的一个副本。但是,如果应用程序处理每个消息需要的时间较长该怎么办?如何通过在两个 JVM(一个 JVM 处理某些消息,另一个 JVM 处理其余消息)之间共享这些消息的处理工作来提高应用程序的可扩展性?

在 JMS 1.1 中,不可能在正常 Java SE 应用程序中完成此操作。(在 Java EE 中,可以使用消息驱动的 bean 池 [MDB] 来完成)。如果使用 createConsumer 在单独的 JVM(或同一 JVM 上的单独线程)中创建另一个使用者,每个使用者将使用一个单独的订阅,因此将收到该主题收到的每个消息的一个副本。这不是我们所希望的。如果将“订阅”视作一个逻辑实体,接收发送到该主题的每条消息的一个副本,那么我们希望两个使用者使用同一订阅。

JMS 2.0 提供了一个解决方案。您可以使用一个新方法创建一个“共享的”非持久订阅:createSharedConsumer。此方法在 Session(针对使用经典 API 的应用程序)和 JMSContext(针对使用简化的 API 的应用程序)上均可用。由于这两个 JVM 需要能够识别它们需要共享的订阅,因此它们需要提供一个名称来识别共享订阅,如清单 2 所示。

private void createSharedConsumer(ConnectionFactory connectionFactory, Topic topic) throws JMSException {
   Connection connection = connectionFactory.createConnection();
   Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   MessageConsumer messageConsumer = session.createSharedConsumer(topic,"mySubscription");
   connection.start();
   Message message = messageConsumer.receive(10000);
   while (message != null) {
      System.out.println("Message received: " + ((TextMessage) message).getText());
      message = messageConsumer.receive(10000);
   }
   connection.close();
}

清单 2

如果在两个单独的 JVM 中运行清单 2 中的代码,发送到主题的每条消息将提供给两个使用者中的一个或另一个。这样它们就能够共享处理订阅消息的工作。

该特性同样适用于使用持久订阅的应用程序。在 JMS 1.1 中,对 Session 使用 createDurableSubscriber 方法创建了一个持久订阅:

   MessageConsumer messageConsumer = session.createDurableSubscription(topic,"myDurableSub");

这将在针对指定主题创建一个名为 myDurableSub 的持久订阅。但是,和以前一样,没有办法在两个 JVM 之间或在同一 JVM 的两个线程之间共享这个持久订阅上的消息处理工作。根据您想尝试的确切操作的不同,您将获得一个 JMSException 或两个不同的订阅。

同样,JMS 2.0 为此问题提供了一个解决方案。现在可以使用新方法 createSharedDurableConsumer 创建一个“共享的”持久订阅。此方法在 Session(针对使用经典 API 的应用程序)和 JMSContext(针对使用简化的 API 的应用程序)上均可用。

   MessageConsumer messageConsumer = session.createSharedDurableConsumer(topic,"myDurableSub");

总而言之,JMS 1.1 定义了两种不同类型的主题订阅,而 JMS 2.0 定义了四种类型,这四种都可以使用经典或简化的 API 创建:

  • 非共享非持久订阅。这些在 JMS 1.1 和 JMS 2.0 中均可用,是用 createConsumer 创建的。它们只能有一个使用者。设置客户端标识符是可选的。
  • 非共享持久订阅。这些在 JMS 1.1 和 JMS 2.0 中均可用,是用 createDurableSubscriber 或(仅在 JMS 2.0 中)createDurableConsumer 创建的。它们只能有一个使用者。设置客户端标识符是强制性的,订阅由订阅名和客户端标识符的组合来标识。
  • 共享非持久订阅.这些只在 JMS 2.0 中可用,是用 createSharedConsumer 创建的。它们可以有任何数量的使用者。设置客户端标识符是可选的。如果设置的话,订阅由订阅名和客户端标识符的组合来标识。
  • 共享持久订阅。这些只在 JMS 2.0 中可用,是用 createSharedDurableConsumer 创建的。它们可以有任何数量的使用者。设置客户端标识符是可选的。如果设置的话,订阅由订阅名和客户端标识符的组合来标识。

传递延迟

现在可以指定消息的传递延迟。在指定传递延迟到期后,JMS 提供程序才会传递消息。

如果使用经典 API,需要在发送消息之前通过对 MessageProducer 调用 setDeliveryDelay 来设置传递延迟(毫秒),如清单 3 所示。

private void sendWithDeliveryDelayClassic(ConnectionFactory connectionFactory,Queue queue) 
   throws JMSException {

   // send a message with a delivery delay of 20 seconds
   try (Connection connection = connectionFactory.createConnection();){
      Session session = con.createSession();
      MessageProducer messageProducer = session.createProducer(queue);
      messageProducer.setDeliveryDelay(20000);
      TextMessage textMessage = session.createTextMessage("Hello world");
      messageProducer.send(textMessage);
   }
}

清单 3

如果使用简化的 API,需要在发送消息之前对 JMSProducer 调用 setDeliveryDelay。该方法返回 JMSProducer 对象,允许您在一行代码中完成创建 JMSProducer、设置传递延迟和发送消息的全部操作,如清单 4 所示。

private void sendWithDeliveryDelaySimplified(ConnectionFactory connectionFactory,Queue queue)
   throws JMSException {

   // send a message with a delivery delay of 20 seconds
   try (JMSContext context = connectionFactory.createContext();){
      context.createProducer().setDeliveryDelay(20000).send(queue,"Hello world");
   }
}

清单 4

异步发送消息

JMS 2.0 的另一个新特性是能够异步发送消息。

此特性适用于 Java SE 或 Java EE 应用程序客户端容器中运行的应用程序,不适用于 Java EE Web 或 EJB 容器中运行的应用程序。

通常情况下,发送持久消息时,直到 JMS 客户端将消息发送到服务器并收到回复,通知客户端已经安全收到和保存消息,send 方法才会返回。我们称之为同步发送

JMS 2.0 能够执行异步发送。异步发送消息时,send 方法向服务器发送消息,然后将控制权返回给应用程序,而不等待服务器的回复。当 JMS 客户端等待回复时,应用程序不必堵塞在那里空等,而是可以做点有用的事,比如发送其他消息或执行某些处理。

当收到从服务器返回的回复,指示服务器已收到并保存消息时,JMS 提供程序通过对应用程序指定的 CompletionListener 对象调用回调方法 onCompletion 来通知应用程序。

您可以通过两种主要方式在应用程序中使用异步发送

  • 让应用程序在间隔期间做点别的事(如更新显示或写入数据库),而不是空等服务器回复
  • 允许连续发送大量消息,而不必在每条消息后等待服务器回复

清单 5 是如何使用经典 API 实现上面第一条的示例:

private void asyncSendClassic(ConnectionFactory connectionFactory,Queue queue)
   throws Exception {

   // send a message asynchronously
   try (Connection connection = connectionFactory.createConnection();){
      Session session = connection.createSession();
      MessageProducer messageProducer = session.createProducer(queue);
      TextMessage textMessage = session.createTextMessage("Hello world");
      CountDownLatch latch = new CountDownLatch(1);
      MyCompletionListener myCompletionListener = new MyCompletionListener(latch);
      messageProducer.send(textMessage,new MyCompletionListener(latch));
      System.out.println("Message sent, now waiting for reply");

      // at this point we can do something else before waiting for the reply
      // this is not shown here

      // now wait for the reply from the server	
      latch.await();

      if (myCompletionListener.getException()==null){
         System.out.println("Reply received from server");
      } else {
         throw myCompletionListener.getException();
      }
   }
}

清单 5

清单 5 中所用的 MyCompletionListener 类是应用程序提供的一个单独类,它实现 javax.jms.CompletionListener 接口,如清单 6 所示:

class MyCompletionListener implements CompletionListener {

   CountDownLatch latch;
   Exception exception;
   
   public MyCompletionListener(CountDownLatch latch) {
      this.latch=latch;
   }

   @Override
   public void onCompletion(Message message) {
      latch.countDown();
   }

   @Override
   public void onException(Message message, Exception exception) {
      latch.countDown();
      this.exception=exception;
   }

   public Exception getException(){
      return exception;
   }
}

清单 6

在清单 6 中,我们对 MessageProducer 使用新方法发送消息,不等待服务器回复。这个新方法是 send(Message message, CompletionListener listener)。使用此方法发送消息,应用程序可以在服务器处理消息时执行其他操作。当应用程序准备继续时,使用 java.util.concurrent.CountDownLatch 等待从服务器收到回复。收到回复后,应用程序可以继续,并且跟正常同步发送之后一样相信消息已经成功发送。

如果使用 JMS 2.0 简化的 API,异步发送消息要略为简单些,如清单 7 所示:

private void asyncSendSimplified(ConnectionFactory connectionFactory,Queue queue) 
   throws Exception {

   // send a message asynchronously
   try (JMSContext context = connectionFactory.createContext();){
      CountDownLatch latch = new CountDownLatch(1);
      MyCompletionListener myCompletionListener = new MyCompletionListener(latch);
      context.createProducer().setAsync(myCompletionListener).send(queue,"Hello world");
      System.out.println("Message sent, now waiting for reply");

      // at this point we can do something else before waiting for the reply
      // this is not shown here

      latch.await();
      if (myCompletionListener.getException()==null){
         System.out.println("Reply received from server");
      } else {
         throw myCompletionListener.getException();
      }
   }
 }

清单 7

在这种情况下,调用 send(Message message) 之前将对 JMSProducer 调用方法 setAsync(CompletionListener listener)。由于 JMSProducer 支持方法链,因此可在同一行代码中完成这两个操作。

JMSXDeliveryCount

JMS 2.0 允许应用程序接收一条消息来确定消息重新传递的次数。可以从消息属性 JMSXDeliveryCount 获取此信息:

int deliveryCount = message.getIntProperty("JMSXDeliveryCount");

JMSXDeliveryCount 不是一个新属性,在 JMS 1.1 中就定义了。但在 JMS 1.1 中,JMS 提供程序可以选择实际设置它,这意味着使用它的应用程序代码是不可移植的。在 JMS 2.0 中,变成强制要求 JMS 提供程序设置此属性,以便可移植应用程序利用它。

那么,为什么应用程序可能希望了解消息重新传递的次数呢?

如果消息被重新传递,这意味着先前传递消息的尝试因为某种原因失败了。如果消息被反复重新传递,原因可能是接收应用程序出了问题。应用程序也许能够接收消息,但不能处理它,因此引发了异常或回滚事务。如果存在无法处理消息的长期性原因,如消息因某种方式变成“坏的”,则将一遍又一遍地重新传递同一消息,既浪费资源,又防碍了处理后续“好的”消息。

JMSXDeliveryCount 属性让使用应用程序可以检测消息已经重新传递多次,因此,在某种程度上是“坏的”。应用程序可以使用此信息采取某种特殊操作(而不是简单触发另一次重新传递),比如,使用该消息并将其发送到单独的“坏”消息队列等待管理员操作。

有些 JMS 提供程序已经提供了非标准工具来检测反复重新传递的消息,并将其转到死消息队列。尽管 JMS 2.0 定义了应如何处理此类消息,JMSXDeliveryCount 属性仍允许应用程序以可移植方式实现自己的“坏”消息处理代码。

清单 8 显示了一个 MessageListener,它引发 RuntimeException 模拟“坏”消息处理过程中的一个错误。MessageListener 使用 JMSXDeliveryCount 属性检测到消息已经重新传递了 10 次,然后采取不同的操作。

class MyMessageListener implements MessageListener {

   @Override
   public void onMessage(Message message) {
      try {
         int deliveryCount = message.getIntProperty("JMSXDeliveryCount");
    if (deliveryCount<10){
       // now throw a RuntimeException 
            // to simulate a problem processing the message
       // the message will then be redelivered
       throw new RuntimeException("Exception thrown to simulate a bad message");
         } else {
       // message has been redelivered ten times, 
       // let's do something to prevent endless redeliveries
       // such as sending it to dead message queue
       // details omitted
    }
      } catch (JMSException e) {
         throw new RuntimeException(e);
      }
   }
}

清单 8

MDB 配置属性

需要异步接收消息的 Java EE 应用程序使用 MDB 执行此操作,MDB 是通过指定一些配置属性进行配置的。

Java EE 的早期版本对于如何配置 MDB 很模糊。在 EJB 3.1 中,仅定义了以下配置属性:

  • acknowledgeMode(仅用在事务是 bean 托管时;可以设置为 Auto-acknowledgeDups-ok-acknowledge
  • messageSelector
  • destinationType(可以设置为 javax.jms.Queuejavax.jms.Topic
  • subscriptionDurability(仅用于主题;可以设置为 DurableNonDurable

但是,EJB 3.1 未定义应用程序应如何指定 MDB 从哪个队列或主题接收消息。需要由应用服务器或资源适配器来定义非标准方式来执行此操作。

EJB 3.1 也未定义 — 当从主题收到消息且 subscriptionDurability 属性设置为 Durable 时 — 应如何指定订阅名和客户端标识符。EJB 3.1 中没有指定 MDB 用来创建其与 JMS 服务器的连接的连接工厂的标准方式。

在 Java EE 的最新版本中,这些相当令人吃惊的限制全部解决了。EJB 3.2(Java EE 7 的一部分)还定义了以下配置属性:

  • destinationLookup:为便于管理而定义的队列或主题对象的 JNDI 名称,代表 MDB 将从中接收消息的队列或主题
  • connectionFactoryLookup:为便于管理而定义的 ConnectionFactory 对象的 JNDI 名称,MDB 将使用此对象连接到 JMS 提供程序
  • clientId:MDB 连接到 JMS 提供程序时使用的客户端标识符
  • subscriptionNamesubscriptionDurability 设置为 Durable 时使用的持久订阅名称

大多数应用服务器都支持 clientIdsubscriptionName,因此将这些定义为标准只不过是使现有实践标准化。

当然,总是可以配置 JMS MDB 使用的队列或主题,许多(但不是全部)应用服务器提供了一种指定连接工厂的方式。但是,实现此操作的方式是非标准的,每个应用服务器都不相同。应用服务器仍然可以自由地继续支持这些非标准机制。不过,您可以放心,使用 destinationLookupconnectionFactoryLookup 的应用程序将适用于多个应用服务器。

清单 9 显示一个 JMS MDB,它使用来自有关一个主题的持久订阅的消息,并使用新的标准属性:

@MessageDriven(activationConfig = { 
   @ActivationConfigProperty(
      propertyName = "connectionFactoryLookup", propertyValue = "jms/MyConnectionFactory"),
   @ActivationConfigProperty(
      propertyName = "destinationLookup", propertyValue = "jmq/PriceFeed"), 
   @ActivationConfigProperty(
      propertyName = "destinationType ", propertyValue = "javax.jms.Topic "),
   @ActivationConfigProperty(
      propertyName = "subscriptionDurability ", propertyValue = "Durable"), 
   @ActivationConfigProperty(
      propertyName = "subscriptionName", propertyValue = "MySub"), 
   @ActivationConfigProperty(
      propertyName = "clientId", propertyValue = "MyClientId") }) 
   
public class MyMDB implements MessageListener {
   public void onMessage(Message message) {
      ...

清单 9

总结

所有上述五个特性有助于 Java 开发人员更轻松地传递消息。结合第 1 部分讨论的易用性特性,它们代表了向 JMS 2.0 迈出的一大步 — 这应作为 Java 环境中最成功的 API 之一继续繁荣下去。

另请参见

关于作者

Nigel Deakin 是 Oracle 的技术人员中的一位主要成员,他是 JSR 343 (Java Message Service 2.0) 规范的带头人。除了负责领导 JMS 规范的后续版本,他还是 Oracle JMS 开发团队的成员,负责 Open Message Queue 和 GlassFish 应用服务器。他最近在美国旧金山的 JavaOne 大会、比利时安特卫普的 Devoxx 大会上发表了演讲,目前他在英国剑桥工作。

分享交流

请在 FacebookTwitterOracle Java 博客上加入 Java 社区对话!