diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskDefinition.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskDefinition.java new file mode 100644 index 00000000000..20d8250796f --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskDefinition.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.schedule.dolphinschedule; + +import com.google.gson.annotations.SerializedName; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +@Data +public class DSTaskDefinition { + + @ApiModelProperty("DolphinScheduler task definition code") + @SerializedName("code") + private long code; + + @ApiModelProperty("DolphinScheduler task definition code") + @SerializedName("delayTime") + private String delayTime; + + @ApiModelProperty("DolphinScheduler task definition description") + @SerializedName("description") + private String description; + + @ApiModelProperty("DolphinScheduler task definition environment code") + @SerializedName("environmentCode") + private int environmentCode; + + @ApiModelProperty("DolphinScheduler task fail retry interval") + @SerializedName("failRetryInterval") + private String failRetryInterval; + + @ApiModelProperty("DolphinScheduler task definition fail retry times") + @SerializedName("failRetryTimes") + private String failRetryTimes; + + @ApiModelProperty("DolphinScheduler task definition flag") + @SerializedName("flag") + private String flag; + + @ApiModelProperty("DolphinScheduler task definition isCache") + @SerializedName("isCache") + private String isCache; + + @ApiModelProperty("DolphinScheduler task definition name") + @SerializedName("name") + private String name; + + @ApiModelProperty("DolphinScheduler task definition params") + @SerializedName("taskParams") + private DSTaskParams taskParams; + + @ApiModelProperty("DolphinScheduler task definition priority") + @SerializedName("taskPriority") + private String taskPriority; + + @ApiModelProperty("DolphinScheduler task definition type") + @SerializedName("taskType") + private String taskType; + + @ApiModelProperty("DolphinScheduler task definition timeout") + @SerializedName("timeout") + private int timeout; + + @ApiModelProperty("DolphinScheduler task definition timeout flag") + @SerializedName("timeoutFlag") + private String timeoutFlag; + + @ApiModelProperty("DolphinScheduler task definition timeout notify strategy") + @SerializedName("timeoutNotifyStrategy") + private String timeoutNotifyStrategy; + + @ApiModelProperty("DolphinScheduler task definition worker group") + @SerializedName("workerGroup") + private String workerGroup; + + @ApiModelProperty("DolphinScheduler task definition apu quota") + @SerializedName("cpuQuota") + private int cpuQuota; + + @ApiModelProperty("DolphinScheduler task definition memory max") + @SerializedName("memoryMax") + private int memoryMax; + + @ApiModelProperty("DolphinScheduler task definition execute type") + @SerializedName("taskExecuteType") + private String taskExecuteType; + + public DSTaskDefinition() { + this.delayTime = "0"; + this.description = ""; + this.environmentCode = -1; + this.failRetryInterval = "1"; + this.failRetryTimes = "0"; + this.flag = "YES"; + this.isCache = "NO"; + this.taskPriority = "MEDIUM"; + this.taskType = "SHELL"; + this.timeoutFlag = "CLOSE"; + this.timeoutNotifyStrategy = ""; + this.workerGroup = "default"; + this.cpuQuota = -1; + this.memoryMax = -1; + this.taskExecuteType = "BATCH"; + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskParams.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskParams.java new file mode 100644 index 00000000000..a5344f5facf --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskParams.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.schedule.dolphinschedule; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import java.util.ArrayList; +import java.util.List; +@Data +public class DSTaskParams { + + @ApiModelProperty("DolphinScheduler task params local params") + @JsonProperty("localParams") + private List localParams; + + @ApiModelProperty("DolphinScheduler task params raw script") + @JsonProperty("rawScript") + private String rawScript; + + @ApiModelProperty("DolphinScheduler task params resource list") + @JsonProperty("resourceList") + private List resourceList; + + public DSTaskParams() { + this.localParams = new ArrayList<>(); + this.resourceList = new ArrayList<>(); + this.rawScript = ""; + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskRelation.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskRelation.java new file mode 100644 index 00000000000..19e1ddc8276 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DSTaskRelation.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.schedule.dolphinschedule; + +import com.google.gson.annotations.SerializedName; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +@Data +public class DSTaskRelation { + + @ApiModelProperty("DolphinScheduler task relation name") + @SerializedName("name") + private String name; + + @ApiModelProperty("DolphinScheduler task relation pre-task code") + @SerializedName("preTaskCode") + private int preTaskCode; + + @ApiModelProperty("DolphinScheduler task relation pre-task version") + @SerializedName("preTaskVersion") + private int preTaskVersion; + + @ApiModelProperty("DolphinScheduler task relation post-task code") + @SerializedName("postTaskCode") + private long postTaskCode; + + @ApiModelProperty("DolphinScheduler task relation post-task version") + @SerializedName("postTaskVersion") + private int postTaskVersion; + + @ApiModelProperty("DolphinScheduler task relation condition type") + @SerializedName("conditionType") + private String conditionType; + + @ApiModelProperty("DolphinScheduler task relation condition params") + @SerializedName("conditionParams") + private Object conditionParams; + + public DSTaskRelation() { + this.name = ""; + this.conditionType = "NONE"; + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DScheduleInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DScheduleInfo.java new file mode 100644 index 00000000000..ac45b26f197 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/dolphinschedule/DScheduleInfo.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.schedule.dolphinschedule; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +@Data +public class DScheduleInfo { + + @ApiModelProperty("DolphinScheduler schedule start time") + @JsonProperty("startTime") + private String startTime; + + @ApiModelProperty("DolphinScheduler schedule end time") + @JsonProperty("endTime") + private String endTime; + + @ApiModelProperty("DolphinScheduler schedule crontab expression") + @JsonProperty("crontab") + private String crontab; + + @ApiModelProperty("DolphinScheduler schedule timezone id") + @JsonProperty("timezoneId") + private String timezoneId; + +} diff --git a/inlong-manager/manager-schedule/pom.xml b/inlong-manager/manager-schedule/pom.xml index a9d9fb3e1ed..aa3db31fec8 100644 --- a/inlong-manager/manager-schedule/pom.xml +++ b/inlong-manager/manager-schedule/pom.xml @@ -73,5 +73,11 @@ junit-jupiter test + + org.testcontainers + testcontainers + ${testcontainers.version} + test + diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java index 71949ef7445..ac71e4e2d13 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineType.java @@ -23,7 +23,8 @@ public enum ScheduleEngineType { NONE("None"), - QUARTZ("Quartz"); + QUARTZ("Quartz"), + DOLPHINSCHEDULER("DolphinScheduler"); private final String type; diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleClient.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleClient.java new file mode 100644 index 00000000000..a75085b9ac9 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleClient.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.dolphinscheduler; + +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.manager.schedule.ScheduleEngineClient; +import org.apache.inlong.manager.schedule.ScheduleEngineType; + +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +/** + * Built-in implementation of third-party schedule engine client corresponding with {@link DolphinScheduleEngine}. + * DolphinScheduleClient simply invokes the {@link DolphinScheduleEngine} to register/unregister/update + * schedule info, all the logic for invoking the remote scheduling service is implemented in {@link DolphinScheduleEngine} + */ +@Service +public class DolphinScheduleClient implements ScheduleEngineClient { + + @Resource + public DolphinScheduleEngine scheduleEngine; + + @Override + public boolean accept(String engineType) { + return ScheduleEngineType.DOLPHINSCHEDULER.getType().equalsIgnoreCase(engineType); + } + + @Override + public boolean register(ScheduleInfo scheduleInfo) { + return scheduleEngine.handleRegister(scheduleInfo); + } + + @Override + public boolean unregister(String groupId) { + return scheduleEngine.handleUnregister(groupId); + } + + @Override + public boolean update(ScheduleInfo scheduleInfo) { + return scheduleEngine.handleUpdate(scheduleInfo); + } +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleConstants.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleConstants.java new file mode 100644 index 00000000000..89dcda5b779 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleConstants.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.dolphinscheduler; + +public class DolphinScheduleConstants { + + // DS public constants + public static final String DS_ID = "id"; + public static final String DS_CODE = "code"; + public static final String DS_TOKEN = "token"; + public static final String DS_PAGE_SIZE = "pageSize"; + public static final String DS_PAGE_NO = "pageNo"; + public static final String DS_SEARCH_VAL = "searchVal"; + public static final String DS_RESPONSE_DATA = "data"; + public static final String DS_RESPONSE_NAME = "name"; + public static final String DS_RESPONSE_TOTAL_LIST = "totalList"; + public static final String DS_DEFAULT_PAGE_SIZE = "10"; + public static final String DS_DEFAULT_PAGE_NO = "1"; + public static final String DS_DEFAULT_TIMEZONE_ID = "Asia/Shanghai"; + + // DS project related constants + public static final String DS_PROJECT_URL = "/projects"; + public static final String DS_PROJECT_NAME = "projectName"; + public static final String DS_PROJECT_DESC = "description"; + public static final String DS_DEFAULT_PROJECT_NAME = "default_inlong_offline_scheduler"; + public static final String DS_DEFAULT_PROJECT_DESC = "default scheduler project for inlong offline job"; + + // DS task related constants + public static final String DS_TASK_CODE_URL = "/task-definition/gen-task-codes"; + public static final String DS_TASK_RELATION = "taskRelationJson"; + public static final String DS_TASK_DEFINITION = "taskDefinitionJson"; + public static final String DS_TASK_GEN_NUM = "genNum"; + public static final String DS_DEFAULT_TASK_GEN_NUM = "1"; + public static final String DS_DEFAULT_TASK_NAME = "default-inlong-http-callback"; + public static final String DS_DEFAULT_TASK_DESC = "default http request using shell script callbacks to inlong"; + + // DS process definition related constants + public static final String DS_PROCESS_URL = "/process-definition"; + public static final String DS_PROCESS_QUERY_URL = "/query-process-definition-list"; + public static final String DS_PROCESS_NAME = "name"; + public static final String DS_PROCESS_DESC = "description"; + public static final String DS_PROCESS_CODE = "processDefinitionCode"; + public static final String DS_DEFAULT_PROCESS_NAME = "_inlong_offline_process_definition"; + public static final String DS_DEFAULT_PROCESS_DESC = "scheduler process definition for inlong group: "; + + // DS release related constants + public static final String DS_RELEASE_URL = "/release"; + public static final String DS_RELEASE_STATE = "releaseState"; + + // DS schedule related constants + public static final String DS_SCHEDULE_URL = "/schedules"; + public static final String DS_SCHEDULE_DEF = "schedule"; + public static final String DS_DEFAULT_SCHEDULE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss"; + + // DS online/offline related constants + public static final String DS_ONLINE_URL = "/online"; + public static final String DS_ONLINE_STATE = "ONLINE"; + public static final String DS_OFFLINE_URL = "/offline"; + public static final String DS_OFFLINE_STATE = "OFFLINE"; + +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java new file mode 100644 index 00000000000..5506c126300 --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.dolphinscheduler; + +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.manager.schedule.ScheduleEngine; +import org.apache.inlong.manager.schedule.exception.DolphinScheduleException; + +import com.google.common.annotations.VisibleForTesting; +import lombok.Data; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROCESS_DESC; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROCESS_NAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROJECT_DESC; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROJECT_NAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_OFFLINE_STATE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_ONLINE_STATE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_QUERY_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROJECT_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SCHEDULE_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_CODE_URL; + +/** + * The default implementation of DolphinScheduler engine based on DolphinScheduler API. Response for processing + * the register/unregister/update requests from {@link DolphinScheduleClient} + */ +@Data +@Service +public class DolphinScheduleEngine implements ScheduleEngine { + + private static final Logger LOGGER = LoggerFactory.getLogger(DolphinScheduleEngine.class); + + @Value("${server.host:127.0.0.1}") + private String host; + + @Value("${server.port:8083}") + private int port; + + @Value("${default.admin.user:admin}") + private String username; + + @Value("${default.admin.password:inlong}") + private String password; + + @Value("${inlong.schedule.dolphinscheduler.url:}") + private String dolphinUrl; + + @Value("${inlong.schedule.dolphinscheduler.token:}") + private String token; + + private long projectCode; + private final String projectBaseUrl; + private final DolphinScheduleUtils dsUtils; + private final Map scheduledProcessMap; + + public DolphinScheduleEngine(String host, int port, String username, String password, String dolphinUrl, + String token) { + this.host = host; + this.port = port; + this.username = username; + this.password = password; + this.dolphinUrl = dolphinUrl; + this.token = token; + this.projectBaseUrl = dolphinUrl + DS_PROJECT_URL; + try { + LOGGER.info("Dolphin Scheduler engine http client initialized"); + this.dsUtils = new DolphinScheduleUtils(); + this.scheduledProcessMap = new ConcurrentHashMap<>(); + } catch (Exception e) { + throw new DolphinScheduleException("Failed to init dolphin scheduler ", e); + } + } + + public DolphinScheduleEngine() { + this.projectBaseUrl = dolphinUrl + DS_PROJECT_URL; + try { + LOGGER.info("Dolphin Scheduler engine http client initialized"); + this.dsUtils = new DolphinScheduleUtils(); + this.scheduledProcessMap = new ConcurrentHashMap<>(); + } catch (Exception e) { + throw new DolphinScheduleException("Failed to init dolphin scheduler ", e); + } + } + + /** + * check if there already exists a project for inlong offline schedule + * if no then build a new project for inlong-group-id in DolphinScheduler + */ + @Override + public void start() { + LOGGER.info("Starting dolphin scheduler engine"); + LOGGER.info("Checking project exists..."); + long code = dsUtils.checkAndGetUniqueId(projectBaseUrl, token, DS_DEFAULT_PROJECT_NAME); + if (code != 0) { + LOGGER.info("Project exists, project code: {}", code); + this.projectCode = code; + + LOGGER.info("Starting synchronize existing process definition"); + String queryProcessDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL + DS_PROCESS_QUERY_URL; + scheduledProcessMap.putAll(dsUtils.queryAllProcessDef(queryProcessDefUrl, token)); + + } else { + LOGGER.info("There is no inlong offline project exists, default project will be created"); + this.projectCode = + dsUtils.creatNewProject(projectBaseUrl, token, DS_DEFAULT_PROJECT_NAME, DS_DEFAULT_PROJECT_DESC); + } + } + + /** + * Handle schedule register. + * @param scheduleInfo schedule info to register + */ + @Override + @VisibleForTesting + public boolean handleRegister(ScheduleInfo scheduleInfo) { + String processDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL; + String scheduleUrl = projectBaseUrl + "/" + projectCode + DS_SCHEDULE_URL; + String processName = scheduleInfo.getInlongGroupId() + DS_DEFAULT_PROCESS_NAME; + String processDesc = DS_DEFAULT_PROCESS_DESC + scheduleInfo.getInlongGroupId(); + + LOGGER.info("Dolphin Scheduler handle register begin for {}", scheduleInfo.getInlongGroupId()); + LOGGER.info("Checking process definition id uniqueness..."); + try { + long processDefCode = dsUtils.checkAndGetUniqueId(processDefUrl, token, processName); + + boolean online = false; + if (processDefCode != 0 || scheduledProcessMap.containsKey(processDefCode)) { + + // process definition already exists, delete and rebuild + LOGGER.info("Process definition exists, process definition id: {}, deleting...", processDefCode); + if (dsUtils.releaseProcessDef(processDefUrl, processDefCode, token, DS_OFFLINE_STATE)) { + dsUtils.deleteProcessDef(processDefUrl, token, processDefCode); + scheduledProcessMap.remove(processDefCode); + } + } + String taskCodeUrl = projectBaseUrl + "/" + projectCode + DS_TASK_CODE_URL; + + long taskCode = dsUtils.genTaskCode(taskCodeUrl, token); + LOGGER.info("Generate task code for process definition success, task code: {}", taskCode); + + long offset = dsUtils.calculateOffset(scheduleInfo); + processDefCode = + dsUtils.createProcessDef(processDefUrl, token, processName, processDesc, taskCode, host, port, + username, password, offset, scheduleInfo.getInlongGroupId()); + LOGGER.info("Create process definition success, process definition code: {}", processDefCode); + + if (dsUtils.releaseProcessDef(processDefUrl, processDefCode, token, DS_ONLINE_STATE)) { + LOGGER.info("Release process definition success, release status: {}", DS_ONLINE_STATE); + + int scheduleId = dsUtils.createScheduleForProcessDef(scheduleUrl, processDefCode, token, scheduleInfo); + LOGGER.info("Create schedule for process definition success, schedule info: {}", scheduleInfo); + + online = dsUtils.onlineScheduleForProcessDef(scheduleUrl, scheduleId, token); + LOGGER.info("Online schedule for process definition, status: {}", online); + } + + scheduledProcessMap.putIfAbsent(processDefCode, processName); + return online; + } catch (Exception e) { + throw new DolphinScheduleException("Failed to handle unregister dolphin scheduler", e); + } + } + + /** + * Handle schedule unregister. + * @param groupId group to un-register schedule info + */ + @Override + @VisibleForTesting + public boolean handleUnregister(String groupId) { + String processName = groupId + DS_DEFAULT_PROCESS_NAME; + String processDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL; + + LOGGER.info("Dolphin Scheduler handle Unregister begin for {}", groupId); + LOGGER.info("Checking process definition id uniqueness..."); + try { + long processDefCode = dsUtils.checkAndGetUniqueId(processDefUrl, token, processName); + if (processDefCode != 0 || scheduledProcessMap.containsKey(processDefCode)) { + + LOGGER.info("Deleting process definition, process definition id: {}", processDefCode); + if (dsUtils.releaseProcessDef(processDefUrl, processDefCode, token, DS_OFFLINE_STATE)) { + + dsUtils.deleteProcessDef(processDefUrl, token, processDefCode); + scheduledProcessMap.remove(processDefCode); + LOGGER.info("Process definition deleted"); + } + } + LOGGER.info("Un-registered dolphin schedule info for {}", groupId); + return !scheduledProcessMap.containsKey(processDefCode); + } catch (Exception e) { + throw new DolphinScheduleException("Failed to handle unregister dolphin scheduler", e); + } + } + + /** + * Handle schedule update. + * @param scheduleInfo schedule info to update + */ + @Override + @VisibleForTesting + public boolean handleUpdate(ScheduleInfo scheduleInfo) { + LOGGER.info("Update dolphin schedule info for {}", scheduleInfo.getInlongGroupId()); + try { + return handleUnregister(scheduleInfo.getInlongGroupId()) && handleRegister(scheduleInfo); + } catch (Exception e) { + throw new DolphinScheduleException("Failed to handle update dolphin scheduler", e); + } + } + + /** + * stop and delete all process definition in DolphinScheduler + * remove all process stored in scheduledProcessMap + * delete project for inlong-group-id in DolphinScheduler + */ + @Override + public void stop() { + LOGGER.info("Stopping dolphin scheduler engine..."); + String processDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL; + try { + + String queryProcessDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL + DS_PROCESS_QUERY_URL; + Map allProcessDef = dsUtils.queryAllProcessDef(queryProcessDefUrl, token); + + for (Long processDefCode : allProcessDef.keySet()) { + + LOGGER.info("delete process definition id: {}", processDefCode); + dsUtils.releaseProcessDef(processDefUrl, processDefCode, token, DS_OFFLINE_STATE); + dsUtils.deleteProcessDef(processDefUrl, token, processDefCode); + scheduledProcessMap.remove(processDefCode); + } + + dsUtils.deleteProject(projectBaseUrl, token, projectCode); + LOGGER.info("Dolphin scheduler engine stopped"); + + } catch (Exception e) { + throw new DolphinScheduleException("Failed to stop dolphin scheduler", e); + } + } + +} diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java new file mode 100644 index 00000000000..d83292ab9ff --- /dev/null +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java @@ -0,0 +1,692 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.dolphinscheduler; + +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.manager.pojo.schedule.dolphinschedule.DSTaskDefinition; +import org.apache.inlong.manager.pojo.schedule.dolphinschedule.DSTaskParams; +import org.apache.inlong.manager.pojo.schedule.dolphinschedule.DSTaskRelation; +import org.apache.inlong.manager.pojo.schedule.dolphinschedule.DScheduleInfo; +import org.apache.inlong.manager.schedule.ScheduleUnit; +import org.apache.inlong.manager.schedule.exception.DolphinScheduleException; + +import com.google.common.annotations.VisibleForTesting; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import okhttp3.HttpUrl; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; +import org.apache.logging.log4j.core.util.CronExpression; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_CODE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PAGE_NO; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PAGE_SIZE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_SCHEDULE_TIME_FORMAT; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TASK_DESC; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TASK_GEN_NUM; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TASK_NAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TIMEZONE_ID; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_ID; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_ONLINE_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PAGE_NO; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PAGE_SIZE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_CODE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_DESC; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_NAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROJECT_DESC; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROJECT_NAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RELEASE_STATE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RELEASE_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RESPONSE_DATA; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RESPONSE_NAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RESPONSE_TOTAL_LIST; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SCHEDULE_DEF; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SEARCH_VAL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_DEFINITION; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_GEN_NUM; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_RELATION; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TOKEN; + +/** + * A utility class for interacting with DolphinScheduler API. This class includes methods + * for creating, updating, and deleting projects, tasks, and process definitions in DolphinScheduler. + * It also provides methods for generating schedules, calculating offsets, and executing HTTP requests. + * This class leverages the OkHttp library for HTTP interactions and Gson for JSON parsing and serialization. + */ +public class DolphinScheduleUtils { + + private static final Logger LOGGER = LoggerFactory.getLogger(DolphinScheduleEngine.class); + + private static final long MILLIS_IN_SECOND = 1000L; + private static final long MILLIS_IN_MINUTE = 60 * MILLIS_IN_SECOND; + private static final long MILLIS_IN_HOUR = 60 * MILLIS_IN_MINUTE; + private static final long MILLIS_IN_DAY = 24 * MILLIS_IN_HOUR; + private static final long MILLIS_IN_WEEK = 7 * MILLIS_IN_DAY; + private static final long MILLIS_IN_MONTH = 30 * MILLIS_IN_DAY; + private static final long MILLIS_IN_YEAR = 365 * MILLIS_IN_DAY; + private static final String HTTP_POST = "POST"; + private static final String HTTP_GET = "GET"; + private static final String HTTP_PUT = "PUT"; + private static final String HTTP_DELETE = "DELETE"; + private static final String CONTENT_TYPE = "Content-Type: application/json; charset=utf-8"; + private static final String SHELL_REQUEST_API = "/inlong/manager/api/group/submitOfflineJob"; + private final OkHttpClient client; + private final Gson gson; + + /** + * Constructs a new instance of the DolphinScheduleUtils class. + * Initializes the OkHttpClient and Gson instances. + */ + public DolphinScheduleUtils() { + this.client = new OkHttpClient(); + this.gson = new GsonBuilder().setPrettyPrinting().create(); + } + + /** + * Checks the uniqueness of a project ID based on the given search value. + * + * @param url The base URL of the DolphinScheduler API. + * @param token The authentication token to be used in the request header. + * @param searchVal The name of the project to search for. + * @return The unique project ID if found, or 0 if not found or an error occurs. + */ + public long checkAndGetUniqueId(String url, String token, String searchVal) { + + Map header = buildHeader(token); + Map queryParams = buildPageParam(searchVal); + + try { + JsonObject response = executeHttpRequest(url, HTTP_GET, queryParams, header); + + if (response == null) { + return 0; + } + JsonObject data = response.getAsJsonObject(DS_RESPONSE_DATA); + + if (data == null) { + return 0; + } + JsonArray totalList = data.getAsJsonArray(DS_RESPONSE_TOTAL_LIST); + // check uniqueness + if (totalList != null && totalList.size() == 1) { + JsonObject project = totalList.get(0).getAsJsonObject(); + String name = project.get(DS_RESPONSE_NAME).getAsString(); + if (name.equals(searchVal)) { + return project.get(DS_CODE).getAsLong(); + } + } + return 0; + + } catch (IOException e) { + LOGGER.error("Unexpected wrong in check id uniqueness", e); + return 0; + } + } + + /** + * Creates a new project in DolphinScheduler. + * + * @param url The base URL of the DolphinScheduler API. + * @param token The authentication token to be used in the request header. + * @param projectName The name of the new project. + * @param description The description of the new project. + * @return The project code (ID) if creation is successful, or 0 if an error occurs. + */ + public long creatNewProject(String url, String token, String projectName, String description) { + Map header = buildHeader(token); + + Map queryParams = new HashMap<>(); + queryParams.put(DS_PROJECT_NAME, projectName); + queryParams.put(DS_PROJECT_DESC, description); + try { + JsonObject response = executeHttpRequest(url, HTTP_POST, queryParams, header); + + if (response == null) { + return 0; + } + JsonObject data = response.getAsJsonObject(DS_RESPONSE_DATA); + if (data != null) { + return data.get(DS_CODE).getAsLong(); + } + return 0; + } catch (IOException e) { + LOGGER.error("Unexpected wrong in creating new project", e); + return 0; + } + } + + public Map queryAllProcessDef(String url, String token) { + Map header = buildHeader(token); + try { + JsonObject response = executeHttpRequest(url, HTTP_GET, new HashMap<>(), header); + + if (response == null) { + return null; + } + JsonArray data = response.getAsJsonArray(DS_RESPONSE_DATA); + Map processDef = new HashMap<>(); + + for (JsonElement processDefInfo : data) { + processDef.put(processDefInfo.getAsJsonObject().get(DS_CODE).getAsLong(), + processDefInfo.getAsJsonObject().get(DS_PROCESS_NAME).getAsString()); + } + LOGGER.info("Query all process definition success, processes info: {}", processDef); + return processDef; + } catch (IOException e) { + LOGGER.error("Unexpected wrong in generating task code", e); + return null; + } + } + + /** + * Generates a new task code in DolphinScheduler. + * + * @param url The base URL of the DolphinScheduler API. + * @param token The authentication token to be used in the request header. + * @return The task code (ID) if generation is successful, or 0 if an error occurs. + */ + public long genTaskCode(String url, String token) { + Map header = buildHeader(token); + Map queryParams = new HashMap<>(); + queryParams.put(DS_TASK_GEN_NUM, DS_DEFAULT_TASK_GEN_NUM); + try { + JsonObject response = executeHttpRequest(url, HTTP_GET, queryParams, header); + + if (response == null) { + return 0; + } + JsonArray data = response.getAsJsonArray(DS_RESPONSE_DATA); + if (data != null && data.size() == 1) { + return data.get(0).getAsLong(); + } + return 0; + } catch (IOException e) { + LOGGER.error("Unexpected wrong in generating task code", e); + return 0; + } + } + + /** + * Creates a process definition in DolphinScheduler. + * + * @param url The base URL of the DolphinScheduler API. + * @param token The authentication token to be used in the request header. + * @param name The name of the process definition. + * @param desc The description of the process definition. + * @param taskCode The task code to be associated with this process definition. + * @param host The host where the process will run. + * @param port The port where the process will run. + * @param username The username for authentication. + * @param password The password for authentication. + * @param offset The offset for the scheduling. + * @param groupId The group ID of the process. + * @return The process definition code (ID) if creation is successful, or 0 if an error occurs. + */ + public long createProcessDef(String url, String token, String name, String desc, long taskCode, String host, + int port, String username, String password, long offset, String groupId) { + Map header = buildHeader(token); + + DSTaskRelation taskRelation = new DSTaskRelation(); + taskRelation.setPostTaskCode(taskCode); + String taskRelationJson = gson.toJson(Collections.singletonList(taskRelation)); + + DSTaskParams taskParams = new DSTaskParams(); + taskParams.setRawScript(buildScript(host, port, username, password, offset, groupId)); + + DSTaskDefinition taskDefinition = new DSTaskDefinition(); + taskDefinition.setCode(taskCode); + taskDefinition.setName(DS_DEFAULT_TASK_NAME); + taskDefinition.setDescription(DS_DEFAULT_TASK_DESC); + taskDefinition.setTaskParams(taskParams); + String taskDefinitionJson = gson.toJson(Collections.singletonList(taskDefinition)); + + Map queryParams = new HashMap<>(); + queryParams.put(DS_TASK_RELATION, taskRelationJson); + queryParams.put(DS_TASK_DEFINITION, taskDefinitionJson); + queryParams.put(DS_PROCESS_NAME, name); + queryParams.put(DS_PROCESS_DESC, desc); + + try { + JsonObject response = executeHttpRequest(url, HTTP_POST, queryParams, header); + + if (response != null) { + JsonObject data = response.getAsJsonObject(DS_RESPONSE_DATA); + if (data != null) { + return data.get(DS_CODE).getAsLong(); + } + } + return 0; + } catch (IOException e) { + LOGGER.error("Unexpected wrong in creating process definition", e); + return 0; + } + } + + /** + * Releases a process definition in DolphinScheduler. + * + * @param processDefUrl The URL to release the process definition. + * @param processDefCode The ID of the process definition. + * @param token The authentication token to be used in the request header. + * @param status The status to set for the process definition (e.g., "online" or "offline"). + * @return true if the process definition was successfully released, false otherwise. + */ + public boolean releaseProcessDef(String processDefUrl, long processDefCode, String token, String status) { + String url = processDefUrl + "/" + processDefCode + DS_RELEASE_URL; + Map header = buildHeader(token); + Map queryParam = new HashMap<>(); + queryParam.put(DS_RELEASE_STATE, status); + + try { + JsonObject response = executeHttpRequest(url, HTTP_POST, queryParam, header); + if (response != null && !response.get(DS_RESPONSE_DATA).isJsonNull()) { + return response.get(DS_RESPONSE_DATA).getAsBoolean(); + } + return false; + } catch (IOException e) { + LOGGER.error("Unexpected wrong in release process definition", e); + return false; + } + } + + /** + * Create a schedule for process definition in DolphinScheduler. + * + * @param url The URL to create a schedule for the process definition. + * @param processDefCode The ID of the process definition. + * @param token The authentication token to be used in the request header. + * @param scheduleInfo The schedule info + * @return The schedule id + */ + public int createScheduleForProcessDef(String url, long processDefCode, String token, ScheduleInfo scheduleInfo) { + Map header = buildHeader(token); + + DateTimeFormatter formatter = DateTimeFormatter.ofPattern(DS_DEFAULT_SCHEDULE_TIME_FORMAT); + String startTime = scheduleInfo.getStartTime().toLocalDateTime() + .atZone(ZoneId.of(DS_DEFAULT_TIMEZONE_ID)).format(formatter); + String endTime = scheduleInfo.getEndTime().toLocalDateTime() + .atZone(ZoneId.of(DS_DEFAULT_TIMEZONE_ID)).format(formatter); + + String crontab; + if (scheduleInfo.getScheduleType() == 0) { + crontab = generateCrontabExpression(scheduleInfo.getScheduleUnit(), scheduleInfo.getScheduleInterval()); + } else if (scheduleInfo.getScheduleType() == 1) { + crontab = scheduleInfo.getCrontabExpression(); + } else { + throw new IllegalArgumentException("Unsupported schedule type: " + scheduleInfo.getScheduleType()); + } + + DScheduleInfo dScheduleInfo = new DScheduleInfo(); + dScheduleInfo.setStartTime(startTime); + dScheduleInfo.setEndTime(endTime); + dScheduleInfo.setCrontab(crontab); + dScheduleInfo.setTimezoneId(DS_DEFAULT_TIMEZONE_ID); + String scheduleDef = gson.toJson(dScheduleInfo); + + Map queryParams = new HashMap<>(); + queryParams.put(DS_PROCESS_CODE, String.valueOf(processDefCode)); + queryParams.put(DS_SCHEDULE_DEF, scheduleDef); + + try { + JsonObject response = executeHttpRequest(url, HTTP_POST, queryParams, header); + if (response == null) { + return 0; + } + JsonObject data = response.get(DS_RESPONSE_DATA).getAsJsonObject(); + if (data != null) { + return data.get(DS_ID).getAsInt(); + } + } catch (IOException e) { + LOGGER.error("Unexpected wrong in creating schedule for process definition", e); + return 0; + } + return 0; + } + + /** + * Online the schedule for process definition in DolphinScheduler. + * + * @param scheduleUrl The URL to online the schedule for process definition. + * @param scheduleId The ID of the schedule of process definition. + * @param token The authentication token to be used in the request header. + * @return whether online is succeeded + */ + public boolean onlineScheduleForProcessDef(String scheduleUrl, int scheduleId, String token) { + Map header = buildHeader(token); + String url = scheduleUrl + "/" + scheduleId + DS_ONLINE_URL; + try { + JsonObject response = executeHttpRequest(url, HTTP_POST, new HashMap<>(), header); + if (response != null && !response.get(DS_RESPONSE_DATA).isJsonNull()) { + return response.get(DS_RESPONSE_DATA).getAsBoolean(); + } + } catch (IOException e) { + LOGGER.error("Unexpected wrong in online process definition", e); + return false; + } + return false; + } + + /** + * Delete the process definition in DolphinScheduler. + * + * @param processDefUrl The URL to delete the process definition. + * @param token The authentication token to be used in the request header. + * @param processDefCode The process definition id + */ + public void deleteProcessDef(String processDefUrl, String token, long processDefCode) { + Map header = buildHeader(token); + String url = processDefUrl + "/" + processDefCode; + try { + executeHttpRequest(url, HTTP_DELETE, new HashMap<>(), header); + } catch (IOException e) { + LOGGER.error("Unexpected wrong in deleting process definition", e); + } + } + + /** + * Delete the project by project code in DolphinScheduler. + * + * @param projectBaseUrl The URL to delete project + * @param token The authentication token to be used in the request header. + * @param projectCode The project code + */ + public void deleteProject(String projectBaseUrl, String token, long projectCode) { + Map header = buildHeader(token); + String url = projectBaseUrl + "/" + projectCode; + try { + executeHttpRequest(url, HTTP_DELETE, new HashMap<>(), header); + } catch (IOException e) { + LOGGER.error("Unexpected wrong in deleting project definition", e); + } + } + + /** + * Builds the header map for HTTP requests, including the authentication token. + * + * @param token The authentication token for the request. + * @return A map representing the headers of the HTTP request. + */ + private Map buildHeader(String token) { + Map headers = new HashMap<>(); + headers.put(DS_TOKEN, token); + return headers; + } + + /** + * Builds a query parameter map used for API calls that need to paginate or filter results. + * This method can be used for searching projects or tasks. + * + * @param searchVal The value to search for. + * @return A map containing the necessary query parameters. + */ + private Map buildPageParam(String searchVal) { + Map queryParams = new HashMap<>(); + queryParams.put(DS_SEARCH_VAL, searchVal); + queryParams.put(DS_PAGE_SIZE, DS_DEFAULT_PAGE_SIZE); + queryParams.put(DS_PAGE_NO, DS_DEFAULT_PAGE_NO); + return queryParams; + } + + /** + * Executes an HTTP request using OkHttp. Supports various HTTP methods (GET, POST, PUT, DELETE). + * + * @param url The URL of the request. + * @param method The HTTP method (GET, POST, PUT, DELETE). + * @param queryParams The query parameters for the request (optional). + * @param headers The headers for the request. + * @return A JsonObject containing the response from the server. + * @throws IOException If an I/O error occurs during the request. + */ + @VisibleForTesting + private JsonObject executeHttpRequest(String url, String method, Map queryParams, + Map headers) throws IOException { + // build param + HttpUrl.Builder urlBuilder = HttpUrl.parse(url).newBuilder(); + + for (Map.Entry entry : queryParams.entrySet()) { + urlBuilder.addQueryParameter(entry.getKey(), entry.getValue()); + } + HttpUrl httpUrl = urlBuilder.build(); + + // build request + Request.Builder requestBuilder = new Request.Builder() + .url(httpUrl); + + // add header + for (Map.Entry entry : headers.entrySet()) { + requestBuilder.addHeader(entry.getKey(), entry.getValue()); + } + RequestBody body = RequestBody.create(MediaType.parse(CONTENT_TYPE), ""); + // handle request method + switch (method.toUpperCase()) { + case HTTP_POST: + requestBuilder.post(body); + break; + case HTTP_GET: + requestBuilder.get(); + break; + case HTTP_PUT: + requestBuilder.put(body); + break; + case HTTP_DELETE: + requestBuilder.delete(body); + break; + default: + throw new IllegalArgumentException("Unsupported request method: " + method); + } + + Request request = requestBuilder.build(); + + // get response + try (Response response = client.newCall(request).execute()) { + if (response.isSuccessful() && response.body() != null) { + String responseBody = response.body().string(); + return JsonParser.parseString(responseBody).getAsJsonObject(); + } else { + throw new IOException("Unexpected http response code " + response); + } + } + } + + /** + * Calculate the offset according to schedule info + * + * @param scheduleInfo The schedule info + * @return timestamp between two schedule task + */ + public long calculateOffset(ScheduleInfo scheduleInfo) { + if (scheduleInfo == null) { + throw new IllegalArgumentException("ScheduleInfo cannot be null"); + } + + long offset = 0; + + // Determine offset based on schedule type + if (scheduleInfo.getScheduleType() == null) { + throw new IllegalArgumentException("Schedule type cannot be null"); + } + + switch (scheduleInfo.getScheduleType()) { + case 0: // Normal scheduling + offset = calculateNormalOffset(scheduleInfo); + break; + case 1: // Crontab scheduling + offset = calculateCronOffset(scheduleInfo); + break; + default: + throw new IllegalArgumentException("Invalid schedule type"); + } + + // Add delay time if specified + if (scheduleInfo.getDelayTime() != null) { + offset += scheduleInfo.getDelayTime() * MILLIS_IN_SECOND; + } + + return offset; + } + + private long calculateNormalOffset(ScheduleInfo scheduleInfo) { + if (scheduleInfo.getScheduleInterval() == null || scheduleInfo.getScheduleUnit() == null) { + throw new IllegalArgumentException("Schedule interval and unit cannot be null for normal scheduling"); + } + switch (Objects.requireNonNull(ScheduleUnit.getScheduleUnit(scheduleInfo.getScheduleUnit()))) { + case YEAR: + return scheduleInfo.getScheduleInterval() * MILLIS_IN_YEAR; + case MONTH: + return scheduleInfo.getScheduleInterval() * MILLIS_IN_MONTH; + case WEEK: + return scheduleInfo.getScheduleInterval() * MILLIS_IN_WEEK; + case DAY: + return scheduleInfo.getScheduleInterval() * MILLIS_IN_DAY; + case HOUR: + return scheduleInfo.getScheduleInterval() * MILLIS_IN_HOUR; + case MINUTE: + return scheduleInfo.getScheduleInterval() * MILLIS_IN_MINUTE; + case SECOND: + return scheduleInfo.getScheduleInterval() * MILLIS_IN_SECOND; + case ONE_ROUND: + return scheduleInfo.getScheduleInterval(); + default: + throw new IllegalArgumentException("Invalid schedule unit"); + } + } + + private long calculateCronOffset(ScheduleInfo scheduleInfo) { + if (scheduleInfo.getCrontabExpression() == null) { + throw new IllegalArgumentException("Crontab expression cannot be null for schedule type crontab"); + } + + try { + CronExpression cronExpression = new CronExpression(scheduleInfo.getCrontabExpression()); + Date firstExecution = cronExpression.getNextValidTimeAfter(new Date()); + Date secondExecution = cronExpression.getNextValidTimeAfter(firstExecution); + + if (secondExecution != null) { + return secondExecution.getTime() - firstExecution.getTime(); + } else { + throw new IllegalArgumentException( + "Unable to calculate the next execution times for the cron expression"); + } + } catch (Exception e) { + throw new IllegalArgumentException("Invalid cron expression", e); + } + } + + private String generateCrontabExpression(String scheduleUnit, Integer scheduleInterval) { + if (scheduleUnit.isEmpty()) { + throw new DolphinScheduleException( + "Schedule unit and interval must not be null for generating crontab expression"); + } + String crontabExpression; + + switch (Objects.requireNonNull(ScheduleUnit.getScheduleUnit(scheduleUnit))) { + case SECOND: + crontabExpression = String.format("0/%d * * * * ? *", scheduleInterval); + break; + case MINUTE: + crontabExpression = String.format("* 0/%d * * * ? *", scheduleInterval); + break; + case HOUR: + crontabExpression = String.format("* * 0/%d * * ? *", scheduleInterval); + break; + case DAY: + crontabExpression = String.format("* * * 1/%d * ? *", scheduleInterval); + break; + case WEEK: + crontabExpression = String.format("* * * 1/%d * ? *", scheduleInterval * 7); + break; + case MONTH: + crontabExpression = String.format("* * * * 0/%d ? *", scheduleInterval); + break; + case YEAR: + crontabExpression = String.format("* * * * * ? 0/%d", scheduleInterval); + break; + default: + throw new DolphinScheduleException("Unsupported schedule unit for generating crontab: " + scheduleUnit); + } + + return crontabExpression; + } + + /** + * Shell node in DolphinScheduler need to write in a script + * When process definition schedule run, the shell node run, + * Call back in inlong, sending a request with parameters required + */ + private String buildScript(String host, int port, String username, String password, long offset, String groupId) { + + return "#!/bin/bash\n\n" + + + // Get current timestamp + "# Get current timestamp\n" + + "lowerBoundary=$(date +%s)\n" + + "echo \"get lowerBoundary: ${lowerBoundary}\"\n" + + "upperBoundary=$(($lowerBoundary + " + offset + "))\n" + + "echo \"get upperBoundary: ${upperBoundary}\"\n\n" + + + // Set URL + "# Set URL and HTTP method\n" + + "url=\"http://" + host + ":" + port + SHELL_REQUEST_API + + "?username=" + username + "&password=" + password + "\"\n" + + "echo \"get url: ${url}\"\n" + + + // Set HTTP method + "httpMethod=\"" + HTTP_POST + "\"\n\n" + + + // Set request body + "# Build request body\n" + + "jsonBody=$(cat < dolphinSchedulerContainer; + + private static final Logger DS_LOG = LoggerFactory.getLogger(DolphinScheduleContainerTestEnv.class); + + // DS env generated final url and final token + protected static String DS_URL; + protected static String DS_TOKEN; + + public static void envSetUp() throws Exception { + dolphinSchedulerContainer = + new GenericContainer<>(DS_IMAGE_NAME + ":" + DS_IMAGE_TAG) + .withExposedPorts(12345, 25333) + .withLogConsumer(outputFrame -> System.out.print(outputFrame.getUtf8String())); + dolphinSchedulerContainer.start(); + DS_URL = HTTP_BASE_URL + dolphinSchedulerContainer.getHost() + ":" + + dolphinSchedulerContainer.getMappedPort(12345) + DS_BASE_URL; + DS_LOG.info("DolphinScheduler is running at: {}", DS_URL); + + DS_TOKEN = accessToken(); + DS_LOG.info("DolphinScheduler token: {}", DS_TOKEN); + } + + /** + * This method just for DS testing, login by default admin username and password + * generate a 1-day expiring token for test, the token will disappear with the DS container shutting down + * + * @return the DS token + */ + private static String accessToken() throws Exception { + Map loginParams = new HashMap<>(); + loginParams.put(DS_USERNAME, DS_DEFAULT_USERNAME); + loginParams.put(DS_PASSWORD, DS_DEFAULT_PASSWORD); + JsonObject loginResponse = executeHttpRequest(DS_URL + DS_LOGIN_URL, loginParams, new HashMap<>()); + if (loginResponse.get("success").getAsBoolean()) { + String tokenGenUrl = DS_URL + DS_TOKEN_URL + DS_TOKEN_GEN_URL; + Map tokenParams = new HashMap<>(); + tokenParams.put(DS_USERID, String.valueOf(DS_DEFAULT_USERID)); + + LocalDateTime now = LocalDateTime.now(); + LocalDateTime tomorrow = now.plusDays(1); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern(DS_EXPIRE_TIME_FORMAT); + String expireTime = tomorrow.format(formatter); + tokenParams.put(DS_EXPIRE_TIME, expireTime); + + Map cookies = new HashMap<>(); + cookies.put(DS_COOKIE_SC_TYPE, loginResponse.get(DS_RESPONSE_DATA) + .getAsJsonObject().get(DS_COOKIE_SC_TYPE).getAsString()); + cookies.put(DS_COOKIE_SESSION_ID, loginResponse.get(DS_RESPONSE_DATA) + .getAsJsonObject().get(DS_COOKIE_SESSION_ID).getAsString()); + + JsonObject tokenGenResponse = executeHttpRequest(tokenGenUrl, tokenParams, cookies); + + String accessTokenUrl = DS_URL + DS_TOKEN_URL; + tokenParams.put(DS_RESPONSE_TOKEN, tokenGenResponse.get(DS_RESPONSE_DATA).getAsString()); + JsonObject result = executeHttpRequest(accessTokenUrl, tokenParams, cookies); + String token = result.get(DS_RESPONSE_DATA).getAsJsonObject().get(DS_RESPONSE_TOKEN).getAsString(); + DS_LOG.info("login and generate token success, token: {}", token); + return token; + } + DS_LOG.error("login and generate token fail"); + return null; + } + + public static void envShutDown() { + if (dolphinSchedulerContainer != null) { + dolphinSchedulerContainer.close(); + } + } + + private static JsonObject executeHttpRequest(String url, Map queryParams, + Map cookies) throws IOException { + OkHttpClient client = new OkHttpClient(); + + // Build query parameters + HttpUrl.Builder urlBuilder = Objects.requireNonNull(HttpUrl.parse(url)).newBuilder(); + for (Map.Entry entry : queryParams.entrySet()) { + urlBuilder.addQueryParameter(entry.getKey(), entry.getValue()); + } + HttpUrl httpUrl = urlBuilder.build(); + + // Build the request + Request.Builder requestBuilder = new Request.Builder() + .url(httpUrl); + + // Add cookies to the request + if (cookies != null && !cookies.isEmpty()) { + String cookieHeader = cookies.entrySet() + .stream() + .map(entry -> entry.getKey() + "=" + entry.getValue()) + .collect(Collectors.joining("; ")); + requestBuilder.header(DS_COOKIE, cookieHeader); + } + + RequestBody body = RequestBody.create(MediaType.parse(CONTENT_TYPE), ""); + requestBuilder.post(body); + + Request request = requestBuilder.build(); + + // Execute the request and parse the response + try (Response response = client.newCall(request).execute()) { + if (response.isSuccessful() && response.body() != null) { + String responseBody = response.body().string(); + return JsonParser.parseString(responseBody).getAsJsonObject(); + } else { + throw new IOException("Unexpected http response error " + response); + } + } + } + +} diff --git a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngineTest.java b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngineTest.java new file mode 100644 index 00000000000..0364e1ccc7f --- /dev/null +++ b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngineTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.dolphinscheduler; + +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; +import org.junit.jupiter.api.Timeout; + +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.INLONG_DS_TEST_ADDRESS; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.INLONG_DS_TEST_PASSWORD; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.INLONG_DS_TEST_PORT; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.INLONG_DS_TEST_USERNAME; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +public class DolphinScheduleEngineTest extends DolphinScheduleContainerTestEnv { + + private static DolphinScheduleEngine dolphinScheduleEngine; + + @BeforeAll + public static void initDolphinSchedulerEngine() throws Exception { + envSetUp(); + assertTrue(dolphinSchedulerContainer.isRunning(), "DolphinScheduler container should be running"); + + dolphinScheduleEngine = new DolphinScheduleEngine(INLONG_DS_TEST_ADDRESS, INLONG_DS_TEST_PORT, + INLONG_DS_TEST_USERNAME, INLONG_DS_TEST_PASSWORD, DS_URL, DS_TOKEN); + + dolphinScheduleEngine.start(); + } + + @Test + @Order(1) + @Timeout(30) + public void testRegisterScheduleInfo() { + // 1. test for normal schedule + ScheduleInfo scheduleInfo = genDefaultScheduleInfo(); + testRegister(scheduleInfo); + + // 2. test for cron schedule + scheduleInfo = genDefaultCronScheduleInfo(); + testRegister(scheduleInfo); + } + + private void testRegister(ScheduleInfo scheduleInfo) { + // register schedule info + dolphinScheduleEngine.handleRegister(scheduleInfo); + assertEquals(1, dolphinScheduleEngine.getScheduledProcessMap().size()); + } + + @Test + @Order(2) + @Timeout(30) + public void testUnRegisterScheduleInfo() { + // 1. test for normal schedule + ScheduleInfo scheduleInfo = genDefaultScheduleInfo(); + testUnRegister(scheduleInfo); + + // 2. test for cron schedule, gen cron schedule info, */2 * * * * ? + scheduleInfo = genDefaultCronScheduleInfo(); + testUnRegister(scheduleInfo); + } + + private void testUnRegister(ScheduleInfo scheduleInfo) { + // register schedule info + dolphinScheduleEngine.handleRegister(scheduleInfo); + assertEquals(1, dolphinScheduleEngine.getScheduledProcessMap().size()); + + // Un-register schedule info + dolphinScheduleEngine.handleUnregister(scheduleInfo.getInlongGroupId()); + assertEquals(0, dolphinScheduleEngine.getScheduledProcessMap().size()); + } + + @Test + @Order(3) + @Timeout(30) + public void testUpdateScheduleInfo() { + // 1. test for normal schedule + ScheduleInfo scheduleInfo = genDefaultScheduleInfo(); + testRegister(scheduleInfo); + + // 2. test for cron schedule, gen cron schedule info, */2 * * * * ? + scheduleInfo = genDefaultCronScheduleInfo(); + testUpdate(scheduleInfo); + } + + private void testUpdate(ScheduleInfo scheduleInfo) { + // register schedule info + dolphinScheduleEngine.handleUpdate(scheduleInfo); + assertEquals(1, dolphinScheduleEngine.getScheduledProcessMap().size()); + } + + @AfterAll + public static void testStopEngine() { + dolphinScheduleEngine.stop(); + envShutDown(); + } +} diff --git a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinSchedulerContainerEnvConstants.java b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinSchedulerContainerEnvConstants.java new file mode 100644 index 00000000000..94cfd93e9ff --- /dev/null +++ b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinSchedulerContainerEnvConstants.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.dolphinscheduler; + +public class DolphinSchedulerContainerEnvConstants { + + // DS-inlong test related constants + protected static final String INLONG_DS_TEST_ADDRESS = "127.0.0.1"; + protected static final int INLONG_DS_TEST_PORT = 8083; + protected static final String INLONG_DS_TEST_USERNAME = "admin"; + protected static final String INLONG_DS_TEST_PASSWORD = "inlong"; + + // DS env image related constants + protected static final String DS_IMAGE_NAME = "apache/dolphinscheduler-standalone-server"; + protected static final String DS_IMAGE_TAG = "3.2.2"; + + // DS env url related constants + protected static final String HTTP_BASE_URL = "http://"; + protected static final String DS_BASE_URL = "/dolphinscheduler"; + protected static final String DS_LOGIN_URL = "/login"; + protected static final String DS_TOKEN_URL = "/access-tokens"; + protected static final String DS_TOKEN_GEN_URL = "/generate"; + + // DS env api params related constants + protected static final String DS_USERNAME = "userName"; + protected static final String DS_PASSWORD = "userPassword"; + protected static final String DS_USERID = "userId"; + protected static final String DS_COOKIE = "Cookie"; + protected static final String DS_COOKIE_SC_TYPE = "securityConfigType"; + protected static final String DS_COOKIE_SESSION_ID = "sessionId"; + protected static final String DS_EXPIRE_TIME = "expireTime"; + protected static final String DS_EXPIRE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss"; + + // DS env token related constants + protected static final String DS_RESPONSE_TOKEN = "token"; + + // DS env default admin user info + protected static final String DS_DEFAULT_USERNAME = "admin"; + protected static final String DS_DEFAULT_PASSWORD = "dolphinscheduler123"; + protected static final Integer DS_DEFAULT_USERID = 1; + +} diff --git a/inlong-manager/manager-web/src/main/resources/application-dev.properties b/inlong-manager/manager-web/src/main/resources/application-dev.properties index 794f201bda7..06baf34649d 100644 --- a/inlong-manager/manager-web/src/main/resources/application-dev.properties +++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties @@ -104,5 +104,9 @@ agent.install.temp.path=inlong/agent-installer-temp/ # The primary key id of the default agent module used default.module.id=1 # schedule engine type -# support none(no scheduler) and quartz(quartz scheduler), default is none -inlong.schedule.engine=none \ No newline at end of file +# support none(no scheduler), quartz(quartz scheduler), dolphin scheduler(dolphin scheduler), default is none +inlong.schedule.engine=none + +# DolphinScheduler related config +# inlong.schedule.dolphinscheduler.url= +# inlong.schedule.dolphinscheduler.token= \ No newline at end of file diff --git a/inlong-manager/manager-web/src/main/resources/application-prod.properties b/inlong-manager/manager-web/src/main/resources/application-prod.properties index 3e8f329470a..32bdef52590 100644 --- a/inlong-manager/manager-web/src/main/resources/application-prod.properties +++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties @@ -96,5 +96,9 @@ group.deleted.enabled=false cls.manager.endpoint=127.0.0.1 # schedule engine type -# support none(no scheduler) and quartz(quartz scheduler), default is none -inlong.schedule.engine=none \ No newline at end of file +# support none(no scheduler), quartz(quartz scheduler), dolphin scheduler(dolphin scheduler), default is none +inlong.schedule.engine=none + +# DolphinScheduler related config +# inlong.schedule.dolphinscheduler.url= +# inlong.schedule.dolphinscheduler.token= \ No newline at end of file diff --git a/inlong-manager/manager-web/src/main/resources/application-test.properties b/inlong-manager/manager-web/src/main/resources/application-test.properties index 5ff929c2b82..7a2b92442cd 100644 --- a/inlong-manager/manager-web/src/main/resources/application-test.properties +++ b/inlong-manager/manager-web/src/main/resources/application-test.properties @@ -97,5 +97,9 @@ group.deleted.enabled=false cls.manager.endpoint=127.0.0.1 # schedule engine type -# support none(no scheduler) and quartz(quartz scheduler), default is none -inlong.schedule.engine=none \ No newline at end of file +# support none(no scheduler), quartz(quartz scheduler), dolphin scheduler(dolphin scheduler), default is none +inlong.schedule.engine=none + +# DolphinScheduler related config +# inlong.schedule.dolphinscheduler.url= +# inlong.schedule.dolphinscheduler.token= \ No newline at end of file