Map-Reduce with JDK8, Clojure, Scala, Spark, and yes, Lisp!

August 19, 2014

With the recent addition of Streams and lambda expressions in JDK 8, a developer can finally adopt a functional programming style in Java and, for example, implement algorithms that make use of the popular Map-Reduce design pattern natively with the standard Java SDK.

Starting with the examples provided by the JDK 8 Stream tutorial, let’s explore the use of functional programming and applications of the Map-Reduce design pattern.

First, let’s define a simple list-based collection of persons:

List<Person> roster = new LinkedList<>();
roster.add(new Person("Lucas", Person.Gender.MALE, 30));
roster.add(new Person("Juliana", Person.Gender.FEMALE, 25));
roster.add(new Person("Gabriel", Person.Gender.MALE, 8));

Next, let’s apply two functions to this collection: the first filters each Person element by its gender, and the second maps each Person to their name and prints it (i.e. one could think of it as a projection function):

roster.stream().
    filter(Person::isMale).
        forEach(e -> System.out.println(e.getName()));

The result of running this program is:

Lucas
Gabriel

Rather than iterating through the collection, we traverse its stream representation (i.e. roster.stream()). This decouples the application of our functions from the actual collection data structure, so the function application can be done in different ways, for example serially or concurrently. This is captured by the characteristics of the stream and allows the mapping and reduction implementation to be optimized.
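
For instance, the same pipeline can be run concurrently simply by asking for a parallel stream. A minimal sketch (note that the output order is then no longer guaranteed):

roster.parallelStream().
    filter(Person::isMale).
        forEach(e -> System.out.println(e.getName()));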

The previous code made use of two higher-order operations, filter() and forEach(). Both of these take a function as their argument; functions that accept other functions as arguments are known as higher-order functions. In the example, we supply these arguments in two different forms: by referring to the isMale() method of the Person class (a method reference), and by creating an anonymous function at the point where it is needed. This latter form is known as a lambda abstraction and, in Java, takes the form input-arguments -> function-body.
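
The two forms are interchangeable; for instance, the method reference used above could equally be written as an inline lambda. A small illustrative sketch:

// method reference to an existing method
roster.stream().filter(Person::isMale).forEach(e -> System.out.println(e.getName()));

// equivalent anonymous function (lambda expression)
roster.stream().filter(p -> p.isMale()).forEach(e -> System.out.println(e.getName()));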

In our next example, let’s incorporate a reduction function into the mix:

double sum =
roster.stream()
    .filter(p -> p.getGender() == Person.Gender.MALE)
        .mapToInt(e -> e.getAge() /*Person::getAge*/)
            .reduce(0, (a, b) -> a + b);

The reduce() function takes an identity value and a binary function, that is, a function that has two input arguments or operands. The binary function we use here is addition. Its first argument is the result accumulated so far, or the identity value 0 on the first application. This allows us to associatively add together the elements of our stream. For example, if the stream has elements 1, 2, 3, 5, 8, then one particular sequence of invocations of reduce is:

(0, 1) -> 1
(1, 2) -> 3
(3, 3) -> 6
(6, 5) -> 11
(11, 8) -> 19

As expected, the result of running the previous program is:

38.0
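
The earlier 1, 2, 3, 5, 8 trace can also be reproduced on its own; a minimal sketch using a literal stream (unrelated to the roster example):

import java.util.stream.IntStream;

int total = IntStream.of(1, 2, 3, 5, 8).reduce(0, (a, b) -> a + b); // 19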

If one looks deeper into the JDK 8 API, one can see that it has been really well thought out and organized. In java.util.function there are useful abstractions for void-returning operations (Consumer), value-producing suppliers (Supplier), unary and binary functions (Function, BiFunction), and so on.
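
A minimal sketch of a few of these java.util.function interfaces in action (the Person value used here is purely for illustration):

import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;

Supplier<Person> supplier = () -> new Person("Ana", Person.Gender.FEMALE, 1); // produces a value
Consumer<Person> printer  = p -> System.out.println(p.getName());             // void-returning
BiFunction<Integer, Integer, Integer> add = (a, b) -> a + b;                  // binary function

printer.accept(supplier.get());       // prints "Ana"
System.out.println(add.apply(30, 8)); // prints 38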

Would one also say this is novel? I would hardly say so; consider how this same example can be implemented in Lisp, a language developed by John McCarthy in the late 1950s (that’s more than 50 years ago!).

First, let’s define our list-based Person container in Lisp:

(defparameter *roster* '((Lucas male 30)(Juliana female 25)(Gabriel male 8)))

And the equivalent of our isMale() method strictly as a function:

(defun ismale (x) (if (eq (second x) 'male) t nil))

And, finally, the equivalent of our map to int and reduce by summing:

(reduce '+
    (mapcar (lambda (x) (third x))
        (remove-if-not 'ismale *roster*)))

The prefix form may strike some as odd, but the general structure is essentially the same: a reduce with ‘+’ applied to the result of mapping each element to an integer, which is in turn applied to the result of filtering the roster collection with the ismale predicate. In Lisp, a lambda abstraction is written as (lambda (input-arguments) function-body).

One could even argue that the Lisp program is even cleaner (and more compact) than the Java one!

Lisp is not the only option. In fact, the Java platform itself offers at least two other options, Clojure and Scala, as demonstrated next:

Clojure:

(def *roster* '((Lucas male 30)(Juliana female 25)(Gabriel male 8)))
(defn ismale [x] (= (second x) 'male))
(reduce +
    (map (fn [x] (nth x 2))
        (filter ismale *roster*)))

Scala:

var roster = List(List("Lucas", "male", 30),List("Juliana", "female", 25),List("Gabriel", "male", 8))
def isMale(x:List[Any]) = {if (x(1) == "male") true else false}
roster.filter(isMale).
    map((x:List[Any]) => x(2).asInstanceOf[Int]).
        reduce((x:Int, y:Int) => x + y)

The syntax of a lambda expression in Clojure is (fn [input-arguments] function-body) and in Scala it is (input-arguments) => function-body.

Clojure bears a closer resemblance to Lisp, whereas Scala, with its static typing and infix form, reminds us of the imperative programming style of Java.

If one is used to Big Data and Hadoop, then another option is Spark. For the sake of completeness, here is the Spark version of the use case:

val roster = sc.textFile("examples/Roster.txt") // space-delimited text file with one Person per line
roster.filter(line => line.contains(" male")).
    map(line => new Integer(line.split(" ")(2))).
        reduce((a,b) => a + b)

Historically, whenever the Java SDK picks up a new API, it tends to surpass existing forms of that same API. For example, see how java.util.logging is more prevalent today than log4j. This is only natural, due to the easy availability of the SDK. Considering this, I wonder what the implications of java.util.stream will be for the Apache Hadoop API.

I would definitely appreciate being able to use the SDK to author a Map-Reduce application, and then simply configure its Characteristics to DISTRIBUTED to have it run over a Hadoop cluster, wouldn’t you?


Vectorized Functional Programming

January 27, 2014

There is a lot of new-born enthusiasm for functional programming, especially with the introduction of Lambda expressions in Java 8.

Functional programming is indeed quite interesting, but let me introduce this through an example.

Recently, I needed to calculate the correlation between a set of variables representing different characteristics of vehicles. For example, I wanted to find out if the weight (wt) of a vehicle influenced negatively (or positively) its miles per gallon (mpg), or if there is a relationship between the number of cylinders (cyl) of a vehicle and its horse-power (hp).

Having done that, I generated a table whose rows listed the variables that are strongly correlated. Here is an example of the output:

     x    y corrCoef     prob
1  mpg  cyl    -0.85 6.11e-10
2  mpg disp    -0.85 9.38e-10
3  mpg   wt    -0.87 1.29e-10
4  cyl disp     0.90 1.80e-12
5  cyl   hp     0.83 3.47e-09
6  cyl   vs    -0.81 1.84e-08
7 disp   wt     0.89 1.22e-11

My next task is to sort this table in decreasing order of correlation; in other words, I would like to see first the variables that correlate the most. Note that it does not matter whether the correlation is positive (a direct correlation, as in the case of number of cylinders vs. hp) or negative (an inverse correlation, as in the case of mpg vs. weight); that is, I want to sort by the absolute value of the correlation coefficients (column 3).

Here is where it starts to get interesting. Assuming this table is named carsCorrTable, I can do this with the following expression:

carsCorrTable <- carsCorrTable[order(abs(carsCorrTable[,3]), decreasing=TRUE),]

Let’s distill this:

  1. abs(carsCorrTable[,3]): apply the absolute-value function to every element of the third column of carsCorrTable
  2. order(abs(carsCorrTable[,3]), decreasing=TRUE): compute the row indices that put those values in decreasing order
  3. carsCorrTable[order(abs(carsCorrTable[,3]), decreasing=TRUE),]: select the rows in that order
  4. carsCorrTable <- carsCorrTable[order(abs(carsCorrTable[,3]), decreasing=TRUE),]: reassign the result to the original table

Note the conciseness and power of the original expression! It all boils down to applying several functions to vectors of values; in other words, it is the combination of functional programming and vector-based programming.

In my mind, a programming model has the right fluency for a task when you can code the task in an intuitive manner, fully expecting it not to work the first time around, and yet it does! That is exactly what happened here: even though I had not done this before, I came up with the previous expression rather quickly and (perhaps because of that) expected to spend the next few hours debugging it, yet it simply worked as expected.

Consider how you would have to do this in a typical imperative language. You would need to iterate through each correlation coefficient and calculate its absolute value, then iterate through the result again to sort it, and perhaps do a final iteration to assign the sorted values back to the table. I didn’t have the heart to implement this, but I bet you it wouldn’t look nice in Java 7, C++, etc.
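
For illustration only, a rough pre-lambda Java sketch of that imperative approach might look like this (CorrRow is a hypothetical row type mirroring the table’s columns):

import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Hypothetical row type: x, y, corrCoef, prob.
class CorrRow {
    String x, y;
    double corrCoef, prob;
}

class SortByAbsCorr {
    // Sort by the absolute value of the correlation coefficient, largest first.
    static void sortDescending(List<CorrRow> table) {
        Collections.sort(table, new Comparator<CorrRow>() {
            public int compare(CorrRow a, CorrRow b) {
                return Double.compare(Math.abs(b.corrCoef), Math.abs(a.corrCoef));
            }
        });
    }
}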

Fortunately, Java 8 introduces lambda expressions and Iterable.forEach(), which should yield programs nearly as clean as the previous one. Interestingly, note how in this example I used R, which has been available for about two decades now. (Granted, I am not sure it would have mattered to introduce these features earlier; would the community have been mature enough to use them? Is it now?)
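
With Java 8, the same sort collapses to roughly one line (a sketch, again assuming the hypothetical CorrRow type above):

import java.util.Comparator;

// given a List<CorrRow> table
table.sort(Comparator.comparingDouble((CorrRow r) -> Math.abs(r.corrCoef)).reversed());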

There is still one very interesting and fundamental question: how does the execution time (i.e. time complexity) of this small task compare between its vectorized functional form and its imperative form? Would it be the same? Would the vectorized implementation be able to execute in parallel, and therefore yield better results? Or would it eventually all boil down to nested iterations with some polynomial order of growth?


Hadoop’s Programming Model

December 12, 2010

Hadoop is a Java implementation of Map-Reduce. Map-Reduce is a software architecture used to process large amounts of data, also known as “big data”, in a distributed fashion. It is based upon the idea of mapping data items into key and value pairs, which are grouped by key and reduced into a single value. From a service perspective, Hadoop allows an application to map, group, and reduce data across a distributed cloud of machines, thus permitting applications to process enormous amounts of data.

A common Hadoop application is the processing of data located in web sites. For example, let’s consider an application that counts the number of occurrences of a word in web pages. In other words, if we had 10 web pages, where each uses the word “hello” twice, then we expect the result to include the key and value pair: {“hello” -> 20}. This word counting application can be easily hosted in Hadoop by using Hadoop’s map and reduce services in the following form:

  1. Generate a map of word token to word occurrence for each word present in the input web pages. For example, if a page includes the word “hello” only once, we should generate the map entry {“hello” -> 1}.
  2. Reduce all maps that have been grouped together by Hadoop with the same key into a single map entry, where the key is the word token and the value is the sum of all values in that group. In other words, Hadoop collects all maps that have the same key, that is, the same word token, and groups them together, providing the application with their values. The application is then responsible for reducing all values into a single item. For example, if step one generated the entries {“hello” -> 1} and {“hello” -> 2}, then we reduce these to a single entry {“hello” -> 3}.

Next, let’s walk through a simple scenario, step by step:

  • Load each web-page as input.

    web-page-1: “first web-page”
    web-page-2: “and the second and final web-page”

  • Map each input (i.e. each page) into a collection of (word, occurrences) sequences.

    {(first, 1), (web-page, 1)},
    {(and, 2), (the, 1), (second, 1), (final, 1), (web-page, 1)}

  • Group all sequences by ‘word’, so that the output is a set of collections in which all member sequences share the same ‘word’.

    {(first, 1)},
    {(web-page, 1), (web-page, 1)},
    {(and, 2)},
    {(the, 1)},
    {(second, 1)},
    {(final, 1)}

  • For each group, reduce to a single sequence by summing together all word occurrences.

    {(first, 1)},
    {(web-page, 2)},
    {(and, 2)},
    {(the, 1)},
    {(second, 1)},
    {(final, 1)}

  • Store each resulting sequence.

We have described a word counting Hadoop application; the next task is to implement it using Hadoop’s programming model. Hadoop provides two basic programming models. The first is a collection of Java classes, centered on the Mapper and Reducer interfaces. The application extends a base class called MapReduceBase and implements the Mapper and Reducer interfaces, specifying the data types of the input and output data. The application then registers its Mapper and Reducer classes in a Hadoop job, together with the distributed location of the input and output, and fires it away into the framework. The framework takes care of reading the data from the input location, calling back the application’s Mapper and Reducer classes when needed in a concurrent and distributed fashion, and writing the result to the output location.
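
A minimal sketch of the word-count Mapper and Reducer using the classic org.apache.hadoop.mapred API might look like the following (illustrative only; the class names are mine, and the JobConf/driver setup is omitted):

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

// Maps each input line into {word -> 1} entries.
class WordCountMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {
    public void map(LongWritable key, Text line,
                    OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        for (String word : line.toString().split("\\s+")) {
            output.collect(new Text(word), new IntWritable(1));
        }
    }
}

// Reduces the grouped {word -> [1, 1, ...]} entries into {word -> sum}.
class WordCountReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text word, Iterator<IntWritable> counts,
                       OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        int sum = 0;
        while (counts.hasNext()) {
            sum += counts.next().get();
        }
        output.collect(word, new IntWritable(sum));
    }
}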

The second option is to use a domain language called Pig. Pig defines keywords such as FOREACH, GROUP, and GENERATE, which fit naturally into the map, group, and reduce actions. Using Pig, a developer can write a Hadoop application in a matter of a few lines of code, almost as if writing a SQL query, although Pig is rather more imperative in style than SQL, which is declarative.

    map_result = FOREACH webpage GENERATE FLATTEN(count_word_occurrences(*)); 
    key_groups = GROUP map_result BY $0; 
    output = FOREACH key_groups GENERATE sum_word_occurrences(*);

Hadoop is configured through XML configuration files. A good part of Hadoop deals with the distribution of jobs across a distributed file system; hence a large aspect of Hadoop’s configuration is related to configuring the servers in the processing cloud.

Hadoop is an excellent example of a newly created application framework targeted at the emergent problem presented by the web, where we need to deal with mammoth amounts of data in a soft real-time fashion. As new problems arise, they will be accompanied by new solutions, some of which will certainly take the form of new development platforms, as Hadoop does.


New Point Release for Oracle CEP 11gR1

May 6, 2010

This week, Oracle announced the release of Oracle CEP 11gR1 11.1.1.3.

Even though it is a point release, there are noteworthy improvements and features:

Integration of CQL with Java

CQL (like any other event processing language) allows the authoring of event processing applications at a higher level of abstraction. This makes it less suitable for dealing with low-level tasks, such as String manipulation and other programming-in-the-small problems, and it lacks the richness of the libraries of other programming languages (e.g. Java), which have been built up over several years of usage.

In this new release of Oracle CEP, we solve this problem by fully integrating the Java programming language into CQL. This is done at the type-system level, rather than through user-defined functions or call-outs, allowing the usage of Java classes (e.g. their constructors, methods, and fields) directly in CQL in a blended form.

[Figure: CQL and Java example]

In this example, we make use of the Java class Date by invoking its constructor, and then we invoke the instance method toString() on the new object.

The JDK has several useful utility classes, such as Date, String, and the regular-expression support in java.util.regex, making it a perfect fit for CQL.

Integration of CQL and Spatial

Location tracking and CEP go hand-in-hand. One example of a spatial-related CEP application is automobile traffic monitoring, where the automobile location is received as a continuous event stream.

Oracle CEP now supports the direct usage of spatial types (e.g. Geometry) and spatial functions in CQL, as shown by the next example, which verifies whether “the current location of a bus is contained within a pre-determined arrival location”.

[Figure: CQL and Spatial example]

One very important aspect of this integration is that indexing of the spatial types (e.g. Geometry) is also handled in the appropriate form. In other words, not only is a user able to leverage the spatial package, but OCEP also takes care of using the right indexing mechanism for the spatial data, such as an R-tree instead of a hash-based index.

High-Availability Adapters

CEP applications are characterized by their quick response time. This is also applicable to highly available CEP applications; hence a common approach for achieving high availability in CEP systems is to use an active/active architecture.

In the previous release of OCEP, several APIs were made available for OCEP developers to create their own active/active HA CEP solutions.

[Figure: HA OCEP application]

In this new release, we take a step further and provide several built-in adapters to aid in the creation of HA OCEP applications. Amongst these, there are HA adapters that synchronize time in the upstream portions of the EPN and synchronize the events in the downstream portions of the EPN, as illustrated in the previous figure.

Much More…

There is much more; please check the documentation for the full set of features and their details, but here are some other examples:

  • Visualizer’s Event Injection and Trace, which allows a user to easily and dynamically send events into and receive events from a running application without having to write any code
  • Management of the revision history of an OCEP configuration
  • Deployment of OCEP application libraries, facilitating re-use of adapters, JDBC drivers, and event-types
  • Support for a WITHIN clause in CQL’s pattern matching to limit the amount of time to wait for a pattern to be detected
  • Creation of aliases in CQL, facilitating the creation and management of complex queries
  • Support for TABLE functions in CQL, thus allowing a query to invoke a function that returns a full set of rows (i.e. a table)

Memory management for real-time applications in Java

May 22, 2006

One of the main advantages of using Java is not having to worry about disposing of objects [1], that is, letting the Java runtime take care of the memory management of Java objects.

This is done by letting the Java runtime garbage collect Java objects that are no longer being used.

Garbage collection is a relatively complicated process. Generally, the Java runtime will traverse the heap, checking for objects that are no longer referenced by any other objects and thus can be safely deleted.

However, as garbage collection uses CPU cycles, it may impact the execution of application code. That is, if garbage collection is performed during the execution of application code, the application code may take more time to respond. This causes the latency of the user transaction to increase. Even worse, as it is unknown to the user when a garbage collection may occur, the latency increase is unpredictable.

Real-time applications have strict timing requirements; that is, they have to execute application code within some determined, known latency. Thus the unpredictable latency increase that may be caused by garbage collection becomes a problem.

What are the solutions to this problem? One obvious solution is not to use Java for real-time applications. This is a poor solution. Java brings a lot as a programming language and as a development platform; we should be able to solve this problem in Java.

Another solution is to use a different memory management approach in Java instead of garbage collection. RTSJ, the Real-Time Specification for Java, defines the concepts of immortal memory and scoped memory. Immortal memory is memory that is never garbage collected; it lives until the JVM is brought down. Scoped memory is memory that is allocated and released in chunks: the user explicitly creates a scope of memory where objects will live, and the objects are released when the scope is exited or destroyed. In both cases, immortal memory and scoped memory, there is no need for garbage collection. However, there is a drawback: the onus of managing the memory has moved back to the user, as is the case for C/C++ applications. This still seems too high a price to pay. Can we do better?
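
To give a flavor of what scoped memory looks like, here is a minimal sketch using the RTSJ javax.realtime API (illustrative only; the sizes and class name are arbitrary, and only schedulable objects such as RealtimeThread may enter a scope):

import javax.realtime.LTMemory;
import javax.realtime.RealtimeThread;

public class ScopedMemoryExample {
    public static void main(String[] args) {
        // A scoped memory area with an initial and maximum size of 1 MB.
        final LTMemory scope = new LTMemory(1 << 20, 1 << 20);

        RealtimeThread rt = new RealtimeThread() {
            public void run() {
                scope.enter(new Runnable() {
                    public void run() {
                        // Objects allocated here live in the scope, not on the garbage-collected
                        // heap; they are reclaimed in bulk when the last thread leaves the scope.
                        byte[] buffer = new byte[1024];
                        // ... process buffer ...
                    }
                });
            }
        };
        rt.start();
    }
}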

Ok, so let’s reconsider garbage collection. The main problem with garbage collection is the unpredictable latency spikes it causes. Can we avoid this unpredictable behavior? Or rather, can we limit (i.e. bound) it? Yes: by doing garbage collection more often and consistently, we can bound the maximum latency pause time. This is the approach taken by WLRT. Garbage collection thus becomes a predictable task, with a known cost, which can be considered and modeled by the real-time developer as needed. And, most importantly, we don’t sacrifice Java’s ease of use.

[1] It should be noted that Java programs can still have memory leaks, in spite of garbage collection or any other memory management technique; for instance, by not removing component objects from containers when they are no longer needed.


JBI vocabulary

October 10, 2005

JBI is relatively new as a Java standard, and its deliberation is still going on.
In spite of this, there is at least one aspect of JBI that has already become popular, and that is the vocabulary and patterns that it uses and defines.

  • Binding Components: Software components that understand specific protocols and are able to convert them to a normalized or common system-wide protocol. They function as the entry and exit points of the system. Also commonly known as adapters.
  • Service Engines: Software components responsible for servicing requests, generally by means of transformation. Examples are XSLT transformers, BPEL engines, rule engines, etc.
  • Message Exchange: Message interchange protocol; common exchange patterns are request-reply, in-only, etc.
  • Component Installation: Static or dynamic addition of an executable software component onto the runtime environment.
  • Service Deployment: Static or dynamic addition of services (or applications) onto an installed component.
  • Service Assemblies: A set of services.

Standards are used for achieving interoperability and portability; however, we should not overlook their importance in facilitating communication.