Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler pattern #76 #3135

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 105 additions & 44 deletions microservices-log-aggregation/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,78 +48,139 @@ The `CentralLogStore` is responsible for storing the logs collected from various

```java
public class CentralLogStore {
private final List<LogEntry> logs = new ArrayList<>();

private final List<LogEntry> logs = new ArrayList<>();
public void storeLog(LogEntry logEntry) {
logs.add(logEntry);
}
public void displayLogs() {
logs.forEach(System.out::println);
}
}
```

The `LogAggregator` collects logs from various services and stores them in the `CentralLogStore`. It filters logs based on their log level.


```java
//This will be the structure to hold the log data
public class LogEntry {
private final String serviceName;
private final String logLevel;
private final String message;
private final LocalDateTime timestamp;

public LogEntry(String serviceName, String logLevel, String message, LocalDateTime timestamp) {
this.serviceName = serviceName;
this.logLevel = logLevel;
this.message = message;
this.timestamp = timestamp;
}

@Override
public String toString() {
return "LogEntry{" +
"serviceName='" + serviceName + '\'' +
", logLevel='" + logLevel + '\'' +
", message='" + message + '\'' +
", timestamp=" + timestamp +
'}';
}
public String getServiceName() {
return serviceName;
}

public String getLogLevel() {
return logLevel;
}

public void storeLog(LogEntry logEntry) {
logs.add(logEntry);
}
public String getMessage() {
return message;
}

public void displayLogs() {
logs.forEach(System.out::println);
}
public LocalDateTime getTimestamp() {
return timestamp;
}
}
```

The `LogAggregator` collects logs from various services and stores them in the `CentralLogStore`. It filters logs based on their log level.

```java
public class LogAggregator {
private final CentralLogStore centralLogStore;
private final BlockingQueue<LogEntry> logQueue;

private final CentralLogStore centralLogStore;
private final LogLevel minimumLogLevel;

public LogAggregator(CentralLogStore centralLogStore, LogLevel minimumLogLevel) {
this.centralLogStore = centralLogStore;
this.minimumLogLevel = minimumLogLevel;
}
public LogAggregator(CentralLogStore centralLogStore, BlockingQueue<LogEntry> logQueue) {
this.centralLogStore = centralLogStore;
this.logQueue = logQueue;
}

public void collectLog(LogEntry logEntry) {
if (logEntry.getLogLevel().compareTo(minimumLogLevel) >= 0) {
centralLogStore.storeLog(logEntry);
public void startLogAggregation() {
new Thread(() -> {
try {
while (true) {
LogEntry log = logQueue.take();
centralLogStore.storeLog(log);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
}
```

The `LogProducer` represents a service that generates logs. It sends the logs to the `LogAggregator`.

```java
public class LogProducer {
private final String serviceName;
private final BlockingQueue<LogEntry> logQueue;

private final String serviceName;
private final LogAggregator logAggregator;

public LogProducer(String serviceName, LogAggregator logAggregator) {
this.serviceName = serviceName;
this.logAggregator = logAggregator;
}
public LogProducer(String serviceName, BlockingQueue<LogEntry> logQueue) {
this.serviceName = serviceName;
this.logQueue = logQueue;
}

public void generateLog(LogLevel logLevel, String message) {
LogEntry logEntry = new LogEntry(serviceName, logLevel, message, LocalDateTime.now());
logAggregator.collectLog(logEntry);
}
public void generateLog(String logLevel, String message) {
LogEntry logEntry = new LogEntry(serviceName, logLevel, message, java.time.LocalDateTime.now());
try {
logQueue.put(logEntry);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
```

The `main` application creates services, generates logs, aggregates, and finally displays the logs.

```java
public class App {

public static void main(String[] args) throws InterruptedException {
final CentralLogStore centralLogStore = new CentralLogStore();
final LogAggregator aggregator = new LogAggregator(centralLogStore, LogLevel.INFO);

final LogProducer serviceA = new LogProducer("ServiceA", aggregator);
final LogProducer serviceB = new LogProducer("ServiceB", aggregator);

serviceA.generateLog(LogLevel.INFO, "This is an INFO log from ServiceA");
serviceB.generateLog(LogLevel.ERROR, "This is an ERROR log from ServiceB");
serviceA.generateLog(LogLevel.DEBUG, "This is a DEBUG log from ServiceA");

centralLogStore.displayLogs();
}
public static void main(String[] args) {
CentralLogStore centralLogStore = new CentralLogStore();

BlockingQueue<LogEntry> logQueue = new LinkedBlockingQueue<>();

LogAggregator logAggregator = new LogAggregator(centralLogStore, logQueue);
logAggregator.startLogAggregation(); // Start aggregation in the background


LogProducer serviceA = new LogProducer("ServiceA", logQueue);
LogProducer serviceB = new LogProducer("ServiceB", logQueue);


serviceA.generateLog("INFO", "This is an INFO log from ServiceA");
serviceB.generateLog("ERROR", "This is an ERROR log from ServiceB");
serviceA.generateLog("DEBUG", "This is a DEBUG log from ServiceA");

try {
Thread.sleep(2000); // Wait for logs to be processed
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
centralLogStore.displayLogs();
}
}
```

Expand Down
69 changes: 69 additions & 0 deletions schedule pattern
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import java.util.ArrayList;
import java.util.List;
interface Job {
void execute();
}
interface Trigger {
boolean shouldRun();
}
class Scheduler {
private List<Job> jobs;
private List<Trigger> triggers;
public Scheduler() {
this.jobs = new ArrayList<>();
this.triggers = new ArrayList<>();
}
public void addJob(Job job, Trigger trigger) {
jobs.add(job);
triggers.add(trigger);
}
public void start() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually we use spring boot and the framework itself provides centralized system and the dev just needs to use specific stereotype annotations to schedule job with configs

while (true) {
for (int i = 0; i < jobs.size(); i++) {
Job job = jobs.get(i);
Trigger trigger = triggers.get(i);

if (trigger.shouldRun()) {
job.execute();
}
}
try {
Thread.sleep(1000); // Delay for 1 second
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
class ExampleJob implements Job {
@Override
public void execute() {
System.out.println("Executing ExampleJob at " + System.currentTimeMillis());
}
}
class TimeBasedTrigger implements Trigger {
private long lastExecutionTime;
private long interval;
public TimeBasedTrigger(long interval) {
this.interval = interval;
this.lastExecutionTime = System.currentTimeMillis();
}
@Override
public boolean shouldRun() {
long currentTime = System.currentTimeMillis();
if (currentTime - lastExecutionTime >= interval) {
lastExecutionTime = currentTime;
return true;
}
return false;
}
}
public class JobSDesign {
public static void main(String[] args) {
Scheduler scheduler = new Scheduler();
Job exampleJob = new ExampleJob();
Trigger timeBasedTrigger = new TimeBasedTrigger(5000); // Run every 5 seconds
scheduler.addJob(exampleJob, timeBasedTrigger);
scheduler.start();
}
}
43 changes: 27 additions & 16 deletions twin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,36 +84,47 @@ public class BallItem extends GameItem {
public class BallThread extends Thread {
@Setter
private BallItem twin;
private volatile boolean isSuspended;
private volatile boolean isRunning = true;
private boolean isSuspended;
private boolean isRunning = true;

@Override
public void run() {
while (isRunning) {
if (!isSuspended) {
synchronized (this) {
while (isRunning) {
while (isSuspended) {
try {
wait();
} catch (InterruptedException e) {
LOGGER.error("Thread interrupted", e);
}
}
twin.draw();
twin.move();
}
try {
Thread.sleep(250);
} catch (InterruptedException e) {
throw new RuntimeException(e);
try {
Thread.sleep(250); // Optional delay for smooth execution
} catch (InterruptedException e) {
LOGGER.error("Sleep interrupted", e);
}
}
}
}

public void suspendMe() {
public synchronized void suspendMe() {
isSuspended = true;
LOGGER.info("Begin to suspend BallThread");
LOGGER.info("BallThread suspended");
}

public void resumeMe() {
public synchronized void resumeMe() {
isSuspended = false;
LOGGER.info("Begin to resume BallThread");
LOGGER.info("BallThread resumed");
notify();
}

public void stopMe() {
this.isRunning = false;
this.isSuspended = true;
public synchronized void stopMe() {
isRunning = false;
isSuspended = false;
LOGGER.info("BallThread stopping");
notify();
}
}
```
Expand Down