This is the final installment in a series devoted to a specific
example in the Apama EPL. I began the series by describing the basic
design pattern of a consumer/producer. A later enhancement enabled
multiple consumers and, as a result, introduced the instance idiom.
In this installment I enhance the consumer/producer once more by
illustrating how one can leverage multi-core processors for massive
scalability and parallelism.
As I have mentioned before, instances (or 'sub-monitors', as they're
often called in the Apama EPL) define a discrete unit of work. That
unit of work represents a set of business logic, however large (a
complete application scenario) or small (a simple analytic).
Instances are created on demand using the spawn operator in the
language. Each scenario instance is invoked with a unique set of
input parameters that represent that occurrence. Each instance can
then maintain its own reference data, timers and event streams: in
effect, its own state. In general programming patterns this is known
as a factory behavioral model, but we've extended it to include an
execution model.
To provide a means to leverage multi-core processors, the Apama EPL
provides a syntax and a simple semantic that allow those instances to
execute in parallel. We do this with a language feature called
contexts. These are silos of execution which take the factory model to
the next level. A context defines a logical container that holds and
executes instances of a scenario (of the same or differing types). The
EPL provides a semantic for inter-context communication, so there is
no need for mutexes, semaphores or other locking schemes. This avoids
the deadlock-prone code patterns typical of imperative languages such
as Java. Each context in effect has its own logical input queue, to
which events are streamed from external sources or other contexts.
Behind contexts, our CEP engine squeezes the most out of operating
system threads to make maximum use of multi-core processors.
The same CEP engine can create multiple contexts (a context pool, as
you'll soon see in the code example below). They can be used to hold
and execute multiple scenario instances, and those instances can in
turn create sub-contexts for additional parallelism. If, for example,
these instances are an application for pricing Options and require a
compute-intensive calculation such as Black-Scholes, additional
contexts can be spawned for these calculations. Furthermore,
sub-contexts can be designed as shared compute services to be
leveraged by multiple scenario instances running in different
(parallel) contexts.
Contexts take the factory model and extend it to include a parallel
execution model with a few simple keywords in the EPL, as you'll soon
see below.
The enhancements to the Item consumer/producer include a Context Pool,
whose code I've listed below, and the enhanced Item Producer that
leverages it. The interface is unchanged except for one new event, and
the Consumer (client) has a minor revision. This adheres to my belief,
which I blogged about at the start of this series, that an EPL should
follow the structured programming principles of modularity and
encapsulation. The complete example for this revision is available
here and requires Apama version 4.1 (or later, of course).
The Context Pool.
package com.apamax.sample;

event ContextPool {
    integer numContexts;
    sequence<context> contexts;
    integer idx;    // index of the next context to hand out

    // Create nc private contexts, all sharing the given name.
    action create(integer nc, string name) {
        self.numContexts := nc;
        while(nc > 0) {
            contexts.append(context(name, false));
            nc := nc - 1;
        }
    }

    // Return the next context in round-robin order.
    action getContext() returns context {
        context c := contexts[idx];
        idx := idx + 1;
        if(idx = numContexts) then {
            idx := 0;
        }
        return c;
    }
}
The ContextPool as implemented here is a general-purpose utility. It
provides a pool of contexts via a create method (i.e. action) and
distributes a workload across them with a simple round-robin technique
each time the getContext action is called.
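
The round-robin behaviour can be modelled outside the correlator. The sketch below is Python, not EPL, chosen only so that it runs anywhere. It mirrors the fields and actions of the ContextPool listing. The string labels that stand in for context handles and the snake_case names are assumptions made for illustration.

```python
class ContextPool:
    """Python model of the EPL ContextPool; strings stand in for context handles."""

    def __init__(self):
        self.num_contexts = 0
        self.contexts = []
        self.idx = 0  # index of the next context to hand out

    def create(self, nc, name):
        # Assumption: a label such as "ClientService#0" stands in for
        # the EPL expression context(name, false).
        self.num_contexts = nc
        for i in range(nc):
            self.contexts.append(f"{name}#{i}")

    def get_context(self):
        # Hand out the current context, then advance and wrap around.
        c = self.contexts[self.idx]
        self.idx += 1
        if self.idx == self.num_contexts:
            self.idx = 0
        return c


pool = ContextPool()
pool.create(4, "ClientService")
assignments = [pool.get_context() for _ in range(6)]
print(assignments)
```

With four contexts and six requests, the fifth and sixth requests wrap around to the first and second contexts again.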
As I mentioned above, contexts are mapped to operating system threads,
so judicious use of the create action is expected. The basic rule of
thumb is that the total number of contexts should equal the number of
cores on a server. One noteworthy point: contexts can be public or
private. Event listeners running within a public context can receive
event streams from external sources (i.e. adapters). Listeners within
a private context can only receive events that are directed to the
context via the enqueue statement in application logic running in
another context. For my example, this context pool utility creates
private contexts: context(name, false).
I've leveraged another general capability of the Apama EPL in the
implementation of this context pool, that of actions on events.
You'll notice these two actions are enclosed in an event definition,
which is part of our com.apamax.sample package.
In keeping with the EPL's charter of structured programming,
actions on events provide a means to promote code modularity by
encapsulating reusable utility functions (like a context pool).
The (parallel) Item Producer.
package com.apamax.sample;

monitor ItemService {

    event ClearUserID {
        integer id;
    }

    integer count := 0;
    float price := 0.0;

    action onload {
        ContextPool cf := new ContextPool;
        cf.create(4, "ClientService");

        // list of subscriber (user) identifiers
        sequence<integer> ids := new sequence<integer>;

        SubscribeToItems s;
        on all SubscribeToItems():s {
            if ids.indexOf(s.subscriberId) = -1 then {
                context c := cf.getContext();
                ids.append(s.subscriberId);
                // tell the Consumer which context its Producer will run in
                route SubscriptionResponse(s.subscriberId, c);
                on completed SubscriptionResponse() {
                    spawn startSubscriptions(s.subscriberId, s.item_name,
                                             context.current()) to c;
                }
            }
        }

        ClearUserID c;
        on all ClearUserID():c {
            log "in " + c.toString();
            integer index := ids.indexOf(c.id);
            if index != -1 then {
                ids.remove(index);
            }
        }
    }

    // runs in a context from the pool, one instance per subscriber
    action startSubscriptions(integer this_subscriberId, string name,
                              context mainContext) {
        log "in startSubscriptions";
        on all wait(0.1) and not UnsubscribeFromItems(subscriberId = this_subscriberId) {
            route Item(this_subscriberId, name, count, price);
            count := count + 1;
            price := price + 0.1;
        }
        on UnsubscribeFromItems(subscriberId = this_subscriberId) {
            // report the termination back to the main context
            enqueue ClearUserID(this_subscriberId) to mainContext;
        }
    }
}
To get a general sense of what the multi-instance Item Producer code
is intended to do, I suggest a quick scan of my last installment. This
revision does not change that basic foundation; it only parallelizes
it. It is worth pointing out how little the code and design have
changed, yet this implementation has the ability to scale massively to
tens of thousands of instances across multiple processor cores.
Clearly this is just a simple example that does very little real work
(producing Item events). Structurally, however, it's a model that
represents how one would design such a scalable service in the Apama
EPL.
The parallel Item Producer (like its previous incarnation)
manages multiple uniquely identified Consumers. For that it must
maintain a list of identifiers, one for each Consumer. But this
time, the Producer instance created on behalf of the Consumer is
spawned into a context:
spawn startSubscriptions(s.subscriberId, s.item_name, context.current()) to c;
We're still passing the subscriberId and item_name (the instance
parameters), but we also pass the context handle of the main context
(context.current()). This is necessary for the inter-context
communication.
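
The bookkeeping in onload can be traced with a small model. This is a sketch in Python, not EPL, and it covers only the identifier list and the context assignment, not the event flow or the spawn itself. The class name, the method names, the context labels and the item names are all hypothetical.

```python
from itertools import cycle


class ItemServiceModel:
    """Models only the bookkeeping of ItemService.onload, not the event flow."""

    def __init__(self, context_labels):
        self._next_context = cycle(context_labels)  # round-robin over the pool
        self.ids = []      # subscriber ids that currently have a Producer
        self.spawned = []  # (subscriber_id, item_name, context) per spawn

    def subscribe(self, subscriber_id, item_name):
        # Mirrors: if ids.indexOf(s.subscriberId) = -1 then { ... }
        if subscriber_id in self.ids:
            return None  # duplicate request: no second Producer is spawned
        c = next(self._next_context)
        self.ids.append(subscriber_id)
        self.spawned.append((subscriber_id, item_name, c))
        return c

    def clear_user_id(self, subscriber_id):
        # Mirrors the ClearUserID listener: forget the id so it can re-subscribe.
        if subscriber_id in self.ids:
            self.ids.remove(subscriber_id)


svc = ItemServiceModel(["ctx0", "ctx1"])
first = svc.subscribe(1, "apples")
dup = svc.subscribe(1, "apples")    # ignored, id 1 is already subscribed
second = svc.subscribe(2, "pears")
svc.clear_user_id(1)
again = svc.subscribe(1, "apples")  # accepted again after the clear
print(first, dup, second, again)
```

A duplicate subscription neither consumes a context from the pool nor creates a second Producer, and a cleared identifier can subscribe again.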
The Consumer implementation has undergone a minor change to support
this parallelized execution mode and match the Producer. A good design
pattern is to ensure that monitors that frequently pass events operate
within the same context. This is not a hard-and-fast rule, only one
that limits the amount of inter-context communication (i.e.
enqueueing). I've enhanced the interface slightly: there is a new
event, SubscriptionResponse, that is used as a response to
subscription requests (on all SubscribeToItems()). This event is used
to communicate back to the client the context handle of the Producer
spawned on its behalf. Once the Consumer receives this event, it also
spawns into this same context. By doing so, both the Producer and
Consumer operate as they always did, sending Item events
(route Item(this_subscriberId, name, count, price)) and handling
termination (on UnsubscribeFromItems). Within each context, the
producer/consumer still adheres to the single-cast event passing
scheme in which it creates and sends uniquely tagged Item events. The
Consumer and the Interface are included in the download (not shown
here for brevity's sake).
Two additional points in this Producer implementation are worth
highlighting.
1) The on completed SubscriptionResponse() listener. The completed
keyword indicates that this listener wakes up after the
SubscriptionResponse event has been delivered. This way we can
guarantee that our Consumer has received this event and has the
context handle before the Producer is spawned.
2) To process UnsubscribeFromItems events, the statement
enqueue ClearUserID(this_subscriberId) to mainContext; is executed.
This statement sends an event to the listener (on all ClearUserID),
which executes in another context. Recall that the action
startSubscriptions is the target of the spawn operator, so it is the
main body of code for which multiple instances run in parallel in
contexts (from the pool). The onload action, which controls all of
this spawning, logically runs in the main context.
Due to the strong semantic for inter-context communication, events
must be enqueued to another context's input queue. Each context in
effect has its own input queue, and the context handle identifies
which queue an event is sent to. So to communicate the client
termination request from the spawned instance running in a private
context, the ClearUserID event must be enqueued to the main context,
where the appropriate listener is waiting.
Routing (i.e. route Item(...)) is still possible, but routed events
stay within the boundaries of the context where the Producer and its
corresponding Consumer reside.
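
The difference between routing and enqueueing can be pictured with one queue per context. The following is a single-threaded Python sketch, not EPL and not how the correlator is implemented. The Context class, its method names and the tuple-shaped events are hypothetical, and the model deliberately ignores threading and event ordering.

```python
from collections import deque


class Context:
    """Toy model: each context owns exactly one input queue."""

    def __init__(self, name):
        self.name = name
        self.input_queue = deque()
        self.handled = []  # events this context's listeners have seen

    def route(self, event):
        # Routed events never leave the context that routed them.
        self.input_queue.append(event)

    def enqueue_to(self, other, event):
        # Models: enqueue <event> to <other context>;
        other.input_queue.append(event)

    def drain(self):
        # Deliver everything currently waiting on this context's queue.
        while self.input_queue:
            self.handled.append(self.input_queue.popleft())


main = Context("main")
worker = Context("ClientService")

worker.route(("Item", 7, "apples", 0, 0.0))    # stays inside the worker
worker.enqueue_to(main, ("ClearUserID", 7))    # crosses to the main context

main.drain()
worker.drain()
print(main.handled)
print(worker.handled)
```

The Item event is only ever seen inside the worker context, while the ClearUserID event reaches the main context because it was placed on that context's queue explicitly.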
To logically expand the example, multiple Consumers could reside
in the same context (i.e. a multi-cast design pattern as I described in
the previous revision of this example).
This example is designed to illustrate the simplicity of parallelism in
the Apama EPL. With just a few simple statements, one can quickly and
easily leverage multi-core processor technologies for massive
scalability.
As I mentioned earlier, this is the final entry for this specific
example. If you're just seeing this for the first time, you can start
from the beginning (only three short segments) here.
I hope this has been informative and has provided some insight into the
Apama EPL. I plan to have many more code examples in the future on
various use cases.
You can download the complete example here with the consumers,
interface and producer. Any questions or comments, just let me know,
Louie