Map-Reduce with JDK8, Clojure, Scala, Spark, and yes, Lisp!

August 19, 2014

With the recent addition of Streams and Lambda expressions in JDK8, a developer can finally adopt a functional programming style in Java and, for example, implement algorithms that use the popular Map-Reduce design pattern natively with the standard Java SDK.

Starting with the examples provided by the JDK 8 Stream tutorial, let’s explore the use of functional programming and applications of the Map-Reduce design pattern.

First, let’s define a simple list-based collection of persons:

List<Person> roster = new LinkedList<>();
roster.add(new Person("Lucas", Person.Gender.MALE, 30));
roster.add(new Person("Juliana", Person.Gender.FEMALE, 25));
roster.add(new Person("Gabriel", Person.Gender.MALE, 8));
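
The snippets in this post rely on a Person class that is not shown. A minimal sketch of what it might look like follows; the field names and the RosterExample helper are assumptions, and only the method names (getName(), getGender(), getAge(), isMale()) are taken from the calls used in this post.

```java
import java.util.LinkedList;
import java.util.List;

// Hypothetical minimal Person; only the accessor names come from the post.
class Person {
    enum Gender { MALE, FEMALE }

    private final String name;
    private final Gender gender;
    private final int age;

    Person(String name, Gender gender, int age) {
        this.name = name;
        this.gender = gender;
        this.age = age;
    }

    String getName() { return name; }
    Gender getGender() { return gender; }
    int getAge() { return age; }

    // Instance method, so Person::isMale can be passed where a Predicate<Person> is expected.
    boolean isMale() { return gender == Gender.MALE; }
}

// Hypothetical helper that builds the three-person roster used in the post.
class RosterExample {
    static List<Person> buildRoster() {
        List<Person> roster = new LinkedList<>();
        roster.add(new Person("Lucas", Person.Gender.MALE, 30));
        roster.add(new Person("Juliana", Person.Gender.FEMALE, 25));
        roster.add(new Person("Gabriel", Person.Gender.MALE, 8));
        return roster;
    }

    public static void main(String[] args) {
        for (Person p : buildRoster()) {
            System.out.println(p.getName() + " " + p.getGender() + " " + p.getAge());
        }
    }
}
```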

Next, let’s apply two functions to this collection: the first filters the Person elements by gender, and the second maps each remaining Person to their name (one could think of this as a projection function):

roster.stream().
    filter(Person::isMale).
        forEach(e -> System.out.println(e.getName()));

The result of running this program is:

Lucas
Gabriel
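
In the snippet above, the projection to a name happens inside the forEach() lambda. As a sketch of the same idea with an explicit map() step, the names can be collected into a list instead of printed. The compact nested Person class and the sampleRoster() helper here are assumptions made only to keep the example self-contained.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class NameProjection {
    // Compact stand-in for the post's Person class (assumed shape).
    static class Person {
        private final String name;
        private final boolean male;

        Person(String name, boolean male) {
            this.name = name;
            this.male = male;
        }

        String getName() { return name; }
        boolean isMale() { return male; }
    }

    // Hypothetical helper returning the same three people as the roster above.
    static List<Person> sampleRoster() {
        return Arrays.asList(
                new Person("Lucas", true),
                new Person("Juliana", false),
                new Person("Gabriel", true));
    }

    // filter() keeps the males, map() projects each Person to its name.
    static List<String> maleNames(List<Person> roster) {
        return roster.stream()
                .filter(Person::isMale)
                .map(Person::getName)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        maleNames(sampleRoster()).forEach(System.out::println);
    }
}
```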

Rather than iterating through the collection, we traverse its stream representation (i.e. roster.stream()). This decouples the application of our functions from the actual collection data structure, so the functions can be applied in different ways, for example serially or concurrently. This is known as the characteristics of the stream, and it allows the mapping and reduction implementation to be optimized.
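
To make the serial-versus-concurrent point concrete, here is a small sketch that sums the same ages through a sequential and a parallel stream, and asks the list's Spliterator whether it reports the ORDERED characteristic. The class and method names are made up for illustration; stream(), parallelStream() and Spliterator.hasCharacteristics() are standard JDK 8 calls.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

class SerialVsParallel {
    // Elements are processed one after another on the calling thread.
    static int sumSerial(List<Integer> ages) {
        return ages.stream().mapToInt(Integer::intValue).sum();
    }

    // The same pipeline, but the runtime may split the work across threads.
    static int sumParallel(List<Integer> ages) {
        return ages.parallelStream().mapToInt(Integer::intValue).sum();
    }

    // Characteristics describe the source; a List reports that it is ORDERED.
    static boolean isOrdered(List<Integer> ages) {
        return ages.spliterator().hasCharacteristics(Spliterator.ORDERED);
    }

    public static void main(String[] args) {
        List<Integer> ages = Arrays.asList(30, 25, 8);
        System.out.println(sumSerial(ages));
        System.out.println(sumParallel(ages));
        System.out.println(isOrdered(ages));
    }
}
```

Because addition is associative, both variants produce the same sum regardless of how the parallel version partitions the elements.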

The previous code made use of two functions, filter() and forEach(). Both take a function as their argument, which makes them what is known as higher-order functions. In the example we pass that argument in two different forms: by referring to the isMale() method of the Person class, and by creating an anonymous function at the point where it is needed. This latter form is known as a lambda abstraction and, in Java, takes the form input-arguments -> function-body.
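
One can also write such a function oneself. The sketch below defines a hypothetical countMatching() that receives its test as a Predicate, and calls it once with a method reference and once with an equivalent lambda; the names countMatching and isShort are invented for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class HigherOrder {
    // A higher-order function: the test to apply is itself passed in as an argument.
    static <T> long countMatching(List<T> items, Predicate<T> test) {
        return items.stream().filter(test).count();
    }

    // An ordinary named method that can be passed as HigherOrder::isShort.
    static boolean isShort(String s) {
        return s.length() <= 5;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Lucas", "Juliana", "Gabriel");

        // Form 1: a method reference to an existing method.
        System.out.println(countMatching(names, HigherOrder::isShort));

        // Form 2: a lambda created at the point of use.
        System.out.println(countMatching(names, s -> s.length() <= 5));
    }
}
```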

In our next example, let’s incorporate a reduction function into the mix:

double sum =
roster.stream()
    .filter(p -> p.getGender() == Person.Gender.MALE)
        .mapToInt(e -> e.getAge() /*Person::getAge*/)
            .reduce(0, (a, b) -> a + b);

The reduce() function takes a binary function, that is, a function that has two input arguments or operands. The binary function we use is sum. Its first argument is the result of the previous application of sum, or 0 (the identity value passed to reduce()) if it is the first application. This allows us to associatively add together the elements of our stream. For example, if the stream has elements 1, 2, 3, 5, 8, then one possible sequence of reduce invocations is:

(0, 1) -> 1
(1, 2) -> 3
(3, 3) -> 6
(6, 5) -> 11
(11, 8) -> 19
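
The trace above can be reproduced with a short sketch that records each call made by reduce() on a sequential IntStream. The ReduceTrace class is hypothetical, and the side effect inside the lambda is there only for illustration; it would not be safe on a parallel stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

class ReduceTrace {
    // Records every (accumulated, next) pair handed to the binary function.
    static List<String> trace(int... values) {
        List<String> steps = new ArrayList<>();
        int total = IntStream.of(values).reduce(0, (a, b) -> {
            // Illustration only: mutating outside state is acceptable here
            // because the stream is sequential.
            steps.add("(" + a + ", " + b + ") -> " + (a + b));
            return a + b;
        });
        steps.add("result = " + total);
        return steps;
    }

    public static void main(String[] args) {
        trace(1, 2, 3, 5, 8).forEach(System.out::println);
    }
}
```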

As expected, the result of running the previous program is:

38.0

If one looks deeper into the JDK8 API, one can see that it has been really well thought out and organized. There are useful abstractions for Unit-returning (void) functions, called Consumers, for value-returning functions that take no arguments, called Suppliers, for unary and binary functions (Function, BiFunction, BinaryOperator), etc.
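
As a rough sketch of how these abstractions fit together, the snippet below wires four interfaces from java.util.function into one small computation. The FunctionalInterfaces class and its demo() method are invented for this example; the interfaces and their get(), apply() and accept() methods are the standard JDK 8 ones.

```java
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

class FunctionalInterfaces {
    static String demo() {
        StringBuilder out = new StringBuilder();

        Supplier<Integer> initial = () -> 0;                      // no input, returns a value
        Function<String, Integer> length = String::length;        // one input, one result
        BinaryOperator<Integer> add = (a, b) -> a + b;            // two inputs of the same type
        Consumer<Integer> record = value -> out.append(value);    // one input, no result (void)

        // 0 + "Lucas".length() is handed to the consumer.
        record.accept(add.apply(initial.get(), length.apply("Lucas")));
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```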

Would one also say this is novel? I would hardly say so; consider how this same example can be implemented in Lisp, a language developed by John McCarthy in the late 1950s (that’s more than 50 years ago!).

First, let’s define our list-based Person container in Lisp:

(defparameter *roster* '((Lucas male 30)(Juliana female 25)(Gabriel male 8)))

And the equivalent of our isMale() method strictly as a function:

(defun ismale (x) (if (eq (second x) 'male) t nil))

And, finally, the equivalent of our map to int and reduce by summing:

(reduce '+
    (mapcar (lambda (x) (third x))
        (remove-if-not 'ismale *roster*)))

The prefix form may strike some as odd, but the general structure is essentially the same: reduce with ‘+’ is applied to the result of mapping each person to an int, and that mapping is applied to the result of filtering the roster collection with the ismale predicate. In Lisp, a lambda abstraction is written as (lambda (input-arguments) function-body).

One could argue that the Lisp program is even cleaner (and more compact) than the Java one!

Lisp is not the only option. In fact, the Java platform itself offers at least two more options in Clojure and Scala, as demonstrated next:

Clojure:

(def *roster* '((Lucas male 30)(Juliana female 25)(Gabriel male 8)))
(defn ismale [x] (= (second x) 'male))
(reduce +
    (map (fn [x] (nth x 2))
        (filter ismale *roster*)))

Scala:

val roster = List(List("Lucas", "male", 30), List("Juliana", "female", 25), List("Gabriel", "male", 8))
def isMale(x: List[Any]) = x(1) == "male"
roster.filter(isMale).
    map((x:List[Any]) => x(2).asInstanceOf[Int]).
        reduce((x:Int, y:Int) => x + y)

The syntax of a lambda expression in Clojure is fn [input-arguments] (function-body) and in Scala it is (input-arguments) => function-body.

Clojure has a closer resemblance to Lisp, whereas Scala, with its static typing and its infix form, reminds us of the imperative programming style in Java.

If one is used to Big Data and Hadoop, then another option is Spark. For the sake of completeness, here is the Spark version of the use-case:

val roster = sc.textFile("examples/Roster.txt") // space-delimited text file with one Person per line
roster.filter(line => line.contains(" male")).
    map(line => line.split(" ")(2).toInt).
        reduce((a,b) => a + b)

Historically, whenever the Java SDK picks up a new API, it tends to overtake the existing alternatives to that API. For example, see how java.util.logging is more predominant today than log4j. This is only natural, due to the easy availability of the SDK. Considering this, I wonder what the implications of java.util.stream will be for the Apache Hadoop API.

I would definitely appreciate being able to use the SDK to author a Map-Reduce application, and then simply configure its Characteristics to DISTRIBUTED to have it run over a Hadoop cluster, wouldn’t you?