Manipulating Hive tables with Oracle R connectors for Hadoop

Christos - Iraklis Tsatsoulis | Hadoop, Hive, Oracle R

In this post, we’ll have a look at how easy it is to manipulate Hive tables using Oracle R connectors for Hadoop (ORCH, now known as Oracle R Advanced Analytics for Hadoop, ORAAH). We will use the weblog data from Athens Datathon 2015, which we have already loaded into a Hive table named weblogs, as described in more detail in our previous post. Along the way, we will report on the map-reduce (MR) jobs triggered by the commands we issue in R, in order to give a more thorough picture of what is happening behind the scenes.

We first have to load the required library, and connect to Hive:

> library(ORCH)
Loading required package: OREbase

Attaching package: ‘OREbase’

The following objects are masked from ‘package:base’:

    cbind, data.frame, eval, interaction, order, paste, pmax, pmin, rbind, table

Loading required package: OREstats
Loading required package: MASS
Loading required package: ORCHcore
Loading required package: rJava
Oracle R Connector for Hadoop 2.5.0 (rev. 307)
Info: using native C base64 encoding implementation
Info: Hadoop distribution is Cloudera's CDH v5.4.0
Info: using auto-detected ORCH HAL v4.2
Info: HDFS workdir is set to "/user/oracle"
Warning: mapReduce checks are skipped due to "ORCH_MAPRED_CHECK"=FALSE
Warning: HDFS checks are skipped due to "ORCH_HDFS_CHECK"=FALSE
Info: Hadoop 2.6.0-cdh5.4.0 is up
Info: Sqoop 1.4.5-cdh5.4.0 is up
Info: OLH 3.4.0 is up
Info: loaded ORCH core Java library "orch-core-2.5.0-mr2.jar"
Loading required package: ORCHstats
> ore.connect(type="HIVE")

In contrast to what happens in the case of an Oracle database connection, the command ore.ls() for a Hive connection initially gives no results; we first have to explicitly synchronize the specific Hive tables with which we are going to work (in our case weblogs) using ore.sync(), and then call ore.attach() without arguments:

> ore.ls()
character(0)
> ore.sync(table="weblogs")
> ore.ls()
[1] "weblogs"
> ore.attach()

Let’s suppose now that we want to create two new tables from weblogs: the first one will include only the actions of “legitimate” registered users, i.e. all actions with a non-empty value of registereduserid, excluding the anomalous user with registereduserid = "cdc75a725963530a598e71c70677926b9a5f1bc1" (see this post for details); the second table will contain only the actions of this phantom user, possibly in order to examine them in more detail further downstream in our data pipeline. Let us start with the first table:

> anom <- 'cdc75a725963530a598e71c70677926b9a5f1bc1' 
> reg <- weblogs[weblogs$registereduserid != '' & weblogs$registereduserid != anom,] # triggers 2 MR jobs, gives no. of columns in RStudio viewer (13)

We can see that the command above looks indistinguishable from a similar command we would use for an ordinary R dataframe; behind the scenes, it triggers 2 MR jobs, and it also returns the number of columns of the respective ore.frame item. We can verify that no new MR job is triggered if we ask for the number of columns, but a new MR job is indeed triggered to return the number of rows (records):

> ncol(reg)  # no new MR job triggered
[1] 13
> class(reg)
[1] "ore.frame"
attr(,"package")
[1] "OREbase"
> nrow(reg)   # triggers MR job
[1] 418329
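
To make the "indistinguishable from an ordinary R dataframe" point concrete, here is a minimal sketch of the same row filter applied to a plain local data.frame. The toy data and the names toy, reg_toy and phantom_toy are made up for illustration and are not part of the original session; the code needs only base R, so no Hive connection and no MR jobs are involved.

```r
# Toy stand-in for the weblogs table; all values below are made up for illustration.
anom <- "cdc75a725963530a598e71c70677926b9a5f1bc1"
toy <- data.frame(
  registereduserid = c("", "u1", anom, "u2", anom, ""),
  action           = c("view", "csa", "csa", "view", "csa", "view"),
  stringsAsFactors = FALSE
)

# Same filter expression as used on the ore.frame:
# keep non-empty user ids, excluding the anomalous user.
reg_toy <- toy[toy$registereduserid != "" & toy$registereduserid != anom, ]

# Complementary filter: only the anomalous user's actions.
phantom_toy <- toy[toy$registereduserid == anom, ]

# Anonymous rows + legitimate rows + phantom rows should add up to the total.
n_anonymous <- sum(toy$registereduserid == "")
stopifnot(n_anonymous + nrow(reg_toy) + nrow(phantom_toy) == nrow(toy))
```

The only difference in the Hive case is where the work happens: on a local data.frame everything is computed immediately in memory, whereas on an ore.frame the commands trigger MR jobs as reported above.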

We can now proceed to create a new Hive table, weblogs_registered, containing only the actions of the “legitimate” registered users, and verify its creation as follows:

> ore.create(reg, table='weblogs_registered')  # triggers 2 MR jobs
> ore.exists("weblogs_registered") 
[1] TRUE

We can repeat the above procedure, this time for creating our second table weblogs_phantom, containing the actions of our anomalous user:

> phantom <- weblogs[weblogs$registereduserid == anom,] # triggers 2 MR jobs 
> nrow(phantom) # triggers MR job
[1] 147638
> ore.create(phantom, table='weblogs_phantom')  # triggers 2 MR jobs

Notice that the tables we create during a session are automatically added to our workspace, without having to invoke ore.sync or ore.attach again:

> ore.ls()
[1] "weblogs"            "weblogs_phantom"    "weblogs_registered"

Since we have created a Hive table with only ~ 150,000 rows which should be manageable in-memory, let’s pull it to a regular R dataframe for further detailed processing:

> phantom_df <- ore.pull(weblogs_phantom) # triggers MR job 
Warning message: 
ORE object has no unique key - using random order 
> class(phantom_df)
[1] "data.frame"
> head(phantom_df)
  sttp                    usersessionid                         registereduserid                                     pathurl
1  800 af1594b83274edbd633ce613464fffc9 cdc75a725963530a598e71c70677926b9a5f1bc1   /comparelists/292/toggle_item/287321.json
2  800 af1594b83274edbd633ce613464fffc9 cdc75a725963530a598e71c70677926b9a5f1bc1   /comparelists/292/toggle_item/419492.json
3  800 af1594b83274edbd633ce613464fffc9 cdc75a725963530a598e71c70677926b9a5f1bc1    /comparelists/292/toggle_item/93301.json
4  800 af1594b83274edbd633ce613464fffc9 cdc75a725963530a598e71c70677926b9a5f1bc1   /comparelists/292/toggle_item/344422.json
5  800 af1594b83274edbd633ce613464fffc9 cdc75a725963530a598e71c70677926b9a5f1bc1   /comparelists/292/toggle_item/344423.json
6  800 138552666a0d0216373941df880d8cce cdc75a725963530a598e71c70677926b9a5f1bc1 /comparelists/1384/toggle_item/3290612.json
  action            datetime query categoryid shopid productid refurl sku_id results
1    csa 2013-09-22 00:05:36                                                        
2    csa 2013-09-22 00:05:54                                                        
3    csa 2013-09-22 00:05:58                                                        
4    csa 2013-09-22 00:06:09                                                        
5    csa 2013-09-22 00:07:23                                                        
6    csa 2013-09-22 00:08:55                                                        
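
Since pulling is only sensible when the data fit in memory, one could guard the call with a row-count check. The following is a sketch of such a guard, not part of ORCH: pull_if_small is a hypothetical helper, and the default limit of one million rows is an arbitrary assumption to be adapted to the available memory. The pull function is passed as a parameter (defaulting to ore.pull, as used above) so that the logic can be tried on an ordinary data.frame without a Hive connection.

```r
# Hypothetical helper (not an ORCH function): pull an object into local
# memory only if its row count does not exceed max_rows.
# The default max_rows is an arbitrary assumption.
pull_if_small <- function(x, max_rows = 1e6, pull = ore.pull) {
  n <- nrow(x)  # on an ore.frame, this is the call reported above to trigger an MR job
  if (n > max_rows) {
    stop("refusing to pull ", n, " rows (limit ", max_rows, ")")
  }
  pull(x)
}

# Local dry run with a plain data.frame, using identity() in place of ore.pull.
small <- data.frame(x = 1:5)
local_copy <- pull_if_small(small, max_rows = 10, pull = identity)
```

In a live session the call would presumably be pull_if_small(weblogs_phantom), at the cost of the extra MR job needed for the row count.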

On second thought, we realize that we don’t really need the registereduserid field in the weblogs_phantom table (it consists of just a single value); so, as a last demonstration, we will:

  • Drop the existing Hive table weblogs_phantom
  • Remove the registereduserid column from our local R dataframe
  • Upload the updated R dataframe to Hive as a new weblogs_phantom table

Here are the necessary R commands:

> ore.drop(table="weblogs_phantom")
> ore.exists("weblogs_phantom")
[1] FALSE
> phantom_df$registereduserid <- NULL 
> ncol(phantom_df)
[1] 12
> ore.create(phantom_df, table='weblogs_phantom') # no MR job triggered
> ore.ls()
[1] "weblogs"            "weblogs_phantom"    "weblogs_registered" 
> ncol(weblogs_phantom) # no MR job triggered
[1] 12

As we can see, no further MR jobs are triggered anywhere in this process.
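
The drop-and-recreate steps above could be wrapped in a small function. This is only a sketch: replace_hive_table is a hypothetical name, not an ORCH function, and it uses nothing beyond the three calls already shown in this post (ore.exists, ore.drop and ore.create), with the same argument forms. It assumes an active Hive connection established with ore.connect.

```r
# Hypothetical convenience wrapper (not part of ORCH): replace a Hive table
# with the contents of `data`, dropping any existing table of the same name.
# Assumes library(ORCH) is loaded and ore.connect(type="HIVE") has been called.
replace_hive_table <- function(data, table) {
  if (ore.exists(table)) {
    ore.drop(table = table)        # remove the old version first
  }
  ore.create(data, table = table)  # upload the new contents
  invisible(ore.exists(table))     # TRUE if the table now exists
}
```

With this in place, the sequence above would reduce to removing the column from phantom_df and calling replace_hive_table(phantom_df, "weblogs_phantom").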

What if the number of records of our phantom user were so large that they could not fit in memory? In such a case we could not pull the data into a local R dataframe, as we did above. Nevertheless, we can still perform this operation (i.e. selecting a subset of rows and columns) simply by using the overloaded subset method of Oracle R on ore.frames, in a manner practically indistinguishable from a similar operation on ordinary R dataframes:

> ore.drop(table="weblogs_phantom")
> cc <- colnames(weblogs)
> cc
 [1] "sttp"             "usersessionid"    "registereduserid" "pathurl"          "action"           "datetime"        
 [7] "query"            "categoryid"       "shopid"           "productid"        "refurl"           "sku_id"          
[13] "results"     
> phantom <- subset(weblogs, registereduserid == anom, select=cc[cc != "registereduserid"]) # triggers 2 MR jobs 
> ncol(phantom)  # no MR job triggered
[1] 12
> nrow(phantom)  # triggers MR job
[1] 147638
> class(phantom)
[1] "ore.frame"
attr(,"package")
[1] "OREbase"
> ore.create(phantom, table = 'weblogs_phantom')
> ore.ls()
[1] "weblogs"            "weblogs_phantom"    "weblogs_registered" 
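
For comparison, here is a minimal sketch of the same subset call on an ordinary local data.frame, using base R only. The toy data and the names toy and phantom_toy are made up for illustration; the call itself has the same form as the one used on the ore.frame above.

```r
# Toy stand-in for weblogs with three of its columns; values are made up.
anom <- "cdc75a725963530a598e71c70677926b9a5f1bc1"
toy <- data.frame(
  sttp             = c(800, 800, 800, 800),
  registereduserid = c(anom, "u1", anom, ""),
  action           = c("csa", "view", "csa", "view"),
  stringsAsFactors = FALSE
)

# Select the phantom user's rows and all columns except registereduserid,
# exactly as in the ore.frame version.
cc <- colnames(toy)
phantom_toy <- subset(toy, registereduserid == anom,
                      select = cc[cc != "registereduserid"])

print(colnames(phantom_toy))
print(dim(phantom_toy))
```

On a plain data.frame, base R also accepts the shorthand select = -registereduserid; whether the overloaded method for ore.frames supports that form is not verified here, which is why the explicit column vector is kept.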

Summary

We have demonstrated how to perform elementary ETL in Hive by creating, dropping, and altering tables, without writing a single HiveQL statement, by utilizing the Oracle R connectors for Hadoop. We claim that the code snippets presented above indeed have a ‘pure-R’ feel, and regular R users should find them very familiar and intuitive.

> ore.disconnect()
> sessionInfo()
Oracle Distribution of R version 3.1.1  (--)
Platform: x86_64-unknown-linux-gnu (64-bit)

locale:
 [1] LC_CTYPE=en_US.UTF-8 LC_NUMERIC=C         LC_TIME=C            LC_COLLATE=C         LC_MONETARY=C       
 [6] LC_MESSAGES=C        LC_PAPER=C           LC_NAME=C            LC_ADDRESS=C         LC_TELEPHONE=C      
[11] LC_MEASUREMENT=C     LC_IDENTIFICATION=C 

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] ORCH_2.5.0      ORCHstats_2.5.0 ORCHcore_2.5.0  rJava_0.9-7     OREstats_1.4.1  MASS_7.3-33     OREbase_1.4.1  

loaded via a namespace (and not attached):
[1] tools_3.1.1

Christos - Iraklis Tsatsoulis

Christos - Iraklis is one of our resident Data Scientists. He holds advanced graduate degrees in applied mathematics, engineering, and computing. He has been awarded both Chartered Engineer and Chartered Manager status in the UK, as well as Master status in Kaggle.com due to "consistent and stellar results" in predictive analytics contests.