Skip to content

Commit 6bf84a2

Browse files
committed
[INLONG-11401][Manager] Support Dolphinscheduler schedule engine
1 parent 58fe6ee commit 6bf84a2

File tree

17 files changed

+1765
-7
lines changed

17 files changed

+1765
-7
lines changed
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.manager.pojo.schedule.dolphinschedule;
19+
20+
import com.google.gson.annotations.SerializedName;
21+
import io.swagger.annotations.ApiModelProperty;
22+
import lombok.Data;
23+
24+
@Data
25+
public class DSTaskDefinition {
26+
27+
@ApiModelProperty("DolphinScheduler task definition code")
28+
@SerializedName("code")
29+
private long code;
30+
31+
@ApiModelProperty("DolphinScheduler task definition code")
32+
@SerializedName("delayTime")
33+
private String delayTime;
34+
35+
@ApiModelProperty("DolphinScheduler task definition description")
36+
@SerializedName("description")
37+
private String description;
38+
39+
@ApiModelProperty("DolphinScheduler task definition environment code")
40+
@SerializedName("environmentCode")
41+
private int environmentCode;
42+
43+
@ApiModelProperty("DolphinScheduler task fail retry interval")
44+
@SerializedName("failRetryInterval")
45+
private String failRetryInterval;
46+
47+
@ApiModelProperty("DolphinScheduler task definition fail retry times")
48+
@SerializedName("failRetryTimes")
49+
private String failRetryTimes;
50+
51+
@ApiModelProperty("DolphinScheduler task definition flag")
52+
@SerializedName("flag")
53+
private String flag;
54+
55+
@ApiModelProperty("DolphinScheduler task definition isCache")
56+
@SerializedName("isCache")
57+
private String isCache;
58+
59+
@ApiModelProperty("DolphinScheduler task definition name")
60+
@SerializedName("name")
61+
private String name;
62+
63+
@ApiModelProperty("DolphinScheduler task definition params")
64+
@SerializedName("taskParams")
65+
private DSTaskParams taskParams;
66+
67+
@ApiModelProperty("DolphinScheduler task definition priority")
68+
@SerializedName("taskPriority")
69+
private String taskPriority;
70+
71+
@ApiModelProperty("DolphinScheduler task definition type")
72+
@SerializedName("taskType")
73+
private String taskType;
74+
75+
@ApiModelProperty("DolphinScheduler task definition timeout")
76+
@SerializedName("timeout")
77+
private int timeout;
78+
79+
@ApiModelProperty("DolphinScheduler task definition timeout flag")
80+
@SerializedName("timeoutFlag")
81+
private String timeoutFlag;
82+
83+
@ApiModelProperty("DolphinScheduler task definition timeout notify strategy")
84+
@SerializedName("timeoutNotifyStrategy")
85+
private String timeoutNotifyStrategy;
86+
87+
@ApiModelProperty("DolphinScheduler task definition worker group")
88+
@SerializedName("workerGroup")
89+
private String workerGroup;
90+
91+
@ApiModelProperty("DolphinScheduler task definition apu quota")
92+
@SerializedName("cpuQuota")
93+
private int cpuQuota;
94+
95+
@ApiModelProperty("DolphinScheduler task definition memory max")
96+
@SerializedName("memoryMax")
97+
private int memoryMax;
98+
99+
@ApiModelProperty("DolphinScheduler task definition execute type")
100+
@SerializedName("taskExecuteType")
101+
private String taskExecuteType;
102+
103+
public DSTaskDefinition() {
104+
this.delayTime = "0";
105+
this.description = "";
106+
this.environmentCode = -1;
107+
this.failRetryInterval = "1";
108+
this.failRetryTimes = "0";
109+
this.flag = "YES";
110+
this.isCache = "NO";
111+
this.taskPriority = "MEDIUM";
112+
this.taskType = "SHELL";
113+
this.timeoutFlag = "CLOSE";
114+
this.timeoutNotifyStrategy = "";
115+
this.workerGroup = "default";
116+
this.cpuQuota = -1;
117+
this.memoryMax = -1;
118+
this.taskExecuteType = "BATCH";
119+
}
120+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.manager.pojo.schedule.dolphinschedule;
19+
20+
import com.fasterxml.jackson.annotation.JsonProperty;
21+
import io.swagger.annotations.ApiModelProperty;
22+
import lombok.Data;
23+
24+
import java.util.ArrayList;
25+
import java.util.List;
26+
@Data
27+
public class DSTaskParams {
28+
29+
@ApiModelProperty("DolphinScheduler task params local params")
30+
@JsonProperty("localParams")
31+
private List<Object> localParams;
32+
33+
@ApiModelProperty("DolphinScheduler task params raw script")
34+
@JsonProperty("rawScript")
35+
private String rawScript;
36+
37+
@ApiModelProperty("DolphinScheduler task params resource list")
38+
@JsonProperty("resourceList")
39+
private List<Object> resourceList;
40+
41+
public DSTaskParams() {
42+
this.localParams = new ArrayList<>();
43+
this.resourceList = new ArrayList<>();
44+
this.rawScript = "";
45+
}
46+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.manager.pojo.schedule.dolphinschedule;
19+
20+
import com.google.gson.annotations.SerializedName;
21+
import io.swagger.annotations.ApiModelProperty;
22+
import lombok.Data;
23+
24+
@Data
25+
public class DSTaskRelation {
26+
27+
@ApiModelProperty("DolphinScheduler task relation name")
28+
@SerializedName("name")
29+
private String name;
30+
31+
@ApiModelProperty("DolphinScheduler task relation pre-task code")
32+
@SerializedName("preTaskCode")
33+
private int preTaskCode;
34+
35+
@ApiModelProperty("DolphinScheduler task relation pre-task version")
36+
@SerializedName("preTaskVersion")
37+
private int preTaskVersion;
38+
39+
@ApiModelProperty("DolphinScheduler task relation post-task code")
40+
@SerializedName("postTaskCode")
41+
private long postTaskCode;
42+
43+
@ApiModelProperty("DolphinScheduler task relation post-task version")
44+
@SerializedName("postTaskVersion")
45+
private int postTaskVersion;
46+
47+
@ApiModelProperty("DolphinScheduler task relation condition type")
48+
@SerializedName("conditionType")
49+
private String conditionType;
50+
51+
@ApiModelProperty("DolphinScheduler task relation condition params")
52+
@SerializedName("conditionParams")
53+
private Object conditionParams;
54+
55+
public DSTaskRelation() {
56+
this.name = "";
57+
this.conditionType = "NONE";
58+
}
59+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.manager.pojo.schedule.dolphinschedule;
19+
20+
import com.fasterxml.jackson.annotation.JsonProperty;
21+
import io.swagger.annotations.ApiModelProperty;
22+
import lombok.Data;
23+
24+
@Data
25+
public class DScheduleInfo {
26+
27+
@ApiModelProperty("DolphinScheduler schedule start time")
28+
@JsonProperty("startTime")
29+
private String startTime;
30+
31+
@ApiModelProperty("DolphinScheduler schedule end time")
32+
@JsonProperty("endTime")
33+
private String endTime;
34+
35+
@ApiModelProperty("DolphinScheduler schedule crontab expression")
36+
@JsonProperty("crontab")
37+
private String crontab;
38+
39+
@ApiModelProperty("DolphinScheduler schedule timezone id")
40+
@JsonProperty("timezoneId")
41+
private String timezoneId;
42+
43+
}

inlong-manager/manager-schedule/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,5 +73,11 @@
7373
<artifactId>junit-jupiter</artifactId>
7474
<scope>test</scope>
7575
</dependency>
76+
<dependency>
77+
<groupId>org.testcontainers</groupId>
78+
<artifactId>testcontainers</artifactId>
79+
<version>${testcontainers.version}</version>
80+
<scope>test</scope>
81+
</dependency>
7682
</dependencies>
7783
</project>

inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
public enum ScheduleEngineType {
2424

2525
NONE("None"),
26-
QUARTZ("Quartz");
26+
QUARTZ("Quartz"),
27+
DOLPHINSCHEDULER("DolphinScheduler");
2728

2829
private final String type;
2930

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.manager.schedule.dolphinscheduler;
19+
20+
import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
21+
import org.apache.inlong.manager.schedule.ScheduleEngineClient;
22+
import org.apache.inlong.manager.schedule.ScheduleEngineType;
23+
24+
import org.springframework.stereotype.Service;
25+
26+
import javax.annotation.Resource;
27+
28+
/**
29+
* Built-in implementation of third-party schedule engine client corresponding with {@link DolphinScheduleEngine}.
30+
* DolphinScheduleClient simply invokes the {@link DolphinScheduleEngine} to register/unregister/update
31+
* schedule info, all the logic for invoking the remote scheduling service is implemented in {@link DolphinScheduleEngine}
32+
*/
33+
@Service
34+
public class DolphinScheduleClient implements ScheduleEngineClient {
35+
36+
@Resource
37+
public DolphinScheduleEngine scheduleEngine;
38+
39+
@Override
40+
public boolean accept(String engineType) {
41+
return ScheduleEngineType.DOLPHINSCHEDULER.getType().equalsIgnoreCase(engineType);
42+
}
43+
44+
@Override
45+
public boolean register(ScheduleInfo scheduleInfo) {
46+
return scheduleEngine.handleRegister(scheduleInfo);
47+
}
48+
49+
@Override
50+
public boolean unregister(String groupId) {
51+
return scheduleEngine.handleUnregister(groupId);
52+
}
53+
54+
@Override
55+
public boolean update(ScheduleInfo scheduleInfo) {
56+
return scheduleEngine.handleUpdate(scheduleInfo);
57+
}
58+
}

0 commit comments

Comments
 (0)