
The aim of this document is to describe and explore the approaches to perform computation on Big Data in R.

There are two orthogonal issues here:

  1. The fundamental limitations imposed by the systems used. For example, Hadoop Map Reduce poses very specific requirements on the tasks it can perform.
  2. The range from implicit to explicit parallelization. For example, rmr and friends provide a direct interface to Hadoop Map Reduce, foreach provides a framework for expressing parallelizable tasks without any restrictions on the implementation, and multicore provides parallel versions of standard functions such as lapply.

Distributing computation alone is fairly well supported in R. foreach allows a backend-independent specification of independent tasks. mclapply and clusterApply show how a function map (apply in R speak) can be spread across machines. The challenge here is the distribution of the initial state: in a typical SOCK cluster setup the state is broadcast explicitly, in the case of Rserve it can be pre-loaded, etc. The state is either implicit (multicore - the entire current state is available to the forked workers) or explicit (all others have a way of defining packages and objects to push).
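For illustration, a minimal sketch using base R's parallel package (the chunk indexing is just an assumption for the example) contrasts the implicit state of forked workers with the explicit broadcast a SOCK cluster needs:

library(parallel)

big <- rnorm(1e6)                       # state that every worker needs

## implicit state: forked workers (multicore-style, not available on Windows)
## see the entire current session, including 'big'
unlist(mclapply(1:4, function(i)
  sum(big[((i - 1) * 250000 + 1):(i * 250000)]), mc.cores = 4))

## explicit state: a (P)SOCK cluster needs the object pushed to the workers first
cl <- makePSOCKcluster(4)
clusterExport(cl, "big")
unlist(clusterApply(cl, 1:4, function(i)
  sum(big[((i - 1) * 250000 + 1):(i * 250000)])))
stopCluster(cl)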

The foreach paradigm implicitly relies on iterators which provide a powerful abstraction layer. Anything that can be expressed using iterators and has no side-effects can be parallelized.
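As a small sketch (using iter() from the iterators package over data frame rows; the per-row computation is just a placeholder), a task phrased as iteration plus a combine function is already back-end independent:

library(foreach)
library(iterators)

## each task receives one row; .combine recombines the per-row results;
## with a registered parallel backend the same code runs via %dopar%
res <- foreach(row = iter(mtcars, by = "row"), .combine = rbind) %do%
  data.frame(wt_lb = row$wt * 1000, wt_kg = row$wt * 453.6)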

In order to encourage re-use and participation by others, there are two steps:

  1. define an abstraction that is sufficiently back-end agnostic and general, yet encapsulates the main constraints
  2. implement parallel versions of generic R functions using this abstraction

For example, the map/reduce abstraction is too specific and limiting to use here. The concept of key/value pairs is not natural to Big Data. Interestingly, the main point of Hadoop is not really map/reduce -- it is the proximity of data and computation. In that sense there is a more general underlying concept of split + compute + combine.

Split + Compute + Combine (aka Divide/Recombine)

The workflow is as follows:

  1. split data into chunks (this is typically done at data load time implicitly)
  2. perform a local operation X on the chunk
  3. combine results from 2. into a final result

This approach is sometimes called Divide/Recombine (D&R). Map/Reduce is a special case of this paradigm, where 2. consists of running a map step on each key/value pair and 3. is the reduce step.
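As a sketch of the paradigm on a single machine (base R only; the airquality dataset and the choice of sum are just placeholders), the three steps look like:

chunks  <- split(airquality$Temp, airquality$Month)   # 1. split the data into chunks
partial <- lapply(chunks, sum)                        # 2. local compute on each chunk
total   <- Reduce(`+`, partial)                       # 3. combine the partial results
all.equal(total, sum(airquality$Temp))                # matches the non-distributed result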

Assuming that the split part is implicit (e.g. via HDFS), one can imagine implementations of simple functions using a D/R API like dr <- function(x, compute, combine, merge=c), such as:

## sum: partial sums per chunk, recombined by summing them
dr.sum   <- function(x) dr(x, sum, sum)
## table: per-chunk tables, recombined by adding up counts with equal names
dr.table <- function(x) dr(x, table, function(x) tapply(x, names(x), sum))
## mean: per-chunk (sum, count) pairs, merged column-wise, recombined as total sum / total count
dr.mean  <- function(x) dr(x, function(x) c(sum(x), length(x)),
                           function(x) sum(x[1,]) / sum(x[2,]), merge=cbind)
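The examples above assume an implementation of dr; a minimal in-memory sketch (assuming x already arrives as a plain list of chunks - a real back-end would instead run compute where the data lives) could look like:

dr <- function(x, compute, combine, merge = c) {
  partial <- lapply(x, compute)        # run compute on every chunk (in parallel on a real back-end)
  combine(do.call(merge, partial))     # recombine = merge the partial results, then combine
}

chunks <- split(rnorm(1e5), rep(1:10, length.out = 1e5))
dr.mean(chunks)                        # agrees with mean(unlist(chunks))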

The above D/R is very easy to implement for Hadoop input, for example, by simply using file chunks (or smaller pieces if desired) as splits, map as the compute step and reduce as merge + combine. [Note: the separation of the recombine step into merge + combine is just a convenience of expression - they are both part of the recombine step.]

Approximate methods may also replace exact solutions, e.g., a chunk-wise LM is an approximation of the actual LM that can be good enough for practical use. This also requires theoretical methodology to quantify the error associated with using such an approximation, which is an active topic of current research.
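As one illustration of such an approximation (a simple size-weighted average of per-chunk coefficients - not the specific method of the D&R literature, just a sketch to show the shape of the computation):

## hypothetical chunk-wise LM: fit lm() per chunk, recombine by a size-weighted
## average of the coefficients; exactness is traded for chunk-local computation
chunk_lm <- function(chunks, formula) {
  fits <- lapply(chunks, function(d)
    list(coef = coef(lm(formula, data = d)), n = nrow(d)))
  n <- sapply(fits, `[[`, "n")
  w <- n / sum(n)
  Reduce(`+`, Map(function(f, wi) f$coef * wi, fits, w))
}

chunks <- split(mtcars, rep(1:4, length.out = nrow(mtcars)))
chunk_lm(chunks, mpg ~ wt + hp)        # compare with coef(lm(mpg ~ wt + hp, mtcars))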

Related work: RHIPE (a Map/Reduce framework for D&R), the D&R LM approximation paper, and the datadr package with explicit divide/recombine functions -- NB: all of those are Hadoop M/R specific.

To facilitate the above concept, a notion of distributed objects is needed so that, say, the regular sum method can dispatch on the class of the object and call dr.sum automatically. This abstraction should be part of the design - e.g., a path in HDFS may represent a data frame as a distributed object, and column subset operations create virtual distributed vectors. The actual implementation defines the operations needed to convert the input stream into the necessary chunked R objects (e.g., parse ASCII, extract fields, convert content according to type). The back-end should be opaque to the operation performed, so it can be changed without impacting the user, e.g., when using Hive, which performs some operations itself. The same abstraction works for any other distributed system, including in-memory DSS.
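A sketch of this dispatch idea (the class name dvector and the in-memory chunk list are assumptions for illustration; a real back-end would record HDFS paths and conversion rules instead):

## a "distributed vector" that only records where its chunks live
as.dvector <- function(chunks) structure(list(chunks = chunks), class = "dvector")

## sum() is an internal (Summary group) generic, so this method is dispatched automatically
sum.dvector <- function(..., na.rm = FALSE) {
  x <- ..1
  dr(x$chunks, function(ch) sum(ch, na.rm = na.rm), sum)
}

dv <- as.dvector(split(runif(1e5), rep(1:8, length.out = 1e5)))
sum(dv)                                # ends up running dr.sum-style code on the chunks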

Summary

We define a general API for divide/recombine tasks. It consists of an abstraction of distributed objects (distributed data frames, distributed vectors) which encapsulates the opaque information about the source and back-end. The second part is the API (see dr above) to define operations on such objects via arbitrary divide/recombine closures. The result is a function that performs a given task in a distributed fashion.

Given this API, we can define some basic functions/methods in a distributed way, demonstrating the value and usability. Then others can contribute further methods - either as direct replacements, or as new methodology (e.g. for models that cannot be parallelized easily).
