There have been a couple of recent posts on Apama's MonitorScript
language, both here and here. To provide a bit more insight
into the Apama EPL, below is a working sample that
demonstrates a number of its capabilities. The language offers a declarative
syntax for defining and registering listeners for specific event types, and a
Java-like syntax for imperative logic. It strikes a balance between a
familiar vernacular and a design purpose-built for event
processing.
Example Narrative
Before an annotated
walkthrough of the code sample, I thought it would help to explain its purpose
and the event streams it processes. This simple example defines a work
dispatcher. It receives a request, in the form of an AddSymbol event,
to dispatch a discrete listener against a stream of market depth (bids
and asks) events. That listener processes the market depth for one
specific symbol. The actual work performed is inconsequential to this example
and is represented by an empty action (processDepth).
Once a listener is dispatched, it also listens for a request to
terminate.
The subtlety of this
example lies in how it combines the simplicity of the Apama
EPL with the power of the runtime engine in which it executes. Thousands or even
tens of thousands of listeners can be dispatched, each running in its own
independent context and processing its own slice of the streaming market data.
In reality, a
number of techniques can be employed within the MonitorScript
EPL to build this sort of work dispatcher. The EPL includes a spawn
operator, which I've outlined in a previous blog. The spawn operator is the primary means of
establishing independent worker threads and is the basis for instance creation.
The example below instead uses event listeners to define discrete units of work.
 1 package com.apamax.sample;
 2 monitor ProcessMarket {
 3     sequence <string> symbols; // contains list of symbols to process.
 4     com.apama.marketdata.SubscribeDepth subDepth;
 5     com.apama.marketdata.Depth adepth;
 6     dictionary< string, string > emptyDict;
 7     action onload {
 8         // Listen for incoming AddSymbol events and
 9         // add to symbols list if not already present
10         AddSymbol addSymbol;
11         on all AddSymbol(): addSymbol {
12             if symbols.indexOf(addSymbol.symbol) = -1 then {
13                 string local_symbol := addSymbol.symbol;
14                 symbols.append(local_symbol);
15                 // Subscribe to this symbol
16                 route com.apama.marketdata.SubscribeDepth("", "", local_symbol, emptyDict);
17                 // wait for 20.0 seconds, if no depth event received, terminate
18                 listener waitListener;
19                 waitListener := on wait(20.0) {
20                     route RemoveSymbol(local_symbol);
21                 }
22                 listener depthListener;
23                 depthListener := on all com.apama.marketdata.Depth(symbol=local_symbol):adepth {
24                     waitListener.quit();
25                     processDepth(adepth);
26                 }
27                 // Listen for RemoveSymbol events and remove from symbols list,
28                 // unsubscribe & quit
29                 RemoveSymbol removeSymbol;
30                 on RemoveSymbol(symbol=local_symbol): removeSymbol {
31                     integer index := symbols.indexOf(removeSymbol.symbol);
32                     if index != -1 then {
33                         symbols.remove(index);
34                         processRemove(removeSymbol.symbol);
35
36                         // Unsubscribe to this symbol
37                         route com.apama.marketdata.UnsubscribeDepth("", "", removeSymbol.symbol, emptyDict);
38                         depthListener.quit();
39                     }
40                 }
41             }
42             else {
43                 log "Debug: Ignored (existing) Add Symbol Event = " + addSymbol.symbol;
44             }
45         }
46     }
47     action processDepth(com.apama.marketdata.Depth d) {
48         // Do something
49     }
50     action processRemove(string s) {
51         // Do something.
52     }
53 }
Example Annotation
In describing this
example, the first point to note is that the event definitions are not
included. For the sake of brevity they're assumed to be defined elsewhere,
and there are only a few of them anyway. They fall into two logical
groups: control events (AddSymbol, RemoveSymbol, SubscribeDepth, UnsubscribeDepth) and data
events (Depth). This categorization is only an aid to understanding
the example; there is no such classification in the language. Additionally, MonitorScript has a syntax that is easily recognizable to anyone
schooled in Java, C++ and other classic languages.
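Since the listing leaves the event definitions out, here is a minimal sketch of what the two local control events might look like. The single string field named symbol is an assumption inferred from how the sample uses them (addSymbol.symbol and RemoveSymbol(local_symbol)); the real definitions may well carry more fields. The com.apama.marketdata events are not sketched here, since they are defined outside this sample.

```
package com.apamax.sample;   // assumed: same package as the ProcessMarket monitor

// Control event: request to start processing market depth for a symbol.
event AddSymbol {
    string symbol;   // assumed single field, inferred from addSymbol.symbol
}

// Control event: request to stop processing a symbol.
event RemoveSymbol {
    string symbol;   // assumed single field, inferred from RemoveSymbol(local_symbol)
}
```

With definitions of this shape, RemoveSymbol(local_symbol) constructs an event positionally, while RemoveSymbol(symbol=local_symbol) inside an on statement is a template that matches on the named field.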
A monitor (line 2)
defines the encapsulating block. Similar to a Java class, it is typically
scoped to a package namespace (line 1). Monitors are the main block scope, and
a typical Apama application is made up of many
monitors that interact with one another by sending and receiving events. Within
a monitor one can declare events, define actions (i.e. methods) and declare variables
(integers, strings, floats, etc.). This example defines a handful of
monitor-scoped variables. The language also supports a number of complex data
types; the sequence and dictionary types both use a C++ template-style
declaration to define array and collection types respectively (lines 3
and 6).
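To make the sequence and dictionary types a little more concrete, here is a small standalone sketch. The monitor name CollectionsSketch and the sample values are made up for illustration; append, indexOf and remove are the same calls the listing uses, and the old-style if ... then syntax follows the listing.

```
monitor CollectionsSketch {   // hypothetical monitor, not part of the sample
    sequence<string> symbols;            // ordered, array-like
    dictionary<string, string> params;   // key/value collection

    action onload {
        symbols.append("IBM");
        symbols.append("MSFT");

        // indexOf returns the position of the first match, or -1 if absent
        log "index of MSFT: " + symbols.indexOf("MSFT").toString();
        log "index of ORCL: " + symbols.indexOf("ORCL").toString();

        // remove takes an index, as in line 33 of the listing
        symbols.remove(symbols.indexOf("IBM"));
        log "symbols left: " + symbols.size().toString();

        params["venue"] := "TEST";       // illustrative key and value
        if params.hasKey("venue") then {
            log "venue = " + params["venue"];
        }
    }
}
```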
The onload
action (line 7) is the main entry point of a monitor. When a monitor is loaded
into the runtime engine, its onload action is
immediately invoked. This work dispatcher example is implemented entirely
within this action; for the sake of brevity, that is a
simple way to present the language. Line 10 declares an AddSymbol event variable, and line 11 declares a listener for all occurrences
of this event type. The remainder of the example's functionality
is scoped to the encapsulating block of this listener (lines 12-45).
This is an important point, since the intent is to receive and process multiple AddSymbol events (potentially thousands), each of which causes the invocation (dispatch) of a
discrete unit of work represented by this block of code.
Within this block we communicate with other monitors and establish a
number of listeners specific to this one symbol name.
The route statement
(line 16) sends a SubscribeDepth event. Route is the
standard form of communication between monitors. Under the covers, the
route statement places the event at the head of the
engine's input queue, so it becomes the next event to be processed by the
engine. Semantically, routing a SubscribeDepth
event starts the flow of Depth events for this symbol (i.e. local_symbol).
Lines 22-26 establish a listener to receive the stream of Depth events for this
symbol, calling the action processDepth upon
receipt of each one.
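As a rough illustration of route on its own, the sketch below has a monitor route events to itself. The Step event and the RouteSketch monitor are hypothetical names invented for this sketch. The point is that a routed event goes back through the engine's input queue and reaches any matching listener, including one in the monitor that routed it.

```
// Hypothetical event used only for this sketch.
event Step {
    integer n;
}

monitor RouteSketch {   // hypothetical monitor, not part of the sample
    action onload {
        Step s;
        on all Step(): s {
            log "processing Step " + s.n.toString();
            if s.n < 3 then {
                // The routed event is placed at the head of the input
                // queue, ahead of externally sent events still waiting.
                route Step(s.n + 1);
            }
        }
        // Kick things off; the listener above is already in place.
        route Step(1);
    }
}
```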
In addition to
establishing a Depth listener, this block of code also creates a wait timer in
lines 17-21. The purpose of this timer is to terminate the dispatched unit of
work for this symbol if we do not receive an initial Depth event within
20 seconds. Line 24 kills that wait listener once the Depth events start
flowing. Termination is handled by the RemoveSymbol
listener declared at line 30. Note that since it executes within the
context of a specific symbol's unit of work, we're only interested in receiving
a single occurrence of RemoveSymbol. This is
specified in the on statement by omitting the all modifier. Upon
receipt of a RemoveSymbol event we remove the symbol's entry from the list,
unsubscribe and terminate (i.e. quit) the Depth
listener for this symbol. Like AddSymbol, RemoveSymbol control events can arrive from another
monitor or from a client connected to the runtime engine.
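For the "another monitor" case, a driver could be as small as the sketch below. SymbolFeeder is a hypothetical monitor, the symbols and the 60-second delay are arbitrary, and it assumes that AddSymbol and RemoveSymbol each take a single string field and live in the same package as ProcessMarket. It also assumes ProcessMarket is loaded first, since an event routed before a listener exists is simply not seen by that listener.

```
package com.apamax.sample;   // assumed: same package as ProcessMarket

monitor SymbolFeeder {       // hypothetical driver monitor
    action onload {
        // Ask ProcessMarket to dispatch a unit of work per symbol.
        route AddSymbol("IBM");
        route AddSymbol("MSFT");
        route AddSymbol("IBM");   // duplicate: takes the else branch at line 42

        // Some time later, ask for one symbol to be torn down.
        on wait(60.0) {
            route RemoveSymbol("MSFT");
        }
    }
}
```

If no Depth events arrive for a symbol, ProcessMarket's own 20-second timer will already have routed a RemoveSymbol for it, in which case the later request finds no listener and is ignored.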
I hope this simple example
sheds light on the simplicity, elegance and power of the Apama
MonitorScript EPL.
Post Script …
After posting this blog, one of my esteemed colleagues with a much better
command of the MonitorScript language offered a few
refinements that avoid the need to manually handle termination (i.e. lines 17-21
in the code snippet). The refinement adds one new control event, Terminate, but it avoids the need to use listener
variables.
on com.apama.marketdata.Depth(symbol=local_symbol):adepth and not wait (20.0) {
    on all com.apama.marketdata.Depth(symbol=local_symbol):adepth and not Terminate(local_symbol) {
        processDepth(adepth);
    }
}
...
on RemoveSymbol(symbol=local_symbol):removeSymbol {
    ...
    route Terminate(removeSymbol.symbol);
}
This enhancement shows the
declaration of complex (or compound) listeners against multiple event streams
(and a timeout condition) concurrently. This is a commonly used technique in MonitorScript, and clearly quite a powerful one.
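Two pieces of the refinement are not spelled out in the post: the Terminate event definition, and what happens to the symbol's bookkeeping if the 20 seconds pass with no Depth event (as far as the snippet shows, the compound listener simply expires). The sketch below is one hedged way to fill both in. The Terminate field name and the TimeoutSketch monitor are assumptions of mine, not part of my colleague's suggestion.

```
package com.apamax.sample;   // assumed: same package as ProcessMarket

// Assumed definition; the post only shows Terminate(local_symbol) in use.
event Terminate {
    string symbol;
}

monitor TimeoutSketch {      // hypothetical monitor, for illustration only
    action onload {
        AddSymbol addSymbol;
        on all AddSymbol(): addSymbol {
            string local_symbol := addSymbol.symbol;

            // Mirror image of "Depth and not wait(20.0)": fires once after
            // 20 seconds, unless a Depth event for this symbol arrives
            // first, which cancels the listener.
            on wait(20.0) and not com.apama.marketdata.Depth(symbol=local_symbol) {
                route RemoveSymbol(local_symbol);
            }
        }
    }
}
```

This keeps the timeout cleanup of the original listing (lines 17-21) while still avoiding listener variables.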