Warning
|
This document has been superseded by Non-Blocking State Transfer V2 |
This document describes the changes we need to make in order to implement non-blocking state transfer on top of our current push-based state transfer (ISPN-1424).
I’m calling state transfer as the process of adjusting the cache’s internal state in response to a view change.
-
The view change can be a join, a leave, a merge, or any combination of joins, leaves and merges.
-
Since we support asymmetric clusters, we deal with each cache independently. So the view changes I’m talking about are "cache view changes" and not JGroups view changes.
The goals of the non-blocking state transfer design are:
-
Minimize the interval(s) where the entire cluster can’t respond to requests because of a state transfer in progress.
-
Minimize the interval(s) where an existing node stops responding to requests because of a state transfer in progress.
-
Allow the performance of the cluster to drop during state transfer, but it should not throw any exception
-
Don’t add a mechanism for resolving data conflicts after a merge, but make sure it will be feasible to add one in the future.
In distributed caches, the cache state that we need to disseminate between members consists of several components:
-
The cache membership
-
The consistent hash
-
The actual cache entries
-
Current transactions + acquired locks
-
The L1 entries + requestors information (only with L1 enabled)
-
In-doubt transactions information (only with recovery enabled)
In replicated caches, since every member has the same information, there is no L1 and state can only be transferred to a joiner. However, we want every member to send a portion of the state, so that the coordinator isn’t stressed a lot more than the other nodes. So we will use a consistent hash even for replicated caches.
In invalidation-mode caches, there isn’t even any state to transfer, and transactions never leave the originator.
In the following sections I will focus on transactional DIST_SYNC caches with synchronous commit. At the end I will discuss how other cache modes differ from DIST_SYNC.
-
The basic flow of state transfer will be like this:
-
The user starts a cache on a node. The node sends a join request to the coordinator, containing the consistent hash configuration.
-
The JGroups coordinator sees a change in the cache membership and computes a new consistent hash that doesn’t require any state transfer (it doesn’t change key ownership).
-
The coordinator broadcasts the new cache view and the consistent hash to all the cache members.
-
Each node installs the new consistent hash, and all commands use the new consistent hash.
-
A separate thread on the coordinator notices that the consistent hash is not balanced, so it computes and broadcasts a new "pending" consistent hash to all the cache members. This pending consistent adds new owners to make it more balanced, but does not change the existing owners or their order.
-
Each node receives the "pending" consistent hash and starts using it for write operations.
-
Read operations still go through the current consistent hash.
-
Remote write/tx commands using the old consistent hash are forwarded to the new owners.
-
-
Each node computes the hash wheel segments that it owns in the pending consistent hash and their owners in the old consistent hash.
-
For each segment of the hash wheel, it asks one of the current owners for their part of the data, transaction table etc.
-
The pusher node only starts sending after it has installed the pending consistent hash for writing.
-
-
After a node has received everything it needs from the current owners, it signals the coordinator.
-
After the coordinator received the confirmation from all the cache members, it computes and broadcasts a new, balanced, consistent hash based on the pending consistent hash, but without the extra owners.
-
Each node installs the new consistent hash and removes the data it no longer owns.
-
The joiners have to send their CH configuration to the coordinator because the cache may not be defined on the coordinator, and each cache has its own number of segments, number of owners, etc.
-
We always compute the CH on the coordinator because (CH({A} + {B}) + {C} is not guaranteed to be the same as CH({A} + {B, C}., and we don’t want to worry about all the members processing the exact same view updates in exactly the same order.
-
In the general case, we require nodes to forward requests to the new owners at step 4. This is because, numOwners nodes can leave at the same time, and so the coordinator has to assign new owners for some of the keys (no state transfer required, though).
-
A state transfer should never be interrupted; instead, if one of the members leaves the cache, the others will adjust their consistent hashes and will ask another node for that data.
If we have a merge, or the coordinator leaves the JGroups cluster, the new coordinator will gather the state of all the nodes and will continue the state transfer normally if any of the partitions are in the middle of state transfer. After all the partitions are "at rest", it will start a new state transfer in which the "pending" CH is a union between all the partitions' current CHs.
In earlier versions of Infinispan, a key property of the consistent hash function was that it was deterministic: each member could determine the ownership of a key on its own, without having to communicate with the other members.
This has changed a bit with the introduction of asymmetric caches in 5.1: since a cache is not necessarily running on all the members of the JGroups view, the members need to communicate with the coordinator in order to receive the list of members for each cache. During state transfer, nodes need to receive the previous cache view membership as well, especially when joining and merging partitions. And since state transfer can be interrupted, the coordinator needs to send out the full history of the cluster since the last "steady state" as a tree of cache views. (The tree degenerates into a list if we don’t have any merges.)
When the coordinator changes (because of a merge or because the old coordinator died), the new coordinator will have to query all the nodes in the cluster for their version of this history and merge these histories into a single tree of cache views that will be sent to everyone.
To determine the current owner of a key, a node will always use the latest cache view it had received from the coordinator. However, in order to determine if it should push a key to the new owners, the node will have to check if it was a "pushing owner" (i.e. the last backup in the old cache view that is still a member in the latest cache view) in each of the old cache views in the cache view tree it has received from the coordinator. Likewise, a new owner that hasn’t received the value of a key yet has to ask the key owners in all the cache views of the tree for the value.
Note: We don’t have a strategy for handling data conflicts after a merge yet. We do, however, expect to allow data conflict resolution in future versions. So what we do want to enforce now is that all the new owners of a key see the same set of values after a merge, making an eventual conflict resolution policy deterministic. Another requirement for a proper conflict resolution policy would be tombstones for deleted keys, but we won’t discuss that here.
In the figure above, C did not detect the network partition, and the cache view tree looks like this: v4 {A, B, C, D} → [v3 {A, B, D}, v1 {A, B, C}]. A and B appear on both branches of the tree, but that doesn’t cause problems:
-
C did not detect the network partition, so it will only push a small amount of (old?) state to D. Note that, if the cache was async or numOwners==1, C could have modified its values without detecting the network partition, and in those scenarios we will lose data. But that’s ok because we don’t make any persistence guarantees in those scenarios.
-
A and B, on the other hand, will see the ancient (for them) view v1 {A, B, C} and will try to push data to D again. Again, not a problem, because they already finished installing v3 {A, B, D}, so they don’t have any entries to actually push to D.
Each node will iterate over its data container and push the entries for which it was the "pushing owner" in one of the views in the cache view tree to their new owners in the latest cache view. The pushing is done in batches, and each batch is sent as a StateTransferControlCommand{APPLY_STATE}. When state transfer is completed, each node will again iterate over its data container and remove (or move to L1) all the entries that are no longer local.
The implicit data container iterator behaves as a snapshot: changes made after the start of the iteration are not guaranteed to be visible through the iterator. So we must make sure that if the node is no longer an owner for a key, we don’t write it to the data container (except as an L1 entry). This is the main reason we tag all commands with the current view id on the originator and we throw an exception (StaleViewIdException?) whenever we detect a command tagged with an old view id. The exception can be thrown both on the originator and on a remote node, but only the originator will catch the exception and retry the command.
During state transfer, there can be more than numOwners copies of an entry in the entire cluster. In fact, since the old owners keep their copy of the data until the coordinator confirmed the state transfer success, it’s almost guaranteed that there will be > numOwners copies of each entry. However, a write command will update the value only on the latest cache view’s owners, and it will invalidate the key on all the other owners - just like they were L1 entries. Because of this invalidation, we can be sure that all the copies of an entry that still exist in the cluster have the same value (as long as we don’t have any merges in the cache views tree, but we don’t care about merges yet). That means we don’t have to keep track of when a particular entry has been modified - we can just do a putIfAbsent, assuming that whatever value we have locally on the new owners is the right one.
We can still have some extra network traffic and serialization/deserialization work because we only invalidate old copies on a write. If state transfer keeps beeing interrupted, the owners of a key in an "intermediate" view don’t know if they have invalidated the key on the previous owner or not, so everyone who does have a copy of the key will send it to the new owners. This is still better than keeping a "transaction log" for each view id.
If a new owner receives a get command (including an implicit get for a put) before it has received that entry from an old owner, it can make a get request to the old owners (without checking the ownership of the key). The entry is then stored on the new owner, and the value that arrives via the APPLY_STATE command will be ignored.
If a key is removed on the new owner, we’ll need a tombstone so that we don’t resurrect it when we receive the old entry via the APPLY_STATE command. We can clean up these tombstones when state transfer ends (unlike the tombstones we’d need for conflict resolution).
When fetchPersistentState==true
, nodes will push persistent cache entries as well as in-memory cache entries, using exactly the same algorithm.
One catch is that the cache store might be local and purgeOnStartup could be false, meaning joiners may have some (different) persistent state as well. We could consider this as a merge, because we don’t know the shutdown order and so the joiner could actually have newer data than the existing members. However, we don’t handle merge conflict resolution yet, so we should instead follow the solution described in ISPN-1586 and always clear the cache store on joiners.
We’ll add a new notification for individual keys being moved from one node to another. The new event type will be something like this:
public interface EntryMovedEvent {
CacheEntry getEntry();
boolean isArrival();
}
The listener annotated with the @EntryMoved annotation will be called on the new owner for every entry it receives with an APPLY_STATE command, even if the entry already exists on the new owner. This allows us to simplify the implementation, but it means that the user code has to handle duplicate events.
On the old owners, the listener will be called (with isArrival() returning false) as the entries are invalidated at the end of state transfer. So we cannot have duplicate leave notifications.
With Infinispan’s single-node locking scheme, we only keep the key lock on one node: the primary owner. On all the backup owners, we only keep a list of the keys that each transaction attempted to lock.
When the primary owner dies and one of the backup owners becomes primary owner, it treats all the keys that any transaction attempted to lock as already locked (pottentially by multiple transactions at the same time). There are three possibilities:
-
If the transaction prepare did return with success or failure on the old primary node, the new primary owner will also receive a commit command at some point in the future. When all the transactions that attempted to lock a key have been committed or rolled back, the key will be unlocked.
-
If the primary owner died before responding to the prepare command, the originator will see a SuspectException and will retry the command on the new primary owner.
-
If the originator died as well, all the new owners will drop the transaction. That is, unless recovery is enabled, but we’ll consider that separately.
If a new node joins the cache, it may become primary owner for keys that have been already locked on the old primary owner. In order to keep the keys locked on the new primary owner as well, it will have to block any lock acquisition until it has received the lock information from the old owners. The 'pushing owner' of a key will push all transactions that touch that key. We have four cases:
-
The originator only sent the prepare before the state transfer started, and every owner received the . . The new primary owner receives the transaction information and then it will handle a commit/rollback command as if it was a backup owner in the previous view. Same as case 1, but the 'pushing owner' didn’t receive the prepare command yet. It will not push the transaction to the new primary owner, and it will reply to the originator with a "must retry" response (because the command has an old view id). The originator then retries the prepare on the new owners before returning to the TM.
-
The originator already sent the commit/rollback command to the previous owners, but the 'pushing owner' did not receive it yet - so it still has the transaction in the transaction table and will send it to the new primary owner during state transfer. It will then reply to the commit/rollback command with a "must retry" response, and the originator will resend the commit to the new owners.
-
Same as case 3, but the originator dies as well before it can resend the commit command. The transaction will be discarded on the new primary owner, but it is already committed on the rest of the owners - so we have the potential for data inconsistency.
If the transaction touches multiple keys, we can have multiple 'pushing owners', one for each key. So the new primary owner can receive the transaction multiple times, but we can generalize the other cases: if we receive a transaction from at least one 'pushing owner', it means that node will afterwards reject any commit command. So we know the originator will retry on the proper owner (as long as it is still alive).
Regarding the data inconsistency possibility described in case 4, I believe this can happen even if the primary owner of the key did not change. Since we use unicasts in distributed mode, JGroups can’t guarantee atomic delivery for the commit message. So when one of the owners discards the commit command message (e.g. because it is too busy) and the originator dies before it gets to retransmit it, that owner will end up rolling back the transaction while the other owners commit it. Enabling recovery should however fix this inconsistency.
Note: The primary owner also receives a TxCompletionNotification command. The primary owner should not push backup locks for transactions for which it received the commit command, even if it didn’t receive the TxCompletionNotification command yet.
When a L1 is enabled, the non-owner nodes store the result of remote get commands as regular entries in the data container. The only differences between those and regular entries are that a) DistributionManager.isLocal(key) returns false and b) the entry’s lifespan is changed to the L1 lifespan (10 minutes by default).
Our state transfer design relies on the new owner’s local copy always being correct, and for entries already in L1 this is not true (because of the modified lifespan). To avoid additional checks when receiving state, we just clear the L1 on every node at the start of state transfer.
Whenever a non-owner stores an entry in L1, the owners also store the address of the non-owner in a "requestors map". When the entry is modified, this requestors map is used to invalidate the old value on all the nodes that have stored it in L1. We clear L1 on every node at the start of state transfer, so we can clear the L1 requestors map as well and start with an empty slate.
Since the L1 requestors map is cleared at the start of state transfer, remote get commands must not write their results to L1 if a state transfer has started (i.e. the view id has changed since the command was started). They can return to the user without retrying, however.
When L1OnRehash is enabled, instead of deleting all the entries that are no longer local at the end of state transfer, we are transforming them into L1 entries. To account for these extra L1 entries, the new owners will add the addresses of the no-longer-owner nodes to their requestors map in two phases:
-
For keys that were already local at the start of state transfer, it will compute which nodes are no longer owners and add them to the requestors map at the beginning of state transfer.
-
For keys received from old owners via APPLY_STATE commands, it will also compute which nodes are no longer owners and add them to the requestors map. We need to do this every time, not only if the key was missing (because the key could have been written by a remote get command, and we want to send at least one invalidate command).
Q: When a tx tries to acquire a lock on a primary owner, does it consider the backup locks of in-doubt transactions as well? It should do so, otherwise it wouldn’t be possible to commit the in-doubt transactions when the old transaction manager comes back up…
Q: When a node dies after sending a commit command and only one of the targets receives it (e.g. because of high load on the other nodes), could we use the in-doubt transaction information to automatically commit the transaction? I’m thinking mainly about the async commit phase configuration, where on restart the originator’s TM will think the transaction is already committed.
The in-doubt transactions' keys should not be accessible to new transactions, so when a new node becomes the primary owner for a key it needs to retrieve any in-doubt transactions from the other owners at the beginning of state transfer. (If it was already a backup owner for that key it should have the in-doubt transactions already, but if it just joined it won’t have any transaction information.)
Note: The information about in-doubt transactions is held in a special cache, which is local-only, in memory by default. We could force the recovery cache to be replicated instead, then the state transfer of any recovery-enabled cache will only have to block on the state transfer of the recovery cache.
If the commit command is async, the originator doesn’t wait for the commit response and doesn’t send a separate TxNotificationCompletion command.
Because of this, the originator cannot retry commit commands - so when a new node joins, we have to ensure that it doesn’t receive backup lock information for commit commands that have already been sent. Otherwise we could end up with something like the figure below:
In the figure, A is the originator of transaction tx1, which modifies key k. C is the primary owner of k, B is the backup owner. After D joins, D becomes the primary owner, and it receives the "backup lock" for k from C. A, however, has already sent the commit command to B and C, and D will never release the lock for k.
Note: The same thing happens in the old, blocking, state transfer design. Since all state transfer RPCs are OOB, they don’t wait on the commit command. Instead, the commit command will block on the owners, waiting for the state transfer to end. So we can just replace "D joins, state transfer starts" in the figure above with "D joins, state transfer happens", remove the "send tx1 info" bit, and we have a picture of what happens in the blocking design.
I think solving this problem in Infinispan would require something very similar to FLUSH in JGroups, except we don’t block anyone from sending messages: we just block anyone from starting state transfer until all the (non-OOB) commit commands with the old view id have been delivered. A crude way to do this would be for every node sends a non-OOB "done with view x" to everyone in the cache view (unicasts, not multicasts, to preserve the ordering) and then everyone blocks until they receive the "done with view x" message from everyone else. That would ensure that any commit commands arriving afterwards will have the proper view id, and we can start pushing the backup lock information to the new owners.
Q: Is this any better than using FLUSH and calling channel.flush() on each cache view change?
Q: With async commit it’s already possible to have data inconsistencies even without state transfer, if the commit commands of two transactions arrive in reversed order. Could we just say that this is one of the things async commit doesn’t support and remove the tx1 backup lock on D after a timeout?
Just like with sync commit, if the originator of the transaction dies after it had sent the commit but before the message reached all the targets, some of the targets will roll back the transaction and some will commit it. If recovery is enabled, however, we have the additional complication that the transaction manager may think the transaction is already committed, so it won’t appear as an in-doubt transaction in the TM. If the TM doesn’t notify the user or automatically commit the transaction, then Infinispan will have to do it.
RPCs in async mode don’t return anything, so the originator doesn’t know if the prepare command succeeded or it failed (e.g. because the key ownership changed, or because lock acquisition timed out). Because of this transactions in async caches should always be 1PC, and Infinispan doesn’t actually guarantee that all nodes have the same data.
To at least guarantee that we don’t completely lose updates if numOwner nodes join at the same time, we’d need to guarantee the CH is updated only after a node received all prepare commands with the old CH. (The same thing that we need to do for commit commands in async-commit caches).
fetchInMemoryData==false will disaoptimistic andble just the transfer of data container entries. All the other phases of state transfer are still performed, e.g. the no-longer-local entries are still invalidated.
In non-transactional caches a write command will acquire key locks separately on each owner. So it allows inconsistencies between the owners when they receive two put commands from different originators in a different order.
However, it’s not ok for an owner to lose the value written by a command without any overwrite from the user. Tagging the commands with the current view id, rejecting commands with an old view id and retrying on the new owners should ensure this is still the case during/after state transfer.
If the non-transactional cache is also async, we can’t retry the write command on the new owners. We could instead use the same approach as for (transactional) async commit caches, blocking state transfer until all the commands send with the old view id have been delivered.
For replicated caches the state transfer is a bit simpler, since everything is replicated we only need to transfer state to the joiner(s). However, we try to push state from all pre-existing members to the joiner, so they all need to agree on a common CH in order to compute the 'pushing owner' of each key.
Because of the CH, we still need to interrupt state transfer when one node dies. But we don’t have to keep track of a tree of cache views as for distributed cache, we only need the CH of the last "steady state" CH and the list of joiners since then. Any command with the old view id is rejected by the pre-existing nodes, so we know every node processes the same transaction. As with distributed mode, when a joiner processes a transaction that touches a missing key, it queries the old nodes for the value (it may use the CH to choose a node, but it’s not required to do so).
Note: It might make more sense to have only one node sending all the data to the joiner(s), and not stopping the state transfer unless the sender or the joiner leave. But at least for now we’re sticking to the same approach in distributed and replicated mode.
The list of pending transactions is mostly the same between all nodes in a replicated cluster. So we can pick a single node to send transaction information (say the first node in the cache view’s members list). We know that node will reject any commands that arrive later but don’t reach the joiner (because we check the view id of the command), which guarantees that the joiner will see all the transactions that the node pushing transaction information did (as long as the originator remains alive).
As for distributed caches, in async mode (or sync prepare/async commit mode) we need a FLUSH-like mechanism to guarantee commits/prepares sent in one cache view are processed in the same cache view. Because we choose a single node to push transaction information, we could send the "done with view x" messages just to that node. But because regular commands are multicasts (to the entire cluster), and unicasts and multicasts are ordered separately, we have to send that command as a multicast as well.
Invalidation caches don’t have proper state transfer (can’t set fetchInMemoryData == true). In fact the whole cache behaves very much like a distributed cache’s L1 cache - data will never be copied from one node to another directly, instead it will be populated from an external source and invalidated across the cluster when that source is modified.
We don’t need use a CH at all, and transactions never have to leave the originating node, so there isn’t really anything to synchronize.
We can check the view id on the joiner to ensure that we don’t invalidate "too much", but that’s not really necessary.