Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,8 @@ public void print(ReadOnlyFateStore<T> zs, ZooReader zk,

public boolean prepDelete(FateStore<T> zs, ZooReaderWriter zk, ServiceLockPath path,
String txidStr) {
// TODO do not need global lock now
// TODO need way to see what process holds a reservation
if (!checkGlobalLock(zk, path)) {
return false;
}
Expand Down
209 changes: 0 additions & 209 deletions core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java

This file was deleted.

33 changes: 28 additions & 5 deletions core/src/main/java/org/apache/accumulo/core/fate/Fate.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.fate.FateStore.FateTxStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import org.apache.accumulo.core.logging.FateLogger;
import org.apache.accumulo.core.manager.PartitionData;
import org.apache.accumulo.core.util.ShutdownUtil;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.threads.ThreadPools;
Expand All @@ -72,13 +74,30 @@ public class Fate<T> {
private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN);

// Starts out true; the loops in this class keep running only while this is true.
private final AtomicBoolean keepRunning = new AtomicBoolean(true);
// Source of the current partition data. Loops re-read it so that a change is noticed.
private final Supplier<PartitionData> partitionDataSupplier;
// Queue of Long values, presumably fate transaction ids handed off by the work finder - confirm.
private final TransferQueue<Long> workQueue;
// Thread handle, presumably for the thread running WorkFinder - confirm where it is started.
private final Thread workFinder;

/**
 * Identifiers for the kinds of information associated with a fate transaction. NOTE(review):
 * presumably used as keys when reading and writing transaction info; the usage is outside this
 * view, so confirm against callers. The declaration order is kept unchanged.
 */
public enum TxInfo {
  TX_NAME,
  AUTO_CLEAN,
  EXCEPTION,
  RETURN_VALUE
}

// TODO add a task that periodically looks for fate tasks reserved by dead instances; it should
// only run in partition zero
/**
 * Background task skeleton for fate lock cleanup. On every pass it re-reads the current partition
 * data and only enters the cleanup branch when that data says this instance should run
 * {@code FATE_LOCK_CLEANUP}. The cleanup itself is not implemented yet.
 *
 * <p>
 * The loop pauses between passes so that it does not spin a CPU core while there is nothing to
 * do. It exits when {@code keepRunning} becomes false or when the thread is interrupted.
 */
private class LockCleaner implements Runnable {

  @Override
  public void run() {
    // NOTE(review): placeholder interval. The original only said "sleep a long time"; confirm
    // the desired period or make it configurable.
    final long pauseMillis = 60_000L;

    while (keepRunning.get()) {
      // Re-read on each pass so a change in partition assignment is picked up.
      var partitionData = partitionDataSupplier.get();
      if (partitionData.shouldRun(PartitionData.SingletonManagerService.FATE_LOCK_CLEANUP)) {
        // TODO run cleanup
      }

      try {
        Thread.sleep(pauseMillis);
      } catch (InterruptedException e) {
        // Restore the interrupt flag and stop; an interrupt is treated as a request to exit.
        Thread.currentThread().interrupt();
        return;
      }
    }
  }
}

/**
* A single thread that finds transactions to work on and queues them up. Do not want each worker
* thread going to the store and looking for work as it would place more load on the store.
Expand All @@ -89,12 +108,14 @@ private class WorkFinder implements Runnable {
public void run() {
while (keepRunning.get()) {
try {
var iter = store.runnable(keepRunning);
PartitionData partitionData = partitionDataSupplier.get();
var iter = store.runnable(keepRunning, partitionData);

while (iter.hasNext() && keepRunning.get()) {
while (iter.hasNext() && keepRunning.get()
&& partitionData.equals(partitionDataSupplier.get())) {
Long txid = iter.next();
try {
while (keepRunning.get()) {
while (keepRunning.get() && partitionData.equals(partitionDataSupplier.get())) {
// The reason for calling transfer instead of queueing is avoid rescanning the
// storage layer and adding the same thing over and over. For example if all threads
// were busy, the queue size was 100, and there are three runnable things in the
Expand Down Expand Up @@ -144,7 +165,7 @@ private Optional<FateTxStore<T>> reserveFateTx() throws InterruptedException {
public void run() {
while (keepRunning.get()) {
long deferTime = 0;
FateTxStore<T> txStore = null;
FateStore.FateTxStore<T> txStore = null;
try {
var optionalopStore = reserveFateTx();
if (optionalopStore.isPresent()) {
Expand Down Expand Up @@ -296,9 +317,10 @@ private void undo(long tid, Repo<T> op) {
* @param toLogStrFunc A function that converts Repo to Strings that are suitable for logging
*/
public Fate(T environment, FateStore<T> store, Function<Repo<T>,String> toLogStrFunc,
AccumuloConfiguration conf) {
Supplier<PartitionData> partitionDataSupplier, AccumuloConfiguration conf) {
this.store = FateLogger.wrap(store, toLogStrFunc);
this.environment = environment;
this.partitionDataSupplier = partitionDataSupplier;
final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createExecutorService(conf,
Property.MANAGER_FATE_THREADPOOL_SIZE, true);
this.workQueue = new LinkedTransferQueue<>();
Expand Down Expand Up @@ -415,6 +437,7 @@ public boolean cancel(long tid) {

// resource cleanup
public void delete(long tid) {
// TODO need to handle case of not existing
FateTxStore<T> txStore = store.reserve(tid);
try {
switch (txStore.getStatus()) {
Expand Down
18 changes: 11 additions & 7 deletions core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@
import java.io.Serializable;
import java.util.Optional;

import org.apache.accumulo.core.manager.PartitionData;

/**
* Transaction Store: a place to save transactions
* FateStore: a place to store fate data for all fate operations.
*
* A transaction consists of a number of operations. To use, first create a transaction id, and then
* seed the transaction with an initial operation. An executor service can then execute the
* transaction's operation, possibly pushing more operations onto the transaction as each step
* successfully completes. If a step fails, the stack can be unwound, undoing each operation.
* A fate operation consists of a number of smaller idempotent operations. To use, first create a
* transaction id, and then seed the transaction with an initial operation. An executor service can
* then execute the transaction's operation, possibly pushing more operations onto the transaction
* as each step successfully completes. If a step fails, the stack can be unwound, undoing each
* operation.
*/
public interface FateStore<T> extends ReadOnlyFateStore<T> {

Expand All @@ -42,6 +45,7 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> {
* An interface that allows read/write access to the data related to a single fate operation.
*/
interface FateTxStore<T> extends ReadOnlyFateTxStore<T> {

@Override
Repo<T> top();

Expand Down Expand Up @@ -74,7 +78,6 @@ interface FateTxStore<T> extends ReadOnlyFateTxStore<T> {

/**
* Remove the transaction from the store.
*
*/
void delete();

Expand All @@ -85,7 +88,8 @@ interface FateTxStore<T> extends ReadOnlyFateTxStore<T> {
* longer interact with it.
*
* @param deferTime time in millis to keep this transaction from being returned by
* {@link #runnable(java.util.concurrent.atomic.AtomicBoolean)}. Must be non-negative.
* {@link #runnable(java.util.concurrent.atomic.AtomicBoolean, PartitionData)}. Must be
* non-negative.
*/
void unreserve(long deferTime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.accumulo.core.manager.PartitionData;

/**
* Read only access to a Transaction Store.
*
Expand All @@ -35,6 +37,7 @@ public interface ReadOnlyFateStore<T> {
/**
* Possible operational status codes. Serialized by name within stores.
*/
// TODO rename to FateTxStatus
enum TStatus {
/** Unseeded transaction */
NEW,
Expand Down Expand Up @@ -124,10 +127,12 @@ interface ReadOnlyFateTxStore<T> {
*/
List<Long> list();

// TODO need to handle partition data changing, probably pass a supplier
/**
* @return an iterator over fate op ids that are (IN_PROGRESS or FAILED_IN_PROGRESS) and
* unreserved. This method will block until it finds something that is runnable or until
* the keepWaiting parameter is false.
* the keepWaiting parameter is false. The transactions are also filtered using the partition
* data so that each fate instance sees a different subset of all fate transactions.
*/
Iterator<Long> runnable(AtomicBoolean keepWaiting);
Iterator<Long> runnable(AtomicBoolean keepWaiting, PartitionData partitionData);
}
Loading