Topics
Enterprise Architecture
BMTメッセージ駆動型BeanとSpringを使用したハイパフォーマンスなメッセージ処理
Pages:
1,
2,
3,
4,
5
では、
mdbTransactionInterceptorと
mdbDuplicateHandlingAdvisorを詳しく見ていきましょう。これらは、前述のアプローチを使用して、必要なサービス品質を保証する機能を提供します。
mdbTransactionAdvisorは、Springに標準の
TransactionInterceptorに
process()メソッド用のトランザクション属性
PROPAGATION_REQUIRES_NEWを持たせたものを使用して定義しています。
<bean id="mdbTransactionAdvisor"
class="org.springframework.transaction.interceptor.TransactionInterceptor">
<property name="transactionManager" ref="transactionManager"/>
<property name="transactionAttributes">
<props>
<!-- PROPAGATION_NAME,ISOLATION_NAME,readOnly,
timeout_NNNN,+Exception1,-Exception2 -->
<prop key="process">PROPAGATION_REQUIRES_NEW,timeout_300</prop>
</props>
</property>
</bean>
WebLogic Serverでは、
javax.transaction.UserTransactionでサーバJNDIに接続されているプラットフォームトランザクションマネージャ用のSpringラッパーを使用できます。アプリケーションコンテキストの定義は次のようになります。
<bean id="transactionManager" class=
"org.springframework.transaction.jta.WebLogicJtaTransactionManager">
<property name="userTransactionName"
value="javax.transaction.UserTransaction"/>
<property name="autodetectTransactionManager"
value="false"/>
</bean>
チェーンの中の次のアドバイスは
mdbDuplicateHandlingAdvisorです。一意のキーをデータベーステーブルに保存しなければならないので、次のようなデータソースが必要になります。
<bean id="mdbDuplicateHandlingAdvisor"
parent="messageProcessorAdvisor">
<property name="advice">
<bean class="org.javatx.mdb.MdbDuplicateMessagesHandlingAdvice">
<!-- DataSource for duplicates table -->
<constructor-arg index="0" ref="dataSource"/>
</bean>
</property>
</bean>
重複処理のアドバイスには、任意のデータソースを指定できますが、ベースにあるビジネスコードが何を実行するかを考慮すると有利です。たとえば、ビ ジネスコードもデータソースを使用する場合は、このアドバイスでも同じデーソースを使用した方がよいでしょう。これによって、トランザクションに登録され るXAResourcesの数を減らすことができます。さらに、他にXAResourcesが使われていない場合は、J2EEサーバは、1フェーズコミッ トを使用することができ、事実上、トランザクションをローカルにしてパフォーマンスを向上させることができます。
MdbDuplicateMessagesHandlingAdviceの実装は、次のようになります。
public final class MdbDuplicateMessagesHandlingAdvice
implements MethodInterceptor {
private final Log log = LogFactory.getLog(getClass());
private final DuplicatesDao duplicatesDao;
public MdbDuplicateMessagesHandlingAdvice(DataSource ds) {
this.duplicatesDao = new DuplicatesDao(ds);
}
public Object invoke(MethodInvocation invocation)
throws Throwable {
Object o = invocation.getArguments()[0];
MessageData messageData = (MessageData) o;
try {
this.duplicatesDao.run(messageData);
this.log.debug("Duplicate check ok");
} catch (DataIntegrityViolationException e) {
// Record already exists in database - duplicate message!
// Log with appropriate severity or do whatever
// other action is appropriate
this.log.warn("Duplicate message, skipping processing: "+
messageData+"; "+e.getMessage());
// If we return from here, swallowing exception, message
// would be considered acknowledged
// and no redelivery would occur, that's exactly what we need.
return null;
}
return invocation.proceed();
}
}
メッセージIDの永続化は、
DuplicatesDaoで実行されます。このDAOクラスは、Springの
SqlUpdateク ラスを拡張したもので、データベーステーブルにキーメッセージプロパティを挿入します。このテーブルは、メッセージIDフィールドとタイムスタンプフィー ルドによって定義された主キーを持っています。このため、同じメッセージIDとタイムスタンプを持つ行を挿入しようとすると、DB制約違反のエラーで失敗 します(これは、Springによって、実行時に
DataIntegrityViolationExceptionに変換されます)。
private static class DuplicatesDao extends SqlUpdate {
private static final String SQL =
"INSERT INTO " +
"MESSAGE_LOG(MESSAGE_ID, MESSAGE_TS) " +
"VALUES(?, ?)";
public DuplicatesDao(DataSource ds) {
super(ds, SQL);
declareParameter(new SqlParameter(Types.VARCHAR));
declareParameter(new SqlParameter(Types.NUMERIC));
setMaxRowsAffected(1);
}
public void run(MessageData data) {
update(new Object[] {
data.getMessageId(),
new BigDecimal(Long.toString(data.getTimestamp()))});
}
}
先に述べたように、このチェーン内のその他のアドバイスはオプションです。これらは、必要に応じて、診断や監視のために使われます。Springフレームワークには、次のように、アプリケーションコンテキスト内で宣言できる汎用的な実装がいくつかあります。
<bean id="messageProcessorTraceAdvisor"
parent="messageProcessorAdvisor">
<property name="advice">
<bean class="org.springframework.aop.interceptor.SimpleTraceInterceptor">
<property name="useDynamicLogger" value="true"/>
</bean>
</property>
</bean>
<bean id="messageProcessorPerformanceMonitorAdvisor"
parent="messageProcessorAdvisor">
<property name="advice">
<bean class="org.springframework.aop.interceptor.PerformanceMonitorInterceptor">
<property name="useDynamicLogger" value="true"/>
<property name="prefix" value="messageProcessor"/>
</bean>
</property>
</bean>
基本的には、これですべてです。上のアドバイスは、メッセージ処理の非機能的なアスペクト(トランザクション、要求されるサービス品質など)をすべて処理します。したがって、
MessageProcessorの実装では、メッセージ処理の機能要件(ビジネスロジック)にのみに注意を払えばよいのです。