In the previous post, I demonstrated how to start SparkR in local and cluster mode. While SparkR is under active development, it does not yet fully support Spark's key libraries such as MLlib and Spark Streaming. Even as a data processing engine, the R API is still limited: it is not possible to manipulate RDDs directly, only via the Spark SQL/DataFrame API. As can be seen in the API doc, SparkR re-implements many existing R functions to work with Spark DataFrames, and it notably borrows some functions from the dplyr package. There are also some unfamiliar functions (e.g. from_utc_timestamp()), many of which come from the Hive Query Language (HiveQL). Although some Hive user defined functions (UDFs) have been ported, many useful UDFs and window functions are still missing.
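As a quick illustration of one of those HiveQL-flavoured functions, here is a minimal sketch of from_utc_timestamp(); it assumes a sqlContext has already been created (as done later in this post) and uses a throwaway data frame.

# minimal sketch: from_utc_timestamp() converts a UTC timestamp column to a given time zone
ts_df <- createDataFrame(sqlContext, data.frame(ts = as.POSIXct("2016-01-01 00:00:00", tz = "UTC")))
ts_df <- withColumn(ts_df, "ts_sydney", from_utc_timestamp(ts_df$ts, "Australia/Sydney"))
head(ts_df)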

In these circumstances, I consider one option to boost SparkR's usefulness as a data processing engine is to manipulate data in a Hive context rather than in the more limited SQL context. There is good news and bad news. The good news is that an existing Hive installation is not necessary to set up a Hive context; the bad news is that Spark has to be built from source with Hive support. In this post, several examples of using Hive UDFs and window functions are demonstrated and compared to the dplyr package. A summary of building Spark with Hive is also included.

SparkR in Hive Context

I tried it in local mode on my Windows machine, and here is how the SparkR context (sc) is created - for details, see this post. The key difference is spark_home, which points to where my pre-built Spark with Hive is located.

#### set up environment variables
base_path <- getwd()
## SPARK_HOME
#spark_home <- paste0(base_path, '/spark')
spark_home <- paste0(base_path, '/spark-1.6.0-bin-spark-1.6.0-bin-hadoop2.4-hive-yarn')
Sys.setenv(SPARK_HOME = spark_home)
## $SPARK_HOME/bin to PATH
spark_bin <- paste0(spark_home, '/bin')
Sys.setenv(PATH = paste(Sys.getenv(c('PATH')), spark_bin, sep=':'))
## HADOOP_HOME
hadoop_home <- paste0(spark_home, '/hadoop') # hadoop-common missing on Windows
Sys.setenv(HADOOP_HOME = hadoop_home) # hadoop-common missing on Windows

#### extra driver jar to be passed
postgresql_drv <- paste0(getwd(), '/postgresql-9.3-1103.jdbc3.jar')

#### add SparkR to search path
sparkr_lib <- paste0(spark_home, '/R/lib')
.libPaths(c(.libPaths(), sparkr_lib))

#### specify master host name or localhost
spark_link <- "local[*]"

library(magrittr)
library(dplyr)
library(SparkR)

## include spark-csv package
Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.3.0" "sparkr-shell"')

sc <- sparkR.init(master = spark_link,
                  sparkEnvir = list(spark.driver.extraClassPath = postgresql_drv),
                  sparkJars = postgresql_drv)
## Launching java with spark-submit command C:/workspace/sparkr-test/spark-1.6.0-bin-spark-1.6.0-bin-hadoop2.4-hive-yarn/bin/spark-submit.cmd --jars C:\workspace\sparkr-test\postgresql-9.3-1103.jdbc3.jar  --driver-class-path "C:/workspace/sparkr-test/postgresql-9.3-1103.jdbc3.jar" "--packages" "com.databricks:spark-csv_2.10:1.3.0" "sparkr-shell" C:\Users\jaehyeon\AppData\Local\Temp\Rtmp2HKUIN\backend_port295475933bd8

I set up both SQL and Hive contexts for comparison.

sqlContext <- sparkRSQL.init(sc)
sqlContext
## Java ref type org.apache.spark.sql.SQLContext id 1
hiveContext <- sparkRHive.init(sc)
hiveContext
## Java ref type org.apache.spark.sql.hive.HiveContext id 4

A synthetic sales data set is used in the examples.

sales <- data.frame(dealer = c(rep("xyz", 9), "abc"),
                    make = c("highlander", rep("prius", 3), rep("versa", 3), "s3", "s3", "forrester"),
                    type = c("suv", rep("hatch", 6), "sedan", "sedan", "suv"),
                    day = c(0:3, 1:3, 1:2, 1), stringsAsFactors = FALSE)
sales
##    dealer       make  type day
## 1     xyz highlander   suv   0
## 2     xyz      prius hatch   1
## 3     xyz      prius hatch   2
## 4     xyz      prius hatch   3
## 5     xyz      versa hatch   1
## 6     xyz      versa hatch   2
## 7     xyz      versa hatch   3
## 8     xyz         s3 sedan   1
## 9     xyz         s3 sedan   2
## 10    abc  forrester   suv   1

Basic data manipulation

The first example is basic data manipulation: counting the number of records per dealer, make and type.

sales_s <- createDataFrame(sqlContext, sales)

sales_s %>%
  select(sales_s$dealer, sales_s$make, sales_s$type) %>%
  group_by(sales_s$dealer, sales_s$make, sales_s$type) %>%
  summarize(count = count(sales_s$dealer)) %>%
  arrange(sales_s$dealer, sales_s$make) %>% head()
##   dealer       make  type count
## 1    abc  forrester   suv     1
## 2    xyz highlander   suv     1
## 3    xyz      prius hatch     3
## 4    xyz         s3 sedan     2
## 5    xyz      versa hatch     3
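
For comparison, a dplyr equivalent on the local R data frame could look like this - a rough sketch only, with the output omitted.

sales %>%
  dplyr::group_by(dealer, make, type) %>%
  dplyr::summarise(count = n()) %>%
  dplyr::arrange(dealer, make)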

This kind of manipulation also works in Spark SQL after registering the DataFrame as a temporary table. Here another Spark DataFrame (sales_h) is created using the Hive context and the equivalent query is applied - this query works in both the SQL and Hive contexts.

sales_h <- createDataFrame(hiveContext, sales)
registerTempTable(sales_h, "sales_h")

qry_h1 <- "SELECT dealer, make, type, count(*) AS count FROM sales_h GROUP BY dealer, type, make ORDER BY dealer, make"
sql(hiveContext, qry_h1) %>% head()
##   dealer       make  type count
## 1    abc  forrester   suv     1
## 2    xyz highlander   suv     1
## 3    xyz      prius hatch     3
## 4    xyz         s3 sedan     2
## 5    xyz      versa hatch     3

Window function example

Window functions can be useful as data can be summarized by partition, but they are not supported in the SQL context. Here is an example that adds a rank based on the number of records per group, followed by the dplyr equivalent. Note that some functions in the dplyr package are masked by the SparkR package, so their namespace (dplyr::) is indicated where appropriate.

1qry_h2 <- "
2SELECT dealer, make, type, rank() OVER (PARTITION BY dealer ORDER BY make, count DESC) AS rank FROM (
3  SELECT dealer, make, type, count(*) AS count FROM sales_h GROUP BY dealer, type, make
4) t"
5sql(hiveContext, qry_h2) %>% head()
##   dealer       make  type rank
## 1    abc  forrester   suv    1
## 2    xyz highlander   suv    1
## 3    xyz      prius hatch    2
## 4    xyz         s3 sedan    3
## 5    xyz      versa hatch    4
sales %>% dplyr::select(dealer, make, type) %>%
  dplyr::group_by(dealer, type, make) %>%
  dplyr::mutate(count = n()) %>%
  dplyr::distinct(dealer, make, type) %>%
  dplyr::arrange(dealer, -count) %>%
  dplyr::ungroup() %>%
  dplyr::arrange(dealer, make) %>%
  dplyr::group_by(dealer) %>%
  dplyr::mutate(rank = row_number()) %>%
  dplyr::select(-count)
## Source: local data frame [5 x 4]
## Groups: dealer [2]
## 
##   dealer       make  type  rank
##    (chr)      (chr) (chr) (int)
## 1    abc  forrester   suv     1
## 2    xyz highlander   suv     1
## 3    xyz      prius hatch     2
## 4    xyz         s3 sedan     3
## 5    xyz      versa hatch     4

The next window function example adds cumulative counts per dealer and make.

qry_h3 <- "SELECT dealer, make, count, SUM(count) OVER (PARTITION BY dealer ORDER BY dealer, make) as cumsum FROM (
  SELECT dealer, make, count(*) AS count FROM sales_h GROUP BY dealer, make
) t"
sql(hiveContext, qry_h3) %>% head()
##   dealer       make count cumsum
## 1    abc  forrester     1      1
## 2    xyz highlander     1      1
## 3    xyz      prius     3      4
## 4    xyz         s3     2      6
## 5    xyz      versa     3      9
sales %>% dplyr::select(dealer, make) %>%
  dplyr::group_by(dealer, make) %>%
  dplyr::mutate(count = n()) %>%
  dplyr::distinct(dealer, make, count) %>%
  dplyr::arrange(dealer, make) %>%
  dplyr::ungroup() %>%
  dplyr::group_by(dealer) %>%
  dplyr::mutate(cumsum = cumsum(count))
## Source: local data frame [5 x 4]
## Groups: dealer [2]
## 
##   dealer       make count cumsum
##    (chr)      (chr) (int)  (int)
## 1    abc  forrester     1      1
## 2    xyz highlander     1      1
## 3    xyz      prius     3      4
## 4    xyz         s3     2      6
## 5    xyz      versa     3      9

UDF example

There are lots of useful UDFs in HiveQL, and many of them are currently missing from the SparkR package. Here collect_list() is used for illustration, creating sales paths per dealer and type - for further details of this and other functions, see the language manual.

qry_h4 <- "SELECT dealer, type, concat_ws(' > ', collect_list(make)) AS sales_order FROM (
  SELECT dealer, day, type, make FROM sales_h ORDER BY dealer, type, day
) t GROUP BY dealer, type ORDER BY dealer, type
"
sql(hiveContext, qry_h4) %>% head()
##   dealer  type                                   sales_order
## 1    abc   suv                                     forrester
## 2    xyz hatch prius > versa > prius > versa > prius > versa
## 3    xyz sedan                                       s3 > s3
## 4    xyz   suv                                    highlander
sales %>% dplyr::arrange(dealer, type, day) %>%
  dplyr::group_by(dealer, type) %>%
  dplyr::summarise(sales_order = paste(make, collapse = " > ")) %>%
  dplyr::arrange(dealer, type)
## Source: local data frame [4 x 3]
## Groups: dealer [2]
## 
##   dealer  type                                   sales_order
##    (chr) (chr)                                         (chr)
## 1    abc   suv                                     forrester
## 2    xyz hatch prius > versa > prius > versa > prius > versa
## 3    xyz sedan                                       s3 > s3
## 4    xyz   suv                                    highlander
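
Another UDF that could be handy here is collect_set(), which keeps only the distinct values per group. The following is a quick sketch along the same lines (output omitted; note that collect_set() does not guarantee ordering).

qry_h5 <- "SELECT dealer, type, concat_ws(' > ', collect_set(make)) AS distinct_makes
  FROM sales_h GROUP BY dealer, type ORDER BY dealer, type"
sql(hiveContext, qry_h5) %>% head()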

Spark build with Hive

I built Spark with Hive on the latest Ubuntu LTS - Ubuntu 16.04 Xenial Xerus. I used the default Java version and Scala 2.10.3. The source is built for Hadoop 2.4 (-Phadoop-2.4 and -Dhadoop.version=2.4.0) with YARN (-Pyarn) and Hive (-Phive and -Phive-thriftserver). I also chose to include SparkR (-Psparkr). See the official documentation for further details.

Here is a summary of the steps followed.

  • Update packages
    • sudo apt-get update
  • Install JAVA and set JAVA_HOME
    • sudo apt-get install default-jdk
    • export JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64"
  • Install Scala 2.10.3
    • wget http://www.scala-lang.org/files/archive/scala-2.10.3.tgz
    • tar xvf scala-2.10.3.tgz
    • sudo mv scala-2.10.3 /usr/bin
    • sudo ln -s /usr/bin/scala-2.10.3 /usr/bin/scala
    • export PATH=$PATH:/usr/bin/scala/bin
  • Download Spark 1.6.0 and run make-distribution.sh
    • wget http://d3kbcqa49mib13.cloudfront.net/spark-1.6.0.tgz
    • tar xvf spark-1.6.0.tgz
    • cd spark-1.6.0
    • export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
    • ./make-distribution.sh --name spark-1.6.0-bin-hadoop2.4-hive-yarn --tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Psparkr -Phive -Phive-thriftserver -DskipTests

The build was done in a VirtualBox guest with 2 cores and 8 GB of memory allocated. After about 30 minutes, the build completed and produced the pre-built Spark distribution (spark-1.6.0-bin-spark-1.6.0-bin-hadoop2.4-hive-yarn.tgz).
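
To check that the resulting distribution actually supports a Hive context, something along these lines can be run from R against the extracted build - the path below is illustrative.

# point SparkR at the freshly built distribution (adjust the path as needed)
Sys.setenv(SPARK_HOME = "/path/to/spark-1.6.0-bin-spark-1.6.0-bin-hadoop2.4-hive-yarn")
.libPaths(c(.libPaths(), file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
library(SparkR)

sc <- sparkR.init(master = "local[*]")
hiveContext <- sparkRHive.init(sc)  # errors if Spark was built without -Phive
hiveContext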

I hope this post is useful.