Skip to content

Commit

Permalink
[Feature-16834] Start worker node with worker group label (#16875)
Browse files Browse the repository at this point in the history
  • Loading branch information
SbloodyS authored Dec 6, 2024
1 parent f56d51c commit 28e849e
Show file tree
Hide file tree
Showing 63 changed files with 687 additions and 248 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ worker:
max-heartbeat-interval: 10s
# worker host weight to dispatch tasks, default value 100
host-weight: 100
# worker group name
group: default
server-load-protection:
enabled: true
# Worker max system cpu usage, when the worker's system cpu usage is smaller then this value, worker server can be dispatched tasks.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.dolphinscheduler.common.enums.AuditOperationType;
import org.apache.dolphinscheduler.dao.entity.AuditLog;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao;

import java.util.List;
import java.util.Map;
Expand All @@ -35,7 +35,7 @@
public class WorkerGroupAuditOperatorImpl extends BaseAuditOperator {

@Autowired
private WorkerGroupMapper workerGroupMapper;
private WorkerGroupDao workerGroupDao;

@Override
public void modifyAuditOperationType(AuditType auditType, Map<String, Object> paramsMap,
Expand All @@ -54,7 +54,7 @@ public String getObjectNameFromIdentity(Object identity) {
return "";
}

WorkerGroup obj = workerGroupMapper.selectById(objId);
WorkerGroup obj = workerGroupDao.queryById(objId);
return obj == null ? "" : obj.getName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,15 @@ public Result assignWorkerGroups(@Parameter(hidden = true) @RequestAttribute(val
* @param projectCode project code
* @return worker group list
*/
@Operation(summary = "queryWorkerGroups", description = "QUERY_WORKER_GROUP_LIST")
@Operation(summary = "queryAssignedWorkerGroups", description = "QUERY_WORKER_GROUP_LIST")
@Parameters({
@Parameter(name = "projectCode", description = "PROJECT_CODE", schema = @Schema(implementation = long.class, example = "123456"))
})
@GetMapping()
@ResponseStatus(HttpStatus.OK)
public Map<String, Object> queryWorkerGroups(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode) {
return projectWorkerGroupRelationService.queryWorkerGroupsByProject(loginUser, projectCode);
public Map<String, Object> queryAssignedWorkerGroups(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode) {
return projectWorkerGroupRelationService.queryAssignedWorkerGroupsByProject(loginUser, projectCode);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
import org.apache.dolphinscheduler.dao.mapper.QueueMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.dao.repository.UserDao;
import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao;
import org.apache.dolphinscheduler.service.process.ProcessService;

import java.util.Arrays;
Expand Down Expand Up @@ -273,10 +273,10 @@ public Set<Integer> listAuthorizedResourceIds(int userId, Logger logger) {
@Component
public static class WorkerGroupResourcePermissionCheck implements ResourceAcquisitionAndPermissionCheck<Integer> {

private final WorkerGroupMapper workerGroupMapper;
private final WorkerGroupDao workerGroupDao;

public WorkerGroupResourcePermissionCheck(WorkerGroupMapper workerGroupMapper) {
this.workerGroupMapper = workerGroupMapper;
public WorkerGroupResourcePermissionCheck(WorkerGroupDao workerGroupDao) {
this.workerGroupDao = workerGroupDao;
}

@Override
Expand All @@ -291,7 +291,7 @@ public boolean permissionCheck(int userId, String url, Logger logger) {

@Override
public Set<Integer> listAuthorizedResourceIds(int userId, Logger logger) {
List<WorkerGroup> workerGroups = workerGroupMapper.queryAllWorkerGroup();
List<WorkerGroup> workerGroups = workerGroupDao.queryAllWorkerGroup();
return workerGroups.stream().map(WorkerGroup::getId).collect(Collectors.toSet());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@
import java.util.List;
import java.util.Map;

/**
* the service of project and worker group
*/
public interface ProjectWorkerGroupRelationService {

/**
Expand All @@ -43,6 +40,6 @@ public interface ProjectWorkerGroupRelationService {
* @param loginUser the login user
* @param projectCode project code
*/
Map<String, Object> queryWorkerGroupsByProject(User loginUser, Long projectCode);
Map<String, Object> queryAssignedWorkerGroupsByProject(User loginUser, Long projectCode);

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.entity.WorkerGroupPageDetail;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -79,4 +80,6 @@ public interface WorkerGroupService {
*/
Map<Long, String> queryWorkerGroupByWorkflowDefinitionCodes(List<Long> workflowDefinitionCodeList);

List<WorkerGroupPageDetail> getConfigWorkerGroupPageDetail();

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,23 @@
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.ProjectWorkerGroupRelationService;
import org.apache.dolphinscheduler.api.service.WorkerGroupService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.ProjectWorkerGroup;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectWorkerGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.dolphinscheduler.dao.repository.ProjectWorkerGroupDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections4.SetUtils;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -53,45 +53,35 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;

/**
* task definition service impl
*/
@Service
@Slf4j
public class ProjectWorkerGroupRelationServiceImpl extends BaseServiceImpl
implements
ProjectWorkerGroupRelationService {

@Autowired
private ProjectWorkerGroupMapper projectWorkerGroupMapper;
private ProjectWorkerGroupDao projectWorkerGroupDao;

@Autowired
private ProjectMapper projectMapper;

@Autowired
private WorkerGroupMapper workerGroupMapper;

@Autowired
private TaskDefinitionMapper taskDefinitionMapper;
private TaskDefinitionDao taskDefinitionDao;

@Autowired
private ScheduleMapper scheduleMapper;

@Autowired
private ProjectService projectService;

/**
* assign worker groups to a project
*
* @param loginUser the login user
* @param projectCode the project code
* @param workerGroups assigned worker group names
*/
@Autowired
private WorkerGroupDao workerGroupDao;

@Autowired
private WorkerGroupService workerGroupService;

@Override
public Result assignWorkerGroupsToProject(User loginUser, Long projectCode, List<String> workerGroups) {

Result result = new Result();

if (!isAdmin(loginUser)) {
Expand All @@ -105,7 +95,12 @@ public Result assignWorkerGroupsToProject(User loginUser, Long projectCode, List
}

if (CollectionUtils.isEmpty(workerGroups)) {
putMsg(result, Status.WORKER_GROUP_TO_PROJECT_IS_EMPTY);
boolean deleted = projectWorkerGroupDao.deleteByProjectCode(projectCode);
if (deleted) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.ASSIGN_WORKER_GROUP_TO_PROJECT_ERROR);
}
return result;
}

Expand All @@ -115,30 +110,23 @@ public Result assignWorkerGroupsToProject(User loginUser, Long projectCode, List
return result;
}

Set<String> workerGroupNames =
workerGroupMapper.queryAllWorkerGroup().stream().map(WorkerGroup::getName).collect(
Collectors.toSet());

workerGroupNames.add(WorkerGroupUtils.getDefaultWorkerGroup());

Set<String> assignedWorkerGroupNames = new HashSet<>(workerGroups);

Set<String> difference = SetUtils.difference(assignedWorkerGroupNames, workerGroupNames);
Set<String> allWorkerGroupNames = new HashSet<>(workerGroupDao.queryAllWorkerGroupNames());
workerGroupService.getConfigWorkerGroupPageDetail().forEach(
workerGroupPageDetail -> allWorkerGroupNames.add(workerGroupPageDetail.getName()));
Set<String> unauthorizedWorkerGroupNames = new HashSet<>(workerGroups);

// check if assign worker group exists in the system
Set<String> difference = SetUtils.difference(unauthorizedWorkerGroupNames, allWorkerGroupNames);
if (!difference.isEmpty()) {
putMsg(result, Status.WORKER_GROUP_NOT_EXIST, difference.toString());
return result;
}

Set<String> projectWorkerGroupNames = projectWorkerGroupMapper.selectList(new QueryWrapper<ProjectWorkerGroup>()
.lambda()
.eq(ProjectWorkerGroup::getProjectCode, projectCode))
.stream()
.map(ProjectWorkerGroup::getWorkerGroup)
.collect(Collectors.toSet());

difference = SetUtils.difference(projectWorkerGroupNames, assignedWorkerGroupNames);

// check if assign worker group exists in the project
Set<String> projectWorkerGroupNames =
projectWorkerGroupDao.queryAssignedWorkerGroupNamesByProjectCode(projectCode);
difference = SetUtils.difference(unauthorizedWorkerGroupNames, projectWorkerGroupNames);
Date now = new Date();
if (CollectionUtils.isNotEmpty(difference)) {
Set<String> usedWorkerGroups = getAllUsedWorkerGroups(project);

Expand All @@ -147,27 +135,22 @@ public Result assignWorkerGroupsToProject(User loginUser, Long projectCode, List
SetUtils.intersection(usedWorkerGroups, difference).toSet());
}

int deleted = projectWorkerGroupMapper.delete(
new QueryWrapper<ProjectWorkerGroup>().lambda().eq(ProjectWorkerGroup::getProjectCode, projectCode)
.in(ProjectWorkerGroup::getWorkerGroup, difference));
if (deleted > 0) {
boolean deleted =
projectWorkerGroupDao.deleteByProjectCodeAndWorkerGroups(projectCode, new ArrayList<>(difference));
if (deleted) {
log.info("Success to delete worker groups [{}] for the project [{}] .", difference, project.getName());
} else {
log.error("Failed to delete worker groups [{}] for the project [{}].", difference, project.getName());
throw new ServiceException(Status.ASSIGN_WORKER_GROUP_TO_PROJECT_ERROR);
}
}

difference = SetUtils.difference(assignedWorkerGroupNames, projectWorkerGroupNames);
Date now = new Date();
if (CollectionUtils.isNotEmpty(difference)) {
difference.stream().forEach(workerGroupName -> {
difference.forEach(workerGroupName -> {
ProjectWorkerGroup projectWorkerGroup = new ProjectWorkerGroup();
projectWorkerGroup.setProjectCode(projectCode);
projectWorkerGroup.setWorkerGroup(workerGroupName);
projectWorkerGroup.setCreateTime(now);
projectWorkerGroup.setUpdateTime(now);
int create = projectWorkerGroupMapper.insert(projectWorkerGroup);
int create = projectWorkerGroupDao.insert(projectWorkerGroup);
if (create > 0) {
log.info("Success to add worker group [{}] for the project [{}] .", workerGroupName,
project.getName());
Expand All @@ -183,13 +166,8 @@ public Result assignWorkerGroupsToProject(User loginUser, Long projectCode, List
return result;
}

/**
* query worker groups that assigned to the project
*
* @param projectCode project code
*/
@Override
public Map<String, Object> queryWorkerGroupsByProject(User loginUser, Long projectCode) {
public Map<String, Object> queryAssignedWorkerGroupsByProject(User loginUser, Long projectCode) {
Map<String, Object> result = new HashMap<>();

Project project = projectMapper.queryByCode(projectCode);
Expand All @@ -201,9 +179,8 @@ public Map<String, Object> queryWorkerGroupsByProject(User loginUser, Long proje

Set<String> assignedWorkerGroups = getAllUsedWorkerGroups(project);

projectWorkerGroupMapper.selectList(
new QueryWrapper<ProjectWorkerGroup>().lambda().eq(ProjectWorkerGroup::getProjectCode, projectCode))
.stream().forEach(projectWorkerGroup -> assignedWorkerGroups.add(projectWorkerGroup.getWorkerGroup()));
projectWorkerGroupDao.queryByProjectCode(projectCode)
.forEach(projectWorkerGroup -> assignedWorkerGroups.add(projectWorkerGroup.getWorkerGroup()));

List<ProjectWorkerGroup> projectWorkerGroups = assignedWorkerGroups.stream().map(workerGroup -> {
ProjectWorkerGroup projectWorkerGroup = new ProjectWorkerGroup();
Expand All @@ -220,9 +197,9 @@ public Map<String, Object> queryWorkerGroupsByProject(User loginUser, Long proje
private Set<String> getAllUsedWorkerGroups(Project project) {
Set<String> usedWorkerGroups = new TreeSet<>();
// query all worker groups that tasks depend on
taskDefinitionMapper.queryAllDefinitionList(project.getCode()).stream().forEach(taskDefinition -> {
if (StringUtils.isNotEmpty(taskDefinition.getWorkerGroup())) {
usedWorkerGroups.add(taskDefinition.getWorkerGroup());
taskDefinitionDao.queryAllTaskDefinitionWorkerGroups(project.getCode()).forEach(workerGroupName -> {
if (StringUtils.isNotEmpty(workerGroupName)) {
usedWorkerGroups.add(workerGroupName);
}
});

Expand Down
Loading

0 comments on commit 28e849e

Please sign in to comment.