Advanced Pub/Sub: Why Do We Need It?

by Daniel Amadei

Exploring the advanced publish and subscribe features in Oracle WebLogic Server.

Published August 2012


Advanced publish and subscribe (pub/sub) is a relatively new set of features among the many features that are already provided by Oracle WebLogic Server for Java Message Service (JMS) messaging. Advanced pub/sub features go beyond the JMS specification and add more robustness to messaging in Java. This article aims to show you why these features are so important.

The new features bring parallelism to the pub/sub world, providing an implementation of the Competing Consumers integration pattern, which was already available for point-to-point messaging (queues). Enterprise Integration Patterns, by Gregor Hohpe and Bobby Woolf, defines the Competing Consumers pattern as follows:

"Competing Consumers are multiple consumers that are all created to receive messages from a single Point-to-Point Channel. When the channel delivers a message, any of the consumers could potentially receive it. The messaging system's implementation determines which consumer actually receives the message, but in effect the consumers compete with each other to be the receiver. Once a consumer receives a message, it can delegate to the rest of its application to help process the message. (This solution only works with Point-to-Point Channels; multiple consumers on a Publish-Subscribe Channel just create more copies of each message.)"

Oracle WebLogic Server goes a bit further and makes the pattern available to pub/sub as well, improving performance, scalability, and high availability. If you are familiar with Oracle WebLogic Server messaging, you will notice that these new features bring to topics many capabilities that were already available when working with queues.

We are going to explore some of the new features by working through a use case, so you can analyze the options that are available. Before we do that, let's review what has changed.

The Past

The original distributed topic architecture was called "replicated." The documentation for Oracle WebLogic Server provides the following definition:

 

"All physical topic members receive each sent message. If a message arrives at one of the physical topic members, a copy of this message is forwarded to the other members of that uniform distributed topic. A subscription on any one particular member will get a copy of any message sent to the uniform distributed topic logical name or to any particular uniform distributed topic member."

 

By the definition presented, it's easy to understand that the message is replicated to all members of the distributed topic so that all members receive all messages sent. There's no way to avoid having messages be forwarded to all physical members of the distributed topic.

Also, only a single connection is allowed to use the same client ID on the entire cluster, and the subscription name can't be shared either.

This architecture presents some limitations when there is demand for parallelism, high availability, and the ability to control how many copies of the same message are received by an application.

The Present

Oracle WebLogic Server 10.3.4 introduced the following features, which together make up the advanced pub/sub capabilities:

  • Partitioned topics: Partitioned topics give distributed topics the same scalability that distributed queues already have, allowing messages to be spread among multiple managed servers. This increases scalability, because messages are distributed among all available servers and can be processed in parallel. Another important feature made available by partitioned topics is the ability to control how many copies of each message an application may receive, independent of how many connections and subscriptions it may have on the same topic stream.
  • No forwarding: Unlike the replicated model, partitioned topics do not forward messages to the other physical members. Oracle WebLogic Server documentation defines partitioned topics as follows:

    "The physical member receiving the message is the only member of the uniform distributed topic that is aware of the message. When a message is published to the logical name of a Partitioned uniform distributed topic, it will only arrive on one particular physical topic member. Once a message arrives on a physical topic member, the message is not forwarded to the rest of the members of the uniform distributed destination, and subscribers on other physical topic members do not get a copy of that message. The Partitioned capability was added in WebLogic 10.3.4 (11gR1PS3)."
  • Sharing the same client ID and subscription name among different subscribers: Multiple subscribers can share the same client ID and subscription name, guaranteeing high availability and scalability for message consumption and processing (parallel consumption) for durable and nondurable subscriptions.
  • Ability to control how many messages an application will receive: Using the options above, you can control how many copies of a single message an application will receive. It can be one message copy per instance of the application or just one copy per application regardless of how many instances of the application are available. Message-driven beans (MDBs) have been updated and new activation configuration properties have been added to control such behavior.
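
The difference between the two forwarding policies can be illustrated with a toy in-memory model. This is only a sketch under stated assumptions: the class ForwardingPolicySketch and its methods are hypothetical names invented for illustration, and nothing here uses or reproduces the Oracle WebLogic Server API. It simply counts how many copies of a message exist across the physical members under each policy.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory model of a uniform distributed topic; not WebLogic code. */
final class ForwardingPolicySketch {

    enum Policy { REPLICATED, PARTITIONED }

    /** Messages held by each physical member, indexed by member number. */
    private final List<List<String>> members = new ArrayList<>();
    private final Policy policy;

    ForwardingPolicySketch(int memberCount, Policy policy) {
        this.policy = policy;
        for (int i = 0; i < memberCount; i++) {
            members.add(new ArrayList<>());
        }
    }

    /** Publishes a message that arrives on the given physical member. */
    void publish(int arrivingMember, String message) {
        if (policy == Policy.REPLICATED) {
            // Replicated: every member ends up with a copy.
            for (List<String> member : members) {
                member.add(message);
            }
        } else {
            // Partitioned: only the receiving member is aware of the message.
            members.get(arrivingMember).add(message);
        }
    }

    /** Total number of copies held across all members. */
    int totalCopies() {
        int total = 0;
        for (List<String> member : members) {
            total += member.size();
        }
        return total;
    }

    /** Messages currently held by one member. */
    List<String> messagesOn(int member) {
        return members.get(member);
    }

    public static void main(String[] args) {
        ForwardingPolicySketch replicated = new ForwardingPolicySketch(2, Policy.REPLICATED);
        ForwardingPolicySketch partitioned = new ForwardingPolicySketch(2, Policy.PARTITIONED);
        replicated.publish(0, "m1");
        partitioned.publish(0, "m1");
        System.out.println("replicated copies: " + replicated.totalCopies());   // 2
        System.out.println("partitioned copies: " + partitioned.totalCopies()); // 1
    }
}
```

In the partitioned case, a subscriber attached to the other member never sees "m1", which is why consumers need to be present on every member.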

These new features provide the following benefits:

  • Architectural flexibility: If message production outpaces message consumption, you can easily add more subscribers and take advantage of cluster expansion without changing code.
  • Configuration simplicity: MDBs can be easily configured to support different models of subscription.
  • Less resource usage: Not forwarding messages to all the cluster members, as happens when using the replicated model, ensures fewer resources (for example, networking) are used.
  • Performance and scalability: With the new options, you can consume messages in parallel much the same as you could do with queues. You can also expand your cluster the way you want and subscribers will adapt to the new architecture.
  • High availability: Because multiple physical subscribers can share the same subscription, there's guaranteed support for high availability, not only for the distributed topic itself but also for the topic subscribers.

Now that we've discussed the options available with the advanced pub/sub capabilities as well as their benefits, let's dive a little deeper with a practical example.

The Use Case

Suppose you have a single JMS message and you have to deliver it safely to many applications. By safely, I mean you have to guarantee the delivery of the message and even if any of the applications are offline when the message is sent, the offline applications should be able to receive the message when the applications come back online.

An application will present multiple connections trying to receive messages from the JMS system so it can be highly available and be scalable enough to keep consumption rates higher than production (remember the Competing Consumers pattern).

It's also mandatory that each application receives exactly one copy of each message regardless of the number of consumers, sessions, or connections it presents to the messaging system.

One Option

As one option, you can replicate the same message to many different queues. This is feasible and allows multiple connections to the servers for message consumption. Each of these connections will receive a single message. This message will be delivered just to one process unless there is a problem and the message is rolled back to the queue and can be reprocessed later (depending on the redelivery configuration). If the application is offline, the message will remain in the queue.

This seems to cover all of our requirements; however, this scenario has some disadvantages:

  • It will require you to control the transaction to guarantee the message is sent to all of the queues or to none of them.
  • You will also have to live with the performance penalty and the space consumption for replicating the same message into many new ones.
  • The more applications you add, the more queues you'll have, demanding code changes and increasing the problems above in a linear fashion.
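
To make the first disadvantage concrete, here is a minimal sketch of the all-or-nothing bookkeeping that the sender has to take care of. It is a plain in-memory simulation, not JMS code: the class QueueFanOutSketch, the method sendToAll, and the failAtIndex parameter (used to simulate a failing send) are all hypothetical names introduced for illustration. In a real system this role would be played by a transaction rather than a manual undo.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of replicating one message to many queues, all or nothing. */
final class QueueFanOutSketch {

    /**
     * Writes one copy of the message to every queue.
     * failAtIndex simulates a send failure on that queue (-1 means no failure).
     * Returns true if every queue received a copy, false if the send was undone.
     */
    static boolean sendToAll(List<Deque<String>> queues, String message, int failAtIndex) {
        List<Deque<String>> written = new ArrayList<>();
        for (int i = 0; i < queues.size(); i++) {
            if (i == failAtIndex) {
                // Undo the copies already written so that no queue keeps a partial send.
                for (Deque<String> touched : written) {
                    touched.removeLastOccurrence(message);
                }
                return false;
            }
            Deque<String> queue = queues.get(i);
            queue.addLast(message);
            written.add(queue);
        }
        return true;
    }

    public static void main(String[] args) {
        List<Deque<String>> queues = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            queues.add(new ArrayDeque<>());
        }
        System.out.println(sendToAll(queues, "m1", 2));  // false, nothing is kept
        System.out.println(sendToAll(queues, "m1", -1)); // true
        System.out.println(queues);                      // [[m1], [m1], [m1]]
    }
}
```

Note that the number of copies written grows with the number of queues, which is the linear cost mentioned in the list above.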

Another (Better) Option

The use of topics is a better option for our use case. To be more specific, based on the requirements, we should use topics along with durable subscriptions, which provides message persistence and guarantees delivery even when the subscriber is offline. We will see why this is a better approach.

The combination of distributed topics and durable subscriptions presents the same features of a collection of queues, but it allows the same message to be delivered to multiple recipients with fewer hops (lower latency), lower runtime overhead (no extra copies of messages), and lower administrative costs (simpler configuration). It's a better fit than the previous option.

Before Oracle WebLogic Server 10.3.4, you could have only one client using a particular durable subscription at any given time. This limitation would violate one of our requirements: multiple instances of each application should be able to receive messages, and each application should receive exactly one copy of a message. Starting with release 10.3.4 of Oracle WebLogic Server, the combination of shared subscriptions with partitioned distributed topics allows more than one client connection to share the same subscription name and still receive one copy of the message.
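
The idea of several clients sharing one subscription can be sketched with a small in-memory model. This is an illustration only, under the assumption that a shared subscription behaves like a single backlog from which all sharing consumers compete; the class SharedSubscriptionSketch and its methods are hypothetical names and do not correspond to any Oracle WebLogic Server API.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

/** Toy model of shared subscriptions on one topic member; not WebLogic code. */
final class SharedSubscriptionSketch {

    /** One backlog per subscription name, no matter how many consumers share it. */
    private final Map<String, Queue<String>> backlogs = new LinkedHashMap<>();

    /** Creates the subscription if needed; repeated calls with the same name share it. */
    void subscribe(String subscriptionName) {
        backlogs.computeIfAbsent(subscriptionName, name -> new ArrayDeque<>());
    }

    /** Publishing stores exactly one copy per subscription name. */
    void publish(String message) {
        for (Queue<String> backlog : backlogs.values()) {
            backlog.add(message);
        }
    }

    /** Any consumer sharing the subscription may take the next message; null if none is left. */
    String receive(String subscriptionName) {
        Queue<String> backlog = backlogs.get(subscriptionName);
        return backlog == null ? null : backlog.poll();
    }

    public static void main(String[] args) {
        SharedSubscriptionSketch topic = new SharedSubscriptionSketch();
        topic.subscribe("App01"); // first instance of Application 01
        topic.subscribe("App01"); // second instance shares the same subscription
        topic.subscribe("App02");

        topic.publish("order-1");

        // The two App01 instances compete: one gets the message, the other finds nothing.
        System.out.println(topic.receive("App01")); // order-1
        System.out.println(topic.receive("App01")); // null
        System.out.println(topic.receive("App02")); // order-1
    }
}
```

Each application, identified here by its subscription name, gets exactly one copy of the message regardless of how many instances are connected.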

The features of advanced pub/sub apply to durable and nondurable subscriptions, so even if you use nondurable topic subscriptions, you still can benefit from some of these features. For more information on the subject, it's worth reading the product documentation.

Implementing the Use Case

We are going to implement some applications to see how this use case is applied in practice. These sample applications are one Swing client application for message production along with two MDBs and a Swing client application subscribing in a durable fashion to the same topic to receive a copy of each message published.

Figure 1 shows a high-level overview of the architecture implied by the use case:

Figure 1. Use Case Architecture

The first step to implement our use case is to create the partitioned distributed topic, so when you create the distributed topic, for Forwarding Policy select Partitioned. You can see how this is accomplished in Figure 2:

Figure 2. Selecting the Partitioned Forwarding Policy

Target the distributed topic to a subdeployment. The subdeployment itself should be targeted to individual JMS servers pertaining to the cluster, as can be seen in Figure 3:

Figure 3. Targeting the Subdeployment

The next step is to create a connection factory prepared to support our requirements. This connection factory should allow multiple subscriptions with the same client ID and subscription name. When creating the connection factory, select Shareable for the Subscription Sharing Policy and select Unrestricted for the Client ID Policy, as shown in Figure 4:

Figure 4. Creating the Connection Factory

The Code

To mimic many applications (as stated in the use case), we will have three subscribers: two MDBs (Application 01 and Application 02) and a Swing client application. These three applications will be enough to illustrate the behavior; creating more dummy applications would be quite boring.

Both Enterprise JavaBeans (EJB) applications are almost identical and are composed of a single MDB each. The MDB simply receives the message and prints the body of the message to the standard output device along with the name of the application that received it. This approach makes it easy to see and prove that the same message is being consumed by Application 01 and Application 02.

The EJB code for Application 01 can be seen below. It prints the string Message <message text> received by Application 01 to the standard output device:

package com.oracle.otn.article.adv.pub.sub.app01;

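import javax.ejb.ActivationConfigProperty;
import javax.ejb.EJBException;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

// Durable topic subscriber that receives one copy of each message per application
@MessageDriven(activationConfig = {
   @ActivationConfigProperty(
      propertyName = "destinationType",
      propertyValue = "javax.jms.Topic"),
   @ActivationConfigProperty(
      propertyName = "connectionFactoryJndiName",
      propertyValue = "jms.ShareableConnectionFactory"),
   @ActivationConfigProperty(
      propertyName = "topicMessagesDistributionMode",
      propertyValue = "One-Copy-Per-Application"),
   @ActivationConfigProperty(
      propertyName = "subscriptionDurability",
      propertyValue = "Durable")
   },
   mappedName = "jms.myTopic")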
public class MySubscriberMDBApp01 implements MessageListener {

   public void onMessage(Message message) {
      try {
         TextMessage textMessage = (TextMessage) message;

         String text = textMessage.getText();

         System.out.println("Message '" + text
            + "' received by Application 01");
      } catch (JMSException e) {
         throw new EJBException(e);
      }
   }
} 

From the source code, we see some interesting configurations made using the @ActivationConfigProperty annotation:

  • destinationType: We specify the destination type as javax.jms.Topic.
  • connectionFactoryJndiName: We specify the jms.ShareableConnectionFactory as the connection factory to be used by the MDB. If you remember, this connection factory was created earlier and allows the client ID to be unrestricted and the subscription name to be shared.
  • topicMessagesDistributionMode: We specify One-Copy-Per-Application, which means that each application will receive only one copy of every message sent to that particular topic, regardless of the number of application instances. (In a Java EE application, this control is in fact made per MDB, so different MDBs in the same application [EAR], would receive one copy of the message each.)
  • subscriptionDurability: We specified this as Durable so that messages are kept in case the consumer is not active at the moment the message is published.

It's interesting to mention that all these configurations can also be done using XML deployment descriptors if you do not want to use the annotations or you need to overwrite a value defined by an annotation and you do not have access to the EJB source code.

The code for Application 02 is almost identical, except that the printed message ends with "received by Application 02" instead of "received by Application 01":

System.out.println("Message '" + text + "' received by Application 02");

After deploying both applications (they are available for download), go back to the Oracle WebLogic Server console and navigate to Services > Messaging > JMS Modules > myTopic > Monitoring > Statistics. You will see something that resembles Figure 5:

Figure 5. Viewing the Consumers

As shown in Figure 5, each member has 32 consumers. This number comes from the default of 16 consumers for each MDB; because we have two MDBs deployed (Application 01 and Application 02), we see 32 consumers.

Now, check the Durable Subscribers tab. You should see something that resembles Figure 6:

Figure 6. Viewing the Durable Subscriptions

As Figure 6 illustrates, a durable subscription was created in each member of the distributed topic with the same subscription name. Because the topic is partitioned, messages will be spread among the members and won't be replicated. The consumers are responsible for consuming messages from the member to which they are connected. This allows for parallel processing, improves scalability, and is a key enabler of the one-copy-per-application approach.

The Producer

We can develop a very simple producer to input messages into the topic. The class shown below is the Swing client code for a simple producer that has the responsibility of taking the text input from a text box and sending it as a JMS text message. (The simplest way to execute the client applications [producer and subscriber] is to leverage wlfullclient.jar. For more info on how this JAR is generated, please consult http://docs.oracle.com/cd/E24329_01/web.1211/e24378/jarbuilder.htm.)

package com.oracle.otn.article.adv.pub.sub.producer;

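import java.awt.BorderLayout;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;

import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JLabel;
import javax.swing.JOptionPane;
import javax.swing.JPanel;
import javax.swing.JTextField;
import javax.swing.SwingUtilities;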

public class ProducerClient extends JFrame {

  private JLabel titleLabel;
  private JTextField textField;
  private JButton sendButton;

  public ProducerClient() {
    super("Send text message via JMS");
  }

  private void init() {
    setDefaultCloseOperation(EXIT_ON_CLOSE);

    titleLabel = new JLabel("Enter the text to send:");
    textField = new JTextField(20);

    JPanel panel = new JPanel();
    sendButton = new JButton("Send");

    sendButton.addActionListener(new ActionListener() {

      @Override
      public void actionPerformed(ActionEvent e) {
        String text = ProducerClient.this.textField.getText();

        try {
          new MessageSender().send(text);

          textField.setText("");
          JOptionPane.showMessageDialog(ProducerClient.this, "Message sent...",
              "Success", JOptionPane.INFORMATION_MESSAGE);
        } catch (Exception jmsEx) {
          JOptionPane.showMessageDialog(ProducerClient.this,
              "Error sending message. Please check logs.", "Error",
              JOptionPane.ERROR_MESSAGE);
          jmsEx.printStackTrace();
        }
      }
    });

    panel.add(textField);
    panel.add(sendButton);

    getContentPane().add(titleLabel, BorderLayout.NORTH);
    getContentPane().add(panel, BorderLayout.CENTER);

    pack();
    setVisible(true);
  }

  public static void main(String args[]) {
    SwingUtilities.invokeLater(new Runnable() {
      public void run() {
        new ProducerClient().init();
      }
    });
  }
}

The class below is responsible for actually sending the JMS message to the topic. It is just for demonstration purposes because it could be tuned, for example, to reuse heavyweight objects, such as the connection, the session, and so on. Therefore, do not use it outside this sample without making the necessary changes.

package com.oracle.otn.article.adv.pub.sub.producer;

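import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.InitialContext;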

public class MessageSender {

  public void send(String text) throws Exception {

    Context context = new InitialContext();
    ConnectionFactory cf;
    Connection connection = null;
    Session session = null;
    MessageProducer producer = null;

    try {
      cf = (ConnectionFactory) context.lookup("weblogic.jms.ConnectionFactory");

      Topic topic = (Topic) context.lookup("jms.myTopic");

      connection = cf.createConnection();
      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

      producer = session.createProducer(topic);

      producer.send(session.createTextMessage(text));

    } finally {
      closeAll(producer, session, connection, context);
    }
  }

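  private void closeAll(MessageProducer producer, Session session,
      Connection connection, Context context) {

    // Close the resources in reverse order of creation. A failure to close
    // one resource must not prevent the others from being closed.
    try { if (producer != null) producer.close(); } catch (Exception ignored) { }
    try { if (session != null) session.close(); } catch (Exception ignored) { }
    try { if (connection != null) connection.close(); } catch (Exception ignored) { }
    try { if (context != null) context.close(); } catch (Exception ignored) { }
  }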
}

After testing the producer and sending one message with the text "Message test one," we see the following in the log of one of our managed servers. (The exact text will depend on which managed server the message went to.)

Message 'Message test one' received by Application 01
Message 'Message test one' received by Application 02

Looking at the monitoring again, we see that the message passed through one managed server, in this case, server2, as shown in Figure 7:

Figure 7. Checking the Message Path

After executing the same test three more times using the messages "Message test two," "Message test three," and "Message test four" and looking at the server logs, we see the following messages divided per (managed) server, which shows that the messages were split and balanced among the managed servers. (Note that the load-balancing settings of your connection factory may affect how messages are distributed, especially if the producer stays connected to Oracle WebLogic Server between sends, and depending on the connection factory's server affinity configuration.)

Server 1:

Message 'Message test two' received by Application 01
Message 'Message test two' received by Application 02
Message 'Message test three' received by Application 01
Message 'Message test three' received by Application 02

Server 2:

Message 'Message test one' received by Application 01
Message 'Message test one' received by Application 02
Message 'Message test four' received by Application 01
Message 'Message test four' received by Application 02

Looking at the monitoring, the Messages Total column in Figure 8 proves that messages were balanced among both servers, as indicated by the log messages:

Figure 8. Verifying Message Balancing

A Standalone Client Application for Consumption

If you can't use an MDB to consume and process your messages for some reason, for example, if you need to use a client/server application or something similar to that, you will have to connect to individual members. When using distributed topics with durable subscriptions, you cannot subscribe directly to the distributed topic JNDI name. (You can do that for the producer, as we did with ours, and you can connect and try to consume from distributed queues using the distributed queue JNDI name. However, the member from which you will consume is not defined, and there will be no load balancing. So it's always safer to connect directly to the members when consuming. You can also use more advanced approaches with foreign JNDI to avoid hard-coding individual JNDI names, but that is beyond the scope of this article.)

I have built a sample application to demonstrate how to connect and subscribe from individual members. It is a very simple Swing application, capable of receiving messages and showing them in a JTable. The application connects to both managed servers and uses the destination availability API to register a listener in each of the members using a durable subscription. Then it unregisters the listeners when that member goes down. You can download the sample application here and see how the API was used. Just remember this is a sample, so use it at your own risk.

Below we see the code for the client application that creates and registers the MessageListener:

private synchronized void connectAndSelfRegister() throws JMSException,
      NamingException {
    context = new InitialContext();
    cf = (ConnectionFactory) context.lookup("jms.ShareableConnectionFactory");

    Topic topic = (Topic) context.lookup(jndiName);

    connection = cf.createConnection();
    connection.setClientID("SwingApp");

    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    subscriber = session.createDurableSubscriber(topic, "SwingApp");
    subscriber.setMessageListener(this);
    connection.setExceptionListener(this);

    connection.start();

    context.close();
    context = null;
  }

From the code snippet, you can see the application is using our shareable connection factory. The connection uses "SwingApp" as both the client ID and the subscription name. Because the connection factory was created with the Unrestricted Client ID Policy and the Shareable Subscription Sharing Policy, we can connect with the same client ID and subscription name to all the members (and we could also connect more subscribers with the same name to the same member). The method registers its own object (this) as the message listener. That is possible because the class implements MessageListener, as we can see below:

public class AsyncListener implements MessageListener, ExceptionListener {
  //code inside the class…

  @Override
  public void onMessage(Message message) {
    try {
      TextMessage textMessage = (TextMessage) message;
      consumerClient.onTextMessage(textMessage.getText());
    } catch (Exception e) {
      throw ensureRuntimeException(e);
    }
  }
}

When a message arrives, the onMessage method of the AsyncListener class receives the message and sends it to the screen by calling the onTextMessage(String) method of the ConsumerClient class. The onTextMessage() method is shown below and is just responsible for showing the message on screen. This way, the ConsumerClient class is responsible for the presentation layer and does not have to worry about JMS. It is just notified to show a message without knowing where it came from or how it was received:

public void onTextMessage(final String text) {
    SwingUtilities.invokeLater(new Runnable() {
      @Override
      public void run() {
        DefaultTableModel model = (DefaultTableModel) messagesTable.getModel();

        model.insertRow(model.getRowCount(), new Object[] { text,
            new SimpleDateFormat("dd/MM/yyyy HH:mm:ss").format(new Date()) });
      }
    });
  }

When looking at the Durable Subscribers monitoring screen, we see what's shown in Figure 9:

Figure 9. Viewing the Durable Subscribers

Figure 9 shows our subscribers connected in each of the distributed topic members (two in our case). One nice thing to do is close the client application and send some messages to the topic. After sending five messages while the client consumer application is down, we see what's shown in the Messages Current Count column in Figure 10:

Figure 10. Viewing the Current Messages Count

In Figure 10, you can see that only the messages for our client application are still there. Oracle WebLogic Server logically counts a message once for each durable subscription interested in receiving it, so the "current messages" shown in Figure 10 are the copies that have not been consumed yet, which in this case are the ones waiting for the offline client application.
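
The counting behavior described above can be modeled with a short in-memory sketch. It is an illustration under simplifying assumptions: it ignores the split across distributed topic members, it treats an online subscription as consuming each message immediately, and the class DurableBacklogSketch with its methods is a hypothetical name that does not exist in Oracle WebLogic Server.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of durable subscription backlogs; not WebLogic code. */
final class DurableBacklogSketch {

    /** Messages not yet consumed, per durable subscription name. */
    private final Map<String, List<String>> pending = new LinkedHashMap<>();
    private final Set<String> online = new HashSet<>();

    void addDurableSubscription(String name, boolean isOnline) {
        pending.put(name, new ArrayList<>());
        if (isOnline) {
            online.add(name);
        }
    }

    /** One logical copy per subscription; online subscriptions consume it at once. */
    void publish(String message) {
        for (Map.Entry<String, List<String>> entry : pending.entrySet()) {
            if (!online.contains(entry.getKey())) {
                entry.getValue().add(message); // kept until the subscriber returns
            }
        }
    }

    /** Number of logical message copies still waiting to be consumed. */
    int currentCount() {
        int total = 0;
        for (List<String> backlog : pending.values()) {
            total += backlog.size();
        }
        return total;
    }

    /** Reconnecting drains the backlog and returns the messages that were missed. */
    List<String> reconnect(String name) {
        online.add(name);
        List<String> backlog = pending.get(name);
        List<String> delivered = new ArrayList<>(backlog);
        backlog.clear();
        return delivered;
    }

    public static void main(String[] args) {
        DurableBacklogSketch topic = new DurableBacklogSketch();
        topic.addDurableSubscription("App01", true);
        topic.addDurableSubscription("App02", true);
        topic.addDurableSubscription("SwingApp", false); // client application is down

        for (int i = 1; i <= 5; i++) {
            topic.publish("message " + i);
        }
        System.out.println(topic.currentCount());               // 5
        System.out.println(topic.reconnect("SwingApp").size()); // 5
        System.out.println(topic.currentCount());               // 0
    }
}
```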

Now, turning the client application on, we see what's shown in Figure 11 in the application main screen:

Figure 11. Verifying That Messages Were Consumed

Figure 11 shows that the messages were consumed when the application resumed running. Looking at the monitoring in the Oracle WebLogic Server console, you should see that all messages were consumed and zero messages are current for all subscribers, proving the behavior and benefits of the durable subscription approach.

Conclusion

The advanced pub/sub features of Oracle WebLogic Server are a reality and, depending on your use case, using them can be the best option when you have to deliver the same message to multiple consumers and process the messages in parallel. Using this approach increases scalability while also providing architectural flexibility, configuration simplicity, and support for new messaging and SOA patterns.

About the Author

Daniel Amadei is a senior principal consultant for Oracle Consulting in Brazil, specializing in SOA and integration technologies. In addition to his SOA experience, he has worked with Java technologies since 1999. Daniel's certifications include Oracle SOA Foundation Practitioner, Oracle SOA Architect Certified Expert, and Sun Certified Enterprise Architect (SCEA).