Skip to content

Commit

Permalink
feat: allow different queues with priority and concurrency limits
Browse files Browse the repository at this point in the history
Closes #33
  • Loading branch information
DigitalDwagon committed Oct 6, 2024
1 parent c1770a4 commit 7623c11
Show file tree
Hide file tree
Showing 10 changed files with 120 additions and 6 deletions.
3 changes: 3 additions & 0 deletions src/main/java/dev/digitaldragon/WikiBot.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import dev.digitaldragon.interfaces.irc.IRCClient;
import dev.digitaldragon.interfaces.telegram.TelegramClient;
import dev.digitaldragon.jobs.CleanupListener;
import dev.digitaldragon.jobs.JobManager;
import dev.digitaldragon.jobs.LogFiles;
import dev.digitaldragon.util.Config;
import dev.digitaldragon.warcs.WarcproxManager;
Expand Down Expand Up @@ -51,6 +52,8 @@ public static void main (String[] args) {
System.exit(1);
}
discordClient = new DiscordClient();
JobManager.setQueueConcurrency("default", 15);
JobManager.setQueuePriority("default", 0);

WarcproxManager.run();
IRCClient.enable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,39 @@ public void onMessage(ChannelMessageEvent event) {
channel.sendMessage(nick + ": Submissions are now " + (submissionsEnabled ? "enabled." : "disabled."));
});

commands.put("setqueue", () -> {
if (!isOped(event.getChannel(), event.getActor())) {
channel.sendMessage(nick + ": You don't have permission to do that! Please ask an op.");
return;
}
List<String> args = Command.shellSplit(message);
if (args.size() != 3) {
channel.sendMessage(nick + ": invalid arguments! Usage: !setqueue <concurrency> <priority>");
return;
}
try {
String queue = args.get(0);
int concurrency = Integer.parseInt(args.get(1));
int priority = Integer.parseInt(args.get(2));
JobManager.setQueueConcurrency(queue, concurrency);
JobManager.setQueuePriority(queue, priority);
JobManager.launchJobs();
channel.sendMessage(String.format("%s: Set queue %s to max concurrency: %s, priority: %s", nick, queue, concurrency, priority));
} catch (Exception e) {
channel.sendMessage(nick + ": Invalid arguments! Usage: !setqueue <concurrency> <priority>");
}
});

commands.put("getqueue", () -> {
Integer concurrency = JobManager.getQueueConcurrency(message);
Integer priority = JobManager.getQueuePriority(message);
if (concurrency == null || priority == null) {
channel.sendMessage(nick + ": Queue does not exist");
return;
}
channel.sendMessage(String.format("%s: Queue %s - max concurrency: %s, priority: %s", nick, message, concurrency, priority));
});

if (WikiBot.getConfig().getWikiTeam3Config().warcEnabled()) commands.put("warctest", () -> {
JobManager.submit(new MediaWikiWARCJob(UUID.randomUUID().toString(), parts[1], new JobMeta(nick)));
});
Expand Down
80 changes: 74 additions & 6 deletions src/main/java/dev/digitaldragon/jobs/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,24 @@

import dev.digitaldragon.WikiBot;
import dev.digitaldragon.jobs.events.JobQueuedEvent;
import lombok.Getter;
import org.json.JSONObject;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class JobManager {
private static final ExecutorService executorService = Executors.newFixedThreadPool(15);
private static final Map<String, String> jobBuckets = new HashMap<>();
private static final int MAX_CONCURRENCY = 15;
private static final ExecutorService executorService = Executors.newFixedThreadPool(MAX_CONCURRENCY);
private static final Map<String, Job> jobs = new HashMap<>();
private static final List<String> pendingJobs = new ArrayList<>();
private static final List<String> runningJobs = new ArrayList<>();
@Getter
private static final Map<String, Integer> queueConcurrency = new HashMap<>();
@Getter
private static final Map<String, Integer> queuePriority = new HashMap<>();

/**
* Submits a job for execution.
Expand All @@ -24,7 +29,8 @@ public class JobManager {
public static void submit(Job job) {
jobs.put(job.getId(), job);
WikiBot.getBus().post(new JobQueuedEvent(job));
executorService.submit(job::run);
pendingJobs.add(job.getId());
launchJobs();
}

/**
Expand Down Expand Up @@ -70,6 +76,68 @@ public static List<Job> getQueuedJobs() {
return jobs.values().stream().filter(job -> job.getStatus() == JobStatus.QUEUED).collect(Collectors.toList());
}

public static Integer getQueueConcurrency(String queue) {
return queueConcurrency.get(queue) == null ? 0 : queueConcurrency.get(queue);
}

public static void setQueueConcurrency(String queue, int concurrency) {
queueConcurrency.put(queue, concurrency);
if (queuePriority.get(queue) == null) setQueuePriority(queue, 0);
}

public static Integer getQueuePriority(String queue) {
return queuePriority.get(queue) == null ? 0 : queuePriority.get(queue);
}

public static void setQueuePriority(String queue, int priority) {
queuePriority.put(queue, priority);
if (queueConcurrency.get(queue) == null) setQueueConcurrency(queue, 0);

}

public static void launchJobs() {
if (pendingJobs.isEmpty()) return;
if (runningJobs.size() >= MAX_CONCURRENCY) return;
Map<String, Integer> runningJobsPerQueue = new HashMap<>();
runningJobs.forEach(jobId -> {
String queue = get(jobId).getMeta().getQueue();
runningJobsPerQueue.computeIfAbsent(queue, (key) -> 0);
runningJobsPerQueue.put(queue, runningJobsPerQueue.get(queue) + 1);
});

Map<String, List<String>> pendingJobsPerQueue = new HashMap<>();
pendingJobs.forEach(jobId -> {
String queue = get(jobId).getMeta().getQueue();
pendingJobsPerQueue.computeIfAbsent(queue, (key) -> new ArrayList<>());
pendingJobsPerQueue.get(queue).add(jobId);
});
if (pendingJobsPerQueue.isEmpty()) return;

pendingJobsPerQueue.keySet().stream()
.sorted(Comparator.comparingInt((key) -> {
return -getQueuePriority(key); // Sorts smallest to greatest by default, so flip the prio numbers around
})) // Sort by priority
.forEach(queue -> {
pendingJobsPerQueue.get(queue).forEach(jobId -> {
if (runningJobs.size() >= MAX_CONCURRENCY) return;
runningJobsPerQueue.computeIfAbsent(queue, (key) -> 0);
if (runningJobsPerQueue.get(queue) >= getQueueConcurrency(queue)) return;
pendingJobs.remove(jobId);
runningJobs.add(jobId);
executorService.submit(() -> {
try {
get(jobId).run();
} catch (Exception e) {
e.printStackTrace();
//TODO - mark the job failed if this happens
}
runningJobs.remove(jobId);
launchJobs();
});
});
});
}

public static JSONObject getJsonForJob(Job job) {
JobMeta meta = job.getMeta();
JSONObject jsonObject = new JSONObject();
Expand Down
1 change: 1 addition & 0 deletions src/main/java/dev/digitaldragon/jobs/JobMeta.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public class JobMeta {
private Optional<String> explain = Optional.empty();
private JobPlatform platform;
private SilentMode silentMode = SilentMode.ALL;
private String queue = "default";
private Optional<String> targetUrl = Optional.empty();
private Optional<String> discordUserId = Optional.empty();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public class DokuWikiDumperArgs {
private String silentMode;
@Parameter(names = {"--resume"})
private String resume;
@Parameter(names = {"--queue"})
private String queue;


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public DokuWikiDumperJob(String userName, String id, DokuWikiDumperArgs args) {
meta.setExplain(args.getExplain());
meta.setTargetUrl(args.getUrl());
if (args.getSilentMode() != null) meta.setSilentMode(JobMeta.SilentMode.valueOf(args.getSilentMode()));
if (args.getQueue() != null) meta.setQueue(args.getQueue());
}

private void failure(int code) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public class WikiTeam3Args {
private boolean warcOnly;
@Parameter(names = {"--silent-mode"})
private String silentMode = null;
@Parameter(names = {"--queue"})
private String queue;

/**
* This method checks the validity of three URL options - api, index, and url.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public WikiTeam3Job(String userName, String id, WikiTeam3Args args) {
meta.setExplain(args.getExplain());
meta.setTargetUrl(targetUrl);
if (args.getSilentMode() != null) meta.setSilentMode(JobMeta.SilentMode.valueOf(args.getSilentMode()));
if (args.getQueue() != null) meta.setQueue(args.getQueue());
}

private void failure(int code) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ url URL of the dokuWiki (provide the doku.php URL)
private boolean force;
@Parameter(names = {"--silent-mode"})
private String silentMode;
@Parameter(names = {"--queue"})
private String queue;

/**
* This method checks the validity of three URL options - api, index, and url.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public PukiWikiDumperJob(String userName, String id, PukiWikiDumperArgs args) {
meta.setExplain(args.getExplain());
meta.setTargetUrl(args.getUrl());
if (args.getSilentMode() != null) meta.setSilentMode(JobMeta.SilentMode.valueOf(args.getSilentMode()));
if (args.getQueue() != null) meta.setQueue(args.getQueue());
}

private void failure(int code) {
Expand Down

0 comments on commit 7623c11

Please sign in to comment.