Detecting Fraud using Event Processing

September 5, 2013

Let’s consider a simple fraud detection scenario, one which I have run into with customers in the past.

Say we want to detect when a particular credit-card customer spends more than 10000 dollars in a rolling 24-hour period.

Offhand, this appears to be a simple scenario to implement, so let's get to it. Intuitively, it seems like a good use case for pattern matching queries:

SELECT
    M.clientId, M.txId, "ALARM" AS status
FROM creditcardTransactionStream
MATCH_RECOGNIZE (
    PARTITION BY cardNumber
    MEASURES cardNumber AS clientId, transactionId AS txId
    PATTERN (A)
    DEFINE A AS price > 10000
) AS M

We can break down this query into the following steps:

  1. Partition the events coming through the credit-card transaction stream by their cardNumber;
  2. Check whether the price of the transaction (event) is greater than 10000 dollars. We call this pattern A, or more precisely, the correlation variable A.
  3. When the pattern is detected, a process called matching, you can specify how to represent the matched pattern. These are called the measures of the pattern. In this particular case, two measures are defined, one identifying the card number and another the transaction ID.
  4. The final step is to output the two measures (i.e. cardNumber and transactionId) along with a status string set to 'ALARM', indicating that the customer has spent more than 10000 dollars.

This pattern matching query is able to identify a single transaction that is above the threshold, but not multiple transactions that individually stay below the threshold yet collectively cross it.

For example, consider the following table of input and output events:

Time (h) | Input                                         | Output
0        | {cardNumber=10, transactionId=1, price=11000} | {clientId=10, txId=1, 'ALARM'}
1        | {cardNumber=10, transactionId=2, price=5000}  | (none)
2        | {cardNumber=10, transactionId=3, price=6000}  | (none)

In this case, note how there is no output at time t = 2h even though transactions 2 and 3 collectively do cross the 10K threshold (5000 + 6000 = 11000).

The following query addresses this issue:

SELECT
    M.clientId, M.txId, "ALARM" AS status
FROM creditcardTransactionStream
MATCH_RECOGNIZE (
    PARTITION BY cardNumber
    MEASURES cardNumber AS clientId, transactionId AS txId
    PATTERN (A+? B) WITHIN 24 HOURS
    DEFINE B AS SUM(A.price) + B.price > 10000
) AS M

One way to look at pattern matching is to consider it as a state diagram, where each correlation variable (e.g. A) represents a state in the state diagram. Typically, you need at least two states: the starting state and the accepting state (i.e. the end state). In this particular scenario, state A represents the individual transactions and state B represents the condition where, collectively, the events cross the 10K threshold.

However, there are a few caveats. For example, could we get by with a single state (i.e. A) using the expression 'A AS SUM(A.price)'? No, we cannot define a variable in terms of an aggregation of itself, hence the reason we created variable B, which is then defined as the aggregation of A. Because we intend to aggregate several events as part of A, A must be defined as a collection, that is, a correlation group variable. In this case, we do this by using the regular expression operator '+', as in 'A+'. This indicates that the correlation variable A should attempt to match one or more events.
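To make the contrast concrete, here is a minimal sketch of the two forms (the first is the hypothetical, rejected one):

DEFINE A AS SUM(A.price) > 10000

fails because A appears inside its own definition, whereas

PATTERN (A+? B)
DEFINE B AS SUM(A.price) + B.price > 10000

aggregates over the group variable A from the separate variable B.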

Further, why do we need to add 'B.price' to 'SUM(A.price)', as in the expression 'B AS SUM(A.price) + B.price'? This is needed because the ending event is itself a transaction, and thus its price needs to be considered as part of the equation. If we did not do that, the price of the last received event would be ignored.
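For instance, taking the transactions of 5000 and 6000 dollars from the earlier table, B matches on the 6000-dollar event because SUM(A.price) + B.price = 5000 + 6000 = 11000 crosses the threshold; a definition of just 'SUM(A.price) > 10000' would see only 5000 at that point and miss the match.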

Why do we need the token '?', as in the expression 'A+?', in the query? Typically, correlation group variables, as is the case with A, are greedy, that is, they try to match as many events as possible. However, because A and B are similar, if we kept A greedy, then B would never match. To avoid this situation, we use the token '?' to indicate that A should be reluctant, matching the minimal set of events possible and therefore also allowing B to match and accept the final sequence.
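In other words, the two forms differ only in that one token:

PATTERN (A+ B)   (greedy: A keeps consuming events, so B never gets a chance to match)
PATTERN (A+? B)  (reluctant: A consumes as few events as possible, handing over to B as soon as its condition holds)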

Finally, note that the clause 'WITHIN 24 HOURS' specifies the window of interest. This means that the sum of the events is always constrained to a rolling 24-hour window. Here is a state diagram that represents the query:

[State diagram: start --> A (one or more transactions, reluctant) --> B (accepting state: running sum crosses 10000)]

Using this new query, we get a proper alarm output when the query receives the third event (i.e. transactionId = 3). However, there is one new problem: this query no longer outputs an alarm when we receive the first event (i.e. transactionId = 1), even though its price alone crosses the threshold.

Why is that? The problem is that the new query expects at least two events: one to match variable A and another to match variable B. We can fix this by introducing an alternation in our regular expression, as follows:

PATTERN (C | (A+? B)) WITHIN 24 HOURS
DEFINE B AS SUM(A.price) + B.price > 10000, C AS price > 10000

In this case, we match either the expression '(A+? B)' or the expression 'C', where C is simply the case where a single event's price already crosses the threshold. Note that we need to specify C as the first variable in our pattern; otherwise the query would only consider the '(A+? B)' expression and never have a chance to match 'C'. Also note that any variable that is not defined, as is the case with A, is considered true for all events.
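Putting the pieces together, the complete query at this point looks as follows (a sketch that simply merges the new pattern and definitions into the previous query):

SELECT
    M.clientId, M.txId, "ALARM" AS status
FROM creditcardTransactionStream
MATCH_RECOGNIZE (
    PARTITION BY cardNumber
    MEASURES cardNumber AS clientId, transactionId AS txId
    PATTERN (C | (A+? B)) WITHIN 24 HOURS
    DEFINE B AS SUM(A.price) + B.price > 10000, C AS price > 10000
) AS M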

Are we done?

Not yet. Consider the case where we want to output the total sum of the transactions that cross the threshold. The introduction of an alternation makes this a bit harder, as the query matches either C or A and B. How can we solve this?

We can address this by using subsets. Subsets are correlation variables that represent the union of other correlation variables. Take a look at the modified query:

SELECT
    M.clientId, M.total, "ALARM" AS status
FROM creditcardTransactionStream
MATCH_RECOGNIZE (
    PARTITION BY cardNumber
    MEASURES cardNumber AS clientId, SUM(ABC.price) AS total
    PATTERN (C | (A+? B)) WITHIN 24 HOURS
    SUBSET ABC = (A, B, C)
    DEFINE B AS SUM(A.price) + B.price > 10000, C AS price > 10000
) AS M
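Against the earlier table of events, this query should produce two alarms: one when the first transaction arrives (matching C, with total = 11000) and one when the third arrives (matching '(A+? B)', with total = 5000 + 6000 = 11000).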

As you can see, pattern matching queries give us a lot of power, but this power comes at a price: one needs to consider the starting conditions, the accepting conditions, and all the corner-case scenarios, which are never trivial, even for apparently simple use cases.

It also highlights the crucial but subtle point that, as much as one tries to create black-box solutions for event processing scenarios, real-world use cases are always more complicated and have more subtleties than originally conceived, and for this reason there will always be a place for computationally rich, domain-specific event processing languages.


Answer to CEP quiz: streams and relations

January 4, 2012

Sorry for the long delay in posting an answer to last week's, or rather, last year's quiz. It is funny how time seems to stretch out sometimes and a few days turn into weeks.

Well, let’s revisit the queries from the last post:

  1. SELECT * FROM C1[NOW]
  2. ISTREAM (SELECT * FROM C1[NOW])
  3. DSTREAM (SELECT * FROM C1[NOW])

For the first query, the answer is that at time t = 2 the CACHE is empty. Why empty and not 1?

To understand this, consider the following sequence of events:

  • At time t = 0, the NOW (window) operator creates an empty relation.
  • At time t = 1, the NOW operator converts the stream with the single event { p1 = 1 } to a relation containing the single entry { p1 = 1 }. The result of the NOW operator is a relation, and hence, in the absence of other conversion operators, the query likewise outputs a relation. As CEP deals with continuous queries, the best way to represent the difference between the empty relation at time t = 0 and the relation at time t = 1 is to output the insertion of the entry { p1 = 1 }, or in other words, an insert event { p1 = 1 }. The CACHE receives this insert event and puts the entry { p1 = 1 } into it.
  • At time t = 2 (or, more precisely, at the immediate next moment after t = 1), the NOW operator outputs an empty relation, as the event { p1 = 1 } has moved on from the input stream. The difference between the relation at t = 1 and the relation at t = 2 is the deletion of the entry { p1 = 1 }, therefore the query outputs the delete event { p1 = 1 }. The CACHE receives this delete event and accordingly removes the entry { p1 = 1 }, leaving the cache empty.
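This sequence can be summarized as follows:

Time  | NOW relation | Query output      | CACHE
t = 0 | empty        | (none)            | empty
t = 1 | { p1 = 1 }   | insert { p1 = 1 } | { p1 = 1 }
t = 2 | empty        | delete { p1 = 1 } | empty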

Next, let’s consider the second query. In this case, the answer is that at the end the CACHE contains a single entry with the value of 1.

Let's explore this. In this query, we are using an ISTREAM operator after the NOW operator. ISTREAM converts the relation into a stream by keeping only the insert events. This means that at time t = 1, the insert event output from the NOW operator is converted into a stream containing the single event { p1 = 1 }. The CACHE receives this event and stores it. Next, at time t = 2, the delete event output from the NOW operator is ignored (dropped) by the ISTREAM (conversion) operator and never makes it into the CACHE.

The answer for the third query is likewise that at the end the CACHE contains the single entry of 1. The rationale is similar to that of the previous case, though off by one time tick.

At time t = 1, the insert event output from the NOW operator is ignored by the DSTREAM operator; however, the delete event output at time t = 2 is used and converted to a stream. The conversion is simple: the delete event from the relation becomes an insert event in the stream, as streams only support inserts anyway. The CACHE then picks up this insert event and stores it. Just keep in mind that for this third query this happens at time t = 2, rather than at time t = 1 as in the second query.

Here is a quick summary:

Query                           | CACHE at the end
SELECT * FROM C1[NOW]           | empty
ISTREAM (SELECT * FROM C1[NOW]) | { p1 = 1 }, inserted at t = 1
DSTREAM (SELECT * FROM C1[NOW]) | { p1 = 1 }, inserted at t = 2

I would like to thank those people who pinged me, some of them several times, to gently remind me to post the answer.


Concurrency at the EPN

January 3, 2011

Opher, from IBM, recently posted an interesting article explaining the need for an EPN.

As I have posted in the past, an Event Processing Network (EPN) is a directed graph that specifies the flow of events to and from event processing agents (EPAs), or, for short, processors.

Opher's posting raised the valid question of why we need an EPN at all; couldn't we instead just let all processors receive all events and drop those that are not of interest?

He pointed out two advantages of using an EPN: first, it improves usability, and second, it improves efficiency.

I believe there is a third advantage: the EPN allows one to specify concurrency.

For example, the following EPN specifies three sequential processors, A, B, and C:

A --> B --> C

It is clear from the graph that processor C will only commence processing its events after B has finished, which likewise only processes its events after they have been processed by A.

Conversely, in the following EPN, processors B and C execute in parallel, only after the events have been processed by A:

      +--> B
A ----+
      +--> C

Events are concurrent by nature, therefore being able to specify concurrency is a very important aspect of designing a CEP system. Granted, there are cases where the concurrency model can be inferred from the queries (i.e. rules) themselves by looking at their dependencies; however, that is not always the case, or rather, it may not in itself be enough.

By the way, do you see any resemblance between an EPN and a Petri net? This is not merely coincidental, but that is the subject of a later posting.