Skip to content

Commit e65f1b2

Browse files
Adds support for multiple managers running distributed fate (#6168)
Lays the foundation for multiple manager with the following changes. The best place to start looking at these changes is in the Manager.run() method which sets everything and ties it all together. * Each manager process acquires two zookeeper locks now, a primary lock and an assistant lock. Only one manager process can obtain the primary lock and when it does it assumes the role of primary manager.  All manager processes acquire an assistant lock, which is similar to a tserver or compactor lock.  The assistant lock advertises the manager process as being available to other Accumulo processes to handle assistant manager operations. * Manager processes have a single thrift server and thrift services hosted on that thrift server are categorized into primary manager and assistant manager services. When an assistant manager receives an RPC for a primary manager thrift service it will not execute the request and will throw an error or ignore the request. * The primary manager process delegates manager responsibility via RPCs to assistant managers. * Any management responsibility not delegated runs on the primary manager. Using the changes above fate is now distributed across all manager processes. In the future the changes above should make it easy to delegate other responsibilities to assistant managers. The following is an outline of the fate changes. * New FateWorker class. This runs in every manager and handles request from the primary manager to adjust what range of the fate table its currently responsible for. FateWorker implements a new thrift service used to assign it ranges. * New FateManager class that is run by the primary manager and is responsible for partitioning fate processing across all assistant managers. As manager processes come and go this will repartition the fate table evenly across all available managers. The FateManager communicates with FateWorkers via thrift. * Some new RPCs for best effort notifications. Before these changes there were in memory notification systems that made the manager more responsive. These would allow a fate operation to signal the Tablet Group Watcher to take action sooner. FateWorkerEnv sends these notifications to the primary manger over a new RPC. Does not matter if they are lost, things will still eventually happen. Other than fate, the primary manager process does everything the current manager does. This change pulls from #3262 and #6139. Co-authored-by: Dave Marion <[email protected]>
1 parent 15c5397 commit e65f1b2

File tree

57 files changed

+9932
-565
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+9932
-565
lines changed

core/src/main/java/org/apache/accumulo/core/Constants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class Constants {
4949

5050
public static final String ZMANAGERS = "/managers";
5151
public static final String ZMANAGER_LOCK = ZMANAGERS + "/lock";
52+
public static final String ZMANAGER_ASSISTANT_LOCK = ZMANAGERS + "/assistants";
5253
public static final String ZMANAGER_GOAL_STATE = ZMANAGERS + "/goal_state";
5354
public static final String ZMANAGER_TICK = ZMANAGERS + "/tick";
5455

core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1333,7 +1333,7 @@ private static Set<String> createPersistentWatcherPaths() {
13331333
Constants.ZMANAGER_LOCK, Constants.ZMINI_LOCK, Constants.ZMONITOR_LOCK,
13341334
Constants.ZNAMESPACES, Constants.ZRECOVERY, Constants.ZSSERVERS, Constants.ZTABLES,
13351335
Constants.ZTSERVERS, Constants.ZUSERS, RootTable.ZROOT_TABLET, Constants.ZTEST_LOCK,
1336-
Constants.ZRESOURCEGROUPS)) {
1336+
Constants.ZMANAGER_ASSISTANT_LOCK, Constants.ZRESOURCEGROUPS)) {
13371337
pathsToWatch.add(path);
13381338
}
13391339
return pathsToWatch;

core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -162,16 +162,21 @@ public FateTxStore<T> reserve(FateId fateId) {
162162
EnumSet.of(TStatus.SUBMITTED, TStatus.FAILED_IN_PROGRESS);
163163

164164
@Override
165-
public void runnable(BooleanSupplier keepWaiting, Consumer<FateIdStatus> idConsumer) {
165+
public void runnable(Set<FatePartition> partitions, BooleanSupplier keepWaiting,
166+
Consumer<FateIdStatus> idConsumer) {
167+
168+
if (partitions.isEmpty()) {
169+
return;
170+
}
166171

167172
AtomicLong seen = new AtomicLong(0);
168173

169174
while (keepWaiting.getAsBoolean() && seen.get() == 0) {
170175
final long beforeCount = unreservedRunnableCount.getCount();
171176
final boolean beforeDeferredOverflow = deferredOverflow.get();
172177

173-
try (Stream<FateIdStatus> inProgress = getTransactions(IN_PROGRESS_SET);
174-
Stream<FateIdStatus> other = getTransactions(OTHER_RUNNABLE_SET)) {
178+
try (Stream<FateIdStatus> inProgress = getTransactions(partitions, IN_PROGRESS_SET);
179+
Stream<FateIdStatus> other = getTransactions(partitions, OTHER_RUNNABLE_SET)) {
175180
// read the in progress transaction first and then everything else in order to process those
176181
// first
177182
var transactions = Stream.concat(inProgress, other);
@@ -200,6 +205,8 @@ public void runnable(BooleanSupplier keepWaiting, Consumer<FateIdStatus> idConsu
200205
if (beforeCount == unreservedRunnableCount.getCount()) {
201206
long waitTime = 5000;
202207
synchronized (deferred) {
208+
deferred.keySet().removeIf(
209+
fateId -> partitions.stream().noneMatch(partition -> partition.contains(fateId)));
203210
if (!deferred.isEmpty()) {
204211
waitTime = deferred.values().stream()
205212
.mapToLong(countDownTimer -> countDownTimer.timeLeft(TimeUnit.MILLISECONDS)).min()
@@ -240,9 +247,11 @@ public ReadOnlyFateTxStore<T> read(FateId fateId) {
240247
}
241248

242249
@Override
243-
public Map<FateId,FateReservation> getActiveReservations() {
244-
return list().filter(entry -> entry.getFateReservation().isPresent()).collect(Collectors
245-
.toMap(FateIdStatus::getFateId, entry -> entry.getFateReservation().orElseThrow()));
250+
public Map<FateId,FateReservation> getActiveReservations(Set<FatePartition> partitions) {
251+
try (var stream = getTransactions(partitions, EnumSet.allOf(TStatus.class))) {
252+
return stream.filter(entry -> entry.getFateReservation().isPresent()).collect(Collectors
253+
.toMap(FateIdStatus::getFateId, entry -> entry.getFateReservation().orElseThrow()));
254+
}
246255
}
247256

248257
protected boolean isRunnable(TStatus status) {
@@ -289,6 +298,9 @@ protected void verifyLock(ZooUtil.LockID lockID, FateId fateId) {
289298

290299
protected abstract Stream<FateIdStatus> getTransactions(EnumSet<TStatus> statuses);
291300

301+
protected abstract Stream<FateIdStatus> getTransactions(Set<FatePartition> partitions,
302+
EnumSet<TStatus> statuses);
303+
292304
protected abstract TStatus _getStatus(FateId fateId);
293305

294306
protected abstract Optional<FateKey> getKey(FateId fateId);
@@ -418,7 +430,8 @@ public interface FateIdGenerator {
418430
FateId newRandomId(FateInstanceType instanceType);
419431
}
420432

421-
protected void seededTx() {
433+
@Override
434+
public void seeded() {
422435
unreservedRunnableCount.increment();
423436
}
424437

core/src/main/java/org/apache/accumulo/core/fate/Fate.java

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.HashMap;
3131
import java.util.HashSet;
3232
import java.util.Map;
33+
import java.util.Objects;
3334
import java.util.Set;
3435
import java.util.TreeSet;
3536
import java.util.concurrent.ExecutorService;
@@ -51,6 +52,8 @@
5152
import org.slf4j.LoggerFactory;
5253

5354
import com.google.common.annotations.VisibleForTesting;
55+
import com.google.common.base.Preconditions;
56+
import com.google.common.collect.Sets;
5457
import com.google.gson.JsonParser;
5558

5659
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@ -76,6 +79,7 @@ public class Fate<T> extends FateClient<T> {
7679
private final AtomicBoolean keepRunning = new AtomicBoolean(true);
7780
// Visible for FlakyFate test object
7881
protected final Set<FateExecutor<T>> fateExecutors = new HashSet<>();
82+
private Set<FatePartition> currentPartitions = Set.of();
7983

8084
public enum TxInfo {
8185
FATE_OP, AUTO_CLEAN, EXCEPTION, TX_AGEOFF, RETURN_VALUE
@@ -208,8 +212,10 @@ public void run() {
208212
fe -> fe.getFateOps().equals(fateOps) && fe.getName().equals(fateExecutorName))) {
209213
log.debug("[{}] Adding FateExecutor for {} with {} threads", store.type(), fateOps,
210214
poolSize);
211-
fateExecutors.add(
212-
new FateExecutor<>(Fate.this, environment, fateOps, poolSize, fateExecutorName));
215+
var fateExecutor =
216+
new FateExecutor<>(Fate.this, environment, fateOps, poolSize, fateExecutorName);
217+
fateExecutors.add(fateExecutor);
218+
fateExecutor.setPartitions(currentPartitions);
213219
}
214220
}
215221
}
@@ -233,7 +239,11 @@ private class DeadReservationCleaner implements Runnable {
233239
@Override
234240
public void run() {
235241
if (keepRunning.get()) {
236-
store.deleteDeadReservations();
242+
Set<FatePartition> partitions;
243+
synchronized (fateExecutors) {
244+
partitions = currentPartitions;
245+
}
246+
store.deleteDeadReservations(partitions);
237247
}
238248
}
239249
}
@@ -369,6 +379,17 @@ public AtomicInteger getNeedMoreThreadsWarnCount() {
369379
return needMoreThreadsWarnCount;
370380
}
371381

382+
public void seeded(Set<FatePartition> partitions) {
383+
synchronized (fateExecutors) {
384+
if (Sets.intersection(currentPartitions, partitions).isEmpty()) {
385+
return;
386+
}
387+
}
388+
389+
log.trace("Notified of seeding for {}", partitions);
390+
store.seeded();
391+
}
392+
372393
/**
373394
* Initiates shutdown of background threads that run fate operations and cleanup fate data and
374395
* optionally waits on them. Leaves the fate object in a state where it can still update and read
@@ -432,6 +453,27 @@ public void close() {
432453
store.close();
433454
}
434455

456+
public Set<FatePartition> getPartitions() {
457+
synchronized (fateExecutors) {
458+
return currentPartitions;
459+
}
460+
}
461+
462+
public Set<FatePartition> setPartitions(Set<FatePartition> partitions) {
463+
Objects.requireNonNull(partitions);
464+
Preconditions.checkArgument(
465+
partitions.stream().allMatch(
466+
fp -> fp.start().getType() == store.type() && fp.end().getType() == store.type()),
467+
"type mismatch type:%s partitions:%s", store.type(), partitions);
468+
469+
synchronized (fateExecutors) {
470+
var old = currentPartitions;
471+
currentPartitions = Set.copyOf(partitions);
472+
fateExecutors.forEach(fe -> fe.setPartitions(currentPartitions));
473+
return old;
474+
}
475+
}
476+
435477
private boolean anyFateExecutorIsAlive() {
436478
synchronized (fateExecutors) {
437479
return fateExecutors.stream().anyMatch(FateExecutor::isAlive);

core/src/main/java/org/apache/accumulo/core/fate/FateClient.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@
2828
import java.time.Duration;
2929
import java.util.EnumSet;
3030
import java.util.Optional;
31+
import java.util.concurrent.CompletableFuture;
32+
import java.util.concurrent.atomic.AtomicReference;
33+
import java.util.function.Consumer;
3134
import java.util.function.Function;
3235
import java.util.stream.Stream;
3336

@@ -46,6 +49,8 @@ public class FateClient<T> {
4649
private static final EnumSet<ReadOnlyFateStore.TStatus> FINISHED_STATES =
4750
EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN);
4851

52+
private AtomicReference<Consumer<FateId>> seedingConsumer = new AtomicReference<>(fid -> {});
53+
4954
public FateClient(FateStore<T> store, Function<Repo<T>,String> toLogStrFunc) {
5055
this.store = FateLogger.wrap(store, toLogStrFunc, false);
5156
}
@@ -56,15 +61,33 @@ public FateId startTransaction() {
5661
}
5762

5863
public FateStore.Seeder<T> beginSeeding() {
59-
return store.beginSeeding();
64+
var seeder = store.beginSeeding();
65+
return new FateStore.Seeder<T>() {
66+
@Override
67+
public CompletableFuture<Optional<FateId>> attemptToSeedTransaction(Fate.FateOperation fateOp,
68+
FateKey fateKey, Repo<T> repo, boolean autoCleanUp) {
69+
var cfuture = seeder.attemptToSeedTransaction(fateOp, fateKey, repo, autoCleanUp);
70+
return cfuture.thenApply(optional -> {
71+
optional.ifPresent(seedingConsumer.get());
72+
return optional;
73+
});
74+
}
75+
76+
@Override
77+
public void close() {
78+
seeder.close();
79+
}
80+
};
6081
}
6182

6283
public void seedTransaction(Fate.FateOperation fateOp, FateKey fateKey, Repo<T> repo,
6384
boolean autoCleanUp) {
85+
CompletableFuture<Optional<FateId>> cfuture;
6486
try (var seeder = store.beginSeeding()) {
65-
@SuppressWarnings("unused")
66-
var unused = seeder.attemptToSeedTransaction(fateOp, fateKey, repo, autoCleanUp);
87+
cfuture = seeder.attemptToSeedTransaction(fateOp, fateKey, repo, autoCleanUp);
6788
}
89+
var optional = cfuture.join();
90+
optional.ifPresent(seedingConsumer.get());
6891
}
6992

7093
// start work in the transaction.. it is safe to call this
@@ -73,6 +96,7 @@ public void seedTransaction(Fate.FateOperation fateOp, FateId fateId, Repo<T> re
7396
boolean autoCleanUp, String goalMessage) {
7497
Fate.log.info("[{}] Seeding {} {} {}", store.type(), fateOp, fateId, goalMessage);
7598
store.seedTransaction(fateOp, fateId, repo, autoCleanUp);
99+
seedingConsumer.get().accept(fateId);
76100
}
77101

78102
// check on the transaction
@@ -176,4 +200,8 @@ public Exception getException(FateId fateId) {
176200
public Stream<FateKey> list(FateKey.FateKeyType type) {
177201
return store.list(type);
178202
}
203+
204+
public void setSeedingConsumer(Consumer<FateId> seedingConsumer) {
205+
this.seedingConsumer.set(seedingConsumer);
206+
}
179207
}

core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.Collections;
3434
import java.util.HashSet;
3535
import java.util.Map;
36+
import java.util.Objects;
3637
import java.util.Optional;
3738
import java.util.Set;
3839
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -43,6 +44,8 @@
4344
import java.util.concurrent.TransferQueue;
4445
import java.util.concurrent.atomic.AtomicBoolean;
4546
import java.util.concurrent.atomic.AtomicInteger;
47+
import java.util.concurrent.atomic.AtomicReference;
48+
import java.util.function.BooleanSupplier;
4649

4750
import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
4851
import org.apache.accumulo.core.conf.Property;
@@ -81,6 +84,7 @@ public class FateExecutor<T> {
8184
private final Set<Fate.FateOperation> fateOps;
8285
private final ConcurrentLinkedQueue<Integer> idleCountHistory = new ConcurrentLinkedQueue<>();
8386
private final FateExecutorMetrics<T> fateExecutorMetrics;
87+
private final AtomicReference<Set<FatePartition>> partitions = new AtomicReference<>(Set.of());
8488

8589
public FateExecutor(Fate<T> fate, T environment, Set<Fate.FateOperation> fateOps, int poolSize,
8690
String name) {
@@ -298,6 +302,11 @@ protected ConcurrentLinkedQueue<Integer> getIdleCountHistory() {
298302
return idleCountHistory;
299303
}
300304

305+
public void setPartitions(Set<FatePartition> partitions) {
306+
Objects.requireNonNull(partitions);
307+
this.partitions.set(Set.copyOf(partitions));
308+
}
309+
301310
/**
302311
* A single thread that finds transactions to work on and queues them up. Do not want each worker
303312
* thread going to the store and looking for work as it would place more load on the store.
@@ -308,7 +317,12 @@ private class WorkFinder implements Runnable {
308317
public void run() {
309318
while (fate.getKeepRunning().get() && !isShutdown()) {
310319
try {
311-
fate.getStore().runnable(() -> fate.getKeepRunning().get(), fateIdStatus -> {
320+
var localPartitions = partitions.get();
321+
// if the set of partitions changes, we should stop looking for work w/ the old set of
322+
// partitions
323+
BooleanSupplier keepRunning =
324+
() -> fate.getKeepRunning().get() && localPartitions == partitions.get();
325+
fate.getStore().runnable(localPartitions, keepRunning, fateIdStatus -> {
312326
// The FateId with the fate operation 'fateOp' is workable by this FateExecutor if
313327
// 1) This FateExecutor is assigned to work on 'fateOp' ('fateOp' is in 'fateOps')
314328
// 2) The transaction was cancelled while NEW. This is an edge case that needs to be
@@ -319,7 +333,7 @@ public void run() {
319333
var fateOp = fateIdStatus.getFateOperation().orElse(null);
320334
if ((fateOp != null && fateOps.contains(fateOp))
321335
|| txCancelledWhileNew(status, fateOp)) {
322-
while (fate.getKeepRunning().get() && !isShutdown()) {
336+
while (keepRunning.getAsBoolean() && !isShutdown()) {
323337
try {
324338
// The reason for calling transfer instead of queueing is avoid rescanning the
325339
// storage layer and adding the same thing over and over. For example if all

core/src/main/java/org/apache/accumulo/core/fate/FateExecutorMetrics.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,13 @@
2222
import java.util.concurrent.atomic.AtomicInteger;
2323

2424
import org.apache.accumulo.core.metrics.Metric;
25-
import org.apache.accumulo.core.metrics.MetricsProducer;
2625
import org.slf4j.Logger;
2726
import org.slf4j.LoggerFactory;
2827

2928
import io.micrometer.core.instrument.Gauge;
3029
import io.micrometer.core.instrument.MeterRegistry;
3130

32-
public class FateExecutorMetrics<T> implements MetricsProducer {
31+
public class FateExecutorMetrics<T> {
3332
private static final Logger log = LoggerFactory.getLogger(FateExecutorMetrics.class);
3433
private final FateInstanceType type;
3534
private final String poolName;
@@ -49,7 +48,6 @@ protected FateExecutorMetrics(FateInstanceType type, String poolName,
4948
this.idleWorkerCount = idleWorkerCount;
5049
}
5150

52-
@Override
5351
public void registerMetrics(MeterRegistry registry) {
5452
// noop if already registered or cleared
5553
if (state == State.UNREGISTERED) {

0 commit comments

Comments
 (0)