How to use SparkR in Cloudera Hadoop

Christos - Iraklis Tsatsoulis Big Data, R, Spark 18 Comments

Suppose you are an avid R user, and you would like to use SparkR in Cloudera Hadoop; unfortunately, as of the latest CDH version (5.7), SparkR is still not supported (and, according to a recent discussion in the Cloudera forums, we shouldn’t expect this to happen anytime soon). Is there anything  you can do?

Well, indeed there is. In this post we will demonstrate how to use SparkR in a Cloudera Hadoop cluster. The assumptions are the following:

  • A Cloudera Hadoop cluster, with R installed in all worker nodes
  • A gateway node, through which you connect to the cluster to submit jobs, and in which you naturally have a user account (i.e. no superuser privileges needed)
Step 1: download Spark locally

The first step is to download Spark locally in your gateway home folder; this is very simple actually, and I have provided detailed instructions elsewhere. The best part? You don’t need to download a Spark version that matches the version in your CDH distribution; in our cluster, the CDH version is 5.6, which comes with Spark 1.5.0, while locally I have downloaded Spark 1.6.1, prebuilt for Hadoop 2.6. Here is my local home directory in my gateway:

[ctsats@dev-hd-01 ~]$ ll
total 282640
drwxrwxr-x   2 ctsats ctsats        52 Mar  1 18:16 kaggle
drwxr-xr-x.  3 ctsats ctsats        91 Apr 12 18:30 R
drwxrwxr-x   2 ctsats ctsats        25 Feb  5 13:21 scripts
drwxr-xr-x  12 ctsats ctsats      4096 Feb 27 07:02 spark-1.6.1-bin-hadoop2.6
-rw-rw-r--   1 ctsats ctsats 289405702 Apr 11 12:55 spark-1.6.1-bin-hadoop2.6.tgz
-rw-rw-r--.  1 ctsats ctsats        90 Feb  3 14:29 test.data
Step 2: run SparkR scripts locally from RStudio

Next, we will run a SparkR script locally from an RStudio session. Here is a simple example script, reading a CSV file from HDFS and printing its first elements (detailed explanations below):

Sys.setenv(HADOOP_CONF_DIR='/etc/hadoop/conf.cloudera.hdfs')
Sys.setenv(YARN_CONF_DIR='/etc/hadoop/conf.cloudera.yarn')

library(SparkR, lib.loc = "/home/ctsats/spark-1.6.1-bin-hadoop2.6/R/lib")
library(magrittr)

sc <- sparkR.init(sparkHome = "/home/ctsats/spark-1.6.1-bin-hadoop2.6",
                  sparkPackages="com.databricks:spark-csv_2.10:1.4.0")

sqlContext <- sparkRSQL.init(sc)

df <- read.df(sqlContext, 'recsys/data/interactions_clean.csv', "com.databricks.spark.csv", header="true") 
df %>% head %>% print

sparkR.stop()

Let’s see the script line by line:

  • In lines 1 & 2, we set the required Hadoop environment variables (you can obtain them from your cluster administrator)
  • In line 4, we load the SparkR package, providing its location in our local machine
  • Spark context is initialized in lines 7 & 8. In line 7, we provide the sparkHome directory, while in line 8 we include any additional Spark packages with the sparkPackages argument (in our case, the spark-csv external package)
  • In line 10, we initialize the sqlContext, necessary because we are going to use Spark dataframes
  • In line 12, we read the CSV file interactions_clean.csv (which includes a header) from the HDFS directory recsys/data/ using the spark-csv package. The result is a Spark dataframe df.
  • In line 13, we use the pipe operators from the R package magrittr (loaded in line 5), to print out the first elements of our Spark dataframe (including its header).
  • Finally, in line 15 we stop the Spark context.

We can run the script from RStudio, either the whole of it or line by line in an interactive manner. We can also save it (sparkr-test.R) and run it from the command line:

[ctsats@dev-hd-01 ~]$ Rscript 'R/sparkr_test.R'
[...]
  user_id item_id interaction_type
1       7 1006839                1
2       9  944146                3
3       9 1053485                1
4      13 2444782                1
5      23  501722                1
6      23 1305844                1
[...]

So far so good. But you may be thinking “OK, this is nothing new; you have just downloaded again Spark locally, and are using this to run SparkR. We already knew this, so what is the fuss about?

So, here comes the really nice stuff…

Step 3: submit SparkR applications to YARN

It turns out that you can indeed submit SparkR applications to YARN, despite the fact that SparkR is not available in your (Cloudera) Hadoop cluster. Here is how.

First, we need to make some changes to our script above; here is the new script, saved as sparkr-submit_test.R:

library(SparkR, lib.loc = "/home/ctsats/spark-1.6.1-bin-hadoop2.6/R/lib")
library(magrittr)

sc <- sparkR.init(appName = 'SparkR-submit-test',
                  sparkEnvir = list(spark.executor.cores='2',          # STRINGS here for spark-submit!!!
                                    spark.executor.instances='12'))
                
sqlContext <- sparkRSQL.init(sc)

df <- read.df(sqlContext, 'recsys/data/interactions_clean.csv', "com.databricks.spark.csv", header="true") 
df %>% head %>% print

sparkR.stop()

The changes in our new script are:

  • We don’t set the environment variables HADOOP_CONF_DIR and YARN_CONF_DIR in-script
  • We also don’t declare any additional Spark packages, like spark-csv
  • We include further configuration settings in our Spark context (lines 4-6)

Now, let’s try to submit this script to YARN; to do so, we have to:

  1. Set the necessary environment variables from the terminal
  2. Change our working directory to where our local Spark executables are, i.e. spark-1.6.1-bin-hadoop2.6/bin in our case
  3. Precede the spark-submit command with ./
  4. Include the spark-csv package in the command line

Here is the procedure and the results:

[ctsats@dev-hd-01 ~]$ export HADOOP_CONF_DIR=/etc/hadoop/conf.cloudera.hdfs
[ctsats@dev-hd-01 ~]$ export YARN_CONF_DIR=/etc/hadoop/conf.cloudera.yarn
[ctsats@dev-hd-01 ~]$ cd spark-1.6.1-bin-hadoop2.6/bin/
[ctsats@dev-hd-01 bin]$ ./spark-submit --master yarn-client --driver-memory 2g --packages com.databricks:spark-csv_2.10:1.4.0 ~/R/sparkr-submit_test.R
[...]
16/04/22 19:20:04 INFO spark.SparkContext: Running Spark version 1.6.1
[...]
  user_id item_id interaction_type
1       7 1006839                1
2       9  944146                3
3       9 1053485                1
4      13 2444782                1
5      23  501722                1
6      23 1305844                1

As we can see, not only the job is submitted to YARN and executed successfully, but it also used Spark 1.6.1 (highlighted line above), and not the existing Spark version in our Cloudera Hadoop cluster (1.5.0).

Notice also that neither SparkR nor magrittr R packages are installed in our cluster workers, the only requirement for them being an R installation.

Finally, we notice that we can use a similar approach to run other Spark components (e.g. PySpark jobs) in a Spark version newer than the one shipped with our Cloudera Hadoop version, which might also come handy (for instance, I have found a significant performance boost in some MLlib algorithms of Spark 1.6.1 compared to Spark 1.5.0).

Conclusions

We have presented a simple and straightforward way for running SparkR applications in a Cloudera Hadoop cluster, both locally and in YARN. Apart from R being present to the worker nodes, no modification to the cluster software is required, neither superuser privileges in the local working client machine (gateway). The approach demonstrated can also be used to “bypass” the Spark version shipped with Cloudera Hadoop (which usually lags behind the latest one) with a newer one, if necessary.

Christos - Iraklis Tsatsoulis

Christos - Iraklis Tsatsoulis

Christos - Iraklis is one of our resident Data Scientists. He holds advanced graduate degrees in applied mathematics, engineering, and computing. He has been awarded both Chartered Engineer and Chartered Manager status in the UK, as well as Master status in Kaggle.com due to "consistent and stellar results" in predictive analytics contests.
Christos - Iraklis Tsatsoulis

Latest posts by Christos - Iraklis Tsatsoulis (see all)

18
Leave a Reply

avatar
8 Comment threads
10 Thread replies
6 Followers
 
Most reacted comment
Hottest comment thread
10 Comment authors
AaronpraveenMalcolm McRobertsBobLuiz Fuzaro Recent comment authors
  Subscribe  
newest oldest most voted
Notify of
spary
Guest
spary

Thanks Christos for this. Would you be able to share inputs on how the setup would work against a secure (kerberos enabled) CDH cluster? Also, wondering how jobs developed on a local RStudio IDE on Windows can be pushed down to a remote YARN RM running on a Red Hat cluster.

robinisme2
Guest
robinisme2

Hi Christos, thanks for your sharing.
Would you like to share how to connect to hive and do some SQL query through sparkR? Thanks.

adam
Guest
adam

let me get this clear – you’ve to install r on all nodes of your cluster – for any non-trivial installation that’s more than you can count on your fingers!

Luiz Fuzaro
Guest
Luiz Fuzaro

Thanks Christos for inspiration.
Your recipe works with SparkR-2.0 too, with little adjusts, follow bellow.
cloudera –> 2.6.0-cdh5.7.0, rc00978c67b0d3fe9f3b896b5030741bd40bf541a
bin/spark-submit –master yarn-client –driver-memory 1g –num-executors 5 –packages com.databricks:spark-avro_2.11:3.0.0 ../sparkr-submit_test2.0.R

===================
sparkr-submit_test2.0.R
==================
library(SparkR, lib.loc = “/home/admnet/spark-2.0.0-bin-hadoop2.6/R/lib”)
library(magrittr)

sparkR.session(appName = ‘SparkR-submit-test’, sparkEnvir = list(spark.executor.cores=’4′, spark.executor.instances=’4′))

df % head %>% print
createOrReplaceTempView(df, “voos”)
airport <- sql("select Year,Dest, count(*) as cnt from voos group by Year, Dest order by Dest, cnt desc")
result <- head(airport,15000)
write.csv(result,"airport9.agg")

Using a RITA dataset from U.S. Air Traffic
Nice lab!

Bob
Guest
Bob

Hi,

I received this error when trying to run the stuff above :

Error: sparkr.zip does not exist for R application in YARN mode.

Do you know how to fix this?

praveen
Guest
praveen

hi bob,
i face this same error on spark2. were you able to resolve it?
thanks.

luiz fuzaro
Guest
luiz fuzaro

Enviroment variables are set ?
export YARN_CONF_DIR=”/etc/hadoop/conf”
export HADOOP_CONF_DIR=”/etc/hadoop/conf”

Aaron
Guest
Aaron

Hi Christos,
Any limitations you are aware of? I am very Hadoop savy person, but I like to know if this allows parallelization of ANY R analysis (excluding those impossible cases!).

Thanks