|
10 | 10 | import discord4j.discordjson.json.MessageCreateRequest; |
11 | 11 | import discord4j.rest.entity.RestChannel; |
12 | 12 | import discord4j.rest.http.client.ClientException; |
| 13 | +import discord4j.rest.util.MultipartRequest; |
13 | 14 | import org.redisson.api.RBoundedBlockingQueue; |
14 | 15 | import org.slf4j.Logger; |
| 16 | +import reactor.core.Exceptions; |
15 | 17 | import reactor.core.publisher.Flux; |
16 | 18 | import reactor.core.publisher.Mono; |
17 | 19 | import reactor.core.scheduler.Schedulers; |
18 | | -import reactor.retry.RetryExhaustedException; |
19 | 20 | import reactor.util.retry.Retry; |
20 | 21 | import vc.config.GuildConfigManager; |
21 | 22 | import vc.config.GuildConfigRecord; |
@@ -91,7 +92,7 @@ record InputQueue<T>( |
91 | 92 | record Message(EmbedData embedData, long timestamp) implements Comparable<Message> { |
92 | 93 | @Override |
93 | 94 | public int compareTo(final Message o) { |
94 | | - return (timestamp() < o.timestamp()) ? -1 : ((timestamp() == o.timestamp()) ? 0 : 1); |
| 95 | + return Long.compare(timestamp(), o.timestamp()); |
95 | 96 | } |
96 | 97 | } |
97 | 98 |
|
@@ -206,35 +207,34 @@ protected void processMessageQueue() { |
206 | 207 | } |
207 | 208 | } |
208 | 209 | if (embeds.isEmpty()) return; |
| 210 | + final MultipartRequest<MessageCreateRequest> request = MultipartRequest.ofRequest(MessageCreateRequest.builder() |
| 211 | + .embeds(embeds) |
| 212 | + .build()); |
209 | 213 | // todo: test if we need to use a rate limiter between sending messages to different guilds |
210 | 214 | Flux.fromIterable(liveChannels.entrySet()) |
211 | 215 | .parallel() |
212 | 216 | .runOn(Schedulers.parallel()) |
213 | | - .flatMap(entry -> processSend(entry, embeds)) |
| 217 | + .flatMap(entry -> processSend(entry.getKey(), entry.getValue(), request)) |
214 | 218 | .sequential() |
215 | 219 | .blockLast(Duration.ofSeconds(20)); |
216 | 220 | } catch (final Throwable e) { |
217 | 221 | LOGGER.error("Error processing message queue", e); |
218 | 222 | } |
219 | 223 | } |
220 | 224 |
|
221 | | - private Mono<?> processSend(final Map.Entry<String, RestChannel> entry, final List<EmbedData> embeds) { |
222 | | - final RestChannel channel = entry.getValue(); |
223 | | - final String guildId = entry.getKey(); |
224 | | - return channel.createMessage( |
225 | | - MessageCreateRequest.builder() |
226 | | - .embeds(embeds) |
227 | | - .build()) |
| 225 | + private Mono<?> processSend(String guildId, RestChannel channel, MultipartRequest<MessageCreateRequest> request) { |
| 226 | + return channel.createMessage(request) |
228 | 227 | .timeout(Duration.ofSeconds(3)) |
229 | 228 | // retry only on TimeoutException |
230 | 229 | .retryWhen(Retry.fixedDelay(1, Duration.ofSeconds(1)) |
231 | 230 | .filter(error -> error instanceof TimeoutException) |
232 | | - .onRetryExhaustedThrow((spec, signal) -> |
233 | | - new RetryExhaustedException("Retries exhausted sending message to guild: " + guildId + ", channelId: " + channel.getId().asString(), signal.failure()))) |
| 231 | + .onRetryExhaustedThrow((spec, signal) -> Exceptions.retryExhausted( |
| 232 | + "Retries exhausted sending message to guild: " + guildId + ", channelId: " + channel.getId().asString(), |
| 233 | + signal.failure()))) |
234 | 234 | .onErrorResume(error -> { |
235 | | - if (error instanceof RetryExhaustedException e) { |
236 | | - handleBroadcastError(e.getCause(), guildId, channel); |
237 | | - } else |
| 235 | + if (Exceptions.isRetryExhausted(error)) |
| 236 | + handleBroadcastError(error.getCause(), guildId, channel); |
| 237 | + else |
238 | 238 | handleBroadcastError(error, guildId, channel); |
239 | 239 | return Mono.empty(); |
240 | 240 | }); |
|
0 commit comments