27 | 27 | import org.apache.flink.runtime.jobgraph.JobGraph;
28 | 28 | import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
29 | 29 | import org.apache.flink.runtime.jobgraph.JobVertex;
| 30 | +import org.apache.flink.runtime.jobgraph.JobVertexID; |
30 | 31 | import org.apache.flink.runtime.minicluster.MiniCluster;
31 | 32 | import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
32 | 33 | import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

40 | 41 | import org.junit.jupiter.api.Test;
41 | 42 | import org.junit.jupiter.api.extension.ExtendWith;
42 | 43 | import org.junit.jupiter.api.extension.RegisterExtension;
| 44 | +import org.slf4j.Logger; |
| 45 | +import org.slf4j.LoggerFactory; |
43 | 46 |
44 | 47 | import java.time.Duration;
45 | 48 | import java.util.Iterator;

51 | 54 | @ExtendWith(TestLoggerExtension.class)
52 | 55 | class RescaleOnCheckpointITCase {
53 | 56 |
| 57 | + private static final Logger LOG = LoggerFactory.getLogger(RescaleOnCheckpointITCase.class); |
| 58 | + |
54 | 59 | // Scaling down is used here because scaling up is not supported by the NumberSequenceSource
55 | 60 | // that's used in this test.
56 | 61 | private static final int NUMBER_OF_SLOTS = 4;
@@ -111,34 +116,59 @@ void testRescaleOnCheckpoint(
111 | 116 | assertThat(jobVertexIterator.hasNext())
112 | 117 | .as("There needs to be at least one JobVertex.")
113 | 118 | .isTrue();
| 119 | + final JobVertexID jobVertexId = jobVertexIterator.next().getID(); |
114 | 120 | final JobResourceRequirements jobResourceRequirements =
115 | 121 | JobResourceRequirements.newBuilder()
116 | | - .setParallelismForJobVertex( |
117 | | - jobVertexIterator.next().getID(), 1, AFTER_RESCALE_PARALLELISM) |
| 122 | + .setParallelismForJobVertex(jobVertexId, 1, AFTER_RESCALE_PARALLELISM) |
118 | 123 | .build();
119 | 124 | assertThat(jobVertexIterator.hasNext())
120 | 125 | .as("This test expects to have only one JobVertex.")
121 | 126 | .isFalse();
122 | 127 |
123 | 128 | restClusterClient.submitJob(jobGraph).join();
| 129 | + |
| 130 | + final JobID jobId = jobGraph.getJobID(); |
124 | 131 | try {
125 | | - final JobID jobId = jobGraph.getJobID(); |
126 | 132 |
| 133 | + LOG.info( |
| 134 | + "Waiting for job {} to reach a parallelism of {} for vertex {}.", |
| 135 | + jobId, |
| 136 | + BEFORE_RESCALE_PARALLELISM, |
| 137 | + jobVertexId); |
127 | 138 | waitForRunningTasks(restClusterClient, jobId, BEFORE_RESCALE_PARALLELISM);
128 | 139 |
| 140 | + LOG.info( |
| 141 | + "Job {} reached a parallelism of {} for vertex {}. Updating the vertex parallelism to {} next.", |
| 142 | + jobId, |
| 143 | + BEFORE_RESCALE_PARALLELISM, |
| 144 | + jobVertexId, |
| 145 | + AFTER_RESCALE_PARALLELISM); |
129 | 146 | restClusterClient.updateJobResourceRequirements(jobId, jobResourceRequirements).join();
130 | 147 |
131 | 148 | // timeout to allow any unexpected rescaling to happen anyway
132 | 149 | Thread.sleep(REQUIREMENT_UPDATE_TO_CHECKPOINT_GAP.toMillis());
133 | 150 |
134 | 151 | // verify that the previous timeout didn't result in a change of parallelism
| 152 | + LOG.info( |
| 153 | + "Checking that job {} still hasn't changed its parallelism after the delay.", |
| 154 | + jobId); |
135 | 155 | waitForRunningTasks(restClusterClient, jobId, BEFORE_RESCALE_PARALLELISM);
136 | 156 |
137 | 157 | miniCluster.triggerCheckpoint(jobId);
138 | 158 |
| 159 | + LOG.info( |
| 160 | + "Waiting for job {} to reach a parallelism of {} for vertex {}.", |
| 161 | + jobId, |
| 162 | + AFTER_RESCALE_PARALLELISM, |
| 163 | + jobVertexId); |
139 | 164 | waitForRunningTasks(restClusterClient, jobId, AFTER_RESCALE_PARALLELISM);
140 | 165 |
141 | | - waitForAvailableSlots(restClusterClient, NUMBER_OF_SLOTS - AFTER_RESCALE_PARALLELISM); |
| 166 | + final int expectedFreeSlotCount = NUMBER_OF_SLOTS - AFTER_RESCALE_PARALLELISM; |
| 167 | + LOG.info( |
| 168 | + "Waiting for {} slot(s) to become available due to the scale-down.", |
| 169 | + expectedFreeSlotCount); |
| 170 | + waitForAvailableSlots(restClusterClient, expectedFreeSlotCount); |
| 171 | + LOG.info("{} free slot(s) detected. Finishing test.", expectedFreeSlotCount); |
142 | 172 | } finally {
143 | 173 | restClusterClient.cancel(jobGraph.getJobID()).join();
144 | 174 | }
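
For context, `waitForRunningTasks` and `waitForAvailableSlots` are test helpers whose implementations are not part of this hunk. Below is a minimal sketch of how such a helper could be written against the cluster's REST API, assuming `RestClusterClient#getJobDetails` and `JobDetailsInfo#getTasksPerState` are available; the wrapping class name and the poll interval are made up for illustration and the actual helpers used by the test may work differently.

```java
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;

final class RescalePollingSketch {

    /**
     * Hypothetical stand-in for the waitForRunningTasks helper referenced in the diff:
     * polls the job details via REST until the number of RUNNING subtasks matches the
     * expected parallelism.
     */
    static void waitForRunningTasks(
            RestClusterClient<?> restClusterClient, JobID jobId, int expectedRunningTasks)
            throws Exception {
        while (true) {
            final JobDetailsInfo details = restClusterClient.getJobDetails(jobId).get();
            // sum the RUNNING subtasks across all vertices of the job
            final int runningTasks =
                    details.getJobVertexInfos().stream()
                            .mapToInt(
                                    vertexInfo ->
                                            vertexInfo
                                                    .getTasksPerState()
                                                    .getOrDefault(ExecutionState.RUNNING, 0))
                            .sum();
            if (runningTasks == expectedRunningTasks) {
                return;
            }
            // assumed poll interval; the real helper may use a different waiting strategy
            Thread.sleep(50L);
        }
    }
}
```

Polling rather than asserting once keeps the test independent of scheduler timing: after `triggerCheckpoint`, the adaptive scheduler applies the updated `JobResourceRequirements` once the checkpoint completes, and the test simply waits until the rescale becomes observable through the running-task count and the freed slots.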