Commit 196b47e

[KYUUBI #6997] Get the latest batch app info after submit process terminated to prevent batch ERROR due to engine submit timeout
### Why are the changes needed?

We hit the following issue with Spark on YARN:

```
spark.yarn.submit.waitAppCompletion=false
kyuubi.engine.yarn.submit.timeout=PT10M
```

Due to a network issue, the application submission was very slow: the application was only submitted after 15 minutes.

<img width="1430" alt="image" src="https://github.com/user-attachments/assets/a326c3d1-4d39-42da-b6aa-cad5f8e7fc4b" />
<img width="1350" alt="image" src="https://github.com/user-attachments/assets/8e20056a-bd71-4515-a5e3-f881509a34b2" />

The batch then went from the PENDING state directly to the ERROR state, because the application state was NOT_FOUND (the submission had exceeded `kyuubi.engine.yarn.submit.timeout`).

https://github.com/apache/kyuubi/blob/a54ee39ab338e310c6b9a508ad8f14c0bd82fa0f/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala#L99-L106

<img width="1727" alt="image" src="https://github.com/user-attachments/assets/20a2987c-675c-4136-a107-001f30b1b217" />

Here is the operation event:

<img width="1727" alt="image" src="https://github.com/user-attachments/assets/e2bab9c3-a959-4e2b-a207-813ae6489b30" />

But according to the batch log, the current application status should be `PENDING`.

```
:2025-03-21 17:36:19.350 INFO [KyuubiSessionManager-exec-pool: Thread-176922] org.apache.kyuubi.operation.BatchJobSubmission: Batch report for bbba09c8-3704-4a87-8394-9bcbbd39cc34, Some(ApplicationInfo(application_1741747369441_2258235,6042072c-e8fa-425d-a6a3-3d5bbb4ec1e3-275732_6042072c-e8fa-425d-a6a3-3d5bbb4ec1e3-275732.e3a34b86-7fc7-43ea-b4a5-1b6f27df54b5.0_20250322002147.stm,PENDING,Some(https://apollo-rno-rm-2.vip.hadoop.ebay.com:50030/proxy/application_1741747369441_2258235/),Some()))
```

So we should retrieve the batch application info after the submission process has terminated and before checking whether the application failed. That gives us the current application information and prevents this corner case:

1. The application submission time exceeds `kyuubi.engine.yarn.submit.timeout` and the app state is NOT_FOUND.
2. The application report cannot be retrieved before the submission process terminates.
3. The batch state then goes from PENDING directly to ERROR.

Conclusion:

The application state transition was:

UNKNOWN(before submit timeout) -> NOT_FOUND(reach submit timeout) -> processExit -> batchOpError -> PENDING(updateApplicationInfoMetadataIfNeeded) -> UNKNOWN(batchError but app not terminated)

After this PR, it should be:

UNKNOWN(before submit timeout) -> NOT_FOUND(reach submit timeout) -> processExit -> PENDING(after process terminated) -> ....

### How was this patch tested?

Existing GA.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #6997 from turboFei/app_not_found_v2.

Closes #6997

370cf49 [Wang, Fei] v2
912ec28 [Wang, Fei] nit
3c376f9 [Wang, Fei] log the op ex
d9cbdb8 [Wang, Fei] fix app not found

Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
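The corner case described in the commit message can be reproduced in a small, self-contained simulation. This is only a sketch under stated assumptions, not Kyuubi code: `AppState`, `Sim`, `failed`, and `run` are hypothetical names, the submit process and the application reports are faked, and NOT_FOUND is treated as a failed state because the commit message says the batch errored on it. The `refreshAfterExit` flag switches between the old flow and the flow with one extra report refresh after the process has exited.

```scala
object RefreshAfterExitSketch {
  sealed trait AppState
  case object Unknown extends AppState
  case object NotFound extends AppState
  case object Pending extends AppState

  // Assumption taken from the commit message: NOT_FOUND counts as a failed application.
  def failed(state: AppState): Boolean = state == NotFound

  /** Hypothetical simulation: `reports(i)` is what the i-th poll returns (the last
    * entry repeats), and the fake submit process is alive until `exitAfterPoll` polls
    * have been made. */
  final class Sim(reports: Vector[AppState], exitAfterPoll: Int) {
    private var polls = 0
    var current: AppState = Unknown

    def isAlive: Boolean = polls < exitAfterPoll

    def poll(): Unit = {
      current = reports(math.min(polls, reports.length - 1))
      polls += 1
    }
  }

  /** Polls while the process is alive and the app has not failed. When
    * `refreshAfterExit` is set, polls once more after the process has exited,
    * before deciding whether the batch failed. */
  def run(sim: Sim, refreshAfterExit: Boolean): Either[String, AppState] = {
    while (sim.isAlive && !failed(sim.current)) sim.poll()
    if (refreshAfterExit && !sim.isAlive) sim.poll()
    if (failed(sim.current)) Left(s"Batch job failed: ${sim.current}")
    else Right(sim.current)
  }
}
```

With reports `Vector(Unknown, NotFound, Pending)` and a process that exits after the second poll, `run(..., refreshAfterExit = false)` ends on the stale NOT_FOUND report and returns a `Left`, while `run(..., refreshAfterExit = true)` picks up the PENDING report and returns `Right(Pending)`.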
1 parent 2080c21 commit 196b47e

2 files changed: +44 -33 lines changed

kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala

Lines changed: 1 addition & 0 deletions
@@ -124,6 +124,7 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin
 
   protected def setOperationException(opEx: KyuubiSQLException): Unit = {
     this.operationException = opEx
+    withOperationLog(error(s"Error operating $opType: ${opEx.getMessage}", opEx))
   }
 
   def getOperationJobProgress: TProgressUpdateResp = operationJobProgress

kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala

Lines changed: 43 additions & 33 deletions
@@ -165,6 +165,8 @@ class BatchJobSubmission(
       opState: OperationState,
       appState: ApplicationState.ApplicationState): ApplicationState.ApplicationState = {
     if (opState == OperationState.ERROR && !ApplicationState.isTerminated(appState)) {
+      withOperationLog(error(s"Batch $batchId state is $opState," +
+        s" but the application state is $appState and not terminated, set to UNKNOWN."))
       ApplicationState.UNKNOWN
     } else {
       appState
@@ -250,50 +252,58 @@ class BatchJobSubmission(
   private def submitAndMonitorBatchJob(): Unit = {
     var appStatusFirstUpdated = false
     var lastStarvationCheckTime = createTime
+
+    def doUpdateApplicationInfoMetadataIfNeeded(): Unit = {
+      updateApplicationInfoMetadataIfNeeded()
+      if (!appStatusFirstUpdated) {
+        // only the ApplicationInfo with non-empty id indicates that batch is RUNNING
+        if (applicationId(_applicationInfo).isDefined) {
+          setStateIfNotCanceled(OperationState.RUNNING)
+          updateBatchMetadata()
+          appStatusFirstUpdated = true
+        } else {
+          val currentTime = System.currentTimeMillis()
+          if (currentTime - lastStarvationCheckTime > applicationStarvationTimeout) {
+            lastStarvationCheckTime = currentTime
+            warn(s"Batch[$batchId] has not started, check the Kyuubi server to ensure" +
+              s" that batch jobs can be submitted.")
+          }
+        }
+      }
+    }
+
     try {
       info(s"Submitting $batchType batch[$batchId] job:\n$builder")
       val process = builder.start
-      while (!applicationFailed(_applicationInfo) && process.isAlive) {
-        updateApplicationInfoMetadataIfNeeded()
-        if (!appStatusFirstUpdated) {
-          // only the ApplicationInfo with non-empty id indicates that batch is RUNNING
-          if (applicationId(_applicationInfo).isDefined) {
-            setStateIfNotCanceled(OperationState.RUNNING)
-            updateBatchMetadata()
-            appStatusFirstUpdated = true
-          } else {
-            val currentTime = System.currentTimeMillis()
-            if (currentTime - lastStarvationCheckTime > applicationStarvationTimeout) {
-              lastStarvationCheckTime = currentTime
-              warn(s"Batch[$batchId] has not started, check the Kyuubi server to ensure" +
-                s" that batch jobs can be submitted.")
-            }
-          }
-        }
+      while (process.isAlive && !applicationFailed(_applicationInfo)) {
+        doUpdateApplicationInfoMetadataIfNeeded()
         process.waitFor(applicationCheckInterval, TimeUnit.MILLISECONDS)
       }
 
+      if (!process.isAlive) {
+        doUpdateApplicationInfoMetadataIfNeeded()
+      }
+
       if (applicationFailed(_applicationInfo)) {
         Utils.terminateProcess(process, applicationStartupDestroyTimeout)
         throw new KyuubiException(s"Batch job failed: ${_applicationInfo}")
-      } else {
-        process.waitFor()
-        if (process.exitValue() != 0) {
-          throw new KyuubiException(s"Process exit with value ${process.exitValue()}")
-        }
+      }
 
-        while (!appStarted && applicationId(_applicationInfo).isEmpty &&
-          !applicationTerminated(_applicationInfo)) {
-          Thread.sleep(applicationCheckInterval)
-          updateApplicationInfoMetadataIfNeeded()
-        }
+      if (process.waitFor() != 0) {
+        throw new KyuubiException(s"Process exit with value ${process.exitValue}")
+      }
 
-        applicationId(_applicationInfo) match {
-          case Some(appId) => monitorBatchJob(appId)
-          case None if !appStarted =>
-            throw new KyuubiException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
-          case None =>
-        }
+      while (!appStarted && applicationId(_applicationInfo).isEmpty &&
+        !applicationTerminated(_applicationInfo)) {
+        Thread.sleep(applicationCheckInterval)
+        doUpdateApplicationInfoMetadataIfNeeded()
+      }
+
+      applicationId(_applicationInfo) match {
+        case Some(appId) => monitorBatchJob(appId)
+        case None if !appStarted =>
+          throw new KyuubiException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
+        case None =>
       }
     } finally {
       val waitCompletion = batchConf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION.key)
