Skip to content

Commit 1099926

Browse files
committed
[FLINK-36451] Removes hasLeadership method from interfaces
1 parent dd670a7 commit 1099926

18 files changed

+293
-266
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
2222
import org.apache.flink.runtime.leaderelection.LeaderContender;
2323
import org.apache.flink.runtime.leaderelection.LeaderElection;
24+
import org.apache.flink.runtime.leaderelection.LeadershipLostException;
2425
import org.apache.flink.runtime.rpc.FatalErrorHandler;
2526
import org.apache.flink.util.FlinkException;
2627
import org.apache.flink.util.concurrent.FutureUtils;
@@ -176,13 +177,24 @@ private void forwardConfirmLeaderSessionFuture(
176177
FutureUtils.assertNoException(
177178
newDispatcherLeaderProcess
178179
.getLeaderAddressFuture()
179-
.thenAccept(
180-
leaderAddress -> {
181-
if (leaderElection.hasLeadership(leaderSessionID)) {
182-
leaderElection.confirmLeadership(
183-
leaderSessionID, leaderAddress);
184-
}
185-
}));
180+
.thenCompose(
181+
leaderAddress ->
182+
leaderElection
183+
.confirmLeadership(leaderSessionID, leaderAddress)
184+
.exceptionally(
185+
error -> {
186+
if (error
187+
instanceof
188+
LeadershipLostException) {
189+
LOG.warn(
190+
"Leadership couldn't be confirmed due to leadership loss.",
191+
error);
192+
} else {
193+
fatalErrorHandler.onFatalError(
194+
error);
195+
}
196+
return null;
197+
})));
186198
}
187199

188200
@Override

flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.flink.runtime.leaderelection.LeaderContender;
2323
import org.apache.flink.runtime.leaderelection.LeaderElection;
2424
import org.apache.flink.runtime.leaderelection.LeaderElectionUtils;
25-
import org.apache.flink.runtime.leaderelection.LeadershipLostException;
2625
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
2726
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
2827
import org.apache.flink.util.FlinkException;
@@ -468,15 +467,13 @@ public void close() {
468467
}
469468

470469
@Override
471-
public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
470+
public CompletableFuture<Void> confirmLeadership(
471+
UUID leaderSessionID, String leaderAddress) {
472472
checkNotNull(leaderSessionID);
473473
checkNotNull(leaderAddress);
474474
confirmLeader(this, leaderSessionID, leaderAddress);
475-
}
476475

477-
@Override
478-
public boolean hasLeadership(UUID leaderSessionId) {
479-
return isLeader && leaderSessionId.equals(currentLeaderSessionId);
476+
return FutureUtils.completedVoidFuture();
480477
}
481478

482479
@Override

flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java

Lines changed: 49 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.util.UUID;
4848
import java.util.concurrent.CompletableFuture;
4949
import java.util.concurrent.CompletionException;
50+
import java.util.function.Function;
5051
import java.util.function.Supplier;
5152

5253
/**
@@ -256,27 +257,21 @@ private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
256257
sequentialOperation =
257258
sequentialOperation.thenCompose(
258259
unused ->
259-
supplyAsyncIfValidLeader(
260+
supplyAsyncAsLeader(
260261
leaderSessionId,
261262
() ->
262263
jobResultStore.hasJobResultEntryAsync(
263-
getJobID()),
264-
() ->
265-
FutureUtils.completedExceptionally(
266-
new LeadershipLostException(
267-
"The leadership is lost.")))
264+
getJobID()))
268265
.handle(
269266
(hasJobResult, throwable) -> {
270267
if (throwable
271268
instanceof LeadershipLostException) {
272-
printLogIfNotValidLeader(
269+
logLeadershipLoss(
273270
"verify job result entry",
274271
leaderSessionId);
275-
return null;
276272
} else if (throwable != null) {
277273
ExceptionUtils.rethrow(throwable);
278-
}
279-
if (hasJobResult) {
274+
} else if (hasJobResult) {
280275
handleJobAlreadyDoneIfValidLeader(
281276
leaderSessionId);
282277
} else {
@@ -289,12 +284,12 @@ private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
289284
}
290285

291286
private void handleJobAlreadyDoneIfValidLeader(UUID leaderSessionId) {
292-
runIfValidLeader(
287+
runAsyncAsLeader(
293288
leaderSessionId, () -> jobAlreadyDone(leaderSessionId), "check completed job");
294289
}
295290

296291
private void createNewJobMasterServiceProcessIfValidLeader(UUID leaderSessionId) {
297-
runIfValidLeader(
292+
runAsyncAsLeader(
298293
leaderSessionId,
299294
() ->
300295
ThrowingRunnable.unchecked(
@@ -303,7 +298,20 @@ private void createNewJobMasterServiceProcessIfValidLeader(UUID leaderSessionId)
303298
"create new job master service process");
304299
}
305300

306-
private void printLogIfNotValidLeader(String actionDescription, UUID leaderSessionId) {
301+
private Function<Throwable, ? extends Void> handleLeadershipError(
302+
String actionDescription, UUID leaderSessionId) {
303+
return error -> {
304+
if (error instanceof LeadershipLostException) {
305+
logLeadershipLoss(actionDescription, leaderSessionId);
306+
} else {
307+
fatalErrorHandler.onFatalError(error);
308+
}
309+
310+
return null;
311+
};
312+
}
313+
314+
private static void logLeadershipLoss(String actionDescription, UUID leaderSessionId) {
307315
LOG.debug(
308316
"Ignore leader action '{}' because the leadership runner is no longer the valid leader for {}.",
309317
actionDescription,
@@ -354,23 +362,21 @@ private void createNewJobMasterServiceProcess(UUID leaderSessionId) throws Flink
354362
private void confirmLeadership(
355363
UUID leaderSessionId, CompletableFuture<String> leaderAddressFuture) {
356364
FutureUtils.assertNoException(
357-
leaderAddressFuture.thenAccept(
365+
leaderAddressFuture.thenCompose(
358366
address ->
359-
runIfValidLeader(
360-
leaderSessionId,
361-
() -> {
362-
LOG.debug("Confirm leadership {}.", leaderSessionId);
363-
leaderElection.confirmLeadership(
364-
leaderSessionId, address);
365-
},
366-
"confirming leadership")));
367+
leaderElection
368+
.confirmLeadership(leaderSessionId, address)
369+
.exceptionally(
370+
handleLeadershipError(
371+
"confirming leadership",
372+
leaderSessionId))));
367373
}
368374

369375
private void forwardResultFuture(
370376
UUID leaderSessionId, CompletableFuture<JobManagerRunnerResult> resultFuture) {
371377
resultFuture.whenComplete(
372378
(jobManagerRunnerResult, throwable) ->
373-
runIfValidLeader(
379+
runAsyncAsLeader(
374380
leaderSessionId,
375381
() -> onJobCompletion(jobManagerRunnerResult, throwable),
376382
"result future forwarding"));
@@ -497,55 +503,41 @@ private boolean isRunning() {
497503
return state == State.RUNNING;
498504
}
499505

500-
private void runIfValidLeader(
501-
UUID expectedLeaderId, Runnable action, Runnable noLeaderFallback) {
502-
synchronized (lock) {
503-
if (isValidLeader(expectedLeaderId)) {
504-
action.run();
505-
} else {
506-
noLeaderFallback.run();
507-
}
508-
}
509-
}
510-
511-
private void runIfValidLeader(
506+
private void runAsyncAsLeader(
512507
UUID expectedLeaderId, Runnable action, String noLeaderFallbackCommandDescription) {
513-
runIfValidLeader(
514-
expectedLeaderId,
515-
action,
516-
() ->
517-
printLogIfNotValidLeader(
518-
noLeaderFallbackCommandDescription, expectedLeaderId));
508+
FutureUtils.assertNoException(
509+
leaderElection
510+
.runAsLeader(expectedLeaderId, action::run)
511+
.exceptionally(
512+
handleLeadershipError(
513+
noLeaderFallbackCommandDescription, expectedLeaderId)));
519514
}
520515

521-
private <T> CompletableFuture<T> supplyAsyncIfValidLeader(
522-
UUID expectedLeaderId,
523-
Supplier<CompletableFuture<T>> supplier,
524-
Supplier<CompletableFuture<T>> noLeaderFallback) {
516+
private <T> CompletableFuture<T> supplyAsyncAsLeader(
517+
UUID expectedLeaderId, Supplier<CompletableFuture<T>> supplier) {
525518
final CompletableFuture<T> resultFuture = new CompletableFuture<>();
526-
runIfValidLeader(
527-
expectedLeaderId,
528-
() -> FutureUtils.forward(supplier.get(), resultFuture),
529-
() -> FutureUtils.forward(noLeaderFallback.get(), resultFuture));
519+
FutureUtils.assertNoException(
520+
leaderElection
521+
.runAsLeader(
522+
expectedLeaderId,
523+
() -> FutureUtils.forward(supplier.get(), resultFuture))
524+
.exceptionally(
525+
t -> {
526+
resultFuture.completeExceptionally(t);
527+
return null;
528+
}));
530529

531530
return resultFuture;
532531
}
533532

534-
@GuardedBy("lock")
535-
private boolean isValidLeader(UUID expectedLeaderId) {
536-
return isRunning()
537-
&& leaderElection != null
538-
&& leaderElection.hasLeadership(expectedLeaderId);
539-
}
540-
541533
private <T> void forwardIfValidLeader(
542534
UUID expectedLeaderId,
543535
CompletableFuture<? extends T> source,
544536
CompletableFuture<T> target,
545537
String forwardDescription) {
546538
source.whenComplete(
547539
(t, throwable) ->
548-
runIfValidLeader(
540+
runAsyncAsLeader(
549541
expectedLeaderId,
550542
() -> {
551543
if (throwable != null) {

flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,8 @@ public void startLeaderElection(LeaderContender contender) throws Exception {
4545
}
4646

4747
@Override
48-
public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
49-
parentService.confirmLeadership(componentId, leaderSessionID, leaderAddress);
50-
}
51-
52-
@Override
53-
public boolean hasLeadership(UUID leaderSessionId) {
54-
return parentService.hasLeadership(componentId, leaderSessionId);
48+
public CompletableFuture<Void> confirmLeadership(UUID leaderSessionID, String leaderAddress) {
49+
return parentService.confirmLeadership(componentId, leaderSessionID, leaderAddress);
5550
}
5651

5752
@Override
@@ -88,18 +83,9 @@ abstract static class ParentService {
8883
* Confirms the leadership with the {@code leaderSessionID} and {@code leaderAddress} for
8984
* the {@link LeaderContender} that is associated with the {@code componentId}.
9085
*/
91-
abstract void confirmLeadership(
86+
abstract CompletableFuture<Void> confirmLeadership(
9287
String componentId, UUID leaderSessionID, String leaderAddress);
9388

94-
/**
95-
* Checks whether the {@code ParentService} has the leadership acquired for the {@code
96-
* componentId} and {@code leaderSessionID}.
97-
*
98-
* @return {@code true} if the service has leadership with the passed {@code
99-
* leaderSessionID} acquired; {@code false} otherwise.
100-
*/
101-
abstract boolean hasLeadership(String componentId, UUID leaderSessionID);
102-
10389
/**
10490
* Runs passed {@code callback} in the leadership main thread if the leadership is still
10591
* valid or returns a future that failed with {@link LeadershipLostException} otherwise.

flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java

Lines changed: 28 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ public void close() throws Exception {
315315
}
316316

317317
@Override
318-
protected void confirmLeadership(
318+
protected CompletableFuture<Void> confirmLeadership(
319319
String componentId, UUID leaderSessionID, String leaderAddress) {
320320
Preconditions.checkArgument(leaderContenderRegistry.containsKey(componentId));
321321
LOG.debug(
@@ -326,60 +326,33 @@ protected void confirmLeadership(
326326

327327
checkNotNull(leaderSessionID);
328328

329-
synchronized (lock) {
330-
if (hasLeadership(componentId, leaderSessionID)) {
331-
Preconditions.checkState(
332-
leaderElectionDriver != null,
333-
"The leadership check should only return true if a driver is instantiated.");
334-
Preconditions.checkState(
335-
!confirmedLeaderInformation.hasLeaderInformation(componentId),
336-
"No confirmation should have happened, yet.");
337-
338-
final LeaderInformation newConfirmedLeaderInformation =
339-
LeaderInformation.known(leaderSessionID, leaderAddress);
340-
confirmedLeaderInformation =
341-
LeaderInformationRegister.merge(
342-
confirmedLeaderInformation,
343-
componentId,
344-
newConfirmedLeaderInformation);
345-
leaderElectionDriver.publishLeaderInformation(
346-
componentId, newConfirmedLeaderInformation);
347-
} else {
348-
if (!leaderSessionID.equals(this.issuedLeaderSessionID)) {
349-
LOG.debug(
350-
"Received an old confirmation call of leader session ID {} for component '{}' (current issued session ID is {}).",
351-
leaderSessionID,
352-
componentId,
353-
issuedLeaderSessionID);
354-
} else {
355-
LOG.warn(
356-
"The leader session ID {} for component '{}' was confirmed even though the corresponding "
357-
+ "service was not elected as the leader or has been stopped already.",
358-
componentId,
359-
leaderSessionID);
360-
}
361-
}
362-
}
363-
}
364-
365-
@Override
366-
protected boolean hasLeadership(String componentId, UUID leaderSessionId) {
367-
synchronized (lock) {
368-
if (leaderElectionDriver != null) {
369-
if (leaderContenderRegistry.containsKey(componentId)) {
370-
return leaderElectionDriver.hasLeadership()
371-
&& leaderSessionId.equals(issuedLeaderSessionID);
372-
} else {
373-
LOG.debug(
374-
"hasLeadership is called for component '{}' while there is no contender registered under that ID in the service, returning false.",
375-
componentId);
376-
return false;
377-
}
378-
} else {
379-
LOG.debug("hasLeadership is called after the service is closed, returning false.");
380-
return false;
381-
}
382-
}
329+
return runAsLeader(
330+
componentId,
331+
leaderSessionID,
332+
() -> {
333+
Preconditions.checkState(
334+
leaderElectionDriver != null,
335+
"The leadership check should only return true if a driver is instantiated.");
336+
Preconditions.checkState(
337+
!confirmedLeaderInformation.hasLeaderInformation(componentId),
338+
"No confirmation should have happened, yet.");
339+
340+
final LeaderInformation newConfirmedLeaderInformation =
341+
LeaderInformation.known(leaderSessionID, leaderAddress);
342+
confirmedLeaderInformation =
343+
LeaderInformationRegister.merge(
344+
confirmedLeaderInformation,
345+
componentId,
346+
newConfirmedLeaderInformation);
347+
leaderElectionDriver.publishLeaderInformation(
348+
componentId, newConfirmedLeaderInformation);
349+
})
350+
.exceptionally(
351+
error -> {
352+
LOG.debug(
353+
"Leadership was lost for component '{}'.", componentId, error);
354+
return null;
355+
});
383356
}
384357

385358
@Override

0 commit comments

Comments
 (0)