Creating a Pipeline and the Query Stages

Before You Begin

This 15-minute tutorial shows you how to create the SupplierAnalysis pipeline and its query stages in Stream Analytics so that you can track financial losses attributed to specific products from various suppliers.

This is the second tutorial in the Tracking Financial Losses from Various Suppliers By Using a Stream Analytics Pipeline series. Read them sequentially.

Background

Stream Analytics is a feature of Oracle Integration Cloud. Its intuitive, web-based interface, powered by the Spark Streaming and Apache Kafka messaging runtime, enables you to explore, analyze, and manipulate streaming data sources in real time.

What Do You Need?

Perform the tasks described in Creating Supporting Artifacts for the SupplierAnalysis Pipeline.


Create a Stream Analytics Pipeline

  1. Click Catalog.
  2. In the Create New Item menu, select Pipeline.
    Figure: The Create Application dialog box (create_application.png)
  3. In the Create New Pipeline dialog box, do the following:
    • In the Name field, enter SupplierAnalysis.
    • In the Description field, enter Tutorial: Instantly determine the current losses on various Supplier products for renegotiation by Procurement Department.
    • In the Tags field, enter retail, tutorial.
    • In the Stream drop-down list, select SalesTransactions.
  4. Click Save. The events stream into the SupplierAnalysis pipeline.
    Figure: The table with events streaming (create_app_live_stream.png)

Create a Query Stage to Obtain Unit Costs

  1. In the Add a Stage menu, select Query.
    Figure: The Create Query Stage dialog box (create_query_stage.png)
  2. In the Create Query Stage dialog box, enter the following:
    • In the Name field, enter GetUnitCostAndSupplier.
    • In the Description field, enter Initial analytical query stage to join with the provided database table of suppliers.
  3. Click Save. The query stage is added to the pipeline.
  4. Click Add a Source and select the ListOfSuppliers reference.
    Figure: The ListOfSuppliers source (create_query_stage.png)
  5. In the GetUnitCostAndSupplier stage, click Add a Source next to the stream, and then select ListOfSuppliers in the drop-down list.
  6. Under Correlation Conditions, select Match All, click Add a Condition, and then select PRODUCT equals (case insensitive) Product in the SalesTransactions stream. Note: Make sure that you correlate only one field, not multiple fields. The three columns of the reference are added to the Live Output Stream.
    Figure: The Live Output Stream table (live_data_stream_reference.png)
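
The correlation in step 6 behaves like a join between each streaming event and the reference table. The following Python sketch illustrates the idea only; it is not how Stream Analytics implements the stage. The sample rows, the helper name correlate, the assumption that the three reference columns are PRODUCT, SUPPLIERNAME, and UNITCOST, and the choice to drop events that have no matching supplier are all assumptions made for illustration.

```python
def correlate(events, suppliers):
    """Enrich each sales event with the supplier row for the same product.

    Matching is case insensitive on a single field, mirroring the
    "PRODUCT equals (case insensitive) Product" condition.
    """
    # Index the reference rows by lower-cased product name.
    by_product = {row["PRODUCT"].lower(): row for row in suppliers}
    for event in events:
        match = by_product.get(event["Product"].lower())
        if match is not None:  # assumption: unmatched events are dropped
            # Stream fields plus the reference columns.
            yield {**event, **match}


# Hypothetical sample data, not taken from the tutorial.
suppliers = [
    {"PRODUCT": "Widget", "SUPPLIERNAME": "Acme", "UNITCOST": 4.0},
]
events = [
    {"Product": "WIDGET", "QuantitySold": 3, "UnitSalePrice": 3.5},
    {"Product": "Gadget", "QuantitySold": 1, "UnitSalePrice": 9.0},
]
enriched = list(correlate(events, suppliers))
```

Here the WIDGET event picks up the supplier columns despite the different letter case, while the Gadget event has no reference row.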

Create a Query Stage to Analyze Calculated Results

  1. In the Add a Stage menu, select Query.
  2. In the Create Query Stage dialog box, enter the following:
    • In the Name field, enter CalculateTxnCostRevProfitLoss.
    • In the Description field, enter Use available custom functions using the Expression Builder to calculate the required values.
  3. Click Save. The query stage is added to the pipeline.
  4. In the Live Output Stream table, place your cursor over the first column (ProductLine) and click Expression Builder.
  5. In the input field, enter =QuantitySold*UnitSalePrice and click the Apply icon to display the calc column.
  6. Right-click that column and rename it to TxnRevenue. The Live Output Stream displays the TxnRevenue column.
  7. Repeat steps 4, 5, and 6 to add the expressions and columns in the following table:
    Function                  Column Name
    =QuantitySold*UNITCOST    TxnCost
    =TxnRevenue-TxnCost       ProfitLoss
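
The three expressions in this section can be checked by hand with a small Python sketch. The column names and formulas come from the steps above; the function name add_calculated_columns and the sample values are assumptions made for illustration.

```python
def add_calculated_columns(row):
    """Return a copy of the row with the three calculated columns added."""
    out = dict(row)
    out["TxnRevenue"] = row["QuantitySold"] * row["UnitSalePrice"]  # =QuantitySold*UnitSalePrice
    out["TxnCost"] = row["QuantitySold"] * row["UNITCOST"]          # =QuantitySold*UNITCOST
    out["ProfitLoss"] = out["TxnRevenue"] - out["TxnCost"]          # =TxnRevenue-TxnCost
    return out


# Hypothetical sample event: 3 units sold at 3.5 each, unit cost 4.0.
sample = {"QuantitySold": 3, "UnitSalePrice": 3.5, "UNITCOST": 4.0}
result = add_calculated_columns(sample)
```

For this sample, TxnRevenue is 10.5, TxnCost is 12.0, and ProfitLoss is -1.5, so the transaction is a loss.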

Create a Query Stage to Show Products Indicating a Loss

  1. From the Add a Stage menu, select Query.
  2. In the Create Query Stage dialog box, enter the following:
    • In the Name field, enter LossesOnly.
    • In the Description field, enter Only show the products indicating a loss.
  3. Click Save. The query stage is added to the pipeline.
  4. Click Filters and then Add a Filter.
  5. To keep only the rows that show a loss (negative ProfitLoss values), select ProfitLoss in the first drop-down list, select lower than in the second drop-down list, and then enter 0 in the blank field.
    Figure: The Filters tab (add_filter.png)
  6. Add the custom function abs on the ProfitLoss column to convert the negative values to positive values:
    =abs(ProfitLoss)
  7. The expression creates a new column that holds the positive values. Rename this column to TxnLoss, and then remove the ProfitLoss column from the output.
  8. Click the clock icon next to Columns to add Timestamp to the Live Output Stream table.
    Figure: The Clock icon (time_stamp.png)
  9. Rename this column to TxnTime.
    Figure: The Filters tab (live_stream_txn_time.png)
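
Taken together, the steps in this section filter, transform, and timestamp each row. The Python sketch below mirrors that sequence as an illustration only. The function name losses_only and the sample rows are hypothetical, and stamping each row with the current UTC time is an assumption, because the tutorial does not say which time the Timestamp column carries.

```python
from datetime import datetime, timezone


def losses_only(rows):
    """Keep loss rows, expose the loss as a positive TxnLoss, add TxnTime."""
    for row in rows:
        if row["ProfitLoss"] < 0:  # filter: ProfitLoss lower than 0
            # Remove ProfitLoss from the output and add the positive loss.
            out = {key: value for key, value in row.items() if key != "ProfitLoss"}
            out["TxnLoss"] = abs(row["ProfitLoss"])  # =abs(ProfitLoss)
            out["TxnTime"] = datetime.now(timezone.utc)  # assumed timestamp source
            yield out


# Hypothetical sample rows.
rows = [
    {"SUPPLIERNAME": "Acme", "ProfitLoss": -1.5},
    {"SUPPLIERNAME": "Globex", "ProfitLoss": 2.0},
]
losses = list(losses_only(rows))
```

Only the Acme row survives the filter, with a TxnLoss of 1.5. A ProfitLoss of exactly 0 would also be excluded, because the condition is strictly lower than 0.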

Create Unique Real-Time Operational Visualizations

  1. Click Visualizations on the LossesOnly query stage.
  2. Click Add a Visualization and then click Bar Chart.
  3. In the Create Visualization dialog box, enter the following values and click Create:
    • Name: Current Loss Amount By Supplier
    • Description: Bar Chart showing the levels of losses for products by suppliers
    • Y Axis Field Selection: TxnLoss
    • Axis Label: Loss
    • X Axis Field Selection: SUPPLIERNAME
    • Axis Label: Supplier
    Figure: The Create Visualization dialog (add_bar_chart.png)
  4. The data is displayed in a graphical view.
    Figure: The graphical view of data (loss_amt_by_supplier_chart.png)
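
To get a feel for the data behind a chart with SUPPLIERNAME on the X axis and TxnLoss on the Y axis, the following Python sketch totals the losses per supplier. The tutorial does not state how the bar chart combines multiple events for the same supplier, so summing is an assumption here; the function name loss_by_supplier and the sample rows are also hypothetical.

```python
from collections import defaultdict


def loss_by_supplier(rows):
    """Sum TxnLoss per SUPPLIERNAME (assumed aggregation, for illustration)."""
    totals = defaultdict(float)
    for row in rows:
        totals[row["SUPPLIERNAME"]] += row["TxnLoss"]
    return dict(totals)


# Hypothetical output rows of the LossesOnly stage.
loss_rows = [
    {"SUPPLIERNAME": "Acme", "TxnLoss": 1.5},
    {"SUPPLIERNAME": "Globex", "TxnLoss": 2.0},
    {"SUPPLIERNAME": "Acme", "TxnLoss": 2.5},
]
totals = loss_by_supplier(loss_rows)
```

Each key in totals corresponds to one bar, and its value to the bar's height under the summing assumption.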

Next Tutorial

Publishing and Deploying the Pipeline