Skip to content

Commit 4ca557d

Browse files
committed
[FLINK-36451][runtime] Removes hasLeadership method from interfaces
[FLINK-36451][runtime] Moves leader-related logic into leaderOperation executor - introduces runAsLeader - makes confirmLeadership use runAsLeader - removes hasLeadership
1 parent 2731723 commit 4ca557d

22 files changed

+698
-384
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java

+23-7
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
2222
import org.apache.flink.runtime.leaderelection.LeaderContender;
2323
import org.apache.flink.runtime.leaderelection.LeaderElection;
24+
import org.apache.flink.runtime.leaderelection.LeadershipLostException;
2425
import org.apache.flink.runtime.rpc.FatalErrorHandler;
2526
import org.apache.flink.util.FlinkException;
2627
import org.apache.flink.util.concurrent.FutureUtils;
@@ -31,6 +32,7 @@
3132
import java.util.Arrays;
3233
import java.util.UUID;
3334
import java.util.concurrent.CompletableFuture;
35+
import java.util.concurrent.CompletionException;
3436

3537
/**
3638
* Runner for the {@link org.apache.flink.runtime.dispatcher.Dispatcher} which is responsible for
@@ -176,13 +178,27 @@ private void forwardConfirmLeaderSessionFuture(
176178
FutureUtils.assertNoException(
177179
newDispatcherLeaderProcess
178180
.getLeaderAddressFuture()
179-
.thenAccept(
180-
leaderAddress -> {
181-
if (leaderElection.hasLeadership(leaderSessionID)) {
182-
leaderElection.confirmLeadership(
183-
leaderSessionID, leaderAddress);
184-
}
185-
}));
181+
.thenCompose(
182+
leaderAddress ->
183+
leaderElection
184+
.confirmLeadershipAsLeader(
185+
leaderSessionID, leaderAddress)
186+
.exceptionally(
187+
error -> {
188+
if (error
189+
instanceof
190+
LeadershipLostException) {
191+
LOG.warn(
192+
"Leadership couldn't be confirmed due to leadership loss.",
193+
error);
194+
return null;
195+
}
196+
197+
// any other error is unexpected and
198+
// should be passed down to the
199+
// system-wide error handling
200+
throw new CompletionException(error);
201+
})));
186202
}
187203

188204
@Override

flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java

+41-28
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,14 @@
2121
import org.apache.flink.annotation.VisibleForTesting;
2222
import org.apache.flink.runtime.leaderelection.LeaderContender;
2323
import org.apache.flink.runtime.leaderelection.LeaderElection;
24+
import org.apache.flink.runtime.leaderelection.LeadershipLostException;
2425
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
2526
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
2627
import org.apache.flink.util.FlinkException;
2728
import org.apache.flink.util.Preconditions;
29+
import org.apache.flink.util.concurrent.Executors;
2830
import org.apache.flink.util.concurrent.FutureUtils;
31+
import org.apache.flink.util.function.ThrowingRunnable;
2932

3033
import org.slf4j.Logger;
3134
import org.slf4j.LoggerFactory;
@@ -244,38 +247,45 @@ private void removeContender(EmbeddedLeaderElection embeddedLeaderElection) {
244247
}
245248

246249
/** Callback from leader contenders when they confirm a leader grant. */
250+
@GuardedBy("lock")
247251
private void confirmLeader(
248252
final EmbeddedLeaderElection embeddedLeaderElection,
249253
final UUID leaderSessionId,
250254
final String leaderAddress) {
251-
synchronized (lock) {
252-
// if the leader election was shut down in the meantime, ignore this confirmation
253-
if (!embeddedLeaderElection.running || shutdown) {
254-
return;
255-
}
256-
257-
try {
258-
// check if the confirmation is for the same grant, or whether it is a stale grant
259-
if (embeddedLeaderElection == currentLeaderProposed
260-
&& currentLeaderSessionId.equals(leaderSessionId)) {
261-
LOG.info(
262-
"Received confirmation of leadership for leader {} , session={}",
263-
leaderAddress,
264-
leaderSessionId);
265-
266-
// mark leadership
267-
currentLeaderConfirmed = embeddedLeaderElection;
268-
currentLeaderAddress = leaderAddress;
269-
currentLeaderProposed = null;
255+
Preconditions.checkState(
256+
currentLeaderProposed == embeddedLeaderElection,
257+
"The confirmLeader method should only be called when having the leadership acquired.");
258+
LOG.info(
259+
"Received confirmation of leadership for leader {} , session={}",
260+
leaderAddress,
261+
leaderSessionId);
262+
263+
// mark leadership
264+
currentLeaderConfirmed = embeddedLeaderElection;
265+
currentLeaderAddress = leaderAddress;
266+
currentLeaderProposed = null;
267+
268+
// notify all listeners
269+
notifyAllListeners(leaderAddress, leaderSessionId);
270+
}
270271

271-
// notify all listeners
272-
notifyAllListeners(leaderAddress, leaderSessionId);
273-
} else {
274-
LOG.debug(
275-
"Received confirmation of leadership for a stale leadership grant. Ignoring.");
272+
private void runAsLeader(
273+
EmbeddedLeaderElection embeddedLeaderElection,
274+
UUID leaderSessionId,
275+
ThrowingRunnable<? extends Throwable> runnable)
276+
throws LeadershipLostException {
277+
synchronized (lock) {
278+
if (embeddedLeaderElection.running
279+
&& !shutdown
280+
&& embeddedLeaderElection.isLeader
281+
&& currentLeaderSessionId.equals(leaderSessionId)) {
282+
try {
283+
runnable.run();
284+
} catch (Throwable t) {
285+
fatalError(t);
276286
}
277-
} catch (Throwable t) {
278-
fatalError(t);
287+
} else {
288+
throw new LeadershipLostException(leaderSessionId);
279289
}
280290
}
281291
}
@@ -472,8 +482,11 @@ public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
472482
}
473483

474484
@Override
475-
public boolean hasLeadership(UUID leaderSessionId) {
476-
return isLeader && leaderSessionId.equals(currentLeaderSessionId);
485+
public CompletableFuture<Void> runAsLeader(
486+
UUID leaderSessionId, ThrowingRunnable<? extends Throwable> callback) {
487+
return FutureUtils.runAsync(
488+
() -> EmbeddedLeaderService.this.runAsLeader(this, leaderSessionId, callback),
489+
Executors.directExecutor());
477490
}
478491

479492
void shutdown(Exception cause) {

0 commit comments

Comments
 (0)