From cc6052d71f2082246e2db5bc7e83a7836be751bc Mon Sep 17 00:00:00 2001 From: Abhishek Nath Date: Mon, 9 Jun 2025 19:28:24 -0700 Subject: [PATCH] Make the job status poller thread interruptible --- .../jobs/scheduler/tasks/OperationTask.java | 38 ++++++++++--------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTask.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTask.java index 7a1c2d925..703c345ac 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTask.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTask.java @@ -140,25 +140,26 @@ private Optional submitJob(Attributes typeAttributes) { private Optional pollJobStatus(Attributes typeAttributes) { long startTime = System.currentTimeMillis(); - while (!jobFinished()) { - long elapsedTime = System.currentTimeMillis() - startTime; - if (elapsedTime > timeoutMs) { - Optional job = jobsClient.getState(jobId); - // Do not cancel job if it is still in running state after 3 hours - if (job.isPresent() && !job.get().equals(JobState.RUNNING)) { - log.info( - "Cancelling job: {} due to timeout for {}: jobState: {}", - getType(), - metadata, - job.get()); - if (!jobsClient.cancelJob(jobId)) { - log.error("Could not cancel job {} for {}", getType(), metadata); - return Optional.empty(); + while (!Thread.currentThread().isInterrupted() && !jobFinished()) { + try { + long elapsedTime = System.currentTimeMillis() - startTime; + if (elapsedTime > timeoutMs) { + Optional job = jobsClient.getState(jobId); + // Do not cancel job if it is still in running state after 3 hours + if (job.isPresent() && !job.get().equals(JobState.RUNNING)) { + log.info( + "Cancelling job: {} due to timeout for {}: jobState: {}", + getType(), + metadata, + job.get()); + if (!jobsClient.cancelJob(jobId)) { + log.error("Could not cancel job {} for {}", getType(), metadata); + return Optional.empty(); + } + break; } - break; } - } - try { + // Sleep for specified poll interval Thread.sleep(pollIntervalMs); } catch (InterruptedException e) { log.warn( @@ -170,6 +171,9 @@ private Optional pollJobStatus(Attributes typeAttributes) { log.error("Could not cancel job {} for {}", getType(), metadata); return Optional.empty(); } + // Interrupt the current thread and exit the loop + Thread.currentThread().interrupt(); + break; } } Optional ret = jobsClient.getJob(jobId);