[Improvement-17699][GrpcTask] Support cancel gRPC task #17700
base: dev
@npofsi PTAL
Awesome!
The gRPC task module you added is impressive! I also read the official Apache DolphinScheduler article, and it is really awesome work! Thank you so much for the code review.
ruanwenjun
left a comment
Please don't directly use code generated by AI
Pull request overview
This pull request adds cancellation support for gRPC tasks in DolphinScheduler by implementing the previously empty cancel() method. The implementation introduces a gRPC CancellableContext that allows external cancellation signals to interrupt ongoing RPC calls, addressing issue #17699. The key mechanism is propagating cancellation through gRPC's context system, which will cause the RPC to throw a CANCELLED status that can be handled appropriately.
- Introduces a volatile CancellableContext field to track active gRPC calls that can be cancelled
- Implements the cancel() method to trigger context cancellation when tasks are killed manually or by timeout
- Refactors exception handling to distinguish between cancelled tasks (EXIT_CODE_KILL) and failed tasks (EXIT_CODE_FAILURE)
    if (ctx != null && !ctx.isCancelled()) {
        try {
            log.info("Canceling gRPC task: method={}", grpcParameters.getMethodName());
Copilot
AI
Nov 26, 2025
Missing null check for grpcParameters when logging in the cancel method. If cancel() is called before init() completes or if init() fails, grpcParameters will be null, causing a NullPointerException at line 129. Consider adding a null check: log.info("Canceling gRPC task: method={}", grpcParameters != null ? grpcParameters.getMethodName() : "unknown");
Suggested change:
- log.info("Canceling gRPC task: method={}", grpcParameters.getMethodName());
+ log.info("Canceling gRPC task: method={}", grpcParameters != null ? grpcParameters.getMethodName() : "unknown");
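The null guard in this suggestion can also be pulled into a small helper so the log statement stays readable. The following is only a sketch: `GrpcParamsStub` and `methodNameForLog` are hypothetical names introduced for illustration and are not part of the PR or of DolphinScheduler.

```java
// Hypothetical stand-in for the task's parameter object; only the getter matters here.
final class GrpcParamsStub {
    private final String methodName;

    GrpcParamsStub(String methodName) {
        this.methodName = methodName;
    }

    String getMethodName() {
        return methodName;
    }
}

final class CancelLogSketch {
    // Returns a loggable method name even if init() never populated the parameters.
    static String methodNameForLog(GrpcParamsStub params) {
        if (params == null || params.getMethodName() == null) {
            return "unknown";
        }
        return params.getMethodName();
    }
}
```

With a helper like this, the log call would read `log.info("Canceling gRPC task: method={}", methodNameForLog(grpcParameters))`, assuming the real parameter class exposes the same getter.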
public void cancel() {
    // Read volatile reference once for thread safety (avoid repeated reads under race conditions)
    Context.CancellableContext ctx = this.cancellableContext;

    if (ctx != null && !ctx.isCancelled()) {
        try {
            log.info("Canceling gRPC task: method={}", grpcParameters.getMethodName());

            // Trigger gRPC cancellation by canceling the context.
            // This interrupts the ongoing RPC and causes stubService.call() to throw CANCELLED.
            ctx.cancel(new TaskException("gRPC task was canceled by user"));

            // Record user intent: task was explicitly killed, not failed
            setExitStatusCode(TaskConstants.EXIT_CODE_KILL);
            log.info("gRPC task was successfully canceled");
        } catch (Exception ex) {
            log.error("Failed to cancel gRPC context", ex);
            throw new TaskException("Cancel gRPC task failed", ex);
        }
    } else {
        // No active context: task may not have started, already finished, or already canceled
        log.warn("gRPC task cancel requested, but no active cancellable context.");
    }
}
Copilot
AI
Nov 26, 2025
The new cancel() functionality lacks test coverage. Since GrpcTaskTest.java already contains comprehensive tests for the gRPC task (testHandleStatusCodeDefaultOK, testHandleStatusCodeCustom, testAddDefaultOutput), consider adding a test case that verifies the cancellation behavior, such as starting a long-running gRPC call and ensuring that calling cancel() properly interrupts it and sets the exit code to EXIT_CODE_KILL.
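One possible shape for such a test is sketched below in plain Java, without a gRPC dependency. Everything in it is an assumption made for illustration: `FakeLongRunningTask` stands in for GrpcTask, a `CompletableFuture` plays the role of the blocking RPC, and the exit code constants are placeholder values rather than the ones defined in TaskConstants. A real test would drive the actual task against an in-process gRPC server.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for GrpcTask; the future plays the role of the blocking RPC.
final class FakeLongRunningTask {
    static final int EXIT_CODE_KILL = 137;   // placeholder value, not read from TaskConstants
    static final int EXIT_CODE_FAILURE = -1; // placeholder value, not read from TaskConstants

    private final CompletableFuture<String> pendingCall = new CompletableFuture<>();
    private final CountDownLatch started = new CountDownLatch(1);
    private volatile int exitStatusCode = 0;

    void handle() {
        started.countDown();
        try {
            pendingCall.get(); // blocks until the "RPC" completes or is cancelled
        } catch (CancellationException e) {
            exitStatusCode = EXIT_CODE_KILL; // cancelled on purpose, not a failure
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            exitStatusCode = EXIT_CODE_FAILURE;
        } catch (ExecutionException e) {
            exitStatusCode = EXIT_CODE_FAILURE;
        }
    }

    void cancel() {
        pendingCall.cancel(true);
    }

    boolean awaitStarted(long timeout, TimeUnit unit) throws InterruptedException {
        return started.await(timeout, unit);
    }

    int getExitStatusCode() {
        return exitStatusCode;
    }
}

final class CancelTestSketch {
    // Starts the fake task on a worker thread, cancels it, and reports the exit code.
    static int runAndCancel() {
        FakeLongRunningTask task = new FakeLongRunningTask();
        Thread worker = new Thread(task::handle, "fake-grpc-task");
        worker.start();
        try {
            if (!task.awaitStarted(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not start in time");
            }
            task.cancel();
            worker.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("test thread interrupted", e);
        }
        if (worker.isAlive()) {
            throw new IllegalStateException("worker is still blocked after cancel");
        }
        return task.getExitStatusCode();
    }
}
```

The latch matters here: without waiting for the worker to start, the test could pass or fail depending on thread scheduling rather than on the cancellation logic.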
            log.info("gRPC task was successfully canceled");
        } catch (Exception ex) {
            log.error("Failed to cancel gRPC context", ex);
            throw new TaskException("Cancel gRPC task failed", ex);
Copilot
AI
Nov 26, 2025
[nitpick] The cancel() method wraps the caught exception in a TaskException and re-throws it, but this may cause issues since cancel() is typically called from a different thread (the cancellation thread). Consider logging the error instead of throwing an exception, as throwing from cancel() may not properly propagate to the caller. Most other task implementations (e.g., ShellTask) throw from cancel, so this follows the pattern, but the additional wrapping might not be necessary.
Suggested change:
- throw new TaskException("Cancel gRPC task failed", ex);
+ // Do not throw from cancel(); just log the error.
    } else {
        setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
    }
    validateResponse(statusre.getStatus());
Copilot
AI
Nov 26, 2025
The validateResponse() method is called after setting the exit status code in the StatusRuntimeException handler (line 110), which may overwrite the exit code. The validateResponse() method sets exitStatusCode internally (lines 156, 170, 185 in the full file). This can cause the exit code set on lines 106 or 108 to be overwritten. Consider restructuring to only call validateResponse() for non-cancelled cases, or ensure validateResponse() respects already-set exit codes.
Suggested change:
- } else {
-     setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
- }
- validateResponse(statusre.getStatus());
+     // Do not call validateResponse for cancelled cases to avoid overwriting exit code
+ } else {
+     setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
+     validateResponse(statusre.getStatus());
+ }
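To make the overwrite concern concrete, here is a minimal, self-contained sketch of the two control flows. It assumes that validateResponse() writes the exit code unconditionally, which the review comment only states as a possibility. The `Code` enum, the class name, and the numeric exit codes are hypothetical stand-ins, not the gRPC `Status.Code` type or the TaskConstants values.

```java
// Hypothetical model of the exit-code handling; not the PR's actual class.
final class ExitCodeSketch {
    static final int EXIT_CODE_SUCCESS = 0;  // placeholder value
    static final int EXIT_CODE_FAILURE = -1; // placeholder value
    static final int EXIT_CODE_KILL = 137;   // placeholder value

    enum Code { OK, CANCELLED, UNAVAILABLE }

    private int exitStatusCode = EXIT_CODE_SUCCESS;

    // Stand-in for validateResponse(): assumed to always write the exit code.
    private void validateResponse(Code code) {
        exitStatusCode = (code == Code.OK) ? EXIT_CODE_SUCCESS : EXIT_CODE_FAILURE;
    }

    // Original flow: validateResponse() runs after the branch and can clobber KILL.
    int handleUnguarded(Code code) {
        if (code == Code.CANCELLED) {
            exitStatusCode = EXIT_CODE_KILL;
        } else {
            exitStatusCode = EXIT_CODE_FAILURE;
        }
        validateResponse(code);
        return exitStatusCode;
    }

    // Suggested flow: skip validation entirely when the call was cancelled.
    int handleGuarded(Code code) {
        if (code == Code.CANCELLED) {
            exitStatusCode = EXIT_CODE_KILL;
        } else {
            exitStatusCode = EXIT_CODE_FAILURE;
            validateResponse(code);
        }
        return exitStatusCode;
    }
}
```

Under that assumption, the unguarded flow reports a cancelled task as failed, while the guarded flow keeps the kill exit code.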
  @Override
- public void cancel() throws TaskException {
-     // Do nothing when task to be canceled
+ public void cancel() {
Copilot
AI
Nov 26, 2025
The cancel() method signature should declare throws TaskException to match the abstract method in AbstractTask. All other task implementations (ShellTask, SqlTask, etc.) follow this pattern and declare throws TaskException.
Suggested change:
- public void cancel() {
+ public void cancel() throws TaskException {
    this.cancellableContext = (Context.CancellableContext) Context.current().withCancellation().attach();
    Context previous = this.cancellableContext;
Copilot
AI
Nov 26, 2025
Incorrect context management pattern. The attach() method returns the previous context that was attached, not the newly created cancellable context. This code incorrectly tries to cast the return value to CancellableContext and then assigns it again to previous.
The correct pattern should be:
    this.cancellableContext = Context.current().withCancellation();
    Context previous = this.cancellableContext.attach();
This separates the creation of the cancellable context from the attachment, and correctly captures the previous context returned by attach().
Suggested change:
- this.cancellableContext = (Context.CancellableContext) Context.current().withCancellation().attach();
- Context previous = this.cancellableContext;
+ this.cancellableContext = Context.current().withCancellation();
+ Context previous = this.cancellableContext.attach();
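The attach/detach contract behind this suggestion can be illustrated without pulling in gRPC. The sketch below uses `MiniContext`, a deliberately simplified, hypothetical stand-in that only mimics the part of `io.grpc.Context` relevant here: attach() makes a context current and returns the one it replaced, and detach(previous) restores it. It does not model parent/child cancellation, listeners, or deadlines, and its cancel() takes no cause argument.

```java
// Simplified, hypothetical stand-in for io.grpc.Context; NOT the real gRPC class.
final class MiniContext {
    private static final MiniContext ROOT = new MiniContext();
    private static final ThreadLocal<MiniContext> CURRENT = ThreadLocal.withInitial(() -> ROOT);

    private volatile boolean cancelled;

    static MiniContext current() {
        return CURRENT.get();
    }

    // Creates a new cancellable context (parent linkage is omitted in this sketch).
    MiniContext withCancellation() {
        return new MiniContext();
    }

    // Makes this context current and returns the context it replaced.
    MiniContext attach() {
        MiniContext previous = CURRENT.get();
        CURRENT.set(this);
        return previous;
    }

    // Restores the context that was current before attach().
    void detach(MiniContext toRestore) {
        CURRENT.set(toRestore);
    }

    void cancel() {
        cancelled = true;
    }

    boolean isCancelled() {
        return cancelled;
    }
}

final class AttachPatternSketch {
    // Returns true if the scope was entered and left without leaking the context.
    static boolean runScoped() {
        MiniContext before = MiniContext.current();
        MiniContext ctx = MiniContext.current().withCancellation(); // keep the new context
        MiniContext previous = ctx.attach();                        // keep what it replaced
        boolean wasCurrentInside;
        try {
            // The blocking call would run here, inside the cancellable scope.
            wasCurrentInside = MiniContext.current() == ctx;
        } finally {
            ctx.detach(previous); // always restore the earlier context
            ctx.cancel();         // release the cancellable context when done
        }
        return wasCurrentInside && previous == before
                && MiniContext.current() == before && ctx.isCancelled();
    }
}
```

In gRPC-Java the corresponding calls are `Context.current().withCancellation()`, `attach()`, `detach(previous)`, and `cancel(cause)` on the `CancellableContext`. Detaching and cancelling in a finally block keeps the context from leaking to later work on the same thread.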
Purpose of the pull request
close #17699
Brief change log
When a gRPC task is killed (manually or by timeout), call ctx.cancel() to cancel the in-flight gRPC call.
Verify this pull request
This pull request is code cleanup without any test coverage.
(or)
This pull request is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(or)
Pull Request Notice
If your pull request contains an incompatible change, you should also add it to
docs/docs/en/guide/upgrade/incompatible.md