In the previous posts, two groups of ways to implement parallel processing on a single machine were introduced. The first group is provided by the snow or parallel package, and its functions are extensions of lapply() (LINK). The second group is based on an extension of the for construct (foreach, %dopar% and %:%); the foreach construct is provided by the foreach package, while clusters are created and registered by the parallel and doParallel packages respectively (LINK). To conclude this series, three practical examples are compared in this article.

Let’s get started.

The following packages are loaded first. Note that the randomForest, rpart and ISLR packages are necessary for the second and third examples; they are loaded later.

library(parallel)
library(iterators)
library(foreach)
library(doParallel)

k-means clustering

This example is from McCallum and Weston (2012). It was originally written using clusterApply() in the snow package. First, a slight modification is made so that it works with parLapplyLB() in the parallel package. A foreach construct is also created for comparison.

According to the documentation,

  • the data given by x are clustered by the k-means method, which aims to partition the points into k groups such that the sum of squares from points to the assigned cluster centres is minimized

At the minimum, each data point is closest to its assigned cluster centre. The number of centres is specified by centers (4 in this example). The total within-cluster sum of squares is kept in tot.withinss. As the initial clusters are randomly assigned, the fitting is repeated multiple times, and the number of repetitions is determined by nstart.
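For reference, a sequential run that performs a comparable amount of work might look like the minimal sketch below; the parallel versions that follow simply split the nstart repetitions across workers (100 starts here assumes 25 starts on each of 4 cores):

library(MASS)
# all random starts performed in a single process
fit = kmeans(Boston, centers=4, nstart=100)
fit$tot.withinss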

parallel package

The workers are initialized by clusterEvalQ(), as the Boston data is in the MASS package. A list of outputs is returned by parLapplyLB(), and tot.withinss is extracted from each by sapply(). The final outcome is the one that gives the minimum tot.withinss.

# parallel
split = detectCores()
eachStart = 25

cl = makeCluster(split)
init = clusterEvalQ(cl, { library(MASS); NULL })
results = parLapplyLB(cl
                      ,rep(eachStart, split)
                      ,function(nstart) kmeans(Boston, 4, nstart=nstart))
withinss = sapply(results, function(result) result$tot.withinss)
result = results[[which.min(withinss)]]
stopCluster(cl)

result$tot.withinss
## [1] 1814438

foreach package

The corresponding implementation using the foreach package is shown below. An iterator object (iters) is created that repeats the individual nstart value once for each worker. A function to combine the outcomes is created (comb()); it just keeps the outcome that gives the minimum tot.withinss. As .combine doesn't seem to accept extra arguments, this kind of helper function is necessary.

# foreach
split = detectCores()
eachStart = 25
# set up iterators
iters = iter(rep(eachStart, split))
# set up combine function
comb = function(res1, res2) {
  if(res1$tot.withinss < res2$tot.withinss) res1 else res2
}

cl = makeCluster(split)
registerDoParallel(cl)
result = foreach(nstart=iters, .combine="comb", .packages="MASS") %dopar%
  kmeans(Boston, 4, nstart=nstart)
stopCluster(cl)

result$tot.withinss
## [1] 1814438

random forest

This example is from the foreach package's vignette.

According to the package documentation,

  • randomForest implements Breiman’s random forest algorithm (based on Breiman and Cutler’s original Fortran code) for classification and regression.

parallel package

x and y hold the predictors and the response. A function (rf()) is created to fit the model. If data has to be sent to each worker, it can be sent either by clusterCall() or through the fitting function's arguments. If clusterApply() or clusterApplyLB() is used, the former should be preferred to reduce I/O time; if parLapply() or parLapplyLB() is used, sending the data through the function is fine, as there is a single I/O operation per task chunk (for details, see the first article). As the randomForest package provides a function to combine the fitted objects (combine()), it is used in do.call(). Finally a confusion table is created.

## Random forest
library(randomForest)

# parallel
rm(list = ls())
set.seed(1237)
x = matrix(runif(500), 100)
y = gl(2,50)

split = detectCores()
eachTrees = 250
# define function to fit random forest given predictors and response
# data has to be sent to workers using this function
rf = function(ntree, pred, res) {
  randomForest(pred, res, ntree=ntree)
}

cl = makeCluster(split)
clusterSetRNGStream(cl, iseed=1237)
init = clusterEvalQ(cl, { library(randomForest); NULL })
results = parLapplyLB(cl, rep(eachTrees, split), rf, pred=x, res=y)
result = do.call("combine", results)
stopCluster(cl)

cm = table(data.frame(actual=y, fitted=result$predicted))
cm
##       fitted
## actual  1  2
##      1 23 27
##      2 28 22
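As an aside, here is a minimal sketch of the clusterCall() alternative mentioned above, reusing the variables defined in the example. The data is exported to each worker's global environment once, so only ntree has to be passed per task (the anonymous export function is just for illustration):

cl = makeCluster(split)
clusterSetRNGStream(cl, iseed=1237)
init = clusterEvalQ(cl, { library(randomForest); NULL })
# copy x and y into each worker's global environment once
invisible(clusterCall(cl, function(p, r) {
  assign("x", p, envir=.GlobalEnv)
  assign("y", r, envir=.GlobalEnv)
  NULL
}, x, y))
results = parLapplyLB(cl, rep(eachTrees, split),
                      function(ntree) randomForest(x, y, ntree=ntree))
result = do.call("combine", results)
stopCluster(cl)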

foreach package

randomForest() is used directly in the foreach construct and the returned outcomes are combined by combine() (.combine="combine"). The fitting function has to be available in each worker, which is arranged by .packages="randomForest". As the combine function takes multiple arguments, it is necessary to set the multi-combine option to TRUE (.multicombine=TRUE) - this option is discussed further in the next example. As in the above example, a confusion matrix is created at the end. Both results should be the same, as the same streams of random numbers are generated by clusterSetRNGStream().

# foreach
rm(list = ls())
set.seed(1237)
x = matrix(runif(500), 100)
y = gl(2,50)

split = detectCores()
eachTrees = 250
# set up iterators
iters = iter(rep(eachTrees, split))

cl = makeCluster(split)
clusterSetRNGStream(cl, iseed=1237)
registerDoParallel(cl)
result = foreach(ntree=iters, .combine="combine", .multicombine=TRUE, .packages="randomForest") %dopar%
  randomForest(x, y, ntree=ntree)
stopCluster(cl)

cm = table(data.frame(actual=y, fitted=result$predicted))
cm
##       fitted
## actual  1  2
##      1 23 27
##      2 28 22

bagging

Although bagging can be implemented using the randomForest package, another quick implementation is tried with the rpart package for illustration (cartBGG()). Bootstrap samples are created for the number of trees specified by ntree. To simplify the discussion, only the variable importance values are kept - an rpart object keeps these details in variable.importance. cartBGG() therefore returns a list whose only element is a data frame in which the number of rows equals the number of predictors and the number of columns equals the number of trees. In fact, cartBGG() is a constructor that generates an S3 object (rpartbgg).

## Bagging
rm(list = ls())
# update variable importance measure of bagged trees
cartBGG = function(formula, trainData, ntree=1, ...) {
  # extract response name and index
  res.name = gsub(" ","",unlist(strsplit(formula,split="~"))[[1]])
  res.ind = match(res.name, colnames(trainData))
  
  # variable importance - merge by 'var'
  var.imp = data.frame(var=colnames(trainData[,-res.ind]))
  require(rpart)
  for(i in 1:ntree) {
    # create in-bag and out-of-bag samples (outbag is kept for reference only)
    bag = sample(nrow(trainData), size=nrow(trainData), replace=TRUE)
    inbag = trainData[bag,]
    outbag = trainData[-bag,]
    # fit a fully grown tree on the bootstrap sample
    mod = rpart(formula=formula, data=inbag, control=rpart.control(cp=0))
    # merge variable importance under a per-tree column name
    colname = paste("s",i,sep=".")
    imp = data.frame(names(mod$variable.importance), mod$variable.importance)
    colnames(imp) = c("var", colname)
    var.imp = merge(var.imp, imp, by="var", all=TRUE)
  }
  # adjust outcome
  rownames(var.imp) = var.imp[,1]
  var.imp = var.imp[,2:ncol(var.imp)]
  
  # create outcome as a list
  result = list(var.imp = var.imp)
  class(result) = c("rpartbgg")
  result
}

If the total number of trees is split across workers (e.g. across 4 workers), there will be 4 lists, and it is possible to combine them. Below is an example of such a function (comBGG()) - it just sums the individual variable importance values.

Specifically,

  1. the arguments shouldn't be fixed in advance (hence function(...)), as the number of workers (and thus the number of lists) can vary
  2. a list is created that binds the variable number of arguments (list(...))
  3. only the elements that keep the variable importance values are extracted into another list by lapply()
  4. the new list of variable importance values is restructured using do.call() and cbind()
  5. finally the values in each row are summed by apply()

# combine variable importance of bagged trees
comBGG = function(...) {
  # add rpart objects in a list
  bgglist = list(...)
  # extract variable importance
  var.imp = lapply(bgglist, function(x) x$var.imp)
  # combine and sum by row
  var.imp = do.call("cbind", var.imp)
  var.imp = apply(var.imp, 1, sum, na.rm=TRUE)
  var.imp
}
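A quick way to check the combine logic is to call comBGG() on two hand-made lists that mimic the rpartbgg structure (the toy values are made up for illustration):

# two fake single-tree outcomes with importance for predictors a and b
b1 = list(var.imp = data.frame(s.1 = c(a=1, b=2)))
b2 = list(var.imp = data.frame(s.1 = c(a=3, b=4)))
comBGG(b1, b2)
# should return a = 4 and b = 6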

parallel package

Similar to the above example, bagged trees are generated across the workers using cartBGG(). The results are then combined by comBGG().

# data
library(ISLR)
data(Carseats)

# parallel
split = detectCores()
eachTree = 250

cl = makeCluster(split)
clusterSetRNGStream(cl, iseed=1237)
# rpart is loaded by require() in cartBGG(), so there is no need to load it in each worker
results = parLapplyLB(cl
                      ,rep(eachTree, split)
                      ,cartBGG, formula="Sales ~ .", trainData=Carseats, ntree=eachTree)
result = do.call("comBGG", results)
stopCluster(cl)

result
## Advertising         Age   CompPrice   Education      Income  Population 
##   295646.22   366140.36   514889.28   144642.19   271524.31   232959.06 
##       Price   ShelveLoc       Urban          US 
##   964252.07   978758.83    24794.35    80023.66

foreach package

This example is similar to the random forest example. Note that .multicombine determines whether the combine function is called with more than two arguments at a time; the maximum number of arguments is set by .maxcombine, which defaults to 100 if .multicombine is TRUE and 2 otherwise (.maxcombine=if (.multicombine) 100 else 2). As there are more than 2 lists in this example, .multicombine should be set to TRUE so that all the lists generated by the workers can be combined in one call.
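The effect can be seen with a toy sketch; showArgs() below is a made-up combine function that reports how many results it receives. With .multicombine=TRUE and the default .maxcombine, it should be called just once with all four results; with .multicombine=FALSE it would be called repeatedly with two arguments:

# made-up combine function that counts its arguments
showArgs = function(...) {
  cat("combining", length(list(...)), "results\n")
  sum(...)
}
foreach(i=1:4, .combine="showArgs", .multicombine=TRUE) %do% i
# should print "combining 4 results" and return 10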

# foreach
split = detectCores()
eachTrees = 250
# set up iterators
iters = iter(rep(eachTrees, split))

cl = makeCluster(split)
clusterSetRNGStream(cl, iseed=1237)
registerDoParallel(cl)
# note .multicombine=TRUE as > 2 arguments
# .maxcombine=if (.multicombine) 100 else 2
result = foreach(ntree=iters, .combine="comBGG", .multicombine=TRUE) %dopar%
  cartBGG(formula="Sales ~ .", trainData=Carseats, ntree=ntree)
stopCluster(cl)

result
## Advertising         Age   CompPrice   Education      Income  Population 
##   295646.22   366140.36   514889.28   144642.19   271524.31   232959.06 
##       Price   ShelveLoc       Urban          US 
##   964252.07   978758.83    24794.35    80023.66

Three practical examples of implementing parallel processing on a single machine are discussed in this post. They are relatively easy to implement, and existing packages can be used directly. I hope this series of posts is useful.