Skip to content

Commit

Permalink
validate trigger interval when create scheduler. (#3012)
Browse files Browse the repository at this point in the history
  • Loading branch information
guowl3 authored Jul 25, 2024
1 parent 5f0902e commit c586b89
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 0 deletions.
1 change: 1 addition & 0 deletions server/odc-server/src/main/resources/data.sql
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,7 @@ INSERT INTO config_system_configuration ( `key`, `value`, `description` ) VALUES

INSERT INTO config_system_configuration ( `key`, `value`, `description` ) VALUES( 'odc.data-security.masking.enabled', 'true', '是否开启数据脱敏,默认为开启' ) ON DUPLICATE KEY UPDATE `id` = `id`;
INSERT INTO config_system_configuration ( `key`, `value`, `description` ) VALUES( 'odc.task.partition-plan.schedule-cron', '0 0 * * * ?', '默认调度周期:每天 0 点' ) ON DUPLICATE KEY UPDATE `id` = `id`;
INSERT INTO config_system_configuration ( `key`, `value`, `description` ) VALUES( 'odc.task.trigger.minimum-interval', '600', '计划任务最小触发间隔,默认值:600 ,单位:秒' ) ON DUPLICATE KEY UPDATE `id` = `id`;

--
-- v4.2.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.oceanbase.odc.service.flow.processor;

import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -26,6 +27,7 @@
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.oceanbase.odc.core.shared.PreConditions;
Expand All @@ -41,10 +43,14 @@
import com.oceanbase.odc.service.flow.task.model.DBStructureComparisonParameter;
import com.oceanbase.odc.service.flow.util.DescriptionGenerator;
import com.oceanbase.odc.service.iam.auth.AuthenticationFacade;
import com.oceanbase.odc.service.partitionplan.model.PartitionPlanConfig;
import com.oceanbase.odc.service.quartz.util.QuartzCronExpressionUtils;
import com.oceanbase.odc.service.schedule.ScheduleService;
import com.oceanbase.odc.service.schedule.flowtask.AlterScheduleParameters;
import com.oceanbase.odc.service.schedule.flowtask.OperationType;
import com.oceanbase.odc.service.schedule.model.JobType;
import com.oceanbase.odc.service.schedule.model.TriggerConfig;
import com.oceanbase.odc.service.schedule.model.TriggerStrategy;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -69,6 +75,8 @@ public class CreateFlowInstanceProcessAspect implements InitializingBean {
private List<Preprocessor> preprocessors;
@Autowired
private ProjectService projectService;
@Value("${odc.task.trigger.minimum-interval:600}")
private Long triggerMinimumIntervalSeconds;

private final Map<JobType, Preprocessor> scheduleTaskPreprocessors = new HashMap<>();

Expand All @@ -81,6 +89,7 @@ public void processBeforeCreateFlowInstance() {}
@Before("processBeforeCreateFlowInstance()")
public void preprocess(JoinPoint point) throws Throwable {
CreateFlowInstanceReq req = (CreateFlowInstanceReq) point.getArgs()[0];
validateTriggerConfig(req);
if (req.getTaskType() == TaskType.STRUCTURE_COMPARISON) {
DBStructureComparisonParameter parameters = (DBStructureComparisonParameter) req.getParameters();
req.setDatabaseId(parameters.getSourceDatabaseId());
Expand Down Expand Up @@ -163,4 +172,36 @@ private void adaptCreateFlowInstanceReq(CreateFlowInstanceReq req) {
config.setDatabaseId(req.getDatabaseId());
}
}

private void validateTriggerConfig(CreateFlowInstanceReq req) {
if (req.getParameters() instanceof PartitionPlanConfig) {
PartitionPlanConfig parameters = (PartitionPlanConfig) req.getParameters();
validateTriggerConfig(parameters.getCreationTrigger());
if (parameters.getDroppingTrigger() != null) {
validateTriggerConfig(parameters.getDroppingTrigger());
}
return;
}
if (req.getParameters() instanceof AlterScheduleParameters) {
AlterScheduleParameters parameters = (AlterScheduleParameters) req.getParameters();
validateTriggerConfig(parameters.getTriggerConfig());
}
}

private void validateTriggerConfig(TriggerConfig triggerConfig) {
PreConditions.notNull(triggerConfig, "triggerConfig");
if (triggerConfig.getTriggerStrategy() == TriggerStrategy.CRON) {
List<Date> nextFiveFireTimes =
QuartzCronExpressionUtils.getNextFiveFireTimes(triggerConfig.getCronExpression(), 2);
if (nextFiveFireTimes.size() != 2) {
throw new IllegalArgumentException("Invalid cron expression");
}
long intervalMills = nextFiveFireTimes.get(1).getTime() - nextFiveFireTimes.get(0).getTime();
if (intervalMills / 1000 < triggerMinimumIntervalSeconds) {
throw new IllegalArgumentException(
String.format("The minimum interval is 10 minutes,cron=%s", triggerConfig.getCronExpression()));
}
}
}

}

0 comments on commit c586b89

Please sign in to comment.