Skip to content

Commit

Permalink
optimize code
Browse files Browse the repository at this point in the history
  • Loading branch information
ponfee committed Nov 19, 2023
1 parent ec7af54 commit d690b6c
Show file tree
Hide file tree
Showing 19 changed files with 89 additions and 71 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Disjob worker configuration
disjob.worker:
group: default
timing-wheel-tick-ms: 100
timing-wheel-ring-size: 60
maximum-pool-size: 100
keep-alive-time-seconds: 300
process-thread-pool-size: 6
worker-token: 358678bfe34648f68b607036a27c6854
supervisor-token: 20bb8b7f1cb94dc894b45546a7c2982f
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import java.lang.reflect.Array;
import java.text.ParseException;
import java.util.*;
import java.util.function.Supplier;
import java.util.function.Function;
import java.util.regex.Pattern;

import static org.apache.commons.lang3.builder.ToStringBuilder.reflectionToString;
Expand Down Expand Up @@ -45,8 +45,8 @@ public static String toString(Object obj, String defaultStr) {
: reflectionToString(obj, ToStringStyle.JSON_STYLE);
}

public static <T> T defaultIfNull(T object, Supplier<T> defaultValue) {
return object != null ? object : defaultValue.get();
public static <T, R> R applyIfNotNull(T object, Function<T, R> mapper) {
return object == null ? null : mapper.apply(object);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public enum JobCodeMsg implements CodeMsg {

INVALID_PARAM(400, "Invalid param."),
UN_AUTHENTICATED(401, "Un authenticated."),
GROUP_NOT_FOUND(404, "Not found worker group."),
SERVER_ERROR(500, "Server error."),

LOAD_HANDLER_ERROR(1001, "Load job handler error."),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ private Current(String host, int port) {

private static synchronized Current create(String host, int port) {
if (instance != null) {
throw new AssertionError("Current supervisor already set.");
throw new Error("Current supervisor already set.");
}

instance = new Current(host, port) {
Expand Down
21 changes: 11 additions & 10 deletions disjob-core/src/main/java/cn/ponfee/disjob/core/base/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
import cn.ponfee.disjob.common.base.SingletonClassConstraint;
import cn.ponfee.disjob.common.util.Numbers;
import cn.ponfee.disjob.common.util.Strings;
import cn.ponfee.disjob.core.exception.AuthenticationException;
import cn.ponfee.disjob.core.model.SchedJob;
import cn.ponfee.disjob.core.param.worker.AuthenticationParam;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
Expand All @@ -21,6 +23,7 @@
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
import org.springframework.util.Assert;

import java.io.IOException;
Expand Down Expand Up @@ -160,10 +163,6 @@ public static Worker.Current current() {
return Current.instance;
}

public static boolean isCurrent(Worker worker) {
return worker != null && worker.equals(current());
}

// --------------------------------------------------------custom jackson serialize & deserialize

/**
Expand Down Expand Up @@ -203,34 +202,36 @@ private Current(String group, String workerId, String host, int port) {

public abstract Map<String, String> authenticateHeaders();

public abstract String supervisorToken();
public abstract void authenticate(AuthenticationParam param);

// need to use reflection do set
// use synchronized modify for help multiple thread read reference(write to main memory)
private static synchronized Current create(String group, String workerId, String host, int port,
String workerToken, String supervisorToken0) {
if (instance != null) {
throw new AssertionError("Current worker already set.");
throw new Error("Current worker already set.");
}

instance = new Current(group, workerId, host, port) {
private static final long serialVersionUID = 7553139562459109482L;

private final Map<String, String> authenticateHeaders = ImmutableMap.of(
JobConstants.AUTHENTICATE_HEADER_GROUP, group,
JobConstants.AUTHENTICATE_HEADER_TOKEN, workerToken
JobConstants.AUTHENTICATE_HEADER_TOKEN, StringUtils.defaultString(workerToken, "")
);

private final String supervisorToken = supervisorToken0;
private final String supervisorToken = StringUtils.isBlank(supervisorToken0) ? null : supervisorToken0.trim();

@Override
public Map<String, String> authenticateHeaders() {
return authenticateHeaders;
}

@Override
public String supervisorToken() {
return supervisorToken;
public void authenticate(AuthenticationParam param) {
if (supervisorToken != null && !supervisorToken.equals(param.getSupervisorToken())) {
throw new AuthenticationException("Authentication failed.");
}
}
};

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/* __________ _____ *\
** \______ \____ _____/ ____\____ ____ Copyright (c) 2017-2023 Ponfee **
** | ___/ _ \ / \ __\/ __ \_/ __ \ http://www.ponfee.cn **
** | | ( <_> ) | \ | \ ___/\ ___/ Apache License Version 2.0 **
** |____| \____/|___| /__| \___ >\___ > http://www.apache.org/licenses/ **
** \/ \/ \/ **
\* */

package cn.ponfee.disjob.core.exception;

import cn.ponfee.disjob.common.exception.BaseRuntimeException;
import cn.ponfee.disjob.core.base.JobCodeMsg;

/**
* Group not found exception
*
* @author Ponfee
*/
public class GroupNotFoundException extends BaseRuntimeException {
private static final long serialVersionUID = -8974006648944765503L;

public GroupNotFoundException() {
super(JobCodeMsg.UN_AUTHENTICATED);
}

public GroupNotFoundException(String message) {
super(JobCodeMsg.UN_AUTHENTICATED.getCode(), message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public static String getLocalHost(String specifiedHost) {
return host;
}

throw new AssertionError("Not found available server host.");
throw new Error("Not found available server host.");
}

private static boolean isValidHost(String host, String from) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ private void assignWorker(List<DispatchTaskParam> params) {

private void doDispatch(ExecuteTaskParam task) throws Exception {
boolean result;
if (timingWheel != null && Worker.isCurrent(task.getWorker())) {
if (timingWheel != null && Worker.current().equals(task.getWorker())) {
// if the server both is supervisor & worker: dispatch to local worker
LOG.info("Dispatching task to local worker {} | {} | {}", task.getTaskId(), task.getOperation(), task.getWorker());
result = timingWheel.offer(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import cn.ponfee.disjob.common.base.Startable;
import cn.ponfee.disjob.common.base.TimingWheel;
import cn.ponfee.disjob.core.base.Worker;
import cn.ponfee.disjob.core.exception.AuthenticationException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -26,14 +24,12 @@
public abstract class TaskReceiver implements Startable {
private static final Logger LOG = LoggerFactory.getLogger(TaskReceiver.class);

private final Worker currentWorker;
private final Worker.Current currentWorker;
private final TimingWheel<ExecuteTaskParam> timingWheel;
private final String supervisorToken;

public TaskReceiver(Worker.Current currentWorker, TimingWheel<ExecuteTaskParam> timingWheel) {
this.timingWheel = Objects.requireNonNull(timingWheel, "Timing wheel cannot be null.");
this.currentWorker = Objects.requireNonNull(currentWorker, "Current worker cannot be null.");
this.supervisorToken = StringUtils.isBlank(currentWorker.supervisorToken()) ? null : currentWorker.supervisorToken();
}

/**
Expand All @@ -47,9 +43,7 @@ public boolean receive(ExecuteTaskParam param) {
return false;
}

if (supervisorToken != null && !supervisorToken.equals(param.getSupervisorToken())) {
throw new AuthenticationException("Authentication failed.");
}
currentWorker.authenticate(param);

Worker assignedWorker = param.getWorker();
if (!currentWorker.sameWorker(assignedWorker)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,10 @@ public <T> T execute(String group, String path, HttpMethod httpMethod, Type retu
}

int serverNumber = servers.size();
Map<String, String> authenticationHeaders = discoveryServerRole == ServerRole.SUPERVISOR ? Worker.current().authenticateHeaders() : null;
Map<String, String> authenticationHeaders = null;
if (discoveryServerRole == ServerRole.SUPERVISOR) {
authenticationHeaders = Worker.current().authenticateHeaders();
}
int start = ThreadLocalRandom.current().nextInt(serverNumber);

// minimum retry two times
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public static void main(String[] args) throws Exception {
// --------------------- create registry(select redis or http) --------------------- //
TaskReceiver taskReceiver;
VertxWebServer vertxWebServer;
WorkerCoreRpcService workerCoreRpcProvider = new WorkerCoreRpcProvider(workerProperties.getSupervisorToken());
WorkerCoreRpcService workerCoreRpcProvider = new WorkerCoreRpcProvider(currentWorker);
{
// redis dispatching
//taskReceiver = new RedisTaskReceiver(currentWorker, timingWheel, stringRedisTemplate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,7 @@ public boolean preHandle(HttpServletRequest request, HttpServletResponse respons
}

String group = request.getHeader(JobConstants.AUTHENTICATE_HEADER_GROUP);
SchedGroupManager.DisjobGroup disjobGroup = SchedGroupManager.getDisjobGroup(group);
if (disjobGroup == null) {
throw new AuthenticationException("Authentication failed.");
}

String workerToken = disjobGroup.getWorkerToken();
String workerToken = SchedGroupManager.get(group).getWorkerToken();
if (StringUtils.isBlank(workerToken)) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@
*/
public class WorkerCoreRpcClient {

private final Worker currentWorker;
private final Worker.Current currentWorker;
private final WorkerCoreRpcService local;
private final WorkerCoreRpcService remote;

public WorkerCoreRpcClient(HttpProperties httpProperties,
RetryProperties retryProperties,
SupervisorRegistry supervisorRegistry,
@Nullable Worker currentWorker,
@Nullable Worker.Current currentWorker,
@Nullable ObjectMapper objectMapper) {
httpProperties.check();
retryProperties.check();
Expand All @@ -58,12 +58,12 @@ public WorkerCoreRpcClient(HttpProperties httpProperties,
}

public void verify(JobHandlerParam param) throws JobException {
param.setSupervisorToken(getSupervisorToken(param.getJobGroup()));
param.setSupervisorToken(SchedGroupManager.get(param.getJobGroup()).getSupervisorToken());
grouped(param.getJobGroup()).verify(param);
}

public List<SplitTask> split(JobHandlerParam param) throws JobException {
param.setSupervisorToken(getSupervisorToken(param.getJobGroup()));
param.setSupervisorToken(SchedGroupManager.get(param.getJobGroup()).getSupervisorToken());
return grouped(param.getJobGroup()).split(param);
}

Expand All @@ -78,14 +78,6 @@ private WorkerCoreRpcService grouped(String group) {
}
}

private String getSupervisorToken(String group) {
SchedGroupManager.DisjobGroup disjobGroup = SchedGroupManager.getDisjobGroup(group);
if (disjobGroup == null) {
throw new IllegalStateException("Not found worker group: " + group);
}
return disjobGroup.getSupervisorToken();
}

private static class WorkerCoreRpcLocal implements WorkerCoreRpcService {
private static final WorkerCoreRpcLocal INSTANCE = new WorkerCoreRpcLocal();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public Supervisor.Current currentSupervisor(@Value("${" + JobConstants.SPRING_WE
public WorkerCoreRpcClient workerCoreRpcClient(HttpProperties httpProperties,
RetryProperties retryProperties,
SupervisorRegistry supervisorRegistry,
@Nullable Worker currentWorker,
@Nullable Worker.Current currentWorker,
@Nullable ObjectMapper objectMapper) {
return new WorkerCoreRpcClient(
httpProperties, retryProperties, supervisorRegistry, currentWorker, objectMapper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ public boolean hasNotDiscoveredWorkers() {
}

public boolean dispatch(SchedJob job, SchedInstance instance, List<SchedTask> tasks) {
String supervisorToken = SchedGroupManager.getDisjobGroup(job.getJobGroup()).getSupervisorToken();
String supervisorToken = SchedGroupManager.get(job.getJobGroup()).getSupervisorToken();
ExecuteTaskParam.Builder builder = ExecuteTaskParam.builder(instance, job, supervisorToken);
List<ExecuteTaskParam> list;
if (RouteStrategy.BROADCAST.equals(job.getRouteStrategy())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -892,7 +892,7 @@ private void dependJob(SchedInstance parentInstance) {
private List<ExecuteTaskParam> loadExecutingTasks(SchedInstance instance, Operations ops) {
List<ExecuteTaskParam> executingTasks = new ArrayList<>();
SchedJob schedJob = LazyLoader.of(SchedJob.class, jobMapper::get, instance.getJobId());
String supervisorToken = SchedGroupManager.getDisjobGroup(schedJob.getJobGroup()).getSupervisorToken();
String supervisorToken = SchedGroupManager.get(schedJob.getJobGroup()).getSupervisorToken();
ExecuteTaskParam.Builder builder = ExecuteTaskParam.builder(instance, schedJob, supervisorToken);
// immediate trigger
long triggerTime = 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import cn.ponfee.disjob.common.base.SingletonClassConstraint;
import cn.ponfee.disjob.common.concurrent.LoopThread;
import cn.ponfee.disjob.core.exception.GroupNotFoundException;
import cn.ponfee.disjob.core.model.SchedGroup;
import cn.ponfee.disjob.supervisor.dao.mapper.SchedGroupMapper;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -79,8 +80,12 @@ public boolean delete(String group) {

// ------------------------------------------------------------other methods

public static DisjobGroup getDisjobGroup(String group) {
return all.get(group);
public static DisjobGroup get(String group) {
DisjobGroup disjobGroup = all.get(group);
if (disjobGroup == null) {
throw new GroupNotFoundException("Not found worker group: " + group);
}
return disjobGroup;
}

public static Set<String> allGroups() {
Expand Down Expand Up @@ -117,14 +122,14 @@ public static class DisjobGroup {
private final Set<String> alarmSubscribers;
private final String webHook;

DisjobGroup(String workerToken, String supervisorToken, Set<String> alarmSubscribers, String webHook) {
private DisjobGroup(String workerToken, String supervisorToken, Set<String> alarmSubscribers, String webHook) {
this.workerToken = workerToken;
this.supervisorToken = supervisorToken;
this.alarmSubscribers = alarmSubscribers;
this.webHook = webHook;
}

static DisjobGroup of(SchedGroup schedGroup) {
private static DisjobGroup of(SchedGroup schedGroup) {
String alarmSubscribers = schedGroup.getAlarmSubscribers();
return new DisjobGroup(
schedGroup.getWorkerToken(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ public Worker.Current currentWorker(@Value("${" + JobConstants.SPRING_WEB_SERVER
@DependsOn(JobConstants.SPRING_BEAN_NAME_CURRENT_WORKER)
@ConditionalOnMissingBean
@Bean
public WorkerCoreRpcService workerCoreRpcService(WorkerProperties workerProperties) {
return new WorkerCoreRpcProvider(workerProperties.getSupervisorToken());
public WorkerCoreRpcService workerCoreRpcService(Worker.Current currentWork) {
return new WorkerCoreRpcProvider(currentWork);
}

@ConditionalOnMissingBean
Expand Down
Loading

0 comments on commit d690b6c

Please sign in to comment.