Skip to content

Commit cc6052d

Browse files
Make the job status poller thread interruptible
1 parent 8bec6ba commit cc6052d

File tree

1 file changed

+21
-17
lines changed

1 file changed

+21
-17
lines changed

apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTask.java

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -140,25 +140,26 @@ private Optional<JobState> submitJob(Attributes typeAttributes) {
140140

141141
private Optional<JobState> pollJobStatus(Attributes typeAttributes) {
142142
long startTime = System.currentTimeMillis();
143-
while (!jobFinished()) {
144-
long elapsedTime = System.currentTimeMillis() - startTime;
145-
if (elapsedTime > timeoutMs) {
146-
Optional<JobState> job = jobsClient.getState(jobId);
147-
// Do not cancel job if it is still in running state after 3 hours
148-
if (job.isPresent() && !job.get().equals(JobState.RUNNING)) {
149-
log.info(
150-
"Cancelling job: {} due to timeout for {}: jobState: {}",
151-
getType(),
152-
metadata,
153-
job.get());
154-
if (!jobsClient.cancelJob(jobId)) {
155-
log.error("Could not cancel job {} for {}", getType(), metadata);
156-
return Optional.empty();
143+
while (!Thread.currentThread().isInterrupted() && !jobFinished()) {
144+
try {
145+
long elapsedTime = System.currentTimeMillis() - startTime;
146+
if (elapsedTime > timeoutMs) {
147+
Optional<JobState> job = jobsClient.getState(jobId);
148+
// Do not cancel job if it is still in running state after 3 hours
149+
if (job.isPresent() && !job.get().equals(JobState.RUNNING)) {
150+
log.info(
151+
"Cancelling job: {} due to timeout for {}: jobState: {}",
152+
getType(),
153+
metadata,
154+
job.get());
155+
if (!jobsClient.cancelJob(jobId)) {
156+
log.error("Could not cancel job {} for {}", getType(), metadata);
157+
return Optional.empty();
158+
}
159+
break;
157160
}
158-
break;
159161
}
160-
}
161-
try {
162+
// Sleep for specified poll interval
162163
Thread.sleep(pollIntervalMs);
163164
} catch (InterruptedException e) {
164165
log.warn(
@@ -170,6 +171,9 @@ private Optional<JobState> pollJobStatus(Attributes typeAttributes) {
170171
log.error("Could not cancel job {} for {}", getType(), metadata);
171172
return Optional.empty();
172173
}
174+
// Interrupt the current thread and exit the loop
175+
Thread.currentThread().interrupt();
176+
break;
173177
}
174178
}
175179
Optional<JobResponseBody> ret = jobsClient.getJob(jobId);

0 commit comments

Comments
 (0)