How to Leverage Hive UDF Classes in Your Pig Latin Script

In this blog post, let's see how to leverage a Hive UDAF in a Pig Latin script.

I used HDP 2.3 version with Pig on Tez for this POC.

Make sure you are using Pig version 0.15:

[root@sandbox ~]# pig -version
WARNING: Use "yarn jar" to launch YARN applications.
15/08/26 01:42:59 INFO pig.ExecTypeProvider: Trying ExecType : LOCAL
15/08/26 01:42:59 INFO pig.ExecTypeProvider: Trying ExecType : MAPREDUCE
15/08/26 01:42:59 INFO pig.ExecTypeProvider: Picked MAPREDUCE as the ExecType
2015-08-26 01:42:59,605 [main] INFO org.apache.pig.Main - Apache Pig version 0.15.0.2.3.0.0-2557 (rexported) compiled Jul 14 2015, 10:10:23

Download the sample data:
[root@sandbox hiveudfonpig]# wget http://seanlahman.com/files/database/lahman591-csv.zip
--2015-08-26 01:48:08-- http://seanlahman.com/files/database/lahman591-csv.zip
Resolving seanlahman.com... 208.113.136.74
Connecting to seanlahman.com|208.113.136.74|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 8337421 (8.0M) [application/zip]
Saving to: "lahman591-csv.zip"

100%[===========================================================================================================================================================>] 8,337,421 1.72M/s in 8.8s

2015-08-26 01:48:23 (926 KB/s) - "lahman591-csv.zip" saved [8337421/8337421]

Unzip the Downloaded Data:

[root@sandbox hiveudfonpig]# pwd
/home/giri/hiveudfonpig
[root@sandbox hiveudfonpig]# ls
lahman591-csv.zip
[root@sandbox hiveudfonpig]# unzip lahman591-csv.zip
Archive: lahman591-csv.zip
inflating: Managers.csv
inflating: ManagersHalf.csv
inflating: Pitching.csv
inflating: Salaries.csv
inflating: Schools.csv
inflating: SchoolsPlayers.csv
inflating: Teams.csv
inflating: TeamsFranchises.csv
inflating: TeamsHalf.csv
inflating: AllstarFull.csv
inflating: AwardsManagers.csv
inflating: AwardsPlayers.csv
inflating: AwardsShareManagers.csv
inflating: AwardsSharePlayers.csv
inflating: Batting.csv
inflating: Fielding.csv
inflating: FieldingOF.csv
inflating: readme59.txt
inflating: Master.csv
inflating: FieldingPost.csv
inflating: HallOfFame.csv
inflating: Appearances.csv
inflating: PitchingPost.csv
inflating: BattingPost.csv
inflating: SeriesPost.csv

Launch Pig on Tez as below:

[root@sandbox hiveudfonpig]# pig -x tez
WARNING: Use "yarn jar" to launch YARN applications.
15/08/26 01:50:51 INFO pig.ExecTypeProvider: Trying ExecType : LOCAL
15/08/26 01:50:51 INFO pig.ExecTypeProvider: Trying ExecType : MAPREDUCE
15/08/26 01:50:51 INFO pig.ExecTypeProvider: Trying ExecType : TEZ_LOCAL
15/08/26 01:50:51 INFO pig.ExecTypeProvider: Trying ExecType : TEZ
15/08/26 01:50:51 INFO pig.ExecTypeProvider: Picked TEZ as the ExecType
2015-08-26 01:50:52,113 [main] INFO org.apache.pig.Main - Apache Pig version 0.15.0.2.3.0.0-2557 (rexported) compiled Jul 14 2015, 10:10:23
2015-08-26 01:50:52,113 [main] INFO org.apache.pig.Main - Logging error messages to: /home/giri/hiveudfonpig/pig_1440553852108.log
2015-08-26 01:50:52,208 [main] INFO org.apache.pig.impl.util.Utils - Default bootup file /root/.pigbootup not found
2015-08-26 01:50:53,867 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://sandbox.hortonworks.com:8020
grunt>

Let’s go through an example with Batting.csv file that is downloaded.

Move the Batting.csv file to an HDFS directory:

[root@sandbox hiveudfonpig]# hdfs dfs -mkdir -p /user/giri/hiveudfonpig
[root@sandbox hiveudfonpig]# hdfs dfs -put Batting.csv /user/giri/hiveudfonpig
[root@sandbox hiveudfonpig]# hdfs dfs -ls /user/giri/hiveudfonpig
Found 1 items
-rw-r--r-- 1 root hdfs 6398990 2015-08-26 01:55 /user/giri/hiveudfonpig/Batting.csv

All set with the file move.

The schema for this file is:
playerID,yearID,stint,teamID,lgID,G,G_batting,AB,R,H,B1,B2,HR,RBI,SB,CS,BB,SO,IBB,HBP,SH,SF,GIDP,G_old

For this sample analysis, let's treat all the attributes as chararray.

The use case is to produce, for each playerID, the set of its corresponding lgIDs in a single tuple. This can be achieved in multiple ways; here we do it with the Hive UDAF collect_set.

collect_set() UDAF in Hive:

collect_set(col) -> array: returns a set of objects with duplicate elements eliminated.

Pig Latin:

define collect_set_hiveudaf HiveUDAF('collect_set');
a = load '/user/giri/hiveudfonpig/Batting.csv' using PigStorage(',') as (playerID:chararray,yearID:chararray,stint:chararray,teamID:chararray,lgID:chararray,G:chararray,G_batting:chararray,AB:chararray,R:chararray,H:chararray,B1:chararray,B2:chararray,HR:chararray,RBI:chararray,SB:chararray,CS:chararray,BB:chararray,SO:chararray,IBB:chararray,HBP:chararray,SH:chararray,SF:chararray,GIDP:chararray,G_old:chararray);
b = group a by playerID;
-- Generate the list of playerIDs and their corresponding lgIDs in one tuple per player
c = foreach b generate a.playerID, collect_set_hiveudaf(a.lgID);
grunt> describe c;
c: {{(playerID: chararray)},{()}}
d = limit c 100;
dump d;
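As a small variation (a sketch, not part of the original script), you can project the grouping key instead of the bag a.playerID, so each output tuple carries a single playerID value rather than a bag of repeated IDs:

```pig
-- Sketch: project the group key directly; collect_set still
-- deduplicates the lgID values within each group.
c2 = foreach b generate group as playerID, collect_set_hiveudaf(a.lgID) as lgIDs;
describe c2;
dump c2;
```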

The second field, {()}, is the bag returned by collect_set, with duplicates eliminated.
Please note that HiveUDF, HiveUDAF, and HiveUDTF need to be used according to the kind of Hive function being invoked (scalar, aggregate, or table-generating, respectively).
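For illustration, here is a sketch of how each wrapper is defined in Pig 0.15; the Hive function names below (lower, max, explode) are standard Hive built-ins chosen as examples:

```pig
-- HiveUDF wraps a scalar function: one value in, one value out
define lower_udf HiveUDF('lower');
-- HiveUDAF wraps an aggregate function: a bag of values in, one value out
define max_udaf HiveUDAF('max');
-- HiveUDTF wraps a table-generating function: one row in, many rows out
define explode_udtf HiveUDTF('explode');
```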

Dump Output:

[Screenshot: HiveUDFonPig dump output]

How to Load Data into an Elasticsearch Index through Pig

REGISTER /tmp/elasticsearch-hadoop-2.0.0.RC1.jar;

REGISTER /tmp/elasticsearch-hadoop-pig-2.0.0.RC1.jar;

load_pam_raw_data = LOAD '/QA//PAM/incremental/rawpamdata/2014-05-27/part-*' Using PigStorage('|') AS (AGNT_ID:chararray,LST_NM:chararray,MDL_NM:chararray,FRST_NM:chararray,PRF_LST_NM:chararray,PRF_MDL_NM:chararray,PRF_FRST_NM:chararray,AGNT_STATUS:chararray,STTS_EFFCTV_DT:chararray,AGNCY_ID:chararray,LCTN_ID:chararray,CHNL_TYP:chararray,AGNT_TYP_ID:chararray,AGNT_ADRS_LN1:chararray,AGNT_ADRS_LN2:chararray,AGNT_ST:chararray,AGNT_CITY_NM:chararray,AGNT_ZP_CD:chararray,BSNS_PHN:chararray);

STORE load_pam_raw_data INTO 'pig_pam_index/data' USING org.elasticsearch.hadoop.pig.EsStorage('es.nodes=node1:9200,node2:9200','es.index.auto.create=true');
If the ES index is already present, you can set es.index.auto.create=false. Similarly, we can read ES data through Pig, which we will see in the next post.
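As a preview, here is a minimal sketch of the read side, assuming the same jars are registered and the pig_pam_index/data index written above already exists:

```pig
-- Sketch: read the Elasticsearch index back into a Pig relation.
es_data = LOAD 'pig_pam_index/data'
          USING org.elasticsearch.hadoop.pig.EsStorage('es.nodes=node1:9200,node2:9200');
dump es_data;
```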