Architect: Complex Event Processing
Integrating Oracle SOA Suite with Oracle WebLogic Event Server Through Spring and OSGi
by Clemens Utschig-Utschig, with contributions from Alex Alves
Introduction
Although Oracle SOA Suite provides great
flexibility in terms of integrating disparate systems into composite,
service-oriented applications, it has had to rely on external tools and
engines for complex event processing.
The goal is to be able to publish events seamlessly
from Oracle BPEL Process Manager to Oracle WebLogic Event Server,
process them with complex queries, and then route them on to Oracle
Business Activity Monitoring's Active Data Cache (ADC).
This article outlines how we integrated Complex
Event Processing (CEP) capabilities with Oracle SOA Suite, and is based
on the actual development of the components.
The integration of a CEP engine with an integration
platform places two paradigms in conflict. In the service-oriented
architecture (SOA) corner of the ring are loose coupling and XML-based
messaging. In the other corner is CEP, with tuple-based data and
high-performance pattern evaluation. To integrate those two paradigms
into one compelling architecture, some coding is needed.
In addition to a working knowledge of Spring and the
Open Services Gateway initiative (OSGi), you will need practical
experience with Oracle BPEL Process Manager and Oracle WebLogic Event
Server, as well as a solid understanding of their underlying
architectural concepts and technologies.
Sending Events from BPEL: Sensor or Partnerlink
Oracle BPEL Process Manager offers two distinct
ways to connect to foreign systems. The first is the more traditional
use of partnerlinks, which are part of the BPEL specification. The
alternative is to use the sensor framework. As events are conditional
changes and should be independent of the process, we chose the latter
and built our own sensor publisher.
Creating your own sensor action and publisher is a
fairly simple task. There are, however, two important considerations.
First, publishing sensor data is a synchronous, blocking operation;
hence, the longer it takes to publish data, the longer the process
execution will take. Second, the larger the amount of data published,
the more memory the instance will need to execute. Both of these points
must be considered to ensure that a sensor is not harming the overall
stability of the process engine.
To accommodate high-performance transmission while
leveraging a standard protocol, we chose HTTP GET as the means of
transportation and used name-value pairs for the tuples we sent.
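As an illustration of the idea, the following is a minimal sketch of how a tuple might be encoded as name-value pairs in the query string of an HTTP GET request, using only the JDK. The class name NameValueQuery and the method toGetUri are hypothetical and are not part of the publisher described in this article, which relies on the HTTPClient library for this step.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only.
class NameValueQuery {

    // Builds "path?name1=value1&name2=value2" with URL-encoded names and values.
    static String toGetUri(String path, Map<String, String> pairs) {
        StringBuilder sb = new StringBuilder(path);
        char separator = '?';
        for (Map.Entry<String, String> e : pairs.entrySet()) {
            sb.append(separator)
              .append(encode(e.getKey()))
              .append('=')
              .append(encode(e.getValue()));
            separator = '&';
        }
        return sb.toString();
    }

    private static String encode(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException ex) {
            // UTF-8 is guaranteed to be available on every JVM
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        Map<String, String> tuple = new LinkedHashMap<String, String>();
        tuple.put("instanceId", "4711");
        tuple.put("message", "hello world");
        // prints /adapter/index.html?instanceId=4711&message=hello+world
        System.out.println(toGetUri("/adapter/index.html", tuple));
    }
}
```

Keeping the payload to a handful of short pairs like this also helps with the two considerations above, since less data means less time spent in the blocking publish call and less memory held by the instance.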
The process for implementing a custom sensor publisher for BPEL is well documented, and only requires a class that implements com.oracle.bpel.sensor.DataPublisher to be present in the engine's classpath.
In the case of the HTTPNameValuePair
publisher, custom XPath expressions extract the data from the variable
snapshot and convert it into name-value pairs for the HTTP request.
// The set of name-value pairs
// name -> event fieldname
// value -> xpath value
Iterator iFinal = _valuePairs.keySet().iterator();
NVPair[] pair = new NVPair[_valuePairs.size()];
int i = 0;
while (iFinal.hasNext()) {
    String key = (String) iFinal.next();
    pair[i] = new NVPair(key, (String) _valuePairs.get(key));
    i++;
}
// Send them out via plain http connection and check for
// the return code; everything but a 200 is considered a
// failure
try {
    HTTPResponse resp = connection.Get(_uri, pair);
    if (resp.getStatusCode() != 200) {
        SensorLogger.warn("Failure: " +
            resp.getStatusCode() + " - " +
            resp.getReasonLine());
        return;
    }
} catch (Exception ePublish) {
    SensorLogger.warn("Could not publish to endpoint: "
        + ePublish.getMessage());
    return;
}
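The listing above starts from an already populated set of name-value pairs. The XPath extraction that fills it is not shown, so the following is only a sketch of how that step could look with the standard javax.xml.xpath API. The class XPathPairExtractor, its extract method, and the idea of passing the variable snapshot as an XML string are assumptions made for illustration; the real publisher may work quite differently.

```java
import java.io.StringReader;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.XMLConstants;
import javax.xml.namespace.NamespaceContext;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;

// Hypothetical helper for illustration only.
class XPathPairExtractor {

    // Evaluates one XPath expression per event field against the XML
    // and returns the results as fieldName -> value.
    static Map<String, String> extract(String xml,
                                       final Map<String, String> namespaces,
                                       Map<String, String> fieldToXPath) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true); // needed for prefixed XPath steps
            Document doc = factory.newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));

            XPath xpath = XPathFactory.newInstance().newXPath();
            xpath.setNamespaceContext(new NamespaceContext() {
                public String getNamespaceURI(String prefix) {
                    String uri = namespaces.get(prefix);
                    return uri != null ? uri : XMLConstants.NULL_NS_URI;
                }
                public String getPrefix(String uri) {
                    return null; // not needed for evaluation
                }
                public Iterator<String> getPrefixes(String uri) {
                    return Collections.<String>emptyList().iterator();
                }
            });

            Map<String, String> pairs = new LinkedHashMap<String, String>();
            for (Map.Entry<String, String> e : fieldToXPath.entrySet()) {
                // evaluate(...) returns the string value of the selected node
                pairs.put(e.getKey(), xpath.evaluate(e.getValue(), doc));
            }
            return pairs;
        } catch (Exception ex) {
            throw new IllegalArgumentException("Could not extract values", ex);
        }
    }
}
```

The resulting map plays the role of _valuePairs in the listing above: each key becomes the event field name, and each value is the string selected by the corresponding XPath expression.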
The configuration of the publisher follows the
standard way of configuring sensors and corresponding sensor actions
inside the process.
The configuration of the new sensor publisher is
illustrated below. Aside from XPath expressions into the data, it
contains information for the endpoint to be used.
<action name="SensorAction" publishName=""
publishType="Custom"
enabled="true"
publishTarget="com.oracle.bpel.sensor.valuepair.XPathValuePairHttpPublisher">
<property name="xpath_message">
ns1:TestValuePairSensorProcessRequest/ns1:input</property>
<property name="xpath_instanceId">
ns1:TestValuePairSensorProcessRequest/ns1:instanceId</property>
<property name="namespaceList">
ns1=http://xmlns.oracle.com/TestValuePairSensor;</property>
<property name="httpUrl">/adapter/index.html</property>
<property name="httpPort">9002</property>
<property name="httpServer">localhost</property>
<sensorName>ActivitySensor2</sensorName>
</action>
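To make the role of these properties more concrete, here is a small sketch of how a publisher could turn them into usable values. It assumes that namespaceList is a semicolon-separated list of prefix=URI entries (inferred from the single example above) and that the endpoint is reached over plain http. The class SensorActionConfig and both methods are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only.
class SensorActionConfig {

    // Parses "ns1=http://a;ns2=http://b;" into prefix -> URI.
    // The separator format is an assumption based on the sample configuration.
    static Map<String, String> parseNamespaceList(String list) {
        Map<String, String> result = new LinkedHashMap<String, String>();
        for (String entry : list.split(";")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate a trailing semicolon or blank entries
            }
            int eq = trimmed.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Bad namespace entry: " + trimmed);
            }
            result.put(trimmed.substring(0, eq).trim(),
                       trimmed.substring(eq + 1).trim());
        }
        return result;
    }

    // Combines httpServer, httpPort, and httpUrl into one endpoint address.
    static String endpoint(String httpServer, String httpPort, String httpUrl) {
        return "http://" + httpServer + ":" + httpPort + httpUrl;
    }

    public static void main(String[] args) {
        System.out.println(parseNamespaceList(
                "ns1=http://xmlns.oracle.com/TestValuePairSensor;"));
        System.out.println(endpoint("localhost", "9002", "/adapter/index.html"));
    }
}
```

With the sample configuration, the endpoint resolves to http://localhost:9002/adapter/index.html.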
Once the publisher is configured, a little icon appears next to the activity to which the sensor is mapped:
Figure 1. Activity callbackClient with an attached sensor
Testing the new sensor action
To test the new sensor action, we used an HTTP tunneling tool supplied directly with Oracle SOA Suite. It can be found in $OH/bpel/bin and is named obtunnel.
After starting, it listens on a specific port and forwards an incoming request to the dedicated provider port.
Figure 2. Testing the new http sensor publisher
Another convenient method to see what the custom
publisher does is to make use of the sensor-logging infrastructure. All
parts of the BPEL engine make use of a common logging framework, which
can be configured from the console.
For example, using the code below...
com.collaxa.cube.engine.sensor.common.SensorLogger.info ("Some log message");
...creates an entry in the OPMN container log as follows:
<2008-06-28 14:10:10,859> <INFO>
<default.collaxa.cube.sensor> Some log message
Transforming HTTP Name-Value Pairs into POJO Events
Now that the BPEL process can publish events via
HTTP, the next step is to make sure that Oracle WebLogic Event Server
can convert those events into Plain Old Java Objects (POJOs).
The HTTPAdapter supplied with WebLogic Event Server
can be easily deployed -- configuration doesn't require much work. For
simplicity's sake, the event we'll use contains just two members, instanceId and message.
public class HelloWorldEvent {
    // message
    private String message;
    // instance id
    private String instanceId;

    public void setInstanceId(String instanceId) {
        this.instanceId = instanceId;
    }

    public String getInstanceId() {
        return this.instanceId;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}
First, the above event needs to be registered with the internal event type repository.
<wlevs:event-type-repository>
<wlevs:event-type type-name="HelloWorldEvent">
<wlevs:class>com.example.HelloWorldEvent</wlevs:class>
</wlevs:event-type>
</wlevs:event-type-repository>
The HTTPAdapter internally uses reflection to set
values into the event instance members. Hence, the names of the
parameters in the HTTP request must match those field names.
<wlevs:adapter id="httpAdapter"
provider="HttpAdapterProvider"
manageable="true">
<!-- the http context path the adapter is mapped to -->
<wlevs:instance-property
name="contextPath" value="/adapter"/>
<!-- the POJO event to be generated -->
<wlevs:instance-property
name="eventTypeName"
value="HelloWorldEvent"/>
<!-- the names of the NVPairs coming along in the request -->
<wlevs:instance-property name="eventPropertyNames" value="instanceId,message"/>
</wlevs:adapter>
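The reflection-based mapping described above can be pictured with the following sketch. It is not the HTTPAdapter's actual code: it assumes that values are applied through JavaBean setters taking a String, and the class ReflectiveEventBinder with its nested SampleEvent is hypothetical. It does show why the request parameter names have to line up with the event's property names.

```java
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only.
class ReflectiveEventBinder {

    // Creates an event instance and calls setXxx(String) for every parameter "xxx".
    static <T> T bind(Class<T> eventType, Map<String, String> params) {
        try {
            T event = eventType.getDeclaredConstructor().newInstance();
            for (Map.Entry<String, String> p : params.entrySet()) {
                String name = p.getKey();
                String setter = "set"
                        + Character.toUpperCase(name.charAt(0))
                        + name.substring(1);
                Method m = eventType.getMethod(setter, String.class);
                m.invoke(event, p.getValue());
            }
            return event;
        } catch (ReflectiveOperationException ex) {
            // A parameter without a matching setter ends up here
            throw new IllegalArgumentException(
                    "Cannot bind parameters to " + eventType.getName(), ex);
        }
    }

    // Minimal stand-in for an event such as HelloWorldEvent.
    public static class SampleEvent {
        private String message;
        private String instanceId;
        public void setMessage(String message) { this.message = message; }
        public String getMessage() { return message; }
        public void setInstanceId(String instanceId) { this.instanceId = instanceId; }
        public String getInstanceId() { return instanceId; }
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<String, String>();
        params.put("instanceId", "4711");
        params.put("message", "hello");
        SampleEvent e = bind(SampleEvent.class, params);
        System.out.println(e.getInstanceId() + " / " + e.getMessage());
    }
}
```

In this sketch, a misspelled parameter name such as instanceID fails with an exception, because no setter of that name exists on the event class.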
Leveraging the HTTPAdapter in an Event Processing Network (EPN)
When a new event arrives via HTTP, a new instance of the network is triggered.
Figure 3. The configured httpAdapter that publishes events to the helloworldProcessor
Bringing Oracle Business Activity Monitoring into the Picture
After integrating BPEL with Oracle WebLogic Event
Server using HTTP events, the next step is to allow Oracle
WebLogic Event Server to communicate with Oracle Business Activity
Monitoring.
Although the easiest way is to just write Spring
beans that are based on the EventSink interface, we are going a step
further and will outline the creation of the Oracle Business Activity
Monitoring adapter, which is a container-managed object.
Designing Oracle Business Activity Monitoring Event Sink Beans
As already mentioned, an event in an EPN can be
routed to an endpoint sink. This endpoint is a class that implements
the EventSink interface (com.bea.wlevs.ede.api.EventSink) and is
declared as a Spring bean. Using this interface, the bean class will
get the events pushed through the stream, notably through the onEvent() method.
public void onEvent(List pEventList)
throws EventRejectedException;
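A sink bean built on this method could look roughly like the sketch below. To keep the example compilable without the server libraries, EventSink and EventRejectedException are declared here as local stand-ins that mirror only the signature shown above; in a real bundle, the types shipped with Oracle WebLogic Event Server would be imported instead. The class CollectingEventSink is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Local stand-in, NOT the server's class; declared only so the sketch compiles.
class EventRejectedException extends Exception {
    EventRejectedException(String message) {
        super(message);
    }
}

// Local stand-in mirroring the onEvent signature of
// com.bea.wlevs.ede.api.EventSink as quoted in the article.
interface EventSink {
    void onEvent(List pEventList) throws EventRejectedException;
}

// Hypothetical sink that records every event pushed through the stream.
class CollectingEventSink implements EventSink {
    private final List<Object> received = new ArrayList<Object>();

    public void onEvent(List pEventList) throws EventRejectedException {
        if (pEventList == null) {
            throw new EventRejectedException("No event list supplied");
        }
        for (Object event : pEventList) {
            received.add(event); // a real sink would forward the event here
        }
    }

    List<Object> getReceived() {
        return received;
    }

    public static void main(String[] args) throws EventRejectedException {
        CollectingEventSink sink = new CollectingEventSink();
        sink.onEvent(Arrays.asList("first", "second"));
        System.out.println(sink.getReceived().size()); // prints 2
    }
}
```

The point of the sketch is that events arrive in batches as a list, so a sink should be prepared to handle more than one event per call.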
We decided to work from the top down and define the
set of Spring beans needed to connect to an Oracle Business Activity
Monitoring server and trigger a certain operation on a defined Oracle
Business Activity Monitoring data object.
To make those Spring beans reusable, the connection information is separated from the data object information.
<!-- Spring Bean that configures a BAM server connection -->
<bean id="bamdemoServerConnection"
scope="prototype"
class="com.example.BamServerConnectionProxyBean">
<!-- compatibility mode -->
<property name="wsTypeVersion" value="10g"/>
<property name="endpoint"
value="http://your-server:80/OracleBAM/Services/DataObject/DataObjectOperations.asmx"/>
<property name="username" value="demo"/>
<property name="password" value="demo"/>
</bean>
The next step is to define the bean that
encapsulates the information to operate on a given data object. That
bean uses the connection information to connect to the right HTTP
endpoint.
<bean name="bamEventSinkAdapterBean"
class="com.example.BamEventSink">
<property name="bamServerConnection"
ref="bamdemoServerConnection" />
<property name="beanSerializer"
ref="inputBamXMLSerializerBean" />
<property name="dataObjectName"
value="/cutschig/wlevs/MessageDataObject"/>
<property name="dataObjectOperation"
value="INSERT" />
<property name="dataObjectKeys" value=""/>
<property name="verbose" value="true" />
</bean>
Under the covers, these properties and references are injected
into a class instance, which can then invoke operations on a
data object. This bean can be part of the EPN but is not managed by the
container. Rather than reinventing the code, we borrowed it from the
Oracle Business Activity Monitoring Sensor Action available in BPEL
today. Oracle Business Activity Monitoring offers a Web service–based
interface to update data objects.
Bridging POJOs and XML—simple yet powerful
Building the bridge from Oracle WebLogic Event
Server to Oracle Business Activity Monitoring requires converting
between data formats. Although an EPN is based on Java POJOs registered as events,
Oracle Business Activity Monitoring Web services are based on XML.
Rather than using a Java-to-XML
serializer (and cluttering the EPN with serializer classes), the easiest
approach is to keep the member names that map to the final XML in
a Java Bean (for example, instanceId). Events in Oracle
WebLogic Event Server are either hand-crafted Java Beans or, when they
are aggregated in the event processor through a query, an instance of com.bea.wlevs.ede.api.MapEventObject.
A simple yet extendable way to retrieve data from
an event is to use reflection and simple token replacement to create
the final XML instance.
Three pieces of information are needed. First, a line that represents the data object structure:
<property name="baseXML">
<!--
this is the xml that gets injected into the
serializer, note the #xx# tokens, those
will be replaced by the value of the xmlEventKey(s)
-->
<value><![CDATA[
<_MessageDataObject xmlns="http://xmlns.oracle.com/bam">
<_message>#MESSAGE#</_message>
<_instanceId>#INSTANCEID#</_instanceId>
</_MessageDataObject>
]]></value>
</property>
Second, the keys to retrieve the data from the event and replace the tokens with the corresponding values:
<property name="xmlEventKeys">
<list>
<!--
Each represents a key to get the information from the event,
and replace a token corresponding to the key in the above base xml
-->
<value>Message</value>
<value>InstanceId</value>
</list>
</property>
Finally, a root element that wraps several lines (baseXML) into a collection if more than one event arrives:
<property name="rootXML">
<!--
this is the xml that gets injected as
rootcontainer into the serializer,
note the #LINES# tokens, they will be
replaced by xml lines produced from baseXML
-->
<value>
<![CDATA[
<_MessageDataObjectCollection xmlns="http://xmlns.oracle.com/bam">
#LINES#
</_MessageDataObjectCollection>
]]>
</value>
</property>
With Spring injecting the necessary information
into the serializer, the serializer can then create the right XML.
While this isn't the only way to accomplish this, the procedure is a
convenient and extensible one.
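The three pieces (baseXML, xmlEventKeys, and rootXML) can be combined as in the following sketch. It is an approximation of the approach rather than the actual serializer: it assumes that each key maps to a getter named get plus the key and to a token made of the upper-cased key between # characters, it handles plain Java Beans only (not MapEventObject), and it adds basic XML escaping that the article does not mention. The class TokenXmlSerializer and its nested SampleEvent are hypothetical.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Hypothetical serializer for illustration only.
class TokenXmlSerializer {
    private final String baseXml;    // one line per event, with #KEY# tokens
    private final String rootXml;    // wrapper containing the #LINES# token
    private final List<String> keys; // for example "Message", "InstanceId"

    TokenXmlSerializer(String baseXml, String rootXml, List<String> keys) {
        this.baseXml = baseXml;
        this.rootXml = rootXml;
        this.keys = keys;
    }

    String serialize(List<?> events) {
        StringBuilder lines = new StringBuilder();
        for (Object event : events) {
            String line = baseXml;
            for (String key : keys) {
                String token = "#" + key.toUpperCase(Locale.ROOT) + "#";
                line = line.replace(token, escape(read(event, key)));
            }
            lines.append(line);
        }
        return rootXml.replace("#LINES#", lines.toString());
    }

    // Reads a property through its getter, for example getMessage().
    private static String read(Object event, String key) {
        try {
            Method getter = event.getClass().getMethod("get" + key);
            Object value = getter.invoke(event);
            return value == null ? "" : value.toString();
        } catch (ReflectiveOperationException ex) {
            throw new IllegalArgumentException("No readable property " + key, ex);
        }
    }

    private static String escape(String s) {
        return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
    }

    // Minimal stand-in for an event bean.
    public static class SampleEvent {
        private final String message;
        private final String instanceId;
        public SampleEvent(String message, String instanceId) {
            this.message = message;
            this.instanceId = instanceId;
        }
        public String getMessage() { return message; }
        public String getInstanceId() { return instanceId; }
    }

    public static void main(String[] args) {
        TokenXmlSerializer s = new TokenXmlSerializer(
                "<_message>#MESSAGE#</_message><_instanceId>#INSTANCEID#</_instanceId>",
                "<collection>#LINES#</collection>",
                Arrays.asList("Message", "InstanceId"));
        System.out.println(s.serialize(Arrays.asList(new SampleEvent("hello", "4711"))));
    }
}
```

Because the templates and keys are plain strings, a different data object only needs a different Spring configuration, not new serializer code.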
Creating an adapter from the Oracle Business Activity Monitoring Event Sink Bean
To create a container-manageable adapter, the event-sink class needs to also implement another interface (com.bea.wlevs.ede.api.Adapter).
Once the Event Sink bean is an adapter, the next
step is to create a factory that can be managed by the OSGi container.
The factory needs to implement the com.bea.wlevs.ede.api.AdapterFactory interface.
/**
 * Factory to create a new BAMEventSink for an EPN.
 * Exposed as OSGI service.
 * @author Clemens Utschig
 */
public class BAMEventSinkFactory implements AdapterFactory
{
    private String mType = null;

    public Adapter create() {
        return new BAMEventSinkBean();
    }

    public void setType(String pType) {
        mType = pType;
    }

    public String getType() {
        return mType;
    }
}
This factory acts as a service that the container
will register upon deployment. After the factory has been created it
can be registered in the Spring context, which is deployed with the
OSGi bundle.
<!-- the event sink factory bean -->
<bean id="bamEventSinkBeanFactory"
class="com.example.BAMEventSinkFactory"/>
<!-- this is the handoff to OSGI for creating the bam event adapter factory -->
<osgi:service
interface="com.bea.wlevs.ede.api.AdapterFactory"
ref="bamEventSinkBeanFactory">
<osgi:service-properties>
<entry key="type" value="bamEventSink"/>
</osgi:service-properties>
</osgi:service>
The only thing left is to register the Spring context in the manifest that is deployed with the bundle.
Manifest-Version: 1.0
Main-Class: com.oracle.bam.wlevs.osgi.help.InstallationInstructions
Creator: Clemens Utschig, Oracle Corp. 2008
Bundle-ManifestVersion: 2
Bundle-Name: oracle.bam.osgi
Bundle-Vendor: Oracle Corporation, (c) 2008
Bundle-Description: Oracle BAM dataobject sink beans
Bundle-SymbolicName: oracle.bam.osgi
Bundle-Activator: com.oracle.bam.wlevs.osgi.BAMClientBundleActivator
Bundle-Version: 10.1.3.3
Spring-Context: META-INF/spring/oracle.bam.osgi.context.xml
WLEvS-Manageable: true
Import-Package: com.bea.wlevs.ede.api;version="3.0.0.0",
com.oracle.wlevs.osgi.spring.beans;version="10.1.3.3"
Bundle-ClassPath: .,
lib/activation.jar,
lib/http_client.jar,
lib/mail.jar,
lib/soap.jar,
lib/xmlparserv2.jar
Export-Package: com.oracle.bam.wlevs.osgi.spring;version="10.1.3.3",
com.oracle.bam.wlevs.osgi.spring.beans;version="10.1.3.3"
Bringing it all together in the Event Processing Network
Now that the Oracle Business Activity Monitoring
event sink is deployed, it can be easily used within an EPN.
The configuration of the Oracle Business Activity Monitoring event sink
is still based on Spring beans. The only thing different now is the
configuration of the adapter.
<!-- The BAM event sink, created by the provider, which is an OSGI service
-->
<wlevs:adapter id="bamEventSinkAdapterBean"
provider="bamEventSink" manageable="true">
<!-- the config bean for the server -->
<wlevs:instance-property name="bamServerConnection"
ref="bamdemoServerConnection" />
<!-- the serializer -->
<wlevs:instance-property name="beanSerializer"
ref="inputBamXMLSerializerBean" />
<!-- the dataobject name -->
<wlevs:instance-property name="dataObjectName"
value="/cutschig/wlevs/MessageDataObject" />
<!-- operation to be used against the DO -->
<wlevs:instance-property name="dataObjectOperation"
value="INSERT" />
<wlevs:instance-property name="dataObjectKeys" value=""/>
<wlevs:instance-property name="verbose" value="true" />
</wlevs:adapter>
After adding the new adapter to the EPN and connecting it through a stream, the event network is complete and ready to go.
Figure 4. The fully configured EPN, from the http event to the Oracle Business Activity Monitoring event sink.
Conclusion
We hope this article has provided you with the
knowledge to integrate Oracle WebLogic Event Server with Oracle SOA
Suite components through Spring and OSGi, two powerful technologies in
component development and lifecycle-dependency management, important
considerations in Service-Oriented Architecture.
Clemens Utschig-Utschig is a member of the Oracle SOA Product Management Team, responsible for cross-product integration and customer enablement. Aside from technology, Clemens' focus is on project management and consulting in SOA implementations. A native Austrian, Clemens started his Oracle career in Europe at the local consulting services branch, working with customers on J2EE and SOA projects. Clemens is a frequent speaker at IT conferences.