High Performance Message Processing with BMT Message-Driven Beans and Spring
Now we can take a closer look at the mdbTransactionAdvisor and mdbDuplicateHandlingAdvisor, which together ensure the required quality of service using the approach described above.
The mdbTransactionAdvisor is defined using a standard Spring TransactionInterceptor with the transaction attribute PROPAGATION_REQUIRES_NEW for the process() method:
<bean id="mdbTransactionAdvisor"
      class="org.springframework.transaction.interceptor.TransactionInterceptor">
  <property name="transactionManager" ref="transactionManager"/>
  <property name="transactionAttributes">
    <props>
      <!-- PROPAGATION_NAME,ISOLATION_NAME,readOnly,
           timeout_NNNN,+Exception1,-Exception2 -->
      <prop key="process">PROPAGATION_REQUIRES_NEW,timeout_300</prop>
    </props>
  </property>
</bean>
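The comment in the bean definition summarizes the format of a transaction attribute string. As a rough illustration of how such a descriptor breaks down into settings, here is a minimal plain-Java sketch. Spring performs the real parsing itself; the class TxAttributeSketch, its fields, and its defaults are hypothetical names chosen for this example and are not part of Spring.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: this is NOT Spring's parser.
final class TxAttributeSketch {
    String propagation = "PROPAGATION_REQUIRED"; // assumed default for this sketch
    String isolation = "ISOLATION_DEFAULT";      // assumed default for this sketch
    boolean readOnly = false;
    int timeoutSeconds = -1;                     // -1 here means "no explicit timeout"
    final List<String> commitOn = new ArrayList<>();   // "+Exception" tokens
    final List<String> rollbackOn = new ArrayList<>(); // "-Exception" tokens

    static TxAttributeSketch parse(String descriptor) {
        TxAttributeSketch attr = new TxAttributeSketch();
        for (String raw : descriptor.split(",")) {
            String token = raw.trim();
            if (token.startsWith("PROPAGATION_")) {
                attr.propagation = token;
            } else if (token.startsWith("ISOLATION_")) {
                attr.isolation = token;
            } else if (token.startsWith("timeout_")) {
                attr.timeoutSeconds =
                    Integer.parseInt(token.substring("timeout_".length()));
            } else if (token.equals("readOnly")) {
                attr.readOnly = true;
            } else if (token.startsWith("+")) {
                attr.commitOn.add(token.substring(1));
            } else if (token.startsWith("-")) {
                attr.rollbackOn.add(token.substring(1));
            } else if (!token.isEmpty()) {
                throw new IllegalArgumentException("Unknown token: " + token);
            }
        }
        return attr;
    }

    public static void main(String[] args) {
        // The descriptor used for the process() method above
        TxAttributeSketch attr = parse("PROPAGATION_REQUIRES_NEW,timeout_300");
        System.out.println(attr.propagation + ", timeout=" + attr.timeoutSeconds + "s");
    }
}
```

Read this way, the descriptor for process() asks for a new transaction on every invocation and a timeout of 300 seconds, and leaves isolation, read-only mode, and the exception rules at their defaults.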
In WebLogic Server, you can use Spring's wrapper for the platform transaction manager, which the server exposes in JNDI under javax.transaction.UserTransaction, and define it in the application context like this:
<bean id="transactionManager"
      class="org.springframework.transaction.jta.WebLogicJtaTransactionManager">
  <property name="userTransactionName"
            value="javax.transaction.UserTransaction"/>
  <property name="autodetectTransactionManager"
            value="false"/>
</bean>
The next advice in the chain is the mdbDuplicateHandlingAdvisor. Because it has to save the unique message keys in a database table, it needs a data source:
<bean id="mdbDuplicateHandlingAdvisor"
      parent="messageProcessorAdvisor">
  <property name="advice">
    <bean class="org.javatx.mdb.MdbDuplicateMessagesHandlingAdvice">
      <!-- DataSource for duplicates table -->
      <constructor-arg index="0" ref="dataSource"/>
    </bean>
  </property>
</bean>
Although you can specify any data source for the duplicate handling advice, it pays to take into account what the underlying business code is doing. For example, if the business code also uses a data source, it is better to use the same one in this advice. Doing so can reduce the number of distinct XAResources enlisted in the transaction, and if no other XAResource is enlisted, the J2EE server can even apply a one-phase commit optimization, effectively making the transaction local, with a corresponding performance gain.
An implementation of MdbDuplicateMessagesHandlingAdvice may look like this:
package org.javatx.mdb;

import javax.sql.DataSource;

import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.dao.DataIntegrityViolationException;

public final class MdbDuplicateMessagesHandlingAdvice
    implements MethodInterceptor {

  private final Log log = LogFactory.getLog(getClass());
  private final DuplicatesDao duplicatesDao;

  public MdbDuplicateMessagesHandlingAdvice(DataSource ds) {
    this.duplicatesDao = new DuplicatesDao(ds);
  }

  public Object invoke(MethodInvocation invocation) throws Throwable {
    // The first argument of the advised method carries the message data
    MessageData messageData = (MessageData) invocation.getArguments()[0];
    try {
      // Insert the message key; a duplicate violates the primary key
      this.duplicatesDao.run(messageData);
      this.log.debug("Duplicate check ok");
    } catch (DataIntegrityViolationException e) {
      // Record already exists in database - duplicate message!
      // Log with appropriate severity or take whatever
      // other action is appropriate
      this.log.warn("Duplicate message, skipping processing: "
          + messageData + "; " + e.getMessage());
      // Returning here swallows the exception, so the message is
      // considered acknowledged and no redelivery occurs, which is
      // exactly what we need.
      return null;
    }
    // Not a duplicate: continue down the advice chain
    return invocation.proceed();
  }
}
The message identifier is persisted by the DuplicatesDao. This DAO class extends Spring's SqlUpdate class and inserts the key message properties into a database table whose primary key is defined by the message ID and timestamp fields. Any attempt to insert a row with the same message ID and timestamp therefore fails with a database constraint violation, which Spring translates into a runtime DataIntegrityViolationException.
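// Nested in MdbDuplicateMessagesHandlingAdvice; the enclosing file also
// needs imports for java.math.BigDecimal, java.sql.Types,
// org.springframework.jdbc.core.SqlParameter and
// org.springframework.jdbc.object.SqlUpdate
private static class DuplicatesDao extends SqlUpdate {

  private static final String SQL =
      "INSERT INTO " +
      "MESSAGE_LOG(MESSAGE_ID, MESSAGE_TS) " +
      "VALUES(?, ?)";

  public DuplicatesDao(DataSource ds) {
    super(ds, SQL);
    declareParameter(new SqlParameter(Types.VARCHAR));  // MESSAGE_ID
    declareParameter(new SqlParameter(Types.NUMERIC));  // MESSAGE_TS
    setMaxRowsAffected(1);
  }

  public void run(MessageData data) {
    update(new Object[] {
        data.getMessageId(),
        BigDecimal.valueOf(data.getTimestamp())});
  }
}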
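To see how the advice and the DAO cooperate without setting up a database, the following plain-Java sketch replaces the MESSAGE_LOG table with an in-memory set. All names in it (InMemoryMessageLog, DuplicateKeyError, DuplicateHandlingSketch) are hypothetical stand-ins invented for this illustration: DuplicateKeyError plays the role of DataIntegrityViolationException, and the Supplier plays the role of invocation.proceed().

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical in-memory stand-in for the MESSAGE_LOG table.
final class InMemoryMessageLog {

    /** Plays the role of the (MESSAGE_ID, MESSAGE_TS) primary key. */
    static final class Key {
        final String messageId;
        final long timestamp;

        Key(String messageId, long timestamp) {
            this.messageId = messageId;
            this.timestamp = timestamp;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) return false;
            Key other = (Key) o;
            return timestamp == other.timestamp
                && messageId.equals(other.messageId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(messageId, timestamp);
        }
    }

    /** Stand-in for the constraint violation reported by the database. */
    static final class DuplicateKeyError extends RuntimeException {
        DuplicateKeyError(String message) {
            super(message);
        }
    }

    private final Set<Key> rows = new HashSet<>();

    void insert(String messageId, long timestamp) {
        // Set.add returns false when an equal key is already present
        if (!rows.add(new Key(messageId, timestamp))) {
            throw new DuplicateKeyError("Duplicate key: " + messageId + "/" + timestamp);
        }
    }
}

final class DuplicateHandlingSketch {
    private final InMemoryMessageLog messageLog = new InMemoryMessageLog();

    <T> T invoke(String messageId, long timestamp, Supplier<T> businessLogic) {
        try {
            messageLog.insert(messageId, timestamp);
        } catch (InMemoryMessageLog.DuplicateKeyError e) {
            return null; // duplicate: skip processing, message is still acknowledged
        }
        return businessLogic.get(); // first delivery: run the business logic
    }

    public static void main(String[] args) {
        DuplicateHandlingSketch advice = new DuplicateHandlingSketch();
        System.out.println(advice.invoke("ID:1", 1000L, () -> "processed")); // processed
        System.out.println(advice.invoke("ID:1", 1000L, () -> "processed")); // null
        System.out.println(advice.invoke("ID:1", 2000L, () -> "processed")); // processed
    }
}
```

Unlike the real table, the in-memory set does not take part in the transaction, so a key recorded here would not be rolled back if the business logic failed. The sketch only illustrates the control flow and the composite key semantics.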
As mentioned before, the other advices in the chain are optional but can be used for diagnostics and monitoring if needed. The Spring Framework provides several generic implementations that can be declared in the application context like this:
<bean id="messageProcessorTraceAdvisor"
      parent="messageProcessorAdvisor">
  <property name="advice">
    <bean class="org.springframework.aop.interceptor.SimpleTraceInterceptor">
      <property name="useDynamicLogger" value="true"/>
    </bean>
  </property>
</bean>
<bean id="messageProcessorPerformanceMonitorAdvisor"
      parent="messageProcessorAdvisor">
  <property name="advice">
    <bean class="org.springframework.aop.interceptor.PerformanceMonitorInterceptor">
      <property name="useDynamicLogger" value="true"/>
      <property name="prefix" value="messageProcessor"/>
    </bean>
  </property>
</bean>
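Conceptually, a performance monitoring advice simply measures how long the wrapped call takes and reports it. The plain-Java sketch below illustrates that idea only; TimingSketch and timed() are hypothetical names for this example, and the sketch is not how Spring's PerformanceMonitorInterceptor is implemented.

```java
import java.util.function.Supplier;

// Hypothetical illustration of a timing wrapper around a call.
final class TimingSketch {

    static <T> T timed(String name, Supplier<T> target) {
        long start = System.nanoTime();
        try {
            return target.get(); // invoke the wrapped call
        } finally {
            // Reported even when the wrapped call throws
            long elapsedMillis = (System.nanoTime() - start) / 1_000_000L;
            System.out.println(name + " took " + elapsedMillis + " ms");
        }
    }

    public static void main(String[] args) {
        String result = timed("messageProcessor.process", () -> "done");
        System.out.println(result);
    }
}
```

Because the measurement sits in a finally block, the elapsed time is reported for failed invocations as well, which is usually what you want when diagnosing slow or failing message processing.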
That is basically it. The advices above handle all non-functional aspects of message processing, such as transactions and the required quality of service, so the MessageProcessor implementation only needs to take care of the functional requirements of the message processing (the business logic).