From 62171804cad8e0742613b2eaeb3697a997cc50d7 Mon Sep 17 00:00:00 2001 From: Nothere998 Date: Sat, 7 Dec 2024 17:50:15 +0200 Subject: [PATCH 1/5] Update README.md issue This PR Solves In microservices, logs are often collected using busy waiting loops, where the system constantly checks for new logs. This leads to several issues: Wasting Resources: The system keeps checking for new logs, using up CPU resources even when there are no logs to process. Increased Latency: Logs are delayed because the system is busy checking for new data rather than processing logs as they arrive. This PR solves these problems by switching to a more efficient event-driven log aggregation system. Instead of constantly checking for new logs, the system reacts only when new logs are available, reducing unnecessary CPU usage and improving scalability and performance. --- microservices-log-aggregation/README.md | 149 +++++++++++++++++------- 1 file changed, 105 insertions(+), 44 deletions(-) diff --git a/microservices-log-aggregation/README.md b/microservices-log-aggregation/README.md index c23b1fe4319b..f75e2f847096 100644 --- a/microservices-log-aggregation/README.md +++ b/microservices-log-aggregation/README.md @@ -48,37 +48,85 @@ The `CentralLogStore` is responsible for storing the logs collected from various ```java public class CentralLogStore { + private final List logs = new ArrayList<>(); - private final List logs = new ArrayList<>(); + public void storeLog(LogEntry logEntry) { + logs.add(logEntry); + } + public void displayLogs() { + logs.forEach(System.out::println); + } +} +``` + +The `LogAggregator` collects logs from various services and stores them in the `CentralLogStore`. It filters logs based on their log level. + + +```java +//This will be the structure to hold the log data +public class LogEntry { + private final String serviceName; + private final String logLevel; + private final String message; + private final LocalDateTime timestamp; + + public LogEntry(String serviceName, String logLevel, String message, LocalDateTime timestamp) { + this.serviceName = serviceName; + this.logLevel = logLevel; + this.message = message; + this.timestamp = timestamp; + } + + @Override + public String toString() { + return "LogEntry{" + + "serviceName='" + serviceName + '\'' + + ", logLevel='" + logLevel + '\'' + + ", message='" + message + '\'' + + ", timestamp=" + timestamp + + '}'; + } + public String getServiceName() { + return serviceName; + } + + public String getLogLevel() { + return logLevel; + } - public void storeLog(LogEntry logEntry) { - logs.add(logEntry); - } + public String getMessage() { + return message; + } - public void displayLogs() { - logs.forEach(System.out::println); - } + public LocalDateTime getTimestamp() { + return timestamp; + } } ``` -The `LogAggregator` collects logs from various services and stores them in the `CentralLogStore`. It filters logs based on their log level. ```java public class LogAggregator { + private final CentralLogStore centralLogStore; + private final BlockingQueue logQueue; - private final CentralLogStore centralLogStore; - private final LogLevel minimumLogLevel; - - public LogAggregator(CentralLogStore centralLogStore, LogLevel minimumLogLevel) { - this.centralLogStore = centralLogStore; - this.minimumLogLevel = minimumLogLevel; - } + public LogAggregator(CentralLogStore centralLogStore, BlockingQueue logQueue) { + this.centralLogStore = centralLogStore; + this.logQueue = logQueue; + } - public void collectLog(LogEntry logEntry) { - if (logEntry.getLogLevel().compareTo(minimumLogLevel) >= 0) { - centralLogStore.storeLog(logEntry); + public void startLogAggregation() { + new Thread(() -> { + try { + while (true) { + LogEntry log = logQueue.take(); + centralLogStore.storeLog(log); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }).start(); } - } } ``` @@ -86,19 +134,22 @@ The `LogProducer` represents a service that generates logs. It sends the logs to ```java public class LogProducer { + private final String serviceName; + private final BlockingQueue logQueue; - private final String serviceName; - private final LogAggregator logAggregator; - - public LogProducer(String serviceName, LogAggregator logAggregator) { - this.serviceName = serviceName; - this.logAggregator = logAggregator; - } + public LogProducer(String serviceName, BlockingQueue logQueue) { + this.serviceName = serviceName; + this.logQueue = logQueue; + } - public void generateLog(LogLevel logLevel, String message) { - LogEntry logEntry = new LogEntry(serviceName, logLevel, message, LocalDateTime.now()); - logAggregator.collectLog(logEntry); - } + public void generateLog(String logLevel, String message) { + LogEntry logEntry = new LogEntry(serviceName, logLevel, message, java.time.LocalDateTime.now()); + try { + logQueue.put(logEntry); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } } ``` @@ -106,20 +157,30 @@ The `main` application creates services, generates logs, aggregates, and finally ```java public class App { - - public static void main(String[] args) throws InterruptedException { - final CentralLogStore centralLogStore = new CentralLogStore(); - final LogAggregator aggregator = new LogAggregator(centralLogStore, LogLevel.INFO); - - final LogProducer serviceA = new LogProducer("ServiceA", aggregator); - final LogProducer serviceB = new LogProducer("ServiceB", aggregator); - - serviceA.generateLog(LogLevel.INFO, "This is an INFO log from ServiceA"); - serviceB.generateLog(LogLevel.ERROR, "This is an ERROR log from ServiceB"); - serviceA.generateLog(LogLevel.DEBUG, "This is a DEBUG log from ServiceA"); - - centralLogStore.displayLogs(); - } + public static void main(String[] args) { + CentralLogStore centralLogStore = new CentralLogStore(); + + BlockingQueue logQueue = new LinkedBlockingQueue<>(); + + LogAggregator logAggregator = new LogAggregator(centralLogStore, logQueue); + logAggregator.startLogAggregation(); // Start aggregation in the background + + + LogProducer serviceA = new LogProducer("ServiceA", logQueue); + LogProducer serviceB = new LogProducer("ServiceB", logQueue); + + + serviceA.generateLog("INFO", "This is an INFO log from ServiceA"); + serviceB.generateLog("ERROR", "This is an ERROR log from ServiceB"); + serviceA.generateLog("DEBUG", "This is a DEBUG log from ServiceA"); + + try { + Thread.sleep(2000); // Wait for logs to be processed + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + centralLogStore.displayLogs(); + } } ``` From 5171a96a024dc8d44efacadc8553135d7d379d98 Mon Sep 17 00:00:00 2001 From: Nothere998 <190948515+Nothere998@users.noreply.github.com> Date: Sat, 7 Dec 2024 18:54:44 +0200 Subject: [PATCH 2/5] fix #twin-readme --- twin/README.md | 43 +++++++++++++++++++++++++++---------------- 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/twin/README.md b/twin/README.md index b2913e0a963f..0c40b810b3bc 100644 --- a/twin/README.md +++ b/twin/README.md @@ -84,36 +84,47 @@ public class BallItem extends GameItem { public class BallThread extends Thread { @Setter private BallItem twin; - private volatile boolean isSuspended; - private volatile boolean isRunning = true; + private boolean isSuspended; + private boolean isRunning = true; + @Override public void run() { - while (isRunning) { - if (!isSuspended) { + synchronized (this) { + while (isRunning) { + while (isSuspended) { + try { + wait(); + } catch (InterruptedException e) { + LOGGER.error("Thread interrupted", e); + } + } twin.draw(); twin.move(); - } - try { - Thread.sleep(250); - } catch (InterruptedException e) { - throw new RuntimeException(e); + try { + Thread.sleep(250); // Optional delay for smooth execution + } catch (InterruptedException e) { + LOGGER.error("Sleep interrupted", e); + } } } } - public void suspendMe() { + public synchronized void suspendMe() { isSuspended = true; - LOGGER.info("Begin to suspend BallThread"); + LOGGER.info("BallThread suspended"); } - public void resumeMe() { + public synchronized void resumeMe() { isSuspended = false; - LOGGER.info("Begin to resume BallThread"); + LOGGER.info("BallThread resumed"); + notify(); } - public void stopMe() { - this.isRunning = false; - this.isSuspended = true; + public synchronized void stopMe() { + isRunning = false; + isSuspended = false; + LOGGER.info("BallThread stopping"); + notify(); } } ``` From a19abe961f1339dbf3ba2af96d36947453c0a4ab Mon Sep 17 00:00:00 2001 From: Nothere998 <190948515+Nothere998@users.noreply.github.com> Date: Sat, 7 Dec 2024 21:16:01 +0200 Subject: [PATCH 3/5] Scheduler pattern The Scheduler design pattern provides a clean and structured solution to manage and execute tasks automatically. It involves: Scheduler: A component responsible for managing the execution of tasks, ensuring they run at the correct time or under the right conditions. Task: A unit of work that encapsulates the logic to be executed. Trigger/Condition: Defines when and how often a task should execute, whether it's based on time intervals (e.g., every minute) or events Execution Context: Ensures that tasks are executed in the right environment, managing resources, states, and concurrency. By using this pattern, tasks are managed centrally, execution timing is automated, and the system can handle various triggers and execution conditions efficiently, allowing developers to focus on defining what needs to be done instead of when and how to execute it. --- Scheduler pattern | 69 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) create mode 100644 Scheduler pattern diff --git a/Scheduler pattern b/Scheduler pattern new file mode 100644 index 000000000000..5ee6c80c136a --- /dev/null +++ b/Scheduler pattern @@ -0,0 +1,69 @@ +import java.util.ArrayList; +import java.util.List; +interface Job { + void execute(); +} +interface Trigger { + boolean shouldRun(); +} +class Scheduler { + private List jobs; + private List triggers; + public Scheduler() { + this.jobs = new ArrayList<>(); + this.triggers = new ArrayList<>(); + } + public void addJob(Job job, Trigger trigger) { + jobs.add(job); + triggers.add(trigger); + } + public void start() { + while (true) { + for (int i = 0; i < jobs.size(); i++) { + Job job = jobs.get(i); + Trigger trigger = triggers.get(i); + + if (trigger.shouldRun()) { + job.execute(); + } + } + try { + Thread.sleep(1000); // Delay for 1 second + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } +} +class ExampleJob implements Job { + @Override + public void execute() { + System.out.println("Executing ExampleJob at " + System.currentTimeMillis()); + } +} +class TimeBasedTrigger implements Trigger { + private long lastExecutionTime; + private long interval; + public TimeBasedTrigger(long interval) { + this.interval = interval; + this.lastExecutionTime = System.currentTimeMillis(); + } + @Override + public boolean shouldRun() { + long currentTime = System.currentTimeMillis(); + if (currentTime - lastExecutionTime >= interval) { + lastExecutionTime = currentTime; + return true; + } + return false; + } +} +public class JobSDesign { + public static void main(String[] args) { + Scheduler scheduler = new Scheduler(); + Job exampleJob = new ExampleJob(); + Trigger timeBasedTrigger = new TimeBasedTrigger(5000); // Run every 5 seconds + scheduler.addJob(exampleJob, timeBasedTrigger); + scheduler.start(); + } +} From 057cc17ac886e9ecd9b024a593bd2d66813822cd Mon Sep 17 00:00:00 2001 From: Nothere998 <190948515+Nothere998@users.noreply.github.com> Date: Sat, 7 Dec 2024 22:53:25 +0200 Subject: [PATCH 4/5] Delete Scheduler pattern --- Scheduler pattern | 69 ----------------------------------------------- 1 file changed, 69 deletions(-) delete mode 100644 Scheduler pattern diff --git a/Scheduler pattern b/Scheduler pattern deleted file mode 100644 index 5ee6c80c136a..000000000000 --- a/Scheduler pattern +++ /dev/null @@ -1,69 +0,0 @@ -import java.util.ArrayList; -import java.util.List; -interface Job { - void execute(); -} -interface Trigger { - boolean shouldRun(); -} -class Scheduler { - private List jobs; - private List triggers; - public Scheduler() { - this.jobs = new ArrayList<>(); - this.triggers = new ArrayList<>(); - } - public void addJob(Job job, Trigger trigger) { - jobs.add(job); - triggers.add(trigger); - } - public void start() { - while (true) { - for (int i = 0; i < jobs.size(); i++) { - Job job = jobs.get(i); - Trigger trigger = triggers.get(i); - - if (trigger.shouldRun()) { - job.execute(); - } - } - try { - Thread.sleep(1000); // Delay for 1 second - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } -} -class ExampleJob implements Job { - @Override - public void execute() { - System.out.println("Executing ExampleJob at " + System.currentTimeMillis()); - } -} -class TimeBasedTrigger implements Trigger { - private long lastExecutionTime; - private long interval; - public TimeBasedTrigger(long interval) { - this.interval = interval; - this.lastExecutionTime = System.currentTimeMillis(); - } - @Override - public boolean shouldRun() { - long currentTime = System.currentTimeMillis(); - if (currentTime - lastExecutionTime >= interval) { - lastExecutionTime = currentTime; - return true; - } - return false; - } -} -public class JobSDesign { - public static void main(String[] args) { - Scheduler scheduler = new Scheduler(); - Job exampleJob = new ExampleJob(); - Trigger timeBasedTrigger = new TimeBasedTrigger(5000); // Run every 5 seconds - scheduler.addJob(exampleJob, timeBasedTrigger); - scheduler.start(); - } -} From c3295a400dfcda3203cf6b15d11776e1e79ce2c7 Mon Sep 17 00:00:00 2001 From: Nothere998 <190948515+Nothere998@users.noreply.github.com> Date: Sat, 7 Dec 2024 22:57:03 +0200 Subject: [PATCH 5/5] Create schedule pattern --- schedule pattern | 69 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) create mode 100644 schedule pattern diff --git a/schedule pattern b/schedule pattern new file mode 100644 index 000000000000..5ee6c80c136a --- /dev/null +++ b/schedule pattern @@ -0,0 +1,69 @@ +import java.util.ArrayList; +import java.util.List; +interface Job { + void execute(); +} +interface Trigger { + boolean shouldRun(); +} +class Scheduler { + private List jobs; + private List triggers; + public Scheduler() { + this.jobs = new ArrayList<>(); + this.triggers = new ArrayList<>(); + } + public void addJob(Job job, Trigger trigger) { + jobs.add(job); + triggers.add(trigger); + } + public void start() { + while (true) { + for (int i = 0; i < jobs.size(); i++) { + Job job = jobs.get(i); + Trigger trigger = triggers.get(i); + + if (trigger.shouldRun()) { + job.execute(); + } + } + try { + Thread.sleep(1000); // Delay for 1 second + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } +} +class ExampleJob implements Job { + @Override + public void execute() { + System.out.println("Executing ExampleJob at " + System.currentTimeMillis()); + } +} +class TimeBasedTrigger implements Trigger { + private long lastExecutionTime; + private long interval; + public TimeBasedTrigger(long interval) { + this.interval = interval; + this.lastExecutionTime = System.currentTimeMillis(); + } + @Override + public boolean shouldRun() { + long currentTime = System.currentTimeMillis(); + if (currentTime - lastExecutionTime >= interval) { + lastExecutionTime = currentTime; + return true; + } + return false; + } +} +public class JobSDesign { + public static void main(String[] args) { + Scheduler scheduler = new Scheduler(); + Job exampleJob = new ExampleJob(); + Trigger timeBasedTrigger = new TimeBasedTrigger(5000); // Run every 5 seconds + scheduler.addJob(exampleJob, timeBasedTrigger); + scheduler.start(); + } +}