Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(schedule): separate the schedule module interface #2881

Merged
merged 20 commits into from
Jul 5, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.springframework.beans.factory.annotation.Autowired;

import com.oceanbase.odc.ServiceTestEnv;
import com.oceanbase.odc.service.schedule.flowtask.OperationType;
import com.oceanbase.odc.service.schedule.model.OperationType;
import com.oceanbase.odc.service.schedule.model.ScheduleChangeStatus;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import com.oceanbase.odc.ServiceTestEnv;
import com.oceanbase.odc.service.quartz.QuartzJobService;
import com.oceanbase.odc.service.schedule.model.CreateQuartzJobReq;
import com.oceanbase.odc.service.schedule.model.JobType;
import com.oceanbase.odc.service.schedule.model.QuartzKeyGenerator;
import com.oceanbase.odc.service.schedule.model.ScheduleType;
import com.oceanbase.odc.service.schedule.model.TriggerConfig;
import com.oceanbase.odc.service.schedule.model.TriggerStrategy;

Expand All @@ -46,8 +46,7 @@ public class QuartzJobServiceTest extends ServiceTestEnv {
@Test
public void create() throws SchedulerException, ParseException {
CreateQuartzJobReq req = new CreateQuartzJobReq();
req.setScheduleId(1L);
req.setType(JobType.SQL_PLAN);
req.setJobKey(QuartzKeyGenerator.generateJobKey("1", ScheduleType.SQL_PLAN.name()));
TriggerConfig config = new TriggerConfig();
config.setTriggerStrategy(TriggerStrategy.START_AT);
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd hh:mm:ss");
Expand All @@ -56,7 +55,7 @@ public void create() throws SchedulerException, ParseException {
req.setTriggerConfig(config);
quartzJobService.createJob(req);
Trigger trigger =
quartzJobService.getTrigger(QuartzKeyGenerator.generateTriggerKey(req.getScheduleId(), req.getType()));
quartzJobService.getTrigger(QuartzKeyGenerator.generateTriggerKey("1", ScheduleType.SQL_PLAN.name()));
Assert.assertEquals(trigger.getFinalFireTime(), date);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package com.oceanbase.odc.service.onlineschemachange;

import static com.oceanbase.odc.service.schedule.model.JobType.ONLINE_SCHEMA_CHANGE_COMPLETE;
import static com.oceanbase.odc.service.schedule.model.ScheduleType.ONLINE_SCHEMA_CHANGE_COMPLETE;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
Expand Down Expand Up @@ -65,8 +65,8 @@
import com.oceanbase.odc.service.onlineschemachange.oms.response.OmsProjectProgressResponse;
import com.oceanbase.odc.service.onlineschemachange.oms.response.OmsProjectStepVO;
import com.oceanbase.odc.service.quartz.model.MisfireStrategy;
import com.oceanbase.odc.service.schedule.model.JobType;
import com.oceanbase.odc.service.schedule.model.ScheduleStatus;
import com.oceanbase.odc.service.schedule.model.ScheduleType;
import com.oceanbase.odc.service.schedule.model.TriggerConfig;
import com.oceanbase.odc.service.schedule.model.TriggerStrategy;
import com.oceanbase.odc.service.session.DBSessionManageFacade;
Expand Down Expand Up @@ -180,7 +180,7 @@ protected ScheduleEntity getScheduleEntity(ConnectionConfig config, OnlineSchema
scheduleEntity.setStatus(ScheduleStatus.ENABLED);
scheduleEntity.setAllowConcurrent(false);
scheduleEntity.setMisfireStrategy(MisfireStrategy.MISFIRE_INSTRUCTION_DO_NOTHING);
scheduleEntity.setJobType(JobType.ONLINE_SCHEMA_CHANGE_COMPLETE);
scheduleEntity.setType(ScheduleType.ONLINE_SCHEMA_CHANGE_COMPLETE);
scheduleEntity.setJobParametersJson(JsonUtils.toJson(changeParameters));
TriggerConfig triggerConfig = new TriggerConfig();
triggerConfig.setTriggerStrategy(TriggerStrategy.CRON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.oceanbase.odc.service.onlineschemachange;

import static com.oceanbase.odc.service.schedule.model.JobType.ONLINE_SCHEMA_CHANGE_COMPLETE;
import static com.oceanbase.odc.service.schedule.model.ScheduleType.ONLINE_SCHEMA_CHANGE_COMPLETE;

import java.util.function.Consumer;

Expand All @@ -39,7 +39,8 @@ public TriggerListener generateTriggerListener(Long scheduleId, Consumer<JobExec
return new TriggerListenerSupport() {
@Override
public String getName() {
return QuartzKeyGenerator.generateTriggerKey(scheduleId, ONLINE_SCHEMA_CHANGE_COMPLETE).toString();
return QuartzKeyGenerator
.generateTriggerKey(scheduleId.toString(), ONLINE_SCHEMA_CHANGE_COMPLETE.name()).toString();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.oceanbase.odc.server.web.controller.v2;

import java.util.Base64;
import java.util.stream.Collectors;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
Expand All @@ -26,6 +27,7 @@
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.oceanbase.odc.common.util.StringUtils;
import com.oceanbase.odc.core.session.ConnectionSession;
import com.oceanbase.odc.service.common.response.ListResponse;
import com.oceanbase.odc.service.common.response.Responses;
Expand Down Expand Up @@ -59,7 +61,12 @@ public ListResponse<String> listTables(@PathVariable String sessionId,
@PathVariable(required = false) String databaseName,
@RequestParam(required = false, name = "fuzzyTableName") String fuzzyTableName) {
ConnectionSession session = sessionService.nullSafeGet(sessionId, true);
return Responses.list(tableService.showTablesLike(session, databaseName, fuzzyTableName));
if (!StringUtils.isEmpty(fuzzyTableName)) {
guowl3 marked this conversation as resolved.
Show resolved Hide resolved
return Responses.list(tableService.showTablesLike(session, databaseName, fuzzyTableName));
} else {
return Responses.list(tableService.listTables(session, databaseName).stream().map(DBTable::getName).collect(
Collectors.toList()));
}
}

@GetMapping(value = {"/{sessionId}/databases/{databaseName}/tables/{tableName}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@
import com.oceanbase.odc.service.dlm.model.RateLimitConfiguration;
import com.oceanbase.odc.service.schedule.ScheduleService;
import com.oceanbase.odc.service.schedule.model.CreateScheduleReq;
import com.oceanbase.odc.service.schedule.model.OperationType;
import com.oceanbase.odc.service.schedule.model.QueryScheduleParams;
import com.oceanbase.odc.service.schedule.model.Schedule;
import com.oceanbase.odc.service.schedule.model.ScheduleChangeLog;
import com.oceanbase.odc.service.schedule.model.ScheduleChangeParams;
import com.oceanbase.odc.service.schedule.model.ScheduleDetailResp;
import com.oceanbase.odc.service.schedule.model.ScheduleOverview;
import com.oceanbase.odc.service.schedule.model.ScheduleStatus;
Expand Down Expand Up @@ -71,13 +74,14 @@ public class ScheduleController {

@RequestMapping(value = "/schedules/{id:[\\d]+}/changes", method = RequestMethod.GET)
public ListResponse<ScheduleChangeLog> listChangeLog(@PathVariable Long id) {
throw new UnsupportedException();
return Responses.list(scheduleService.listScheduleChangeLog(id));

}

@RequestMapping(value = "/schedules/{id:[\\d]+}/changes/{scheduleChangeLogId:[\\d]+}", method = RequestMethod.GET)
public SuccessResponse<ScheduleChangeLog> getChangeLog(@PathVariable Long id,
@PathVariable Long scheduleChangeLogId) {
throw new UnsupportedException();
return Responses.success(scheduleService.getChangeLog(id, scheduleChangeLogId));
}

// schedule task
Expand All @@ -91,76 +95,76 @@ public void terminateTask(@PathVariable Long scheduleId, @PathVariable Long task
@RequestMapping(value = "/schedules/{scheduleId:[\\d]+}/tasks/{taskId:[\\d]+}/executions/latest/stop",
method = RequestMethod.POST)
public SuccessResponse<Boolean> stopTask(@PathVariable Long scheduleId, @PathVariable Long taskId) {
throw new UnsupportedException();

scheduleService.stopTask(scheduleId, taskId);
return Responses.success(Boolean.TRUE);
}

@ApiOperation(value = "StartTask", notes = "启动任务")
@RequestMapping(value = "/schedules/{scheduleId:[\\d]+}/tasks/{taskId:[\\d]+}/start",
method = RequestMethod.POST)
public SuccessResponse<Boolean> startTask(@PathVariable Long scheduleId, @PathVariable Long taskId) {
throw new UnsupportedException();

scheduleService.startTask(scheduleId, taskId);
return Responses.success(Boolean.TRUE);
}

@RequestMapping(value = "/schedules/{scheduleId:[\\d]+}/tasks/{taskId:[\\d]+}/rollback",
method = RequestMethod.POST)
public SuccessResponse<Boolean> rollbackTask(@PathVariable Long scheduleId, @PathVariable Long taskId) {
throw new UnsupportedException();
scheduleService.rollbackTask(scheduleId, taskId);
return Responses.success(Boolean.TRUE);

}

@RequestMapping(value = "/schedules/{scheduleId:[\\d]+}/tasks/{taskId:[\\d]+}/executions/latest/log",
method = RequestMethod.GET)
public SuccessResponse<String> getTaskLog(@PathVariable Long scheduleId, @PathVariable Long taskId,
@RequestParam OdcTaskLogLevel logType) {
throw new UnsupportedException();
return Responses.success(scheduleService.getLog(scheduleId, taskId, logType));
}


@RequestMapping(value = "/schedules/{scheduleId:[\\d]+}/tasks/{taskId:[\\d]+}/", method = RequestMethod.GET)
@RequestMapping(value = "/schedules/{scheduleId:[\\d]+}/tasks/{taskId:[\\d]+}", method = RequestMethod.GET)
public SuccessResponse<ScheduleTaskDetailResp> detailScheduleTask(@PathVariable Long scheduleId,
@PathVariable Long taskId) {
throw new UnsupportedException();
return Responses.success(scheduleService.detailScheduleTask(scheduleId, taskId));
}

@RequestMapping(value = "/schedules/{scheduleId:[\\d]+}/tasks", method = RequestMethod.GET)
public PaginatedResponse<ScheduleTaskOverview> listTask(
@PageableDefault(size = Integer.MAX_VALUE, sort = {"id"}, direction = Direction.DESC) Pageable pageable,
@PathVariable Long scheduleId) {
throw new UnsupportedException();
return Responses.paginated(scheduleService.listScheduleTaskOverview(pageable, scheduleId));
}

// schedule

@RequestMapping(value = "/schedules/{id:[\\d]+}/terminate", method = RequestMethod.POST)
public SuccessResponse<Boolean> terminateSchedule(@PathVariable("id") Long id) {
throw new UnsupportedException();

scheduleService.changeSchedule(ScheduleChangeParams.with(id, OperationType.TERMINATE));
return Responses.success(Boolean.TRUE);
}

@RequestMapping(value = "/schedules/{id:[\\d]+}/pause", method = RequestMethod.POST)
public SuccessResponse<Boolean> pauseSchedule(@PathVariable Long id) {
throw new UnsupportedException();

scheduleService.changeSchedule(ScheduleChangeParams.with(id, OperationType.PAUSE));
return Responses.success(Boolean.TRUE);
}

@RequestMapping(value = "/schedules/{id:[\\d]+}/resume", method = RequestMethod.POST)
public SuccessResponse<Boolean> resumeSchedule(@PathVariable Long id) {
throw new UnsupportedException();

scheduleService.changeSchedule(ScheduleChangeParams.with(id, OperationType.RESUME));
return Responses.success(Boolean.TRUE);
}

@RequestMapping(value = "/schedules/{id:[\\d]+}", method = RequestMethod.PUT)
public SuccessResponse<Boolean> updateSchedule(@PathVariable Long id, @RequestBody UpdateScheduleReq req) {
throw new UnsupportedException();

scheduleService.changeSchedule(ScheduleChangeParams.with(id, req));
return Responses.success(Boolean.TRUE);
}

@RequestMapping(value = "/schedules", method = RequestMethod.POST)
public SuccessResponse<Boolean> createSchedule(@RequestBody CreateScheduleReq req) {
throw new UnsupportedException();

public SuccessResponse<Schedule> createSchedule(@RequestBody CreateScheduleReq req) {
return Responses.success(scheduleService.changeSchedule(ScheduleChangeParams.with(req)));
}


Expand Down Expand Up @@ -192,13 +196,14 @@ public PaginatedResponse<ScheduleOverview> list(
.creator(creator)
.projectId(projectId)
.build();
throw new UnsupportedException();

return Responses.paginated(scheduleService.listScheduleOverview(pageable, req));

}

@RequestMapping(value = "/schedules/{id:[\\d]+}", method = RequestMethod.GET)
public SuccessResponse<ScheduleDetailResp> detailSchedule(@PathVariable Long id) {
throw new UnsupportedException();
return Responses.success(scheduleService.detailSchedule(id));
}

@RequestMapping(value = "/schedules/{id:[\\d]+}/dlmRateLimitConfiguration", method = RequestMethod.PUT)
Expand Down
8 changes: 4 additions & 4 deletions server/odc-server/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -555,8 +555,8 @@
<Routes pattern="${ctx:scheduleTaskId}">
<Route>
<RollingFile name="InfoRolling-${ctx:scheduleTaskId}"
fileName="${LOG_DIRECTORY}/scheduleTask/${ctx:scheduleId}-${ctx:jobType}/${ctx:scheduleTaskId}/log.all"
filePattern="${LOG_DIRECTORY}/scheduleTask/${ctx:scheduleId}-${ctx:jobType}/${ctx:scheduleTaskId}/${date:yyyy-MM}/log-%d{yyyy-MM-dd}-%i.all.gz">
fileName="${LOG_DIRECTORY}/scheduleTask/${ctx:scheduleId}-${ctx:scheduleType}/${ctx:scheduleTaskId}/log.all"
filePattern="${LOG_DIRECTORY}/scheduleTask/${ctx:scheduleId}-${ctx:scheduleType}/${ctx:scheduleTaskId}/${date:yyyy-MM}/log-%d{yyyy-MM-dd}-%i.all.gz">
<PatternLayout>
<pattern>%d{yyyy-MM-dd HH:mm:ss} [%p] %m%n</pattern>
</PatternLayout>
Expand All @@ -577,8 +577,8 @@
<Routes pattern="${ctx:scheduleTaskId}">
<Route>
<RollingFile name="WarnRolling-${ctx:scheduleTaskId}"
fileName="${LOG_DIRECTORY}/scheduleTask/${ctx:scheduleId}-${ctx:jobType}/${ctx:scheduleTaskId}/log.warn"
filePattern="${LOG_DIRECTORY}/scheduleTask/${ctx:scheduleId}-${ctx:jobType}/${ctx:scheduleTaskId}/${date:yyyy-MM}/log-%d{yyyy-MM-dd}-%i.warn.gz">
fileName="${LOG_DIRECTORY}/scheduleTask/${ctx:scheduleId}-${ctx:scheduleType}/${ctx:scheduleTaskId}/log.warn"
filePattern="${LOG_DIRECTORY}/scheduleTask/${ctx:scheduleId}-${ctx:scheduleType}/${ctx:scheduleTaskId}/${date:yyyy-MM}/log-%d{yyyy-MM-dd}-%i.warn.gz">
<PatternLayout>
<pattern>%d{yyyy-MM-dd HH:mm:ss} [%p] %m%n</pattern>
</PatternLayout>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import javax.persistence.Id;
import javax.persistence.Table;

import com.oceanbase.odc.service.schedule.flowtask.OperationType;
import com.oceanbase.odc.service.schedule.model.OperationType;
import com.oceanbase.odc.service.schedule.model.ScheduleChangeStatus;

import lombok.Data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import org.hibernate.annotations.GenerationTime;

import com.oceanbase.odc.service.quartz.model.MisfireStrategy;
import com.oceanbase.odc.service.schedule.model.JobType;
import com.oceanbase.odc.service.schedule.model.ScheduleStatus;
import com.oceanbase.odc.service.schedule.model.ScheduleType;

import lombok.Data;

Expand Down Expand Up @@ -74,7 +74,7 @@ public class ScheduleEntity implements Serializable {

@Enumerated(EnumType.STRING)
@Column(name = "job_type", nullable = false)
private JobType jobType;
private ScheduleType type;
@Column(name = "job_parameters_json", nullable = false)
private String jobParametersJson;
@Column(name = "trigger_config_json", nullable = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ public interface ScheduleRepository extends OdcJpaRepository<ScheduleEntity, Lon
default Page<ScheduleEntity> find(@NotNull Pageable pageable, @NotNull QueryScheduleParams params) {
Specification<ScheduleEntity> specification = Specification
.where(OdcJpaRepository.between(ScheduleEntity_.createTime, params.getStartTime(), params.getEndTime()))
.and(OdcJpaRepository.eq(ScheduleEntity_.jobType, params.getType()))
.and(OdcJpaRepository.eq(ScheduleEntity_.type, params.getType()))
.and(OdcJpaRepository.in(ScheduleEntity_.projectId, params.getProjectIds()))
.and(OdcJpaRepository.in(ScheduleEntity_.creatorId, params.getCreatorIds()))
.and(OdcJpaRepository.eq(ScheduleEntity_.id, params.getId()))
.and(OdcJpaRepository.in(ScheduleEntity_.status, params.getStatuses()))
.and(OdcJpaRepository.eq(ScheduleEntity_.organizationId, params.getOrganizationId()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
public interface ScheduleTaskRepository extends JpaRepository<ScheduleTaskEntity, Long>,
JpaSpecificationExecutor<ScheduleTaskEntity> {

Optional<ScheduleTaskEntity> findByIdAndJobName(Long id, Long scheduleId);
Optional<ScheduleTaskEntity> findByIdAndJobName(Long id, String scheduleId);

List<ScheduleTaskEntity> findByJobNameAndStatusIn(String jobName, List<TaskStatus> statuses);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@
*/
package com.oceanbase.odc.service.connection.model;

import java.util.Map;

public interface CloudConnectionConfig {
String getClusterName();

String getTenantName();

String getOBTenantName();

Map<String, String> getProperties();
guowl3 marked this conversation as resolved.
Show resolved Hide resolved

OBInstanceType getInstanceType();

OBInstanceRoleType getInstanceRoleType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ public class TestConnectionReq implements CloudConnectionConfig, SSLConnectionCo
@JsonIgnore
private OBInstanceRoleType instanceRoleType;

private Map<String, String> properties;
guowl3 marked this conversation as resolved.
Show resolved Hide resolved

public DialectType getDialectType() {
if (Objects.nonNull(this.type)) {
return this.type.getDialectType();
Expand All @@ -160,6 +162,7 @@ public static TestConnectionReq fromConnection(ConnectionConfig connection,
req.setPort(connection.getPort());
req.setClusterName(connection.getClusterName());
req.setDefaultSchema(connection.getDefaultSchema());
req.setProperties(connection.getProperties());
if (accountType == ConnectionAccountType.MAIN) {
req.setTenantName(connection.getTenantName());
req.setUsername(connection.getUsername());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ public static void sync(ConnectionConfig srcConfig, ConnectionConfig tgtConfig,
String srcTableName, String tgtTableName, Set<DBObjectType> targetType) throws Exception {
DataSource sourceDs = new DruidDataSourceFactory(srcConfig).getDataSource();
DataSource targetDs = new DruidDataSourceFactory(tgtConfig).getDataSource();
if (srcConfig.getDialectType() != tgtConfig.getDialectType()) {
log.warn("Different types of databases do not support structural synchronization.");
return;
}
try {
String tgtDbVersion = getDBVersion(tgtConfig.getType(), targetDs);
String srcDbVersion = getDBVersion(srcConfig.getType(), sourceDs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.List;
import java.util.Set;

import com.oceanbase.odc.service.connection.database.model.Database;
import com.oceanbase.odc.service.schedule.model.ScheduleTaskParameters;
import com.oceanbase.tools.dbbrowser.model.DBObjectType;
import com.oceanbase.tools.migrator.common.enums.MigrationInsertAction;
Expand All @@ -40,13 +41,11 @@ public class DataArchiveParameters implements ScheduleTaskParameters {

private Long targetDataBaseId;

private String sourceDatabaseName;
// inner init
private Database sourceDatabase;

private String targetDatabaseName;

private String sourceDataSourceName;

private String targetDataSourceName;
// inner init
private Database targetDatabase;

private List<OffsetConfig> variables;

Expand Down
Loading