Skip to content

Commit

Permalink
fix: exception handle, default configure value
Browse files Browse the repository at this point in the history
  • Loading branch information
emptyOVO committed Nov 13, 2024
1 parent 893f755 commit 44da2e2
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ public class DolphinScheduleEngine implements ScheduleEngine {
@Value("${default.admin.password:inlong}")
private String password;

@Value("${inlong.schedule.dolphinscheduler.url:}")
@Value("${inlong.schedule.dolphinscheduler.url:http://127.0.0.1:12345/dolphinscheduler}")
private String dolphinUrl;

@Value("${inlong.schedule.dolphinscheduler.token:}")
@Value("${inlong.schedule.dolphinscheduler.token:default_token_value}")
private String token;

private long projectCode;
Expand All @@ -90,7 +90,8 @@ public DolphinScheduleEngine(String host, int port, String username, String pass
this.dsUtils = new DolphinScheduleUtils();
this.scheduledProcessMap = new ConcurrentHashMap<>();
} catch (Exception e) {
throw new DolphinScheduleException("Failed to init dolphin scheduler ", e);
LOGGER.error("Failed to init dolphin scheduler: ", e);
throw new DolphinScheduleException(String.format("Failed to init dolphin scheduler: %s", e.getMessage()));
}
}

Expand All @@ -101,7 +102,8 @@ public DolphinScheduleEngine() {
this.dsUtils = new DolphinScheduleUtils();
this.scheduledProcessMap = new ConcurrentHashMap<>();
} catch (Exception e) {
throw new DolphinScheduleException("Failed to init dolphin scheduler ", e);
LOGGER.error("Failed to init dolphin scheduler: ", e);
throw new DolphinScheduleException(String.format("Failed to init dolphin scheduler: %s", e.getMessage()));
}
}

Expand All @@ -111,8 +113,7 @@ public DolphinScheduleEngine() {
*/
@Override
public void start() {
LOGGER.info("Starting dolphin scheduler engine");
LOGGER.info("Checking project exists...");
LOGGER.info("Starting dolphin scheduler engine, Checking project exists...");
long code = dsUtils.checkAndGetUniqueId(projectBaseUrl, token, DS_DEFAULT_PROJECT_NAME);
if (code != 0) {
LOGGER.info("Project exists, project code: {}", code);
Expand Down Expand Up @@ -141,8 +142,8 @@ public boolean handleRegister(ScheduleInfo scheduleInfo) {
String processName = scheduleInfo.getInlongGroupId() + DS_DEFAULT_PROCESS_NAME;
String processDesc = DS_DEFAULT_PROCESS_DESC + scheduleInfo.getInlongGroupId();

LOGGER.info("Dolphin Scheduler handle register begin for {}", scheduleInfo.getInlongGroupId());
LOGGER.info("Checking process definition id uniqueness...");
LOGGER.info("Dolphin Scheduler handle register begin for {}, Checking process definition id uniqueness...",
scheduleInfo.getInlongGroupId());
try {
long processDefCode = dsUtils.checkAndGetUniqueId(processDefUrl, token, processName);

Expand Down Expand Up @@ -180,7 +181,9 @@ public boolean handleRegister(ScheduleInfo scheduleInfo) {
scheduledProcessMap.putIfAbsent(processDefCode, processName);
return online;
} catch (Exception e) {
throw new DolphinScheduleException("Failed to handle unregister dolphin scheduler", e);
LOGGER.error("Failed to handle unregister dolphin scheduler: ", e);
throw new DolphinScheduleException(
String.format("Failed to handle unregister dolphin scheduler: %s", e.getMessage()));
}
}

Expand All @@ -194,8 +197,8 @@ public boolean handleUnregister(String groupId) {
String processName = groupId + DS_DEFAULT_PROCESS_NAME;
String processDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL;

LOGGER.info("Dolphin Scheduler handle Unregister begin for {}", groupId);
LOGGER.info("Checking process definition id uniqueness...");
LOGGER.info("Dolphin Scheduler handle Unregister begin for {}, Checking process definition id uniqueness...",
groupId);
try {
long processDefCode = dsUtils.checkAndGetUniqueId(processDefUrl, token, processName);
if (processDefCode != 0 || scheduledProcessMap.containsKey(processDefCode)) {
Expand All @@ -211,7 +214,9 @@ public boolean handleUnregister(String groupId) {
LOGGER.info("Un-registered dolphin schedule info for {}", groupId);
return !scheduledProcessMap.containsKey(processDefCode);
} catch (Exception e) {
throw new DolphinScheduleException("Failed to handle unregister dolphin scheduler", e);
LOGGER.error("Failed to handle unregister dolphin scheduler: ", e);
throw new DolphinScheduleException(
String.format("Failed to handle unregister dolphin scheduler: %s", e.getMessage()));
}
}

Expand All @@ -226,7 +231,9 @@ public boolean handleUpdate(ScheduleInfo scheduleInfo) {
try {
return handleUnregister(scheduleInfo.getInlongGroupId()) && handleRegister(scheduleInfo);
} catch (Exception e) {
throw new DolphinScheduleException("Failed to handle update dolphin scheduler", e);
LOGGER.error("Failed to handle update dolphin scheduler: ", e);
throw new DolphinScheduleException(
String.format("Failed to handle update dolphin scheduler: %s", e.getMessage()));
}
}

Expand Down Expand Up @@ -256,7 +263,8 @@ public void stop() {
LOGGER.info("Dolphin scheduler engine stopped");

} catch (Exception e) {
throw new DolphinScheduleException("Failed to stop dolphin scheduler", e);
LOGGER.error("Failed to stop dolphin scheduler: ", e);
throw new DolphinScheduleException(String.format("Failed to stop dolphin scheduler: %s", e.getMessage()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,9 @@ public long checkAndGetUniqueId(String url, String token, String searchVal) {
return 0;

} catch (IOException e) {
LOGGER.error("Unexpected wrong in check id uniqueness", e);
return 0;
LOGGER.error("Unexpected wrong in check id uniqueness: ", e);
throw new DolphinScheduleException(
String.format("Unexpected wrong in check id uniqueness: %s", e.getMessage()));
}
}

Expand Down Expand Up @@ -184,8 +185,9 @@ public long creatNewProject(String url, String token, String projectName, String
}
return 0;
} catch (IOException e) {
LOGGER.error("Unexpected wrong in creating new project", e);
return 0;
LOGGER.error("Unexpected wrong in creating new project: ", e);
throw new DolphinScheduleException(
String.format("Unexpected wrong in creating new project: %s", e.getMessage()));
}
}

Expand All @@ -207,8 +209,9 @@ public Map<Long, String> queryAllProcessDef(String url, String token) {
LOGGER.info("Query all process definition success, processes info: {}", processDef);
return processDef;
} catch (IOException e) {
LOGGER.error("Unexpected wrong in generating task code", e);
return null;
LOGGER.error("Unexpected wrong in query process definition: ", e);
throw new DolphinScheduleException(
String.format("Unexpected wrong in query process definition: %s", e.getMessage()));
}
}

Expand All @@ -235,8 +238,9 @@ public long genTaskCode(String url, String token) {
}
return 0;
} catch (IOException e) {
LOGGER.error("Unexpected wrong in generating task code", e);
return 0;
LOGGER.error("Unexpected wrong in generating task code: ", e);
throw new DolphinScheduleException(
String.format("Unexpected wrong in generating task code: %s", e.getMessage()));
}
}

Expand Down Expand Up @@ -291,8 +295,9 @@ public long createProcessDef(String url, String token, String name, String desc,
}
return 0;
} catch (IOException e) {
LOGGER.error("Unexpected wrong in creating process definition", e);
return 0;
LOGGER.error("Unexpected wrong in creating process definition: ", e);
throw new DolphinScheduleException(
String.format("Unexpected wrong in creating process definition: %s", e.getMessage()));
}
}

Expand All @@ -318,8 +323,9 @@ public boolean releaseProcessDef(String processDefUrl, long processDefCode, Stri
}
return false;
} catch (IOException e) {
LOGGER.error("Unexpected wrong in release process definition", e);
return false;
LOGGER.error("Unexpected wrong in release process definition: ", e);
throw new DolphinScheduleException(
String.format("Unexpected wrong in release process definition: %s", e.getMessage()));
}
}

Expand Down Expand Up @@ -347,7 +353,8 @@ public int createScheduleForProcessDef(String url, long processDefCode, String t
} else if (scheduleInfo.getScheduleType() == 1) {
crontab = scheduleInfo.getCrontabExpression();
} else {
throw new IllegalArgumentException("Unsupported schedule type: " + scheduleInfo.getScheduleType());
LOGGER.error("Unsupported schedule type: {}", scheduleInfo.getScheduleType());
throw new DolphinScheduleException("Unsupported schedule type: " + scheduleInfo.getScheduleType());
}

DScheduleInfo dScheduleInfo = new DScheduleInfo();
Expand All @@ -371,8 +378,9 @@ public int createScheduleForProcessDef(String url, long processDefCode, String t
return data.get(DS_ID).getAsInt();
}
} catch (IOException e) {
LOGGER.error("Unexpected wrong in creating schedule for process definition", e);
return 0;
LOGGER.error("Unexpected wrong in creating schedule for process definition: ", e);
throw new DolphinScheduleException(
String.format("Unexpected wrong in creating schedule for process definition: %s", e.getMessage()));
}
return 0;
}
Expand All @@ -394,8 +402,9 @@ public boolean onlineScheduleForProcessDef(String scheduleUrl, int scheduleId, S
return response.get(DS_RESPONSE_DATA).getAsBoolean();
}
} catch (IOException e) {
LOGGER.error("Unexpected wrong in online process definition", e);
return false;
LOGGER.error("Unexpected wrong in online process definition: ", e);
throw new DolphinScheduleException(
String.format("Unexpected wrong in online process definition: %s", e.getMessage()));
}
return false;
}
Expand All @@ -413,7 +422,9 @@ public void deleteProcessDef(String processDefUrl, String token, long processDef
try {
executeHttpRequest(url, HTTP_DELETE, new HashMap<>(), header);
} catch (IOException e) {
LOGGER.error("Unexpected wrong in deleting process definition", e);
LOGGER.error("Unexpected wrong in deleting process definition: ", e);
throw new DolphinScheduleException(
String.format("Unexpected wrong in deleting process definition: %s", e.getMessage()));
}
}

Expand All @@ -430,7 +441,9 @@ public void deleteProject(String projectBaseUrl, String token, long projectCode)
try {
executeHttpRequest(url, HTTP_DELETE, new HashMap<>(), header);
} catch (IOException e) {
LOGGER.error("Unexpected wrong in deleting project definition", e);
LOGGER.error("Unexpected wrong in deleting project definition: ", e);
throw new DolphinScheduleException(
String.format("Unexpected wrong in deleting project definition: %s", e.getMessage()));
}
}

Expand Down Expand Up @@ -517,7 +530,8 @@ private JsonObject executeHttpRequest(String url, String method, Map<String, Str
String responseBody = response.body().string();
return JsonParser.parseString(responseBody).getAsJsonObject();
} else {
throw new IOException("Unexpected http response code " + response);
LOGGER.error("Unexpected http response code: {}", response);
throw new DolphinScheduleException("Unexpected http response code " + response);
}
}
}
Expand All @@ -530,14 +544,16 @@ private JsonObject executeHttpRequest(String url, String method, Map<String, Str
*/
public long calculateOffset(ScheduleInfo scheduleInfo) {
if (scheduleInfo == null) {
throw new IllegalArgumentException("ScheduleInfo cannot be null");
LOGGER.error("ScheduleInfo cannot be null");
throw new DolphinScheduleException("ScheduleInfo cannot be null");
}

long offset = 0;

// Determine offset based on schedule type
if (scheduleInfo.getScheduleType() == null) {
throw new IllegalArgumentException("Schedule type cannot be null");
LOGGER.error("Schedule type cannot be null");
throw new DolphinScheduleException("Schedule type cannot be null");
}

switch (scheduleInfo.getScheduleType()) {
Expand All @@ -548,7 +564,8 @@ public long calculateOffset(ScheduleInfo scheduleInfo) {
offset = calculateCronOffset(scheduleInfo);
break;
default:
throw new IllegalArgumentException("Invalid schedule type");
LOGGER.error("Invalid schedule type");
throw new DolphinScheduleException("Invalid schedule type");
}

// Add delay time if specified
Expand All @@ -561,6 +578,7 @@ public long calculateOffset(ScheduleInfo scheduleInfo) {

private long calculateNormalOffset(ScheduleInfo scheduleInfo) {
if (scheduleInfo.getScheduleInterval() == null || scheduleInfo.getScheduleUnit() == null) {
LOGGER.error("Schedule interval and unit cannot be null for normal scheduling");
throw new IllegalArgumentException("Schedule interval and unit cannot be null for normal scheduling");
}
switch (Objects.requireNonNull(ScheduleUnit.getScheduleUnit(scheduleInfo.getScheduleUnit()))) {
Expand All @@ -581,13 +599,15 @@ private long calculateNormalOffset(ScheduleInfo scheduleInfo) {
case ONE_ROUND:
return scheduleInfo.getScheduleInterval();
default:
throw new IllegalArgumentException("Invalid schedule unit");
LOGGER.error("Invalid schedule unit");
throw new DolphinScheduleException("Invalid schedule unit");
}
}

private long calculateCronOffset(ScheduleInfo scheduleInfo) {
if (scheduleInfo.getCrontabExpression() == null) {
throw new IllegalArgumentException("Crontab expression cannot be null for schedule type crontab");
LOGGER.error("Crontab expression cannot be null for schedule type crontab");
throw new DolphinScheduleException("Crontab expression cannot be null for schedule type crontab");
}

try {
Expand All @@ -598,16 +618,19 @@ private long calculateCronOffset(ScheduleInfo scheduleInfo) {
if (secondExecution != null) {
return secondExecution.getTime() - firstExecution.getTime();
} else {
throw new IllegalArgumentException(
LOGGER.error("Unable to calculate the next execution times for the cron expression");
throw new DolphinScheduleException(
"Unable to calculate the next execution times for the cron expression");
}
} catch (Exception e) {
throw new IllegalArgumentException("Invalid cron expression", e);
LOGGER.error("Invalid cron expression: ", e);
throw new DolphinScheduleException(String.format("Invalid cron expression: %s", e.getMessage()));
}
}

private String generateCrontabExpression(String scheduleUnit, Integer scheduleInterval) {
if (scheduleUnit.isEmpty()) {
LOGGER.error("Schedule unit and interval must not be null for generating crontab expression");
throw new DolphinScheduleException(
"Schedule unit and interval must not be null for generating crontab expression");
}
Expand Down Expand Up @@ -636,6 +659,7 @@ private String generateCrontabExpression(String scheduleUnit, Integer scheduleIn
crontabExpression = String.format("* * * * * ? 0/%d", scheduleInterval);
break;
default:
LOGGER.error("Unsupported schedule unit for generating crontab: {}", scheduleUnit);
throw new DolphinScheduleException("Unsupported schedule unit for generating crontab: " + scheduleUnit);
}

Expand All @@ -648,7 +672,8 @@ private String generateCrontabExpression(String scheduleUnit, Integer scheduleIn
* Call back in inlong, sending a request with parameters required
*/
private String buildScript(String host, int port, String username, String password, long offset, String groupId) {

LOGGER.info("build script for host: {}, port: {}, username: {}, password: {}, offset: {}, groupId: {}", host,
port, username, password, offset, groupId);
return "#!/bin/bash\n\n" +

// Get current timestamp
Expand Down
Loading

0 comments on commit 44da2e2

Please sign in to comment.