Oracle Cloud: Introdução ao Oracle Big Data (Parte V) - Pig

Por Gokhan Atil ,Joel Pérez & Evandro Giachetto
Publicado em Decembro 2017

Outros artigos nesta série:

Introdução ao Oracle Big Data Cloud (Parte I)

Introdução ao Oracle Big Data Cloud (Parte II) - Serviços

Introdução ao Oracle Big Data Cloud (Parte III) - Ambari

Introdução ao Oracle Big Data Cloud (Parte IV) - Zeppelin

Introdução ao Oracle Big Data Cloud (Parte V) - Pig to

Introdução ao Oracle Big Data Cloud (Parte VI) - Hive

Esta é nossa quinta postagem sobre a série introdutória ao Oracle Big Data. Nesta postagem iremos explorar o “Apache Pig”. Trata-se de uma ferramenta/plataforma criada pela “Yahoo!” para lidar com grandes conjuntos de dados sem a complexidade de se escrever um programa MapReduce tradicional. Foi projetado para processar qualquer tipo de dado (estruturado ou não-estruturado), sendo uma excelente ferramenta para ETL. Pig vem instalado em pronto para ser utilizado com o “Oracle Big Data”. Nesta postagem mostraremos como podemos utilizar o pig para ler, interpretar e analisar dados.

O Pig possui uma linguagem de programação de alto nível SQL chamada Pig Latin. Precisamos aprender os conceitos básicos desta linguagem para poder usar Pig. Cada declaração em um script Pig é processada pelo interpretador Pig para construir um plano lógico que será usado para processar os jobs MapReduce. As etapas no plano lógico não são “executadas” até que uma instrução DUMP ou STORE seja utilizada.

Scripts Pig geralmente possuem a seguinte estrutura:

  • Os dados são lidos utilizando as instruções LOAD.
  • Os dados são transformados/processados.
  • O resultado é exibido (em tela) ou armazenado em um arquivo (ou uma tabela Hive).

Na publicação anterior, usamos PySpark (API Spark para Python) para carregar dados de voos e criamos uma análise muito simples usando o Spark SQL. Faremos o mesmo com os mesmos dados usando Pig.

O Pig possui um shell interativo chamado "Grunt". Ao invés de escrever comandos Pig Latin em um arquivo e executá-lo com o comando pig, também podemos inserir cada declaração no Grunt e executá-las interativamente. Para entrar no shell Grunt vou fazer login no meu primeiro nó, mudar para o usuário hdfs (para poder acessar os arquivos CSV que usei na minha postagem anterior) e depois iniciar o shell interativo do pig (Grunt) no terminal, através do comando “pig”:



 root@bilyonveri-bdcsce-1 ~]# sudo su  - hdfs 
  [hdfs@bilyonveri-bdcsce-1 ~]$ pig 
  WARNING: Use "yarn jar" to  launch YARN applications. 
  17/06/22 06:40:55 INFO  pig.ExecTypeProvider: Trying ExecType : LOCAL 
  17/06/22 06:40:55 INFO  pig.ExecTypeProvider: Trying ExecType : MAPREDUCE 
  17/06/22 06:40:55 INFO  pig.ExecTypeProvider: Picked MAPREDUCE as the ExecType 
  2017-06-22 06:40:55,549 [main]  INFO  org.apache.pig.Main - Apache Pig  version 0.15.0 (r: unknown)
   compiled Apr 20 2017, 18:50:24 
  2017-06-22 06:40:55,549 [main]  INFO  org.apache.pig.Main - Logging error  messages to: 
  /var/lib/hadoop-hdfs/pig_1498128055548.log 
  2017-06-22 06:40:55,566 [main]  INFO  org.apache.pig.impl.util.Utils -  Default bootup file 
  /var/lib/hadoop-hdfs/.pigbootup not found 
  2017-06-22 06:40:55,916 [main]  INFO   org.apache.pig.backend.hadoop.executionengine.HExecutionEngine -  
  Connecting to hadoop file system at: hdfs://mycluster 
  2017-06-22 06:40:56,598 [main]  INFO  org.apache.pig.PigServer - Pig Script  ID for the session:
 PIG-default-43bfbd27-07c7-42ee-8070-8aa6af5e7646 
  2017-06-22 06:40:56,933 [main]  INFO   org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline 
   service address: http://bilyonveri-bdcsce-2.compute-trbilyonveri.oraclecloud.internal:8188/ws/v1/timeline/ 
  2017-06-22 06:40:57,020 [main]  INFO   org.apache.pig.backend.hadoop.ATSService - Created ATS Hook
  grunt> 2017-06-22 06:40:57,157  [ATS Logger 0] ERROR org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl
    - Failed to get the response from the timeline server. 
  2017-06-22 06:40:57,157 [ATS Logger  0] INFO   org.apache.pig.backend.hadoop.ATSService - Failed to submit plan 
  to ATS:  Failed to get the response from the timeline server.
  
  grunt> 

O Grunt também pode ser usado para executar comandos hdfs. Então, vou executar os seguintes comandos para ver se eu posso acessar os arquivos CSV (2008.csv, carriers.csv). Esses arquivos estão localizados na pasta /user/zeppelin porque usei o Zeppelin para fazer o download deles.



grunt> fs -ls /user/zeppelin/*.csv
  grunt> fs -tail  /user/zeppelin/carriers.csv

Quando executamos os comandos acima listamos os arquivos 2008.csv e carriers.csv e também as últimas linhas do arquivo carriers.csv. Então eu tenho certeza que meu script pig também poderá acessá-los.

Então, antes de tudo, eu preciso ler os dados desses arquivos CSV. A seguinte declaração indicará ao Pig para ler os dados de carriers.csv e declarar um conjunto de dados chamado "carriers_file";



carriers_file  = LOAD '/user/zeppelin/carriers.csv';

Em seguida, informamos ao LOAD que utilizaremos um arquivo CSV e que as linhas não são multi-line; que utilizaremos o padrão UNIX de EOL (fim de linha) e ignoraremos o cabeçalho. Não se preocupe pois estas são opções muito simples e comumente utilizadas, tudo o que precisamos para adicionar alguns parâmetros ao "engine".



carriers_file = LOAD  '/user/zeppelin/carriers.csv' 
  USING  org.apache.pig.piggybank.storage.CSVExcelStorage(',', 'NO_MULTILINE', 'UNIX',
  'SKIP_INPUT_HEADER'); 	

Também podemos definir nosso schema enquanto carregando os dados.:



carriers_file = LOAD  '/user/zeppelin/carriers.csv'
  USING  org.apache.pig.piggybank.storage.CSVExcelStorage(',', 'NO_MULTILINE', 'UNIX',
  'SKIP_INPUT_HEADER') 
  AS (Code:chararray,  Description:chararray); 	

Tenha cuidado com nomes de variáveis, pois são case sensitive. Nos scripts Pig, nomes de variáveis são case sensitive, enquanto palavras-chave em Pig Latin são case insensitive. Então, se quisermos utilizar carriers_file em próximas declarações, precisamos usar "carrier_file" (todas as minúsculas). Vou definir o schema durante o load pois o arquivo 2008.csv tem muitos campos que não utilizaremos, portanto, não quero defini-los.



carriers = FOREACH carriers_file GENERATE REPLACE($0, '"', '')  as Code, REPLACE($1, '"', '') as Description; 

Se você não definir o schema para um conjunto de dados, você pode acessar os campos usando as variáveis $0, $1, $n… Em nosso arquivo CSV, os campos estão entre aspas (“ZMZ”,”Winnipesaukee Aviation Inc.”), portanto, precisamos remover os caracteres de “aspas” utilizando o comando REPLACE. Para poder fazê-lo, preciso instruir ao Pig que processe cada linha no conjunto de dados "carrier_file". Os comandos FOREACH e GENERATE são utilizados para tal. Então, a instrução acima processa cada linha, remove os caracteres de aspas ‘”’ de $0 e $1 (primeira e segunda colunas) e as nomeia como Code e Description.

Como você verá, esses comandos não levarão muito tempo para processar, e eles não iniciarão nenhum trabalho real (job MapReduce). Se você quiser ver o conteúdo de um data set pode utilizar o comando DUMP. Mas primeiro, não se esqueça de filtrar os dados. Por exemplo, eu posso usar as seguintes instruções para obter as primeiras 10 linhas do data set "carriers":

div class="ocode">


first_10_carriers = LIMIT carriers  10; 

DUMP first_10_carriers 

Farei algo muito parecido com o data set 2008.csv, mas será um pouco mais complexo devido à quantidade de colunas. Também quero definir os tipos de variáveis para cada coluna:



 flights_file = LOAD  '/user/zeppelin/2008.csv' 
  USING org.apache.pig.piggybank.storage.CSVExcelStorage(',',  'NO_MULTILINE', 'UNIX',   'SKIP_INPUT_HEADER'); 
  
  flights = FOREACH flights_file  GENERATE 
  (chararray)$8 as Code:chararray, 
  (chararray)$22 as  CancellationCode:chararray, 
  (int)REPLACE($24, 'NA', '0') as  CarrierDelay:int, 
  (int)REPLACE($25, 'NA', '0') as  WeatherDelay:int, 
  (int)REPLACE($26, 'NA', '0') as  NASDelay:int, 
  (int)REPLACE($27, 'NA', '0') as  SecurityDelay:int, 
  (int)REPLACE($28, 'NA', '0') as  LateAircraftDelay:int;

Da última query, eu apenas utilizarei “CarrierDelay”, mas eu quis mostrar como podemos selecionar as outras colunas. Desta vez eu não removi as aspas, em vez disso, substituí NA por 0. Você deve estar se perguntando por quê coloquei um “(int)” antes dos comandos REPLACE, dessa forma:



  (int)REPLACE($24, 'NA', '0') as CarrierDelay:int, 

Eu digo ao Pig que o comando REPLACE processará o 25º campo em "flights_file", retornará um número inteiro e eu irei armazená-lo como CarrierDelay (uma coluna inteira). Eu defino os tipos de dados retornados pela função para prevenir problemas de conversão nos jobs do MapReduce. Essa é uma dica muito importante, especialmente se utilizamos uma versão do Pig anterior à 0.17.

Agora precisamos unir esses dois data sets. Então utilizamos o comando JOIN:



  (  flights_join = JOIN flights BY Code, carriers BY Code; 
 

Então, agrupamos os dados pelas colunas Code e Description:



 flight_grouped = GROUP flights_join by (flights::Code,  carriers::Description); 
 

Em meu artigo anterior, encontrei o total dos voos atrasados, os voos totais, o tempo de atraso médio (baseado em CarrierDelay), a proporção de voos atrasados/totais para cada companhia aérea e os ordenamos. Essa é uma tarefa muito fácil quando fazemos uso de SQL, no entanto, PIG exige que pensemos um pouco mais para realizarmos essa consulta:



 result = FOREACH flight_grouped { 
  Total =  COUNT(flights_join); 
  delayed_flights  = FILTER flights_join BY CarrierDelay > 0; 
  avg_CarrierDelay  = AVG(flights_join.CarrierDelay); 
  Delayed =  COUNT(delayed_flights); 
  DelayedTotalRatio  = (double)(double)(Delayed / (double)Total); 
  GENERATE  FLATTEN(group) AS (Code, Description), avg_CarrierDelay as avg_CarrierDelay,  
Delayed as Delayed, Total as Total, DelayedTotalRatio as DelayedTotalRatio; 
  } 
  
  ordered_result = ORDER result by  DelayedTotalRatio DESC; 

  DUMP ordered_result 
 
 

Deixe-me tentar explicar o código acima (os números de cada item representam os números das linhas do bloco anterior):

Linha 1) Eu defino "result" como o data set de resultado. O pig deve processar cada linha no data set "flight_grouped".

Linha 2) O número de voos para cada companhia aérea será contado e armazenado como Total.

Linha 3) Um novo data set para voos atrasados será criado.

Linha 4) O atraso médio será calculado e armazenado como avg_CarrierDelay.

Linha 5) O número de vôos atrasados será contado e armazenado como “Delayed”.

Linha 6) A relação de voos atrasados/totais é calculada e armazenada como “DelayedTotalRatio”.

Linha 7) A linha que contém o Código, Descrição, tempo de atraso médio, número de voos atrasados, número total de voos e a proporção é retornada como uma linha (para o data set de resultados).

Linha 10) Um novo data set (ordered_result) é gerado ordenando de forma descendente pela coluna DelayedTotalRation o data set de resultados.

Linha 12) O comando DUMP é usado para gerar o resultado (na tela). Isso iniciará o job MapReduce criado pelo Pig.

Este é o script completo (que você pode copiar e colar no grunt):



 carriers_file = LOAD  '/user/zeppelin/carriers.csv' 
  USING  org.apache.pig.piggybank.storage.CSVExcelStorage(',', 'NO_MULTILINE', 'UNIX',  
  'SKIP_INPUT_HEADER'); 
  
  carriers = FOREACH carriers_file  GENERATE REPLACE($0, '"', '') as Code, REPLACE($1, '"', '') as  Description; 
  
  flights_file = LOAD  '/user/zeppelin/2008.csv'
  USING org.apache.pig.piggybank.storage.CSVExcelStorage(',',  'NO_MULTILINE', 'UNIX',
 'SKIP_INPUT_HEADER'); 
 


  flights = FOREACH flights_file  GENERATE 
  (chararray)$8 as Code:chararray, 
  (chararray)$22 as  CancellationCode:chararray, 
  (int)REPLACE($24, 'NA', '0') as  CarrierDelay:int, 
  (int)REPLACE($25, 'NA', '0') as  WeatherDelay:int, 
  (int)REPLACE($26, 'NA', '0') as  NASDelay:int, 
  (int)REPLACE($27, 'NA', '0') as  SecurityDelay:int, 
  (int)REPLACE($28, 'NA', '0') as  LateAircraftDelay:int;
  
  flights_join = JOIN flights BY Code,  carriers BY Code; 
  
  flight_grouped = GROUP flights_join  by (flights::Code, carriers::Description);
  
result = FOREACH flight_grouped { 
  Total =  COUNT(flights_join); 
  delayed_flights  = FILTER flights_join BY CarrierDelay > 0; 
  avg_CarrierDelay  = AVG(flights_join.CarrierDelay); 
  Delayed = COUNT(delayed_flights); 
  DelayedTotalRatio  = (double)(double)(Delayed / (double)Total); 
  GENERATE  FLATTEN(group) AS (Code, Description), avg_CarrierDelay as avg_CarrierDelay, 
 Delayed as Delayed, Total as Total, DelayedTotalRatio as DelayedTotalRatio; 
} 

  ordered_result = ORDER result by  DelayedTotalRatio DESC; 
  
  DUMP ordered_result 
 

Abaixo, o resultado das estatísticas produzidas pelo job MapReduce ao executar a última instrução (DUMP) do script:



2017-06-22 04:54:01,470 [main]  INFO   org.apache.pig.tools.pigstats.mapreduce.SimplePigStats - Script
  Statistics: 

  HadoopVersion PigVersion        UserId  StartedAt          FinishedAt        Features
  2.7.1.2.4.2.0-258           0.15.0   hdfs     2017-06-22 04:51:52      2017-06-22 04:54:01     
   HASH_JOIN,GROUP_BY
 
  Success! 
  
  Job Stats (time in seconds): 
  JobId   Maps    Reduces           MaxMapTime    MinMapTime     AvgMapTime    MedianMapTime
          MaxReduceTime           MinReduceTime            AvgReduceTime           MedianReducetime
          Alias     Feature            Outputs 
  job_1497859064811_0022         7          1          18        3          13        15        37        37        
  37        37
     carriers,carriers_file,flights,flights_file,flights_joiHASH_JOIN     
  job_1497859064811_0023         3          1          12        9          10        10        36        36        
  36        36
     delayed_flights,flight_grouped,result    GROUP_BY      hdfs://mycluster/tmp/temp1852761250/tmp-1091973150,
  
  Input(s): 
  Successfully read 7009728 records  from: "/user/zeppelin/2008.csv" 
  Successfully read 1491 records from:  "/user/zeppelin/carriers.csv"
  
  Output(s): 
  Successfully stored 20 records (1271  bytes) in: "hdfs://mycluster/tmp/temp1852761250/tmp-1091973150" 
  
  Counters: 
  Total records written : 20 
  Total bytes written : 1271 
  Spillable Memory Manager spill count  : 0 
  Total bags proactively spilled: 2 
  Total records proactively spilled:  1005991 
  
  Job DAG: 
  job_1497859064811_0022         ->         job_1497859064811_0023, 
  job_1497859064811_0023 

O processo todo levou menos que 3 minutos, segue o resultado final:





(EV,Atlantic Southeast  Airlines,6.01967031987882,37398,280575,0.13329056402031542) 
  (YV,Mesa Airlines  Inc.,7.561840505236732,30855,254930,0.12103322480680971) 
  (AA,American Airlines  Inc.,4.6651958636765665,72843,604885,0.12042454350827017) 
  (F9,Frontier Airlines  Inc.,2.620778596938243,11264,95762,0.11762494517658362) 
  (B6,JetBlue  Airways,3.455645593117481,21451,196091,0.10939308790306541) 
  (OH,Comair  Inc.,5.44396706594402,21308,197607,0.10783018820183496) 
  (NW,Northwest Airlines Inc.,4.252088295191744,36800,347652,0.10585297941619781) 
  (UA,United Air Lines  Inc.,3.826679866077884,46847,449515,0.10421676695994572) 
  (WN,Southwest Airlines  Co.,1.8814183268788787,117195,1201754,0.09751995832757786) 
  (CO,Continental Air Lines  Inc.,3.3377259553366505,28618,298455,0.09588715216699335) 
  (XE,Expressjet Airlines  Inc.,3.252273637553069,34914,374510,0.09322581506501829) 
  (DL,Delta Air Lines  Inc.,3.1226448285247086,41296,451931,0.0913767809687762) 
  (MQ,American Eagle Airlines  Inc.,3.475229929915446,44592,490693,0.0908755576297196) 
  (US,US Airways Inc. (Merged with  America West 9/05. Reporting for both starting
  10/07.),2.4456832066033347,38134,453589,0.0840717036788811) 
  (AS,Alaska Airlines  Inc.,3.1887930007544574,12374,151102,0.08189170229381477) 
  (HA,Hawaiian Airlines  Inc.,2.9899233332255037,4718,61826,0.07631093714618445) 
  (OO,Skywest Airlines  Inc.,2.9589868096953413,39663,567159,0.06993277017555924) 
  (9E,Pinnacle Airlines  Inc.,3.464840889675372,16176,262208,0.06169148157188187) 
  (FL,AirTran Airways Corporation,1.6955870439155623,13943,261684,0.05328182082206019) 
  (AQ,Aloha Airlines  Inc.,1.1188461538461538,233,7800,0.029871794871794873) 




Pig Latin é uma linguagem muito poderosa para processar dados, mas ela requer uma forma de pensar diferente do SQL. Espero que esta postagem o ajude, pelo menos, para ter uma visão geral sobre o Pig.

Esperamos que este artigo tenha sido útil e te convidamos a ler nossas próximas publicações com o foco em Oracle Cloud.

p>Pig Latin é uma linguagem muito poderosa para processar dados, mas ela requer uma forma de pensar diferente do SQL. Espero que esta postagem o ajude, pelo menos, para ter uma visão geral sobre o Pig.

Esperamos que este artigo tenha sido útil e te convidamos a ler nossas próximas publicações com o foco em Oracle Cloud.

Outros artigos nesta série:
Introdução ao Oracle Big Data Cloud (Parte I)
Introdução ao Oracle Big Data Cloud (Parte II) - Serviços
Introdução ao Oracle Big Data Cloud (Parte III) - Ambari
Introdução ao Oracle Big Data Cloud (Parte IV) - Zeppelin
Introdução ao Oracle Big Data Cloud (Parte VI) - Hive

Lembre de nos seguir no LinkedIn:
Gokhan Atil 's LinkedIn: https://www.linkedin.com/in/gokhanatil/
Joel Perez’s LinkedIn ( +7.5k Contacts ): www.linkedin.com/in/SirDBaaSJoelPerez
Evandro Giachetto’s LinkedIn:  https://www.linkedin.com/in/evandrogiachetto/
Join Our LinkedIn Group:  Oracle Cloud DBaaS Até a próxima!

Gokhan Atil é um Administrador de Banco de Dados Senior comexperiênciaprática em Oracle, PostgreSQL, Microsoft SQL Server, MySQL e bancos de dados NoSQL. Ele é um Oracle Certified Professional (OCP) em EBS R12, Oracle 10g e 11g. Ele também é especialista em Oracle Enterprise Manager. Gokhan é membro fundador e vice presidente do Turkish Oracle UserGroup (TROUG). Elejápalestrou em váriasconferênciasinternacionaisincluindo o Oracle Open World. Ele é co-autor do livro "Expert Oracle Enterprise Manager 12c" e possuium blog relacionado a Oracle desde 2008: www.gokhanatil.com. Gokhanrecebeu o título de Oracle ACE em 2011 e Oracle ACE Director em 2016 por suacontribuições positivas à comunidade Oracle.

Joel Pérez é um DBA (Oracle ACE Director, Maximum Availability OCM, OCM Cloud Admin. & OCM12c/11g) Especialista com mais de 17 anos de experiência real no mundo da tecnologia Oracle, especializada na concepção e implementação de soluções: Nuvem, alta disponibilida de, recuperação de desastres, Upgrades, replicação e toda a área relacionada com bancos de dados Oracle. Joel serve como "Senior Database Cloud Architect" para en.Enmotech.com Yunhe ENMO (Beijing) Technology Co. Ltd. Beijing, China.
Conectar com Joel: Conecte-se com Joel’s Linkedin para ser atualizado com as informações mais notáveis relacionadas ao Oracle Cloud: www.linkedin.com/in/SirDBaaSJoelPerez & Joel Pérez’s Blog: https://community.oracle.com/blogs/Sir.DBaaSJoelPerez

Evandro Giachetto é um DBA Oracle com 12+ anos de experiência em tecnologia Oracle, graduado em Tecnologia da Informação e construiu a base de sua carreira atuando em projetos internacionais como Sprint, BT e Ericsson. É especialista em tecnologias de Alta Disponibilidade, possui uma paixão quase incontrolável por performance SQL, apoiada pelos anos de experiência como Desenvolvedor PL/SQL, e compartilha dessa paixão através do blog  dbaoracle.eti.br
Oracle Database Certified SQL Expert e Oracle Database Administrator Certified Associate.