Logic programming, as supported by inference engines, has recently been extended to support the development of Complex Event Processing (CEP) applications. One such example is the Drools Fusion product.
Let’s investigate the logic programming extensions needed for CEP, grouping them into functional groups:
1. Event definition
The first logical step in supporting CEP is to be able to annotate an object as an event. If one considers the definition of an event as an “observable thing that happens”, a natural mapping in logic programming is a fact; hence events can be modeled as types of facts that carry additional metadata, such as an expiration and a timestamp. The latter is crucial for supporting temporal constraints, as we shall see later.
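In Drools Fusion, for example, this annotation is done with a declare statement; the sketch below assumes an illustrative StockTick fact whose ts attribute carries the producer’s timestamp:

```drl
declare StockTick
    @role( event )       // promote the fact to an event
    @timestamp( ts )     // which attribute holds the event's time
    @expires( 2m )       // how long the event may stay in working memory
end
```

The @role metadata is what turns a plain fact into an event; @timestamp and @expires supply the timestamp and expiration metadata discussed above.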
Finally, it is important that the inference engine understands the concept of a clock, so that it can determine the current time. This is particularly important for supporting the idea of non-events.
2. Input handling
CEP applications are flooded with events; hence there is a need to limit the events that make it into the working memory of the inference engines.
Drools Fusion provides two facilities for doing this: the ability to partition the input set, and the definition of windows of events.
To partition the input set, the developer can define entry points, which function as named channels, each providing an entity that can receive and send events. This is done with the following syntax:
Event from entry-point Channel-Name
For example, the following clause informs the engine that we are interested in an event named Stock coming from the channel Exchange-Market:
Stock() from entry-point “Exchange-Market”
Generally, the semantics of a channel are similar to those of a JMS queue; that is, an event is removed from the channel after a client has consumed it. However, due to the nature of inference engines, this is not the case for entry points; the event continues to exist until it has been retracted from the working memory, either explicitly by the developer or implicitly through some other facility, such as the @expires metadata associated with the event definition, or the window function, as we shall see next.
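Putting these pieces together, a complete rule can consume events from an entry point; the rule below is a sketch (the Stock type and its symbol attribute are illustrative):

```drl
rule "Watch ACME on the exchange market"
when
    $s : Stock( symbol == "ACME" ) from entry-point "Exchange-Market"
then
    // react to the event; note that $s remains in the working memory
    // until it expires or is explicitly retracted
end
```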
Drools Fusion supports the definition of time-based and length-based sliding windows:
Event over window:time(time-argument)
Event over window:length(number-argument)
A window defines a workable subset of a possibly infinite set of events. A time-based window is defined in terms of time, and a length-based window in terms of a number of events. A sliding window moves on each progression of time for time-based windows, or on each new event for length-based windows. For example, the following clause informs the engine that we are interested only in Stock events that happened in the last minute:
Stock() over window:time(1m)
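Windows compose with entry points, so a single pattern can be scoped to both a channel and a time frame; for example (again a sketch):

```drl
Stock() over window:time( 1m ) from entry-point "Exchange-Market"
```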
Hence, one can use windows to manipulate the retraction of events from the working memory. However, there is some model impedance when compared to other CEP models. As logic programming is centered on symbols (e.g. variables, functions) under a single namespace, so to speak, the retraction of an event happens across the inference engine’s entire working memory; that is, it is not limited to an entry point, or channel, which could have been the desired effect. For example, the use-case could be to have different windows for different exchange markets, each exchange market being defined as a separate channel.
The sliding window is just one of the common patterns for window functions. Other window functions include:
• Time-based and length-based batching windows: in this case the progression of the window happens in batches of time or events.
• Partitioned windows: in this case sub-windows are created for each partition of the input events, as established by some partitioning attribute of the events.
• User-defined windows: these are arbitrary windows plugged in by the developer.
Does logic programming have any model impedance towards these window functions? This remains an open research question; however, intuitively, batching windows should not prove to be a problem. The other two types are less clear.
3. Event matching
In logic programming, event matching relates to the unification process or, in simpler terms, the “if condition” of the rules. Events are ground terms (facts); hence inference engines already have a native and powerful mechanism for matching events through first-order logic predicates, which include the (non-exhaustive) list of operators: and, or, exists, not, for-all, collect, accumulate, etc.
In particular, collect and accumulate allow for the creation of complex events from simple events, which is one of the primary definitions of CEP.
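For example, a complex “average price” event can be derived from simple Stock events with accumulate; the sketch below assumes an illustrative price attribute:

```drl
rule "Average stock price over the last minute"
when
    $avg : Number() from accumulate(
        Stock( $p : price ) over window:time( 1m ),
        average( $p ) )
then
    // $avg summarizes the simple Stock events into a complex event
end
```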
Furthermore, the temporal operators defined by Allen [2] have been added: after, before, coincides, during, finishes, includes, meets, overlaps, and starts.
For example, the following clause matches a FireAlarm event that occurs after a SmokeDetection event:
FireAlarm(this after SmokeDetection())
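A complete rule using this operator could look as follows; the parameterized form after[0s, 30s] bounds the distance between the two events (the rule name and action are illustrative):

```drl
rule "Confirm fire"
when
    $smoke : SmokeDetection()
    $fire  : FireAlarm( this after[ 0s, 30s ] $smoke )
then
    // the fire alarm followed the smoke detection within 30 seconds
end
```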
The support for these operators is based upon the fact that each event is tagged with a timestamp, which can be set either by the event producer (external) or by the inference engine itself (internal).
Also, as should be clear from the presence of operators such as coincides and during, the time domain is defined as being interval-based, rather than point-in-time. The advantages of an interval-based approach are well explained in [2]; however, it is worth noticing that most other CEP solutions are point-in-time.
An important matching function is correlation (i.e. the join operator). In logic programming, join is achieved through the unification of terms on common variables [6]. This process has been the subject of extensive research and optimization. In particular, most modern inference engines are implemented using the RETE approach [9]. However, at the time of writing, the author has not been able to find relevant work on the performance of join in inference engines when subject to the volume of events and the latency requirements needed in CEP.
4. External data integration
An example of external data integration is to join a stream of customer sales transactions with a remote database containing the customer profile, with the intent of verifying the customer’s risk level for a sale transaction.
In the context of logic programming, data takes the role of facts; hence integration with external data relates to being able to work with a remote working memory in a seamless and efficient manner.
However, the common practice for supporting such a scenario is to “copy” the data over to the local working memory of the inference engine that is handling the events. In other words, the external data is inserted into the local inference engine, which then allows the enrichment to proceed normally.
This approach provides good performance; however, it has the following drawbacks:
• It is only applicable when the size of the external data is reasonably small; it is not viable to copy entire RDBMS tables into the working memory.
• Data integrity is lost, as the data is duplicated and no longer in sync with original. For example, the original data can change and the inference engine would be working with stale data until the change gets propagated.
An alternative approach is to use associative memory, or tuple spaces [4, 5]. Nevertheless, this also assumes that the external data is part of the overall system, even though it is distributed.
The underlying issue is to realize that the external data is owned by external systems, and therefore what is needed is a native bridge in the inference engine, whereby it is able to convert the logic programming paradigm seamlessly into the external system’s model. An example of this approach is Java programs using the JDBC API [12] to integrate with external database systems.
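As an illustration of what such a bridge could look like, Drools’ from conditional element can already delegate to an arbitrary expression. In the sketch below, profileService and its lookup method are hypothetical, standing in for a native bridge to the external system:

```drl
// assumes a hypothetical global:
// global ProfileService profileService;
rule "Flag risky sale"
when
    $t : SaleTransaction() from entry-point "Sales"
    $p : CustomerProfile( risk == "HIGH" )
         from profileService.lookup( $t.getCustomerId() )
then
    // flag the transaction for review
end
```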
5. Output handling
Conversely to input handling, output handling relates to how to convert the RHS (i.e. the right-hand side of an if-then statement) back into events or, more properly, into streams that flow out through output channels.
Before we address this, it is important to ask why this is needed, as it is not directly clear from our requirements. The reason is that CEP applications are components of a larger event processing network (EPN); that is, a CEP application plays the role of an event processing agent (EPA) node that receives events from upstream nodes in the network and whose output must be sent to its downstream nodes. Please refer to [8] for the definition of EPNs and EPAs.
This larger context entails that the CEP application must be able to send output that conforms to the expected semantics, as defined by the EPN, to facilitate communication.
These semantics, which are similar to those of the input events, are presented here informally:
- Events must be able to be published to different channels. Within each channel, events must be ordered in time forming a stream.
Getting back to our original question: how does one generate a stream of events out of the RHS, that is, out of the facts that exist in the working memory?
Consider that the facts (i.e. objects) in the working memory have no ordering constraints; that is, they are not ordered in time by any means. Further, we may be interested in different types of conversions; for example, we may want a notification not only when a new object is inserted into the working memory, but also when an object is removed from it.
In general, there is no first-class support for publishing time-ordered events out of an inference engine.
One approach is to support a new send(Object event, String entryPoint) verb in the action clause, which guarantees the time-ordering restrictions.
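Using this hypothetical send verb, a rule’s action could then publish a derived event to a named output channel; the PriceAlert type and the "Alerts" channel below are illustrative:

```drl
rule "Publish price alert"
when
    $s : Stock( price > 100.0 ) from entry-point "Exchange-Market"
then
    // hypothetical verb: publishes the derived event, in time order,
    // to the "Alerts" output channel
    send( new PriceAlert( $s ), "Alerts" );
end
```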
Due to its declarative nature, logic programming is a natural fit for performing event matching. First-order predicates suffice for many complex pattern-matching scenarios and, as seen, logic programming can be extended to support temporal constraints. Conversely, the integration of logic programming inference engines into the larger event processing network (EPN) ecosystem is less straightforward. Finally, future work is needed to research the performance of joins and other complex operators in light of the demanding CEP environment.
1. Luckham, D.: The Power of Events: An Introduction to Complex Event Processing in Distributed Enterprise Systems (2002).
2. Allen, J.F.: An Interval-Based Representation of Temporal Knowledge (1981).
3. The Drools Fusion User Guide (2009), http://downloads.jboss.com/drools/docs/126.96.36.199597.FINAL/drools-fusion/html_single/index.html#d0e1169.
4. Carriero, N.: Coordination Languages and their Significance. Communications of the ACM (1992).
5. Wells, G.: Coordination Languages: Back to the Future with Linda. Rhodes University.
6. Kowalski, R.: Predicate Logic as Programming Language. In: Proceedings of the IFIP Congress, Stockholm, North-Holland Publishing Co., 1974, pp. 569–574.
7. Arasu, A., Babcock, B., Babu, S., Cieslewicz, J., Datar, M., Ito, K., Motwani, R., Srivastava, U., Widom, J.: STREAM: The Stanford Data Stream Management System (2004).
8. Schulte, R., Bradley, A.: A Gartner Reference Architecture for Event Processing Networks, ID G00162454 (2009).
9. Forgy, C.: Rete: A Fast Algorithm for the Many Pattern/Many Object Pattern Match Problem. Artificial Intelligence 19, pp. 17–37 (1982).
10. Luckham, D., Schulte, R.: Event Processing Glossary – Version 1.1 (2008).
11. Mitchell, J.: Concepts in Programming Languages (2003).
12. JDBC API, http://java.sun.com/javase/technologies/database/.
Although Allen’s temporal operator approach is interesting, many people explored those ideas in the 80’s and 90’s and decided they weren’t practical beyond simple academic examples. For example, the engineers at ART explored those ideas. I know others have also explored them in depth.
If you want to read more about temporal logic, I have a paper here, and I’m currently working on a Stream RETE engine named Arpeggio.
If you want to learn about Arpeggio, I keep a detailed development journal on my blog. Email me directly for an invite.
Excellent post. It overlaps with some of the stuff we were discussing at last week’s October Rules Fest conference in Dallas, so I wrote up some of what I was talking about in a presentation I gave at that conference and posted it at http://geekswithblogs.net/cyoung/archive/2009/11/04/136003.aspx.
Hi Alexandre. You wrote:
“…how does one generate stream of events out of the RHS, that is, out of the facts that exist in the working memory?…Consider that the facts (i.e. objects) in the working memory have no ordering constraints, that is, they are not ordered in time by any means…In general, there is no first-class support for publishing time-ordered events out of an inference engine.”
Certainly in the Rete world this is not normally true. If you look at individual memories within the Rete network, the facts are often ordered by timestamp. Most, though not all, engines are heavily dependent on these timestamps, and use them to ensure a total ordering of activations on the agenda within rules of the same salience. Indeed, this is pretty much the definition of ‘depth-first’ conflict resolution which many engines use as their default. Hence, generating ordered streams of complex events is not a problem.
Now, of course, in reality things are not that simple. If you look at the way different engines implement the fine details, there are differences. A complex event signifies or denotes an aggregation of events. So, how do we order them? Maybe by the timestamp of the earliest aggregated event that we received. That’s a simple model, although we may have problems if any of our event sources exhibit delays in delivery. Maybe we order by the earliest occurrence time value we find within the aggregated events. That works OK as long as we can use a common clock across multiple event sources. What about the time interval between the earliest and latest of the aggregated events? Should we take that into account when deciding on the order of our output event streams? At least one Rete engine implements that model.
There is, of course, the issue of dealing with detection of complex events in the context of temporal windows. All CEP technologies have to deal with this at some level. How long do you keep old event data hanging around in case a delayed event is delivered that matches within a given time window? How do you know when you have enough events to safely emit an ordered stream of complex events and perform garbage collection of old events? As Rete engines evolve to include CEP features, these are issues they have to address like any other CEP technology.
A handful of engines eschew conflict resolution strategies and force an approach where ordering of rule activations of the same salience is deliberately arbitrary. I use one engine quite frequently which is purposely built to pseudo-randomise the order of facts at every level within the engine. The philosophy appears to be that ordering should be explicitly controlled by the rules developer as part of the overall logic of their rules, and not dependent on declarative pre-defined strategies whose exact implications are often opaque, and which are often influenced by undocumented side effects of the engine. This ‘arbitrary’ approach has its merits, but may not be an appropriate strategy on which to build CEP support. These engines are very much in the minority, though.
…although, thinking about it, a default ‘depth-first’ strategy is not appropriate here because it orders facts from the most recent to the most aged. No problem. Most engines offer the reverse approach as well. ‘Breadth-first’ gives us a more appropriate order from the most aged to the most recent. The point stands. Most Rete engines make extensive use of timestamps to order their outputs, but need to be extended to handle the many complexities of temporal logic required for CEP.
I re-read the blog entry and one thing caught my attention.
“Consider that the facts (i.e. objects) in the working memory have no ordering constraints, that is, they are not ordered in time by any means. Further, we may be interested on different types of conversions; for example, we may want to notify not only when a new object is inserted into working memory, but also when an object is removed from the working memory.”
I’ve asked the same question on the CEP forum. So far, none of the answers given on the forum are satisfactory or simple. The approach I’ve taken is to extend the CLIPS language with “modification logic”. Briefly, modification logic is some user-defined logic that runs when a piece of data is modified. It’s already implemented in Arpeggio and I wrote a paper on it.
I know others are struggling with the same problem, so hopefully the paper will stimulate some thought and exploration.
Thanks for the information, I will take a look at the paper.
Incidentally, we solved the problem in OCEP by defining the following additional operators: ISTREAM (new events), RSTREAM (all events), DSTREAM (delete events).
That’s interesting. The approach appears to be different than other EP engines. I chose the modification logic approach for production rule engines, for a few reasons. The first is that modification logic is a first class citizen and developers/users can define what kind of corrective action is needed. The second reason is a production rule engine doesn’t care what stream it comes from, it’s just data.
Nice post! Thank you for sharing your thoughts. The references are also great for anyone looking for source material in the subject.
Just some additional comments:
“Hence, one can use windows to manipulate the retraction of events from the working memory. However, there is some model impedance when compared to other CEP models. As logic programming is centered on symbols (e.g. variables, functions) under a single namespace sort of speak, the retraction of an event happens across the inference engine’s working memory; that is, it is not limited to a entry-point, or channel, which could have been desired effect. For example, the use-case could be to have different windows for different exchange markets, each exchange market being defined as a separate channel.”
This is true for pure rule engines (based on Rete or not), but not for Drools Fusion that you use as example. Although the underlying technology of Fusion is different, its observable behavior is similar to traditional ESP implementations, i.e., if you have multiple rules/queries with multiple and different sliding-windows, the expiration of an event in one sliding-window will only cause a “partial expiration” of the event. Meaning that the event no longer matches that specific windowed pattern, but continues to match all other larger/longer windows. When the event expires from all windows it belonged to, then it is completely expired from its source working memory or entry-point.
“However, at the time of writing, the author has not been able to find relevant work on the performance of join in inference engines when subject to the volume of events and the latency requirements needed in CEP.”
Yes, right to the point. My experience is that performance (as in throughput) is not really a concern, but you are right about latency. Rete-based engines have a harder time on that due to the two-phase (detect-act) cycle of their model. There are solutions, but this is a lengthy topic to discuss here. Maybe in a future appropriate blog post.
Finally, I agree with most of what Charles Young states in his papers and presented in his talk last week. I think the question of using CEP/ESP products based on traditional streaming approaches or on rule-based approaches inside the EPN is not a matter of which one to use, but where to use them. Rule-based approaches (as they are today) are great for bridging the gap between the EPN and the applications down in the network, while streaming-based approaches (as they are today) are more suitable for the agents closer to the event sources. The functionality overlap between the approaches is clear. The borders where one switches from one to the other will still require research and working experience.
First of all, thanks for the clarification around the retraction of events and how it relates to channels and the working memory in Drools Fusion.
Second of all, I agree with your observation that the stream-based approach is better suited upstream (closer to the source) and the rule-based approach is better suited downstream (closer to the sink) in the processing flow (EPN); this plays well with the higher latency issue raised for the latter, albeit with some caveats.
Thank you for a nice post. We have also been working on giving a logical semantics to CEP for a while now, and I’m trying to relate your approach to ours, but I think they are very different.
Your approach is very operational in nature: it basically says that CEP can definitely benefit from matching and unification in logic programming, but I think that it’s more important to give a declarative semantics to CEP.
Thanks for the feedback. Could you give some details or examples of your work?