High Performance Message Processing with BMT Message-Driven Beans and Spring
As shown in the previous section, when messages are consumed non-transactionally and once-and-only-once QoS is required, the application must employ a special mechanism to prevent duplicate processing, which can happen in the rare event that one or more components fail between the transaction commit and the message acknowledgment. This mechanism should be flexible enough to work equally well in different deployment scenarios; for example, if our application is deployed in a cluster, message redelivery could happen on a node different from the one where the initial delivery attempt took place. To detect duplicates and prevent messages from being processed a second time, the application has to recognize that a message was already processed.
It seems logical to keep track of processed messages in the database instance, which is (usually) shared between all nodes in the cluster. As a unique identifier for the message we could use JMSMessageID, a String value which, according to the JMS documentation, is a unique key for identifying messages in a historical repository. Although the exact scope of uniqueness is provider-defined, it should at least cover all messages for a specific installation of a provider, where an installation is some connected set of message routers. To be on the safe side we will also persist the JMSTimestamp, which contains the time a message was handed off to a provider to be sent. Combined, these two values should guarantee that we can uniquely identify every message, even if messages come from different providers.
The duplicate detection mechanism should be as fast and efficient as possible. We would therefore like to avoid an SQL select operation altogether and instead create a table with a compound primary key defined on the JMSMessageID and JMSTimestamp fields. A duplicate then reveals itself because any subsequent attempt to insert a row with the same primary key fails with a data integrity violation, which we can detect and handle appropriately. Below is a DDL script for an Oracle database:
CREATE TABLE message_log (
    message_id VARCHAR2(255) NOT NULL,
    message_ts INTEGER NOT NULL,
    CONSTRAINT pk_message_log
        PRIMARY KEY (message_id, message_ts)
);
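The insert-first idea can be sketched without a database. In the minimal example below, an in-memory map stands in for the message_log table, and putIfAbsent plays the role of an INSERT that fails on a primary-key violation. The MessageLog interface, the InMemoryMessageLog class, and the tryRecord method are hypothetical names introduced only for this illustration; a real implementation would execute the INSERT and treat the resulting integrity violation as the "duplicate" signal.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical abstraction over the message_log table. */
interface MessageLog {
    /** Returns true if the key was recorded, false if it was already present. */
    boolean tryRecord(String messageId, long timestamp);
}

/** In-memory stand-in: putIfAbsent mimics an INSERT rejected by the primary key. */
class InMemoryMessageLog implements MessageLog {
    private final ConcurrentMap<String, Boolean> rows =
            new ConcurrentHashMap<String, Boolean>();

    @Override
    public boolean tryRecord(String messageId, long timestamp) {
        // Compound key (message_id, message_ts), mirroring the DDL
        String key = messageId + "|" + timestamp;
        // No prior lookup: a single atomic "insert" detects the duplicate
        return rows.putIfAbsent(key, Boolean.TRUE) == null;
    }
}

class MessageLogDemo {
    public static void main(String[] args) {
        MessageLog log = new InMemoryMessageLog();
        System.out.println(log.tryRecord("ID:42", 1000L)); // first delivery: prints true
        System.out.println(log.tryRecord("ID:42", 1000L)); // redelivery: prints false
    }
}
```

The point of the design is that detection costs one write and no read: the common case (a new message) pays only for the insert it needs anyway.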
Let's try to build a harness to simplify non-transactional message processing. Since there is quite a lot of cross-cutting functionality involved, it makes sense to try to use an AOP approach for this task. Below, we are going to show how to use AOP features of the Spring Framework to implement all logic necessary for non-transactional message processing with once-and-only-once QoS.
The power of the Spring AOP Framework has already been demonstrated in several other articles (see the References). It allows us to declaratively define a chain of aspects, where each aspect contributes a specific piece of functionality to the execution flow. The resulting code is well structured, and each individual piece can be tested in isolation to cover all use cases specific to that component.
For a basic introduction to the Spring AOP Framework, refer to Using Spring AOP Framework in EJB (dev2dev, December 2005). The preferred way to use AOP in Spring is to have component interfaces that can be proxied by Spring. Though it's possible to use the JMS interfaces MessageListener and Message for this purpose, it makes more sense to define something more generic and decoupled from the JMS interfaces. For example, you can declare a MessageProcessor interface like this:
public interface MessageProcessor {
    public Object process(MessageData messageData);
}
Note that the process() method takes a MessageData parameter that encapsulates all the data from the original Message. You can implement Spring's MessageConverter to convert instances of Message to MessageData and also extend AbstractJmsMessageDrivenBean to implement the common processing logic, namely converting the incoming Message to MessageData and delegating it to the MessageProcessor obtained from the Spring context.
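The MessageData class itself is not listed in this article, so the following is only a minimal sketch of what such a JMS-independent holder might look like. The fields (messageId, timestamp, payload) and their accessors are assumptions made for illustration; the class in the downloadable source may differ.

```java
import java.io.Serializable;

/** Hypothetical JMS-independent holder for the data of one message. */
final class MessageData implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String messageId;     // assumed copy of JMSMessageID
    private final long timestamp;       // assumed copy of JMSTimestamp
    private final Serializable payload; // assumed message body, e.g. text

    MessageData(String messageId, long timestamp, Serializable payload) {
        this.messageId = messageId;
        this.timestamp = timestamp;
        this.payload = payload;
    }

    String getMessageId() {
        return messageId;
    }

    long getTimestamp() {
        return timestamp;
    }

    Serializable getPayload() {
        return payload;
    }

    @Override
    public String toString() {
        // Readable form, handy when a processor simply logs the message
        return "MessageData[id=" + messageId + ", ts=" + timestamp
                + ", payload=" + payload + "]";
    }
}
```

Keeping the identifier and timestamp on the holder means that code working only with MessageData can still build the duplicate-detection key without touching the JMS API.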
You can download the complete source from the Download section. Here is what an implementation of MessageDataDrivenBean would look like:
import javax.jms.JMSException;
import javax.jms.Message;

import org.springframework.ejb.support.AbstractJmsMessageDrivenBean;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;

public abstract class MessageDataDrivenBean
        extends AbstractJmsMessageDrivenBean {

    private MessageConverter messageConverter =
            new MessageDataConverter();
    private MessageProcessor messageProcessor;

    // Look up the processor bean in the Spring context
    protected void onEjbCreate() {
        messageProcessor = (MessageProcessor)
                getBeanFactory().getBean(getName());
    }

    public void onMessage(Message message) {
        try {
            // Convert the JMS message and delegate to the processor
            MessageData messageData = (MessageData)
                    getMessageConverter().fromMessage(message);
            this.messageProcessor.process(messageData);
        } catch (MessageConversionException ex) {
            String msg = "Message conversion error; " +
                    ex.getMessage();
            this.logger.error(msg, ex);
            throw new RuntimeException(msg, ex);
        } catch (JMSException ex) {
            String msg = "JMS error; " + ex.getMessage();
            this.logger.error(msg, ex);
            throw new RuntimeException(msg, ex);
        }
    }

    protected MessageConverter getMessageConverter() {
        return this.messageConverter;
    }

    // Bean name of the MessageProcessor in the Spring context
    protected abstract String getName();
}
Concrete subclasses only need to implement the getName() method, which returns the bean name of a specific implementation of the MessageProcessor interface as it is defined in the Spring context. Subclasses can also provide their own MessageConverter that uses a different strategy to populate MessageData. Here's a simple implementation of MessageProcessor:
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class SimpleMessageProcessor implements MessageProcessor {
    private Log log = LogFactory.getLog(getClass());

    public Object process(MessageData messageData) {
        log.info(messageData);
        return null;
    }
}
Finally, the concrete MDB could look like the one below. Note that we use XDoclet annotations to declare metadata for deployment descriptors:
/**
 * SimpleMdb
 *
 * @ejb.bean
 *   name="org.javatx.mdb.SimpleMdb"
 *   type="MDB"
 *   destination-type="javax.jms.Queue"
 *   transaction-type="Bean"
 *
 * @weblogic.pool
 *   initial-beans-in-free-pool="${org.javatx.simple.mdb.pool.initial}"
 *   max-beans-in-free-pool="${org.javatx.simple.mdb.pool.max}"
 *
 * @weblogic.message-driven
 *   connection-factory-jndi-name="${org.javatx.qcf.jndi-name}"
 *   destination-jndi-name="${org.javatx.simple.queue.jndi-name}"
 *   jms-polling-interval-seconds="${org.javatx.mdb.pooling.interval}"
 *
 * @ejb.env-entry
 *   name="BeanFactoryPath"
 *   value="applicationContext.xml"
 */
public class SimpleMdb extends MessageDataDrivenBean {
    protected String getName() {
        return "simpleProcessor";
    }
}
In the code above, the BeanFactoryPath env-entry is used by Spring's EJB classes to locate the application context. This application context should have a declaration of the simpleProcessor bean, which would handle all the processing logic, as well as non-functional requirements, such as transactions, preventing duplicate message processing, and optional tracing and performance monitoring.
It makes sense to move all of these non-functional concerns into advice and to define a chain of interceptors, using ProxyFactoryBean, that wraps the actual implementation of the MessageProcessor. Such a definition could look like the following:
<bean id="simpleProcessor"
    class="org.springframework.aop.framework.ProxyFactoryBean">
  <property name="target">
    <bean class="SimpleMessageProcessor"/>
  </property>
  <property name="proxyInterfaces"
      value="org.javatx.mdb.MessageProcessor"/>
  <property name="interceptorNames">
    <list>
      <idref local="mdbTransactionAdvisor"/>
      <idref local="mdbDuplicateHandlingAdvisor"/>
      <!-- optional monitoring and tracing -->
      <idref local="messageProcessorPerformanceMonitorAdvisor"/>
      <idref local="messageProcessorTraceAdvisor"/>
    </list>
  </property>
</bean>
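To make the wrapping idea concrete without Spring on the classpath, the sketch below uses a plain JDK dynamic proxy as a stand-in for ProxyFactoryBean and shows only the duplicate-handling step. This is not Spring's API: DuplicateHandlingHandler and its wrap() method are hypothetical, the MessageData and MessageProcessor types are reduced stand-ins for the article's types, and an in-memory set replaces the message_log table.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Reduced stand-ins for the article's types (fields are assumptions)
class MessageData {
    private final String messageId;
    private final long timestamp;

    MessageData(String messageId, long timestamp) {
        this.messageId = messageId;
        this.timestamp = timestamp;
    }

    String getMessageId() { return messageId; }
    long getTimestamp() { return timestamp; }
}

interface MessageProcessor {
    Object process(MessageData messageData);
}

/** Hypothetical interceptor: skips the target for already-seen messages. */
class DuplicateHandlingHandler implements InvocationHandler {
    private final MessageProcessor target;
    // In-memory stand-in for the message_log table
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    DuplicateHandlingHandler(MessageProcessor target) {
        this.target = target;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args)
            throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(target, args); // toString(), equals(), hashCode()
        }
        MessageData data = (MessageData) args[0];
        String key = data.getMessageId() + "|" + data.getTimestamp();
        if (!seen.add(key)) {
            return null; // duplicate: do not process again
        }
        try {
            return method.invoke(target, args);
        } catch (InvocationTargetException ex) {
            // Stand-in for the rollback the outer transaction advice would do
            seen.remove(key);
            throw ex.getCause();
        }
    }

    /** Wraps a processor in a proxy that applies the duplicate check. */
    static MessageProcessor wrap(MessageProcessor target) {
        return (MessageProcessor) Proxy.newProxyInstance(
                MessageProcessor.class.getClassLoader(),
                new Class<?>[] { MessageProcessor.class },
                new DuplicateHandlingHandler(target));
    }
}
```

Calling DuplicateHandlingHandler.wrap(processor).process(data) twice with the same MessageData invokes the target only once. In the Spring configuration the same ordering concern applies: listing the transaction advisor before the duplicate-handling advisor places the duplicate check inside the transaction, so a failed attempt leaves no trace of the message behind.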
The sequence diagram in Figure 1 illustrates the message processing and stack of advisors required to support the QoS model.