Skip to content

Commit 4c12066

Browse files
committed
[FLINK-36451][runtime] Adds Async suffix to method names
1 parent ed0697d commit 4c12066

19 files changed

+78
-74
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ private void forwardConfirmLeaderSessionFuture(
178178
.getLeaderAddressFuture()
179179
.thenCompose(
180180
leaderAddress ->
181-
leaderElection.confirmLeadership(
181+
leaderElection.confirmLeadershipAsync(
182182
leaderSessionID, leaderAddress)));
183183
}
184184

flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -467,15 +467,15 @@ public void close() {
467467
}
468468

469469
@Override
470-
public CompletableFuture<Void> confirmLeadership(
470+
public CompletableFuture<Void> confirmLeadershipAsync(
471471
UUID leaderSessionID, String leaderAddress) {
472472
checkNotNull(leaderSessionID);
473473
checkNotNull(leaderAddress);
474474
return confirmLeader(this, leaderSessionID, leaderAddress);
475475
}
476476

477477
@Override
478-
public CompletableFuture<Boolean> hasLeadership(UUID leaderSessionId) {
478+
public CompletableFuture<Boolean> hasLeadershipAsync(UUID leaderSessionId) {
479479
return CompletableFuture.completedFuture(
480480
isLeader && leaderSessionId.equals(currentLeaderSessionId));
481481
}

flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,7 @@ private void confirmLeadership(
344344
LOG.debug(
345345
"Confirm leadership {}.",
346346
leaderSessionId);
347-
return leaderElection.confirmLeadership(
347+
return leaderElection.confirmLeadershipAsync(
348348
leaderSessionId, address);
349349
},
350350
"confirming leadership")
@@ -487,7 +487,7 @@ private CompletableFuture<Void> runIfValidLeader(
487487
synchronized (lock) {
488488
if (isRunning() && leaderElection != null) {
489489
return leaderElection
490-
.hasLeadership(expectedLeaderId)
490+
.hasLeadershipAsync(expectedLeaderId)
491491
.thenAccept(
492492
hasLeadership -> {
493493
synchronized (lock) {

flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java

+8-6
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,14 @@ public void startLeaderElection(LeaderContender contender) throws Exception {
4444
}
4545

4646
@Override
47-
public CompletableFuture<Void> confirmLeadership(UUID leaderSessionID, String leaderAddress) {
48-
return parentService.confirmLeadership(componentId, leaderSessionID, leaderAddress);
47+
public CompletableFuture<Void> confirmLeadershipAsync(
48+
UUID leaderSessionID, String leaderAddress) {
49+
return parentService.confirmLeadershipAsync(componentId, leaderSessionID, leaderAddress);
4950
}
5051

5152
@Override
52-
public CompletableFuture<Boolean> hasLeadership(UUID leaderSessionId) {
53-
return parentService.hasLeadership(componentId, leaderSessionId);
53+
public CompletableFuture<Boolean> hasLeadershipAsync(UUID leaderSessionId) {
54+
return parentService.hasLeadershipAsync(componentId, leaderSessionId);
5455
}
5556

5657
@Override
@@ -82,7 +83,7 @@ abstract static class ParentService {
8283
* the {@link LeaderContender} that is associated with the {@code componentId}. The
8384
* information is only propagated to the HA backend if the leadership is still acquired.
8485
*/
85-
abstract CompletableFuture<Void> confirmLeadership(
86+
abstract CompletableFuture<Void> confirmLeadershipAsync(
8687
String componentId, UUID leaderSessionID, String leaderAddress);
8788

8889
/**
@@ -92,6 +93,7 @@ abstract CompletableFuture<Void> confirmLeadership(
9293
* @return {@code true} if the service has leadership with the passed {@code
9394
* leaderSessionID} acquired; {@code false} otherwise.
9495
*/
95-
abstract CompletableFuture<Boolean> hasLeadership(String componentId, UUID leaderSessionID);
96+
abstract CompletableFuture<Boolean> hasLeadershipAsync(
97+
String componentId, UUID leaderSessionID);
9698
}
9799
}

flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ public void close() throws Exception {
313313
}
314314

315315
@Override
316-
protected CompletableFuture<Void> confirmLeadership(
316+
protected CompletableFuture<Void> confirmLeadershipAsync(
317317
String componentId, UUID leaderSessionID, String leaderAddress) {
318318
Preconditions.checkArgument(leaderContenderRegistry.containsKey(componentId));
319319
LOG.debug(
@@ -365,7 +365,8 @@ protected CompletableFuture<Void> confirmLeadership(
365365
}
366366

367367
@Override
368-
protected CompletableFuture<Boolean> hasLeadership(String componentId, UUID leaderSessionId) {
368+
protected CompletableFuture<Boolean> hasLeadershipAsync(
369+
String componentId, UUID leaderSessionId) {
369370
return CompletableFuture.supplyAsync(
370371
() -> {
371372
synchronized (lock) {

flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElection.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public interface LeaderElection extends AutoCloseable {
4343
* @param leaderSessionID The new leader session ID
4444
* @param leaderAddress The address of the new leader
4545
*/
46-
CompletableFuture<Void> confirmLeadership(UUID leaderSessionID, String leaderAddress);
46+
CompletableFuture<Void> confirmLeadershipAsync(UUID leaderSessionID, String leaderAddress);
4747

4848
/**
4949
* Returns {@code true} if the service's {@link LeaderContender} has the leadership under the
@@ -52,7 +52,7 @@ public interface LeaderElection extends AutoCloseable {
5252
* @param leaderSessionId identifying the current leader
5353
* @return true if the associated {@link LeaderContender} is the leader, otherwise false
5454
*/
55-
CompletableFuture<Boolean> hasLeadership(UUID leaderSessionId);
55+
CompletableFuture<Boolean> hasLeadershipAsync(UUID leaderSessionId);
5656

5757
/**
5858
* Closes the {@code LeaderElection} by deregistering the {@link LeaderContender} from the

flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@
2828
* instantiate its own leader election service.
2929
*
3030
* <p>Once a contender has been granted leadership he has to confirm the received leader session ID
31-
* by calling the method {@link LeaderElection#confirmLeadership(UUID, String)}. This will notify
32-
* the leader election service, that the contender has accepted the leadership specified and that
33-
* the leader session id as well as the leader address can now be published for leader retrieval
34-
* services.
31+
* by calling the method {@link LeaderElection#confirmLeadershipAsync(UUID, String)}. This will
32+
* notify the leader election service, that the contender has accepted the leadership specified and
33+
* that the leader session id as well as the leader address can now be published for leader
34+
* retrieval services.
3535
*/
3636
public interface LeaderElectionService {
3737

flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElection.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,13 @@ public void startLeaderElection(LeaderContender contender) throws Exception {
5959
}
6060

6161
@Override
62-
public CompletableFuture<Void> confirmLeadership(UUID leaderSessionID, String leaderAddress) {
62+
public CompletableFuture<Void> confirmLeadershipAsync(
63+
UUID leaderSessionID, String leaderAddress) {
6364
return FutureUtils.completedVoidFuture();
6465
}
6566

6667
@Override
67-
public CompletableFuture<Boolean> hasLeadership(UUID leaderSessionId) {
68+
public CompletableFuture<Boolean> hasLeadershipAsync(UUID leaderSessionId) {
6869
synchronized (lock) {
6970
return CompletableFuture.completedFuture(
7071
this.leaderContender != null && this.sessionID.equals(leaderSessionId));

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ private void startNewLeaderResourceManager(UUID newLeaderSessionID) throws Excep
266266
.thenAcceptAsync(
267267
(isStillLeader) -> {
268268
if (isStillLeader) {
269-
leaderElection.confirmLeadership(
269+
leaderElection.confirmLeadershipAsync(
270270
newLeaderSessionID, newLeaderResourceManager.getAddress());
271271
}
272272
},

flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1213,7 +1213,7 @@ public void grantLeadership(final UUID leaderSessionID) {
12131213
"{} was granted leadership with leaderSessionID={}",
12141214
getRestBaseUrl(),
12151215
leaderSessionID);
1216-
leaderElection.confirmLeadership(leaderSessionID, getRestBaseUrl());
1216+
leaderElection.confirmLeadershipAsync(leaderSessionID, getRestBaseUrl());
12171217
}
12181218

12191219
@Override

flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ public void grantLeadership_oldLeader_doesNotConfirmLeaderSession() throws Excep
273273
// complete the confirmation future after losing the leadership
274274
contenderConfirmationFuture.complete("leader address");
275275

276-
assertThat(leaderElection.hasLeadership(leaderSessionId).get(), is(false));
276+
assertThat(leaderElection.hasLeadershipAsync(leaderSessionId).get(), is(false));
277277
}
278278
}
279279

flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ private void runLeaderRetrievalTest(
140140

141141
final UUID leaderId = leaderContender.getLeaderSessionFuture().get();
142142

143-
leaderElection.confirmLeadership(leaderId, ADDRESS).get();
143+
leaderElection.confirmLeadershipAsync(leaderId, ADDRESS).get();
144144

145145
final LeaderInformation leaderInformation =
146146
leaderRetrievalListener.getLeaderInformationFuture().get();
@@ -172,20 +172,20 @@ public void testConcurrentLeadershipOperations() throws Exception {
172172

173173
final UUID oldLeaderSessionId = leaderContender.getLeaderSessionFuture().get();
174174

175-
assertThat(leaderElection.hasLeadership(oldLeaderSessionId).get(), is(true));
175+
assertThat(leaderElection.hasLeadershipAsync(oldLeaderSessionId).get(), is(true));
176176

177177
embeddedHaServices.getDispatcherLeaderService().revokeLeadership().get();
178-
assertThat(leaderElection.hasLeadership(oldLeaderSessionId).get(), is(false));
178+
assertThat(leaderElection.hasLeadershipAsync(oldLeaderSessionId).get(), is(false));
179179

180180
embeddedHaServices.getDispatcherLeaderService().grantLeadership();
181181
final UUID newLeaderSessionId = leaderContender.getLeaderSessionFuture().get();
182182

183-
assertThat(leaderElection.hasLeadership(newLeaderSessionId).get(), is(true));
183+
assertThat(leaderElection.hasLeadershipAsync(newLeaderSessionId).get(), is(true));
184184

185-
leaderElection.confirmLeadership(oldLeaderSessionId, ADDRESS).get();
186-
leaderElection.confirmLeadership(newLeaderSessionId, ADDRESS).get();
185+
leaderElection.confirmLeadershipAsync(oldLeaderSessionId, ADDRESS).get();
186+
leaderElection.confirmLeadershipAsync(newLeaderSessionId, ADDRESS).get();
187187

188-
assertThat(leaderElection.hasLeadership(newLeaderSessionId).get(), is(true));
188+
assertThat(leaderElection.hasLeadershipAsync(newLeaderSessionId).get(), is(true));
189189

190190
leaderContender.tryRethrowException();
191191
}

flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java

+17-17
Original file line numberDiff line numberDiff line change
@@ -703,7 +703,7 @@ void testAllLeaderInformationChangeEventWithUnknownComponentId() throws Exceptio
703703
}
704704

705705
@Test
706-
void testHasLeadershipWithLeadershipButNoGrantEventProcessed() throws Exception {
706+
void testHasLeadershipAsyncWithLeadershipButNoGrantEventProcessed() throws Exception {
707707
new Context() {
708708
{
709709
runTestWithManuallyTriggeredEvents(
@@ -714,10 +714,10 @@ void testHasLeadershipWithLeadershipButNoGrantEventProcessed() throws Exception
714714
applyToBothContenderContexts(
715715
ctx -> {
716716
final CompletableFuture<Boolean> validSessionFuture =
717-
leaderElectionService.hasLeadership(
717+
leaderElectionService.hasLeadershipAsync(
718718
ctx.componentId, expectedSessionID);
719719
final CompletableFuture<Boolean> invalidSessionFuture =
720-
leaderElectionService.hasLeadership(
720+
leaderElectionService.hasLeadershipAsync(
721721
ctx.componentId, UUID.randomUUID());
722722
executorService.triggerAll();
723723

@@ -734,7 +734,7 @@ void testHasLeadershipWithLeadershipButNoGrantEventProcessed() throws Exception
734734
}
735735

736736
@Test
737-
void testHasLeadershipWithLeadershipAndGrantEventProcessed() throws Exception {
737+
void testHasLeadershipAsyncWithLeadershipAndGrantEventProcessed() throws Exception {
738738
new Context() {
739739
{
740740
runTestWithManuallyTriggeredEvents(
@@ -753,10 +753,10 @@ void testHasLeadershipWithLeadershipAndGrantEventProcessed() throws Exception {
753753
.isEqualTo(expectedSessionID);
754754

755755
final CompletableFuture<Boolean> validSessionFuture =
756-
leaderElectionService.hasLeadership(
756+
leaderElectionService.hasLeadershipAsync(
757757
ctx.componentId, expectedSessionID);
758758
final CompletableFuture<Boolean> invalidSessionFuture =
759-
leaderElectionService.hasLeadership(
759+
leaderElectionService.hasLeadershipAsync(
760760
ctx.componentId, UUID.randomUUID());
761761
executorService.triggerAll();
762762

@@ -773,7 +773,7 @@ void testHasLeadershipWithLeadershipAndGrantEventProcessed() throws Exception {
773773
}
774774

775775
@Test
776-
void testHasLeadershipWithLeadershipLostButNoRevokeEventProcessed() throws Exception {
776+
void testHasLeadershipAsyncWithLeadershipLostButNoRevokeEventProcessed() throws Exception {
777777
new Context() {
778778
{
779779
runTestWithManuallyTriggeredEvents(
@@ -787,10 +787,10 @@ void testHasLeadershipWithLeadershipLostButNoRevokeEventProcessed() throws Excep
787787
applyToBothContenderContexts(
788788
ctx -> {
789789
final CompletableFuture<Boolean> validSessionFuture =
790-
leaderElectionService.hasLeadership(
790+
leaderElectionService.hasLeadershipAsync(
791791
ctx.componentId, expectedSessionID);
792792
final CompletableFuture<Boolean> invalidSessionFuture =
793-
leaderElectionService.hasLeadership(
793+
leaderElectionService.hasLeadershipAsync(
794794
ctx.componentId, UUID.randomUUID());
795795

796796
executorService.triggerAll();
@@ -814,7 +814,7 @@ void testHasLeadershipWithLeadershipLostButNoRevokeEventProcessed() throws Excep
814814
}
815815

816816
@Test
817-
void testHasLeadershipWithLeadershipLostAndRevokeEventProcessed() throws Exception {
817+
void testHasLeadershipAsyncWithLeadershipLostAndRevokeEventProcessed() throws Exception {
818818
new Context() {
819819
{
820820
runTestWithSynchronousEventHandling(
@@ -826,12 +826,12 @@ void testHasLeadershipWithLeadershipLostAndRevokeEventProcessed() throws Excepti
826826
applyToBothContenderContexts(
827827
ctx -> {
828828
assertThatFuture(
829-
leaderElectionService.hasLeadership(
829+
leaderElectionService.hasLeadershipAsync(
830830
ctx.componentId, expectedSessionID))
831831
.eventuallySucceeds()
832832
.isEqualTo(false);
833833
assertThatFuture(
834-
leaderElectionService.hasLeadership(
834+
leaderElectionService.hasLeadershipAsync(
835835
ctx.componentId, UUID.randomUUID()))
836836
.eventuallySucceeds()
837837
.isEqualTo(false);
@@ -842,7 +842,7 @@ void testHasLeadershipWithLeadershipLostAndRevokeEventProcessed() throws Excepti
842842
}
843843

844844
@Test
845-
void testHasLeadershipAfterLeaderElectionClose() throws Exception {
845+
void testHasLeadershipAsyncAfterLeaderElectionClose() throws Exception {
846846
new Context() {
847847
{
848848
runTestWithSynchronousEventHandling(
@@ -855,7 +855,7 @@ void testHasLeadershipAfterLeaderElectionClose() throws Exception {
855855
ctx.leaderElection.close();
856856

857857
assertThatFuture(
858-
leaderElectionService.hasLeadership(
858+
leaderElectionService.hasLeadershipAsync(
859859
ctx.componentId, expectedSessionID))
860860
.eventuallySucceeds()
861861
.isEqualTo(false);
@@ -1061,7 +1061,7 @@ void testOldConfirmLeaderInformationWhileHavingNewLeadership() throws Exception
10611061
.hasValue(expectedLeaderInformation);
10621062

10631063
// Old confirm call should be ignored.
1064-
ctx.leaderElection.confirmLeadership(
1064+
ctx.leaderElection.confirmLeadershipAsync(
10651065
UUID.randomUUID(), ctx.address);
10661066
assertThat(
10671067
leaderElectionService.getLeaderSessionID(
@@ -1092,7 +1092,7 @@ void testOldConfirmationWhileHavingLeadershipLost() throws Exception {
10921092
applyToBothContenderContexts(
10931093
ctx -> {
10941094
// Old confirm call should be ignored.
1095-
ctx.leaderElection.confirmLeadership(
1095+
ctx.leaderElection.confirmLeadershipAsync(
10961096
currentLeaderSessionId, ctx.address);
10971097

10981098
assertThat(
@@ -1316,7 +1316,7 @@ void testNestedDeadlockInLeadershipConfirmation() throws Exception {
13161316
revocationFuture = CompletableFuture.runAsync(testInstance::onRevokeLeadership);
13171317
contenderLockAcquireLatch.await();
13181318
confirmLeadershipFuture =
1319-
leaderElection.confirmLeadership(leaderSessionId, "random-address");
1319+
leaderElection.confirmLeadershipAsync(leaderSessionId, "random-address");
13201320
}
13211321

13221322
assertThatFuture(revocationFuture).eventuallySucceeds();

0 commit comments

Comments
 (0)