Map-Reduce with JDK8, Clojure, Scala, Spark, and yes, Lisp!

August 19, 2014

With the recent addition of Streams and Lambda expressions in JDK 8, a developer can finally adopt a functional programming style in Java and, for example, implement algorithms that make use of the popular Map-Reduce design pattern natively with the standard Java SDK.

Starting with the examples provided by the JDK 8 Stream tutorial, let’s explore the use of functional programming and applications of the Map-Reduce design pattern.

First, let’s define a simple list-based collection of persons:

List<Person> roster = new LinkedList<>();
roster.add(new Person("Lucas", Person.Gender.MALE, 30));
roster.add(new Person("Juliana", Person.Gender.FEMALE, 25));
roster.add(new Person("Gabriel", Person.Gender.MALE, 8));

Next, let’s associate two functions with this collection: the first filters each Person element by its gender, and the second maps each Person to their name (i.e. one could think of it as a projection function):

roster.stream().
    filter(Person::isMale).
        forEach(e -> System.out.println(e.getName()));

The result of running this program is:

Lucas
Gabriel

Rather than iterating through the collection, we traverse its stream representation (i.e. roster.stream()). This decouples the application of our mapping functions from the actual collection data structure, so the functions can be applied in different ways, for example, serially or concurrently. This is known as the characteristics of the stream and allows the mapping and reduction implementation to be optimized.
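
As a hypothetical variation of the pipeline above (not part of the original tutorial), switching to a concurrent traversal is just a matter of asking the collection for a parallel stream:

// Same pipeline, but elements may now be processed concurrently,
// so the print order is no longer guaranteed.
roster.parallelStream().
    filter(Person::isMale).
        forEach(e -> System.out.println(e.getName()));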

The previous code made use of two functions, filter() and forEach(). Both of these take a function as their argument; functions that do so are known as higher-order functions. In the example we supply those arguments in two different forms: by referencing the isMale() method of the Person class, and by creating an anonymous function at the point where it is needed. This latter form is known as a lambda abstraction and, in Java, takes the form input-arguments -> function-body.
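
For illustration (not from the original tutorial), both forms below produce the same Predicate<Person> and can be passed to filter() interchangeably:

import java.util.function.Predicate;

Predicate<Person> byReference = Person::isMale;  // method reference
Predicate<Person> byLambda    = p -> p.isMale(); // equivalent lambda abstraction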

In our next example, let’s incorporate a reduction function into the mix:

double sum =
roster.stream()
    .filter(p -> p.getGender() == Person.Gender.MALE)
        .mapToInt(e -> e.getAge() /*Person::getAge*/)
            .reduce(0, (a, b) -> a + b);

The reduce() function takes a binary function, that is, a function that has two input arguments or operands. The binary function we use is sum. The first argument is the result of a previous sum function or 0 if it is the first application of sum. This allows us to associatively add together the elements of our stream. For example, if the stream has elements 1, 2, 3, 5, 8, then one particular instance of invocations of reduce is:

(0, 1) -> 1
(1, 2) -> 3
(3, 3) -> 6
(6, 5) -> 11
(11, 8) -> 19

As expected, the result of running the previous program is:

38.0
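
As a side note, the reduction trace over 1, 2, 3, 5, 8 shown earlier can be reproduced with a standalone snippet (not part of the roster example):

import java.util.stream.IntStream;

int total = IntStream.of(1, 2, 3, 5, 8).reduce(0, (a, b) -> a + b);
System.out.println(total); // prints 19, matching the last step of the trace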

If one looks deeper into the JDK 8 API, one can see that it has been really well thought out and organized. There are useful abstractions for Unit-returning (void) functions, called Consumers, for value-producing Suppliers, for unary and binary functions, and so on.
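
For example, here is a quick illustrative sketch of a few of these java.util.function abstractions (the variable names and values are made up):

import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;

Consumer<String> printer = s -> System.out.println(s);                        // void-returning function
Supplier<Person> factory = () -> new Person("Ana", Person.Gender.FEMALE, 1);  // value producer
BiFunction<Integer, Integer, Integer> add = (a, b) -> a + b;                  // binary function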

Would one also say this is novel? I would hardly say so; consider how this same example can be implemented in Lisp, a language developed by John McCarthy in the late 1950s (that’s more than 50 years ago!).

First, let’s define our list-based Person container in Lisp:

(defparameter *roster* '((Lucas male 30)(Juliana female 25)(Gabriel male 8)))

And the equivalent of our isMale() method strictly as a function:

(defun ismale (x) (if (eq (second x) 'male) t nil))

And, finally, the equivalent of our map to int and reduce by summing:

(reduce '+
    (mapcar (lambda (x) (third x))
        (remove-if-not 'ismale *roster*)))

The prefix form may strike some as odd, but the general structure is essentially the same: the reduce of ‘+’ is applied to the result of a mapping to int, which in turn is applied to the result of filtering the roster collection with the predicate ismale. In Lisp, a lambda abstraction is written as (lambda (input-arguments) function-body).

One could even argue that the Lisp program is even cleaner (and more compact) than the Java one!

Lisp is not the only option. In fact, the JVM itself offers at least two other options, Clojure and Scala, as demonstrated next:

Clojure:

(def *roster* '((Lucas male 30)(Juliana female 25)(Gabriel male 8)))
(defn ismale [x] (if (= (compare (second x) 'male) 0) true nil))
(reduce +
    (map (fn [x] (nth x 2))
        (filter ismale *roster*)))

Scala:

var roster = List(List("Lucas", "male", 30),List("Juliana", "female", 25),List("Gabriel", "male", 8))
def isMale(x:List[Any]) = {if (x(1) == "male") true else false}
roster.filter(isMale).
    map((x:List[Any]) => x(2).asInstanceOf[Int]).
        reduce((x:Int, y:Int) => x + y)

The syntax of a lambda expression in Clojure is (fn [input-arguments] function-body) and in Scala it is (input-arguments) => function-body.

Clojure bears a closer resemblance to Lisp, whereas Scala, with its static typing and infix form, reminds us of the imperative programming style of Java.

If one is used to Big Data and Hadoop, then another option is Spark. For the sake of completeness, here is the Spark version of the use-case:

val roster = sc.textFile("examples/Roster.txt") // space-delimited text file with one Person per line
roster.filter(line => line.contains(" male")).
    map(line => new Integer(line.split(" ")(2))).
        reduce((a,b) => a + b)

Historically, whenever the Java SDK picks up a new API, it tends to surpass other existing forms of that same API. For example, consider how java.util.logging is more prevalent today than log4j. This is only natural, due to the easy availability of the SDK. Considering this, I wonder what the implications of java.util.stream will be for the Apache Hadoop API.

I would definitely appreciate being able to use the SDK to author a Map-Reduce application, and then simply configure its Characteristics to DISTRIBUTED to have it run over a Hadoop cluster, wouldn’t you?


Detecting Fraud using Event Processing

September 5, 2013

Let’s consider a simple fraud detection scenario, one which I have run into with customers in the past.

Say we want to detect when a particular credit card customer spends more than 10000 dollars in a 24 hour (rolling) period.

Offhand, this appears to be a simple scenario to implement. Let’s get to it. Intuitively, this seems like a good use-case for pattern matching queries:

SELECT M.clientId, M.txId, "ALARM" AS status
FROM creditcardTransactionStream
MATCH_RECOGNIZE (
    PARTITION BY cardNumber
    MEASURES cardNumber AS clientId, transactionId AS txId
    PATTERN (A)
    DEFINE A AS price > 10000
) AS M

We can break down this query into the following steps:

  1. Partition the events coming through the credit-card transaction stream by their cardNumber;
  2. Check whether the price of the transaction (event) is greater than 10000 dollars. We call this pattern A, or more precisely, the correlation variable A.
  3. When the pattern is detected (a process called matching), specify how to represent the matched pattern. These representations are called the measures of the pattern. In this particular case, two measures are defined: one identifying the card number and another the transaction ID.
  4. Finally, output the two measures (i.e. cardNumber, transactionId) and a status string set to 'ALARM', indicating that the customer has spent more than 10000 dollars.

This pattern matching query is able to identify if a single transaction is above the threshold, but not multiple transactions that individually may cross the threshold.

For example, consider the following table of input and output events:

Time (h)   Input                                            Output
0          {cardNumber=10, transactionId=1, price=10000}    {clientId=10, txId=1, 'ALARM'}
1          {cardNumber=10, transactionId=2, price=5000}
2          {cardNumber=10, transactionId=3, price=6000}

In this case, note how there is no output at time t = 2h even though transactions 2 and 3 collectively do cross the 10K threshold.

The following query addresses this issue:

SELECT M.clientId, M.txId, "ALARM" AS status
FROM creditcardTransactionStream
MATCH_RECOGNIZE (
    PARTITION BY cardNumber
    MEASURES cardNumber AS clientId, transactionId AS txId
    PATTERN (A+? B) WITHIN 24 HOURS
    DEFINE B AS SUM(A.price) + B.price > 10000
) AS M

One way to look at pattern matching is to consider it as a state diagram, where each correlation variable (e.g. A) represents a state in the state diagram. Typically, you need at least two states, the starting state and the accepting state (i.e. the end state). In this particular scenario, state A represents the individual transactions and state B represents the condition where, collectively, the events cross the 10K threshold.

However, there are a few caveats. For example, could we get by with a single state (i.e. A) using the expression 'A as SUM(A.price)'? No; we can't define a variable in terms of an aggregation of itself, hence the reason we created variable B, which is then defined as the aggregation of A. Because we intend to aggregate several events as part of A, A must be defined as a collection, that is, a correlation group variable. In this case, we do this by using the regular expression operator '+', as in 'A+'. This indicates that the correlation variable A should attempt to match one or more events.

Further, why do we need to add 'B.price' to 'SUM(A.price)', as in the expression 'B as SUM(A.price) + B.price'? This is needed because the ending event itself is a transaction, and thus its price needs to be considered as part of the equation. If we didn't do that, the price of the last received event would be ignored. For instance, in the table above, A matches the 5000-dollar transaction and B the 6000-dollar one; only 5000 + 6000 = 11000 crosses the threshold.

Why do we need the token '?', as in the expression 'A+?', in the query? Typically, correlation group variables, as is the case with A, are greedy, that is, they try to match as many events as possible. However, because A and B are similar, if we keep A greedy, then B will never match. To avoid this situation, we use the token '?' to indicate that A should be reluctant and match the minimal set of events possible, therefore also allowing B to match and accept the final sequence.
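
The same greedy vs. reluctant distinction exists in ordinary Java regular expressions, which may help build the intuition (this is plain java.util.regex, not the event processing language):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

Matcher greedy = Pattern.compile("(a+)(a*b)").matcher("aaab");
greedy.matches();
System.out.println(greedy.group(1));     // "aaa" -- the greedy '+' consumes as much as possible

Matcher reluctant = Pattern.compile("(a+?)(a*b)").matcher("aaab");
reluctant.matches();
System.out.println(reluctant.group(1));  // "a" -- the reluctant '+?' consumes as little as possible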

Finally, note that the clause 'within 24 hours' specifies the window of interest. This means that the sum of the events is always constrained to a rolling 24-hour window. Here is a state diagram that represents the query:

[Figure: state machine for the pattern matching query]

Using this new query, we will get a proper alarm output when the query receives the third event (i.e. transactionId = 3). However, there is one new problem: this query no longer outputs an alarm when we receive just the first event (i.e. transactionId = 1).

Why is that? The problem is that the new query expects at least two events, one that matches variable A and another to match variable B. We can fix this by introducing an alternation in our regular expression, as follows:

PATTERN (C | (A+? B)) WITHIN 24 HOURS
DEFINE B AS SUM(A.price) + B.price > 10000, C AS price > 10000

In this case, we either match the expression '(A+? B)' or the expression 'C', where C is simply the case of a single event whose price alone already crosses the threshold. Note that we need to specify C as the first variable in our pattern; otherwise the query would only consider the '(A+? B)' expression and never have a chance to match 'C'. Also note that any variable that is not defined, as is the case with A, is considered true for all events.

Are we done?

Not yet. Consider the case where we want to output the total sum of the transactions that cross the threshold. The introduction of an alternation makes this a bit harder, as the query matches either C or A and B. How can we solve this?

We can address this by using subsets. Subsets are correlation variables that represent the union of other correlation variables. Take a look at the modified query:

SELECT M.clientId, M.total, "ALARM" AS status
FROM creditcardTransactionStream
MATCH_RECOGNIZE (
    PARTITION BY cardNumber
    MEASURES cardNumber AS clientId, SUM(ABC.price) AS total
    PATTERN (C | (A+? B)) WITHIN 24 HOURS
    SUBSET ABC = (A, B, C)
    DEFINE B AS SUM(A.price) + B.price > 10000, C AS price > 10000
) AS M

As you can see, pattern matching queries give us a lot of power; however, this comes at a price, as one needs to consider a lot of things, such as the starting conditions, the accepting conditions, and all the corner-case scenarios, which are never trivial, even for apparently simple use-cases.

It also highlights the crucial but subtle point that, as much as one tries to create black-box solutions for event processing scenarios, real-world use-cases are always more complicated and have more subtleties than originally conceived, and for this reason there will always be a place for computationally rich, domain-specific event processing languages.


Best-case analysis is bogus

June 21, 2009

One deals with worst-case analysis of algorithms on a day-to-day basis, especially when working with systems that have real-time constraints, such as CEP. As I’ve written in the past, real-time systems do not necessarily need to run fast; they can take days to finish, but if the real-time system states that it will finish a task in 1 hour, then it must. To be able to make such time assertions with certainty, the real-time system developer must know the worst-case time of all the algorithms that make up such a task.

As an example, what is the worst-case cost of inserting 1 element into a sorted linked list of size n? A naive approach is to start at the first position and compare the elements of the linked list until you find one that is larger than the element being inserted, at which point you insert the new element just before it. The worst-case scenario is when the element to be inserted is larger than every other element in the list; the cost is then n node traversals (i.e. n*c1) plus the cost of the actual insertion at the end (i.e. c2). Or, eliminating the constants, we arrive at the asymptotic worst-case cost of n.
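
As a rough sketch in Java (my own illustration, assuming a list of integers), the insertion could look like this:

import java.util.List;
import java.util.ListIterator;

// In the worst case the loop traverses all n nodes before the constant-time insert.
static void insertSorted(List<Integer> sorted, int value) {
    ListIterator<Integer> it = sorted.listIterator();
    while (it.hasNext()) {
        if (it.next() > value) { // found the first element larger than the new value
            it.previous();       // step back so the insert lands before that element
            break;
        }
    }
    it.add(value);               // the insertion itself is constant time at the cursor position
}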

What about the best-case analysis of this algorithm?

Well, I believe that best-case analysis is rather bogus. It has very limited analytical and statistical value: not only does it add no insight to the analysis of the algorithm, but it also rarely represents a real-world case.

However, the biggest problem with best-case algorithm analysis is that the result can be manufactured. Testing for best-case scenarios means that you can choose your best input, which allows the algorithm developer to hard-code the algorithm for this well-known best input. For example, the algorithm could do the following:

if (bestInput) return x; // hard-coded answer for the chosen best input

// otherwise do the proper work

In other words, you could have a very complex algorithm whose worst-case cost is n squared, but whose best-case cost, for this one selected input, is 1!

Is this valid? Well, it is a valid algorithm, but, in my opinion, a bogus analysis.


Moving on from dev2dev…

June 22, 2008

With the acquisition of BEA Systems by Oracle, I suppose that those who have blogged in the past on dev2dev should move their blogs and articles to Oracle’s Technology Network (OTN) website, or look for a personal alternative.

I have decided to do the latter. I will be importing all my published dev2dev blogs into this blog account.

Partially, I was enticed by the additional freedom one has when using one’s own blog account, such as being able to select extra widgets, like calendars, tag clouds, etc.; hopefully this offsets the presumably greater publicity I would have had, had I opted to move onward to OTN.