High Performance Message Processing with BMT Message-Driven Beans and Spring
Pages: 1, 2, 3, 4, 5

Implementing the Message Interceptors

Now we can take a closer look at the mdbTransactionInterceptor and mdbDuplicateHandlingAdvisor that provide functionality to ensure the required quality of service using the approach described above.

The mdbTransactionAdvisor is defined using a standard Spring TransactionInterceptor with transaction attribute PROPAGATION_REQUIRES_NEW for the process() method:

<bean id="mdbTransactionAdvisor" 
      class="org.springframework.transaction.interceptor.TransactionInterceptor">
  <property name="transactionManager" ref="transactionManager"/>
  <property name="transactionAttributes">
    <props>
      <!-- PROPAGATION_NAME,ISOLATION_NAME,readOnly,
                        timeout_NNNN,+Exception1,-Exception2 -->
      <prop key="process">PROPAGATION_REQUIRES_NEW,timeout_300</prop>
    </props>
  </property>
</bean>

In WebLogic Server, you can use the Spring wrapper for the platform transaction manager exposed in the server JNDI at javax.transaction.UserTransaction and define an application context like this:

<bean id="transactionManager"  class=
 "org.springframework.transaction.jta.WebLogicJtaTransactionManager">
 <property name="userTransactionName" 
              value="javax.transaction.UserTransaction"/>
 <property name="autodetectTransactionManager" 
              value="false"/>
</bean>

The next advice in the chain is the mdbDuplicateHandlingAdvisor. Because it has to save unique keys into some database table as well, you need a data source:

<bean id="mdbDuplicateHandlingAdvisor" 
        parent="messageProcessorAdvisor">
 <property name="advice">
  <bean class="org.javatx.mdb.MdbDuplicateMessagesHandlingAdvice">
    <!-- DataSource for duplicates table -->
    <constructor-arg index="0" ref="dataSource"/> 
  </bean>
 </property>
</bean>

Note that although it's possible to specify any data source for duplicate handling advice, it could be beneficial to take into account what the underlying business code is doing. For example, if the business code also uses a data source, it would be better to use the same one in this advice—this could potentially reduce the number of different XAResources enlisted in the transaction, and the J2EE server could even use a Single Phase Commit optimization if no other XAResources is used, effectively making the transaction local with a corresponding performance gain. An implementation of MdbDuplicateMessagesHandlingAdvice may look like this:

public final class MdbDuplicateMessagesHandlingAdvice 
                             implements MethodInterceptor {
 private final Log log = LogFactory.getLog(getClass());
 private final DuplicatesDao duplicatesDao;
 
 public MdbDuplicateMessagesHandlingAdvice(DataSource ds) {
   this.duplicatesDao = new DuplicatesDao(ds);
 }

 public Object invoke(MethodInvocation invocation) 
                                     throws Throwable {
   Object o = invocation.getArguments()[0];
   
   MessageData messageData = (MessageData) o;
   try {
     this.duplicatesDao.run(messageData);
     this.log.debug("Duplicate check ok");
   } catch (DataIntegrityViolationException e) {
     // Record already exists in database - duplicate message!
     // Log with appropriate severity or do whatever
     // other action is appropriate
     this.log.warn("Duplicate message, skipping processing: "+
         messageData+"; "+e.getMessage());
     // If we return from here, swallowing exception, message 
     // would be considered acknowledged    
     // and no redelivery would occur, that's exactly what we need.
     return null;
   }

   return invocation.proceed();
 }
}

The message identifier persistence is performed in the DuplicatesDao. This DAO class extends Spring's SqlUpdate class and inserts the key message properties into the database table, which has its primary key defined by the message ID and timestamp fields so any attempt to insert a row with the same message ID and timestamp would fail with a DB constraint violation error (and will be translated by Spring to a runtime DataIntegrityViolationException).

private static class DuplicatesDao extends SqlUpdate {
    private static final String SQL = 
      "INSERT INTO " +
        "MESSAGE_LOG(MESSAGE_ID, MESSAGE_TS) " +
        "VALUES(?, ?)";

    public DuplicatesDao(DataSource ds) {
        super(ds, SQL);
        declareParameter(new SqlParameter(Types.VARCHAR));
        declareParameter(new SqlParameter(Types.NUMERIC));
        setMaxRowsAffected(1);
    }

    public void run(MessageData data) {
        update(new Object[] {
            data.getMessageId(), 
            new BigDecimal(Long.toString(data.getTimestamp()))});
    }
}

As mentioned before, other advices in the chain are optional but can be used for diagnostic and monitoring if needed. The Spring Framework provides several generic implementations that could be declared in the application context like this:

<bean id="messageProcessorTraceAdvisor" 
         parent="messageProcessorAdvisor">
  <property name="advice">
    <bean class="org.springframework.aop.interceptor.SimpleTraceInterceptor">
      <property name="useDynamicLogger" value="true"/>
    </bean>
  </property>
</bean>

<bean id="messageProcessorPerformanceMonitorAdvisor" 
         parent="messageProcessorAdvisor">
  <property name="advice">
    <bean class="org.springframework.aop.interceptor.PerformanceMonitorInterceptor">
      <property name="useDynamicLogger" value="true"/>
      <property name="prefix" value="messageProcessor"/>
    </bean>
  </property>
</bean>

That is basically it. The above advices will handle all non-functional aspects of the message processing, such as transactions and the required quality of service, so the MessageProcessor implementation only needs to take care of the functional requirements of the message processing (business logic).

Pages: 1, 2, 3, 4, 5

Next Page »