Topics
Enterprise Architecture
BMTメッセージ駆動型BeanとSpringを使用したハイパフォーマンスなメッセージ処理
Pages:
1,
2,
3,
4,
5
前のセクションで説明したとおり、トランザクションなしでメッセージを消費しつつ、once-and-only-onceのQoSが要求される場 合、アプリケーションは、重複処理を防止するための特殊なメカニズムを採用する必要があります。重複処理は、トランザクションのコミットとメッセージの確 認応答の間で、1つ以上のコンポーネントが故障するというまれな状況で発生します。このメカニズムは、非常に柔軟性があるので、他のデプロイシナリオでも うまく機能します。たとえば、アプリケーションが、クラスタにデプロイされているときに、最初に配信しようとしたノードとは別のノードに対してメッセージ の再配信が発生した場合です。重複を検出し、メッセージが2回処理されるのを防ぐには、アプリケーションが、そのメッセージは既に処理済みであることを何 らかの方法で識別する必要があります。
クラスタ内のすべてのノードで共有されるデータベースインスタンス内で、処理済のメッセージを追跡するのが理にかなっているように思えます。メッセージを一意に識別するには、
JMSMessageIDが 使用できます。JMSドキュメントによると、これは文字列値で、履歴リポジトリ内のメッセージを一意に識別するキーです。一意性の厳密なスコープは、プロ バイダによって定義されます。しかし、メッセージIDは、プロバイダの特定のインストールに対するメッセージは、少なくともカバーしているはずです。ここ で、インストールとは、複数のメッセージルータを接続したものを指します。安全のために、
JMSTimestampも永続化し ます。JMSTimestampフィールドには、メッセージが送信のためにプロバイダに渡された時刻が含まれています。これら2つの値を組み合わせること により、すべてのメッセージを一意に識別できます。たとえ、他のプロバイダからのメッセージであっても識別できます。
明らかに、重複検出メカニズムをできる限り高速で効率的なものに設計することは、非常に有益です。このため、SQLのselect操作を完全に避 け、JMSMessageIDフィールドとJMSTimestampフィールドに複合主キーを定義したテーブルを作成します。こうしておくと、同じ主キー を持つ値の挿入が続けて実行された場合、データの整合性違反のために処理が失敗します。この事実を使って、重複レコードを検出できます。データの整合性違 反は適切に検出して処理できます。以下に示したのは、Oracleデータベース用のDDLスクリプトです。
CREATE TABLE message_log
(message_id VARCHAR2(255) NOT NULL,
message_ts INTEGER NOT NULL,
CONSTRAINT pk_message_log
PRIMARY KEY (message_id, message_ts)
);
では、トランザクションなしのメッセージ処理を簡単にするための道具を作成しましょう。ここには、横断的な機能が非常に沢山含まれているので、この 作業にはAOPアプローチを使用するのが理にかなっています。以下に、SpringフレームワークのAOP機能を使用して、once-and-only- onceのQoSを持つトランザクションなしのメッセージ処理に必要なすべてのロジックを実装する方法を示します。
Spring のAOPフレームワークの実力は、他のいくつかの論文で既に実証済みです(関連情報参 照)。これを利用すると、一連のアスペクトを宣言的に定義できます。各アスペクトは、実行フローに対して特定の機能を果たします。その結果作成されたコー ドは、非常に構造化されています。各コンポーネントに固有のすべてのユースケースを網羅するために、個々のコード部分は個別にテストできます。
Spring AOPフレームワークの基本的な解説については、 「EJBコンポーネントを使用してのSpring AOP Frameworkの利用」(2005年12月にdev2devで公開)を参照してください。SpringでAOPを使用する際に好まれる方法は、Springがプロキシになることができるコンポーネントインタフェースを持つことです。そのために、JMSインタフェースの
MessageListenerと
Messageを使用することもできますが、実際には、JMSインタフェースとは独立の、より一般的なインタフェースを定義する方が理にかなっています。たとえば、次のような
MessageProcessorインタフェースを宣言します。
public interface MessageProcessor {
public Object process(MessageData messageData);
}
process()メソッドは、
MessageDataパラメータを取ります。このパラメータは、元の
Messageからのすべてのデータをカプセル化したものです。
Messageのインスタンスを
MessageData に変換するために、Springの
MessageConverterを実装できます。また、
AbstractJmsMessageDrivenBeanを拡張して、共通の処理ロジックを実装することもできます。
MessageDataへの変換
完全なソースコードは、「ダウンロード」セクションからダウンロードできます。ここでは、
MessageDataDrivenBeanの実装がどのようになるかを示します。
public abstract class MessageDataDrivenBean extends
AbstractJmsMessageDrivenBean {
private MessageConverter messageConverter =
new MessageDataConverter();
private MessageProcessor messageProcessor;
protected void onEjbCreate() {
messageProcessor = ((MessageProcessor)
getBeanFactory().getBean(getName()));
}
public void onMessage(Message message) {
try {
MessageData messageData = (MessageData)
getMessageConverter().fromMessage(message);
this.messageProcessor.process(messageData);
} catch( MessageConversionException ex) {
String msg = "Message conversion error; "+
ex.getMessage();
this.logger.error(msg, ex);
throw new RuntimeException(msg, ex);
} catch( JMSException ex) {
String msg = "JMS error; "+ex.getMessage();
this.logger.error(msg, ex);
throw new RuntimeException(msg, ex);
}
}
protected MessageConverter getMessageConverter() {
return this.messageConverter;
}
protected abstract String getName();
} AbstractJmsMessageDrivenBean { private MessageConverter messageConverter = new MessageDataConverter(); private MessageProcessor messageProcessor; protected void onEjbCreate() { messageProcessor = ((MessageProcessor) getBeanFactory().getBean(getName())); } public void onMessage(Message message) { try { MessageData messageData = (MessageData) getMessageConverter().fromMessage(message); this.messageProcessor.process(messageData); } catch( MessageConversionException ex) { String msg = "Message conversion error; "+ ex.getMessage(); this.logger.error(msg, ex); throw new RuntimeException(msg, ex); } catch( JMSException ex) { String msg = "JMS error; "+ex.getMessage(); this.logger.error(msg, ex); throw new RuntimeException(msg, ex); } } protected MessageConverter getMessageConverter() { return this.messageConverter; }
protected abstract String getName(); }
具象サブクラスで必要なのは、
getName()メソッドの実装を提供することだけです。このメソッドは、
MessageProcessorインタフェースの特定の実装のbean名を、Springコンテキストで定義されているとおりに返します。サブクラスが、独自の
MessageConverterを提供することもできます。その際、
MessageDataを移行するために別の方法を使用することができます。ここでは、
MessageProcessorの簡単な実装を示します。
public class SimpleMessageProcessor implements MessageProcessor {
private Log log = LogFactory.getLog(getClass());
public Object process( MessageData messageData) {
log.info(messageData);
return null;
}
}
最後に、具象MDBは以下のような実装になります。ここでは、デプロイメント記述子用のメタデータを宣言するために、XDoclet注釈を使用しています。
/**
* SimpleMdb
*
* @ejb.bean
* name="org.javatx.mdb.SimpleMdb"
* type="MDB"
* destination-type="javax.jms.Queue"
*
transaction-type="Bean"
*
* @weblogic.pool
* initial-beans-in-free-pool=
* "${org.javatx.simple.mdb.pool.initial}"
* max-beans-in-free-pool=
* "${org.javatx.simple.mdb.pool.max}"
*
* @weblogic.message-driven
* connection-factory-jndi-name=
* "${org.javatx.qcf.jndi-name}"
* destination-jndi-name=
* "${org.javatx.simple.queue.jndi-name}"
* jms-polling-interval-seconds=
* "${org.javatx.mdb.pooling.interval}"
*
*
@ejb.env-entry
* name="BeanFactoryPath"
* value="applicationContext.xml"
*/
public class SimpleMdb extends MessageDataDrivenBean {
protected String getName() {
return
"simpleProcessor";
}
}
上のコードでは、SpringのEJBクラスがアプリケーションコンテキストを見つけるために、
BeanFactoryPath env-entryが使われます。アプリケーションコンテキストは、
simpleProcessor Beanの宣言を含んでいなければなりません。このsimpleProcessor Beanは、非機能要件(トランザクション、重複メッセージ処理の防止、オプション機能の追跡とパフォーマンス監視など)を含む、すべての処理ロジックを扱います。
明らかに、非機能的なアスペクトはアドバイスに移し、
MessageProcessorの実際の実装をラップした
ProxyFactoryBeanを使用して、インタセプタチェーンを定義するのが理にかなっています。その定義は次のようになります。
<bean id="
simpleProcessor"
class="org.springframework.aop.framework.ProxyFactoryBean">
<property name="target">
<bean class="SimpleMessageProcessor"/>
</property>
<property name="proxyInterfaces"
value="org.javatx.mdb.MessageProcessor"/>
<property name="interceptorNames">
<list>
<idref local="mdbTransactionAdvisor"/>
<idref local="mdbDuplicateHandlingAdvisor"/>
<!-- optional monitoring and tracing -->
<idref local="messageProcessorPerformanceMonitorAdvisor"/>
<idref local="messageProcessorTraceAdvisor"/>
</list>
</property>
</bean>
図1のシーケンス図は、このQoSモデルをサポートするために必要なメッセージ処理とアドバイザスタックを表しています。
|
|