Scalable concurrency, a design pattern in the Apama EPL
Posted by Louis Lovas
This
is my final installment in a series devoted to a specific
example in the Apama EPL. I began the series by describing the basic design pattern of
a consumer/producer. Further enhancements enabled multiple
consumers and, as a result, introduced the instance
idiom. Below, I will enhance
the consumer/producer once more by illustrating how one can leverage multi-core
processors for massive scalability and parallelism.
As I have mentioned before, instances, or 'sub-monitors' as they're often referred to in the Apama EPL, define a discrete unit of work. That unit of work represents a set of business logic, however large (a complete application scenario) or small (a simple analytic). Instances are created on demand using the spawn operator in the language. Each scenario instance is invoked with a unique set of input parameters that represent that occurrence. Each instance can then uniquely maintain its own reference data, timers and event streams; in effect, its own state. In general programming terms this is known as the factory pattern, but we've extended it to include an execution model.
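As a minimal sketch of that factory idiom (the event and monitor names here are illustrative only, not part of the Item example that follows):

```
// Illustrative sketch: a factory monitor that spawns one sub-monitor
// per request, each with its own parameters and its own state.
event WatchRequest { string symbol; float threshold; }
event Tick { string symbol; float price; }

monitor PriceWatchFactory {
    action onload {
        WatchRequest r;
        on all WatchRequest():r {
            // each spawn creates an independent instance
            spawn watch(r.symbol, r.threshold);
        }
    }

    action watch(string sym, float threshold) {
        Tick t;
        on all Tick(symbol = sym):t {
            if t.price > threshold then {
                log sym + " crossed " + threshold.toString();
            }
        }
    }
}
```

Each spawned sub-monitor listens only for its own symbol and keeps its own threshold, which is the "unique set of input parameters" per occurrence described above.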
To provide a means to leverage multi-core processors, the Apama EPL provides a syntax and simple semantics that allow those instances to execute in parallel. We do this with a language feature called contexts. Contexts are silos of execution that take the factory model to the next level. A context defines a logical container that holds and executes instances of a scenario (of the same or differing types). The EPL provides a semantic for inter-context communication, so there is no need for mutexes, semaphores or other locking schemes, thus avoiding the deadlock-prone code patterns typical of imperative languages such as Java. Each context in effect has its own logical input queue to which events are streamed from external sources or other contexts. Behind contexts, our CEP engine squeezes the most out of operating system threads to make maximum use of multi-core processors.
The same CEP engine can create multiple contexts (a context pool, as you'll see in the code example below), which can hold and execute multiple scenario instances; those instances can in turn create sub-contexts for additional parallelism. If, for example, the instances are an application for pricing options and require a compute-intensive calculation such as Black-Scholes, additional contexts can be spawned for these calculations. Furthermore, sub-contexts can be designed as shared compute services to be leveraged by multiple scenario instances running in different (parallel) contexts.
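To illustrate the sub-context idea, a scenario instance could offload a compute-intensive calculation to a dedicated context along these lines (the event names and the placeholder calculation are illustrative only, not from the example below):

```
// Illustrative sketch: offload a calculation to a sub-context and
// receive the result back via inter-context communication.
event CalcRequest { float spot; float strike; context replyTo; }
event CalcResult { float price; }

monitor OptionScenario {
    action onload {
        context calcCtx := context("calcService", false);
        spawn calcService() to calcCtx;

        // send work to the sub-context, including our own context
        // so the service knows where to send the reply
        enqueue CalcRequest(100.0, 95.0, context.current()) to calcCtx;

        CalcResult r;
        on CalcResult():r {
            log "option priced at " + r.price.toString();
        }
    }

    action calcService() {
        CalcRequest req;
        on all CalcRequest():req {
            // placeholder for a real Black-Scholes calculation
            float price := req.spot - req.strike;
            enqueue CalcResult(price) to req.replyTo;
        }
    }
}
```

Because the reply-to context travels inside the request event, the same calcService sub-context could serve requests from many scenario instances running in different contexts.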
Contexts take the factory model and extend it into a parallel execution model with just a few simple keywords in the EPL, as you'll see below.
The enhancements to the Item consumer/producer include a Context Pool, whose code is listed below, and an enhanced Item Producer that leverages it. The interface is unchanged except for one new event, and the Consumer (client) has only a minor revision (adhering to my belief, stated at the start of this series, that an EPL should follow the structured-programming principles of modularity and encapsulation). The complete example for this revision is available here and requires Apama version 4.1 (or later, of course).
The Context Pool.
package com.apamax.sample;

event ContextPool {
    integer numContexts;
    sequence<context> contexts;
    integer idx;

    action create(integer nc, string name) {
        self.numContexts := nc;
        while (nc > 0) {
            contexts.append(context(name, false));
            nc := nc - 1;
        }
    }

    action getContext() returns context {
        context c := contexts[idx];
        idx := idx + 1;
        if (idx = numContexts) then {
            idx := 0;
        }
        return c;
    }
}
The ContextPool
as implemented here is a general-purpose utility that provides a pool
of contexts via a create
method (i.e. action) and a means to distribute a workload across them
in simple round-robin fashion each time
the getContext
action is called.
The (parallel) Item Producer.
package com.apamax.sample;

monitor ItemService {
    event ClearUserID {
        integer id;
    }

    integer count := 0;
    float price := 0.0;

    action onload {
        ContextPool cf := new ContextPool;
        cf.create(4, "ClientService");

        // list of subscriber (user) identifiers
        sequence<integer> ids := new sequence<integer>;

        SubscribeToItems s;
        on all SubscribeToItems():s {
            if ids.indexOf(s.subscriberId) = -1 then {
                context c := cf.getContext();
                ids.append(s.subscriberId);
                route SubscriptionResponse(s.subscriberId, c);
                on completed SubscriptionResponse() {
                    spawn startSubscriptions(s.subscriberId, s.item_name, context.current()) to c;
                }
            }
        }

        ClearUserID c;
        on all ClearUserID():c {
            log "in " + c.toString();
            integer index := ids.indexOf(c.id);
            if index != -1 then {
                ids.remove(index);
            }
        }
    }

    action startSubscriptions(integer this_subscriberId, string name, context mainContext) {
        log "in startSubscriptions";
        on all wait(0.1) and not UnsubscribeFromItems(subscriberId = this_subscriberId) {
            route Item(this_subscriberId, name, count, price);
            count := count + 1;
            price := price + 0.1;
        }
        on UnsubscribeFromItems(subscriberId = this_subscriberId) {
            enqueue ClearUserID(this_subscriberId) to mainContext;
        }
    }
}
To get a general sense of what the multi-instance Item Producer code is intended to do, I
suggest a quick scan of my last installment;
this revision does not change that basic foundation, it only
parallelizes it. It is worth pointing out how little the code and
design have changed, yet this implementation has the ability to scale
massively to tens of thousands of instances across multiple
processor cores. Clearly this is just a simple example
that does very little real
work (producing Item events). Structurally, however, it is a model of
how one would design such a scalable service in the Apama
EPL.
Louie
Really great posts on the Apama EPL in the last few months! Thanks for taking the time.
Posted by: Hans | Thursday, April 09, 2009 at 09:25 AM
By the way, something is off with your RSS feeds. The Feedburner link works and produces an updated list of posts. However, many readers like Google allow one to enter the URL of the blog. They scan the blog, find a feed and subscribe to that. If you enter the Apama blog URL into a reader, it will find your old RSS feed, which is no longer updated. So it's easy to subscribe to the wrong feed. It seems to be caused by a tag that Typepad embeds in the blog HTML.
Posted by: Hans | Thursday, April 09, 2009 at 09:28 AM
Thanks Hans, I also have to thank a number of people in Apama whom I've asked to review these code examples for accuracy, correctness and basic design. They've gone through a few iterations. I wanted to put forth a well-thought-out series of examples.
Regarding the RSS feed, I use SharpReader and have recommended it to many. It seems to do a good job on all feeds I subscribe to, even the Typepad ones.
Posted by: Louie Lovas | Thursday, April 09, 2009 at 12:43 PM
Great examples on MonitorScript, Louis. But since you suggest having one context per core or CPU, this is more like simply running multiple instances of the Apama server, since each server is still single threaded, right?
And since these features are built in MonitorScript, that means that customers who use the graphical Apama tools, or Apama's JMON for Java can't access these features, right?
Plus, since Apama's algorithmic trading applications are built with the graphical development tools, that means they can't access this feature either, right?
And how do you suggest debugging this stuff? You still can't do that, right?
Posted by: Mark Palmer | Thursday, April 09, 2009 at 03:30 PM