Introduction
An Apache Geode AsyncEventQueue is used to asynchronously process events after they have been applied to a Region. They are normally used to replay Region events into a relational database or other remote data store. Other use cases want to take advantage of asynchronously processing events in parallel without actually storing entries in a Region. In these cases, each event just needs to be routed directly to the AsyncEventQueue. This behavior is effectively possible with serial AsyncEventQueues and replicated Regions. All servers can define a Region as a REPLICATE_PROXY and operations are allowed on that Region. Events go through the Region without being applied to it and are delivered to the serial AsyncEventQueue. The same cannot be done with parallel AsyncEventQueues and partitioned Regions. If all servers define a Region as PARTITION_PROXY, an operation on that Region will fail with a PartitionedRegionStorageException.
This article shows how to route events directly to a parallel AsyncEventQueue using Functions.
Parallel AsyncEventQueue Architecture
Normal Usage With Region Operations
Normally, events get delivered to AsyncEventsQueues as a result of Region operations like create, update and destroy. Here is a simplified diagram of that architecture:
Usage With Function Invocations
An alternate architecture bypasses the Region operation and replication in the diagram above and instead uses Function invocations to route the data between the primary and redundant servers. Here is a simplified diagram of that architecture:
Implementation
All of the code for this example is here.
The implementation consists mainly of a PrimaryRoutingFunction and a SecondaryRoutingFunction.
The PrimaryRoutingFunction:
- Retrieves the routing partitioned Region
- Creates an EntryEventImpl with the Region, key, value and EventID as parameters
** Uses the key’s BucketRegion to set the EntryEventImpl’s tail key (which is the key in the AsyncEventQueue queue Region) - Gets the redundant DistributedMembers for the key
- Invokes the SecondaryRoutingFunction on those DistributedMembers with the Region name, key, value, EventID and tail key as arguments
- Gets the (primary) AsyncEventQueue for the routing Region
- Gets the AsyncEventQueue’s GatewaySender
- Distributes the EntryEventImpl to the GatewaySender
The SecondaryRoutingFunction:
- Retrieves the routing partitioned Region
- Creates an EntryEventImpl with the Region, key, value, EventID and tail key as parameters
- Gets the (secondary) AsyncEventQueue for the routing Region
- Gets the AsyncEventQueue’s GatewaySender
- Distributes the EntryEventImpl to the GatewaySender
The BaseFunction provides methods common to both functions.
The RoutingAsyncEventListener is an AsyncEventListener that processes AsyncEvents by logging and counting them.
Create EntryEventImpl
An EntryEventImpl is created by both functions. It represents the Region operation and is delivered to the AsyncEventQueue.
The createEvent method:
- creates the EntryEventImpl with the Region, key, value and EventID as parameters
- sets the tail key (in the constructor if secondary; through the BucketRegion if primary)
protected EntryEventImpl createEvent(PartitionedRegion pr, Object key, Object value,
EventID eventId, long tailKey) {
// Create the EntryEventImpl
EntryEventImpl event = this.entryEventFactory
.create(pr, Operation.CREATE, key, value, null, true, pr.getCache().getMyId(), false, eventId);
// Set the event's tail key. If the input tailKey == -1, then this is the primary.
// The tailKey is set directly in the event in the secondary.
if (tailKey != -1) {
event.setTailKey(tailKey);
}
// The tailKey is set by handleWANEvent in the event in the primary.
// handleWANEvent also updates the BucketRegion's most recent tail key.
pr.getBucketRegion(key).handleWANEvent(event);
return event;
}
Invoke SecondaryRoutingFunction
The PrimaryRoutingFunction invokes the SecondaryRoutingFunction on any redundant DistributedMembers. This invocation replaces the Region event replication in the normal usage. The routeToSecondaryMembers method:
- gets the redundant DistributedMembers for the key
- invokes the SecondaryRoutingFunction on those redundant DistributedMembers
private void routeToSecondaryMembers(PartitionedRegion pr, EntryEventImpl event) {
// Get the redundant members for this key
Set<DistributedMember> redundantMembers = PartitionRegionHelper
.getRedundantMembersForKey(pr, event.getKey());
// Create and execute the SecondaryRoutingFunction on those members if necessary
if (!redundantMembers.isEmpty()) {
Object[] args = new Object[] {
pr.getFullPath(), event.getKey(), event.getValue(), event.getEventId(), event.getTailKey()
};
try {
FunctionService.onMembers(redundantMembers)
.setArguments(args)
.execute("SecondaryRoutingFunction")
.getResult();
} catch (FunctionException e) {
// If the remote member is killed, a FunctionException is thrown. Log a warning and continue.
pr.getCache().getLogger()
.warning("PrimaryRoutingFunction caught exception executing SecondaryRoutingFunction for key="
+ event.getKey() + "; value=" + event.getNewValue(), e);
}
}
}
Deliver EntryEventImpl to AsyncEventQueue
The EntryEventImpl is delivered to the AsyncEventQueue by both functions. The deliverToAsyncEventQueue method:
- gets the AsyncEventQueue for the event’s Region
- gets the AsyncEventQueue’s GatewaySender
- distributes the event to the GatewaySender
protected void deliverToAsyncEventQueue(EntryEventImpl event) {
// Get the AsyncEventQueue
String queueId = (String) event.getRegion().getAsyncEventQueueIds().iterator().next();
AsyncEventQueueImpl queue = (AsyncEventQueueImpl) event.getRegion()
.getCache().getAsyncEventQueue(queueId);
// Get the GatewaySender
AbstractGatewaySender sender = (AbstractGatewaySender) queue.getSender();
// Distribute the EntryEvent to the GatewaySender
sender.distribute(EnumListenerEvent.AFTER_CREATE, event, REMOTE_DS_IDS);
}
Server Logging Output
Normal Usage
In normal usage, the PrimaryRoutingFunction and RoutingAsyncEventListener in the primary server will log messages like:
[info 2020/08/23 13:12:13.750 HST <ServerConnection on port 51965 Thread 1> tid=0x5b] PrimaryRoutingFunction processing args= [PDX[14273398,example.client.domain.Trade]{id=2}, EventID[id=31 bytes;threadID=1;sequenceID=2]]
[info 2020/08/23 13:12:13.762 HST <ServerConnection on port 51965 Thread 1> tid=0x5b] PrimaryRoutingFunction processing args= [PDX[14273398,example.client.domain.Trade]{id=3}, EventID[id=31 bytes;threadID=1;sequenceID=3]]
[info 2020/08/23 13:12:13.789 HST <ServerConnection on port 51965 Thread 1> tid=0x5b] PrimaryRoutingFunction processing args= [PDX[14273398,example.client.domain.Trade]{id=8}, EventID[id=31 bytes;threadID=1;sequenceID=8]]
[info 2020/08/23 13:12:13.797 HST <ServerConnection on port 51965 Thread 1> tid=0x5b] PrimaryRoutingFunction processing args= [PDX[14273398,example.client.domain.Trade]{id=9}, EventID[id=31 bytes;threadID=1;sequenceID=9]]
[info 2020/08/23 13:12:14.412 HST <Event Processor for GatewaySender_AsyncEventQueue_routing_aeq_0> tid=0x47] RoutingAsyncEventListener: Processing 4 events (total=4; possibleDuplicate=0)
key=2; value=PDX[14273398,example.client.domain.Trade]{id=2}; possibleDuplicate=false
key=3; value=PDX[14273398,example.client.domain.Trade]{id=3}; possibleDuplicate=false
key=8; value=PDX[14273398,example.client.domain.Trade]{id=8}; possibleDuplicate=false
key=9; value=PDX[14273398,example.client.domain.Trade]{id=9}; possibleDuplicate=false
The SecondaryRoutingFunction in the secondary servers will log messages like:
[info 2020/08/23 13:12:13.756 HST <Function Execution Processor2> tid=0x3d] SecondaryRoutingFunction processing args= [/Router, 2, PDX[14273398,example.client.domain.Trade]{id=2}, EventID[id=31 bytes;threadID=1;sequenceID=2], 163]
[info 2020/08/23 13:12:13.763 HST <Function Execution Processor2> tid=0x3d] SecondaryRoutingFunction processing args= [/Router, 3, PDX[14273398,example.client.domain.Trade]{id=3}, EventID[id=31 bytes;threadID=1;sequenceID=3], 164]
[info 2020/08/23 13:12:13.790 HST <Function Execution Processor2> tid=0x3d] SecondaryRoutingFunction processing args= [/Router, 8, PDX[14273398,example.client.domain.Trade]{id=8}, EventID[id=31 bytes;threadID=1;sequenceID=8], 169]
[info 2020/08/23 13:12:13.799 HST <Function Execution Processor2> tid=0x39] SecondaryRoutingFunction processing args= [/Router, 9, PDX[14273398,example.client.domain.Trade]{id=9}, EventID[id=31 bytes;threadID=1;sequenceID=9], 170]
Killed Server
If a server is killed while routing events, the server logs will contain messages like below.
In this example, the last log messages from the PrimaryRoutingFunction in the killed server were:
[info 2020/08/23 14:17:05.681 HST <ServerConnection on port 56033 Thread 1> tid=0x5a] PrimaryRoutingFunction processing args= [PDX[8290614,example.client.domain.Trade]{id=756}, EventID[id=31 bytes;threadID=1;sequenceID=756]]
[info 2020/08/23 14:17:05.691 HST <ServerConnection on port 56033 Thread 1> tid=0x5a] PrimaryRoutingFunction processing args= [PDX[8290614,example.client.domain.Trade]{id=758}, EventID[id=31 bytes;threadID=1;sequenceID=758]]
[info 2020/08/23 14:17:05.709 HST <ServerConnection on port 56033 Thread 1> tid=0x5a] PrimaryRoutingFunction processing args= [PDX[8290614,example.client.domain.Trade]{id=764}, EventID[id=31 bytes;threadID=1;sequenceID=764]]
[info 2020/08/23 14:17:05.717 HST <ServerConnection on port 56033 Thread 1> tid=0x5a] PrimaryRoutingFunction processing args= [PDX[8290614,example.client.domain.Trade]{id=766}, EventID[id=31 bytes;threadID=1;sequenceID=766]]
[info 2020/08/23 14:17:05.742 HST <ServerConnection on port 56033 Thread 1> tid=0x5a] PrimaryRoutingFunction processing args= [PDX[8290614,example.client.domain.Trade]{id=769}, EventID[id=31 bytes;threadID=1;sequenceID=769]]
[info 2020/08/23 14:17:05.757 HST <ServerConnection on port 56033 Thread 1> tid=0x5a] PrimaryRoutingFunction processing args= [PDX[8290614,example.client.domain.Trade]{id=772}, EventID[id=31 bytes;threadID=1;sequenceID=772]]
This server was killed before the RoutingAsyncEventListener could process these six events.
Of these six events, three of them were processed by the SecondaryRoutingFunction in one redundant server. Once the server was killed, these events were processed as possible duplicates by the RoutingAsyncEventListener:
[info 2020/08/23 14:17:05.682 HST <Function Execution Processor2> tid=0x3a] SecondaryRoutingFunction processing args= [/Router, 756, PDX[8290614,example.client.domain.Trade]{id=756}, EventID[id=31 bytes;threadID=1;sequenceID=756], 990]
[info 2020/08/23 14:17:05.692 HST <Function Execution Processor2> tid=0x3a] SecondaryRoutingFunction processing args= [/Router, 758, PDX[8290614,example.client.domain.Trade]{id=758}, EventID[id=31 bytes;threadID=1;sequenceID=758], 766]
[info 2020/08/23 14:17:05.718 HST <Function Execution Processor2> tid=0x3a] SecondaryRoutingFunction processing args= [/Router, 766, PDX[8290614,example.client.domain.Trade]{id=766}, EventID[id=31 bytes;threadID=1;sequenceID=766], 908]
[info 2020/08/23 14:17:11.505 HST <Event Processor for GatewaySender_AsyncEventQueue_routing_aeq_0> tid=0x45] RoutingAsyncEventListener: Processing 5 events (total=266; possibleDuplicate=3)
key=758; value=PDX[8290614,example.client.domain.Trade]{id=758}; possibleDuplicate=true
key=766; value=PDX[8290614,example.client.domain.Trade]{id=766}; possibleDuplicate=true
key=756; value=PDX[8290614,example.client.domain.Trade]{id=756}; possibleDuplicate=true
The other redundant server processed the other three events in the same way:
[info 2020/08/23 14:17:05.710 HST <Function Execution Processor2> tid=0x37] SecondaryRoutingFunction processing args= [/Router, 764, PDX[8290614,example.client.domain.Trade]{id=764}, EventID[id=31 bytes;threadID=1;sequenceID=764], 793]
[info 2020/08/23 14:17:05.744 HST <Function Execution Processor2> tid=0x37] SecondaryRoutingFunction processing args= [/Router, 769, PDX[8290614,example.client.domain.Trade]{id=769}, EventID[id=31 bytes;threadID=1;sequenceID=769], 572]
[info 2020/08/23 14:17:05.758 HST <Function Execution Processor2> tid=0x37] SecondaryRoutingFunction processing args= [/Router, 772, PDX[8290614,example.client.domain.Trade]{id=772}, EventID[id=31 bytes;threadID=1;sequenceID=772], 822]
[info 2020/08/23 14:17:11.490 HST <Event Processor for GatewaySender_AsyncEventQueue_routing_aeq_0> tid=0x45] RoutingAsyncEventListener: Processing 4 events (total=266; possibleDuplicate=3)
key=769; value=PDX[8290614,example.client.domain.Trade]{id=769}; possibleDuplicate=true
key=764; value=PDX[8290614,example.client.domain.Trade]{id=764}; possibleDuplicate=true
key=772; value=PDX[8290614,example.client.domain.Trade]{id=772}; possibleDuplicate=true
Future
Two useful additions to Apache Geode would be:
- Allowing configuration of a PARTITION_PROXY Region on all members into which operations can be done
- An API to deliver events directly to AsyncEventQueues