In the previous posts, two groups of ways to implement parallel processing on a single machine were introduced. The first group, provided by the snow and parallel packages, consists of functions that extend lapply()
(LINK). The second group is based on an extension of the for construct (foreach, %dopar% and %:%); the foreach construct is provided by the foreach package, while clusters are made and registered by the parallel and doParallel packages respectively (LINK). To conclude the series, this article compares three practical examples.
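As a quick reminder of the two styles, here is a trivial hypothetical sketch (two workers, a toy task; it is not from the earlier posts and is only meant to show the two constructs side by side):

library(parallel)
library(foreach)
library(doParallel)

cl = makeCluster(2)
# style 1: the lapply() extension
parLapply(cl, 1:4, sqrt)
# style 2: the for construct extension
registerDoParallel(cl)
foreach(i=1:4) %dopar% sqrt(i)
stopCluster(cl)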
Let’s get started.
The following packages are loaded first. Note that the randomForest, rpart and ISLR packages are also necessary, for the second and third examples, and they are loaded later.
library(parallel)
library(iterators)
library(foreach)
library(doParallel)
k-means clustering
This example is from McCallum and Weston (2012), where it was originally created using clusterApply() in the snow package. Here a slight modification is made so that it can be used with parLapplyLB() in the parallel package, and a foreach construct is created for comparison.
According to the documentation,
- the data given by x are clustered by the k-means method, which aims to partition the points into k groups such that the sum of squares from points to the assigned cluster centres is minimized
At the minimum, all data points are nearest to their assigned cluster centres. The number of centres is specified by centers (4 in this example), and the resulting distance value is kept in tot.withinss. As the initial clusters are randomly assigned, fitting is performed multiple times; the number of attempts is determined by nstart.
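Before parallelizing, a minimal sequential sketch may be useful as a point of reference (it only assumes that the MASS package, which provides the Boston data, is installed):

library(MASS)
# a single fit; tot.withinss is the value to be minimized
fit = kmeans(Boston, centers=4, nstart=25)
fit$tot.withinss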
parallel package
The clusters are initialized by clusterEvalQ() so that the Boston data in the MASS package is available on each worker. A list of outputs is returned by parLapplyLB(), tot.withinss is extracted by sapply(), and the final outcome is the fit that gives the minimum tot.withinss.
# parallel
split = detectCores()
eachStart = 25

cl = makeCluster(split)
init = clusterEvalQ(cl, { library(MASS); NULL })
results = parLapplyLB(cl
                      ,rep(eachStart, split)
                      ,function(nstart) kmeans(Boston, 4, nstart=nstart))
withinss = sapply(results, function(result) result$tot.withinss)
result = results[[which.min(withinss)]]
stopCluster(cl)

result$tot.withinss
## [1] 1814438
foreach package
The corresponding implementation using the foreach package is shown below. An iterator object (iters) is created that repeats the individual nstart value once per worker. A function to combine the outcomes (comb()) is also created; it simply keeps the outcome that gives the minimum tot.withinss. As .combine doesn't seem to allow extra arguments, this kind of helper function is necessary.
# foreach
split = detectCores()
eachStart = 25
# set up iterators
iters = iter(rep(eachStart, split))
# set up combine function
comb = function(res1, res2) {
  if(res1$tot.withinss < res2$tot.withinss) res1 else res2
}

cl = makeCluster(split)
registerDoParallel(cl)
result = foreach(nstart=iters, .combine="comb", .packages="MASS") %dopar%
  kmeans(Boston, 4, nstart=nstart)
stopCluster(cl)

result$tot.withinss
## [1] 1814438
random forest
This example is from the foreach package's vignette.
According to the package documentation,
- randomForest implements Breiman’s random forest algorithm (based on Breiman and Cutler’s original Fortran code) for classification and regression.
parallel package
x and y keep the predictors and the response. A function (rf()) is created to fit the random forest. If data has to be sent to each worker, it can be sent either by clusterCall() or through a function argument. If clusterApply() or clusterApplyLB() is used, the clusterCall() approach should be preferred to reduce I/O time, while sending data through a function argument is fine with parLapply() or parLapplyLB(), which perform a single I/O operation per task split (for details, see the first article). As the randomForest package provides a function to combine the fitted objects (combine()), it is used in do.call(). Finally a confusion table is created; a sketch of the clusterCall() alternative is shown after it.
## Random forest
library(randomForest)

# parallel
rm(list = ls())
set.seed(1237)
x = matrix(runif(500), 100)
y = gl(2,50)

split = detectCores()
eachTrees = 250
# define function to fit random forest given predictors and response
# data has to be sent to workers using this function
rf = function(ntree, pred, res) {
  randomForest(pred, res, ntree=ntree)
}

cl = makeCluster(split)
clusterSetRNGStream(cl, iseed=1237)
init = clusterEvalQ(cl, { library(randomForest); NULL })
results = parLapplyLB(cl, rep(eachTrees, split), rf, pred=x, res=y)
result = do.call("combine", results)
stopCluster(cl)

cm = table(data.frame(actual=y, fitted=result$predicted))
cm
## fitted
## actual 1 2
## 1 23 27
## 2 28 22
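For comparison, below is a minimal sketch of the clusterCall() alternative mentioned above. The data is sent to every worker once and stored in each worker's global environment, so the fitting function no longer needs the pred and res arguments (split, eachTrees, x and y are assumed to exist from the block above):

cl = makeCluster(split)
clusterSetRNGStream(cl, iseed=1237)
init = clusterEvalQ(cl, { library(randomForest); NULL })
# send x and y to each worker once, keeping them in the worker's global environment
sent = clusterCall(cl, function(pred, res) {
  assign("x", pred, envir=.GlobalEnv)
  assign("y", res, envir=.GlobalEnv)
  NULL
}, x, y)
# each worker now fits using its local copies of x and y
results = parLapplyLB(cl, rep(eachTrees, split),
                      function(ntree) randomForest(x, y, ntree=ntree))
result = do.call("combine", results)
stopCluster(cl)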
foreach package
randomForest() is used directly in the foreach construct and the returned outcomes are combined by combine() (.combine="combine"). The fitting function has to be available in each worker, which is arranged by .packages="randomForest". As the combine function takes multiple arguments, it is necessary to set the multi-combine option to TRUE (.multicombine=TRUE); this option is discussed further in the next example. As in the above example, a confusion matrix is created at the end. Both results should be the same, as the same streams of random numbers are set to be generated by clusterSetRNGStream().
# foreach
rm(list = ls())
set.seed(1237)
x = matrix(runif(500), 100)
y = gl(2,50)

split = detectCores()
eachTrees = 250
# set up iterators
iters = iter(rep(eachTrees, split))

cl = makeCluster(split)
clusterSetRNGStream(cl, iseed=1237)
registerDoParallel(cl)
result = foreach(ntree=iters, .combine="combine", .multicombine=TRUE, .packages="randomForest") %dopar%
  randomForest(x, y, ntree=ntree)
stopCluster(cl)

cm = table(data.frame(actual=y, fitted=result$predicted))
cm
## fitted
## actual 1 2
## 1 23 27
## 2 28 22
bagging
Although bagging can be implemented using the randomForest package, another quick implementation is tried using the rpart package for illustration (cartBGG()). Specifically, bootstrap samples are created for the number of trees specified by ntree. To simplify the discussion, only the variable importance values are kept; an rpart object keeps these details in variable.importance. cartBGG() therefore returns a list whose only element is a data frame with one row per predictor and one column per tree. In fact, cartBGG() is a constructor that generates an S3 object (rpartbgg).
## Bagging
rm(list = ls())
# update variable importance measure of bagged trees
cartBGG = function(formula, trainData, ntree=1, ...) {
  # extract response name and index
  res.name = gsub(" ","",unlist(strsplit(formula,split="~"))[[1]])
  res.ind = match(res.name, colnames(trainData))

  # variable importance - merge by 'var'
  var.imp = data.frame(var=colnames(trainData[,-res.ind]))
  require(rpart)
  for(i in 1:ntree) {
    # create in-bag and out-of-bag samples
    bag = sample(nrow(trainData), size=nrow(trainData), replace=TRUE)
    inbag = trainData[bag,]
    outbag = trainData[-bag,]
    # fit model
    mod = rpart(formula=formula, data=inbag, control=rpart.control(cp=0))
    # set helper variables
    colname = paste("s",i,sep=".")
    pred.type = ifelse(class(trainData[,res.ind])=="factor","class","vector")
    # merge variable importance
    imp = data.frame(names(mod$variable.importance), mod$variable.importance)
    colnames(imp) = c("var", colname)
    var.imp = merge(var.imp, imp, by="var", all=TRUE)
  }
  # adjust outcome
  rownames(var.imp) = var.imp[,1]
  var.imp = var.imp[,2:ncol(var.imp)]

  # create outcome as a list
  result = list(var.imp = var.imp)
  class(result) = c("rpartbgg")
  result
}
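As a quick sanity check, a hypothetical small sequential run (it uses the Carseats data from the ISLR package, which is also used in the parallel run below):

library(ISLR)
data(Carseats)
# fit 5 bagged trees; var.imp has one row per predictor and one column per tree
bgg = cartBGG(formula="Sales ~ .", trainData=Carseats, ntree=5)
dim(bgg$var.imp)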
If the total number of trees is split across workers (e.g. into 4 chunks), there will be 4 lists, and it is possible to combine them. Below is an example of such a function (comBGG()); it just sums the individual variable importance values.
Specifically
- the arguments shouldn't be fixed in advance (...), as the number of workers (and thus the number of lists) can vary
- a list is created that binds the variable number of arguments (list(...))
- only the elements that keep variable importance are extracted into another list by lapply()
- the new list of variable importance values is restructured using do.call() and cbind()
- finally the values in each row are summed by apply()
# combine variable importance of bagged trees
comBGG = function(...) {
  # add rpart objects in a list
  bgglist = list(...)
  # extract variable importance
  var.imp = lapply(bgglist, function(x) x$var.imp)
  # combine and sum by row
  var.imp = do.call("cbind", var.imp)
  var.imp = apply(var.imp, 1, sum, na.rm=TRUE)
  var.imp
}
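And a minimal sketch of how the two functions fit together sequentially (another hypothetical small run, with Carseats as above):

# combine the importance values of two small bagged-tree fits
bgg1 = cartBGG(formula="Sales ~ .", trainData=Carseats, ntree=2)
bgg2 = cartBGG(formula="Sales ~ .", trainData=Carseats, ntree=2)
# a named vector of summed importance, one value per predictor
comBGG(bgg1, bgg2)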
parallel package
Similar to the above example, bagged trees are generated across the workers using cartBGG() and the results are combined by comBGG().
# data
library(ISLR)
data(Carseats)

# parallel
split = detectCores()
eachTree = 250

cl = makeCluster(split)
clusterSetRNGStream(cl, iseed=1237)
# rpart is required within cartBGG(), so there is no need to load it on each worker
# each element of rep(eachTree, split) is passed to cartBGG() as ntree
results = parLapplyLB(cl
                      ,rep(eachTree, split)
                      ,cartBGG, formula="Sales ~ .", trainData=Carseats)
result = do.call("comBGG", results)
stopCluster(cl)

result
## Advertising Age CompPrice Education Income Population
## 295646.22 366140.36 514889.28 144642.19 271524.31 232959.06
## Price ShelveLoc Urban US
## 964252.07 978758.83 24794.35 80023.66
foreach package
This example is similar to the random forest example. Note that .multicombine determines whether the combine function can accept more than two arguments at once, while .maxcombine sets how many; by default the maximum is 100 if .multicombine is TRUE and 2 if FALSE (.maxcombine = if (.multicombine) 100 else 2). As more than 2 lists are generated in this example, .multicombine should be set to TRUE so that all of them can be passed to comBGG(). A small illustration follows, then the bagging run itself.
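To see the option in action, here is a minimal hypothetical sketch; showN() is an illustrative combine function that simply reports how many results the master receives per call before summing them:

# a combine function that prints how many results it receives at once
showN = function(...) {
  cat("combining", length(list(...)), "results\n")
  sum(...)
}

cl = makeCluster(2)
registerDoParallel(cl)
# with .multicombine=TRUE, up to .maxcombine results are combined per call
foreach(i=1:8, .combine="showN", .multicombine=TRUE, .maxcombine=4) %dopar% i
stopCluster(cl)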
# foreach
split = detectCores()
eachTrees = 250
# set up iterators
iters = iter(rep(eachTrees, split))

cl = makeCluster(split)
clusterSetRNGStream(cl, iseed=1237)
registerDoParallel(cl)
# note .multicombine=TRUE as > 2 arguments
# .maxcombine = if (.multicombine) 100 else 2
result = foreach(ntree=iters, .combine="comBGG", .multicombine=TRUE) %dopar%
  cartBGG(formula="Sales ~ .", trainData=Carseats, ntree=ntree)
stopCluster(cl)

result
## Advertising Age CompPrice Education Income Population
## 295646.22 366140.36 514889.28 144642.19 271524.31 232959.06
## Price ShelveLoc Urban US
## 964252.07 978758.83 24794.35 80023.66
Three practical examples of implementing parallel processing on a single machine have been discussed in this post. They are relatively easy to implement, and existing packages can be used directly. I hope this series of posts is useful.