[INLONG-11401][Manager] Support Dolphinscheduler schedule engine #11468

Open: wants to merge 5 commits into base: master


emptyOVO (Contributor) commented Nov 7, 2024

Fixes #11401

Motivation

  • Add a dolphinscheduler package under org.apache.inlong.manager.schedule
  • Add a Client and an Engine for DolphinScheduler (DS), plus utils for calling the DS open API
  • Add POJO classes for DS interaction
  • Add unit tests, providing an image-based environment for the test cases and mocking DS so that InLong's scheduling ability can be tested

Modifications

Because DS does not officially provide an SDK and only exposes an open API, all scheduling operations are implemented as open-API requests:

  1. When InLong enables the DS schedule mode, a project is initialized in DS to hold the processes.
  2. When InLong starts an offline schedule job for an InLong group, it creates a workflow process definition in that project, scheduled according to the schedule info, with a script that periodically sends callback requests to InLong.
  3. When the job is unregistered, the process running on DS is taken offline and deleted.
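The three steps above can be sketched as follows. This is a minimal illustration, not the PR's code: DsOpenApi is a hypothetical interface standing in for the open-API calls (its method names are ours, not DS endpoint names), and RecordingDsOpenApi is an in-memory fake that records the call order so the sketch runs without a DS server. The register order (create the definition, release it, create the schedule, bring the schedule online) follows the log output in the verification section; task-code generation is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical abstraction over the DS open API; method names are illustrative only.
interface DsOpenApi {
    long createProjectIfAbsent(String projectName);
    long createProcessDefinition(long projectCode, String name, String callbackScript);
    void releaseProcessDefinition(long projectCode, long processCode);
    long createSchedule(long projectCode, long processCode, String crontab);
    void onlineSchedule(long projectCode, long scheduleId);
    void offlineProcessDefinition(long projectCode, long processCode);
    void deleteProcessDefinition(long projectCode, long processCode);
}

class DsLifecycleSketch {
    private final DsOpenApi api;
    private final long projectCode;

    DsLifecycleSketch(DsOpenApi api, String projectName) {
        this.api = api;
        // Step 1: make sure a DS project exists to hold InLong's processes.
        this.projectCode = api.createProjectIfAbsent(projectName);
    }

    // Step 2: create the process definition, release it (ONLINE), then schedule it.
    long register(String groupId, String crontab, String callbackScript) {
        long processCode = api.createProcessDefinition(projectCode, groupId, callbackScript);
        api.releaseProcessDefinition(projectCode, processCode);
        long scheduleId = api.createSchedule(projectCode, processCode, crontab);
        api.onlineSchedule(projectCode, scheduleId);
        return processCode;
    }

    // Step 3: take the process offline, then delete it.
    void unregister(long processCode) {
        api.offlineProcessDefinition(projectCode, processCode);
        api.deleteProcessDefinition(projectCode, processCode);
    }
}

// Hypothetical in-memory fake that records the call order, for trying the sketch without DS.
class RecordingDsOpenApi implements DsOpenApi {
    final List<String> calls = new ArrayList<>();
    private long nextCode = 1;

    public long createProjectIfAbsent(String projectName) { calls.add("createProject"); return nextCode++; }
    public long createProcessDefinition(long project, String name, String script) { calls.add("createProcess"); return nextCode++; }
    public void releaseProcessDefinition(long project, long process) { calls.add("release"); }
    public long createSchedule(long project, long process, String crontab) { calls.add("createSchedule"); return nextCode++; }
    public void onlineSchedule(long project, long schedule) { calls.add("onlineSchedule"); }
    public void offlineProcessDefinition(long project, long process) { calls.add("offline"); }
    public void deleteProcessDefinition(long project, long process) { calls.add("delete"); }
}
```

Keeping the DS calls behind an interface is also what makes the "mock DS" unit tests mentioned under Motivation straightforward.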

The PR also provides some disaster-recovery logic:

  1. If InLong shuts down unexpectedly and then restarts, the workflow process data is recovered and work continues, preventing duplicate data generation and data loss.
  2. On repeated registration, the latest ScheduleInfo takes effect, avoiding redundant data.
  3. A ConcurrentHashMap stores the schedule data to ensure thread safety during start, running, and stop.
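A minimal sketch of items 2 and 3, assuming a hypothetical registry keyed by InLong group id (ScheduleRegistrySketch is our name, not a class from the PR). ConcurrentHashMap.compute replaces and deletes any previous definition atomically per key, so the latest ScheduleInfo wins, and update is modeled as unregister followed by register, which is the order the update test logs show. The createDefinition and deleteDefinition callbacks are placeholders for the DS calls. This only covers in-process state: recovering after a restart (item 1) additionally requires asking DS whether a definition already exists, as the "Checking process definition id uniqueness" log lines suggest.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongConsumer;
import java.util.function.ToLongFunction;

// Hypothetical registry: at most one DS process-definition code per InLong group id.
class ScheduleRegistrySketch<S> {
    private final Map<String, Long> processCodes = new ConcurrentHashMap<>();
    private final ToLongFunction<S> createDefinition; // placeholder for create + release + schedule in DS
    private final LongConsumer deleteDefinition;      // placeholder for offline + delete in DS

    ScheduleRegistrySketch(ToLongFunction<S> createDefinition, LongConsumer deleteDefinition) {
        this.createDefinition = createDefinition;
        this.deleteDefinition = deleteDefinition;
    }

    // Repeated registration: the previous definition is deleted and the latest info wins.
    // compute() is atomic per key, so two concurrent registrations for one group
    // cannot both leave a live definition behind.
    long register(String groupId, S scheduleInfo) {
        return processCodes.compute(groupId, (id, oldCode) -> {
            if (oldCode != null) {
                deleteDefinition.accept(oldCode);
            }
            return createDefinition.applyAsLong(scheduleInfo);
        });
    }

    boolean unregister(String groupId) {
        Long code = processCodes.remove(groupId);
        if (code == null) {
            return false;
        }
        deleteDefinition.accept(code);
        return true;
    }

    // Update mirrors the logged behaviour: unregister first, then register again.
    long update(String groupId, S scheduleInfo) {
        unregister(groupId);
        return register(groupId, scheduleInfo);
    }

    boolean isRegistered(String groupId) {
        return processCodes.containsKey(groupId);
    }
}
```

One trade-off worth noting: compute holds a lock on the key's bin while the remapping function runs, so doing remote HTTP calls inside it blocks other writers to that bin. A production version might perform the DS calls outside the map and use compute only to swap codes.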

How to use

  1. Specify the DS URL and token in the configuration file:
# DolphinScheduler related config
inlong.schedule.dolphinscheduler.url=
inlong.schedule.dolphinscheduler.token=

InLong Manager injects these values at startup via the @Value annotation.

  2. Configure the DolphinScheduleEngine when initializing it:
DolphinScheduleEngine dolphinScheduleEngine = new DolphinScheduleEngine(INLONG_DS_TEST_ADDRESS, INLONG_DS_TEST_PORT, INLONG_DS_TEST_USERNAME, INLONG_DS_TEST_PASSWORD, DS_URL, DS_TOKEN);
  • Then call .start() or .stop() to manage the DS engine.
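As an illustration of how the two settings might be consumed, here is a sketch that uses java.util.Properties in place of Spring's @Value injection so it runs standalone. DsConfigSketch is hypothetical. The assumption that DS reads the access token from a request header named token reflects our understanding of the DS open API and should be checked against the DS version in use; the "projects" path in the usage below is only an example.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.Properties;

// Hypothetical holder for the two DS settings; InLong Manager itself uses Spring's @Value.
class DsConfigSketch {
    static final String URL_KEY = "inlong.schedule.dolphinscheduler.url";
    static final String TOKEN_KEY = "inlong.schedule.dolphinscheduler.token";

    final String baseUrl; // e.g. http://{ip}:{port}/dolphinscheduler
    final String token;

    DsConfigSketch(Properties props) {
        this.baseUrl = requireNonBlank(props, URL_KEY);
        this.token = requireNonBlank(props, TOKEN_KEY);
    }

    // Fail fast at startup: both keys are empty in the sample config above.
    private static String requireNonBlank(Properties props, String key) {
        String value = props.getProperty(key, "").trim();
        if (value.isEmpty()) {
            throw new IllegalStateException(key + " must be set");
        }
        return value;
    }

    // Builds (does not send) a GET request against the DS open API.
    // Assumption: DS reads the access token from a request header named "token".
    HttpRequest get(String relativePath) {
        // Join base URL and path without doubling or dropping the slash.
        String base = baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl;
        String path = relativePath.startsWith("/") ? relativePath : "/" + relativePath;
        return HttpRequest.newBuilder(URI.create(base + path))
                .header("token", token)
                .GET()
                .build();
    }
}
```

Sending the request would then be a matter of HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).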

Verifying this change

  • How is the token generated?
    (screenshots omitted)

  • DS URL
    For example: http://{ip}:{port}/dolphinscheduler

  • testRegisterScheduleInfo
    The callback request was successfully sent to InLong, and the related tasks were processed in Flink.

log info:

2024-11-05 21:43:03.104 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:136 - Dolphin Scheduler handle register begin for test-group
2024-11-05 21:43:03.104 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:137 - Checking process definition id uniqueness...
2024-11-05 21:43:03.111 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:153 - Generate task code for process definition success, task code: 124296993905792
2024-11-05 21:43:03.154 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:158 - Create process definition success, process definition code: 124296993945728
2024-11-05 21:43:03.167 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:161 - Release process definition success, release status: ONLINE
2024-11-05 21:43:03.183 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:164 - Create schedule for process definition success, schedule info: ScheduleInfo(id=null, inlongGroupId=test-group, scheduleType=0, scheduleUnit=S, scheduleInterval=2, startTime=2024-11-05 21:43:03.0, endTime=2024-11-05 21:43:13.0, delayTime=null, selfDepend=null, taskParallelism=null, crontabExpression=null, version=null)
2024-11-05 21:43:03.199 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:167 - Online schedule for process definition, status: true
2024-11-05 21:43:03.203 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:136 - Dolphin Scheduler handle register begin for test-group
2024-11-05 21:43:03.204 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:137 - Checking process definition id uniqueness...
2024-11-05 21:43:03.209 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:144 - Process definition exists, process definition id: 124296993945728, deleting...
2024-11-05 21:43:03.237 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:153 - Generate task code for process definition success, task code: 124296994033792
2024-11-05 21:43:03.262 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:158 - Create process definition success, process definition code: 124296994056320
2024-11-05 21:43:03.275 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:161 - Release process definition success, release status: ONLINE
2024-11-05 21:43:03.290 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:164 - Create schedule for process definition success, schedule info: ScheduleInfo(id=null, inlongGroupId=test-group, scheduleType=1, scheduleUnit=null, scheduleInterval=null, startTime=2024-11-05 21:43:03.0, endTime=2024-11-05 21:43:13.0, delayTime=null, selfDepend=null, taskParallelism=null, crontabExpression=*/1 * * * * ?, version=null)
2024-11-05 21:43:03.306 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:167 - Online schedule for process definition, status: true
  • testUnregisterScheduleInfo

log info:

2024-11-05 21:43:03.314 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:136 - Dolphin Scheduler handle register begin for test-group
2024-11-05 21:43:03.314 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:137 - Checking process definition id uniqueness...
2024-11-05 21:43:03.319 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:144 - Process definition exists, process definition id: 124296994056320, deleting...
2024-11-05 21:43:03.356 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:153 - Generate task code for process definition success, task code: 124296994155648
2024-11-05 21:43:03.368 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:158 - Create process definition success, process definition code: 124296994165888
2024-11-05 21:43:03.382 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:161 - Release process definition success, release status: ONLINE
2024-11-05 21:43:03.397 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:164 - Create schedule for process definition success, schedule info: ScheduleInfo(id=null, inlongGroupId=test-group, scheduleType=0, scheduleUnit=S, scheduleInterval=2, startTime=2024-11-05 21:43:03.0, endTime=2024-11-05 21:43:13.0, delayTime=null, selfDepend=null, taskParallelism=null, crontabExpression=null, version=null)
2024-11-05 21:43:03.412 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:167 - Online schedule for process definition, status: true
2024-11-05 21:43:03.413 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:184 - Dolphin Scheduler handle Unregister begin for test-group
2024-11-05 21:43:03.413 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:185 - Checking process definition id uniqueness...
2024-11-05 21:43:03.417 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:190 - Deleting process definition, process definition id: 124296994165888
2024-11-05 21:43:03.445 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:195 - Process definition deleted
2024-11-05 21:43:03.445 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:198 - Un-registered dolphin schedule info for test-group
2024-11-05 21:43:03.445 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:136 - Dolphin Scheduler handle register begin for test-group
2024-11-05 21:43:03.445 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:137 - Checking process definition id uniqueness...
2024-11-05 21:43:03.451 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:153 - Generate task code for process definition success, task code: 124296994252928
2024-11-05 21:43:03.460 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:158 - Create process definition success, process definition code: 124296994260096
2024-11-05 21:43:03.475 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:161 - Release process definition success, release status: ONLINE
2024-11-05 21:43:03.491 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:164 - Create schedule for process definition success, schedule info: ScheduleInfo(id=null, inlongGroupId=test-group, scheduleType=1, scheduleUnit=null, scheduleInterval=null, startTime=2024-11-05 21:43:03.0, endTime=2024-11-05 21:43:13.0, delayTime=null, selfDepend=null, taskParallelism=null, crontabExpression=*/1 * * * * ?, version=null)
2024-11-05 21:43:03.508 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:167 - Online schedule for process definition, status: true
2024-11-05 21:43:03.508 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:184 - Dolphin Scheduler handle Unregister begin for test-group
2024-11-05 21:43:03.508 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:185 - Checking process definition id uniqueness...
2024-11-05 21:43:03.512 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:190 - Deleting process definition, process definition id: 124296994260096
2024-11-05 21:43:03.540 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:195 - Process definition deleted
2024-11-05 21:43:03.540 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:198 - Un-registered dolphin schedule info for test-group
  • testUpdateScheduleInfo

log info:

2024-11-05 21:43:03.541 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:136 - Dolphin Scheduler handle register begin for test-group
2024-11-05 21:43:03.541 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:137 - Checking process definition id uniqueness...
2024-11-05 21:43:03.547 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:153 - Generate task code for process definition success, task code: 124296994352256
2024-11-05 21:43:03.554 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:158 - Create process definition success, process definition code: 124296994357376
2024-11-05 21:43:03.569 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:161 - Release process definition success, release status: ONLINE
2024-11-05 21:43:03.586 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:164 - Create schedule for process definition success, schedule info: ScheduleInfo(id=null, inlongGroupId=test-group, scheduleType=0, scheduleUnit=S, scheduleInterval=2, startTime=2024-11-05 21:43:03.0, endTime=2024-11-05 21:43:13.0, delayTime=null, selfDepend=null, taskParallelism=null, crontabExpression=null, version=null)
2024-11-05 21:43:03.601 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:167 - Online schedule for process definition, status: true
2024-11-05 21:43:03.601 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:212 - Update dolphin schedule info for test-group
2024-11-05 21:43:03.601 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:184 - Dolphin Scheduler handle Unregister begin for test-group
2024-11-05 21:43:03.601 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:185 - Checking process definition id uniqueness...
2024-11-05 21:43:03.606 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:190 - Deleting process definition, process definition id: 124296994357376
2024-11-05 21:43:03.635 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:195 - Process definition deleted
2024-11-05 21:43:03.635 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:198 - Un-registered dolphin schedule info for test-group
2024-11-05 21:43:03.635 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:136 - Dolphin Scheduler handle register begin for test-group
2024-11-05 21:43:03.636 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:137 - Checking process definition id uniqueness...
2024-11-05 21:43:03.641 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:153 - Generate task code for process definition success, task code: 124296994448512
2024-11-05 21:43:03.648 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:158 - Create process definition success, process definition code: 124296994453632
2024-11-05 21:43:03.663 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:161 - Release process definition success, release status: ONLINE
2024-11-05 21:43:03.678 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:164 - Create schedule for process definition success, schedule info: ScheduleInfo(id=null, inlongGroupId=test-group, scheduleType=1, scheduleUnit=null, scheduleInterval=null, startTime=2024-11-05 21:43:03.0, endTime=2024-11-05 21:43:13.0, delayTime=null, selfDepend=null, taskParallelism=null, crontabExpression=*/1 * * * * ?, version=null)
2024-11-05 21:43:03.695 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:167 - Online schedule for process definition, status: true
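In the register test above, the callback to InLong comes from the script embedded in the DS process definition. The PR does not show that script, so the following is a hypothetical generator: the callback URL, the inlongGroupId query parameter, and the use of curl are all assumptions. The point it illustrates is quoting: values spliced into a shell script should be single-quoted so the shell passes them through literally.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical generator for the shell-task body that calls back into InLong.
final class CallbackScriptSketch {
    private CallbackScriptSketch() {
    }

    // Single-quote a value for POSIX shells; an embedded ' is rewritten as '\''
    static String shellQuote(String value) {
        return "'" + value.replace("'", "'\\''") + "'";
    }

    // Assumption: callbackUrl and the inlongGroupId query parameter are placeholders,
    // not InLong's real callback contract.
    static String build(String callbackUrl, String groupId) {
        String url = callbackUrl + "?inlongGroupId=" + URLEncoder.encode(groupId, StandardCharsets.UTF_8);
        return "#!/bin/bash\n"
                + "curl -fsS -X POST " + shellQuote(url) + "\n";
    }
}
```

With -f, curl exits non-zero on an HTTP error status, so DS would mark the task instance as failed instead of silently succeeding.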


aloyszhang (Contributor) commented:

Nice work!
Overall LGTM. Just one thing: does DolphinScheduler only support the crontab schedule type?

emptyOVO (Contributor, Author) commented Nov 8, 2024


Yes. Natively, DS only accepts crontab expressions for task scheduling. I also checked the related DS implementation: like InLong's built-in engine, it uses Quartz for scheduling underneath. The difference is that DS only uses Quartz's TriggerBuilder to parse crontab expressions and create triggers, whereas InLong's engine can also create triggers from a scheduling unit and a scheduling interval.
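Since DS accepts only crontab expressions, a unit-plus-interval schedule (scheduleType=0 with scheduleUnit=S and scheduleInterval=2 in the logs above) has to be converted before it is sent. A minimal sketch of such a conversion into Quartz cron syntax, assuming java.time's ChronoUnit as a stand-in for InLong's own schedule-unit values (CronConversionSketch is a hypothetical name):

```java
import java.time.temporal.ChronoUnit;

// Hypothetical unit + interval -> Quartz cron conversion.
// Quartz field order: second minute hour day-of-month month day-of-week.
final class CronConversionSketch {
    private CronConversionSketch() {
    }

    static String toQuartzCron(ChronoUnit unit, int interval) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive: " + interval);
        }
        String step = "*/" + interval;
        switch (unit) {
            case SECONDS:
                return step + " * * * * ?";
            case MINUTES:
                return "0 " + step + " * * * ?";
            case HOURS:
                return "0 0 " + step + " * * ?";
            case DAYS:
                return "0 0 0 " + step + " * ?";
            default:
                // Assumption: only these units are mapped in this sketch.
                throw new IllegalArgumentException("unsupported unit: " + unit);
        }
    }
}
```

For example, SECONDS with interval 1 yields */1 * * * * ?, the same form as the crontabExpression in the logs. One caveat of this mapping: */N restarts at each enclosing boundary (every minute, for seconds), so an interval that does not divide it evenly, such as 7 seconds, will not fire at exactly fixed spacing, and firing times align to the wall clock rather than to startTime.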

aloyszhang previously approved these changes Nov 10, 2024
dockerzhang previously approved these changes Nov 12, 2024
emptyOVO dismissed stale reviews from dockerzhang and aloyszhang via be574d1 November 13, 2024 08:48
* @param searchVal The name of the project to search for.
* @return The unique project ID if found, or 0 if not found or an error occurs.
*/
public long checkAndGetUniqueId(String url, String token, String searchVal) {

Review comment (Contributor):

As a utils class, it should not be instantiated with new. It is recommended to follow HttpUtils.
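One reading of the reviewer's suggestion is the usual Java utility-class shape: a final class with a private constructor and only static methods, so callers never write new. The sketch below is hypothetical (DsUtilsSketch and pickUniqueId are our names). pickUniqueId mirrors the quoted Javadoc contract of returning 0 when no unique match exists; treating duplicate names as "not unique" is our assumption.

```java
import java.util.List;
import java.util.Map;

// Hypothetical utility class: final, private constructor, static methods only.
final class DsUtilsSketch {
    private DsUtilsSketch() {
        // Not meant to be instantiated; call the static methods directly.
    }

    // Returns the id of the single project named searchVal, or 0 when there is
    // no match or the name is ambiguous (assumed interpretation of "unique").
    static long pickUniqueId(List<Map.Entry<String, Long>> projects, String searchVal) {
        long found = 0;
        int matches = 0;
        for (Map.Entry<String, Long> project : projects) {
            if (project.getKey().equals(searchVal)) {
                found = project.getValue();
                matches++;
            }
        }
        return matches == 1 ? found : 0;
    }
}
```

Callers then write DsUtilsSketch.pickUniqueId(projects, name) directly, and the private constructor turns any accidental new into a compile error.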

/**
 * Exception thrown when an error occurs in the schedule procedure.
 */
public class DolphinScheduleException extends RuntimeException {

Review comment (Contributor):

This exception class carries no meaning beyond marking an exception as coming from DolphinSchedule.

Successfully merging this pull request may close these issues.

[Feature][Manager] Support Dolphinscheduler schedule engine
4 participants