As Published In
Oracle Magazine
November/December 2009

DEVELOPER: .NET


Line Up Your Applications

By Christian Shay

Write messaging applications with ODP.NET and Oracle Streams Advanced Queuing.

Applications frequently need to pass messages to each other, and they need to do so reliably and often asynchronously. This goal is typically achieved with queuing: producer applications enqueue messages, and consumer applications dequeue messages. A message stays in the queue until a consumer application dequeues it or the message expires.

Oracle Streams Advanced Queuing provides database-integrated message queuing functionality. Because Oracle Streams Advanced Queuing is implemented in database tables, queue data enjoys the Oracle Database benefits of high availability, scalability, and reliability.

Oracle Data Provider for .NET (ODP.NET), Oracle’s ADO.NET-compliant data provider, includes Oracle Streams Advanced Queuing classes to expose the messaging capabilities of Oracle Streams Advanced Queuing. Oracle Developer Tools for Visual Studio, a plug-in for Microsoft Visual Studio 2008 and Visual Studio 2005, includes administration tools that assist with tasks such as creating and modifying queues and queue tables.

This article guides you through creating a set of .NET applications that make up a stock ticker system that demonstrates many Oracle Streams Advanced Queuing features. You will first create a queue in Oracle Database that holds stock ticker symbols and the corresponding stock prices. Every time a stock price changes, it will be loaded onto this queue by the StockQueueLoader application that you create next. You will also create the StockTickerListener application that receives notifications whenever a new stock price arrives in the queue and then retrieves the stock information and displays it on a ticker screen. StockTickerListener can receive notifications for all stocks that are loaded in the queue, but the queue can also be configured with rules specifying that StockTickerListener receives notifications only when specific stocks and, optionally, price ranges for specific stocks are loaded in the queue. To make it easy for a user to dynamically modify these rules, you will also run a helper application called StockQueueSubscriberAdmin.

Setup

To follow along with the steps in this article, you will need the following:

 

  • SYSDBA privileges for Oracle Database Release 9.2 or later, which may be required to grant some of the Oracle Streams Advanced Queuing privileges

  • Microsoft Visual Studio 2008 or Visual Studio 2005

  • Oracle Developer Tools for Visual Studio and Oracle Data Provider for .NET Release 11.1.7.10 or later


Before you build the sample applications, you will create connections to Oracle Database from Visual Studio and grant the appropriate privileges to enable the use of queuing.

To connect to Oracle Database from Visual Studio, create a new connection in Data Connections, the top-level node of Server Explorer. (If Server Explorer is not yet visible, select View -> Server Explorer from the main menu.)

Right-click Data Connections, and select Add Connection. In the dialog box that appears, confirm that Oracle Database (Oracle ODP.NET) appears in the Data source field. If you do not see Oracle Database (Oracle ODP.NET), click the Change button and select Oracle Data Provider for .NET from the list of providers. Select the database you want to connect to from the Datasource name list. Click Use a specific user name and password. In the User name field, enter SYS. In the Password field, enter the password for SYS, and ensure that Role is set to SYSDBA. Click Test connection to verify that you can connect, and click OK to complete the connection setup.

To create and modify queues and queue tables and to enqueue and dequeue from them, you need the EXECUTE privilege on the DBMS_AQ and DBMS_AQADM PL/SQL packages. (If you wanted to modify queues you did not create, you would also need to be granted the AQ_ADMINISTRATOR_ROLE role.) You will grant the required privileges by using the Grant/Revoke Privileges dialog box in Visual Studio.

Right-click the SYS connection node, and choose Privileges from the context menu. In the Grant/Revoke Privileges dialog box that appears, set Object Type to Package. Leave Schema set to SYS. Click the first radio button, labeled Grant/Revoke privileges to a user/role on one or more database object(s). Hold down the Ctrl key, and in the Object Names list, highlight both DBMS_AQ and DBMS_AQADM. From the User/Role list, select User and choose HR or some other username. In the list of privileges, in the Grant column, check the check box next to the EXECUTE privilege. Click OK to accept the changes, and close the dialog box.

Now connect to Oracle Database as HR (or whichever user you just granted the Oracle Streams Advanced Queuing package privileges to). In Server Explorer, right-click Data Connections and select Add Connection. Follow the same steps you used to connect as SYS, except set Role to Default.

Creating the STOCK User-Defined Type

Oracle Database queues typically hold objects of a single type (unless they are defined as ANYTYPE queues); that type can be XMLType, a user-defined type (UDT), or RAW. For more information on Oracle Streams Advanced Queuing queue types, including the benefits and limitations of each, see the Oracle Streams Advanced Queuing User’s Guide.

The queue for this article will hold messages of a user-defined type called STOCK, whose definition includes a stock symbol name as well as the price of the stock. (For a deep dive into Oracle user-defined types with Visual Studio and .NET, see “It Takes All Types” in the May/June 2008 issue of Oracle Magazine.)

To create the user-defined type, in Server Explorer, under the HR data connection, right-click the User Defined Types node and choose New Object Type from the menu. This will launch Object Designer. In the Type name field, enter STOCK. Click the Add button to add an attribute, and in the Attribute Properties Name field, enter SYMBOL. From the Type list, choose VARCHAR2. Click the Add button again to add another attribute. In the Name field for this new attribute, enter PRICE and choose NUMBER from the Type list. Click OK to create the type. The type will now be visible in Server Explorer under the User Defined Types node and will also be available in datatype lists in various Oracle designers, such as Queue Designer.

Creating the Queue and the Queue Table

Next, create a queue named STOCK_Q and its queue table, STOCK_QTAB. Under the HR data connection node in Server Explorer, expand the Advanced Queues node, which will expose the Queues and Queue Tables collection nodes (see Figure 1). Right-click the Queues node, and select New Queue from the menu. This will launch Queue Designer (see Figure 2).

 

Figure 1: Server Explorer Advanced Queues and User-Defined Types nodes (left); Queues node context menu (right)


 

Figure 2: Queue Designer


In the Queue name field, enter STOCK_Q. From the Queue type list, choose Normal Queue. For Payload Type, click the Complex radio button, and then from the list next to it, choose the STOCK user-defined type.

Click the Queue Table tab in the lower half of the designer. In the Queue Table area, click the New radio button and enter STOCK_QTAB as the name of the queue table. Leave the rest of the default settings as they are. (The most important setting is the Multi-Consumer check box, which must be checked, because multiple stock ticker applications can be run at the same time and all of them will need access to the contents of the queue.)

Still in Queue Designer, click the Subscribers tab. Click the grid in the lower half of the designer, which will cause SUBSCRIBER1 to be entered for Consumer Name. Leave the Rule field blank. Click a new line in the grid to create another subscriber entry. Change the name of this one from SUBSCRIBER2 to DEFAULT. Leave the rule for this subscriber entry blank.

Click Save to create the queue and the queue table. The STOCK_Q queue and the STOCK_QTAB queue table will now be visible in Server Explorer (see Figure 1). The designer automatically starts the queue after creating it.

Note that an exception queue named AQ$_STOCK_QTAB_E has been automatically created for you and that the exceptions will be stored in the STOCK_QTAB queue table. Oracle Streams Advanced Queuing exceptions are generated in a variety of circumstances, such as when a message expires before it is dequeued. For more information on Oracle Streams Advanced Queuing exceptions, see Oracle Streams Advanced Queuing User’s Guide.

Building the StockQueueLoader Application

With the queue and the queue table created, you will now build the StockQueueLoader application that generates fictional stock price fluctuations in a fantasy stock market consisting of 10 ticker symbols and loads these stock prices (using a STOCK user-defined type) into the STOCK_Q queue you just created. (In a real-world scenario, actual stock price updates could be obtained via Web services and then inserted into an Oracle queue.)

(Note that all of the applications created and used in this article, namely StockQueueLoader, StockTickerListener, and StockQueueSubscriberAdmin, are available prebuilt in the article’s application download.)

To begin construction of the queue loader application (StockQueueLoader), choose File -> New -> Project from the main menu. In the dialog box that appears, under the Visual C# project type, choose Windows, and then on the right side of the dialog box, choose the Console Application template. In the Name field, enter StockQueueLoader.

In the newly opened project, go to Solution Explorer; right-click the StockQueueLoader project; and from the menu, choose Add Reference. On the .NET tab in the Add Reference dialog box, scroll down until you see the Oracle.DataAccess assembly. You may see multiple versions, but select version 2.111.7.10 (the version in which Oracle Streams Advanced Queuing support was first introduced) or later.

Next, download the sample applications at otn.oracle.com/oramag/oracle/09-nov/o69odt.zip, unzip the file, and copy the contents of the AQ_ORAMAG\CodeListings\aqloader.txt file on top of the default template code. (Excerpts of the new code appear in Listing 1.) Notice that the key ODP.NET classes being used are OracleAQQueue and OracleAQMessage. Furthermore, note that the payload for the OracleAQMessage class is another .NET class called STOCK. STOCK is the .NET custom class for the STOCK Oracle user-defined type. (Custom classes enable .NET code to pass user-defined type values to and from Oracle Database.)

Code Listing 1: StockQueueLoader application code excerpt

 

OracleAQQueue queue = new OracleAQQueue("HR.STOCK_Q", con);
queue.MessageType = OracleAQMessageType.Udt;
queue.EnqueueOptions.Visibility = OracleAQVisibilityMode.Immediate;
OracleAQMessage enqMsg = new OracleAQMessage();

Console.WriteLine("Connected to Oracle.... Will start enqueuing in 10 seconds");
System.Threading.Thread.Sleep(10000);

string[] symbols = new string[10] { "ORCL", "MSFT", "INTC", "CSCO", "ADBE", "GOOG", "AAPL", "AMZN", "EBAY", "JAVA" };

double[] prices = new double[10] { 19.5, 19, 18.25, 15, 24, 383.25, 121, 79.75, 15, 9 };

Random rand = new Random();
while (true)
{
    for (int i = 0; i < symbols.Length; i++)
    {
        //...code to randomly increase or decrease prices[i] goes here

        //We don't want multiple copies of the same stock on the queue
        //at the same time, so the message will expire after 5 seconds
        enqMsg.Expiration = 5;

        //Identify the sender and attach the STOCK payload for this symbol
        enqMsg.SenderId = new OracleAQAgent("QUEUELOADER");
        enqMsg.Payload = new STOCK(symbols[i], (decimal)prices[i]);
        queue.Enqueue(enqMsg);
    }
}
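
Listing 1 omits the step that moves each price before it is enqueued. The following is only one possible sketch of such a step, not the code from the article download: the PriceSimulator class, the NextPrice method, and the maxStepPercent parameter are hypothetical names introduced here for illustration. It nudges a price up or down by a bounded random percentage and keeps it above one cent.

```csharp
using System;

// Hypothetical helper; not part of the article's sample download or of ODP.NET.
public static class PriceSimulator
{
    // Returns a new price that differs from the current price by at most
    // maxStepPercent percent, rounded to cents and never below 0.01.
    public static double NextPrice(double current, Random rand, double maxStepPercent)
    {
        // rand.NextDouble() is in [0, 1); rescale it to [-1, 1)
        double direction = rand.NextDouble() * 2.0 - 1.0;
        double next = current * (1.0 + direction * maxStepPercent / 100.0);
        return Math.Max(0.01, Math.Round(next, 2));
    }
}
```

Inside the for loop of Listing 1, a call such as prices[i] = PriceSimulator.NextPrice(prices[i], rand, 2.0); could stand in for the omitted code, assuming a 2 percent maximum move per update is acceptable for the demonstration.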


Now generate the STOCK custom class from the STOCK user-defined type by using a wizard. To start the wizard, in Server Explorer, under the HR data connection, expand the User-Defined Types node (see Figure 1). Find and right-click the STOCK node. From the menu, choose Generate Custom Class. In the wizard, click Next to move past the splash screen. In the next screen, uncheck the Inherit from Project check box, directly above the Namespace field. Then change Namespace to Stock, click Next until you get to the final screen, and click the Finish button. This will generate a Stock.cs file and place it in your project.

Now add the following constructor to the STOCK class, below the automatically generated constructors:

 

public STOCK(string sym, decimal pr)
{
   SYMBOL = sym;
   PRICE = pr;
}


Finally, build the application by selecting Build from the Visual Studio main menu and then selecting Rebuild Solution. Run the application by choosing Debug from the main menu and then selecting Start Without Debugging.

As you see messages appear in the console indicating that stock prices are being loaded into the queue, you can inspect the STOCK_QTAB table in Oracle Database to see these values being added there. In Server Explorer, under the HR data connection, expand the Tables node and double-click the STOCK_QTAB node to open the data window. If you right-click the data window and choose Refresh from the context menu, you can watch this queue table grow and notice that after five seconds, each entry in this table is replaced with an advanced queuing exception. Because the StockQueueLoader .NET code sets an expiration of five seconds for each item loaded into the queue, these exceptions indicate that the item was not dequeued before this expiration.

Building the StockTickerListener Application

Consumer applications have several options for finding out if there is something in a queue they are interested in. They can poll a queue by inspecting it from time to time to see if there is a message they need; they can listen to the queue with a dedicated thread; or they can register a callback, a subroutine that gets called and is passed a notification when a message that meets certain criteria is put into the queue.

There are various ways a consumer application can choose whether a message is of interest. Messages can be addressed to specific recipients or recipient groups. Messages can also be assigned correlations, which are ad hoc identifiers assigned by the producing applications. Messages also have priorities for filtering. Finally, messages can be inspected via rules, which are essentially WHERE clauses added to SELECT statements that query against the queue.

A consumer application can specify an interest in certain messages in a queue by becoming a subscriber. Each subscriber can define a named rule. You will now build an application—StockTickerListener—that receives callbacks whenever stock prices appear in the queue that match a subscriber rule. These stock prices will be output to a window much like a stock ticker.
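
To make the subscriber, rule, and callback relationship concrete before turning to the ODP.NET code, here is a small in-memory sketch. It is an illustration only and does not use or imitate the ODP.NET API: StockMessage and InMemoryStockQueue are hypothetical classes, a rule is modeled as a plain C# predicate rather than a WHERE-style expression, and callbacks run synchronously at enqueue time.

```csharp
using System;
using System.Collections.Generic;

// Illustrative payload; mirrors the SYMBOL and PRICE attributes of STOCK.
public class StockMessage
{
    public readonly string Symbol;
    public readonly decimal Price;

    public StockMessage(string symbol, decimal price)
    {
        Symbol = symbol;
        Price = price;
    }
}

// Hypothetical in-memory stand-in for a multiconsumer queue (not an ODP.NET class).
public class InMemoryStockQueue
{
    private sealed class Subscriber
    {
        public Predicate<StockMessage> Rule;   // null matches every message
        public Action<StockMessage> Callback;  // invoked when Rule matches
    }

    private readonly Dictionary<string, Subscriber> subscribers =
        new Dictionary<string, Subscriber>();
    private readonly List<StockMessage> messages = new List<StockMessage>();

    // Registers a callback for a named subscriber, or replaces its rule.
    public void Subscribe(string name, Predicate<StockMessage> rule,
                          Action<StockMessage> callback)
    {
        subscribers[name] = new Subscriber { Rule = rule, Callback = callback };
    }

    // Stores the message, then notifies each subscriber whose rule matches.
    public void Enqueue(StockMessage message)
    {
        messages.Add(message);
        foreach (Subscriber s in subscribers.Values)
        {
            if (s.Rule == null || s.Rule(message))
            {
                s.Callback(message);
            }
        }
    }

    // Number of messages still held; notifying subscribers does not remove them.
    public int Count
    {
        get { return messages.Count; }
    }
}
```

For example, subscribing "SUBSCRIBER1" with the rule m => m.Symbol == "ORCL" means its callback runs only for ORCL messages, while a subscriber registered with a null rule is notified about every message.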

To build the StockTickerListener application, in a new instance of Visual Studio, first create a new console application, including an Oracle.DataAccess reference (in the same way you did in the previous section). Now add the Stock.cs custom class file (from the StockQueueLoader application) to your new project by right-clicking the new project name in Solution Explorer, choosing Add -> Existing Item, and then browsing to the file. Visual Studio will then make a local copy of that file in your project, saving you the trouble of re-creating it.

Code Listing 2: StockTickerListener application code excerpt

 

OracleAQQueue queue = new OracleAQQueue("HR.STOCK_Q", con);
queue.MessageType = OracleAQMessageType.Udt;
queue.NotificationConsumers = new string[1] { "SUBSCRIBER1" };

queue.MessageAvailable +=
  new OracleAQMessageAvailableEventHandler(QueueCallback.MsgReceived);
.....

static void MsgReceived(object src, OracleAQMessageAvailableEventArgs arg)
{
    try
    {
        Console.WriteLine("Notification Received...");
        byte[] notifiedMsgId = arg.MessageId[0];

        // Open a separate connection for the dequeue; "using" closes it afterward
        using (OracleConnection con = new OracleConnection(constr))
        {
            con.Open();

            // Prepare to dequeue message by its MessageID
            OracleAQQueue queue = new OracleAQQueue("HR.STOCK_Q", con);
            queue.MessageType = OracleAQMessageType.Udt;
            queue.UdtTypeName = "HR.STOCK";
            queue.DequeueOptions.MessageId = notifiedMsgId;
            queue.DequeueOptions.ConsumerName = "SUBSCRIBER1";

            // Browse leaves the message on the queue for other consumers
            queue.DequeueOptions.DequeueMode = OracleAQDequeueMode.Browse;

            //Dequeue
            OracleAQMessage deqMsg = queue.Dequeue();
            STOCK st = (deqMsg.Payload as STOCK);

            Console.WriteLine(st.SYMBOL + ": " + st.PRICE);
        }
    }
    catch (Exception e)
    {
        // Minimal placeholder handling so that the excerpt is complete
        Console.WriteLine("Error: " + e.Message);
    }
}

From the sample applications you downloaded earlier, copy the contents of the AQ_ORAMAG\CodeListings\stocktickerlistener.txt file on top of the console template code. (Excerpts of this code appear in Listing 2.) The main part of the code uses two key members of the queue object. The first is the property

 

queue.NotificationConsumers 


which is set to SUBSCRIBER1. This indicates that callbacks should be received only when the SUBSCRIBER1 subscription rules are satisfied. The second is the event subscription

 

 
queue.MessageAvailable += new 
OracleAQMessageAvailableEventHandler
(QueueCallback.MsgReceived);


This registers the MsgReceived method as a callback.

In the code for the MsgReceived callback, note that the Oracle Streams Advanced Queuing message ID is included as part of the arguments that are passed to the method. This ID is assigned to

 

queue.DequeueOptions.MessageId 


to specify the message you want. Finally,

 

queue.DequeueOptions.DequeueMode = OracleAQDequeueMode.Browse


indicates that the message should not be removed from the queue after viewing, because other client applications may need to view that stock.

Build this application by selecting Build from the Visual Studio main menu and then selecting Rebuild Solution. Run the application by choosing Debug from the main menu and then selecting Start Without Debugging.

Now go back to the earlier instance of Visual Studio, rerun the StockQueueLoader application, and place both console application windows side by side. You should see the StockTickerListener application display every stock price that is loaded by the StockQueueLoader application (see Figure 3). This is because SUBSCRIBER1’s rule is currently set to null. Because these rules are essentially WHERE clauses in a query over the queue contents, a null rule means that every item on the queue will be included. Keep both applications running while you move on to the next section.

 

Figure 3: StockQueueLoader (top) and StockTickerListener (bottom)


Modifying a SUBSCRIBER Rule at Runtime

Earlier, you saw the Rule field for each subscriber in Queue Designer in Visual Studio and left it blank. A rule can select specific stock symbols from the STOCK user-defined type and, optionally, a range of stock prices for each symbol. Now you will run a helper application that makes it easier to change this rule at runtime while the stock ticker application is running.

The download provided with this article includes a Windows Forms application called StockQueueSubscriberAdmin. This application presents a user interface (see Figure 4) that includes a list of stock symbols; an optional field for providing a stock price; and a list of conditions that enables the user to set up a subscriber rule for receiving stock prices that are less than, greater than, or equal to the provided value.

 

Figure 4: StockQueueSubscriberAdmin user interface


Run the StockQueueSubscriberAdmin application. Select the ORCL symbol from the stock list, and click Add to Rule. In the text box, you will see

 

tab.user_data.SYMBOL=''ORCL'' 


This rule will cause your stock ticker to receive notifications only when ORCL stock values are placed in the queue. Next, select the JAVA symbol from the list and enter 3 in the Price field. Choose > (greater than) from the Condition list. Click Add to Rule. Now the text box contains

 

tab.user_data.SYMBOL=''ORCL'' OR 
(tab.user_data.SYMBOL=''JAVA'' AND 
tab.user_data.PRICE>3)


This rule will cause your stock ticker to receive notifications for all ORCL stock prices as well as for any JAVA prices above $3 that are loaded into the queue.
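
The rule text shown above can be assembled mechanically from the user's selections. The sketch below is not the StockQueueSubscriberAdmin source; SubscriberRuleBuilder, Condition, and Combine are hypothetical names. It assumes, as in the article's rule text, that single quotes are doubled because the rule is later concatenated into a PL/SQL string literal, and it restricts symbols to letters and digits and operators to <, >, and = so that no unexpected text ends up in that literal.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

// Hypothetical helper; not the code used by StockQueueSubscriberAdmin.
public static class SubscriberRuleBuilder
{
    private static readonly HashSet<string> AllowedOperators =
        new HashSet<string> { "<", ">", "=" };

    // Builds one condition on SYMBOL, optionally combined with a PRICE comparison.
    // Pass null for price to match every price of the symbol.
    public static string Condition(string symbol, string op, decimal? price)
    {
        if (string.IsNullOrEmpty(symbol))
        {
            throw new ArgumentException("A symbol is required.", "symbol");
        }
        foreach (char c in symbol)
        {
            if (!char.IsLetterOrDigit(c))
            {
                throw new ArgumentException("Symbol must be alphanumeric.", "symbol");
            }
        }

        // Quotes are doubled because the rule is embedded in a PL/SQL literal.
        string symbolTest = "tab.user_data.SYMBOL=''" + symbol + "''";
        if (price == null)
        {
            return symbolTest;
        }
        if (op == null || !AllowedOperators.Contains(op))
        {
            throw new ArgumentException("Operator must be <, >, or =.", "op");
        }
        return "(" + symbolTest + " AND tab.user_data.PRICE" + op +
               price.Value.ToString(CultureInfo.InvariantCulture) + ")";
    }

    // Joins individual conditions with OR to form the complete rule.
    public static string Combine(params string[] conditions)
    {
        return string.Join(" OR ", conditions);
    }
}
```

Calling Combine(Condition("ORCL", null, null), Condition("JAVA", ">", 3m)) produces the same rule text as the one displayed in the text box, on a single line.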

Finally, click the Alter Subscription! button. This will execute the DBMS_AQADM.ALTER_SUBSCRIBER procedure and pass the rule you have created. (The code that calls this procedure is shown in Listing 3.) To see the new rule, in Server Explorer right-click the STOCK_Q node, choose Design Queue, and then click the Subscribers tab.

Code Listing 3: StockQueueSubscriberAdmin application code excerpt

 

private void button2_Click_1(object sender, EventArgs e)
{
    //Executes DBMS_AQADM.ALTER_SUBSCRIBER to modify the SUBSCRIBER1 subscription
    string conString = "User Id=hr;Password=hr;Data Source=;";

    //The rule text is concatenated into a PL/SQL string literal, which is
    //why single quotes inside the rule are doubled (for example, ''ORCL'')
    string cmdtxt = "DECLARE " +
        "SUBSCRIBER SYS.AQ$_AGENT; " +
        "BEGIN " +
        @"SUBSCRIBER := SYS.AQ$_AGENT('""SUBSCRIBER1""', NULL, 0); " +
        "DBMS_AQADM.ALTER_SUBSCRIBER(" +
        @"queue_name => '""STOCK_Q""'," +
        "subscriber => SUBSCRIBER," +
        "rule => '" + textBox2.Text + "'," +
        "transformation => ''" +
        "); " +
        "END;";

    //"using" closes the connection and releases the command when done
    using (OracleConnection con = new OracleConnection(conString))
    using (OracleCommand cmd = con.CreateCommand())
    {
        con.Open();
        cmd.CommandText = cmdtxt;

        //Execute the anonymous PL/SQL block
        cmd.ExecuteNonQuery();
    }
}


If both of the earlier applications are still running, you should notice that your stock ticker application is now displaying only ORCL and JAVA ticker symbols. Each time the rule is modified, your running stock ticker will immediately reflect the change.

Now go back to the StockQueueSubscriberAdmin application, click Clear Rule, and click the Alter Subscription! button. This will set the rule to null. Watch as the stock ticker immediately resumes displaying all the stock symbols that are loaded into the queue.

Summary

This article showed you how you can leverage Oracle Streams Advanced Queuing database-integrated messaging from .NET applications. You loaded a queue from one application, subscribed to that queue from another application, and received callback notifications based on subscription rules you modified by using the DBMS_AQADM package.

 


Christian Shay ( christian.shay@oracle.com ) is a principal product manager at Oracle.


