Skip to content

Commit

Permalink
trip state
Browse files Browse the repository at this point in the history
  • Loading branch information
ponfee committed Apr 6, 2024
1 parent 7dd7e36 commit 84406d0
Show file tree
Hide file tree
Showing 20 changed files with 159 additions and 87 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2022-2024 Ponfee (http://www.ponfee.cn/)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.ponfee.disjob.common.base;

import java.util.concurrent.atomic.AtomicInteger;

/**
* Trip state
*
* @author Ponfee
*/
public class TripState {

private static final int NEW = 0;
private static final int RUNNING = 1;
private static final int STOPPED = 2;

private final AtomicInteger state;

private TripState() {
this.state = new AtomicInteger(NEW);
}

public static TripState create() {
return new TripState();
}

public static TripState createStarted() {
TripState state = new TripState();
state.start();
return state;
}

public boolean isNew() {
return state.get() == NEW;
}

public boolean start() {
return state.compareAndSet(NEW, RUNNING);
}

public boolean isRunning() {
return state.get() == RUNNING;
}

public boolean stop() {
return state.compareAndSet(RUNNING, STOPPED);
}

public boolean isStopped() {
return state.get() == STOPPED;
}

@Override
public String toString() {
if (isNew()) {
return "New";
}
return isRunning() ? "Running" : "Stopped";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@

package cn.ponfee.disjob.common.concurrent;

import cn.ponfee.disjob.common.base.TripState;
import com.google.common.base.CaseFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* The abstract heartbeat thread.
Expand All @@ -37,9 +37,9 @@ public abstract class AbstractHeartbeatThread extends Thread implements Closeabl
protected final Logger log = LoggerFactory.getLogger(getClass());

/**
* Thread is whether stopped status
* Thread heartbeat state
*/
private final AtomicBoolean stopped = new AtomicBoolean(false);
private final TripState state = TripState.createStarted();

/**
* Heartbeat period milliseconds.
Expand All @@ -66,7 +66,7 @@ public final void run() {

try {
int processedCount = 0;
while (!stopped.get()) {
while (state.isRunning()) {
if (super.isInterrupted()) {
log.error("Thread exit by interrupted.");
break;
Expand All @@ -80,7 +80,7 @@ public final void run() {
isBusyLoop = heartbeat();
} catch (Throwable t) {
isBusyLoop = true;
log.error("Heartbeat occur error, stopped=" + stopped, t);
log.error("Heartbeat occur error, state=" + state, t);
}

long end = System.currentTimeMillis();
Expand All @@ -101,7 +101,7 @@ public final void run() {
}
}
} catch (InterruptedException e) {
log.warn("Sleep occur error in loop, stopped={}, error={}", stopped, e.getMessage());
log.warn("Sleep occur error in loop, state={}, error={}", state, e.getMessage());
Thread.currentThread().interrupt();
}

Expand Down Expand Up @@ -129,7 +129,7 @@ public void close() {
}

public boolean toStop() {
return stopped.compareAndSet(false, true);
return state.stop();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package cn.ponfee.disjob.common.concurrent;

import cn.ponfee.disjob.common.base.TripState;
import cn.ponfee.disjob.common.exception.Throwables.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -24,7 +25,6 @@
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/**
Expand Down Expand Up @@ -59,7 +59,8 @@ public final class AsyncDelayedExecutor<E> extends Thread {
private final ThreadPoolExecutor asyncExecutor;

private final DelayQueue<DelayedData<E>> queue = new DelayQueue<>();
private final AtomicBoolean stopped = new AtomicBoolean(false);

private final TripState state = TripState.createStarted();

public AsyncDelayedExecutor(Consumer<E> dataProcessor) {
this(1, dataProcessor);
Expand Down Expand Up @@ -97,14 +98,14 @@ public AsyncDelayedExecutor(int maximumPoolSize, Consumer<E> dataProcessor) {
* @param delayedData the delayed data
*/
public boolean put(DelayedData<E> delayedData) {
if (stopped.get()) {
if (state.isStopped()) {
return false;
}
return queue.offer(delayedData);
}

public boolean toStop() {
return stopped.compareAndSet(false, true);
return state.stop();
}

public void doStop() {
Expand All @@ -114,7 +115,7 @@ public void doStop() {

@Override
public void run() {
while (!stopped.get()) {
while (state.isRunning()) {
if (super.isInterrupted()) {
LOG.error("Async delayed thread interrupted.");
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@

package cn.ponfee.disjob.common.concurrent;

import cn.ponfee.disjob.common.base.TripState;
import cn.ponfee.disjob.common.exception.Throwables.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicInteger;

/**
* Loop thread
*
Expand All @@ -31,11 +30,7 @@ public class LoopThread extends Thread {

private static final Logger LOG = LoggerFactory.getLogger(LoopThread.class);

private static final int NEW = 0;
private static final int RUNNING = 1;
private static final int TERMINATED = 2;

private final AtomicInteger state = new AtomicInteger(NEW);
private final TripState state = TripState.create();
private final long periodMs;
private final long delayMs;
private final ThrowingRunnable<?> action;
Expand Down Expand Up @@ -72,7 +67,7 @@ public void run() {
if (delayMs > 0) {
ThrowingRunnable.doChecked(() -> Thread.sleep(delayMs));
}
while (state.get() == RUNNING) {
while (state.isRunning()) {
try {
action.run();
Thread.sleep(periodMs);
Expand All @@ -89,15 +84,15 @@ public void run() {

@Override
public synchronized void start() {
if (state.compareAndSet(NEW, RUNNING)) {
if (state.start()) {
super.start();
} else {
throw new IllegalStateException("Loop process thread already started.");
}
}

public boolean terminate() {
if (state.compareAndSet(RUNNING, TERMINATED)) {
if (state.stop()) {
ThrowingRunnable.doCaught(super::interrupt);
return true;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package cn.ponfee.disjob.common.spring;

import cn.ponfee.disjob.common.base.Symbol.Char;
import cn.ponfee.disjob.common.base.Symbol.Str;
import cn.ponfee.disjob.common.collect.TypedMap;
import cn.ponfee.disjob.common.util.ClassUtils;
import cn.ponfee.disjob.common.util.Fields;
Expand Down Expand Up @@ -60,9 +59,7 @@ public <T> T extract(Class<T> beanType, String prefix) {
return null;
}

if (!prefix.isEmpty() && !prefix.endsWith(Str.DOT)) {
prefix += Str.DOT;
}
prefix = Strings.withDotSuffix(prefix);

T bean = ClassUtils.newInstance(beanType);
char[] separators = {Char.HYPHEN, Char.DOT};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,13 @@ public static String concatSqlLike(String str) {
return str;
}

public static String withDotSuffix(String str) {
if (str == null || str.isEmpty()) {
return str;
}
return str + Str.DOT;
}

public static String trimUrlPath(String urlPath) {
if (StringUtils.isBlank(urlPath) || Str.SLASH.equals(urlPath)) {
return Str.SLASH;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import cn.ponfee.disjob.common.base.SingletonClassConstraint;
import cn.ponfee.disjob.common.base.TimingWheel;
import cn.ponfee.disjob.common.base.TripState;
import cn.ponfee.disjob.common.concurrent.AbstractHeartbeatThread;
import cn.ponfee.disjob.common.spring.RedisKeyRenewal;
import cn.ponfee.disjob.core.base.JobConstants;
Expand All @@ -30,7 +31,6 @@
import org.springframework.data.redis.core.script.RedisScript;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import static cn.ponfee.disjob.common.spring.RedisTemplateUtils.evalScript;
import static java.nio.charset.StandardCharsets.UTF_8;
Expand Down Expand Up @@ -79,7 +79,7 @@ public class RedisTaskReceiver extends TaskReceiver {
*/
private static final byte[] LIST_POP_BATCH_SIZE_BYTES = Integer.toString(JobConstants.PROCESS_BATCH_SIZE).getBytes(UTF_8);

private final AtomicBoolean started = new AtomicBoolean(false);
private final TripState state = TripState.create();
private final ReceiveHeartbeatThread receiveHeartbeatThread;

public RedisTaskReceiver(Worker.Current currentWorker,
Expand All @@ -98,7 +98,7 @@ public boolean receive(ExecuteTaskParam task) {

@Override
public void start() {
if (!started.compareAndSet(false, true)) {
if (!state.start()) {
log.warn("Repeat call start method.");
return;
}
Expand All @@ -107,7 +107,7 @@ public void start() {

@Override
public void stop() {
if (!started.compareAndSet(true, false)) {
if (!state.stop()) {
log.warn("Repeat call stop method.");
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package cn.ponfee.disjob.registry;

import cn.ponfee.disjob.common.base.TripState;
import cn.ponfee.disjob.common.util.GenericUtils;
import cn.ponfee.disjob.core.base.Server;
import cn.ponfee.disjob.registry.discovery.DiscoveryServer;
Expand All @@ -29,7 +30,6 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand All @@ -55,9 +55,9 @@ public abstract class ServerRegistry<R extends Server, D extends Server> impleme
protected final Set<R> registered = ConcurrentHashMap.newKeySet();

/**
* Server registry is whether closed status
* Server registry state
*/
protected final AtomicBoolean closed = new AtomicBoolean(false);
protected final TripState state = TripState.createStarted();

protected ServerRegistry(String namespace, char separator) {
this.separator = separator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public final boolean isConnected() {

@Override
public final void register(R server) {
if (closed.get()) {
if (state.isStopped()) {
return;
}

Expand Down Expand Up @@ -135,7 +135,7 @@ public List<R> getRegisteredServers() {
@PreDestroy
@Override
public void close() {
if (!closed.compareAndSet(false, true)) {
if (!state.stop()) {
return;
}

Expand Down Expand Up @@ -178,7 +178,7 @@ private static NewService.Check createCheck() {
}

private void checkPass() {
if (closed.get()) {
if (state.isStopped()) {
return;
}
for (R server : registered) {
Expand Down Expand Up @@ -215,7 +215,7 @@ private ConsulSubscriberThread(long initConsulIndex) {

@Override
public void run() {
while (!closed.get()) {
while (state.isRunning()) {
try {
Response<List<HealthService>> response = getDiscoveryServers(lastConsulIndex, WAIT_TIME_SECONDS);
Long currentIndex = response.getConsulIndex();
Expand Down
Loading

0 comments on commit 84406d0

Please sign in to comment.