Thursday, April 09, 2009

Scalable concurrency, a design pattern in the Apama EPL

Posted by Louis Lovas

This is the final installment in a series devoted to a single example in the Apama EPL. I began by describing the basic design pattern of a consumer/producer. Further enhancements enabled multiple consumers and, as a result, introduced the instance idiom. Finally, below, I enhance the consumer/producer once more to illustrate how one can leverage multi-core processors for massive scalability and parallelism.

As I have mentioned before, instances (or 'sub-monitors', as they're often called in the Apama EPL) define a discrete unit of work. That unit of work represents a set of business logic, however large (a complete application scenario) or small (a simple analytic). Instances are created on demand using the spawn operator in the language. Each scenario instance is invoked with a unique set of input parameters that represent that occurrence. Each instance can then maintain its own reference data, timers and event streams: in effect, its own state. In general programming terms this is known as the factory pattern, but we've extended it to include an execution model.

To leverage multi-core processors, the Apama EPL provides a syntax and a simple semantic that allow those instances to execute in parallel. We do this with a language feature called contexts. These are silos of execution that take the factory model to the next level. A context defines a logical container that holds and executes instances of a scenario (of the same or differing types). The EPL provides a semantic for inter-context communication, so there is no need for mutexes, semaphores or other locking schemes, which avoids the deadlock-prone code patterns typical of imperative languages such as Java. Each context in effect has its own logical input queue to which events are streamed from external sources or other contexts. Behind contexts, our CEP engine maps the work onto operating system threads to make maximum use of multi-core processors.

The same CEP engine can create multiple contexts (a context pool, as you'll soon see in the code example below). They can be used to hold and execute multiple scenario instances, and those instances can in turn create sub-contexts for additional parallelism. If, for example, these instances are an application for pricing Options and require a compute-intensive calculation such as Black-Scholes, additional contexts can be spawned for these calculations. Furthermore, sub-contexts can be designed as shared compute services to be leveraged by multiple scenario instances running in different (parallel) contexts.

Contexts take the factory model and extend it to include a parallel execution model with a few simple keywords in the EPL as you'll soon see below.

The enhancements to the Item consumer/producer include a Context Pool, whose code is listed below, and an enhanced Item Producer that leverages it. The interface is unchanged except for one new event, and the Consumer (client) has a minor revision (thus adhering to my belief, which I blogged about at the start of this series, that an EPL should follow the structured-programming principles of modularity and encapsulation). The complete example for this revision is available here and requires Apama version 4.1 or later.

The Context Pool

package com.apamax.sample;

event ContextPool {
    integer numContexts;
    sequence<context> contexts;
    integer idx;

    // Create nc private contexts, all sharing the given name.
    action create(integer nc, string name) {
        self.numContexts := nc;
        while(nc > 0) {
            contexts.append(context(name, false));
            nc := nc - 1;
        }
    }

    // Return the next context in round-robin order, wrapping at the end.
    action getContext() returns context {
        context c := contexts[idx];
        idx := idx + 1;
        if(idx = numContexts) then {
            idx := 0;
        }
        return c;
    }
}

The ContextPool as implemented here is a general-purpose utility. It provides a pool of contexts via a create method (i.e. action) and distributes a workload across them with a simple round-robin technique: each call to the getContext action returns the next context in the pool.
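To make the round-robin behavior concrete, here is a minimal sketch of a monitor that exercises the pool on its own. The monitor name ContextPoolDemo, the pool size of 3 and the context name "Worker" are my own illustrative choices and are not part of the downloadable example.

```
package com.apamax.sample;

// Hypothetical demo monitor; it assumes the ContextPool event shown above.
monitor ContextPoolDemo {
    action onload {
        ContextPool pool := new ContextPool;
        pool.create(3, "Worker");

        // Ask for more contexts than the pool holds so that it wraps around.
        integer i := 0;
        while (i < 5) {
            context c := pool.getContext();
            log "request " + i.toString() + " -> " + c.toString();
            i := i + 1;
        }
    }
}
```

With a pool of three, the fourth and fifth requests should be handed the same contexts as the first and second.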

As I mentioned above, contexts are mapped to operating system threads, so judicious use of the create action is expected. The basic rule of thumb is that the total number of contexts should equal the number of cores on a server. One noteworthy point: contexts can be public or private. A public context means that event listeners running within it can receive event streams from external sources (i.e. adapters). Listeners within a private context can only receive events that are directed to the context via the enqueue statement in application logic running in another context. For my example, this context pool utility creates private contexts: context(name, false)
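The difference between the two kinds of context can be sketched as follows. The Tick event, the monitor name and the context names are hypothetical and exist only for this illustration. I am assuming that passing true as the second argument creates a public context, the counterpart of the context(name, false) call used by the pool.

```
// Hypothetical event used only for this illustration.
event Tick {
    string symbol;
    float price;
}

monitor ContextKinds {
    action onload {
        // Assumed: true creates a public context, false a private one.
        context pub := context("PublicWorker", true);
        context priv := context("PrivateWorker", false);

        spawn watchTicks("public") to pub;
        spawn watchTicks("private") to priv;

        // A private context only sees events that are explicitly enqueued to it.
        enqueue Tick("ACME", 10.0) to priv;
    }

    action watchTicks(string tag) {
        Tick t;
        on all Tick():t {
            log tag + " context received " + t.toString();
        }
    }
}
```

In this sketch the listener in the public context would also see Tick events arriving from an adapter, while the one in the private context depends entirely on the enqueue statement.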

I've leveraged another general capability of the Apama EPL in the implementation of this context pool, that of actions on events. You'll notice these two actions are enclosed in an event definition which is part of our com.apamax.sample package.

In keeping with its charter of structured programming, actions on events provide a means to promote code modularity by encapsulating reusable utility functions (like a context pool).


The (parallel) Item Producer

package com.apamax.sample;

monitor ItemService {
  event ClearUserID {
      integer id;
  }

  integer count := 0;
  float price := 0.0;

  action onload {
      ContextPool cf := new ContextPool;
      cf.create(4, "ClientService");
      // list of subscriber (user) identifiers
      sequence<integer> ids := new sequence<integer>;
      SubscribeToItems s;
      on all SubscribeToItems():s {
          if ids.indexOf(s.subscriberId) = -1 then {
              context c := cf.getContext();
              // Reconstructed: the listing never adds to ids;
              // recording the new subscriber here is the evident intent.
              ids.append(s.subscriberId);
              route SubscriptionResponse(s.subscriberId, c);
              // Spawn the producer only once the response has been processed.
              on completed SubscriptionResponse() {
                  spawn startSubscriptions(s.subscriberId, s.item_name,
                                           context.current()) to c;
              }
          }
      }
      ClearUserID c;
      on all ClearUserID():c {
          log "in " + c.toString();
          integer index := ids.indexOf(c.id);
          if index != -1 then {
              // Reconstructed: the body of this block is missing from the
              // listing; removing the identifier is the evident intent.
              ids.remove(index);
          }
      }
  }

  // Runs in a pooled context, one instance per subscriber.
  action startSubscriptions(integer this_subscriberId, string name,
                            context mainContext) {
      log "in startSubscriptions";
      on all wait(0.1) and not UnsubscribeFromItems(subscriberId =
                                               this_subscriberId) {
          route Item(this_subscriberId, name, count, price);
          count := count + 1;
          price := price + 0.1;
      }

      on UnsubscribeFromItems(subscriberId = this_subscriberId) {
          // Tell the main context to forget this subscriber.
          enqueue ClearUserID(this_subscriberId) to mainContext;
      }
  }
}

To get a general sense of what the multi-instance Item Producer code is intended to do, I suggest a quick scan of my last installment. This revision does not change that basic foundation; it only parallelizes it. It is worth pointing out how little the code and design have changed, yet this implementation has the ability to scale massively to tens of thousands of instances across multiple processor cores. Clearly this is just a simple example that does very little real work (producing Item events). Structurally, however, it's a model that represents how one would design such a scalable service in the Apama EPL.

The parallel Item Producer (like its previous incarnation) manages multiple uniquely identified Consumers. For that it must maintain a list of identifiers, one for each Consumer. But this time, the Producer instance created on behalf of the Consumer is spawned into a context: spawn startSubscriptions(s.subscriberId, s.item_name, context.current()) to c; We're still passing the subscriberId and item_name (the instance parameters), but we also pass the context handle of the main context (context.current()). This is necessary for the inter-context communication.

The Consumer implementation has undergone a minor change to support this parallelized execution mode and match the Producer. A good design pattern is to ensure that monitors that frequently pass events operate within the same context. This is not a hard-and-fast rule, only one that limits the amount of inter-context communication (i.e. enqueueing). I've enhanced the interface slightly: there is a new event, SubscriptionResponse, that is used as a response to subscription requests (on all SubscribeToItems()). This event communicates back to the client the context handle of the Producer spawned on its behalf. Once the Consumer receives this event, it also spawns into this same context. By doing so, both the Producer and Consumer operate as they always did, sending Item events (route Item(this_subscriberId, name, count, price)) and handling termination (on UnsubscribeFromItems). Within each context, the producer/consumer still adheres to that single-cast event passing scheme where it creates and sends uniquely tagged Item events. The Consumer and the Interface are included in the download (not shown here for brevity's sake).
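Since the Consumer is not listed here, the following is only a rough sketch of the handshake just described, not the code from the download. The event definitions are my guesses at the interface, inferred from how the Producer constructs and reads these events: the field order of SubscribeToItems, the field name ctx, the monitor name ItemClient and the item name "widget" are all assumptions.

```
// Assumed interface shapes, inferred from the Producer listing.
event SubscribeToItems {
    integer subscriberId;
    string item_name;
}

event SubscriptionResponse {
    integer subscriberId;
    context ctx;    // context the Producer will be spawned into
}

event Item {
    integer subscriberId;
    string name;
    integer count;
    float price;
}

// Hypothetical Consumer; the real one is in the download.
monitor ItemClient {
    action onload {
        integer myId := 1;
        SubscriptionResponse r;
        on SubscriptionResponse(subscriberId = myId):r {
            // Follow the Producer into the context chosen for this subscriber.
            spawn consume(myId) to r.ctx;
        }
        route SubscribeToItems(myId, "widget");
    }

    action consume(integer id) {
        Item item;
        on all Item(subscriberId = id):item {
            log "Got an item: " + item.toString();
        }
    }
}
```

Because the Consumer spawns while the SubscriptionResponse is being processed and the Producer spawns only from its on completed listener, the Consumer's listener should be in place in the shared context before the first Item is routed.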

Two additional points in this Producer implementation are worth highlighting.

1) The on completed SubscriptionResponse() listener. The completed keyword indicates that this listener wakes up after the SubscriptionResponse event has been delivered. This way we can guarantee that our Consumer has received this event and has the context handle before the Producer is spawned.

2) To process UnsubscribeFromItems events, the statement enqueue ClearUserID(this_subscriberId) to mainContext; is executed. This statement sends an event to the listener (on all ClearUserID), which executes in another context. Recall that the action startSubscriptions is the target of the spawn operator, so it is the main body of code whose multiple instances run in parallel in contexts from the pool. The onload action, which controls all of this spawning, is logically considered the main context. Due to the strong semantic for inter-context communication, events must be enqueued to another context's input queue. Each context in effect has its own input queue, and the context handle is what identifies the target of the inter-context communication. So to communicate the client termination request from the spawned instance running in a private context, the ClearUserID event must be enqueued to the main context, where the appropriate listener is waiting.
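Stripped of the subscription logic, the round trip described in point 2 can be sketched on its own. The Done event, the RoundTrip monitor and the doJob action are hypothetical names chosen for this illustration.

```
// Hypothetical event used only for this illustration.
event Done {
    integer jobId;
}

monitor RoundTrip {
    action onload {
        context worker := context("Worker", false);

        // This listener stays behind in the main context.
        Done d;
        on all Done():d {
            log "main context saw completion of job " + d.jobId.toString();
        }

        // Hand the worker a handle to the main context so it can reply.
        spawn doJob(1, context.current()) to worker;
    }

    action doJob(integer jobId, context mainContext) {
        // Running in the worker context: a route would stay local,
        // so the reply has to be enqueued to the main context.
        enqueue Done(jobId) to mainContext;
    }
}
```

Passing context.current() as a spawn parameter is the same device the Producer uses to give each spawned instance a way back to the main context.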

Routing (i.e. route Item(...)) is still possible, but routed events stay within the boundaries of the context where the Producer and its corresponding Consumer reside. To logically expand the example, multiple Consumers could reside in the same context (i.e. a multi-cast design pattern, as I described in the previous revision of this example).


This example is designed to illustrate the simplicity of parallelism in the Apama EPL. With just a few simple statements, one can quickly and easily leverage multi-core processor technologies for massive scalability.

As I mentioned earlier, this is the final entry for this specific example. If you're just seeing this for the first time, you can start from the beginning (only three short segments) here. I hope this has been informative and has provided some insight into the Apama EPL. I plan to have many more code examples in the future on various use cases.

You can download the complete example here with the consumers, interface and producer. If you have any questions or comments, just let me know.


Really great posts on the Apama EPL in the last few months! Thanks for taking the time.


By the way, something is off with your RSS feeds. The Feedburner link works and produces an updated list of posts. However, many readers like Google allow one to enter the URL of the blog. They scan the blog, find a feed and subscribe to that. If you enter the Apama blog URL into a reader, it will find your old RSS feed which is no longer updated. So it's easy to subscribe to the wrong feed. It's being done by some tag that Typepad embeds in the blog HTML.

Louie Lovas

Thanks Hans. I also have to thank a number of people in Apama whom I've asked to review these code examples for accuracy, correctness and basic design. They've gone through a few iterations. I wanted to put forth a well-thought-out series of examples.

Regarding the RSS feed, I use SharpReader and have recommended it to many. It seems to do a good job on all feeds I subscribe to, even the Typepad ones.

Mark Palmer

Great examples on MonitorScript, Louis. But since you suggest having one context per core or CPU, this is more like simply running multiple instances of the Apama server, since each server is still single threaded, right?

And since these features are built in MonitorScript, that means that customers who use the graphical Apama tools, or Apama's JMON for Java can't access these features, right?

Plus, since Apama's algorithmic trading applications are built with the graphical development tools, that means they can't access this feature either, right?

And how do you suggest debugging this stuff? You still can't do that, right?

