Skip to content

Commit 61b1c33

Browse files
committed
[FLINK-36451][runtime] Makes hasLeadership and confirmLeadership work asynchronously
A test case is added that illustrates the concurrent lock acquisition problem in the DefaultLeaderElectionService
1 parent 5bd0477 commit 61b1c33

20 files changed

+385
-193
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -176,9 +176,9 @@ private void forwardConfirmLeaderSessionFuture(
176176
FutureUtils.assertNoException(
177177
newDispatcherLeaderProcess
178178
.getLeaderAddressFuture()
179-
.thenAccept(
179+
.thenCompose(
180180
leaderAddress ->
181-
leaderElection.confirmLeadership(
181+
leaderElection.confirmLeadershipAsync(
182182
leaderSessionID, leaderAddress)));
183183
}
184184

flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java

+11-7
Original file line numberDiff line numberDiff line change
@@ -244,14 +244,14 @@ private void removeContender(EmbeddedLeaderElection embeddedLeaderElection) {
244244
}
245245

246246
/** Callback from leader contenders when they confirm a leader grant. */
247-
private void confirmLeader(
247+
private CompletableFuture<Void> confirmLeader(
248248
final EmbeddedLeaderElection embeddedLeaderElection,
249249
final UUID leaderSessionId,
250250
final String leaderAddress) {
251251
synchronized (lock) {
252252
// if the leader election was shut down in the meantime, ignore this confirmation
253253
if (!embeddedLeaderElection.running || shutdown) {
254-
return;
254+
return FutureUtils.completedVoidFuture();
255255
}
256256

257257
try {
@@ -269,7 +269,7 @@ private void confirmLeader(
269269
currentLeaderProposed = null;
270270

271271
// notify all listeners
272-
notifyAllListeners(leaderAddress, leaderSessionId);
272+
return notifyAllListeners(leaderAddress, leaderSessionId);
273273
} else {
274274
LOG.debug(
275275
"Received confirmation of leadership for a stale leadership grant. Ignoring.");
@@ -278,6 +278,8 @@ private void confirmLeader(
278278
fatalError(t);
279279
}
280280
}
281+
282+
return FutureUtils.completedVoidFuture();
281283
}
282284

283285
private CompletableFuture<Void> notifyAllListeners(String address, UUID leaderSessionId) {
@@ -465,15 +467,17 @@ public void close() {
465467
}
466468

467469
@Override
468-
public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
470+
public CompletableFuture<Void> confirmLeadershipAsync(
471+
UUID leaderSessionID, String leaderAddress) {
469472
checkNotNull(leaderSessionID);
470473
checkNotNull(leaderAddress);
471-
confirmLeader(this, leaderSessionID, leaderAddress);
474+
return confirmLeader(this, leaderSessionID, leaderAddress);
472475
}
473476

474477
@Override
475-
public boolean hasLeadership(UUID leaderSessionId) {
476-
return isLeader && leaderSessionId.equals(currentLeaderSessionId);
478+
public CompletableFuture<Boolean> hasLeadershipAsync(UUID leaderSessionId) {
479+
return CompletableFuture.completedFuture(
480+
isLeader && leaderSessionId.equals(currentLeaderSessionId));
477481
}
478482

479483
void shutdown(Exception cause) {

flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java

+41-27
Original file line numberDiff line numberDiff line change
@@ -257,28 +257,34 @@ private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
257257
unused ->
258258
jobResultStore
259259
.hasJobResultEntryAsync(getJobID())
260-
.thenAccept(
260+
.thenCompose(
261261
hasJobResult -> {
262262
if (hasJobResult) {
263-
handleJobAlreadyDoneIfValidLeader(
263+
return handleJobAlreadyDoneIfValidLeader(
264264
leaderSessionId);
265265
} else {
266-
createNewJobMasterServiceProcessIfValidLeader(
266+
return createNewJobMasterServiceProcessIfValidLeader(
267267
leaderSessionId);
268268
}
269269
}));
270270
handleAsyncOperationError(sequentialOperation, "Could not start the job manager.");
271271
}
272272

273-
private void handleJobAlreadyDoneIfValidLeader(UUID leaderSessionId) {
274-
runIfValidLeader(
273+
private CompletableFuture<Void> handleJobAlreadyDoneIfValidLeader(UUID leaderSessionId) {
274+
return runIfValidLeader(
275275
leaderSessionId, () -> jobAlreadyDone(leaderSessionId), "check completed job");
276276
}
277277

278-
private void createNewJobMasterServiceProcessIfValidLeader(UUID leaderSessionId) {
279-
runIfValidLeader(
278+
private CompletableFuture<Void> createNewJobMasterServiceProcessIfValidLeader(
279+
UUID leaderSessionId) {
280+
return runIfValidLeader(
280281
leaderSessionId,
281282
() ->
283+
// the heavy lifting of the JobMasterServiceProcess instantiation is still
284+
// done asynchronously (see
285+
// DefaultJobMasterServiceFactory#createJobMasterService executing the logic
286+
// on the leaderOperation thread in the DefaultLeaderElectionService should
287+
// be, therefore, fine
282288
ThrowingRunnable.unchecked(
283289
() -> createNewJobMasterServiceProcess(leaderSessionId))
284290
.run(),
@@ -336,15 +342,18 @@ private void createNewJobMasterServiceProcess(UUID leaderSessionId) throws Flink
336342
private void confirmLeadership(
337343
UUID leaderSessionId, CompletableFuture<String> leaderAddressFuture) {
338344
FutureUtils.assertNoException(
339-
leaderAddressFuture.thenAccept(
345+
leaderAddressFuture.thenCompose(
340346
address ->
341-
runIfStateRunning(
342-
() -> {
343-
LOG.debug("Confirm leadership {}.", leaderSessionId);
344-
leaderElection.confirmLeadership(
345-
leaderSessionId, address);
346-
},
347-
"confirming leadership")));
347+
callIfRunning(
348+
() -> {
349+
LOG.debug(
350+
"Confirm leadership {}.",
351+
leaderSessionId);
352+
return leaderElection.confirmLeadershipAsync(
353+
leaderSessionId, address);
354+
},
355+
"confirming leadership")
356+
.orElse(FutureUtils.completedVoidFuture())));
348357
}
349358

350359
private void forwardResultFuture(
@@ -478,34 +487,39 @@ private boolean isRunning() {
478487
return state == State.RUNNING;
479488
}
480489

481-
private void runIfValidLeader(
490+
private CompletableFuture<Void> runIfValidLeader(
482491
UUID expectedLeaderId, Runnable action, Runnable noLeaderFallback) {
483492
synchronized (lock) {
484-
if (isValidLeader(expectedLeaderId)) {
485-
action.run();
493+
if (isRunning() && leaderElection != null) {
494+
return leaderElection
495+
.hasLeadershipAsync(expectedLeaderId)
496+
.thenAccept(
497+
hasLeadership -> {
498+
synchronized (lock) {
499+
if (isRunning() && hasLeadership) {
500+
action.run();
501+
} else {
502+
noLeaderFallback.run();
503+
}
504+
}
505+
});
486506
} else {
487507
noLeaderFallback.run();
508+
return FutureUtils.completedVoidFuture();
488509
}
489510
}
490511
}
491512

492-
private void runIfValidLeader(
513+
private CompletableFuture<Void> runIfValidLeader(
493514
UUID expectedLeaderId, Runnable action, String noLeaderFallbackCommandDescription) {
494-
runIfValidLeader(
515+
return runIfValidLeader(
495516
expectedLeaderId,
496517
action,
497518
() ->
498519
printLogIfNotValidLeader(
499520
noLeaderFallbackCommandDescription, expectedLeaderId));
500521
}
501522

502-
@GuardedBy("lock")
503-
private boolean isValidLeader(UUID expectedLeaderId) {
504-
return isRunning()
505-
&& leaderElection != null
506-
&& leaderElection.hasLeadership(expectedLeaderId);
507-
}
508-
509523
private <T> void forwardIfValidLeader(
510524
UUID expectedLeaderId,
511525
CompletableFuture<? extends T> source,

flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java

+9-6
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.flink.util.Preconditions;
2222

2323
import java.util.UUID;
24+
import java.util.concurrent.CompletableFuture;
2425

2526
/**
2627
* {@code DefaultLeaderElection} implements the {@link LeaderElection} based on the {@link
@@ -43,13 +44,14 @@ public void startLeaderElection(LeaderContender contender) throws Exception {
4344
}
4445

4546
@Override
46-
public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
47-
parentService.confirmLeadership(componentId, leaderSessionID, leaderAddress);
47+
public CompletableFuture<Void> confirmLeadershipAsync(
48+
UUID leaderSessionID, String leaderAddress) {
49+
return parentService.confirmLeadershipAsync(componentId, leaderSessionID, leaderAddress);
4850
}
4951

5052
@Override
51-
public boolean hasLeadership(UUID leaderSessionId) {
52-
return parentService.hasLeadership(componentId, leaderSessionId);
53+
public CompletableFuture<Boolean> hasLeadershipAsync(UUID leaderSessionId) {
54+
return parentService.hasLeadershipAsync(componentId, leaderSessionId);
5355
}
5456

5557
@Override
@@ -81,7 +83,7 @@ abstract static class ParentService {
8183
* the {@link LeaderContender} that is associated with the {@code componentId}. The
8284
* information is only propagated to the HA backend if the leadership is still acquired.
8385
*/
84-
abstract void confirmLeadership(
86+
abstract CompletableFuture<Void> confirmLeadershipAsync(
8587
String componentId, UUID leaderSessionID, String leaderAddress);
8688

8789
/**
@@ -91,6 +93,7 @@ abstract void confirmLeadership(
9193
* @return {@code true} if the service has leadership with the passed {@code
9294
* leaderSessionID} acquired; {@code false} otherwise.
9395
*/
94-
abstract boolean hasLeadership(String componentId, UUID leaderSessionID);
96+
abstract CompletableFuture<Boolean> hasLeadershipAsync(
97+
String componentId, UUID leaderSessionID);
9598
}
9699
}

flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java

+62-48
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ public void close() throws Exception {
313313
}
314314

315315
@Override
316-
protected void confirmLeadership(
316+
protected CompletableFuture<Void> confirmLeadershipAsync(
317317
String componentId, UUID leaderSessionID, String leaderAddress) {
318318
Preconditions.checkArgument(leaderContenderRegistry.containsKey(componentId));
319319
LOG.debug(
@@ -324,59 +324,73 @@ protected void confirmLeadership(
324324

325325
checkNotNull(leaderSessionID);
326326

327-
synchronized (lock) {
328-
if (hasLeadership(componentId, leaderSessionID)) {
329-
Preconditions.checkState(
330-
leaderElectionDriver != null,
331-
"The leadership check should only return true if a driver is instantiated.");
332-
Preconditions.checkState(
333-
!confirmedLeaderInformation.hasLeaderInformation(componentId),
334-
"No confirmation should have happened, yet.");
335-
336-
final LeaderInformation newConfirmedLeaderInformation =
337-
LeaderInformation.known(leaderSessionID, leaderAddress);
338-
confirmedLeaderInformation =
339-
LeaderInformationRegister.merge(
340-
confirmedLeaderInformation,
341-
componentId,
342-
newConfirmedLeaderInformation);
343-
leaderElectionDriver.publishLeaderInformation(
344-
componentId, newConfirmedLeaderInformation);
345-
} else {
346-
if (!leaderSessionID.equals(this.issuedLeaderSessionID)) {
347-
LOG.debug(
348-
"Received an old confirmation call of leader session ID {} for component '{}' (current issued session ID is {}).",
349-
leaderSessionID,
350-
componentId,
351-
issuedLeaderSessionID);
352-
} else {
353-
LOG.warn(
354-
"The leader session ID {} for component '{}' was confirmed even though the corresponding "
355-
+ "service was not elected as the leader or has been stopped already.",
356-
componentId,
357-
leaderSessionID);
358-
}
359-
}
360-
}
327+
return CompletableFuture.runAsync(
328+
() -> {
329+
synchronized (lock) {
330+
if (hasLeadershipInternal(componentId, leaderSessionID)) {
331+
Preconditions.checkState(
332+
leaderElectionDriver != null,
333+
"The leadership check should only return true if a driver is instantiated.");
334+
Preconditions.checkState(
335+
!confirmedLeaderInformation.hasLeaderInformation(componentId),
336+
"No confirmation should have happened, yet.");
337+
338+
final LeaderInformation newConfirmedLeaderInformation =
339+
LeaderInformation.known(leaderSessionID, leaderAddress);
340+
confirmedLeaderInformation =
341+
LeaderInformationRegister.merge(
342+
confirmedLeaderInformation,
343+
componentId,
344+
newConfirmedLeaderInformation);
345+
leaderElectionDriver.publishLeaderInformation(
346+
componentId, newConfirmedLeaderInformation);
347+
} else {
348+
if (!leaderSessionID.equals(this.issuedLeaderSessionID)) {
349+
LOG.debug(
350+
"Received an old confirmation call of leader session ID {} for component '{}' (current issued session ID is {}).",
351+
leaderSessionID,
352+
componentId,
353+
issuedLeaderSessionID);
354+
} else {
355+
LOG.warn(
356+
"The leader session ID {} for component '{}' was confirmed even though the corresponding "
357+
+ "service was not elected as the leader or has been stopped already.",
358+
componentId,
359+
leaderSessionID);
360+
}
361+
}
362+
}
363+
},
364+
leadershipOperationExecutor);
361365
}
362366

363367
@Override
364-
protected boolean hasLeadership(String componentId, UUID leaderSessionId) {
365-
synchronized (lock) {
366-
if (leaderElectionDriver != null) {
367-
if (leaderContenderRegistry.containsKey(componentId)) {
368-
return leaderElectionDriver.hasLeadership()
369-
&& leaderSessionId.equals(issuedLeaderSessionID);
370-
} else {
371-
LOG.debug(
372-
"hasLeadership is called for component '{}' while there is no contender registered under that ID in the service, returning false.",
373-
componentId);
374-
return false;
375-
}
368+
protected CompletableFuture<Boolean> hasLeadershipAsync(
369+
String componentId, UUID leaderSessionId) {
370+
return CompletableFuture.supplyAsync(
371+
() -> {
372+
synchronized (lock) {
373+
return hasLeadershipInternal(componentId, leaderSessionId);
374+
}
375+
},
376+
leadershipOperationExecutor);
377+
}
378+
379+
@GuardedBy("lock")
380+
private boolean hasLeadershipInternal(String componentId, UUID leaderSessionId) {
381+
if (leaderElectionDriver != null) {
382+
if (leaderContenderRegistry.containsKey(componentId)) {
383+
return leaderElectionDriver.hasLeadership()
384+
&& leaderSessionId.equals(issuedLeaderSessionID);
376385
} else {
377-
LOG.debug("hasLeadership is called after the service is closed, returning false.");
386+
LOG.debug(
387+
"hasLeadership is called for component '{}' while there is no contender registered under that ID in the service, returning false.",
388+
componentId);
378389
return false;
379390
}
391+
} else {
392+
LOG.debug("hasLeadership is called after the service is closed, returning false.");
393+
return false;
380394
}
381395
}
382396

flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElection.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.runtime.leaderelection;
2020

2121
import java.util.UUID;
22+
import java.util.concurrent.CompletableFuture;
2223

2324
/**
2425
* {@code LeaderElection} serves as a proxy between {@code LeaderElectionService} and {@link
@@ -42,7 +43,7 @@ public interface LeaderElection extends AutoCloseable {
4243
* @param leaderSessionID The new leader session ID
4344
* @param leaderAddress The address of the new leader
4445
*/
45-
void confirmLeadership(UUID leaderSessionID, String leaderAddress);
46+
CompletableFuture<Void> confirmLeadershipAsync(UUID leaderSessionID, String leaderAddress);
4647

4748
/**
4849
* Returns {@code true} if the service's {@link LeaderContender} has the leadership under the
@@ -51,7 +52,7 @@ public interface LeaderElection extends AutoCloseable {
5152
* @param leaderSessionId identifying the current leader
5253
* @return true if the associated {@link LeaderContender} is the leader, otherwise false
5354
*/
54-
boolean hasLeadership(UUID leaderSessionId);
55+
CompletableFuture<Boolean> hasLeadershipAsync(UUID leaderSessionId);
5556

5657
/**
5758
* Closes the {@code LeaderElection} by deregistering the {@link LeaderContender} from the

0 commit comments

Comments
 (0)