In the previous posts, two groups of ways to implement parallel processing on a single machine are introduced. The first group is provided by the **snow** or **parallel** package and the functions are an extension of `lapply()`

(LINK). The second group is based on an extension of the *for* construct (*foreach*, *%dopar%* and *%:%*). The *foreach* construct is provided by the *foreach* package while clusters are made and registered by the **parallel** and **doParallel** packages respectively (LINK). To conclude this series, three practical examples are discussed for comparison in this article.

Let’s get started.

The following packages are loaded at first. Note that the **randomForest**, **rpart** and **ISLR** packages are necessary for the second and third examples and they are loaded later.

```
1library(parallel)
2library(iterators)
3library(foreach)
4library(doParallel)
```

## k-means clustering

This example is from McCallum and Weston (2012). It is originally created using `clusterApply()`

in the **snow** package. Firstly a slight modification is made to be used with `parLapplyLB()`

in the **parallel** package. Also a *foreach* construct is created for comparison.

According to the document,

*the data given by x are clustered by the k-means method, which aims to partition the points into k groups such that the sum of squares from points to the assigned cluster centres is minimized*

At the minimum, all data points are nearest to the cluster centres. The number of centers are specified by *centers* (4 in this example). The distance value is kept in *tot.withinss*. As initial clusters are randomly assigned at the beginning, fitting is performed multiple times and it is determined by *nstart*.

### parallel package

The clusters are initialized by `clusterEvalQ()`

as *Boston* data is available in the **MASS** package. A list of outputs are returned by **parLapplyLB()** and *tot.withinss* is extract by `sapply()`

. The final outcome is what gives the minimum *tot.withinss*.

```
1# parallel
2split = detectCores()
3eachStart = 25
4
5cl = makeCluster(split)
6init = clusterEvalQ(cl, { library(MASS); NULL })
7results = parLapplyLB(cl
8 ,rep(eachStart, split)
9 ,function(nstart) kmeans(Boston, 4, nstart=nstart))
10withinss = sapply(results, function(result) result$tot.withinss)
11result = results[[which.min(withinss)]]
12stopCluster(cl)
13
14result$tot.withinss
```

```
## [1] 1814438
```

### foreach package

The corresponding implementation using the **foreach** package is shown below. An iterator object is created to repeat the individual *nstart* value for the number of clusters (*iters*). A funtion to combine the outcome is created (`comb()`

), which just keeps the outcome that gives the minimum *tot.withinss* - as *.combine* doesn’t seem to allow an argument, this kind of modification would be necessary.

```
1# foreach
2split = detectCores()
3eachStart = 25
4# set up iterators
5iters = iter(rep(eachStart, split))
6# set up combine function
7comb = function(res1, res2) {
8 if(res1$tot.withinss < res2$tot.withinss) res1 else res2
9}
10
11cl = makeCluster(split)
12registerDoParallel(cl)
13result = foreach(nstart=iters, .combine="comb", .packages="MASS") %dopar%
14 kmeans(Boston, 4, nstart=nstart)
15stopCluster(cl)
16
17result$tot.withinss
```

```
## [1] 1814438
```

## random forest

This example is from **foreach** packages’s vignette.

According to the package document,

*randomForest implements Breiman’s random forest algorithm (based on Breiman and Cutler’s original Fortran code) for classification and regression*.

### parallel package

*x* and *y* keep the predictors and response. A function (`rf()`

) is created to implement the algorithm. If data has to be sent to each worker, it can be sent either by `clusterCall()`

or by a function. If `clusterApply()`

or `clusterApplyLB()`

are used, the former should be used to reduce I/O operations time and it’d be alright to send by a function if `parLapply()`

or `parLapplyLB()`

are used - single I/O for each task split. (for details, see the first article) As the **randomForest** package provides a function to combine the objects (`combine()`

), it is used in `do.call()`

. Finally a confusion table is created.

```
1## Random forest
2library(randomForest)
3
4# parallel
5rm(list = ls())
6set.seed(1237)
7x = matrix(runif(500), 100)
8y = gl(2,50)
9
10split = detectCores()
11eachTrees = 250
12# define function to fit random forest given predictors and response
13# data has to be sent to workers using this function
14rf = function(ntree, pred, res) {
15 randomForest(pred, res, ntree=ntree)
16}
17
18cl = makeCluster(split)
19clusterSetRNGStream(cl, iseed=1237)
20init = clusterEvalQ(cl, { library(randomForest); NULL })
21results = parLapplyLB(cl, rep(eachTrees, split), rf, pred=x, res=y)
22result = do.call("combine", results)
23stopCluster(cl)
24
25cm = table(data.frame(actual=y, fitted=result$predicted))
26cm
```

```
## fitted
## actual 1 2
## 1 23 27
## 2 28 22
```

### foreach package

`randomForest()`

is directly used in the *foreach* construct and the returned outcomes are combined by `combine()`

(*.combine=“combine”*). The fitting function has to be available in each worker and it is set by *.packages=“randomForest”*. As there are multiple argument in the combine function, it is necessary to set the multi-combine option to be *TRUE* (*.multicombine=TRUE*) - this option will be discussed further in the next example. As the above example, a confusion matrix is created at the end - both the results should be the same as the same streams of random numbers are set to be generated by `clusterSetRNGStream()`

.

```
1# foreach
2rm(list = ls())
3set.seed(1237)
4x = matrix(runif(500), 100)
5y = gl(2,50)
6
7split = detectCores()
8eachTrees = 250
9# set up iterators
10iters = iter(rep(eachTrees, split))
11
12cl = makeCluster(split)
13clusterSetRNGStream(cl, iseed=1237)
14registerDoParallel(cl)
15result = foreach(ntree=iters, .combine="combine", .multicombine=TRUE, .packages="randomForest") %dopar%
16 randomForest(x, y, ntree=ntree)
17stopCluster(cl)
18
19cm = table(data.frame(actual=y, fitted=result$predicted))
20cm
```

```
## fitted
## actual 1 2
## 1 23 27
## 2 28 22
```

## bagging

Although bagging can be implemented using the **randomForest** package, another quick implementation is tried using the **rpart** package for illustration (`cartBGG()`

). Specifically bootstrap samples can be created for the number of trees specified by *ntree*. To simplify discussion, only the variable importance values are kept - an *rpart* object keeps this details in *variable.importance*. Therefore `cartBGG()`

returns a list where its only element is a data frame where the number of rows is the same to the number of predictors and the number of columns is the same to the number of trees. In fact, `cartBGG()`

is a constructor that generates a S3 object (*rpartbgg*).

```
1## Bagging
2rm(list = ls())
3# update variable importance measure of bagged trees
4cartBGG = function(formula, trainData, ntree=1, ...) {
5 # extract response name and index
6 res.name = gsub(" ","",unlist(strsplit(formula,split="~"))[[1]])
7 res.ind = match(res.name, colnames(trainData))
8
9 # variable importance - merge by 'var'
10 var.imp = data.frame(var=colnames(trainData[,-res.ind]))
11 require(rpart)
12 for(i in 1:ntree) {
13 # create in bag and out of bag sample
14 bag = sample(nrow(trainData), size=nrow(trainData), replace=TRUE)
15 inbag = trainData[bag,]
16 outbag = trainData[-bag,]
17 # fit model
18 mod = rpart(formula=formula, data=inbag, control=rpart.control(cp=0))
19 # set helper variables
20 colname = paste("s",i,sep=".")
21 pred.type = ifelse(class(trainData[,res.ind])=="factor","class","vector")
22 # merge variable importance
23 imp = data.frame(names(mod$variable.importance), mod$variable.importance)
24 colnames(imp) = c("var", colname)
25 var.imp = merge(var.imp, imp, by="var", all=TRUE)
26 }
27 # adjust outcome
28 rownames(var.imp) = var.imp[,1]
29 var.imp = var.imp[,2:ncol(var.imp)]
30
31 # create outcome as a list
32 result=list(var.imp = var.imp)
33 class(result) = c("rpartbgg")
34 result
35}
```

If the total number of trees are split into clusters (eg into 4 clusters), there will be 4 lists and it is possible to *combine* them. Below is an example of such a function (`comBGG()`

) - it just sums individual variable importance values.

Specifically

- arguments shouldn’t be dertermined (
*…*) as the number of clusters can vary (an thus the number of lists) - a list is created that binds the variable number of arguments (
`list(...)`

) - only the elements that keeps variable importance are extracted into another list by
`lapply()`

- the new list of variable importance values can be restructured using
`do.call()`

and`cbind()`

- finally each row values are added by
`apply()`

```
1# combine variable importance of bagged trees
2comBGG = function(...) {
3 # add rpart objects in a list
4 bgglist = list(...)
5 # extract variable importance
6 var.imp = lapply(bgglist, function(x) x$var.imp)
7 # combine and sum by row
8 var.imp = do.call("cbind", var.imp)
9 var.imp = apply(var.imp, 1, sum, na.rm=TRUE)
10 var.imp
11}
```

### parallel package

Similar to the above example, bagged trees are generated across clusters using `cartBGG()`

. Then the result is combined by `comBGG()`

.

```
1# data
2library(ISLR)
3data(Carseats)
4
5# parallel
6split = detectCores()
7eachTree = 250
8
9cl = makeCluster(split)
10clusterSetRNGStream(cl, iseed=1237)
11# rpart is required in cartBGG(), not need to load in each cluster
12results = parLapplyLB(cl
13 ,rep(eachTree, split)
14 ,cartBGG, formula="Sales ~ .", trainData=Carseats, ntree=eachTree)
15result = do.call("comBGG", results)
16stopCluster(cl)
17
18result
```

```
## Advertising Age CompPrice Education Income Population
## 295646.22 366140.36 514889.28 144642.19 271524.31 232959.06
## Price ShelveLoc Urban US
## 964252.07 978758.83 24794.35 80023.66
```

### foreach package

This example is simplar to the random forest example. Note that *.multicombine* determines how many arguments are combined. For performance, the maximum number is 100 if the value is *TRUE* and 2 if *FALSE* by default (*.maxcombine=if (.multicombine) 100 else 2*). As there are more than 2 lists in this example, it should be set *TRUE* so that all lists generated by the clusters can be combined.

```
1# foreach
2split = detectCores()
3eachTrees = 250
4# set up iterators
5iters = iter(rep(eachTrees, split))
6
7cl = makeCluster(split)
8clusterSetRNGStream(cl, iseed=1237)
9registerDoParallel(cl)
10# note .multicombine=TRUE as > 2 arguments
11# .maxcombine=if (.multicombine) 100 else 2
12result = foreach(ntree=iters, .combine="comBGG", .multicombine=TRUE) %dopar%
13 cartBGG(formula="Sales ~ .", trainData=Carseats, ntree=ntree)
14stopCluster(cl)
15
16result
```

```
## Advertising Age CompPrice Education Income Population
## 295646.22 366140.36 514889.28 144642.19 271524.31 232959.06
## Price ShelveLoc Urban US
## 964252.07 978758.83 24794.35 80023.66
```

Three practical examples of implementing parallel processing in a single machine are discussed in this post. They are relatively easy to implement and all existing packages can be used directly. I hope this series of posts are useful.

## Comments