Data Manipulation with Hive and Pig

The commands below accompany the hands-on lab Data Manipulation with Hive and Pig.

Exercise 1

1. Review the commands available for the Hadoop Distributed File System:

cd /home/oracle/movie/moviework/mapreduce
hadoop fs
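
Running hadoop fs with no arguments prints the full list of HDFS shell commands. A few that are worth knowing beyond this lab (the paths here are illustrative, not part of the exercise):

hadoop fs -help mkdir               # usage details for a single command
hadoop fs -cat /path/to/file        # print a file to stdout
hadoop fs -tail /path/to/file       # print the last kilobyte of a file
hadoop fs -du -h /user/oracle       # space used per entry, human-readable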

2. List the contents of /user/oracle:

hadoop fs -ls /user/oracle

3. Create a subdirectory called “my_stuff” in the /user/oracle folder and then ensure the directory has been created:

hadoop fs -mkdir /user/oracle/my_stuff
hadoop fs -ls /user/oracle
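
By default -mkdir creates one directory level at a time. On Hadoop 2.x you can pass -p to create any missing parent directories as well (the nested path below is illustrative):

hadoop fs -mkdir -p /user/oracle/my_stuff/a/b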

4. Remove the directory “my_stuff” and then ensure it has been removed:

hadoop fs -rm -r my_stuff
hadoop fs -ls
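
Note that my_stuff works without a leading slash because relative paths resolve against your HDFS home directory, /user/oracle on this VM. If the cluster has trash enabled, -rm moves files to a trash directory rather than deleting them immediately; to delete permanently:

hadoop fs -rm -r -skipTrash /user/oracle/my_stuff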

5. Inspect the compressed Avro application log:

cd /home/oracle/movie/moviework/mapreduce
./read_avro_file.sh

6. Review the commands available for the Hadoop Distributed File System and copy the Avro log file into HDFS:

hadoop fs
hadoop fs -put movieapp_3months.avro /user/oracle/moviework/applog_avro
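
-put copies a file from the local file system into HDFS; -copyFromLocal is an equivalent command that is restricted to local sources:

hadoop fs -copyFromLocal movieapp_3months.avro /user/oracle/moviework/applog_avro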

7. Verify the copy by listing the directory contents in HDFS:

hadoop fs -ls /user/oracle/moviework/applog_avro

Exercise 2

1. Enter the Hive command line by typing hive at the Linux prompt:

hive
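
You can also run HiveQL without entering the interactive shell, which is handy for scripting (the script path below is illustrative):

hive -e 'SHOW DATABASES;'      # run a single statement and exit
hive -f /tmp/myscript.hql      # run the statements in a file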

2. Create a new Hive database called moviework. Ensure that the database has been successfully created:

create database moviework;
show databases;
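
DESCRIBE DATABASE shows where the database's files live in the Hive warehouse:

DESCRIBE DATABASE moviework;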

3. Review the schema definition for the Avro file and then define a table using that schema file:

hadoop fs -cat moviework/schemas/activity.avsc
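
An Avro schema is a JSON document that names the record's fields and their types. For orientation, here is a minimal sketch of what activity.avsc might contain; the field names are inferred from the queries later in this lab, and the actual file may differ:

{
  "type": "record",
  "name": "activity",
  "fields": [
    {"name": "custId",      "type": ["null", "int"]},
    {"name": "movieId",     "type": ["null", "int"]},
    {"name": "genreId",     "type": ["null", "int"]},
    {"name": "time",        "type": ["null", "string"]},
    {"name": "recommended", "type": ["null", "string"]},
    {"name": "activity",    "type": ["null", "int"]},
    {"name": "rating",      "type": ["null", "int"]},
    {"name": "price",       "type": ["null", "float"]}
  ]
}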

4. To create a table in a database, you can either fully qualify the table name (i.e., prepend the database name to the table name) or designate that all subsequent DDL and DML operations apply to a specific database. For simplicity, you will apply subsequent operations to the moviework database:

use moviework;

CREATE EXTERNAL TABLE movieapp_log_avro
ROW FORMAT
  SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
  WITH SERDEPROPERTIES ('avro.schema.url'='hdfs://bigdatalite.localdomain/user/oracle/moviework/schemas/activity.avsc')
STORED AS
  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/user/oracle/moviework/applog_avro';

SELECT * FROM movieapp_log_avro LIMIT 20;
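
Because the table's columns come from the schema file referenced by avro.schema.url rather than from the DDL, it is worth confirming what Hive derived:

DESCRIBE movieapp_log_avro;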

5. HiveQL supports many standard SQL operations. Find the min and max time periods that are available in the log file:

SELECT MIN(time), MAX(time) FROM movieapp_log_avro;
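
Other aggregates work the same way. As an optional extra, this query shows how the clicks are distributed across activity codes:

SELECT activity, COUNT(*) AS clicks
FROM movieapp_log_avro
GROUP BY activity;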

Exercise 3

1. Write a query to select only those clicks which correspond to starting, browsing, completing, or purchasing movies. Use a CASE statement to transform the RECOMMENDED column into integers where ‘Y’ is 1 and ‘N’ is 0, and replace null GENREID values with -1 so the column is never null. Return only the first 25 rows:

SELECT custid,
       movieid,
       CASE WHEN genreid > 0 THEN genreid ELSE -1 END genreid,
       time,
       CASE recommended WHEN 'Y' THEN 1 ELSE 0 END recommended,
       activity,
       price
FROM movieapp_log_avro
WHERE activity IN (2,4,5,11)
LIMIT 25;

2. Write a query to select the customer ID, movie ID, recommended state, and most recent rating for each movie:

SELECT m1.custid,
       m1.movieid,
       CASE WHEN m1.genreid > 0 THEN m1.genreid ELSE -1 END genreid,
       m1.time,
       CASE m1.recommended WHEN 'Y' THEN 1 ELSE 0 END recommended,
       m1.activity,
       m1.rating
FROM movieapp_log_avro m1
JOIN (
  SELECT custid,
         movieid,
         CASE WHEN genreid > 0 THEN genreid ELSE -1 END genreid,
         MAX(time) max_time,
         activity
  FROM movieapp_log_avro
  GROUP BY custid,
           movieid,
           genreid,
           activity
) m2
ON (
  m1.custid = m2.custid
  AND m1.movieid = m2.movieid
  AND CASE WHEN m1.genreid > 0 THEN m1.genreid ELSE -1 END = CASE WHEN m2.genreid > 0 THEN m2.genreid ELSE -1 END
  AND m1.time = m2.max_time
  AND m1.activity = 1
  AND m2.activity = 1
)
LIMIT 25;
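
The self-join above finds the rows whose timestamp equals the per-group maximum. On Hive 0.11 or later, a window function expresses "most recent rating per customer and movie" more directly; a sketch, not part of the lab steps:

SELECT custid, movieid, genreid, time, recommended, activity, rating
FROM (
  SELECT custid,
         movieid,
         CASE WHEN genreid > 0 THEN genreid ELSE -1 END genreid,
         time,
         CASE recommended WHEN 'Y' THEN 1 ELSE 0 END recommended,
         activity,
         rating,
         ROW_NUMBER() OVER (PARTITION BY custid, movieid ORDER BY time DESC) rn
  FROM movieapp_log_avro
  WHERE activity = 1
) ranked
WHERE rn = 1
LIMIT 25;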

3. Load the results of the previous two queries into a staging table. First, create the staging table:

CREATE TABLE movieapp_log_stage (
  custId INT,
  movieId INT,
  genreId INT,
  time STRING,
  recommended INT,
  activity INT,
  rating INT,
  sales FLOAT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
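
This is a managed table stored as tab-delimited text in the Hive warehouse. DESCRIBE FORMATTED shows its location and storage details:

DESCRIBE FORMATTED movieapp_log_stage;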

4. Next, load the results of the queries into the staging table:

INSERT OVERWRITE TABLE movieapp_log_stage
SELECT * FROM (
  SELECT custid,
         movieid,
         CASE WHEN genreid > 0 THEN genreid ELSE -1 END genreid,
         time,
         CAST((CASE recommended WHEN 'Y' THEN 1 ELSE 0 END) AS INT) recommended,
         activity,
         CAST(NULL AS INT) rating,
         price
  FROM movieapp_log_avro
  WHERE activity IN (2,4,5,11)
  UNION ALL
  SELECT m1.custid,
         m1.movieid,
         CASE WHEN m1.genreid > 0 THEN m1.genreid ELSE -1 END genreid,
         m1.time,
         CAST((CASE m1.recommended WHEN 'Y' THEN 1 ELSE 0 END) AS INT) recommended,
         m1.activity,
         m1.rating,
         CAST(NULL AS FLOAT) price
  FROM movieapp_log_avro m1
  JOIN (
    SELECT custid,
           movieid,
           CASE WHEN genreid > 0 THEN genreid ELSE -1 END genreid,
           MAX(time) max_time,
           activity
    FROM movieapp_log_avro
    GROUP BY custid,
             movieid,
             genreid,
             activity
  ) m2
  ON (
    m1.custid = m2.custid
    AND m1.movieid = m2.movieid
    AND CASE WHEN m1.genreid > 0 THEN m1.genreid ELSE -1 END = CASE WHEN m2.genreid > 0 THEN m2.genreid ELSE -1 END
    AND m1.time = m2.max_time
    AND m1.activity = 1
    AND m2.activity = 1
  )
) union_result;
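
To sanity-check the load before moving on:

SELECT COUNT(*) FROM movieapp_log_stage;
SELECT * FROM movieapp_log_stage LIMIT 10;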


Optional: Pig

1. Start the Grunt shell and execute the following statements to set up a dataflow with the clickstream data. Note: Pig Latin statements are assembled into MapReduce jobs, which are launched only when a DUMP or STORE statement is executed.

pig

REGISTER /usr/lib/pig/piggybank.jar
REGISTER /usr/lib/pig/lib/avro-1.7.4.jar
REGISTER /usr/lib/pig/lib/json-simple-1.1.jar
REGISTER /usr/lib/pig/lib/snappy-java-1.0.4.1.jar
REGISTER /usr/lib/pig/lib/jackson-core-asl-1.8.8.jar
REGISTER /usr/lib/pig/lib/jackson-mapper-asl-1.8.8.jar

applogs = LOAD '/user/oracle/moviework/applog_avro'
USING org.apache.pig.piggybank.storage.avro.AvroStorage(
'no_schema_check',
'schema_file',
'hdfs://bigdatalite.localdomain/user/oracle/moviework/schemas/activity.avsc');

DESCRIBE applogs;

log_sample = SAMPLE applogs 0.001;

DESCRIBE log_sample;

DUMP log_sample;
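
Relations can be transformed step by step before any job is launched. For example, to keep only rating events and project out two fields (optional; the field names follow the schema used above):

ratings_only = FILTER log_sample BY activity == 1;
rating_pairs = FOREACH ratings_only GENERATE movieId, rating;
DUMP rating_pairs;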

2. Group the log_sample relation by movie and dump the result:

movie_logs = GROUP log_sample BY movieId;

DUMP movie_logs;
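
Each tuple in movie_logs pairs a movieId with a bag of that movie's log records. A natural next step (optional) is to count the records per movie:

-- COUNT skips tuples whose first field is null; use COUNT_STAR to include them
movie_counts = FOREACH movie_logs GENERATE group AS movieId, COUNT(log_sample) AS clicks;
DUMP movie_counts;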