High Performance Message Processing with BMT Message-Driven Beans and Spring

Duplicate Delivery Detection and Prevention

As shown in the previous section, when messages are consumed non-transactionally and once-and-only-once QoS is required, the application must employ a special mechanism to prevent duplicate processing, which can happen in the rare case that one or more components fail between the transaction commit and message acknowledgment steps. This mechanism should be flexible enough to work equally well in different deployment scenarios; for example, if our application is deployed in a cluster, message redelivery could happen on a node different from the one where the initial delivery attempt took place. To detect duplicates and prevent messages from being processed a second time, the application needs a way to recognize that a message has already been processed.

It seems logical to keep track of processed messages in the database instance, which is (usually) shared between all nodes in the cluster. As a unique identifier for the message we can use JMSMessageID, a String value which, according to the JMS documentation, is a unique key for identifying messages in a historical repository. Although the exact scope of uniqueness is provider-defined, it should at least cover all messages for a specific installation of a provider, where an installation is some connected set of message routers. To be on the safe side we will also persist the JMSTimestamp, which contains the time a message was handed off to a provider to be sent. Combined, these two values should let us uniquely identify every message, even if messages come from different providers.

The duplicate detection mechanism should be as fast and efficient as possible. We therefore avoid a separate SQL select operation altogether and create a table with a compound primary key defined on the JMSMessageID and JMSTimestamp fields. Any subsequent attempt to insert a row with the same primary key then fails with a data integrity violation, which the application can detect and handle as a duplicate delivery. Below is a DDL script for an Oracle database:

CREATE TABLE message_log (
  message_id VARCHAR2(255) NOT NULL,
  message_ts INTEGER NOT NULL,
  CONSTRAINT pk_message_log
    PRIMARY KEY (message_id, message_ts)
);
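
The insert-and-catch approach described above can be sketched in plain JDBC. This is a minimal illustration, not the code from the article's download: the MessageLogDao class and its method names are hypothetical, and it assumes the JDBC driver reports a primary key violation either as SQLIntegrityConstraintViolationException or with an SQLState in class 23 (integrity constraint violation).

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;

/** Hypothetical helper for the message_log table; not part of the download. */
final class MessageLogDao {

  private static final String INSERT_SQL =
      "INSERT INTO message_log (message_id, message_ts) VALUES (?, ?)";

  /**
   * Tries to record the message key. Returns true if the row was inserted
   * (first delivery) and false if the insert hit an integrity violation
   * (treated here as a duplicate delivery).
   */
  static boolean recordIfNew(Connection con, String messageId, long timestamp)
      throws SQLException {
    try (PreparedStatement ps = con.prepareStatement(INSERT_SQL)) {
      ps.setString(1, messageId);
      ps.setLong(2, timestamp);
      ps.executeUpdate();
      return true;
    } catch (SQLException ex) {
      if (isDuplicateKey(ex)) {
        return false;
      }
      throw ex; // anything else is a real failure
    }
  }

  /**
   * Assumption: the driver signals a key violation through the JDBC 4
   * exception subclass or through an SQLState starting with "23".
   * Note that class 23 also covers other integrity violations, such as
   * inserting NULL into a NOT NULL column.
   */
  static boolean isDuplicateKey(SQLException ex) {
    if (ex instanceof SQLIntegrityConstraintViolationException) {
      return true;
    }
    String state = ex.getSQLState();
    return state != null && state.startsWith("23");
  }
}
```

For the check to be reliable, the insert has to run in the same database transaction as the business logic for the message, so that a rollback also removes the log row and a redelivered message is processed again.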

Implementing the Non-transactional Messages Consumer Model

Let's try to build a harness to simplify non-transactional message processing. Since there is quite a lot of cross-cutting functionality involved, it makes sense to try to use an AOP approach for this task. Below, we are going to show how to use AOP features of the Spring Framework to implement all logic necessary for non-transactional message processing with once-and-only-once QoS.

The power of the Spring AOP Framework has already been demonstrated in several other articles (see the References). It allows us to declaratively define a chain of aspects, where each aspect contributes a specific piece of functionality to the execution flow. The resulting code is well structured, and each individual piece can be tested in isolation to cover all use cases specific to that component.

For a basic introduction to the Spring AOP Framework refer to Using Spring AOP Framework in EJB (dev2dev, December 2005). The preferred way to use AOP in Spring is to have component interfaces that could be proxied by Spring. Though it's possible to use the JMS interfaces MessageListener and Message for such a purpose, it would actually make sense to define something more generic and decoupled from JMS interfaces. For example, you can declare a MessageProcessor interface like this:

public interface MessageProcessor {
   public Object process(MessageData messageData);
}

Note that the process() method takes a MessageData parameter that encapsulates all the data from the original Message. You can implement Spring's MessageConverter to convert instances of Message to MessageData and also extend AbstractJmsMessageDrivenBean to implement common processing logic, namely:

  • Conversion of JMS messages to MessageData
  • Obtaining a processor instance from the application context
  • Execution of the message processor
  • Exception handling
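
The MessageData class itself is part of the download and is not listed here. As a rough idea of its shape, here is a minimal sketch. The field names and the constructor are assumptions made for illustration only; the sketch simply carries the two header values used for duplicate detection plus an opaque payload.

```java
/**
 * Hypothetical value object; the real MessageData ships with the
 * article's download and may look different.
 */
final class MessageData {

  private final String messageId; // copied from JMSMessageID
  private final long timestamp;   // copied from JMSTimestamp
  private final Object payload;   // e.g. the text of a TextMessage

  MessageData(String messageId, long timestamp, Object payload) {
    this.messageId = messageId;
    this.timestamp = timestamp;
    this.payload = payload;
  }

  String getMessageId() {
    return messageId;
  }

  long getTimestamp() {
    return timestamp;
  }

  Object getPayload() {
    return payload;
  }

  @Override
  public String toString() {
    return "MessageData[id=" + messageId + ", ts=" + timestamp + "]";
  }
}
```

Keeping the object free of JMS types is what lets a MessageProcessor be tested without a JMS provider.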

You can download the complete source from the Download section. Here is what an implementation of MessageDataDrivenBean would look like:

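import javax.jms.JMSException;
import javax.jms.Message;

import org.springframework.ejb.support.AbstractJmsMessageDrivenBean;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;

public abstract class MessageDataDrivenBean extends
                       AbstractJmsMessageDrivenBean {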
  private MessageConverter messageConverter =
                              new MessageDataConverter();
  private MessageProcessor messageProcessor;

  protected void onEjbCreate() {
    messageProcessor = ((MessageProcessor) 
                   getBeanFactory().getBean(getName()));
  }

  public void onMessage(Message message) {
    try {
      MessageData messageData = (MessageData) 
            getMessageConverter().fromMessage(message);
      this.messageProcessor.process(messageData);
    
    } catch( MessageConversionException ex) {
      String msg = "Message conversion error; "+
                                     ex.getMessage();
      this.logger.error(msg, ex);
      throw new RuntimeException(msg, ex);
      
    } catch( JMSException ex) {
      String msg = "JMS error; "+ex.getMessage();
      this.logger.error(msg, ex);
      throw new RuntimeException(msg, ex);

    }
  }
  
  protected MessageConverter getMessageConverter() {
    return this.messageConverter;
  }

  protected abstract String getName();
  
}
                      

Concrete subclasses only need to implement the getName() method, which returns the bean name of a specific MessageProcessor implementation as it is defined in the Spring context. Subclasses can also provide their own MessageConverter that uses a different strategy to populate MessageData. Here's a simple implementation of MessageProcessor:

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class SimpleMessageProcessor implements MessageProcessor {

  private Log log = LogFactory.getLog(getClass());

  public Object process( MessageData messageData) {
    log.info(messageData);
    return null;
  }

}

Finally, the concrete MDB could look like the one below. Note that we use XDoclet annotations to declare metadata for deployment descriptors:

/**
 * SimpleMdb
 * 
 * @ejb.bean 
 *   name="org.javatx.mdb.SimpleMdb" 
 *   type="MDB"
 *   destination-type="javax.jms.Queue" 
 *   transaction-type="Bean"
 * 
 * @weblogic.pool 
 *   initial-beans-in-free-pool=
 *                "${org.javatx.simple.mdb.pool.initial}" 
 *   max-beans-in-free-pool=
 *                "${org.javatx.simple.mdb.pool.max}"
 * 
 * @weblogic.message-driven 
 *   connection-factory-jndi-name=
 *                  "${org.javatx.qcf.jndi-name}"
 *   destination-jndi-name=
 *                  "${org.javatx.simple.queue.jndi-name}"
 *   jms-polling-interval-seconds=
 *                  "${org.javatx.mdb.pooling.interval}"
 *
 * @ejb.env-entry
 *   name="BeanFactoryPath" 
 *   value="applicationContext.xml"
 */
public class SimpleMdb extends MessageDataDrivenBean {

  protected String getName() {
    return "simpleProcessor";
  }
  
}
                      

In the code above, the BeanFactoryPath env-entry is used by Spring's EJB classes to locate the application context. This application context should have a declaration of the simpleProcessor bean, which would handle all the processing logic, as well as non-functional requirements, such as transactions, preventing duplicate message processing and optional tracing and performance monitoring.

It makes sense to move all non-functional aspects into advice and to use ProxyFactoryBean to define a chain of interceptors that wraps the actual implementation of the MessageProcessor. Such a definition could look like the following:

<bean id="simpleProcessor" 
      class="org.springframework.aop.framework.ProxyFactoryBean">
  <property name="target">
    <bean class="SimpleMessageProcessor"/>
  </property>
  <property name="proxyInterfaces" 
               value="org.javatx.mdb.MessageProcessor"/>
  <property name="interceptorNames">
    <list>
      <idref local="mdbTransactionAdvisor"/>
      <idref local="mdbDuplicateHandlingAdvisor"/>
      <!-- optional monitoring and tracing -->
      <idref local="messageProcessorPerformanceMonitorAdvisor"/>
      <idref local="messageProcessorTraceAdvisor"/>
    </list>
  </property>
</bean>
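
To make the effect of such a proxy concrete, here is a plain-JDK sketch of what a duplicate-handling interceptor does around a MessageProcessor. It is not Spring's ProxyFactoryBean and not the advisor implementation from the download: java.lang.reflect.Proxy is swapped in for Spring AOP, an in-memory Set stands in for the message_log table, and the names DuplicateGuardSketch and guard are hypothetical.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Set;

/** Plain-JDK illustration; all names here are hypothetical. */
final class DuplicateGuardSketch {

  /** Mirrors the article's MessageProcessor interface. */
  interface MessageProcessor {
    Object process(MessageData messageData);
  }

  /** Reduced stand-in for MessageData: only the two key fields. */
  static final class MessageData {
    final String messageId;
    final long timestamp;

    MessageData(String messageId, long timestamp) {
      this.messageId = messageId;
      this.timestamp = timestamp;
    }
  }

  /**
   * Wraps target in a proxy that skips messages whose key is already in
   * processedKeys (an in-memory stand-in for the message_log table).
   */
  static MessageProcessor guard(MessageProcessor target,
                                Set<String> processedKeys) {
    InvocationHandler handler = (proxy, method, args) -> {
      if (method.getDeclaringClass() != MessageProcessor.class) {
        // equals, hashCode, toString: delegate unchanged
        return method.invoke(target, args);
      }
      MessageData data = (MessageData) args[0];
      String key = data.messageId + "@" + data.timestamp;
      if (!processedKeys.add(key)) {
        return null; // duplicate delivery: do not process again
      }
      try {
        return target.process(data);
      } catch (RuntimeException ex) {
        // Mimic a transaction rollback so a redelivery is processed again.
        processedKeys.remove(key);
        throw ex;
      }
    };
    return (MessageProcessor) Proxy.newProxyInstance(
        MessageProcessor.class.getClassLoader(),
        new Class<?>[] {MessageProcessor.class},
        handler);
  }
}
```

The target processor stays unaware of the check, which is the point of the interceptor chain: duplicate handling, transactions, and tracing can each be added or removed in configuration without touching the processing code.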
                      

The sequence diagram in Figure 1 illustrates the message processing and stack of advisors required to support the QoS model.

Figure 1. Stack of advisors processing incoming messages
