Tame Big Data using Oracle Data Integration
Overview: Integrate Hadoop using Oracle Data Integration
- Review the Scenario
- Ingesting Data Using Sqoop and Oracle GoldenGate
- Transforming Data Using Hive, Spark, or Pig
- Loading Data to Oracle DB using Oracle Loader for Hadoop
- Accessing Hadoop Data from Oracle using Big Data SQL
- Data Sessionization using Pig
- Execute all Steps Using an ODI Package
Purpose
This demo illustrates how you can move and transform all your data using Oracle Data Integration - whether that data resides in Oracle Database, Hadoop, third-party databases, applications, files, or a combination of these sources. The "Design once, run anywhere" paradigm allows you to focus on the logical rules of data transformation and movement while the choice of implementation is separate and can evolve as use cases change and new technologies become available.
This demo is included in the virtual machine Big Data Lite 4.2, which is downloadable from http://www.oracle.com/technetwork/database/bigdata-appliance/oracle-bigdatalite-2104726.html.
Time to Complete
Approximately one hour
Introduction
This tutorial is divided into the sections listed in the Overview above.
Scenario
Oracle MoviePlex is an online movie streaming company. Its web site collects every customer interaction in massive JSON formatted log files. It also maintains movie data in an Oracle source database. By unlocking the information contained in these sources and combining it with enterprise data in its data warehouse, the company will be able to enrich its understanding of customer behavior, the effectiveness of product offers, the organization of web site content, and more.
The company is using Oracle's Big Data Management System to unify its data platform and facilitate these analyses. This is achieved by implementing a Data Reservoir pattern, where both structured and unstructured data are collected and staged in a Big Data instance for further analysis and loading into target databases.

Oracle Data Integration provides a unified, tool-driven approach to declaratively define the integration of all data. The concept of "Design once, run anywhere" means that users can define integration processes regardless of the implementation language and run them in different environments. A transformation that is executed today on an RDBMS can be reused to run on the Hadoop cluster and utilize future Hadoop languages and optimizations as they become available.
For Oracle MoviePlex, a variety of mechanisms are showcased. Data is loaded from a source database to Hive tables, both in bulk using Sqoop through Oracle Data Integrator (ODI) and through change data capture using Oracle GoldenGate (OGG). Data is transformed by joining, filtering, and aggregating through Hive, Spark, or Pig in ODI, and the resulting data can be loaded into a target Oracle DB using the optimized Oracle Loader for Hadoop (OLH) or Oracle SQL Connector for HDFS (OSCH). Hadoop data can also be used in Oracle DB using Big Data SQL, where ODI transparently generates the necessary external tables to expose Hive tables in the Oracle DB for use in SQL queries.
Let's begin the tutorial by reviewing how data is moved and transformed using Oracle Data Integration for Big Data.
Resetting the demo
You can reset the demo environment from a previously run demo or hands-on lab by executing the script:
/home/oracle/movie/moviework/odi/reset_ogg_odi.sh
Preparation Steps
- ORCL Oracle Database 12c
- Zookeeper
- HDFS
- Hive
- YARN (scroll down in the list of services to see it)
Downloading and Installing Big Data Lite Virtual Machine
Please follow the instructions in the Big Data Lite Deployment Guide for details on downloading, installing, and starting Big Data Lite 4.2.
Starting required services
Select any services that are not started and press OK.
Prepare Oracle Database for Oracle GoldenGate
cd /home/oracle/movie/moviework/ogg
./enable_ogg_on_orcl.sh
This only has to be done once; the configuration remains valid after reboots of the VM. To undo the changes made by this script, execute disable_ogg_on_orcl.sh in the same directory.
Part 1 - Ingesting Data Using Sqoop and Oracle GoldenGate
- ORCL Oracle Database 12c
- Zookeeper
- Kafka
- Source table movie_updates provides information about each movie, with op providing operation types I, U, and D, while ts provides a timestamp value.
- AGGREGATE is used to group all movies based on movie_id and calculate the latest timestamp as max_ts.
- JOIN is used to get all movie update entries with the latest timestamp for each movie_id as calculated in the aggregate.
- FILTER is used to filter out any rows of operation type D. This means that the latest change for that movie was a delete.
In this section, you will learn how to ingest data from external sources into Hadoop, using Sqoop for bulk load and Oracle GoldenGate for change data capture.

Ingest Bulk Data Using Sqoop in Oracle Data Integrator
Oracle Data Integrator allows the creation of mappings to move and transform data. In this step we will review and execute an existing mapping to load data from an Oracle DB source to a Hive target table.
Open and review the Sqoop mapping




Please note that movie_updates has additional fields op for the operation type and ts for the timestamp. These columns are used to prepare the data for later reconciliation with updates that will be appended by Oracle GoldenGate. The field op is initialized with the string 'i', while ts is populated with SYSDATE when executed on the Oracle source.
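Conceptually, the source extraction for this load corresponds to a query like the following (a simplified sketch only, with the column list taken from the MOVIE inserts shown later in this tutorial; the Sqoop code that ODI actually generates will differ):
-- Simplified sketch of the source-side extraction (Oracle SQL); not the generated Sqoop code
SELECT m.MOVIE_ID,
       m.TITLE,
       m.YEAR,
       m.BUDGET,
       m.GROSS,
       m.PLOT_SUMMARY,
       'i'     AS OP,   -- initial-load rows are flagged as inserts
       SYSDATE AS TS    -- load timestamp, used for later reconciliation
FROM   MOVIEDEMO.MOVIE m;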
Click on the tab Physical under the mapping to see the physical design.

Select the access point MOVIE_AP to review the configuration of the data load.


Execute the Sqoop mapping


Note: The execution can take several minutes depending on the environment. In order to view the generated code only, you can check the checkbox Simulation. In this case the generated session is displayed in a dialog window and no execution is shown in the operator.
If the execution has not finished, it will show the Run icon for an ongoing task. You can refresh the view by pressing the blue Refresh icons to refresh once or to refresh automatically every 5 seconds.




Ingest Change Data using Oracle GoldenGate for Big Data
Oracle GoldenGate allows the capture of completed transactions from a source database and the replication of these changes to a target system. Oracle GoldenGate is non-invasive, highly performant, and reliable in capturing and applying these changes. Oracle GoldenGate for Big Data provides a component to replicate changes captured by GoldenGate into different Hadoop target technologies. In this tutorial we will replicate inserts into the MOVIE table in Oracle to the corresponding movie_updates table in Hive. Oracle GoldenGate provides this capability through GoldenGate for Big Data, with adapters for Hive, HDFS, HBase, Flume, and Kafka. For this example we will first use the Hive adapter and later also demonstrate the delivery mechanism to Kafka.
The GoldenGate processes for the Hive example are as follows:
Start Oracle GoldenGate and Set Up Replication from Oracle to Hive

cd /u01/ogg
./ggsci

obey dirprm/bigdata.oby
info all

exit
cd /u01/ogg-bd
./ggsci

obey dirprm/bigdata.oby
info all

The RKAFKA process can be ignored here; it will show as STOPPED if Kafka is not started.
sqlplus system/welcome1@orcl

INSERT INTO "MOVIEDEMO"."MOVIE" (MOVIE_ID, TITLE, YEAR, BUDGET, GROSS, PLOT_SUMMARY) VALUES ('1', 'Sharknado', '2014', '500000', '20000000', 'Flying sharks attack city');
commit;
Note: Alternatively you can execute the following command:
@ /home/oracle/movie/moviework/ogg/oracle_insert_movie.sql;


UPDATE "MOVIEDEMO"."MOVIE" SET BUDGET=BUDGET+'10000' where MOVIE_ID='1';
commit;
Note: Alternatively you can execute the following command:
@ /home/oracle/movie/moviework/ogg/oracle_update_movie.sql;

DELETE FROM "MOVIEDEMO"."MOVIE" WHERE MOVIE_ID='1';
commit;
Note: Alternatively you can execute the following command:
@ /home/oracle/movie/moviework/ogg/oracle_delete_movie.sql;

hive
This will open the Hive CLI to query the result table.

show create table movie_view;
This shows a view that you can use to query a reconciled version of the movie table in Hive. Please note that in this tutorial we will also use Oracle Data Integrator to create a reconciled table. Reconciliation by view provides real-time status, but is more resource-intensive for frequent queries.

You can query this view by entering:
select * from movie_view;
Replicate Data from Oracle to Kafka
Oracle GoldenGate for Big Data provides a built-in component to replicate changes captured by GoldenGate into Kafka. It also provides the capability to deliver the data in various formats such as JSON, XML, and Avro, in addition to the CSV format. The GoldenGate processes for the Kafka handler example are as follows:
Set up Kafka

Kafka uses Zookeeper so you need to first start a Zookeeper server if you don't already have one. You can use the convenience script packaged with Kafka to get a quick-and-dirty single-node Zookeeper instance in case it is not running.
Check whether Zookeeper and Kafka are running:
ps -eaf | grep -i kafka
If Kafka and Zookeeper are not running, start them as described in Step 1.
Let's start a Kafka console consumer for the topic "oggtopic":
cd /usr/lib/kafka
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic oggtopic --from-beginning
Kafka comes with a command line client that will take input from a file or from standard input and send it out as messages to the Kafka cluster. By default each line will be sent as a separate message.
Run the producer and then type a few messages into another console to send to the server.
/usr/lib/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic oggtopic
Test message 1
This is another test message 2
If you have each of the above commands running in a different terminal then you should now be able to type messages into the producer terminal and see them appear in the consumer terminal.
All of the command line tools have additional options; running the command with no arguments will display usage information documenting them in more detail.
Working with Kafka handler
Start a terminal window from the menu bar by single-clicking on the Terminal icon.

cd /u01/ogg-bd
./ggsci

ggsci> obey dirprm/bigdata.oby

ggsci> info all

cd /u01/ogg
./ggsci
ggsci> obey dirprm/bigdata.oby
ggsci> info all

sqlplus system/welcome1@orcl

INSERT INTO "MOVIEDEMO"."MOVIE" (MOVIE_ID, TITLE, YEAR, BUDGET, GROSS, PLOT_SUMMARY) VALUES ('1', 'Sharknado', '2014', '500000', '20000000', 'Flying sharks attack city');
commit;

The Kafka Replicat produces the Kafka messages in Avro format, and they are captured by the Kafka console consumer.
Merge Updates using Hive in Oracle Data Integrator
In our demo we have used Oracle Data Integrator to load initial data using Sqoop, and Oracle GoldenGate to replicate changes in real time. We have also used a view to reconcile the updates on the fly. We can also merge the data in bulk using an ODI mapping to provide the same data that the original table in Oracle holds.
Open and review the Hive merge mapping

The mapping logical view shows the transformations used for the mapping:

Please note that this mapping uses the same logic as the Hive view shown in the previous section. This demo illustrates the ability of ODI to graphically design transformations similar to the reconciliation view.
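For reference, the reconciliation logic described above can be expressed in Hive SQL roughly as follows (a conceptual sketch only; the actual view definition and the code generated by ODI may differ):
-- Conceptual Hive SQL for the merge/reconciliation logic (sketch, not the generated code)
SELECT u.*
FROM movie_updates u
JOIN (SELECT movie_id, MAX(ts) AS max_ts      -- AGGREGATE: latest change per movie
      FROM movie_updates
      GROUP BY movie_id) latest
  ON  u.movie_id = latest.movie_id
  AND u.ts = latest.max_ts                    -- JOIN: keep only the latest row per movie
WHERE u.op <> 'D';                            -- FILTER: drop movies whose latest change is a delete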
Switch to the Physical View of the mapping. Since the transformation is within Hive, both source and target datastore are in the same execution unit.

Execute the Hive merge mapping

Note: The execution can take several minutes depending on the environment. In order to view the generated code only, you can check the checkbox Simulation. In this case the generated session is displayed in a dialog window and no execution is shown in the operator.

Open the datastore HiveMovie.movie and select View Data from the menu to see the loaded rows.

Part 2 - Transforming Data using Hive, Spark, and Pig
- Source table movie provides information about each movie, while source table movieapp_log_avro contains raw customer activities.
- FILTER is being used to filter down to activity = 1 events, which are rating events.
- AGGREGATE is used to group all ratings based on movieid and calculate an average of the movie ratings.
- JOIN is used to join base movie information from table movie with aggregated events to write to the target table movie_rating.
- The target table movie_rating stores the result from the join. It uses a user-defined function XROUND that provides rounding support across all implementation languages (Hive, Spark, and Pig).
- Source HDFS file movie_ratings provides nested JSON information about each movie, with a sub-array of ratings for each movie.
- Flatten is being used to un-nest the rating information. It will do a cross-product of each movie with its nested ratings.
- AGGREGATE is used to group all ratings based on movie_id and calculate an average of the movie ratings.
- The target table movie_rating stores the result from the aggregation. It uses a user-defined function XROUND that provides rounding support across all implementation languages (Hive, Spark, and Pig).
In this section, you will use Oracle Data Integrator to transform the movie data previously loaded with Sqoop and GoldenGate. The use case is to join the table movie with customer activity event data that was previously loaded into an HDFS file using Flume and is now exposed as a Hive table movieapp_log_odistage. The activity data contains rating actions; we will calculate an average rating for every movie and store the result in a Hive table movie_rating. You can create one logical mapping and choose whether to use Hive, Spark, or Pig as the execution engine for the staging location. ODI will generate either Hive SQL, Spark-Python, or Pig Latin and execute it on the appropriate server engine.
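Conceptually, the transformation corresponds to Hive SQL along the following lines (a simplified sketch only; it uses the table movieapp_log_avro from the component list above, the activity column names and the XROUND signature are assumptions, and the code ODI generates for Hive, Spark, or Pig will differ):
-- Conceptual Hive SQL for the movie_rating mapping (sketch only)
SELECT m.movie_id,
       m.title,
       r.avg_rating
FROM movie m
JOIN (SELECT movieid,
             xround(avg(rating)) AS avg_rating   -- AGGREGATE plus the XROUND UDF (signature assumed)
      FROM movieapp_log_avro
      WHERE activity = 1                         -- FILTER: activity = 1 marks rating events
      GROUP BY movieid) r
  ON m.movie_id = r.movieid;
The result set is written to the target Hive table movie_rating.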
Also as part of this chapter we will show a mapping that takes a nested JSON HDFS file as input and flattens it to calculate movie ratings on the contents. The implementation engine used is Spark.
This demo shows that with ODI you can create logical mappings declaratively without considering any implementation details; those can be added later in the physical design.

Transform Movie Data using Hive
Open and review the Hive mapping

The mapping logical view shows the transformations used for the mapping:

Switch to the Physical View of the mapping and make sure the tab Hive is selected on the bottom. There is a physical design for each of the 3 implementation engines. Since the transformation is within Hive, both source and target datastore are in the same execution unit.

Select the target MOVIE_RATING and review the Properties window, Integration Knowledge Module tab. The IKM Hive Append has been selected with default settings.

Execute the Hive mapping

Note: The execution can take several minutes depending on the environment. In order to view the generated code only, you can check the checkbox Simulation. In this case the generated session is displayed in a dialog window and no execution is shown in the operator.

Open the datastore HiveMovie.movie_rating and select View Data from the menu to see the loaded rows.

Transform Movie Data using Spark
Open and review the mapping
Please note that we are using the same logical mapping for Hive, Spark, and Pig. There is no separate logical design for these implementation engines. The property Staging Location Hint of the logical diagram is used to select an implementation engine prior to generation of a physical design.

Switch to the Physical View of the mapping and make sure the tab Spark is selected on the bottom. There is a physical design for each of the 3 implementation engines. Since the transformation is using Spark and the sources and targets are defined using Hive Catalog metadata, source, staging (Spark), and target datastore are shown in separate execution units.

Select the component JOIN and review the Properties window, Extract Options tab. If you switch to the tab Advanced, you will see that an Extract Knowledge Module XKM Spark Join has been selected for you by the system. XKMs such as this one organize the Spark-Python code generation of each component for you. You can set advanced options such as CACHE_DATA or NUMBER_OF_TASKS in the Options tab to tune Spark memory handling and parallelism.

Execute the Spark mapping


Note: The execution can take several minutes depending on the environment. In order to view the generated code only, you can check the checkbox Simulation. In this case the generated session is displayed in a dialog window and no execution is shown in the operator.

Transform Movie Data using Pig
Open and review the mapping
Please note that we are using the same logical mapping for Hive, Spark, and Pig. There is no separate logical design for these implementation engines. The property Staging Location Hint of the logical diagram is used to select an implementation engine for the next generated physical design.

Switch to the Physical View of the mapping and make sure the tab Pig is selected on the bottom. There is a physical design for each of the 3 implementation engines. Since the transformation is using Pig and the sources and targets are defined using Hive Catalog metadata, source, staging (Pig), and target datastore are shown in separate execution units.

Execute the Pig mapping



Note: The execution can take several minutes depending on the environment. In order to view the generated code only, you can check the checkbox Simulation. In this case the generated session is displayed in a dialog window and no execution is shown in the operator.

Transform Nested JSON Data Using Spark
Prepare and review source HDFS JSON data

In the terminal window, execute the commands:
hdfs dfs -put ~/movie/moviework/odi/movie_ratings/movie_ratings.json /user/odi/demo/movie_ratings.json
hdfs dfs -cat /user/odi/demo/movie_ratings.json
You can see a JSON-formatted file with records of movies and nested rating records between 1 and 5. This will be the input for our mapping. You can see the ODI model HDFSMovie with the datastore movie_ratings containing the metadata for the JSON file. Please note that the datastore does not keep metadata about complex attributes such as the ratings sub-array; it is displayed as a flat String-type attribute.
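For orientation, the FLATTEN plus AGGREGATE logic of this mapping is conceptually equivalent to the following Hive SQL (a sketch only; the demo executes the mapping in Spark against the HDFS datastore, and the structure of the ratings array is an assumption):
-- Conceptual Hive SQL analogue of the FLATTEN + AGGREGATE logic (sketch, not the generated code)
SELECT m.movie_id,
       xround(avg(r)) AS avg_rating           -- AGGREGATE: average of the exploded ratings
FROM movie_ratings m                          -- assumes a table exposing the JSON with ratings as an array
LATERAL VIEW explode(m.ratings) flat AS r     -- FLATTEN: one row per nested rating value
GROUP BY m.movie_id;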

Open and review the mapping

The mapping logical view shows the transformations used for the mapping:

Switch to the Physical View of the mapping.

Execute the mapping

Note: The execution can take several minutes depending on the environment. In order to view the generated code only, you can check the checkbox Simulation. In this case the generated session is displayed in a dialog window and no execution is shown in the operator.

Open the datastore HiveMovie.movie_rating and select View Data from the menu to see the loaded rows.

Part 3 - Loading Data to Oracle DB using Oracle Loader for Hadoop
In this task we load the results of the prior Hive transformation from the resulting Hive table into the Oracle DB data warehouse. We are using the Oracle Loader for Hadoop (OLH) bulk data loader, which uses load mechanisms specifically optimized for Oracle DB.

Load Movie Data to Oracle DB using ODI Mapping and Oracle Loader for Hadoop

The mapping logical view shows a direct map from the Hive table movie_rating to the Oracle target table. No transformations are done on the attributes.

Switch to the Physical View of the mapping. It shows the move from the Hive source to the Oracle target.

Select the access point MOVIE_RATING_AP (some of the label might not be visible) and review the Properties window, Loading Knowledge Module tab. The LKM Hive to Oracle OLH-OSCH Direct has been selected with default settings, except for TRUNCATE=True, which allows repeated execution of the mapping, and USE_HIVE_STAGING_TABLE=False, which avoids an additional staging step. The output mode OLH_OUTPUT_MODE is set to JDBC by default, which is a good setting for debugging and simple use cases. Other settings perform an OLH load through OCI or Data Pump, or a load through OSCH.

Execute the OLH mapping

Note: The execution can take several minutes depending on the environment. In order to view the generated code only, you can check the checkbox Simulation. In this case the generated session is displayed in a dialog window and no execution is shown in the operator.

Open the datastore OracleMovie.ODI_MOVIE_RATING and select View Data from the menu to see the loaded rows.
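Alternatively, you can verify the load from SQL*Plus (a quick check only; the schema name is an assumption for this environment):
-- Quick row count of the loaded target table (schema name assumed)
SELECT COUNT(*) FROM MOVIEDEMO.ODI_MOVIE_RATING;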

Part 4 - Accessing Hadoop Data from Oracle using Big Data SQL
In the next section ODI will transform data from both Oracle DB as well as Hive in a single mapping using Oracle Big Data SQL. Big Data SQL enables Oracle Exadata to seamlessly query data on the Oracle Big Data Appliance using Oracle's rich SQL dialect. Data stored in Hadoop is queried in exactly the same way as all other data in Oracle Database.
In this use case we are combining the previously used activity data with customer data that is stored in Oracle DB. We are summarizing all purchase activities for each customer and joining them with the core customer data.

- Hive source table movieapp_log_odistage provides customer activity information, while Oracle source table CUSTOMER contains information about each customer.
- FILTER is being used to filter down to activity = 11 events, which are sales events.
- JOIN is used to join activity information with the respective customer information.
- AGGREGATE is used to group all sales activities based on COUNTRY and CONTINENT and calculate the sum of the sales values of all activities for the given geography.
Calculate Sales from Hive and Oracle Tables using Big Data SQL
Open and review the Big Data SQL mapping

The mapping logical view shows the transformations used for the mapping:

Switch to the Physical View of the mapping. You can see that the table movieapp_log_stage is within Hive, while the table CUSTOMER and all transformations are in the target Oracle DB.

Select the access point MOVIEAPP and review the Properties window, Loading Knowledge Module tab. The LKM Hive to Oracle (Big Data SQL) has been selected to access the Hive table from Oracle remotely through an external table definition. All LKM options are at their defaults.
Note: LKM Hive to Oracle (Big Data SQL) is a custom example KM that is available for download at Java.net.
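For illustration, the kind of external table that Big Data SQL uses to expose a Hive table in Oracle looks roughly like the following (a simplified sketch; the table name, column list, join key, and access parameters are assumptions, and ODI generates the actual definition for you):
-- Sketch of a Big Data SQL external table over the Hive source (names and columns assumed)
CREATE TABLE MOVIEAPP_LOG_EXT (
  CUSTID   NUMBER,
  ACTIVITY NUMBER,
  SALES    NUMBER
)
ORGANIZATION EXTERNAL (
  TYPE ORACLE_HIVE
  DEFAULT DIRECTORY DEFAULT_DIR
  ACCESS PARAMETERS (
    com.oracle.bigdata.tablename=default.movieapp_log_odistage
  )
)
REJECT LIMIT UNLIMITED;

-- Such a table can then be joined with Oracle data in ordinary SQL, for example:
SELECT c.COUNTRY, c.CONTINENT, SUM(l.SALES) AS TOTAL_SALES
FROM   MOVIEAPP_LOG_EXT l
JOIN   CUSTOMER c ON c.CUST_ID = l.CUSTID    -- join key assumed
WHERE  l.ACTIVITY = 11                       -- activity = 11 marks sales events
GROUP BY c.COUNTRY, c.CONTINENT;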

Execute the Big Data SQL mapping

Note: The execution can take several minutes depending on the environment. In order to view the generated code only, you can check the checkbox Simulation. In this case the generated session is displayed in a dialog window and no execution is shown in the operator.


Open the datastore OracleMovie.ODI_COUNTRY_SALES and select View Data from the menu to see the loaded rows.

Part 5 - Data Sessionization using Pig
- Hive source table movieapp_log_odistage provides customer activity information, while Hive table cust contains information about each customer.
- SESSIONIZE is a table function component that contains custom Pig code calling the DataFu Sessionize UDF. This functionality groups activities into sessions based on the time window in which they occur.
- AGGREGATE is used to aggregate session statistics based on location (province, country).
- EXPRESSION is used to normalize and convert the session statistics.
- SORT orders the aggregated records by average session length.
- The result is stored into the target Hive table session_stats.
This demo shows how to execute a complex Pig mapping using ODI, with the ability to use user-defined functions and table functions. The mapping uses the Pig function Sessionize from the Apache DataFu library, which is included in this Hadoop distribution.
The objective of this mapping is to organize the activities in the Hive table movieapp_log_odistage into separate sessions for different users and to calculate minimum, maximum, and average session duration statistics based on geography.
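As background, gap-based sessionization can also be expressed with SQL window functions. The sketch below is only a conceptual analogue (the column names and the 30-minute inactivity window are assumptions), while the mapping itself uses the DataFu Sessionize UDF in Pig:
-- Conceptual Hive SQL analogue of gap-based sessionization (sketch; the demo uses Pig/DataFu)
SELECT custid, ts,
       SUM(new_session) OVER (PARTITION BY custid ORDER BY ts
                              ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS session_id
FROM (SELECT custid, ts,
             CASE WHEN unix_timestamp(ts)
                       - LAG(unix_timestamp(ts)) OVER (PARTITION BY custid ORDER BY ts)
                       > 30 * 60                -- start a new session after a 30-minute gap (assumed)
                  THEN 1 ELSE 0 END AS new_session
      FROM movieapp_log_odistage) t;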

Load, Sessionize, and Analyze User Activities using Pig in ODI
Open and review the mapping

The mapping logical view shows the transformations used for the mapping:

Switch to the Physical View of the mapping. Since the transformation is using Pig and the sources and targets are defined using Hive Catalog metadata, source, staging (Pig), and target datastore are shown in separate execution units.

Highlight the physical component SESSIONIZE and go to the Properties window. Select the Extract Options tab and double-click on the Value column of the PIG_SCRIPT_CONTENT option.

A dialog is shown with the custom Pig code used to implement this table function. The function Sessionize from DataFu is at the core of this implementation. Please note that the code uses MOVIEAPP_LOG_ODISTAGE as input relation and SESSIONIZE as output relation.

Execute the Pig mapping

Note: You will see in Part 6 how to automate this step in a package.

Note: The execution can take several minutes depending on the environment. In order to view the generated code only, you can check the checkbox Simulation. In this case the generated session is displayed in a dialog window and no execution is shown in the operator.

Part 6 - Execute all Steps Using an ODI Package
In this section we use an ODI Package to execute all other ODI steps together sequentially. Packages can be used to orchestrate various jobs such as mappings, procedures, tools, or other packages, and allow the inclusion of conditional logic and variables as part of the execution. This package also makes use of a File Copy tool that copies a JSON file from the local filesystem into HDFS as well as procedures to issue Hive SQL calls to truncate tables.
Review Package

Review the package. All mappings, procedures, and tool calls are connected in a workflow to execute in one operation.




Note: The execution can take several minutes depending on the environment.
Once the load is complete, the operator will show all tasks of the session as successful.
Summary
In this tutorial, you have learned how to:
- Ingest Data Using Sqoop and Oracle GoldenGate
- Transform Data Using Hive, Pig, and Spark
- Load Data to Oracle DB using Oracle Loader for Hadoop
- Access Hadoop Data from Oracle using Big Data SQL
- Sessionize Data using Pig
- Use ODI Packages to orchestrate multiple jobs
Resources
Go to the Oracle Technology Network for more information on Oracle Data Integration, Oracle Big Data SQL, Oracle Data Warehousing, and Oracle Analytical SQL.
Credits
- Author: Alex Kotopoulis