Map-Reduce with JDK8, Clojure, Scala, Spark, and yes, Lisp!

August 19, 2014

With the recent addition of Streams and Lambda expressions in JDK8, a developer can finally adopt a functional programming style in Java and, for example, implement algorithms that make use of the popular Map-Reduce design pattern natively with the standard Java SDK.

Starting with the examples provided by the JDK 8 Stream tutorial, let’s explore the use of functional programming and applications of the Map-Reduce design pattern.

First, let’s define a simple list-based collection of persons:

List<Person> roster = new LinkedList<>();
roster.add(new Person("Lucas", Person.Gender.MALE, 30));
roster.add(new Person("Juliana", Person.Gender.FEMALE, 25));
roster.add(new Person("Gabriel", Person.Gender.MALE, 8));

Next, let’s apply two functions to this collection: the first filters each Person element by its gender, and the second maps a Person to their name (i.e. one could think of this as a projection function):

roster.stream().
    filter(Person::isMale).
        forEach(e -> System.out.println(e.getName()));

The result of running this program is:

Lucas
Gabriel

Rather than iterating through the collection, we traverse its stream representation (i.e. roster.stream()). This decouples the application of our functions from the actual collection data structure, so the function application can be carried out in different ways, for example serially or concurrently. These properties are known as the characteristics of the stream and allow the mapping and reduction implementation to be optimized.
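
For instance, swapping stream() for parallelStream() is, conceptually, all it takes to ask for a concurrent traversal (a quick sketch; note that the printing order is then no longer guaranteed):

// Same pipeline as above, but traversed concurrently.
roster.parallelStream().
    filter(Person::isMale).
        forEach(e -> System.out.println(e.getName()));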

The previous code made use of two operations, filter() and forEach(). Both of these take a function as their argument; functions that take other functions as arguments are known as higher-order functions. In the example we supply these arguments in two different forms: by referencing the isMale() method of the Person class, and by creating an anonymous function at the point where it is needed. The latter form is known as a lambda abstraction and, in Java, takes the form input-arguments -> function-body.
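
To make the two forms concrete, both of the following denote the same java.util.function.Predicate (a small illustrative sketch):

// Method reference: refers to an existing method by name.
Predicate<Person> maleByReference = Person::isMale;

// Lambda abstraction: an anonymous function written in place.
Predicate<Person> maleByLambda = p -> p.isMale();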

In our next example, let’s incorporate a reduction function into the mix:

double sum =
roster.stream()
    .filter(p -> p.getGender() == Person.Gender.MALE)
        .mapToInt(e -> e.getAge() /*Person::getAge*/)
            .reduce(0, (a, b) -> a + b);

The reduce() function takes a binary function, that is, a function that has two input arguments or operands. The binary function we use here is addition. Its first argument is the result of the previous application of the function, or 0 if this is the first application. This allows us to add the elements of our stream together associatively. For example, if the stream has the elements 1, 2, 3, 5, 8, then one particular sequence of invocations of reduce is:

(0, 1) -> 1
(1, 2) -> 3
(3, 3) -> 6
(6, 5) -> 11
(11, 8) -> 19

As expected, the result of running the previous program is:

38.0
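
The 1, 2, 3, 5, 8 trace above can be checked directly with a primitive stream (java.util.stream.IntStream), for example:

// Reduces 1, 2, 3, 5, 8 with 0 as the identity; total evaluates to 19.
int total = IntStream.of(1, 2, 3, 5, 8).reduce(0, (a, b) -> a + b);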

If one looks deeper into the JDK8 API, one can see that it has been really well thought out and organized. There are useful abstractions for void-returning functions (Consumers), value-producing Suppliers, Predicates, unary and binary Functions, and so on.
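
For illustration, a few of these java.util.function interfaces in use (a sketch; the Person value is just an example built on the Person class sketched earlier):

Consumer<String> print = s -> System.out.println(s);                       // takes a value, returns nothing
Supplier<Person> make = () -> new Person("Ana", Person.Gender.FEMALE, 40); // produces a value
Function<Person, String> name = Person::getName;                           // maps one value to another
BinaryOperator<Integer> add = (a, b) -> a + b;                             // binary function over a single type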

Would one also say this is novel? I would hardly say so; consider how this same example can be implemented in Lisp, a language developed by John McCarthy in the late 1950s (that’s more than 50 years ago!).

First, let’s define our list-based Person container in Lisp:

(defparameter *roster* '((Lucas male 30)(Juliana female 25)(Gabriel male 8)))

And the equivalent of our isMale() method strictly as a function:

(defun ismale (x) (if (eq (second x) 'male) t nil))

And, finally, the equivalent of our map to int and reduce by summing:

(reduce '+
    (mapcar (lambda (x) (third x))
        (remove-if-not 'ismale *roster*)))

The prefix form may strike some as odd, but the general structure is essentially the same: a reduce with ‘+’ applied to the result of mapping to int, applied to the result of filtering the roster collection with the predicate ismale. In Lisp, a lambda abstraction is written as (lambda (input-arguments) function-body).

One could even argue that the Lisp program is cleaner (and more compact) than the Java one!

Lisp is not the only option. In fact, the JVM itself offers at least two other options, Clojure and Scala, as demonstrated next:

Clojure:

(def *roster* '((Lucas male 30)(Juliana female 25)(Gabriel male 8)))
(defn ismale [x] (if (= (compare (second x) 'male) 0) true nil))
(reduce +
    (map (fn [x] (nth x 2))
        (filter ismale *roster*)))

Scala:

var roster = List(List("Lucas", "male", 30),List("Juliana", "female", 25),List("Gabriel", "male", 8))
def isMale(x:List[Any]) = {if (x(1) == "male") true else false}
roster.filter(isMale).
    map((x:List[Any]) => x(2).asInstanceOf[Int]).
        reduce((x:Int, y:Int) => x + y)

The syntax of a lambda expression in Clojure is (fn [input-arguments] function-body) and in Scala it is (input-arguments) => function-body.

Clojure bears a closer resemblance to Lisp, whereas Scala, with its static typing and infix form, reminds us of the imperative programming style of Java.

If one is used to Big Data and Hadoop, then another option is Spark. For the sake of completeness, here is the Spark version of the use case:

val roster = sc.textFile("examples/Roster.txt") // space-delimited text file with one Person per line
roster.filter(line => line.contains(" male")).
    map(line => line.split(" ")(2).toInt).
        reduce((a,b) => a + b)

Historically, whenever the Java SDK picks up a new API, it tends to surpass other existing forms of that same API. For example, see how java.util.logging is more prevalent today than log4j. This is only natural, given how readily available the SDK is. Considering this, I wonder what the implications of java.util.stream will be for the Apache Hadoop API.

I would definitely appreciate being able to use the SDK to author a Map-Reduce application, and then simply configure its Characteristics to DISTRIBUTED to have it run over a Hadoop cluster, wouldn’t you?


Hadoop’s Programming Model

December 12, 2010

Hadoop is a Java implementation of Map-Reduce. Map-Reduce is a software architecture used to process large amounts of data, also known as “big data”, in a distributed fashion. It is based upon the idea of mapping data items into key and value pairs, which are grouped by key and reduced into a single value. From a service perspective, Hadoop allows an application to map, group, and reduce data across a distributed cloud of machines, thus permitting applications to process enormous amounts of data.

A common Hadoop application is the processing of data located in web sites. For example, let’s consider an application that counts the number of occurrences of a word in web pages. In other words, if we had 10 web pages, where each uses the word “hello” twice, then we expect the result to include the key and value pair: {“hello” -> 20}. This word counting application can be easily hosted in Hadoop by using Hadoop’s map and reduce services in the following form:

  1. Generate a map of word token to word occurrence for each word present in the input web pages. For example, if a page includes the word “hello” only once, we should generate the map entry {“hello” -> 1}.
  2. Reduce all maps that have been grouped together by Hadoop with the same key into a single map entry where the key is the word token and the value is the sum of all values in that group. In other words, Hadoop collects all maps that have the same key, that is, the same word token, and then groups them together, providing the application with their values. The application is then responsible for reducing all values into a single item. For example, if step one generated the entries {“hello” -> 1} and {“hello” -> 2}, then we reduce these to a single entry {“hello” -> 3}.

Following is a walk-through of a simple scenario. This is done by:

  • Load each web-page as input.

    web-page-1: “first web-page”

    web-page-2: “and the second and final web-page”

  • Map each input (i.e. page) into a collection of sequences (word, occurrences).

    {(first, 1), (web-page, 1)},

    {(and, 2), (the, 1), (second, 1), (final, 1), (web-page, 1)}

  • Group all sequences by ‘word’. Thus, the output will be collections in which all member sequences have the same ‘word’.

    {(first, 1)},

    {(web-page, 1), (web-page, 1)},

    {(and, 2)},

    {(the, 1)},

    {(second, 1)},

    {(final, 1)}

  • For each group, reduce to a single sequence by summing together all word occurrences.

    {(first, 1)},

    {(web-page, 2)},

    {(and, 2)},

    {(the, 1)},

    {(second, 1)},

    {(final, 1)}

  • Store each sequence.

We have described a word counting Hadoop application; the next task is to implement it using Hadoop’s programming model. Hadoop provides two basic programming models. The first one is a collection of Java classes, centered on the Mapper and Reducer interfaces. The application extends a base class called MapReduceBase and implements the Mapper and Reducer interfaces, specifying the data types of the input and output data. The application then registers its Mapper and Reducer classes into a Hadoop job, together with the distributed location of the input and output, and fires it away into the framework. The framework takes care of reading the data from the input location, calls back the application’s Mapper and Reducer classes when needed in a concurrent and distributed fashion, and writes the result to the output location.
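
As a rough sketch of this first model, here is what the word counting example could look like with the classic org.apache.hadoop.mapred API; the job wiring (JobConf, input and output paths) is omitted, and the class names WordMapper and WordReducer are merely illustrative:

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class WordCount {

    // Map step: emit a (word, 1) pair for every word in the input value.
    public static class WordMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        public void map(LongWritable key, Text value,
                        OutputCollector<Text, IntWritable> output,
                        Reporter reporter) throws IOException {
            StringTokenizer tokens = new StringTokenizer(value.toString());
            while (tokens.hasMoreTokens()) {
                word.set(tokens.nextToken());
                output.collect(word, ONE);
            }
        }
    }

    // Reduce step: sum all occurrence counts grouped under the same word.
    public static class WordReducer extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<Text, IntWritable> output,
                           Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }
}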

The second option is to use a domain language called Pig. Pig defines keywords such as FOREACH, GROUP, and GENERATE, which fit naturally into the map, group, and reduce actions. Using Pig, a developer can write a Hadoop application in a few lines of code, almost as if writing a SQL query, although Pig is rather more imperative in style, where SQL is declarative.

    map_result = FOREACH webpage GENERATE FLATTEN(count_word_occurrences(*)); 
    key_groups = GROUP map_result BY $0; 
    output = FOREACH key_groups GENERATE sum_word_occurrences(*);

Hadoop is configured through XML configuration files. A good part of Hadoop deals with distributing jobs across a distributed file system; hence, a large aspect of Hadoop’s configuration is related to configuring the servers in the processing cloud.

Hadoop is an excellent example of a newly created application framework targeted at the emergent problem presented by the web, where we need to deal with mammoth amounts of data in a soft real-time fashion. As new problems arise, they will be accompanied by new solutions, some of which will certainly take the form of new development platforms, as Hadoop does.