Outros artigos nesta série:
Introdução ao Oracle Big Data Cloud (Parte I)
Introdução ao Oracle Big Data Cloud (Parte II) - Serviços
Introdução ao Oracle Big Data Cloud (Parte III) - Ambari
Introdução ao Oracle Big Data Cloud (Parte IV) - Zeppelin
Introdução ao Oracle Big Data Cloud (Parte V) - Pig to
Introdução ao Oracle Big Data Cloud (Parte VI) - Hive
Esta é nossa quinta postagem sobre a série introdutória ao Oracle Big Data. Nesta postagem iremos explorar o “Apache Pig”. Trata-se de uma ferramenta/plataforma criada pela “Yahoo!” para lidar com grandes conjuntos de dados sem a complexidade de se escrever um programa MapReduce tradicional. Foi projetado para processar qualquer tipo de dado (estruturado ou não-estruturado), sendo uma excelente ferramenta para ETL. Pig vem instalado em pronto para ser utilizado com o “Oracle Big Data”. Nesta postagem mostraremos como podemos utilizar o pig para ler, interpretar e analisar dados.

O Pig possui uma linguagem de programação de alto nível SQL chamada Pig Latin. Precisamos aprender os conceitos básicos desta linguagem para poder usar Pig. Cada declaração em um script Pig é processada pelo interpretador Pig para construir um plano lógico que será usado para processar os jobs MapReduce. As etapas no plano lógico não são “executadas” até que uma instrução DUMP ou STORE seja utilizada.
Scripts Pig geralmente possuem a seguinte estrutura:
Na publicação anterior, usamos PySpark (API Spark para Python) para carregar dados de voos e criamos uma análise muito simples usando o Spark SQL. Faremos o mesmo com os mesmos dados usando Pig.
O Pig possui um shell interativo chamado "Grunt". Ao invés de escrever comandos Pig Latin em um arquivo e executá-lo com o comando pig, também podemos inserir cada declaração no Grunt e executá-las interativamente. Para entrar no shell Grunt vou fazer login no meu primeiro nó, mudar para o usuário hdfs (para poder acessar os arquivos CSV que usei na minha postagem anterior) e depois iniciar o shell interativo do pig (Grunt) no terminal, através do comando “pig”:
root@bilyonveri-bdcsce-1 ~]# sudo su - hdfs
[hdfs@bilyonveri-bdcsce-1 ~]$ pig
WARNING: Use "yarn jar" to launch YARN applications.
17/06/22 06:40:55 INFO pig.ExecTypeProvider: Trying ExecType : LOCAL
17/06/22 06:40:55 INFO pig.ExecTypeProvider: Trying ExecType : MAPREDUCE
17/06/22 06:40:55 INFO pig.ExecTypeProvider: Picked MAPREDUCE as the ExecType
2017-06-22 06:40:55,549 [main] INFO org.apache.pig.Main - Apache Pig version 0.15.0 (r: unknown)
compiled Apr 20 2017, 18:50:24
2017-06-22 06:40:55,549 [main] INFO org.apache.pig.Main - Logging error messages to:
/var/lib/hadoop-hdfs/pig_1498128055548.log
2017-06-22 06:40:55,566 [main] INFO org.apache.pig.impl.util.Utils - Default bootup file
/var/lib/hadoop-hdfs/.pigbootup not found
2017-06-22 06:40:55,916 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine -
Connecting to hadoop file system at: hdfs://mycluster
2017-06-22 06:40:56,598 [main] INFO org.apache.pig.PigServer - Pig Script ID for the session:
PIG-default-43bfbd27-07c7-42ee-8070-8aa6af5e7646
2017-06-22 06:40:56,933 [main] INFO org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline
service address: http://bilyonveri-bdcsce-2.compute-trbilyonveri.oraclecloud.internal:8188/ws/v1/timeline/
2017-06-22 06:40:57,020 [main] INFO org.apache.pig.backend.hadoop.ATSService - Created ATS Hook
grunt> 2017-06-22 06:40:57,157 [ATS Logger 0] ERROR org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl
- Failed to get the response from the timeline server.
2017-06-22 06:40:57,157 [ATS Logger 0] INFO org.apache.pig.backend.hadoop.ATSService - Failed to submit plan
to ATS: Failed to get the response from the timeline server.
grunt>
O Grunt também pode ser usado para executar comandos hdfs. Então, vou executar os seguintes comandos para ver se eu posso acessar os arquivos CSV (2008.csv, carriers.csv). Esses arquivos estão localizados na pasta /user/zeppelin porque usei o Zeppelin para fazer o download deles.
grunt> fs -ls /user/zeppelin/*.csv
grunt> fs -tail /user/zeppelin/carriers.csv
Quando executamos os comandos acima listamos os arquivos 2008.csv e carriers.csv e também as últimas linhas do arquivo carriers.csv. Então eu tenho certeza que meu script pig também poderá acessá-los.
Então, antes de tudo, eu preciso ler os dados desses arquivos CSV. A seguinte declaração indicará ao Pig para ler os dados de carriers.csv e declarar um conjunto de dados chamado "carriers_file";
carriers_file = LOAD '/user/zeppelin/carriers.csv';
Em seguida, informamos ao LOAD que utilizaremos um arquivo CSV e que as linhas não são multi-line; que utilizaremos o padrão UNIX de EOL (fim de linha) e ignoraremos o cabeçalho. Não se preocupe pois estas são opções muito simples e comumente utilizadas, tudo o que precisamos para adicionar alguns parâmetros ao "engine".
carriers_file = LOAD '/user/zeppelin/carriers.csv'
USING org.apache.pig.piggybank.storage.CSVExcelStorage(',', 'NO_MULTILINE', 'UNIX',
'SKIP_INPUT_HEADER');
Também podemos definir nosso schema enquanto carregando os dados.:
carriers_file = LOAD '/user/zeppelin/carriers.csv'
USING org.apache.pig.piggybank.storage.CSVExcelStorage(',', 'NO_MULTILINE', 'UNIX',
'SKIP_INPUT_HEADER')
AS (Code:chararray, Description:chararray);
Tenha cuidado com nomes de variáveis, pois são case sensitive. Nos scripts Pig, nomes de variáveis são case sensitive, enquanto palavras-chave em Pig Latin são case insensitive. Então, se quisermos utilizar carriers_file em próximas declarações, precisamos usar "carrier_file" (todas as minúsculas). Vou definir o schema durante o load pois o arquivo 2008.csv tem muitos campos que não utilizaremos, portanto, não quero defini-los.
carriers = FOREACH carriers_file GENERATE REPLACE($0, '"', '') as Code, REPLACE($1, '"', '') as Description;
Se você não definir o schema para um conjunto de dados, você pode acessar os campos usando as variáveis $0, $1, $n… Em nosso arquivo CSV, os campos estão entre aspas (“ZMZ”,”Winnipesaukee Aviation Inc.”), portanto, precisamos remover os caracteres de “aspas” utilizando o comando REPLACE. Para poder fazê-lo, preciso instruir ao Pig que processe cada linha no conjunto de dados "carrier_file". Os comandos FOREACH e GENERATE são utilizados para tal. Então, a instrução acima processa cada linha, remove os caracteres de aspas ‘”’ de $0 e $1 (primeira e segunda colunas) e as nomeia como Code e Description.
Como você verá, esses comandos não levarão muito tempo para processar, e eles não iniciarão nenhum trabalho real (job MapReduce). Se você quiser ver o conteúdo de um data set pode utilizar o comando DUMP. Mas primeiro, não se esqueça de filtrar os dados. Por exemplo, eu posso usar as seguintes instruções para obter as primeiras 10 linhas do data set "carriers":
div class="ocode">
first_10_carriers = LIMIT carriers 10;
DUMP first_10_carriers
Farei algo muito parecido com o data set 2008.csv, mas será um pouco mais complexo devido à quantidade de colunas. Também quero definir os tipos de variáveis para cada coluna:
flights_file = LOAD '/user/zeppelin/2008.csv'
USING org.apache.pig.piggybank.storage.CSVExcelStorage(',', 'NO_MULTILINE', 'UNIX', 'SKIP_INPUT_HEADER');
flights = FOREACH flights_file GENERATE
(chararray)$8 as Code:chararray,
(chararray)$22 as CancellationCode:chararray,
(int)REPLACE($24, 'NA', '0') as CarrierDelay:int,
(int)REPLACE($25, 'NA', '0') as WeatherDelay:int,
(int)REPLACE($26, 'NA', '0') as NASDelay:int,
(int)REPLACE($27, 'NA', '0') as SecurityDelay:int,
(int)REPLACE($28, 'NA', '0') as LateAircraftDelay:int;
Da última query, eu apenas utilizarei “CarrierDelay”, mas eu quis mostrar como podemos selecionar as outras colunas. Desta vez eu não removi as aspas, em vez disso, substituí NA por 0. Você deve estar se perguntando por quê coloquei um “(int)” antes dos comandos REPLACE, dessa forma:
(int)REPLACE($24, 'NA', '0') as CarrierDelay:int,
Eu digo ao Pig que o comando REPLACE processará o 25º campo em "flights_file", retornará um número inteiro e eu irei armazená-lo como CarrierDelay (uma coluna inteira). Eu defino os tipos de dados retornados pela função para prevenir problemas de conversão nos jobs do MapReduce. Essa é uma dica muito importante, especialmente se utilizamos uma versão do Pig anterior à 0.17.
Agora precisamos unir esses dois data sets. Então utilizamos o comando JOIN:
( flights_join = JOIN flights BY Code, carriers BY Code;
Então, agrupamos os dados pelas colunas Code e Description:
flight_grouped = GROUP flights_join by (flights::Code, carriers::Description);
Em meu artigo anterior, encontrei o total dos voos atrasados, os voos totais, o tempo de atraso médio (baseado em CarrierDelay), a proporção de voos atrasados/totais para cada companhia aérea e os ordenamos. Essa é uma tarefa muito fácil quando fazemos uso de SQL, no entanto, PIG exige que pensemos um pouco mais para realizarmos essa consulta:
result = FOREACH flight_grouped {
Total = COUNT(flights_join);
delayed_flights = FILTER flights_join BY CarrierDelay > 0;
avg_CarrierDelay = AVG(flights_join.CarrierDelay);
Delayed = COUNT(delayed_flights);
DelayedTotalRatio = (double)(double)(Delayed / (double)Total);
GENERATE FLATTEN(group) AS (Code, Description), avg_CarrierDelay as avg_CarrierDelay,
Delayed as Delayed, Total as Total, DelayedTotalRatio as DelayedTotalRatio;
}
ordered_result = ORDER result by DelayedTotalRatio DESC;
DUMP ordered_result
Deixe-me tentar explicar o código acima (os números de cada item representam os números das linhas do bloco anterior):
Linha 1) Eu defino "result" como o data set de resultado. O pig deve processar cada linha no data set "flight_grouped".
Linha 2) O número de voos para cada companhia aérea será contado e armazenado como Total.
Linha 3) Um novo data set para voos atrasados será criado.
Linha 4) O atraso médio será calculado e armazenado como avg_CarrierDelay.
Linha 5) O número de vôos atrasados será contado e armazenado como “Delayed”.
Linha 6) A relação de voos atrasados/totais é calculada e armazenada como “DelayedTotalRatio”.
Linha 7) A linha que contém o Código, Descrição, tempo de atraso médio, número de voos atrasados, número total de voos e a proporção é retornada como uma linha (para o data set de resultados).
Linha 10) Um novo data set (ordered_result) é gerado ordenando de forma descendente pela coluna DelayedTotalRation o data set de resultados.
Linha 12) O comando DUMP é usado para gerar o resultado (na tela). Isso iniciará o job MapReduce criado pelo Pig.
Este é o script completo (que você pode copiar e colar no grunt):
carriers_file = LOAD '/user/zeppelin/carriers.csv'
USING org.apache.pig.piggybank.storage.CSVExcelStorage(',', 'NO_MULTILINE', 'UNIX',
'SKIP_INPUT_HEADER');
carriers = FOREACH carriers_file GENERATE REPLACE($0, '"', '') as Code, REPLACE($1, '"', '') as Description;
flights_file = LOAD '/user/zeppelin/2008.csv'
USING org.apache.pig.piggybank.storage.CSVExcelStorage(',', 'NO_MULTILINE', 'UNIX',
'SKIP_INPUT_HEADER');
flights = FOREACH flights_file GENERATE
(chararray)$8 as Code:chararray,
(chararray)$22 as CancellationCode:chararray,
(int)REPLACE($24, 'NA', '0') as CarrierDelay:int,
(int)REPLACE($25, 'NA', '0') as WeatherDelay:int,
(int)REPLACE($26, 'NA', '0') as NASDelay:int,
(int)REPLACE($27, 'NA', '0') as SecurityDelay:int,
(int)REPLACE($28, 'NA', '0') as LateAircraftDelay:int;
flights_join = JOIN flights BY Code, carriers BY Code;
flight_grouped = GROUP flights_join by (flights::Code, carriers::Description);
result = FOREACH flight_grouped {
Total = COUNT(flights_join);
delayed_flights = FILTER flights_join BY CarrierDelay > 0;
avg_CarrierDelay = AVG(flights_join.CarrierDelay);
Delayed = COUNT(delayed_flights);
DelayedTotalRatio = (double)(double)(Delayed / (double)Total);
GENERATE FLATTEN(group) AS (Code, Description), avg_CarrierDelay as avg_CarrierDelay,
Delayed as Delayed, Total as Total, DelayedTotalRatio as DelayedTotalRatio;
}
ordered_result = ORDER result by DelayedTotalRatio DESC;
DUMP ordered_result
Abaixo, o resultado das estatísticas produzidas pelo job MapReduce ao executar a última instrução (DUMP) do script:
2017-06-22 04:54:01,470 [main] INFO org.apache.pig.tools.pigstats.mapreduce.SimplePigStats - Script
Statistics:
HadoopVersion PigVersion UserId StartedAt FinishedAt Features
2.7.1.2.4.2.0-258 0.15.0 hdfs 2017-06-22 04:51:52 2017-06-22 04:54:01
HASH_JOIN,GROUP_BY
Success!
Job Stats (time in seconds):
JobId Maps Reduces MaxMapTime MinMapTime AvgMapTime MedianMapTime
MaxReduceTime MinReduceTime AvgReduceTime MedianReducetime
Alias Feature Outputs
job_1497859064811_0022 7 1 18 3 13 15 37 37
37 37
carriers,carriers_file,flights,flights_file,flights_joiHASH_JOIN
job_1497859064811_0023 3 1 12 9 10 10 36 36
36 36
delayed_flights,flight_grouped,result GROUP_BY hdfs://mycluster/tmp/temp1852761250/tmp-1091973150,
Input(s):
Successfully read 7009728 records from: "/user/zeppelin/2008.csv"
Successfully read 1491 records from: "/user/zeppelin/carriers.csv"
Output(s):
Successfully stored 20 records (1271 bytes) in: "hdfs://mycluster/tmp/temp1852761250/tmp-1091973150"
Counters:
Total records written : 20
Total bytes written : 1271
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 2
Total records proactively spilled: 1005991
Job DAG:
job_1497859064811_0022 -> job_1497859064811_0023,
job_1497859064811_0023
O processo todo levou menos que 3 minutos, segue o resultado final:
(EV,Atlantic Southeast Airlines,6.01967031987882,37398,280575,0.13329056402031542)
(YV,Mesa Airlines Inc.,7.561840505236732,30855,254930,0.12103322480680971)
(AA,American Airlines Inc.,4.6651958636765665,72843,604885,0.12042454350827017)
(F9,Frontier Airlines Inc.,2.620778596938243,11264,95762,0.11762494517658362)
(B6,JetBlue Airways,3.455645593117481,21451,196091,0.10939308790306541)
(OH,Comair Inc.,5.44396706594402,21308,197607,0.10783018820183496)
(NW,Northwest Airlines Inc.,4.252088295191744,36800,347652,0.10585297941619781)
(UA,United Air Lines Inc.,3.826679866077884,46847,449515,0.10421676695994572)
(WN,Southwest Airlines Co.,1.8814183268788787,117195,1201754,0.09751995832757786)
(CO,Continental Air Lines Inc.,3.3377259553366505,28618,298455,0.09588715216699335)
(XE,Expressjet Airlines Inc.,3.252273637553069,34914,374510,0.09322581506501829)
(DL,Delta Air Lines Inc.,3.1226448285247086,41296,451931,0.0913767809687762)
(MQ,American Eagle Airlines Inc.,3.475229929915446,44592,490693,0.0908755576297196)
(US,US Airways Inc. (Merged with America West 9/05. Reporting for both starting
10/07.),2.4456832066033347,38134,453589,0.0840717036788811)
(AS,Alaska Airlines Inc.,3.1887930007544574,12374,151102,0.08189170229381477)
(HA,Hawaiian Airlines Inc.,2.9899233332255037,4718,61826,0.07631093714618445)
(OO,Skywest Airlines Inc.,2.9589868096953413,39663,567159,0.06993277017555924)
(9E,Pinnacle Airlines Inc.,3.464840889675372,16176,262208,0.06169148157188187)
(FL,AirTran Airways Corporation,1.6955870439155623,13943,261684,0.05328182082206019)
(AQ,Aloha Airlines Inc.,1.1188461538461538,233,7800,0.029871794871794873)
Pig Latin é uma linguagem muito poderosa para processar dados, mas ela requer uma forma de pensar diferente do SQL. Espero que esta postagem o ajude, pelo menos, para ter uma visão geral sobre o Pig.
Esperamos que este artigo tenha sido útil e te convidamos a ler nossas próximas publicações com o foco em Oracle Cloud.
p>Pig Latin é uma linguagem muito poderosa para processar dados, mas ela requer uma forma de pensar diferente do SQL. Espero que esta postagem o ajude, pelo menos, para ter uma visão geral sobre o Pig.Esperamos que este artigo tenha sido útil e te convidamos a ler nossas próximas publicações com o foco em Oracle Cloud.
Outros artigos nesta série:
Introdução ao Oracle Big Data Cloud (Parte I)
Introdução ao Oracle Big Data Cloud (Parte II) - Serviços
Introdução ao Oracle Big Data Cloud (Parte III) - Ambari
Introdução ao Oracle Big Data Cloud (Parte IV) - Zeppelin
Introdução ao Oracle Big Data Cloud (Parte VI) - Hive
Lembre de nos seguir no LinkedIn:
Gokhan Atil 's LinkedIn: https://www.linkedin.com/in/gokhanatil/
Joel Perez’s LinkedIn ( +7.5k Contacts ): www.linkedin.com/in/SirDBaaSJoelPerez
Evandro Giachetto’s LinkedIn: https://www.linkedin.com/in/evandrogiachetto/
Join Our LinkedIn Group: Oracle Cloud DBaaS
Até a próxima!
Gokhan Atil é um Administrador de Banco de Dados Senior comexperiênciaprática em Oracle, PostgreSQL, Microsoft SQL Server, MySQL e bancos de dados NoSQL. Ele é um Oracle Certified Professional (OCP) em EBS R12, Oracle 10g e 11g. Ele também é especialista em Oracle Enterprise Manager. Gokhan é membro fundador e vice presidente do Turkish Oracle UserGroup (TROUG). Elejápalestrou em váriasconferênciasinternacionaisincluindo o Oracle Open World. Ele é co-autor do livro "Expert Oracle Enterprise Manager 12c" e possuium blog relacionado a Oracle desde 2008: www.gokhanatil.com. Gokhanrecebeu o título de Oracle ACE em 2011 e Oracle ACE Director em 2016 por suacontribuições positivas à comunidade Oracle.
Joel Pérez é um DBA (Oracle ACE Director, Maximum Availability OCM, OCM Cloud Admin. & OCM12c/11g) Especialista com mais de 17 anos de experiência real no mundo da tecnologia Oracle, especializada na concepção e implementação de soluções: Nuvem, alta disponibilida de, recuperação de desastres, Upgrades, replicação e toda a área relacionada com bancos de dados Oracle. Joel serve como "Senior Database Cloud Architect" para en.Enmotech.com Yunhe ENMO (Beijing) Technology Co. Ltd. Beijing, China.
Conectar com Joel: Conecte-se com Joel’s Linkedin para ser atualizado com as informações mais notáveis relacionadas ao Oracle Cloud: www.linkedin.com/in/SirDBaaSJoelPerez & Joel Pérez’s Blog: https://community.oracle.com/blogs/Sir.DBaaSJoelPerez
Evandro Giachetto é um DBA Oracle com 12+ anos de experiência em tecnologia Oracle, graduado em Tecnologia da Informação e construiu a base de sua carreira atuando em projetos internacionais como Sprint, BT e Ericsson. É especialista em tecnologias de Alta Disponibilidade, possui uma paixão quase incontrolável por performance SQL, apoiada pelos anos de experiência como Desenvolvedor PL/SQL, e compartilha dessa paixão através do blog dbaoracle.eti.br
Oracle Database Certified SQL Expert e Oracle Database Administrator Certified Associate.