Skip to content

Commit 34799ae

Browse files
committed
migrate to spring task scheduler
1 parent 80d18b8 commit 34799ae

File tree

8 files changed

+30
-76
lines changed

8 files changed

+30
-76
lines changed

src/main/java/vc/Application.java

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import com.fasterxml.jackson.databind.ObjectMapper;
66
import com.fasterxml.jackson.databind.SerializationFeature;
77
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
8-
import com.google.common.util.concurrent.ThreadFactoryBuilder;
98
import discord4j.core.DiscordClientBuilder;
109
import discord4j.core.GatewayDiscordClient;
1110
import discord4j.core.object.presence.ClientActivity;
@@ -25,22 +24,21 @@
2524
import org.springframework.http.client.ClientHttpRequestFactory;
2625
import org.springframework.http.client.JdkClientHttpRequestFactory;
2726
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
27+
import org.springframework.scheduling.annotation.EnableScheduling;
2828
import org.springframework.web.client.RestTemplate;
2929

3030
import javax.annotation.PreDestroy;
3131
import java.time.Duration;
32-
import java.util.concurrent.Executors;
33-
import java.util.concurrent.ScheduledExecutorService;
3432

3533
import static org.slf4j.LoggerFactory.getLogger;
3634

3735
@SpringBootApplication
36+
@EnableScheduling
3837
public class Application {
3938
@Value("${BOT_TOKEN}")
4039
String token;
4140
private static final Logger LOGGER = getLogger("Application");
4241
private GatewayDiscordClient gatewayDiscordClient;
43-
private ScheduledExecutorService scheduledExecutorService;
4442

4543
public static void main(String[] args) {
4644
// System.setProperty("reactor.schedulers.defaultBoundedElasticOnVirtualThreads", "true");
@@ -98,23 +96,10 @@ public ObjectMapper objectMapper() {
9896
return mapper;
9997
}
10098

101-
@Bean
102-
public ScheduledExecutorService scheduledExecutorService() {
103-
this.scheduledExecutorService = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder()
104-
.setDaemon(true)
105-
.setNameFormat("scheduled-%d")
106-
.setUncaughtExceptionHandler((t, e) -> LOGGER.error("Uncaught exception in scheduled thread: {}", t.getName(), e))
107-
.build());
108-
return this.scheduledExecutorService;
109-
}
110-
11199
@PreDestroy
112100
public void onDestroy() {
113101
LOGGER.info("Shutting down Application");
114102
try {
115-
if (this.scheduledExecutorService != null) {
116-
this.scheduledExecutorService.shutdownNow();
117-
}
118103
if (this.gatewayDiscordClient != null) {
119104
this.gatewayDiscordClient.logout().block(Duration.ofSeconds(15));
120105
}

src/main/java/vc/config/live_feed/LiveFeedConfigManager.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import discord4j.core.object.entity.Guild;
66
import discord4j.discordjson.json.GuildData;
77
import discord4j.discordjson.json.GuildFields;
8+
import org.springframework.scheduling.annotation.Scheduled;
89
import org.springframework.stereotype.Component;
910
import reactor.core.publisher.Mono;
1011
import vc.config.ConfigDatabase;
@@ -14,26 +15,21 @@
1415
import java.util.Map;
1516
import java.util.Optional;
1617
import java.util.concurrent.ConcurrentHashMap;
17-
import java.util.concurrent.ScheduledExecutorService;
1818
import java.util.concurrent.TimeUnit;
1919

2020
@Component
2121
public class LiveFeedConfigManager {
2222
private final Map<String, LiveFeedConfigRecord> guildConfigMap;
2323
private final ConfigDatabase configDatabase;
24-
private final ScheduledExecutorService scheduledExecutorService;
2524
private final GatewayDiscordClient gatewayDiscordClient;
2625

2726
public LiveFeedConfigManager(
2827
final ConfigDatabase configDatabase,
29-
final ScheduledExecutorService scheduledExecutorService,
3028
final GatewayDiscordClient gatewayDiscordClient
3129
) {
32-
this.scheduledExecutorService = scheduledExecutorService;
3330
this.gatewayDiscordClient = gatewayDiscordClient;
3431
this.guildConfigMap = new ConcurrentHashMap<>();
3532
this.configDatabase = configDatabase;
36-
this.scheduledExecutorService.scheduleAtFixedRate(this::writeAllLiveFeedConfigs, 1, 1, TimeUnit.DAYS);
3733
}
3834

3935
public void loadGuild(final GuildFields guildFields) {
@@ -52,6 +48,7 @@ public Optional<LiveFeedConfigRecord> getLiveFeedConfig(final String guildId) {
5248
return Optional.ofNullable(guildConfigMap.get(guildId));
5349
}
5450

51+
@Scheduled(fixedRate = 1, initialDelay = 1, timeUnit = TimeUnit.DAYS)
5552
public void writeAllLiveFeedConfigs() {
5653
guildConfigMap.values().forEach(configDatabase::writeGuildConfigRecord);
5754
configDatabase.backupDatabase();

src/main/java/vc/config/watch/WatchConfigManager.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
import java.util.*;
1111
import java.util.concurrent.ConcurrentHashMap;
12-
import java.util.concurrent.ScheduledExecutorService;
1312

1413
@Component
1514
public class WatchConfigManager implements DisposableBean {
@@ -21,15 +20,12 @@ public class WatchConfigManager implements DisposableBean {
2120
private final Map<String, Set<String>> ownerToUserWatchMap = new ConcurrentHashMap<>();
2221
private final Map<String, Set<String>> guildToGuildWatchMap = new ConcurrentHashMap<>();
2322
private final ConfigDatabase configDatabase;
24-
private final ScheduledExecutorService scheduledExecutorService;
2523
private final GatewayDiscordClient gatewayDiscordClient;
2624

2725
public WatchConfigManager(
2826
ConfigDatabase configDatabase,
29-
ScheduledExecutorService scheduledExecutorService,
3027
GatewayDiscordClient gatewayDiscordClient
3128
) {
32-
this.scheduledExecutorService = scheduledExecutorService;
3329
this.configDatabase = configDatabase;
3430
this.gatewayDiscordClient = gatewayDiscordClient;
3531
loadUserWatchConfigs();

src/main/java/vc/live/LiveChat.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import vc.live.dto.DeathsRecord;
1414

1515
import java.util.List;
16-
import java.util.concurrent.ScheduledExecutorService;
1716

1817
import static java.util.Arrays.asList;
1918

@@ -26,17 +25,17 @@ public class LiveChat extends LiveFeed {
2625
public LiveChat(final RedisClient redisClient,
2726
final GatewayDiscordClient discordClient,
2827
final LiveFeedConfigManager guildConfigManager,
29-
final ScheduledExecutorService executorService,
3028
final ObjectMapper objectMapper,
3129
@Value("${LIVE_FEEDS}")
3230
final String liveFeedsEnabled
3331
) {
34-
super(redisClient,
32+
super(
33+
redisClient,
3534
discordClient,
3635
guildConfigManager,
37-
executorService,
3836
objectMapper,
39-
Boolean.parseBoolean(liveFeedsEnabled));
37+
Boolean.parseBoolean(liveFeedsEnabled)
38+
);
4039
}
4140

4241
@Override

src/main/java/vc/live/LiveConnections.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import vc.live.dto.enums.Connectiontype;
1414

1515
import java.util.List;
16-
import java.util.concurrent.ScheduledExecutorService;
1716

1817
import static java.util.Arrays.asList;
1918

@@ -23,17 +22,17 @@ public LiveConnections(
2322
final RedisClient redisClient,
2423
final GatewayDiscordClient discordClient,
2524
final LiveFeedConfigManager guildConfigManager,
26-
final ScheduledExecutorService executorService,
2725
final ObjectMapper objectMapper,
2826
@Value("${LIVE_FEEDS}")
2927
final String liveFeedsEnabled
3028
) {
31-
super(redisClient,
29+
super(
30+
redisClient,
3231
discordClient,
3332
guildConfigManager,
34-
executorService,
3533
objectMapper,
36-
Boolean.parseBoolean(liveFeedsEnabled));
34+
Boolean.parseBoolean(liveFeedsEnabled)
35+
);
3736
}
3837

3938
@Override

src/main/java/vc/live/LiveFeed.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.redisson.api.RReliableTopic;
1717
import org.slf4j.Logger;
1818
import org.springframework.beans.factory.DisposableBean;
19+
import org.springframework.scheduling.annotation.Scheduled;
1920
import reactor.core.Exceptions;
2021
import reactor.core.publisher.Flux;
2122
import reactor.core.publisher.Mono;
@@ -30,12 +31,14 @@
3031
import java.time.Duration;
3132
import java.time.Instant;
3233
import java.util.*;
33-
import java.util.concurrent.*;
34+
import java.util.concurrent.ConcurrentHashMap;
35+
import java.util.concurrent.PriorityBlockingQueue;
36+
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.TimeoutException;
3438
import java.util.concurrent.atomic.AtomicInteger;
3539
import java.util.function.Function;
3640

3741
import static java.util.concurrent.TimeUnit.MINUTES;
38-
import static java.util.concurrent.TimeUnit.SECONDS;
3942
import static org.slf4j.LoggerFactory.getLogger;
4043

4144
public abstract class LiveFeed implements DisposableBean {
@@ -47,18 +50,15 @@ public abstract class LiveFeed implements DisposableBean {
4750
protected final Map<InputQueue, TopicListener> inputTopics;
4851
protected final LiveFeedConfigManager guildConfigManager;
4952
private final PriorityBlockingQueue<Message> messageQueue;
50-
private final ScheduledExecutorService executorService;
5153
private final ObjectMapper objectMapper;
5254
private final Cache<String, AtomicInteger> guildMessageSendFailCountCache = CacheBuilder.newBuilder()
5355
.expireAfterWrite(5, MINUTES)
5456
.build();
55-
private ScheduledFuture<?> processMessageQueueFuture;
5657

5758
public LiveFeed(
5859
final RedisClient redisClient,
5960
final GatewayDiscordClient discordClient,
6061
final LiveFeedConfigManager guildConfigManager,
61-
final ScheduledExecutorService executorService,
6262
final ObjectMapper objectMapper,
6363
final boolean liveFeedEnabled
6464
) {
@@ -68,12 +68,10 @@ public LiveFeed(
6868
this.guildConfigManager = guildConfigManager;
6969
this.messageQueue = new PriorityBlockingQueue<>(MESSAGE_Q_CAPACITY);
7070
this.inputTopics = new ConcurrentHashMap<>();
71-
this.executorService = executorService;
7271
this.objectMapper = objectMapper;
7372
if (liveFeedEnabled) {
7473
LOGGER.info("Starting {} live feed", getClass().getSimpleName());
7574
syncChannels();
76-
this.processMessageQueueFuture = this.executorService.scheduleWithFixedDelay(this::processMessageQueue, ((int) (Math.random() * 10)), 10, SECONDS);
7775
inputQueues().forEach(this::registerInputQueue);
7876
} else {
7977
LOGGER.info("Live feed {} disabled", getClass().getSimpleName());
@@ -188,6 +186,7 @@ private RestChannel getRestChannel(final String channelId) {
188186
.block();
189187
}
190188

189+
@Scheduled(initialDelay = 5, fixedRate = 10, timeUnit = TimeUnit.SECONDS)
191190
protected void processMessageQueue() {
192191
try {
193192
final List<EmbedData> embeds = new ArrayList<>(4);
@@ -297,9 +296,6 @@ public void onAllGuildsLoaded() {
297296
@Override
298297
public void destroy() {
299298
LOGGER.info("Shutting down {} live feed", getClass().getSimpleName());
300-
if (processMessageQueueFuture != null && !processMessageQueueFuture.isCancelled()) {
301-
processMessageQueueFuture.cancel(true);
302-
}
303299
inputTopics.values().forEach(topicListener -> {
304300
try {
305301
topicListener.topic.removeListener(topicListener.id);

src/main/java/vc/live/watch/WatchManager.java

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.slf4j.LoggerFactory;
1616
import org.springframework.beans.factory.DisposableBean;
1717
import org.springframework.beans.factory.annotation.Value;
18+
import org.springframework.scheduling.annotation.Scheduled;
1819
import org.springframework.stereotype.Component;
1920
import reactor.core.Exceptions;
2021
import reactor.core.publisher.Mono;
@@ -32,7 +33,8 @@
3233
import java.time.Duration;
3334
import java.util.List;
3435
import java.util.UUID;
35-
import java.util.concurrent.*;
36+
import java.util.concurrent.ConcurrentLinkedDeque;
37+
import java.util.concurrent.TimeoutException;
3638
import java.util.function.BiFunction;
3739
import java.util.function.Function;
3840

@@ -42,19 +44,13 @@ public class WatchManager implements DisposableBean {
4244
private final WatchConfigManager watchConfigManager;
4345
private final RedisClient redisClient;
4446
private final GatewayDiscordClient discordClient;
45-
private final ScheduledExecutorService executorService;
4647
private final ObjectMapper objectMapper;
4748
private final boolean watchesEnabled;
4849
private final ConcurrentLinkedDeque<ConnectionsRecord> joinsQueue = new ConcurrentLinkedDeque<>();
4950
private final ConcurrentLinkedDeque<ConnectionsRecord> leavesQueue = new ConcurrentLinkedDeque<>();
5051
private final ConcurrentLinkedDeque<ChatsRecord> chatsQueue = new ConcurrentLinkedDeque<>();
5152
private final ConcurrentLinkedDeque<DeathsRecord> deathsQueue = new ConcurrentLinkedDeque<>();
5253
private final ConcurrentLinkedDeque<DeathsRecord> killsQueue = new ConcurrentLinkedDeque<>();
53-
ScheduledFuture<?> processJoinsFuture;
54-
ScheduledFuture<?> processLeavesFuture;
55-
ScheduledFuture<?> processChatsFuture;
56-
ScheduledFuture<?> processDeathsFuture;
57-
ScheduledFuture<?> processKillsFuture;
5854
RReliableTopic connectionsTopic;
5955
RReliableTopic chatsTopic;
6056
RReliableTopic deathsTopic;
@@ -66,15 +62,13 @@ public WatchManager(
6662
final WatchConfigManager watchConfigManager,
6763
final RedisClient redisClient,
6864
final GatewayDiscordClient discordClient,
69-
final ScheduledExecutorService executorService,
7065
final ObjectMapper objectMapper,
7166
@Value("${WATCHES}")
7267
final String watchesEnabled
7368
) {
7469
this.watchConfigManager = watchConfigManager;
7570
this.redisClient = redisClient;
7671
this.discordClient = discordClient;
77-
this.executorService = executorService;
7872
this.objectMapper = objectMapper;
7973
this.watchesEnabled = Boolean.parseBoolean(watchesEnabled);
8074
if (this.watchesEnabled) {
@@ -85,21 +79,17 @@ public WatchManager(
8579
LOGGER.info("Loaded {} guild watch configs", watchConfigManager.getAllGuildWatchConfigs().size());
8680
connectionsTopic = this.redisClient.getTopic("ConnectionsTopic");
8781
connectionsTopicId = connectionsTopic.addListener(String.class, (channel, msg) -> connectionsTopicListener(msg));
88-
processJoinsFuture = executorService.scheduleAtFixedRate(this::processJoinsQueue, 0, 1, TimeUnit.SECONDS);
89-
processLeavesFuture = executorService.scheduleAtFixedRate(this::processLeavesQueue, 0, 1, TimeUnit.SECONDS);
9082
chatsTopic = this.redisClient.getTopic("ChatsTopic");
9183
chatsTopicId = chatsTopic.addListener(String.class, (channel, msg) -> chatsTopicListener(msg));
92-
processChatsFuture = executorService.scheduleAtFixedRate(this::processChatsQueue, 0, 1, TimeUnit.SECONDS);
9384
deathsTopic = this.redisClient.getTopic("DeathsTopic");
9485
deathsTopicId = deathsTopic.addListener(String.class, (channel, msg) -> deathsTopicListener(msg));
95-
processDeathsFuture = executorService.scheduleAtFixedRate(this::processDeathsQueue, 0, 1, TimeUnit.SECONDS);
96-
processKillsFuture = executorService.scheduleAtFixedRate(this::processKillsQueue, 0, 1, TimeUnit.SECONDS);
9786
LOGGER.info("Watch manager initialized");
9887
} else {
9988
LOGGER.info("Watch manager disabled");
10089
}
10190
}
10291

92+
@Scheduled(fixedRate = 1000)
10393
private void processJoinsQueue() {
10494
processQueue(
10595
"Joins",
@@ -112,6 +102,7 @@ private void processJoinsQueue() {
112102
);
113103
}
114104

105+
@Scheduled(fixedRate = 1000)
115106
private void processLeavesQueue() {
116107
processQueue(
117108
"Leaves",
@@ -124,6 +115,7 @@ private void processLeavesQueue() {
124115
);
125116
}
126117

118+
@Scheduled(fixedRate = 1000)
127119
private void processKillsQueue() {
128120
processQueue(
129121
"Kills",
@@ -136,6 +128,7 @@ private void processKillsQueue() {
136128
);
137129
}
138130

131+
@Scheduled(fixedRate = 1000)
139132
private void processDeathsQueue() {
140133
processQueue(
141134
"Deaths",
@@ -148,6 +141,7 @@ private void processDeathsQueue() {
148141
);
149142
}
150143

144+
@Scheduled(fixedRate = 1000)
151145
private void processChatsQueue() {
152146
processQueue(
153147
"Chats",
@@ -457,15 +451,6 @@ public void destroy() throws Exception {
457451
} catch (Exception e) {
458452
LOGGER.error("Failed to remove Redis topic listener: {}", deathsTopicId, e);
459453
}
460-
try {
461-
processJoinsFuture.cancel(true);
462-
processLeavesFuture.cancel(true);
463-
processChatsFuture.cancel(true);
464-
processDeathsFuture.cancel(true);
465-
processKillsFuture.cancel(true);
466-
} catch (Exception e) {
467-
LOGGER.error("Failed to cancel scheduled tasks", e);
468-
}
469454
}
470455

471456
public void onAllGuildsLoaded(final List<UserGuildData> guilds) {

0 commit comments

Comments
 (0)