-
Notifications
You must be signed in to change notification settings - Fork 467
Add support for creating a FATE transaction by key #4204
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
This updates FATE to support creating a transaction by a key which will be hashed to a transaction id. This can be used to guarantee only one transaction exists at the same time if the key is the same. Currently the key is just a byte array so anything can be used for the key. This closes apache#4183
There will only be two use cases (split and compaction commit) for the fate key at the moment. So could have a class with static methods that support creating a FateKey object for the two uses case. |
That sounds good, we can expand it if we end up with more use cases in the future. I will work on re-working the byte array into a FateKey object this week as I think that is cleaner. |
test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java
Show resolved
Hide resolved
test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java
Show resolved
Hide resolved
This commit moves FateKey storage to the node for the ZooStore so it will be inserted atomically with the transaction id and also moves it to the TxColumnFamily for the Accumulo store.
@keith-turner - I think this is ready for another review. Since you last reviewed it I added the FateKey type instead of a byte array, I made FateKey create method atomic for ZK and added a conditional mutation for Accumulo and I updated the tests (including testing the collision detection |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These changes look good. I am still reviewing but I am mostly done. May also need some changes in Fate class to do something like the following.
- Attempt to seed a split operation with a repo
- If the split exists with the given key and is not NEW then log a debug or trace. This is expected to happen when the system is busy and operations are not running and we keep seeding them
- If there is a collision log an error
Not exactly sure how this should be done, trying to figure out case 2 above where we want to avoid spamming the logs w/ warn/error for an expected case. Also wondering what the new Fate method should be to use this new functionality in the store, maybe something like the following.
class Fate {
public FateId startAndSeedTransaction(FateKey key, Repo repo) {
}
}
The fate changes could be a separate PR or done in this PR.
test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java
Outdated
Show resolved
Hide resolved
// Replace the default hasing algorithm with one that always returns the same tid so | ||
// we can check duplicate detection with different keys | ||
executeTest(this::testCreateWithKeyCollision, AbstractFateStore.DEFAULT_MAX_DEFERRED, | ||
(instanceType, fateKey) -> FateId.from(instanceType, 1000)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice way to test collisions.
test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
Outdated
Show resolved
Hide resolved
var status = newMutator(fateId).requireStatus().putStatus(TStatus.NEW).putKey(fateKey) | ||
.putCreateTime(System.currentTimeMillis()).tryMutate(); | ||
|
||
// TODO: Any reason to retry here? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An unknown status means that a failure happened w/ the RPC for the conditional mutation and its not know if it went through. So need to scan in that case and see if it was created or not and possibly retry.
// to reserve/seed as long as the existing key is the same and not different as that would | ||
// mean a collision | ||
} else if (status == TStatus.NEW) { | ||
Preconditions.checkState(tFateKey.isPresent(), "Tx key column is missing"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think there is a test for a collision between a random fate id and hash derived fate id. Im 99.9% sure this code works, but was wondering if creating a test for that is worthwhile for detecting regressions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@keith-turner - I just added a test for collision with key generated fate id and random id
The test works by creating the transaction using the key and then the test deletes the key outside of fate (either out of accumulo table or replaces the ZK node with an empty key). After removing the key the transaction appears to the store just like a normal random transaction so the test then verifies the failure when calling create again and also ensures the transaction can still be reserved and used since it was essentially "converted" to a random transaction.
I'll take a look in detail tomorrow, I see you added |
What lead to the createAndReserve call was trying to deal with the following race condition when I was trying to stub out using this feature. class Fate {
seed(..., FateKey key,...) {
var fateId = create(key);
// another thread could sneak in here
var fateTxStore = store.reserve(fateId);
try{
// between create and reserve, its possible something happened with the key so now that we hold the reservation check that things are as expected
store.checkKeyIsStillSame(fateId,key);
}finally{
fateTxStore.unreserve();
}
} So hoping having the store create and reserve can avoid this. |
Co-authored-by: Keith Turner <[email protected]>
this will make sure that if a transaction is created by a key it will reserve at the same time to prevent race conditions
I made a bunch of updates and addressed everything and changed create(fateKey) to createAndReserve(fateKey) (which technically is implemented by reserving first in memory and then creating). I also added a test for verifying that collisions when using a key and with random ids work as expected. |
Preconditions.checkState(create(fateKey) != null, | ||
"Unexpected null FateId when creating and reserving fateKey %s", fateKey); | ||
if (create(fateKey).isPresent()) { | ||
txStore = reservedTxStore; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could modify the code to keep the return value of create and validate it.
txStore = reservedTxStore; | |
Precondition.checkState(fateId.equals(fateIdFromCreate)); | |
txStore = reservedTxStore; |
} | ||
} | ||
} else { | ||
// Could not reserve so return empty |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Created the following after seeing another log stmt in this PR. Thought it would be good to log something here like it does elsewhere. These trace stmts will be nice for tracking down bugs
// Could not reserve so return empty | |
// Could not reserve so return empty | |
log.trace("Another thread currently has transaction {} key {} reserved", fateId, | |
fateKey); |
|
||
@Override | ||
public Optional<FateTxStore<String>> createAndReserve(FateKey key) { | ||
HashCode hashCode = Hashing.murmur3_128().hashBytes(key.getSerialized()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This TestStore is only used by unit test. If the existing unit test does not call thsi method then could throw an UnsupportedOpException.
This updates FATE to support creating a transaction by a key which will be hashed to a transaction id. This can be used to guarantee only one transaction exists at the same time if the key is the same. Currently the key is just a byte array so anything can be used for the key. The key gets hashed to generate the transaction id (currently a long) and then encoded to be stored just like the existing randomly generated transaction ids.
I marked this as a draft PR for now to get some feedback as some changes are probably needed, in particular maybe changing the key from a byte array to something more strongly typed (see below) I added a few store tests but some tests are missing such as testing the collision detection. (Probably will require manually modifying some things in the store or ZK to simulate a collision or googling the algorithm to find two keys that cause a collision).
Something else is the hashing algorithm is using 128 bit murmur3 but converting to a positive long (since that's currently or tid) so the size is really only 63 bits and likely would lead to collisions at scale. Once the new FateId is added in #4191 we should explore changing the long tid into a byte array and using a 128 bit transaction id size. The algorithm could also be switched out to if desired (ie sha256)
For now the key is just a byte array to keep things simple but we can change it. The goal is to have unique fate operations a byte array is the most flexible as we can put anything into it. For example, if we wanted to create a key for a split operation for an extent we could serialize the operation name (ie split) and append it to the serialized KeyExtent. In the tests for now I'm only serializing the KeyExtent. We could create some utility methods to help with the serialization and deserialization.
However, we could also make the API more strongly typed and instead of a byte array as the key would create a new interface or class (maybe call it
FateKey
) and that could then contain an operation name and a KeyExtent and then handle the serialization. This could be nice as it would match up with the newFateId
being created in #4191 but the downside is if we want different key types in the future it's not as flexible and we'd need to change or extend the API. If we just use an interface and had different key implementations it might be ok as we could add new key types in the future without changing the API.This closes #4183