Introduction
Apache Geode provides a Delta interface that lets an object serialize only its changes between two JVMs, rather than the entire object, whenever it is updated. For large objects, this is an optimization that is supported from:
- clients to servers
- servers to servers in the same DistributedSystem
- servers to clients
Sending Deltas from servers in one DistributedSystem to servers in another (e.g. two WAN sites) is not supported. Currently, each event sent between the DistributedSystems contains the entire object.
Normally, objects are stored in Regions as byte arrays. Deltas are the exception: they are stored as fully deserialized objects, and when a change to a Delta is received, it is applied to the in-memory object. Because sending Deltas between DistributedSystems is not supported, the entire object is serialized each time it is updated in the sending DistributedSystem and deserialized in the receiving one. Since Deltas are mainly used for objects that can grow very large (like sessions), this can be inefficient.
This article describes a way to send the delta bytes between the DistributedSystems instead of sending the entire object bytes.
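To make the difference between full bytes and delta bytes concrete, here is a minimal sketch. It assumes a hypothetical Trade value with a large payload and one frequently updated field. The nested DeltaLike interface is a local stand-in that mirrors the hasDelta, toDelta and fromDelta methods of Geode's Delta interface so the example runs without Geode on the classpath; the serialization format is invented for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class DeltaSketch {

  // Local stand-in mirroring the three methods of org.apache.geode.Delta.
  interface DeltaLike {
    boolean hasDelta();
    void toDelta(DataOutput out) throws IOException;
    void fromDelta(DataInput in) throws IOException;
  }

  // Hypothetical value: a large payload that rarely changes plus one small field.
  static final class Trade implements DeltaLike {
    private final byte[] payload;
    private int quantity;
    private boolean quantityChanged;

    Trade(byte[] payload, int quantity) {
      this.payload = payload.clone();
      this.quantity = quantity;
    }

    int getQuantity() { return quantity; }

    void setQuantity(int quantity) {
      this.quantity = quantity;
      this.quantityChanged = true;
    }

    @Override public boolean hasDelta() { return quantityChanged; }

    @Override public void toDelta(DataOutput out) throws IOException {
      out.writeInt(quantity); // only the changed field is written
      quantityChanged = false;
    }

    @Override public void fromDelta(DataInput in) throws IOException {
      quantity = in.readInt();
    }

    // Full serialization (invented format): length-prefixed payload, then quantity.
    byte[] fullBytes() {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (DataOutputStream out = new DataOutputStream(bytes)) {
        out.writeInt(payload.length);
        out.write(payload);
        out.writeInt(quantity);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
      return bytes.toByteArray();
    }

    // Delta serialization: whatever toDelta writes.
    byte[] deltaBytes() {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (DataOutputStream out = new DataOutputStream(bytes)) {
        toDelta(out);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
      return bytes.toByteArray();
    }

    // Applies delta bytes to this in-memory object.
    void applyDelta(byte[] delta) {
      try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(delta))) {
        fromDelta(in);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
  }

  public static void main(String[] args) {
    Trade sender = new Trade(new byte[10_000], 1);
    Trade receiver = new Trade(new byte[10_000], 1);
    sender.setQuantity(42);
    byte[] delta = sender.deltaBytes();
    receiver.applyDelta(delta);
    System.out.println("full bytes: " + sender.fullBytes().length
        + ", delta bytes: " + delta.length
        + ", receiver quantity: " + receiver.getQuantity());
  }
}
```

The receiver only ends up in the right state because it already holds the same full object as the sender, which is the precondition the rest of this article relies on.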
Architecture
For this implementation, each event travels the path below between a client in the sending DistributedSystem and a server in the receiving DistributedSystem:
- A client does a put operation on the data Region
- The full or delta bytes of the object are sent from the client to a server depending on whether the operation is a create or update
- The CacheWriter on the data Region in the server sets the GatewaySender queue key
- The CacheListener on the data Region in the server creates an EntryEvent on the proxy Region containing the appropriate bytes (either delta or full) and distributes that event to the GatewaySender attached to the proxy Region
- The GatewaySender attached to the proxy Region sends the event to a server in the receiving DistributedSystem
- A GatewayReceiver in a server on the receiving DistributedSystem receives the event
- The CacheWriter in the proxy Region puts the appropriate bytes (either delta or full) into the data Region
Note: The GatewaySender and GatewayReceiver in these steps actually encompass several different objects.
This diagram shows the architecture of this implementation with one server in each DistributedSystem for simplicity:
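The event path above can also be followed in a small simulation. This is only a sketch under stated assumptions: each site is modeled as a map from key to bytes, a "delta" is simply the bytes appended to the current value, and the ProxyEvent and Site classes are hypothetical stand-ins for the proxy Region event and the servers. No Geode classes are used.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

final class WanDeltaPathSketch {

  // Hypothetical stand-in for the event created on the proxy Region.
  static final class ProxyEvent {
    final String key;
    final byte[] bytes;    // full bytes, delta bytes, or null for a destroy
    final boolean isDelta; // plays the role of the boolean callback argument

    ProxyEvent(String key, byte[] bytes, boolean isDelta) {
      this.key = key;
      this.bytes = bytes;
      this.isDelta = isDelta;
    }
  }

  // One site: the data Region modeled as key -> bytes.
  static final class Site {
    final Map<String, byte[]> dataRegion = new HashMap<>();

    // Sending side: apply the client put locally, then build the proxy event.
    // A create carries the full bytes; an update carries only the appended bytes.
    ProxyEvent put(String key, byte[] appended) {
      byte[] current = dataRegion.get(key);
      boolean isUpdate = current != null;
      dataRegion.put(key, isUpdate ? concat(current, appended) : appended.clone());
      return new ProxyEvent(key, appended.clone(), isUpdate);
    }

    ProxyEvent destroy(String key) {
      dataRegion.remove(key);
      return new ProxyEvent(key, null, false);
    }

    // Receiving side: put the appropriate bytes into the data Region.
    void receive(ProxyEvent event) {
      if (event.bytes == null) {
        dataRegion.remove(event.key);
      } else if (event.isDelta) {
        byte[] current = dataRegion.get(event.key);
        if (current == null) {
          // A delta can only be applied to an existing full object.
          throw new IllegalStateException("No full object for key " + event.key);
        }
        dataRegion.put(event.key, concat(current, event.bytes));
      } else {
        dataRegion.put(event.key, event.bytes.clone());
      }
    }
  }

  static byte[] concat(byte[] a, byte[] b) {
    byte[] out = Arrays.copyOf(a, a.length + b.length);
    System.arraycopy(b, 0, out, a.length, b.length);
    return out;
  }

  public static void main(String[] args) {
    Site sending = new Site();
    Site receiving = new Site();
    receiving.receive(sending.put("session-1", new byte[] {1, 2, 3})); // create: full bytes
    receiving.receive(sending.put("session-1", new byte[] {4}));       // update: delta bytes
    System.out.println("sites match: " + Arrays.equals(
        sending.dataRegion.get("session-1"), receiving.dataRegion.get("session-1")));
  }
}
```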
Region Configuration
The Region configuration of the above architecture looks like this in XML:
Data Region
<region name="Trade" refid="PARTITION_REDUNDANT">
  <region-attributes>
    <cache-writer>
      <class-name>example.server.callback.GatewaySenderDeltaCacheWriter</class-name>
    </cache-writer>
    <cache-listener>
      <class-name>example.server.callback.GatewaySenderDeltaCacheListener</class-name>
    </cache-listener>
  </region-attributes>
</region>
Proxy Region
<region name="Trade_gateway_sender_delta_proxy" refid="PARTITION_REDUNDANT">
  <region-attributes gateway-sender-ids="ny">
    <partition-attributes colocated-with="/Trade" redundant-copies="1"/>
    <cache-writer>
      <class-name>example.server.callback.GatewaySenderProxyCacheWriter</class-name>
    </cache-writer>
  </region-attributes>
</region>
Caveats
There are a few caveats to this implementation:
- The receiving DistributedSystem must have the full object to be able to apply the delta bytes, so both DistributedSystems must start from the same state (either both empty or one a copy of the other).
- The receiving DistributedSystem proxy Region stores the most recent bytes for each key.
- Eviction cannot be enabled for the proxy Region. If it is enabled and an entry is evicted, a destroy event received from the sending DistributedSystem is ignored.
Implementation
All source code described in this article as well as an example usage is available here.
The implementation consists of the following three CacheCallback classes:
- A GatewaySenderDeltaCacheWriter attached to the data Region
- A GatewaySenderDeltaCacheListener attached to the data Region
- A GatewaySenderProxyCacheWriter attached to the proxy Region
GatewaySenderDeltaCacheWriter
The GatewaySenderDeltaCacheWriter process method:
- initializes the tail key in the event
- sets the tail key as the callback argument of the event
The tail key is the key in the GatewaySender queue. In normal GatewaySender-enabled Regions, the tail key is initialized by the primary BucketRegion’s handleWANEvent method. It is then replicated to redundant servers. Since the data Region in this case is not GatewaySender-enabled, this doesn’t happen. Once the tail key is initialized in the event, it is set into the callback argument. This is done because the tail key is only replicated between servers in GatewaySender-enabled Regions. It is ignored in non-GatewaySender-enabled Regions.
private void process(EntryEvent event) {
  EntryEventImpl eei = (EntryEventImpl) event;
  if (!isFromRemoteWANSite(eei)) {
    // Update the tailKey (which is the key in the queue)
    // The tailKey is set by handleWANEvent in the event in the primary.
    // It won't be called in this case since the data region is not WAN-enabled.
    setTailKey(eei);
    // Set the callback argument since the tail key is not serialized between members
    // if the region is not WAN-enabled.
    eei.setCallbackArgument(eei.getTailKey());
  }
}
The GatewaySenderDeltaCacheWriter setTailKey method invokes the BucketRegion’s handleWANEvent method to set the tail key.
private void setTailKey(EntryEventImpl event) {
  PartitionedRegion pr = (PartitionedRegion) getProxyRegion(event.getRegion());
  BucketRegion br = pr.getBucketRegion(event.getKey());
  br.handleWANEvent(event);
}
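The reason for copying the tail key into the callback argument can be illustrated with a toy model. Everything here is hypothetical: the Event class carries only the fields the sketch needs, and the per-bucket counter is an assumption standing in for whatever handleWANEvent does internally to assign the queue key. The point is only that a field that is not replicated is lost on the redundant copy, while the callback argument travels with the event.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

final class TailKeySketch {

  // Hypothetical, minimal event: only the fields this sketch needs.
  static final class Event {
    final String key;
    long tailKey = -1L; // -1 means "not set"
    Object callbackArgument;

    Event(String key) { this.key = key; }
  }

  // ASSUMPTION: one increasing counter per bucket stands in for the real
  // tail key assignment done by BucketRegion.handleWANEvent.
  private final Map<Integer, AtomicLong> bucketSequences = new HashMap<>();
  private final int totalBuckets;

  TailKeySketch(int totalBuckets) { this.totalBuckets = totalBuckets; }

  private int bucketFor(String key) {
    return Math.floorMod(key.hashCode(), totalBuckets);
  }

  // Mirrors the writer's process method: set the tail key, then copy it
  // into the callback argument.
  void beforeWrite(Event event) {
    AtomicLong sequence =
        bucketSequences.computeIfAbsent(bucketFor(event.key), b -> new AtomicLong());
    event.tailKey = sequence.incrementAndGet();
    event.callbackArgument = event.tailKey;
  }

  // Models replication in a Region that is not GatewaySender-enabled:
  // the callback argument is carried over, the tail key field is not.
  static Event replicate(Event primary) {
    Event copy = new Event(primary.key);
    copy.callbackArgument = primary.callbackArgument;
    return copy;
  }

  // What a listener on any member can do to recover the tail key.
  static long tailKeyOf(Event event) {
    return (Long) event.callbackArgument;
  }

  public static void main(String[] args) {
    TailKeySketch writer = new TailKeySketch(113);
    Event primary = new Event("trade-1");
    writer.beforeWrite(primary);
    Event redundant = replicate(primary);
    System.out.println("redundant tailKey field: " + redundant.tailKey
        + ", recovered from callback argument: " + tailKeyOf(redundant));
  }
}
```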
GatewaySenderDeltaCacheListener
The GatewaySenderDeltaCacheListener process method:
- gets the co-located proxy Region
- creates an EntryEvent using the proxy Region and input EntryEvent
- retrieves the proxy Region’s GatewaySenders
- distributes the EntryEvent to each GatewaySender
private void process(EntryEvent event) {
  EntryEventImpl eei = (EntryEventImpl) event;
  if (!isFromRemoteWANSite(eei)) {
    // Get the GatewaySender proxy region
    PartitionedRegion proxyRegion = (PartitionedRegion) getProxyRegion(event.getRegion());
    // Create the appropriate event
    EntryEventImpl proxyEvent = createProxyEntryEvent(eei, proxyRegion);
    // Add the event to any GatewaySender queues
    deliverToGatewaySenderQueues(proxyEvent);
  }
}
The GatewaySenderDeltaCacheListener createProxyEntryEvent method creates the EntryEvent on the proxy Region.
The EntryEvent contains:
- the Operation (CREATE, UPDATE, DESTROY)
- the proxy Region
- the key
- the value bytes if CREATE (full object) or UPDATE (delta)
- a boolean callback argument denoting whether the bytes are delta or full
- the tail key generated by the GatewaySenderDeltaCacheWriter
- the originating DistributedMember
- the originating EventID
- the originating ClientProxyMembershipID
- the originating VersionTag
private EntryEventImpl createProxyEntryEvent(EntryEventImpl event, PartitionedRegion proxyRegion) {
  byte[] newValue = null;
  boolean isDelta;
  Operation operation;
  if (event.getDeltaBytes() != null) {
    newValue = event.getDeltaBytes();
    operation = Operation.UPDATE;
    isDelta = true;
  } else if (event.getCachedSerializedNewValue() != null) {
    newValue = event.getCachedSerializedNewValue();
    operation = Operation.CREATE;
    isDelta = false;
  } else {
    operation = Operation.DESTROY;
    isDelta = false;
  }
  EntryEventImpl proxyEvent = EntryEventImpl.create(proxyRegion, operation, event.getKey(),
      newValue, isDelta /* callbackArg */, event.isOriginRemote(), event.getDistributedMember(),
      false /* generateCallbacks */, event.getEventId());
  proxyEvent.setContext(event.getContext());
  proxyEvent.setVersionTag(event.getVersionTag());
  proxyEvent.setTailKey((Long) event.getCallbackArgument());
  return proxyEvent;
}
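The selection rule in createProxyEntryEvent can be isolated into a small, testable function. The sketch below is a standalone restatement using hypothetical Op and Choice types rather than Geode's Operation and EntryEventImpl. One consequence worth noting, as far as this logic goes: an update that arrives without delta bytes is labeled CREATE and carries the full bytes.

```java
final class ProxyEventKindSketch {

  // Hypothetical stand-in for the three operations used by the proxy event.
  enum Op { CREATE, UPDATE, DESTROY }

  // Hypothetical result holder: operation, bytes to ship, and the delta flag.
  static final class Choice {
    final Op operation;
    final byte[] bytes;
    final boolean isDelta;

    Choice(Op operation, byte[] bytes, boolean isDelta) {
      this.operation = operation;
      this.bytes = bytes;
      this.isDelta = isDelta;
    }
  }

  // Same precedence as the listener: delta bytes first, then full bytes, else destroy.
  static Choice choose(byte[] deltaBytes, byte[] serializedNewValue) {
    if (deltaBytes != null) {
      return new Choice(Op.UPDATE, deltaBytes, true);
    }
    if (serializedNewValue != null) {
      return new Choice(Op.CREATE, serializedNewValue, false);
    }
    return new Choice(Op.DESTROY, null, false);
  }

  public static void main(String[] args) {
    byte[] delta = {1};
    byte[] full = {1, 2, 3};
    System.out.println("delta + full -> " + choose(delta, full).operation);
    System.out.println("full only    -> " + choose(null, full).operation);
    System.out.println("neither      -> " + choose(null, null).operation);
  }
}
```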
The GatewaySenderDeltaCacheListener deliverToGatewaySenderQueues method retrieves the proxy Region’s GatewaySenders and distributes the event to each one.
private void deliverToGatewaySenderQueues(EntryEventImpl wanEvent) {
  Region region = wanEvent.getRegion();
  Cache cache = region.getCache();
  Set<String> senderIds = region.getAttributes().getGatewaySenderIds();
  for (String senderId : senderIds) {
    // Get the GatewaySender
    AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
    // Distribute the EntryEvent to the GatewaySender
    sender.distribute(getEnumListenerEvent(wanEvent.getOperation()), wanEvent,
        getRemoteDsIds(cache, senderIds));
  }
}
GatewaySenderProxyCacheWriter
The GatewaySenderProxyCacheWriter process method:
- gets the co-located data Region
- invokes the LocalRegion basicBridgePut or basicBridgeDestroy method depending on the EntryEvent’s operation and boolean callback argument. The basicBridgePut method is invoked with either the full bytes or delta bytes from the input EntryEvent.
private void process(EntryEvent event) {
  EntryEventImpl eei = (EntryEventImpl) event;
  if (isFromRemoteWANSite(eei)) {
    // The boolean callback argument denotes whether the bytes are delta or full
    boolean isDelta = (Boolean) event.getCallbackArgument();
    if (event.getOperation().isDestroy()) {
      getDataRegion(event.getRegion()).basicBridgeDestroy(event.getKey(),
          eei.getRawCallbackArgument(), eei.getContext(), false, getClientEvent(eei));
    } else {
      Object value = null;
      byte[] deltaBytes = null;
      boolean isObject = false;
      if (isDelta) {
        // Delta bytes are applied to the existing object in the data Region
        deltaBytes = (byte[]) eei.getNewValue();
      } else {
        // Full bytes replace (or create) the object in the data Region
        value = eei.getNewValue();
        isObject = true;
      }
      getDataRegion(event.getRegion()).basicBridgePut(event.getKey(), value, deltaBytes,
          isObject, eei.getRawCallbackArgument(), eei.getContext(), false, getClientEvent(eei));
    }
  }
}
Future
The GatewaySenderEventImpl represents an event being sent between two DistributedSystems. It needs to be modified to be able to store the delta bytes in the sending DistributedSystem, and the GatewayReceiverCommand should be modified to be able to apply those delta bytes in the receiving DistributedSystem.
In addition, the sending DistributedSystem currently has no knowledge of the state of the objects in the receiving DistributedSystem. This has to be changed so that the sending DistributedSystem knows when it must send the full bytes rather than the delta bytes in the case where the receiving DistributedSystem doesn’t have the full object.
One potential way to do this is to modify the AbstractGatewaySenderEventProcessor. The AbstractGatewaySenderEventProcessor creates GatewaySenderEventImpls, builds batches of these and causes them to be sent to the receiving DistributedSystem. It could be modified to track which objects in the receiving DistributedSystem require full object bytes rather than the delta bytes. This can be done by tracking the time when the connection to the receiving DistributedSystem was made and also the last time the full object bytes were sent for each entry. If the entry time is before the connection time, the full bytes would be resent; otherwise the delta bytes would be sent. From the sending DistributedSystem’s perspective, if no connection can be made to any server in the receiving DistributedSystem, it is down. When it comes back up (when the sending DistributedSystem can connect to it), it could potentially be a brand new DistributedSystem. The sending DistributedSystem would have no knowledge of this, so the full bytes would have to be sent.
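The time-based rule described above could look roughly like the following. This is a sketch of the idea only, not existing Geode code: the FullOrDeltaTracker class and its method names are hypothetical, and timestamps are passed in explicitly so the behavior is deterministic.

```java
import java.util.HashMap;
import java.util.Map;

final class FullOrDeltaTrackerSketch {

  // Time at which the current connection to the receiving site was made.
  private long connectionTime;

  // Last time the full bytes were sent, per entry key.
  private final Map<String, Long> lastFullSendTime = new HashMap<>();

  // Called whenever a (re)connection to the receiving site is established.
  void onConnected(long now) {
    connectionTime = now;
  }

  // Returns true if the full bytes must be sent for this key, false if the
  // delta bytes are enough. Records the send time when full bytes are chosen.
  boolean mustSendFull(String key, long now) {
    Long lastFull = lastFullSendTime.get(key);
    if (lastFull == null || lastFull < connectionTime) {
      // Never sent in full, or sent before the current connection was made:
      // the receiving site may be brand new, so resend the full object.
      lastFullSendTime.put(key, now);
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    FullOrDeltaTrackerSketch tracker = new FullOrDeltaTrackerSketch();
    tracker.onConnected(100L);
    System.out.println("first send full: " + tracker.mustSendFull("trade-1", 101L));
    System.out.println("second send full: " + tracker.mustSendFull("trade-1", 102L));
    tracker.onConnected(200L); // the receiving site went down and came back
    System.out.println("after reconnect full: " + tracker.mustSendFull("trade-1", 201L));
  }
}
```

A real implementation would also have to bound or expire the per-entry map, which this sketch leaves out.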
Another potential way to do this is to modify the GatewayAck and GatewaySenderEventRemoteDispatcher. The GatewayAck is the acknowledgement returned from the receiving DistributedSystem for each batch of GatewaySenderEventImpls. The GatewaySenderEventRemoteDispatcher processes the GatewayAcks. The GatewayAck currently contains, among other fields, a collection of exceptions that occur while processing the batch on the receiving DistributedSystem. The collection could be modified to contain an InvalidDeltaException for each entry that doesn’t exist on the remote DistributedSystem. For each one, the GatewaySenderEventRemoteDispatcher in the sending DistributedSystem could be modified to create and enqueue a GatewaySenderEventImpl with the full bytes.
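The acknowledgement-based idea can be sketched in the same spirit. The classes below are hypothetical and do not reflect the actual GatewayAck or GatewaySenderEventRemoteDispatcher APIs: the acknowledgement is reduced to a list of keys whose delta could not be applied, and the sender responds by enqueuing a full-bytes event for each key it still holds.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class AckResendSketch {

  // Hypothetical stand-in for an event waiting in the sender queue.
  static final class QueuedEvent {
    final String key;
    final byte[] bytes;
    final boolean isDelta;

    QueuedEvent(String key, byte[] bytes, boolean isDelta) {
      this.key = key;
      this.bytes = bytes;
      this.isDelta = isDelta;
    }
  }

  // The sending site's current full bytes per key.
  final Map<String, byte[]> localFullBytes = new HashMap<>();

  // The sender queue, oldest event first.
  final Deque<QueuedEvent> queue = new ArrayDeque<>();

  // Handles one acknowledgement. The argument models the keys for which the
  // receiving site reported that a delta could not be applied.
  // Returns the number of full-bytes events that were enqueued.
  int handleAck(List<String> keysWithInvalidDelta) {
    int requeued = 0;
    for (String key : keysWithInvalidDelta) {
      byte[] full = localFullBytes.get(key);
      if (full == null) {
        continue; // entry no longer exists locally, nothing to resend
      }
      queue.addLast(new QueuedEvent(key, full.clone(), false));
      requeued++;
    }
    return requeued;
  }

  public static void main(String[] args) {
    AckResendSketch sender = new AckResendSketch();
    sender.localFullBytes.put("trade-1", new byte[] {1, 2, 3});
    int requeued = sender.handleAck(List.of("trade-1", "trade-gone"));
    System.out.println("requeued: " + requeued + ", queue size: " + sender.queue.size());
  }
}
```

Ordering relative to delta events already in the queue is not addressed here and would need care in a real change.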