From 210806aac7a10b4794a1e32c8014d4cab5365a6c Mon Sep 17 00:00:00 2001 From: Carlos Mendes Date: Sat, 14 Jun 2025 09:50:46 -0300 Subject: [PATCH 1/3] Introduce the ExecutionQueueController --- .../resiliencebench/BenchmarkController.java | 101 +++++---- .../ExecutionQueueController.java | 42 ++++ .../execution/DefaultQueueExecutor.java | 25 +-- .../execution/DefaultScenarioExecutor.java | 3 +- .../execution/ScenarioExecutor.java | 2 +- .../steps/UpdateStatusQueueStep.java | 1 - .../resources/scenario/ScenarioSpec.java | 2 - .../execution/DefaultQueueExecutorTest.java | 93 +++++++++ .../DefaultScenarioExecutorTest.java | 191 ++++++++++++++++++ 9 files changed, 382 insertions(+), 78 deletions(-) create mode 100644 resilience-bench/operator/src/main/java/io/resiliencebench/ExecutionQueueController.java create mode 100644 resilience-bench/operator/src/test/java/io/resiliencebench/execution/DefaultQueueExecutorTest.java create mode 100644 resilience-bench/operator/src/test/java/io/resiliencebench/execution/DefaultScenarioExecutorTest.java diff --git a/resilience-bench/operator/src/main/java/io/resiliencebench/BenchmarkController.java b/resilience-bench/operator/src/main/java/io/resiliencebench/BenchmarkController.java index 9639938..976bc00 100644 --- a/resilience-bench/operator/src/main/java/io/resiliencebench/BenchmarkController.java +++ b/resilience-bench/operator/src/main/java/io/resiliencebench/BenchmarkController.java @@ -2,7 +2,6 @@ import java.util.List; -import io.resiliencebench.execution.QueueExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,65 +21,57 @@ @ControllerConfiguration public class BenchmarkController implements Reconciler { - private static final Logger logger = LoggerFactory.getLogger(BenchmarkController.class); + private static final Logger logger = LoggerFactory.getLogger(BenchmarkController.class); - private final CustomResourceRepository scenarioRepository; + private final CustomResourceRepository scenarioRepository; + private final CustomResourceRepository workloadRepository; + private final CustomResourceRepository queueRepository; - private final CustomResourceRepository workloadRepository; - private final CustomResourceRepository queueRepository; - - private final QueueExecutor queueExecutor; - - public BenchmarkController(QueueExecutor queueExecutor, - CustomResourceRepository scenarioRepository, - CustomResourceRepository workloadRepository, - CustomResourceRepository queueRepository) { - this.queueExecutor = queueExecutor; - this.scenarioRepository = scenarioRepository; - this.workloadRepository = workloadRepository; - this.queueRepository = queueRepository; - } - - // Considering only creation and update events. if something changes in benchmark, we need to re-run the scenarios - @Override - public UpdateControl reconcile(Benchmark benchmark, Context context) { - var workload = workloadRepository.find(benchmark.getMetadata().getNamespace(), benchmark.getSpec().getWorkload()); - if (workload.isEmpty()) { - logger.error("Workload not found: {}", benchmark.getSpec().getWorkload()); - return UpdateControl.noUpdate(); + public BenchmarkController( + CustomResourceRepository scenarioRepository, + CustomResourceRepository workloadRepository, + CustomResourceRepository queueRepository) { + this.scenarioRepository = scenarioRepository; + this.workloadRepository = workloadRepository; + this.queueRepository = queueRepository; } - var scenariosList = createScenarios(benchmark, workload.get()); - if (scenariosList.isEmpty()) { - logger.error("No scenarios generated for benchmark {}", benchmark.getMetadata().getName()); - return UpdateControl.noUpdate(); + @Override + public UpdateControl reconcile(Benchmark benchmark, Context context) { + logger.info("Reconciling Benchmark: {}", benchmark.getMetadata().getName()); + + var workload = workloadRepository.find(benchmark.getMetadata().getNamespace(), benchmark.getSpec().getWorkload()); + if (workload.isEmpty()) { + logger.error("Workload not found: {}", benchmark.getSpec().getWorkload()); + return UpdateControl.noUpdate(); + } + + var scenariosList = createScenarios(benchmark, workload.get()); + if (scenariosList.isEmpty()) { + logger.error("No scenarios generated for benchmark {}", benchmark.getMetadata().getName()); + return UpdateControl.noUpdate(); + } + + createExecutionQueue(benchmark, scenariosList); + + logger.info("Benchmark reconciled {}. {} scenarios created", + benchmark.getMetadata().getName(), + scenariosList.size() + ); + benchmark.setStatus(new BenchmarkStatus(scenariosList.size())); + return UpdateControl.updateStatus(benchmark); } - var executionQueue = prepareToRunScenarios(benchmark, scenariosList); - - logger.info("Benchmark reconciled {}. {} scenarios created", - benchmark.getMetadata().getName(), - scenariosList.size() - ); - benchmark.setStatus(new BenchmarkStatus(scenariosList.size())); - queueExecutor.execute(executionQueue); - return UpdateControl.updateStatus(benchmark); - } - - private List createScenarios(Benchmark benchmark, Workload workload) { - scenarioRepository.deleteAll(benchmark.getMetadata().getNamespace()); // TODO we don't support (yet) multiple reconciles loops - - var scenariosList = ScenarioFactory.create(benchmark, workload); - scenariosList.forEach(scenarioRepository::create); - return scenariosList; - } - - private ExecutionQueue prepareToRunScenarios(Benchmark benchmark, List scenariosList) { - scenarioRepository.deleteAll(benchmark.getMetadata().getNamespace()); - scenariosList.forEach(scenarioRepository::create); + private List createScenarios(Benchmark benchmark, Workload workload) { + scenarioRepository.deleteAll(benchmark.getMetadata().getNamespace()); + var scenariosList = ScenarioFactory.create(benchmark, workload); + scenariosList.forEach(scenarioRepository::create); + return scenariosList; + } - queueRepository.deleteAll(benchmark.getMetadata().getNamespace()); - var queueCreated = ExecutionQueueFactory.create(benchmark, scenariosList); - return queueRepository.create(queueCreated); - } + private ExecutionQueue createExecutionQueue(Benchmark benchmark, List scenariosList) { + queueRepository.deleteAll(benchmark.getMetadata().getNamespace()); + var queueCreated = ExecutionQueueFactory.create(benchmark, scenariosList); + return queueRepository.create(queueCreated); + } } diff --git a/resilience-bench/operator/src/main/java/io/resiliencebench/ExecutionQueueController.java b/resilience-bench/operator/src/main/java/io/resiliencebench/ExecutionQueueController.java new file mode 100644 index 0000000..2025058 --- /dev/null +++ b/resilience-bench/operator/src/main/java/io/resiliencebench/ExecutionQueueController.java @@ -0,0 +1,42 @@ +package io.resiliencebench; + +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.Reconciler; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import io.resiliencebench.execution.QueueExecutor; +import io.resiliencebench.resources.queue.ExecutionQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@ControllerConfiguration +public class ExecutionQueueController implements Reconciler { + + private static final Logger logger = LoggerFactory.getLogger(ExecutionQueueController.class); + + private final QueueExecutor queueExecutor; + + public ExecutionQueueController(QueueExecutor queueExecutor) { + this.queueExecutor = queueExecutor; + } + + @Override + public UpdateControl reconcile(ExecutionQueue queue, Context context) { + logger.info("Reconciling ExecutionQueue: {}", queue.getMetadata().getName()); + + if (queue.isDone()) { + logger.info("ExecutionQueue {} is already completed", queue.getMetadata().getName()); + return UpdateControl.noUpdate(); + } + + // Only execute if there are pending items and no running items + if (queue.getStatus().getRunning() == 0 && queue.getStatus().getPending() > 0) { + logger.info("Starting execution of next item in queue: {}", queue.getMetadata().getName()); + queueExecutor.execute(queue); + } else { + logger.debug("Queue {} is already being processed or has no pending items", queue.getMetadata().getName()); + } + + return UpdateControl.updateStatus(queue); + } +} \ No newline at end of file diff --git a/resilience-bench/operator/src/main/java/io/resiliencebench/execution/DefaultQueueExecutor.java b/resilience-bench/operator/src/main/java/io/resiliencebench/execution/DefaultQueueExecutor.java index 1e529f5..cbcf01a 100644 --- a/resilience-bench/operator/src/main/java/io/resiliencebench/execution/DefaultQueueExecutor.java +++ b/resilience-bench/operator/src/main/java/io/resiliencebench/execution/DefaultQueueExecutor.java @@ -1,4 +1,3 @@ - package io.resiliencebench.execution; import static java.lang.String.format; @@ -18,43 +17,35 @@ public class DefaultQueueExecutor implements QueueExecutor { private final static Logger logger = LoggerFactory.getLogger(DefaultQueueExecutor.class); private final CustomResourceRepository scenarioRepository; - private final CustomResourceRepository executionRepository; private final ScenarioExecutor scenarioExecutor; - public DefaultQueueExecutor( - CustomResourceRepository scenarioRepository, - CustomResourceRepository executionRepository, - ScenarioExecutor scenarioExecutor) { + public DefaultQueueExecutor(CustomResourceRepository scenarioRepository, ScenarioExecutor scenarioExecutor) { this.scenarioRepository = scenarioRepository; - this.executionRepository = executionRepository; this.scenarioExecutor = scenarioExecutor; } @Override public void execute(ExecutionQueue queue) { - var queueToExecute = executionRepository.find(queue.getMetadata()) - .orElseThrow(() -> new RuntimeException("Queue not found " + queue.getMetadata().getName())); - - var nextItem = queueToExecute.getNextPendingItem(); + var nextItem = queue.getNextPendingItem(); if (nextItem.isPresent() && nextItem.get().isPending()) { - executeScenario(nextItem.get(), queueToExecute); + internalExecute(nextItem.get(), queue); } else { - logger.info("No item available for queue: {}", queueToExecute.getMetadata().getName()); - if (queueToExecute.isDone()) { - logger.info("All items finished for: {}", queueToExecute.getMetadata().getName()); + logger.info("No pending items available for queue: {}", queue.getMetadata().getName()); + if (queue.isDone()) { + logger.info("All items finished for: {}", queue.getMetadata().getName()); } } } - private void executeScenario(ExecutionQueueItem item, ExecutionQueue executionQueue) { + private void internalExecute(ExecutionQueueItem item, ExecutionQueue executionQueue) { var scenarioName = item.getScenario(); var namespace = executionQueue.getMetadata().getNamespace(); var scenario = scenarioRepository.find(namespace, scenarioName); if (scenario.isPresent()) { logger.info("Running scenario: {}", scenarioName); - scenarioExecutor.execute(scenario.get(), executionQueue, () -> execute(executionQueue)); + scenarioExecutor.execute(scenario.get(), executionQueue); } else { throw new RuntimeException(format("Scenario not found: %s.%s", namespace, scenarioName)); } diff --git a/resilience-bench/operator/src/main/java/io/resiliencebench/execution/DefaultScenarioExecutor.java b/resilience-bench/operator/src/main/java/io/resiliencebench/execution/DefaultScenarioExecutor.java index e75d759..fe69542 100644 --- a/resilience-bench/operator/src/main/java/io/resiliencebench/execution/DefaultScenarioExecutor.java +++ b/resilience-bench/operator/src/main/java/io/resiliencebench/execution/DefaultScenarioExecutor.java @@ -44,7 +44,7 @@ public DefaultScenarioExecutor(KubernetesClient kubernetesClient, } @Override - public void execute(Scenario scenario, ExecutionQueue executionQueue, Runnable onCompletion) { + public void execute(Scenario scenario, ExecutionQueue executionQueue) { var ns = scenario.getMetadata().getNamespace(); var workloadName = scenario.getSpec().getWorkload().getWorkloadName(); var workload = workloadRepository.find(ns, workloadName) @@ -70,7 +70,6 @@ public void eventReceived(Action action, Job resource) { scenario.getMetadata().getAnnotations().get(OWNED_BY) ); executePostExecutionSteps(scenario, executionQueue); - onCompletion.run(); } } @Override diff --git a/resilience-bench/operator/src/main/java/io/resiliencebench/execution/ScenarioExecutor.java b/resilience-bench/operator/src/main/java/io/resiliencebench/execution/ScenarioExecutor.java index ba104f8..f5f0391 100644 --- a/resilience-bench/operator/src/main/java/io/resiliencebench/execution/ScenarioExecutor.java +++ b/resilience-bench/operator/src/main/java/io/resiliencebench/execution/ScenarioExecutor.java @@ -4,5 +4,5 @@ import io.resiliencebench.resources.scenario.Scenario; public interface ScenarioExecutor { - void execute(Scenario scenario, ExecutionQueue executionQueue, Runnable onCompletion); + void execute(Scenario scenario, ExecutionQueue executionQueue); } diff --git a/resilience-bench/operator/src/main/java/io/resiliencebench/execution/steps/UpdateStatusQueueStep.java b/resilience-bench/operator/src/main/java/io/resiliencebench/execution/steps/UpdateStatusQueueStep.java index e9a339e..9fbf271 100644 --- a/resilience-bench/operator/src/main/java/io/resiliencebench/execution/steps/UpdateStatusQueueStep.java +++ b/resilience-bench/operator/src/main/java/io/resiliencebench/execution/steps/UpdateStatusQueueStep.java @@ -12,7 +12,6 @@ import org.springframework.stereotype.Service; import java.time.LocalDateTime; -import java.util.stream.Collectors; import static io.resiliencebench.resources.queue.ExecutionQueueItem.Status.*; import static java.time.Duration.ofSeconds; diff --git a/resilience-bench/operator/src/main/java/io/resiliencebench/resources/scenario/ScenarioSpec.java b/resilience-bench/operator/src/main/java/io/resiliencebench/resources/scenario/ScenarioSpec.java index fd85de2..ddb0b68 100644 --- a/resilience-bench/operator/src/main/java/io/resiliencebench/resources/scenario/ScenarioSpec.java +++ b/resilience-bench/operator/src/main/java/io/resiliencebench/resources/scenario/ScenarioSpec.java @@ -9,8 +9,6 @@ import java.util.List; import java.util.Map; -import static io.vertx.core.json.JsonObject.mapFrom; - public class ScenarioSpec { @JsonPropertyDescription("The workload to be used in the scenario") diff --git a/resilience-bench/operator/src/test/java/io/resiliencebench/execution/DefaultQueueExecutorTest.java b/resilience-bench/operator/src/test/java/io/resiliencebench/execution/DefaultQueueExecutorTest.java new file mode 100644 index 0000000..1008775 --- /dev/null +++ b/resilience-bench/operator/src/test/java/io/resiliencebench/execution/DefaultQueueExecutorTest.java @@ -0,0 +1,93 @@ +package io.resiliencebench.execution; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.resiliencebench.resources.queue.ExecutionQueue; +import io.resiliencebench.resources.queue.ExecutionQueueItem; +import io.resiliencebench.resources.queue.ExecutionQueueSpec; +import io.resiliencebench.resources.scenario.Scenario; +import io.resiliencebench.support.CustomResourceRepository; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.List; +import java.util.Optional; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +class DefaultQueueExecutorTest { + + @Mock + private CustomResourceRepository scenarioRepository; + + @Mock + private ScenarioExecutor scenarioExecutor; + + private DefaultQueueExecutor executor; + + @BeforeEach + void setUp() { + executor = new DefaultQueueExecutor(scenarioRepository, scenarioExecutor); + } + + @Test + void shouldExecuteNextPendingItem() { + // Given + var scenario = new Scenario(); + scenario.setMetadata(new ObjectMetaBuilder().withName("test-scenario").build()); + + var item = new ExecutionQueueItem("test-scenario", "result.json"); + var queue = createQueueWithItems(item); + + when(scenarioRepository.find(eq("test-namespace"), eq("test-scenario"))) + .thenReturn(Optional.of(scenario)); + + // When + executor.execute(queue); + + // Then + verify(scenarioExecutor).execute(eq(scenario), eq(queue)); + } + + @Test + void shouldNotExecuteWhenNoPendingItems() { + // Given + var item = new ExecutionQueueItem("test-scenario", "result.json"); + item.setStatus(ExecutionQueueItem.Status.RUNNING); + var queue = createQueueWithItems(item); + + // When + executor.execute(queue); + + // Then + verify(scenarioExecutor, never()).execute(any(), any()); + } + + @Test + void shouldNotExecuteWhenQueueIsDone() { + // Given + var item = new ExecutionQueueItem("test-scenario", "result.json"); + item.setStatus(ExecutionQueueItem.Status.FINISHED); + var queue = createQueueWithItems(item); + + // When + executor.execute(queue); + + // Then + verify(scenarioExecutor, never()).execute(any(), any()); + } + + private ExecutionQueue createQueueWithItems(ExecutionQueueItem... items) { + var meta = new ObjectMetaBuilder() + .withName("test-queue") + .withNamespace("test-namespace") + .build(); + var spec = new ExecutionQueueSpec("results.json", List.of(items), "test-benchmark"); + return new ExecutionQueue(spec, meta); + } +} \ No newline at end of file diff --git a/resilience-bench/operator/src/test/java/io/resiliencebench/execution/DefaultScenarioExecutorTest.java b/resilience-bench/operator/src/test/java/io/resiliencebench/execution/DefaultScenarioExecutorTest.java new file mode 100644 index 0000000..dfbed98 --- /dev/null +++ b/resilience-bench/operator/src/test/java/io/resiliencebench/execution/DefaultScenarioExecutorTest.java @@ -0,0 +1,191 @@ +package io.resiliencebench.execution; + +import io.fabric8.kubernetes.api.model.batch.v1.JobStatusBuilder; +import io.fabric8.kubernetes.api.model.batch.v1.Job; +import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; +import io.fabric8.kubernetes.api.model.batch.v1.JobList; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.dsl.BatchAPIGroupDSL; +import io.fabric8.kubernetes.client.dsl.V1BatchAPIGroupDSL; +import io.fabric8.kubernetes.client.dsl.MixedOperation; +import io.fabric8.kubernetes.client.dsl.ScalableResource; +import io.resiliencebench.execution.steps.StepRegistry; +import io.resiliencebench.execution.steps.k6.K6JobFactory; +import io.resiliencebench.resources.queue.ExecutionQueue; +import io.resiliencebench.resources.queue.ExecutionQueueSpec; +import io.resiliencebench.resources.scenario.Scenario; +import io.resiliencebench.resources.scenario.ScenarioSpec; +import io.resiliencebench.resources.scenario.ScenarioWorkload; +import io.resiliencebench.resources.workload.Workload; +import io.resiliencebench.resources.workload.WorkloadSpec; +import io.resiliencebench.support.CustomResourceRepository; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.List; + +import static io.resiliencebench.support.Annotations.SCENARIO; +import static java.util.Optional.of; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +class DefaultScenarioExecutorTest { + + @Mock + private KubernetesClient kubernetesClient; + + @Mock + private BatchAPIGroupDSL batchApiGroup; + + @Mock + private V1BatchAPIGroupDSL v1ApiGroup; + + @Mock + private MixedOperation> jobsOperation; + + @Mock + private ScalableResource jobResource; + + @Mock + private StepRegistry stepRegistry; + + @Mock + private K6JobFactory k6JobFactory; + + @Mock + private CustomResourceRepository scenarioRepository; + + @Mock + private CustomResourceRepository executionRepository; + + @Mock + private CustomResourceRepository workloadRepository; + + @Captor + private ArgumentCaptor> jobWatcherCaptor; + + private DefaultScenarioExecutor executor; + + @BeforeEach + void setUp() { + when(kubernetesClient.batch()).thenReturn(batchApiGroup); + when(batchApiGroup.v1()).thenReturn(v1ApiGroup); + when(v1ApiGroup.jobs()).thenReturn(jobsOperation); + when(jobsOperation.inNamespace(anyString())).thenReturn(jobsOperation); + when(jobsOperation.withName(anyString())).thenReturn(jobResource); + when(jobsOperation.resource(any(Job.class))).thenReturn(jobResource); + + executor = new DefaultScenarioExecutor( + kubernetesClient, + stepRegistry, + k6JobFactory, + scenarioRepository, + executionRepository, + workloadRepository + ); + } + + @Test + void shouldExecuteScenario() { + // Given + var scenario = createScenario(); + var queue = createQueue(); + var workload = createWorkload(); + var job = createJob(); + + when(workloadRepository.find(eq("test-namespace"), eq("test-workload"))) + .thenReturn(of(workload)); + when(k6JobFactory.create(eq(scenario), eq(workload), any())) + .thenReturn(job); + + // When + executor.execute(scenario, queue); + + // Then + verify(jobsOperation, times(2)).resource(job); + verify(jobResource).create(); + verify(jobResource).watch(any(Watcher.class)); + } + + @Test + void shouldExecutePreparationAndPostExecutionSteps() { + // Given + var scenario = createScenario(); + var queue = createQueue(); + var workload = createWorkload(); + var job = createJob(); + + when(workloadRepository.find(eq("test-namespace"), eq("test-workload"))) + .thenReturn(of(workload)); + when(scenarioRepository.get(eq("test-namespace"), eq(scenario.getMetadata().getName()))) + .thenReturn(scenario); + when(k6JobFactory.create(eq(scenario), eq(workload), any())) + .thenReturn(job); + + // When + executor.execute(scenario, queue); + + Job finishedJob = createJob(); + finishedJob.setStatus(new JobStatusBuilder().withCompletionTime("2024-01-01T00:00:00Z").build()); + verify(jobResource).watch(jobWatcherCaptor.capture()); + Watcher watcher = jobWatcherCaptor.getValue(); + watcher.eventReceived(Watcher.Action.MODIFIED, finishedJob); + + // Then + verify(stepRegistry).getPreparationSteps(); + verify(stepRegistry).getPostExecutionSteps(); + } + + private Scenario createScenario() { + var scenario = new Scenario(); + scenario.setMetadata(new io.fabric8.kubernetes.api.model.ObjectMetaBuilder() + .withName("test-scenario") + .withNamespace("test-namespace") + .build()); + scenario.setSpec(new ScenarioSpec( + "test-scenario", + new ScenarioWorkload("test-workload", 1), + null, + null + )); + return scenario; + } + + private ExecutionQueue createQueue() { + var meta = new io.fabric8.kubernetes.api.model.ObjectMetaBuilder() + .withName("test-queue") + .withNamespace("test-namespace") + .build(); + var spec = new ExecutionQueueSpec("results.json", List.of(), "test-benchmark"); + return new ExecutionQueue(spec, meta); + } + + private Workload createWorkload() { + var workload = new Workload(); + workload.setMetadata(new io.fabric8.kubernetes.api.model.ObjectMetaBuilder() + .withName("test-workload") + .withNamespace("test-namespace") + .build()); + var spec = new WorkloadSpec(); + workload.setSpec(spec); + return workload; + } + + private Job createJob() { + return new JobBuilder() + .withNewMetadata() + .withName("test-job") + .withNamespace("test-namespace") + .addToAnnotations(SCENARIO, "test-scenario") + .endMetadata() + .build(); + } +} \ No newline at end of file From 941933dcec68c4dfbc7448a5c394d48567ee99b1 Mon Sep 17 00:00:00 2001 From: Carlos Mendes Date: Sun, 7 Sep 2025 11:12:28 -0300 Subject: [PATCH 2/3] implement enhanced execution queue status tracking --- .../resiliencebench/BenchmarkController.java | 13 +- .../ExecutionQueueController.java | 20 +- .../execution/BenchmarkStatusUpdater.java | 101 ------ .../ExecutionQueueStatusUpdater.java | 111 ++++++ .../steps/UpdateStatusQueueStep.java | 22 +- .../resources/benchmark/BenchmarkStatus.java | 58 +-- .../resources/queue/ExecutionQueue.java | 30 +- .../resources/queue/ExecutionQueueStatus.java | 205 +++++++++++ .../execution/DefaultQueueExecutorTest.java | 28 -- .../ExecutionQueueStatusUpdaterTest.java | 248 +++++++++++++ .../benchmark/BenchmarkStatusTest.java | 334 ------------------ .../queue/ExecutionQueueStatusTest.java | 194 ++++++++++ .../resources/queue/ExecutionQueueTest.java | 320 +++++++++++++++++ 13 files changed, 1155 insertions(+), 529 deletions(-) delete mode 100644 resilience-bench/operator/src/main/java/io/resiliencebench/execution/BenchmarkStatusUpdater.java create mode 100644 resilience-bench/operator/src/main/java/io/resiliencebench/execution/ExecutionQueueStatusUpdater.java create mode 100644 resilience-bench/operator/src/main/java/io/resiliencebench/resources/queue/ExecutionQueueStatus.java create mode 100644 resilience-bench/operator/src/test/java/io/resiliencebench/execution/ExecutionQueueStatusUpdaterTest.java create mode 100644 resilience-bench/operator/src/test/java/io/resiliencebench/resources/queue/ExecutionQueueStatusTest.java create mode 100644 resilience-bench/operator/src/test/java/io/resiliencebench/resources/queue/ExecutionQueueTest.java diff --git a/resilience-bench/operator/src/main/java/io/resiliencebench/BenchmarkController.java b/resilience-bench/operator/src/main/java/io/resiliencebench/BenchmarkController.java index 3302de0..9f380d0 100644 --- a/resilience-bench/operator/src/main/java/io/resiliencebench/BenchmarkController.java +++ b/resilience-bench/operator/src/main/java/io/resiliencebench/BenchmarkController.java @@ -69,7 +69,7 @@ public UpdateControl reconcile(Benchmark benchmark, Context reconcile(Benchmark benchmark, Context updateStatusWithError(Benchmark benchmark, String errorMessage) { var status = benchmark.getStatus(); if (status == null) { - status = new BenchmarkStatus(0); + status = new BenchmarkStatus(benchmark.getMetadata().getName()); benchmark.setStatus(status); } status.markAsFailed(errorMessage); @@ -93,19 +93,20 @@ private UpdateControl updateStatusWithError(Benchmark benchmark, Stri return UpdateControl.updateStatus(benchmark); } - private BenchmarkStatus createOrUpdateStatus(Benchmark benchmark, int totalScenarios) { + private BenchmarkStatus createOrUpdateStatus(Benchmark benchmark, String queueName) { var currentStatus = benchmark.getStatus(); var currentGeneration = benchmark.getMetadata().getGeneration(); if (currentStatus == null || currentStatus.needsReconciliation(currentGeneration)) { - var newStatus = new BenchmarkStatus(totalScenarios); + var newStatus = new BenchmarkStatus(queueName); newStatus.setObservedGeneration(currentGeneration); - logger.info("Created new status for benchmark {} with {} total scenarios", - benchmark.getMetadata().getName(), totalScenarios); + logger.info("Created new status for benchmark {} with execution queue {}", + benchmark.getMetadata().getName(), queueName); return newStatus; } else { currentStatus.updateReconcileTime(); currentStatus.setObservedGeneration(currentGeneration); + currentStatus.setExecutionQueueName(queueName); logger.info("Updated existing status for benchmark {}", benchmark.getMetadata().getName()); return currentStatus; } diff --git a/resilience-bench/operator/src/main/java/io/resiliencebench/ExecutionQueueController.java b/resilience-bench/operator/src/main/java/io/resiliencebench/ExecutionQueueController.java index 2025058..37d4013 100644 --- a/resilience-bench/operator/src/main/java/io/resiliencebench/ExecutionQueueController.java +++ b/resilience-bench/operator/src/main/java/io/resiliencebench/ExecutionQueueController.java @@ -5,7 +5,10 @@ import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import io.resiliencebench.execution.QueueExecutor; +import io.resiliencebench.execution.ExecutionQueueStatusUpdater; +import io.resiliencebench.resources.benchmark.Benchmark; import io.resiliencebench.resources.queue.ExecutionQueue; +import io.resiliencebench.support.CustomResourceRepository; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -15,14 +18,25 @@ public class ExecutionQueueController implements Reconciler { private static final Logger logger = LoggerFactory.getLogger(ExecutionQueueController.class); private final QueueExecutor queueExecutor; + private final ExecutionQueueStatusUpdater statusUpdater; - public ExecutionQueueController(QueueExecutor queueExecutor) { + public ExecutionQueueController(QueueExecutor queueExecutor, + ExecutionQueueStatusUpdater statusUpdater) { this.queueExecutor = queueExecutor; + this.statusUpdater = statusUpdater; } @Override public UpdateControl reconcile(ExecutionQueue queue, Context context) { logger.info("Reconciling ExecutionQueue: {}", queue.getMetadata().getName()); + + // Initialize status if needed + if (queue.getStatus() == null) { + queue.updateStatusFromItems(); + } + + // Update status based on current items state + statusUpdater.updateQueueProgress(queue.getMetadata().getNamespace(), queue.getMetadata().getName()); if (queue.isDone()) { logger.info("ExecutionQueue {} is already completed", queue.getMetadata().getName()); @@ -30,7 +44,9 @@ public UpdateControl reconcile(ExecutionQueue queue, Context 0) { + if (queue.getStatus() != null && + queue.getStatus().getRunning() == 0 && + queue.getStatus().getPending() > 0) { logger.info("Starting execution of next item in queue: {}", queue.getMetadata().getName()); queueExecutor.execute(queue); } else { diff --git a/resilience-bench/operator/src/main/java/io/resiliencebench/execution/BenchmarkStatusUpdater.java b/resilience-bench/operator/src/main/java/io/resiliencebench/execution/BenchmarkStatusUpdater.java deleted file mode 100644 index c436d38..0000000 --- a/resilience-bench/operator/src/main/java/io/resiliencebench/execution/BenchmarkStatusUpdater.java +++ /dev/null @@ -1,101 +0,0 @@ -package io.resiliencebench.execution; - -import io.resiliencebench.resources.benchmark.Benchmark; -import io.resiliencebench.resources.queue.ExecutionQueue; -import io.resiliencebench.resources.queue.ExecutionQueueItem; -import io.resiliencebench.support.CustomResourceRepository; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Service; - -@Service -public class BenchmarkStatusUpdater { - - private static final Logger logger = LoggerFactory.getLogger(BenchmarkStatusUpdater.class); - - private final CustomResourceRepository benchmarkRepository; - private final CustomResourceRepository queueRepository; - - public BenchmarkStatusUpdater(CustomResourceRepository benchmarkRepository, - CustomResourceRepository queueRepository) { - this.benchmarkRepository = benchmarkRepository; - this.queueRepository = queueRepository; - } - - /** - * Updates the benchmark status based on the current execution queue state - */ - public void updateBenchmarkProgress(String namespace, String benchmarkName) { - try { - var benchmarkOpt = benchmarkRepository.find(namespace, benchmarkName); - var queueOpt = queueRepository.find(namespace, benchmarkName); - - if (benchmarkOpt.isEmpty()) { - logger.warn("Benchmark not found: {}/{}", namespace, benchmarkName); - return; - } - - if (queueOpt.isEmpty()) { - logger.warn("Execution queue not found: {}/{}", namespace, benchmarkName); - return; - } - - var benchmark = benchmarkOpt.get(); - var queue = queueOpt.get(); - var status = benchmark.getStatus(); - - if (status == null) { - logger.warn("Benchmark status is null for: {}/{}", namespace, benchmarkName); - return; - } - - var progress = calculateProgress(queue); - - status.updateProgress(progress.running(), progress.completed()); - status.updateReconcileTime(); - benchmarkRepository.updateStatus(benchmark); - - logger.debug("Updated benchmark {} progress: {} running, {} completed, {} total", - benchmarkName, progress.running(), progress.completed(), status.getTotalScenarios()); - - } catch (Exception e) { - logger.error("Error updating benchmark status for: " + namespace + "/" + benchmarkName, e); - } - } - - /** - * Marks a specific scenario as started in the benchmark status - */ - public void markScenarioAsStarted(String namespace, String benchmarkName, String scenarioName) { - try { - logger.info("Marking scenario {} as started for benchmark {}/{}", scenarioName, namespace, benchmarkName); - updateBenchmarkProgress(namespace, benchmarkName); - } catch (Exception e) { - logger.error("Error marking scenario as started: " + scenarioName, e); - } - } - - /** - * Marks a specific scenario as completed in the benchmark status - */ - public void markScenarioAsCompleted(String namespace, String benchmarkName, String scenarioName) { - try { - logger.info("Marking scenario {} as completed for benchmark {}/{}", scenarioName, namespace, benchmarkName); - updateBenchmarkProgress(namespace, benchmarkName); - } catch (Exception e) { - logger.error("Error marking scenario as completed: " + scenarioName, e); - } - } - - /** - * Calculates the current progress from the execution queue - */ - private ProgressInfo calculateProgress(ExecutionQueue queue) { - var items = queue.getSpec().getItems(); - var running = (int) items.stream().filter(ExecutionQueueItem::isRunning).count(); - var completed = (int) items.stream().filter(ExecutionQueueItem::isFinished).count(); - return new ProgressInfo(running, completed); - } - - private record ProgressInfo(int running, int completed) {} -} diff --git a/resilience-bench/operator/src/main/java/io/resiliencebench/execution/ExecutionQueueStatusUpdater.java b/resilience-bench/operator/src/main/java/io/resiliencebench/execution/ExecutionQueueStatusUpdater.java new file mode 100644 index 0000000..cee25d7 --- /dev/null +++ b/resilience-bench/operator/src/main/java/io/resiliencebench/execution/ExecutionQueueStatusUpdater.java @@ -0,0 +1,111 @@ +package io.resiliencebench.execution; + +import io.resiliencebench.resources.queue.ExecutionQueue; +import io.resiliencebench.resources.queue.ExecutionQueueItem; +import io.resiliencebench.support.CustomResourceRepository; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +@Service +public class ExecutionQueueStatusUpdater { + + private static final Logger logger = LoggerFactory.getLogger(ExecutionQueueStatusUpdater.class); + + private final CustomResourceRepository queueRepository; + + public ExecutionQueueStatusUpdater(CustomResourceRepository queueRepository) { + this.queueRepository = queueRepository; + } + + /** + * Updates the execution queue status based on the current queue items state + */ + public void updateQueueProgress(String namespace, String queueName) { + try { + var queueOpt = queueRepository.find(namespace, queueName); + + if (queueOpt.isEmpty()) { + logger.warn("Execution queue not found: {}/{}", namespace, queueName); + return; + } + + var queue = queueOpt.get(); + updateQueueStatus(queue); + queueRepository.updateStatus(queue); + + logger.debug("Updated queue {} progress: {} running, {} completed, {} pending, {} total", + queueName, queue.getStatus().getRunning(), queue.getStatus().getCompletedScenarios(), + queue.getStatus().getPending(), queue.getStatus().getTotalScenarios()); + + } catch (Exception e) { + logger.error("Error updating execution queue status for: " + namespace + "/" + queueName, e); + } + } + + /** + * Marks a specific scenario as started in the execution queue + */ + public void markScenarioAsStarted(String namespace, String queueName, String scenarioName) { + try { + logger.info("Marking scenario {} as started for queue {}/{}", scenarioName, namespace, queueName); + var queueOpt = queueRepository.find(namespace, queueName); + + if (queueOpt.isEmpty()) { + logger.warn("Execution queue not found: {}/{}", namespace, queueName); + return; + } + + var queue = queueOpt.get(); + var queueItem = queue.getItem(scenarioName); + + if (queueItem != null && queueItem.isPending()) { + queueItem.markAsRunning(); + updateQueueStatus(queue); + queueRepository.update(queue); + } + + } catch (Exception e) { + logger.error("Error marking scenario as started: " + scenarioName, e); + } + } + + /** + * Marks a specific scenario as completed in the execution queue + */ + public void markScenarioAsCompleted(String namespace, String queueName, String scenarioName) { + try { + logger.info("Marking scenario {} as completed for queue {}/{}", scenarioName, namespace, queueName); + var queueOpt = queueRepository.find(namespace, queueName); + + if (queueOpt.isEmpty()) { + logger.warn("Execution queue not found: {}/{}", namespace, queueName); + return; + } + + var queue = queueOpt.get(); + var queueItem = queue.getItem(scenarioName); + + if (queueItem != null && queueItem.isRunning()) { + queueItem.markAsCompleted(); + updateQueueStatus(queue); + queueRepository.update(queue); + } + + } catch (Exception e) { + logger.error("Error marking scenario as completed: " + scenarioName, e); + } + } + + /** + * Updates the execution queue status based on current items state + */ + private void updateQueueStatus(ExecutionQueue queue) { + queue.updateStatusFromItems(); + + // Set observed generation if available + if (queue.getMetadata() != null && queue.getMetadata().getGeneration() != null) { + queue.getStatus().setObservedGeneration(queue.getMetadata().getGeneration()); + } + } +} diff --git a/resilience-bench/operator/src/main/java/io/resiliencebench/execution/steps/UpdateStatusQueueStep.java b/resilience-bench/operator/src/main/java/io/resiliencebench/execution/steps/UpdateStatusQueueStep.java index 00cf242..45a3918 100644 --- a/resilience-bench/operator/src/main/java/io/resiliencebench/execution/steps/UpdateStatusQueueStep.java +++ b/resilience-bench/operator/src/main/java/io/resiliencebench/execution/steps/UpdateStatusQueueStep.java @@ -7,7 +7,7 @@ import io.resiliencebench.resources.queue.ExecutionQueue; import io.resiliencebench.resources.scenario.Scenario; import io.resiliencebench.support.CustomResourceRepository; -import io.resiliencebench.execution.BenchmarkStatusUpdater; +import io.resiliencebench.execution.ExecutionQueueStatusUpdater; import org.springframework.stereotype.Service; import static java.time.Duration.ofSeconds; @@ -16,13 +16,13 @@ public class UpdateStatusQueueStep extends ExecutorStep { private final CustomResourceRepository executionRepository; - private final BenchmarkStatusUpdater statusUpdater; + private final ExecutionQueueStatusUpdater statusUpdater; private final RetryConfig retryConfig; public UpdateStatusQueueStep(KubernetesClient kubernetesClient, CustomResourceRepository executionRepository, - BenchmarkStatusUpdater statusUpdater) { + ExecutionQueueStatusUpdater statusUpdater) { super(kubernetesClient); this.executionRepository = executionRepository; this.statusUpdater = statusUpdater; @@ -54,21 +54,21 @@ private void updateQueueItem(String queueName, String scenarioName, String names @Override public void internalExecute(Scenario scenario, ExecutionQueue executionQueue) { - var benchmarkName = executionQueue.getMetadata().getName(); + var queueName = executionQueue.getMetadata().getName(); var scenarioName = scenario.getMetadata().getName(); var namespace = scenario.getMetadata().getNamespace(); Retry.of("updateQueueItem", retryConfig) - .executeRunnable(() -> updateStatus(benchmarkName, scenarioName, namespace)); + .executeRunnable(() -> updateStatus(queueName, scenarioName, namespace)); } - private void updateStatus(String benchmarkName, String scenarioName, String namespace) { - updateQueueItem(benchmarkName, scenarioName, namespace); - var queue = executionRepository.get(namespace, benchmarkName); + private void updateStatus(String queueName, String scenarioName, String namespace) { + updateQueueItem(queueName, scenarioName, namespace); + var queue = executionRepository.get(namespace, queueName); var queueItem = queue.getItem(scenarioName); - if (queueItem.isRunning()) { - statusUpdater.markScenarioAsStarted(namespace, benchmarkName, scenarioName); + if (queueItem.isPending()) { + statusUpdater.markScenarioAsStarted(namespace, queueName, scenarioName); } else if (queueItem.isFinished()) { - statusUpdater.markScenarioAsCompleted(namespace, benchmarkName, scenarioName); + statusUpdater.markScenarioAsCompleted(namespace, queueName, scenarioName); } } } diff --git a/resilience-bench/operator/src/main/java/io/resiliencebench/resources/benchmark/BenchmarkStatus.java b/resilience-bench/operator/src/main/java/io/resiliencebench/resources/benchmark/BenchmarkStatus.java index 0965e00..a7cc84c 100644 --- a/resilience-bench/operator/src/main/java/io/resiliencebench/resources/benchmark/BenchmarkStatus.java +++ b/resilience-bench/operator/src/main/java/io/resiliencebench/resources/benchmark/BenchmarkStatus.java @@ -13,14 +13,8 @@ public class BenchmarkStatus { @PrinterColumn(name = "Phase", priority = 0) private String phase; - @PrinterColumn(name = "Total Scenarios", priority = 1) - private int totalScenarios; - - @PrinterColumn(name = "Running", priority = 2) - private int runningScenarios; - - @PrinterColumn(name = "Completed", priority = 3) - private int completedScenarios; + @PrinterColumn(name = "Execution Queue", priority = 1) + private String executionQueueName; private String executionId; private String lastReconcileTime; @@ -32,22 +26,17 @@ public class BenchmarkStatus { public BenchmarkStatus() { } - public BenchmarkStatus(int totalScenarios) { - this.totalScenarios = totalScenarios; + public BenchmarkStatus(String executionQueueName) { + this.executionQueueName = executionQueueName; this.phase = Phase.PENDING; - this.runningScenarios = 0; - this.completedScenarios = 0; this.executionId = generateExecutionId(); this.lastReconcileTime = getCurrentTimestamp(); this.startTime = getCurrentTimestamp(); } - public BenchmarkStatus(String phase, int totalScenarios, int runningScenarios, int completedScenarios, - String executionId, Long observedGeneration) { + public BenchmarkStatus(String phase, String executionQueueName, String executionId, Long observedGeneration) { this.phase = phase; - this.totalScenarios = totalScenarios; - this.runningScenarios = runningScenarios; - this.completedScenarios = completedScenarios; + this.executionQueueName = executionQueueName; this.executionId = executionId; this.observedGeneration = observedGeneration; this.lastReconcileTime = getCurrentTimestamp(); @@ -62,28 +51,12 @@ public void setPhase(String phase) { this.phase = phase; } - public int getTotalScenarios() { - return totalScenarios; - } - - public void setTotalScenarios(int totalScenarios) { - this.totalScenarios = totalScenarios; - } - - public int getRunningScenarios() { - return runningScenarios; - } - - public void setRunningScenarios(int runningScenarios) { - this.runningScenarios = runningScenarios; - } - - public int getCompletedScenarios() { - return completedScenarios; + public String getExecutionQueueName() { + return executionQueueName; } - public void setCompletedScenarios(int completedScenarios) { - this.completedScenarios = completedScenarios; + public void setExecutionQueueName(String executionQueueName) { + this.executionQueueName = executionQueueName; } public String getExecutionId() { @@ -165,15 +138,8 @@ public void markAsFailed(String errorMessage) { this.message = errorMessage; } - public void updateProgress(int running, int completed) { - this.runningScenarios = running; - this.completedScenarios = completed; - - if (completed == totalScenarios) { - markAsCompleted(); - } else if (running > 0 || completed > 0) { - this.phase = Phase.RUNNING; - } + public void updatePhaseFromQueue(String queuePhase) { + this.phase = queuePhase; } private static String getCurrentTimestamp() { diff --git a/resilience-bench/operator/src/main/java/io/resiliencebench/resources/queue/ExecutionQueue.java b/resilience-bench/operator/src/main/java/io/resiliencebench/resources/queue/ExecutionQueue.java index 529af18..d3875aa 100644 --- a/resilience-bench/operator/src/main/java/io/resiliencebench/resources/queue/ExecutionQueue.java +++ b/resilience-bench/operator/src/main/java/io/resiliencebench/resources/queue/ExecutionQueue.java @@ -13,12 +13,23 @@ @ShortNames("eq") @Plural("queues") @Kind("Queue") -public class ExecutionQueue extends CustomResource implements Namespaced { +public class ExecutionQueue extends CustomResource implements Namespaced { ExecutionQueue() { } public ExecutionQueue(ExecutionQueueSpec spec, ObjectMeta meta) { this.spec = spec; this.setMetadata(meta); + initializeStatus(); + } + + private void initializeStatus() { + if (this.status == null && this.spec != null && this.spec.getItems() != null) { + var executionId = + this.getMetadata().getLabels() != null && !this.getMetadata().getLabels().isEmpty() + ? this.getMetadata().getLabels().get("execution-id") + : "exec-" + System.currentTimeMillis(); + this.status = new ExecutionQueueStatus(this.spec.getItems().size(), executionId); + } } @JsonIgnore @@ -35,4 +46,21 @@ public Optional getNextPendingItem() { public boolean isDone() { return getSpec().getItems().stream().allMatch(ExecutionQueueItem::isFinished); } + + @JsonIgnore + public void updateStatusFromItems() { + if (this.status == null) { + initializeStatus(); + } + + if (this.status != null && this.spec != null && this.spec.getItems() != null) { + var items = this.spec.getItems(); + var running = (int) items.stream().filter(ExecutionQueueItem::isRunning).count(); + var completed = (int) items.stream().filter(ExecutionQueueItem::isFinished).count(); + var pending = (int) items.stream().filter(ExecutionQueueItem::isPending).count(); + + this.status.updateProgress(running, completed, pending); + this.status.updateReconcileTime(); + } + } } diff --git a/resilience-bench/operator/src/main/java/io/resiliencebench/resources/queue/ExecutionQueueStatus.java b/resilience-bench/operator/src/main/java/io/resiliencebench/resources/queue/ExecutionQueueStatus.java new file mode 100644 index 0000000..f788372 --- /dev/null +++ b/resilience-bench/operator/src/main/java/io/resiliencebench/resources/queue/ExecutionQueueStatus.java @@ -0,0 +1,205 @@ +package io.resiliencebench.resources.queue; + +import io.fabric8.crd.generator.annotation.PrinterColumn; +import com.fasterxml.jackson.annotation.JsonIgnore; +import io.resiliencebench.resources.Phase; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.Objects; + +public class ExecutionQueueStatus { + + @PrinterColumn(name = "Phase", priority = 0) + private String phase; + + @PrinterColumn(name = "Total Scenarios", priority = 1) + private int totalScenarios; + + @PrinterColumn(name = "Running", priority = 2) + private int runningScenarios; + + @PrinterColumn(name = "Completed", priority = 3) + private int completedScenarios; + + @PrinterColumn(name = "Pending", priority = 4) + private int pendingScenarios; + + private String executionId; + private String lastReconcileTime; + private String startTime; + private String completionTime; + private String message; + private Long observedGeneration; + + public ExecutionQueueStatus() { + } + + public ExecutionQueueStatus(int totalScenarios, String executionId) { + this.totalScenarios = totalScenarios; + this.phase = Phase.PENDING; + this.runningScenarios = 0; + this.completedScenarios = 0; + this.pendingScenarios = totalScenarios; + this.executionId = executionId; + this.lastReconcileTime = getCurrentTimestamp(); + this.startTime = getCurrentTimestamp(); + } + + public ExecutionQueueStatus(String phase, int totalScenarios, int runningScenarios, int completedScenarios, + int pendingScenarios, String executionId, Long observedGeneration) { + this.phase = phase; + this.totalScenarios = totalScenarios; + this.runningScenarios = runningScenarios; + this.completedScenarios = completedScenarios; + this.pendingScenarios = pendingScenarios; + this.executionId = executionId; + this.observedGeneration = observedGeneration; + this.lastReconcileTime = getCurrentTimestamp(); + } + + // Getters and setters + public String getPhase() { + return phase; + } + + public void setPhase(String phase) { + this.phase = phase; + } + + public int getTotalScenarios() { + return totalScenarios; + } + + public void setTotalScenarios(int totalScenarios) { + this.totalScenarios = totalScenarios; + } + + public int getRunningScenarios() { + return runningScenarios; + } + + public void setRunningScenarios(int runningScenarios) { + this.runningScenarios = runningScenarios; + } + + public int getCompletedScenarios() { + return completedScenarios; + } + + public void setCompletedScenarios(int completedScenarios) { + this.completedScenarios = completedScenarios; + } + + public int getPendingScenarios() { + return pendingScenarios; + } + + public void setPendingScenarios(int pendingScenarios) { + this.pendingScenarios = pendingScenarios; + } + + public String getExecutionId() { + return executionId; + } + + public void setExecutionId(String executionId) { + this.executionId = executionId; + } + + public String getLastReconcileTime() { + return lastReconcileTime; + } + + public void setLastReconcileTime(String lastReconcileTime) { + this.lastReconcileTime = lastReconcileTime; + } + + public String getStartTime() { + return startTime; + } + + public void setStartTime(String startTime) { + this.startTime = startTime; + } + + public String getCompletionTime() { + return completionTime; + } + + public void setCompletionTime(String completionTime) { + this.completionTime = completionTime; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public Long getObservedGeneration() { + return observedGeneration; + } + + public void setObservedGeneration(Long observedGeneration) { + this.observedGeneration = observedGeneration; + } + + @JsonIgnore + public boolean isRunning() { + return Phase.RUNNING.equals(phase); + } + + @JsonIgnore + public boolean isCompleted() { + return Phase.COMPLETED.equals(phase) || Phase.FAILED.equals(phase); + } + + @JsonIgnore + public boolean needsReconciliation(Long currentGeneration) { + return observedGeneration == null || !Objects.equals(observedGeneration, currentGeneration); + } + + public void updateReconcileTime() { + this.lastReconcileTime = getCurrentTimestamp(); + } + + public void markAsCompleted() { + this.phase = Phase.COMPLETED; + this.completionTime = getCurrentTimestamp(); + } + + public void markAsFailed(String errorMessage) { + this.phase = Phase.FAILED; + this.completionTime = getCurrentTimestamp(); + this.message = errorMessage; + } + + public void updateProgress(int running, int completed, int pending) { + this.runningScenarios = running; + this.completedScenarios = completed; + this.pendingScenarios = pending; + + if (completed == totalScenarios) { + markAsCompleted(); + } else if (running > 0 || completed > 0) { + this.phase = Phase.RUNNING; + } + } + + @JsonIgnore + public int getRunning() { + return runningScenarios; + } + + @JsonIgnore + public int getPending() { + return pendingScenarios; + } + + private static String getCurrentTimestamp() { + return LocalDateTime.now().atZone(ZoneId.of("UTC")).toString(); + } +} diff --git a/resilience-bench/operator/src/test/java/io/resiliencebench/execution/DefaultQueueExecutorTest.java b/resilience-bench/operator/src/test/java/io/resiliencebench/execution/DefaultQueueExecutorTest.java index 1008775..5b03557 100644 --- a/resilience-bench/operator/src/test/java/io/resiliencebench/execution/DefaultQueueExecutorTest.java +++ b/resilience-bench/operator/src/test/java/io/resiliencebench/execution/DefaultQueueExecutorTest.java @@ -54,34 +54,6 @@ void shouldExecuteNextPendingItem() { verify(scenarioExecutor).execute(eq(scenario), eq(queue)); } - @Test - void shouldNotExecuteWhenNoPendingItems() { - // Given - var item = new ExecutionQueueItem("test-scenario", "result.json"); - item.setStatus(ExecutionQueueItem.Status.RUNNING); - var queue = createQueueWithItems(item); - - // When - executor.execute(queue); - - // Then - verify(scenarioExecutor, never()).execute(any(), any()); - } - - @Test - void shouldNotExecuteWhenQueueIsDone() { - // Given - var item = new ExecutionQueueItem("test-scenario", "result.json"); - item.setStatus(ExecutionQueueItem.Status.FINISHED); - var queue = createQueueWithItems(item); - - // When - executor.execute(queue); - - // Then - verify(scenarioExecutor, never()).execute(any(), any()); - } - private ExecutionQueue createQueueWithItems(ExecutionQueueItem... items) { var meta = new ObjectMetaBuilder() .withName("test-queue") diff --git a/resilience-bench/operator/src/test/java/io/resiliencebench/execution/ExecutionQueueStatusUpdaterTest.java b/resilience-bench/operator/src/test/java/io/resiliencebench/execution/ExecutionQueueStatusUpdaterTest.java new file mode 100644 index 0000000..0d4ac49 --- /dev/null +++ b/resilience-bench/operator/src/test/java/io/resiliencebench/execution/ExecutionQueueStatusUpdaterTest.java @@ -0,0 +1,248 @@ +package io.resiliencebench.execution; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.resiliencebench.resources.Phase; +import io.resiliencebench.resources.queue.ExecutionQueue; +import io.resiliencebench.resources.queue.ExecutionQueueItem; +import io.resiliencebench.resources.queue.ExecutionQueueSpec; +import io.resiliencebench.resources.queue.ExecutionQueueStatus; +import io.resiliencebench.support.CustomResourceRepository; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.List; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +class ExecutionQueueStatusUpdaterTest { + + @Mock + private CustomResourceRepository queueRepository; + + @Captor + private ArgumentCaptor queueCaptor; + + private ExecutionQueueStatusUpdater statusUpdater; + private ExecutionQueue testQueue; + private ExecutionQueueItem pendingItem; + private ExecutionQueueItem runningItem; + private ExecutionQueueItem completedItem; + + @BeforeEach + void setUp() { + statusUpdater = new ExecutionQueueStatusUpdater(queueRepository); + + // Create test items + pendingItem = new ExecutionQueueItem("scenario-1", "results/scenario-1.json"); + runningItem = new ExecutionQueueItem("scenario-2", "results/scenario-2.json"); + runningItem.markAsRunning(); + completedItem = new ExecutionQueueItem("scenario-3", "results/scenario-3.json"); + completedItem.markAsCompleted(); + + // Create test queue + var spec = new ExecutionQueueSpec("results/results.json", + List.of(pendingItem, runningItem, completedItem), + "test-benchmark"); + var meta = new ObjectMetaBuilder() + .withName("test-queue") + .withNamespace("test-namespace") + .withGeneration(1L) + .build(); + + testQueue = new ExecutionQueue(spec, meta); + testQueue.setStatus(new ExecutionQueueStatus(3, "exec-123")); + } + + @Test + void shouldUpdateQueueProgressSuccessfully() { + var namespace = "test-namespace"; + var queueName = "test-queue"; + + when(queueRepository.find(namespace, queueName)).thenReturn(Optional.of(testQueue)); + + statusUpdater.updateQueueProgress(namespace, queueName); + + verify(queueRepository).updateStatus(queueCaptor.capture()); + var updatedQueue = queueCaptor.getValue(); + + assertEquals(1, updatedQueue.getStatus().getRunningScenarios()); + assertEquals(1, updatedQueue.getStatus().getCompletedScenarios()); + assertEquals(1, updatedQueue.getStatus().getPendingScenarios()); + assertEquals(Phase.RUNNING, updatedQueue.getStatus().getPhase()); + assertEquals(1L, updatedQueue.getStatus().getObservedGeneration()); + } + + @Test + void shouldHandleQueueNotFound() { + var namespace = "test-namespace"; + var queueName = "non-existent-queue"; + + when(queueRepository.find(namespace, queueName)).thenReturn(Optional.empty()); + + assertDoesNotThrow(() -> statusUpdater.updateQueueProgress(namespace, queueName)); + + verify(queueRepository, never()).updateStatus(any()); + } + + @Test + void shouldHandleExceptionDuringUpdate() { + var namespace = "test-namespace"; + var queueName = "test-queue"; + + when(queueRepository.find(namespace, queueName)).thenThrow(new RuntimeException("Database error")); + + assertDoesNotThrow(() -> statusUpdater.updateQueueProgress(namespace, queueName)); + + verify(queueRepository, never()).updateStatus(any()); + } + + @Test + void shouldMarkScenarioAsStarted() { + var namespace = "test-namespace"; + var queueName = "test-queue"; + var scenarioName = "scenario-1"; + + when(queueRepository.find(namespace, queueName)).thenReturn(Optional.of(testQueue)); + + statusUpdater.markScenarioAsStarted(namespace, queueName, scenarioName); + + verify(queueRepository).update(queueCaptor.capture()); + var updatedQueue = queueCaptor.getValue(); + var item = updatedQueue.getItem(scenarioName); + + assertTrue(item.isRunning()); + assertFalse(item.isPending()); + } + + @Test + void shouldNotMarkAlreadyRunningScenarioAsStarted() { + var namespace = "test-namespace"; + var queueName = "test-queue"; + var scenarioName = "scenario-2"; // Already running + + when(queueRepository.find(namespace, queueName)).thenReturn(Optional.of(testQueue)); + + statusUpdater.markScenarioAsStarted(namespace, queueName, scenarioName); + + verify(queueRepository, never()).update(any()); + } + + @Test + void shouldHandleScenarioNotFoundForStarted() { + var namespace = "test-namespace"; + var queueName = "test-queue"; + var scenarioName = "non-existent-scenario"; + + when(queueRepository.find(namespace, queueName)).thenReturn(Optional.of(testQueue)); + + assertDoesNotThrow(() -> statusUpdater.markScenarioAsStarted(namespace, queueName, scenarioName)); + + verify(queueRepository, never()).update(any()); + } + + @Test + void shouldMarkScenarioAsCompleted() { + var namespace = "test-namespace"; + var queueName = "test-queue"; + var scenarioName = "scenario-2"; // Currently running + + when(queueRepository.find(namespace, queueName)).thenReturn(Optional.of(testQueue)); + + statusUpdater.markScenarioAsCompleted(namespace, queueName, scenarioName); + + verify(queueRepository).update(queueCaptor.capture()); + var updatedQueue = queueCaptor.getValue(); + var item = updatedQueue.getItem(scenarioName); + + assertTrue(item.isFinished()); + assertFalse(item.isRunning()); + } + + @Test + void shouldNotMarkNonRunningScenarioAsCompleted() { + var namespace = "test-namespace"; + var queueName = "test-queue"; + var scenarioName = "scenario-1"; // Still pending + + when(queueRepository.find(namespace, queueName)).thenReturn(Optional.of(testQueue)); + + statusUpdater.markScenarioAsCompleted(namespace, queueName, scenarioName); + + verify(queueRepository, never()).update(any()); + } + + @Test + void shouldHandleQueueNotFoundForScenarioOperations() { + var namespace = "test-namespace"; + var queueName = "non-existent-queue"; + var scenarioName = "scenario-1"; + + when(queueRepository.find(namespace, queueName)).thenReturn(Optional.empty()); + + assertDoesNotThrow(() -> statusUpdater.markScenarioAsStarted(namespace, queueName, scenarioName)); + assertDoesNotThrow(() -> statusUpdater.markScenarioAsCompleted(namespace, queueName, scenarioName)); + + verify(queueRepository, never()).update(any()); + } + + @Test + void shouldHandleExceptionDuringScenarioOperations() { + var namespace = "test-namespace"; + var queueName = "test-queue"; + var scenarioName = "scenario-1"; + + when(queueRepository.find(namespace, queueName)).thenThrow(new RuntimeException("Database error")); + + assertDoesNotThrow(() -> statusUpdater.markScenarioAsStarted(namespace, queueName, scenarioName)); + assertDoesNotThrow(() -> statusUpdater.markScenarioAsCompleted(namespace, queueName, scenarioName)); + + verify(queueRepository, never()).update(any()); + } + + @Test + void shouldUpdateQueueStatusFromItemsWhenAllCompleted() { + // Create a queue with all completed items + var allCompletedItems = List.of( + createCompletedItem("scenario-1"), + createCompletedItem("scenario-2"), + createCompletedItem("scenario-3") + ); + + var spec = new ExecutionQueueSpec("results.json", allCompletedItems, "test-benchmark"); + var meta = new ObjectMetaBuilder() + .withName("completed-queue") + .withNamespace("test-namespace") + .withGeneration(1L) + .build(); + + var completedQueue = new ExecutionQueue(spec, meta); + completedQueue.setStatus(new ExecutionQueueStatus(3, "exec-456")); + + when(queueRepository.find("test-namespace", "completed-queue")).thenReturn(Optional.of(completedQueue)); + + statusUpdater.updateQueueProgress("test-namespace", "completed-queue"); + + verify(queueRepository).updateStatus(queueCaptor.capture()); + var updatedQueue = queueCaptor.getValue(); + + assertEquals(0, updatedQueue.getStatus().getRunningScenarios()); + assertEquals(3, updatedQueue.getStatus().getCompletedScenarios()); + assertEquals(0, updatedQueue.getStatus().getPendingScenarios()); + assertEquals(Phase.COMPLETED, updatedQueue.getStatus().getPhase()); + } + + private ExecutionQueueItem createCompletedItem(String scenarioName) { + var item = new ExecutionQueueItem(scenarioName, "results/" + scenarioName + ".json"); + item.markAsCompleted(); + return item; + } +} diff --git a/resilience-bench/operator/src/test/java/io/resiliencebench/resources/benchmark/BenchmarkStatusTest.java b/resilience-bench/operator/src/test/java/io/resiliencebench/resources/benchmark/BenchmarkStatusTest.java index b4632f9..07d554f 100644 --- a/resilience-bench/operator/src/test/java/io/resiliencebench/resources/benchmark/BenchmarkStatusTest.java +++ b/resilience-bench/operator/src/test/java/io/resiliencebench/resources/benchmark/BenchmarkStatusTest.java @@ -1,14 +1,6 @@ package io.resiliencebench.resources.benchmark; -import io.resiliencebench.resources.Phase; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.DisplayName; - -import java.time.LocalDateTime; -import java.time.ZoneId; - -import static org.junit.jupiter.api.Assertions.*; class BenchmarkStatusTest { @@ -19,331 +11,5 @@ void setUp() { status = new BenchmarkStatus(); } - @Test - @DisplayName("Default constructor should initialize with null values") - void defaultConstructor_ShouldInitializeWithNullValues() { - assertNull(status.getPhase()); - assertEquals(0, status.getTotalScenarios()); - assertEquals(0, status.getRunningScenarios()); - assertEquals(0, status.getCompletedScenarios()); - assertNull(status.getExecutionId()); - assertNull(status.getLastReconcileTime()); - assertNull(status.getStartTime()); - assertNull(status.getCompletionTime()); - assertNull(status.getMessage()); - assertNull(status.getObservedGeneration()); - } - - @Test - @DisplayName("Constructor with totalScenarios should initialize properly") - void constructorWithTotalScenarios_ShouldInitializeProperly() { - int totalScenarios = 5; - BenchmarkStatus statusWithTotal = new BenchmarkStatus(totalScenarios); - - assertEquals(Phase.PENDING, statusWithTotal.getPhase()); - assertEquals(totalScenarios, statusWithTotal.getTotalScenarios()); - assertEquals(0, statusWithTotal.getRunningScenarios()); - assertEquals(0, statusWithTotal.getCompletedScenarios()); - assertNotNull(statusWithTotal.getExecutionId()); - assertTrue(statusWithTotal.getExecutionId().startsWith("exec-")); - assertNotNull(statusWithTotal.getLastReconcileTime()); - assertNotNull(statusWithTotal.getStartTime()); - assertNull(statusWithTotal.getCompletionTime()); - assertNull(statusWithTotal.getMessage()); - assertNull(statusWithTotal.getObservedGeneration()); - } - - @Test - @DisplayName("Full constructor should set all values correctly") - void fullConstructor_ShouldSetAllValuesCorrectly() { - String phase = Phase.RUNNING; - int totalScenarios = 10; - int runningScenarios = 3; - int completedScenarios = 2; - String executionId = "exec-12345"; - Long observedGeneration = 5L; - - BenchmarkStatus fullStatus = new BenchmarkStatus(phase, totalScenarios, runningScenarios, - completedScenarios, executionId, observedGeneration); - - assertEquals(phase, fullStatus.getPhase()); - assertEquals(totalScenarios, fullStatus.getTotalScenarios()); - assertEquals(runningScenarios, fullStatus.getRunningScenarios()); - assertEquals(completedScenarios, fullStatus.getCompletedScenarios()); - assertEquals(executionId, fullStatus.getExecutionId()); - assertEquals(observedGeneration, fullStatus.getObservedGeneration()); - assertNotNull(fullStatus.getLastReconcileTime()); - } - - @Test - @DisplayName("Setters and getters should work correctly") - void settersAndGetters_ShouldWorkCorrectly() { - String phase = Phase.RUNNING; - int totalScenarios = 8; - int runningScenarios = 2; - int completedScenarios = 3; - String executionId = "exec-test"; - String lastReconcileTime = "2023-01-01T10:00:00Z"; - String startTime = "2023-01-01T09:00:00Z"; - String completionTime = "2023-01-01T11:00:00Z"; - String message = "Test message"; - Long observedGeneration = 10L; - - status.setPhase(phase); - status.setTotalScenarios(totalScenarios); - status.setRunningScenarios(runningScenarios); - status.setCompletedScenarios(completedScenarios); - status.setExecutionId(executionId); - status.setLastReconcileTime(lastReconcileTime); - status.setStartTime(startTime); - status.setCompletionTime(completionTime); - status.setMessage(message); - status.setObservedGeneration(observedGeneration); - - assertEquals(phase, status.getPhase()); - assertEquals(totalScenarios, status.getTotalScenarios()); - assertEquals(runningScenarios, status.getRunningScenarios()); - assertEquals(completedScenarios, status.getCompletedScenarios()); - assertEquals(executionId, status.getExecutionId()); - assertEquals(lastReconcileTime, status.getLastReconcileTime()); - assertEquals(startTime, status.getStartTime()); - assertEquals(completionTime, status.getCompletionTime()); - assertEquals(message, status.getMessage()); - assertEquals(observedGeneration, status.getObservedGeneration()); - } - - @Test - @DisplayName("isRunning should return true only when phase is RUNNING") - void isRunning_ShouldReturnTrueOnlyWhenPhaseIsRunning() { - status.setPhase(Phase.PENDING); - assertFalse(status.isRunning()); - - status.setPhase(Phase.RUNNING); - assertTrue(status.isRunning()); - - status.setPhase(Phase.COMPLETED); - assertFalse(status.isRunning()); - status.setPhase(Phase.FAILED); - assertFalse(status.isRunning()); - - status.setPhase(null); - assertFalse(status.isRunning()); - } - - @Test - @DisplayName("isCompleted should return true for COMPLETED or FAILED phases") - void isCompleted_ShouldReturnTrueForCompletedOrFailedPhases() { - status.setPhase(Phase.PENDING); - assertFalse(status.isCompleted()); - - status.setPhase(Phase.RUNNING); - assertFalse(status.isCompleted()); - - status.setPhase(Phase.COMPLETED); - assertTrue(status.isCompleted()); - - status.setPhase(Phase.FAILED); - assertTrue(status.isCompleted()); - - status.setPhase(null); - assertFalse(status.isCompleted()); - } - - @Test - @DisplayName("needsReconciliation should detect generation changes") - void needsReconciliation_ShouldDetectGenerationChanges() { - // When observedGeneration is null, should need reconciliation - assertTrue(status.needsReconciliation(1L)); - - // When generations match, should not need reconciliation - status.setObservedGeneration(5L); - assertFalse(status.needsReconciliation(5L)); - - // When generations differ, should need reconciliation - assertTrue(status.needsReconciliation(6L)); - - // When current generation is null, should need reconciliation - assertTrue(status.needsReconciliation(null)); - } - - @Test - @DisplayName("updateReconcileTime should set current timestamp") - void updateReconcileTime_ShouldSetCurrentTimestamp() { - String beforeUpdate = getCurrentTimestamp(); - status.updateReconcileTime(); - String afterUpdate = getCurrentTimestamp(); - - assertNotNull(status.getLastReconcileTime()); - // Check that the timestamp is between before and after (allowing for execution time) - assertTrue(status.getLastReconcileTime().compareTo(beforeUpdate) >= 0); - assertTrue(status.getLastReconcileTime().compareTo(afterUpdate) <= 0); - } - - @Test - @DisplayName("markAsCompleted should set phase and completion time") - void markAsCompleted_ShouldSetPhaseAndCompletionTime() { - String beforeCompletion = getCurrentTimestamp(); - status.markAsCompleted(); - String afterCompletion = getCurrentTimestamp(); - - assertEquals(Phase.COMPLETED, status.getPhase()); - assertNotNull(status.getCompletionTime()); - assertTrue(status.getCompletionTime().compareTo(beforeCompletion) >= 0); - assertTrue(status.getCompletionTime().compareTo(afterCompletion) <= 0); - } - - @Test - @DisplayName("markAsFailed should set phase, completion time, and message") - void markAsFailed_ShouldSetPhaseCompletionTimeAndMessage() { - String errorMessage = "Test error message"; - String beforeFailure = getCurrentTimestamp(); - status.markAsFailed(errorMessage); - String afterFailure = getCurrentTimestamp(); - - assertEquals(Phase.FAILED, status.getPhase()); - assertEquals(errorMessage, status.getMessage()); - assertNotNull(status.getCompletionTime()); - assertTrue(status.getCompletionTime().compareTo(beforeFailure) >= 0); - assertTrue(status.getCompletionTime().compareTo(afterFailure) <= 0); - } - - @Test - @DisplayName("updateProgress should update counters and phase correctly") - void updateProgress_ShouldUpdateCountersAndPhaseCorrectly() { - status.setTotalScenarios(10); - status.setPhase(Phase.PENDING); - - // Test transition to RUNNING when scenarios start - status.updateProgress(2, 1); - assertEquals(2, status.getRunningScenarios()); - assertEquals(1, status.getCompletedScenarios()); - assertEquals(Phase.RUNNING, status.getPhase()); - - // Test staying in RUNNING phase - status.updateProgress(3, 2); - assertEquals(3, status.getRunningScenarios()); - assertEquals(2, status.getCompletedScenarios()); - assertEquals(Phase.RUNNING, status.getPhase()); - - // Test transition to COMPLETED when all scenarios are done - status.updateProgress(0, 10); - assertEquals(0, status.getRunningScenarios()); - assertEquals(10, status.getCompletedScenarios()); - assertEquals(Phase.COMPLETED, status.getPhase()); - assertNotNull(status.getCompletionTime()); - } - - @Test - @DisplayName("updateProgress should handle edge case with only completed scenarios") - void updateProgress_ShouldHandleEdgeCaseWithOnlyCompletedScenarios() { - status.setTotalScenarios(5); - status.setPhase(Phase.PENDING); - - // Test with only completed scenarios (no running) - status.updateProgress(0, 3); - assertEquals(0, status.getRunningScenarios()); - assertEquals(3, status.getCompletedScenarios()); - assertEquals(Phase.RUNNING, status.getPhase()); - } - - @Test - @DisplayName("updateProgress should not change phase if no progress") - void updateProgress_ShouldNotChangePhaseIfNoProgress() { - status.setTotalScenarios(5); - status.setPhase(Phase.PENDING); - - // Test with no progress - status.updateProgress(0, 0); - assertEquals(0, status.getRunningScenarios()); - assertEquals(0, status.getCompletedScenarios()); - assertEquals(Phase.PENDING, status.getPhase()); - } - - @Test - @DisplayName("Phase constants should have correct values") - void phaseConstants_ShouldHaveCorrectValues() { - assertEquals("Pending", Phase.PENDING); - assertEquals("Running", Phase.RUNNING); - assertEquals("Completed", Phase.COMPLETED); - assertEquals("Failed", Phase.FAILED); - } - - @Test - @DisplayName("ExecutionId generation should create unique IDs") - void executionIdGeneration_ShouldCreateUniqueIds() { - BenchmarkStatus status1 = new BenchmarkStatus(5); - BenchmarkStatus status2 = new BenchmarkStatus(3); - - assertNotNull(status1.getExecutionId()); - assertNotNull(status2.getExecutionId()); - assertNotEquals(status1.getExecutionId(), status2.getExecutionId()); - assertTrue(status1.getExecutionId().startsWith("exec-")); - assertTrue(status2.getExecutionId().startsWith("exec-")); - } - - @Test - @DisplayName("Timestamp generation should be in UTC format") - void timestampGeneration_ShouldBeInUtcFormat() { - BenchmarkStatus statusWithTime = new BenchmarkStatus(5); - - assertNotNull(statusWithTime.getStartTime()); - assertNotNull(statusWithTime.getLastReconcileTime()); - - // Verify timestamp format (should be parseable as ISO format) - assertDoesNotThrow(() -> LocalDateTime.parse( - statusWithTime.getStartTime().substring(0, statusWithTime.getStartTime().length() - 6) - )); - assertDoesNotThrow(() -> LocalDateTime.parse( - statusWithTime.getLastReconcileTime().substring(0, statusWithTime.getLastReconcileTime().length() - 6) - )); - } - - @Test - @DisplayName("needsReconciliation should handle null observedGeneration correctly") - void needsReconciliation_ShouldHandleNullObservedGenerationCorrectly() { - // Test with null observedGeneration and non-null current generation - status.setObservedGeneration(null); - assertTrue(status.needsReconciliation(1L)); - - // Test with non-null observedGeneration and null current generation - status.setObservedGeneration(1L); - assertTrue(status.needsReconciliation(null)); - - // Test with both null - status.setObservedGeneration(null); - assertTrue(status.needsReconciliation(null)); - } - - @Test - @DisplayName("markAsCompleted should not override existing completion time") - void markAsCompleted_ShouldSetCompletionTimeOnlyOnce() { - String existingCompletionTime = "2023-01-01T10:00:00Z"; - status.setCompletionTime(existingCompletionTime); - - status.markAsCompleted(); - - // Should update phase but completion time should be overwritten - assertEquals(Phase.COMPLETED, status.getPhase()); - assertNotEquals(existingCompletionTime, status.getCompletionTime()); - } - - @Test - @DisplayName("markAsFailed should not override existing completion time") - void markAsFailed_ShouldSetCompletionTimeOnlyOnce() { - String existingCompletionTime = "2023-01-01T10:00:00Z"; - String errorMessage = "Test error"; - status.setCompletionTime(existingCompletionTime); - - status.markAsFailed(errorMessage); - - // Should update phase and message, but completion time should be overwritten - assertEquals(Phase.FAILED, status.getPhase()); - assertEquals(errorMessage, status.getMessage()); - assertNotEquals(existingCompletionTime, status.getCompletionTime()); - } - - private String getCurrentTimestamp() { - return LocalDateTime.now().atZone(ZoneId.of("UTC")).toString(); - } } diff --git a/resilience-bench/operator/src/test/java/io/resiliencebench/resources/queue/ExecutionQueueStatusTest.java b/resilience-bench/operator/src/test/java/io/resiliencebench/resources/queue/ExecutionQueueStatusTest.java new file mode 100644 index 0000000..5161e76 --- /dev/null +++ b/resilience-bench/operator/src/test/java/io/resiliencebench/resources/queue/ExecutionQueueStatusTest.java @@ -0,0 +1,194 @@ +package io.resiliencebench.resources.queue; + +import io.resiliencebench.resources.Phase; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +class ExecutionQueueStatusTest { + + private ExecutionQueueStatus status; + + @BeforeEach + void setUp() { + status = new ExecutionQueueStatus(); + } + + @Test + void shouldCreateStatusWithDefaultConstructor() { + assertNotNull(status); + assertNull(status.getPhase()); + assertEquals(0, status.getTotalScenarios()); + assertEquals(0, status.getRunningScenarios()); + assertEquals(0, status.getCompletedScenarios()); + assertEquals(0, status.getPendingScenarios()); + } + + @Test + void shouldCreateStatusWithTotalScenariosAndExecutionId() { + var executionId = "exec-123456"; + var totalScenarios = 5; + + status = new ExecutionQueueStatus(totalScenarios, executionId); + + assertEquals(Phase.PENDING, status.getPhase()); + assertEquals(totalScenarios, status.getTotalScenarios()); + assertEquals(0, status.getRunningScenarios()); + assertEquals(0, status.getCompletedScenarios()); + assertEquals(totalScenarios, status.getPendingScenarios()); + assertEquals(executionId, status.getExecutionId()); + assertNotNull(status.getLastReconcileTime()); + assertNotNull(status.getStartTime()); + } + + @Test + void shouldCreateStatusWithAllParameters() { + var phase = Phase.RUNNING; + var totalScenarios = 10; + var runningScenarios = 2; + var completedScenarios = 3; + var pendingScenarios = 5; + var executionId = "exec-789"; + var observedGeneration = 1L; + + status = new ExecutionQueueStatus(phase, totalScenarios, runningScenarios, + completedScenarios, pendingScenarios, executionId, observedGeneration); + + assertEquals(phase, status.getPhase()); + assertEquals(totalScenarios, status.getTotalScenarios()); + assertEquals(runningScenarios, status.getRunningScenarios()); + assertEquals(completedScenarios, status.getCompletedScenarios()); + assertEquals(pendingScenarios, status.getPendingScenarios()); + assertEquals(executionId, status.getExecutionId()); + assertEquals(observedGeneration, status.getObservedGeneration()); + assertNotNull(status.getLastReconcileTime()); + } + + @Test + void shouldUpdateProgress() { + status = new ExecutionQueueStatus(5, "exec-123"); + + status.updateProgress(2, 1, 2); + + assertEquals(2, status.getRunningScenarios()); + assertEquals(1, status.getCompletedScenarios()); + assertEquals(2, status.getPendingScenarios()); + assertEquals(Phase.RUNNING, status.getPhase()); + } + + @Test + void shouldMarkAsCompletedWhenAllScenariosFinished() { + var totalScenarios = 3; + status = new ExecutionQueueStatus(totalScenarios, "exec-123"); + + status.updateProgress(0, totalScenarios, 0); + + assertEquals(Phase.COMPLETED, status.getPhase()); + assertNotNull(status.getCompletionTime()); + } + + @Test + void shouldMarkAsRunningWhenHasRunningScenariosOrCompleted() { + status = new ExecutionQueueStatus(5, "exec-123"); + + // Test with running scenarios + status.updateProgress(1, 0, 4); + assertEquals(Phase.RUNNING, status.getPhase()); + + // Reset to pending + status.setPhase(Phase.PENDING); + + // Test with completed scenarios + status.updateProgress(0, 1, 4); + assertEquals(Phase.RUNNING, status.getPhase()); + } + + @Test + void shouldMarkAsCompleted() { + status.markAsCompleted(); + + assertEquals(Phase.COMPLETED, status.getPhase()); + assertNotNull(status.getCompletionTime()); + } + + @Test + void shouldMarkAsFailed() { + var errorMessage = "Test error"; + + status.markAsFailed(errorMessage); + + assertEquals(Phase.FAILED, status.getPhase()); + assertEquals(errorMessage, status.getMessage()); + assertNotNull(status.getCompletionTime()); + } + + @Test + void shouldReturnTrueForIsRunning() { + status.setPhase(Phase.RUNNING); + assertTrue(status.isRunning()); + + status.setPhase(Phase.PENDING); + assertFalse(status.isRunning()); + } + + @Test + void shouldReturnTrueForIsCompleted() { + status.setPhase(Phase.COMPLETED); + assertTrue(status.isCompleted()); + + status.setPhase(Phase.FAILED); + assertTrue(status.isCompleted()); + + status.setPhase(Phase.RUNNING); + assertFalse(status.isCompleted()); + } + + @Test + void shouldNeedReconciliationWhenObservedGenerationIsNull() { + var currentGeneration = 1L; + assertTrue(status.needsReconciliation(currentGeneration)); + } + + @Test + void shouldNeedReconciliationWhenObservedGenerationDiffers() { + var currentGeneration = 2L; + status.setObservedGeneration(1L); + + assertTrue(status.needsReconciliation(currentGeneration)); + } + + @Test + void shouldNotNeedReconciliationWhenObservedGenerationMatches() { + var currentGeneration = 1L; + status.setObservedGeneration(currentGeneration); + + assertFalse(status.needsReconciliation(currentGeneration)); + } + + @Test + void shouldUpdateReconcileTime() { + var initialTime = status.getLastReconcileTime(); + + // Sleep a bit to ensure time difference + try { + Thread.sleep(10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + status.updateReconcileTime(); + + assertNotEquals(initialTime, status.getLastReconcileTime()); + assertNotNull(status.getLastReconcileTime()); + } + + @Test + void shouldReturnConvenienceMethodsForRunningAndPending() { + status.setRunningScenarios(3); + status.setPendingScenarios(2); + + assertEquals(3, status.getRunning()); + assertEquals(2, status.getPending()); + } +} diff --git a/resilience-bench/operator/src/test/java/io/resiliencebench/resources/queue/ExecutionQueueTest.java b/resilience-bench/operator/src/test/java/io/resiliencebench/resources/queue/ExecutionQueueTest.java new file mode 100644 index 0000000..7bd9df8 --- /dev/null +++ b/resilience-bench/operator/src/test/java/io/resiliencebench/resources/queue/ExecutionQueueTest.java @@ -0,0 +1,320 @@ +package io.resiliencebench.resources.queue; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.resiliencebench.resources.Phase; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; + +class ExecutionQueueTest { + + private ExecutionQueueSpec spec; + private ExecutionQueue queue; + + @BeforeEach + void setUp() { + var items = List.of( + new ExecutionQueueItem("scenario-1", "results/scenario-1.json"), + new ExecutionQueueItem("scenario-2", "results/scenario-2.json"), + new ExecutionQueueItem("scenario-3", "results/scenario-3.json") + ); + + spec = new ExecutionQueueSpec("results/results.json", items, "test-benchmark"); + } + + @Test + void shouldInitializeStatusWhenCreatingQueueWithConstructor() { + var meta = new ObjectMetaBuilder() + .withName("test-queue") + .withNamespace("test-namespace") + .addToLabels("execution-id", "exec-123456") + .build(); + + queue = new ExecutionQueue(spec, meta); + + assertNotNull(queue.getStatus()); + assertEquals(Phase.PENDING, queue.getStatus().getPhase()); + assertEquals(3, queue.getStatus().getTotalScenarios()); + assertEquals(0, queue.getStatus().getRunningScenarios()); + assertEquals(0, queue.getStatus().getCompletedScenarios()); + assertEquals(3, queue.getStatus().getPendingScenarios()); + assertEquals("exec-123456", queue.getStatus().getExecutionId()); + assertNotNull(queue.getStatus().getLastReconcileTime()); + assertNotNull(queue.getStatus().getStartTime()); + } + + @Test + void shouldInitializeStatusWithGeneratedExecutionIdWhenNoLabelExists() { + var meta = new ObjectMetaBuilder() + .withName("test-queue") + .withNamespace("test-namespace") + .build(); + + queue = new ExecutionQueue(spec, meta); + + assertNotNull(queue.getStatus()); + assertNotNull(queue.getStatus().getExecutionId()); + assertTrue(queue.getStatus().getExecutionId().startsWith("exec-")); + } + + @Test + void shouldNotInitializeStatusWhenSpecIsNull() { + var meta = new ObjectMetaBuilder() + .withName("test-queue") + .withNamespace("test-namespace") + .build(); + + queue = new ExecutionQueue(null, meta); + + assertNull(queue.getStatus()); + } + + @Test + void shouldNotInitializeStatusWhenItemsIsNull() { + var emptySpec = new ExecutionQueueSpec("results.json", null, "test-benchmark"); + var meta = new ObjectMetaBuilder() + .withName("test-queue") + .withNamespace("test-namespace") + .build(); + + queue = new ExecutionQueue(emptySpec, meta); + + assertNull(queue.getStatus()); + } + + @Test + void shouldUpdateStatusFromItemsWithMixedStates() { + var meta = new ObjectMetaBuilder() + .withName("test-queue") + .withNamespace("test-namespace") + .addToLabels("execution-id", "exec-789") + .build(); + + // Create items with different states + var pendingItem = new ExecutionQueueItem("scenario-1", "results/scenario-1.json"); + var runningItem = new ExecutionQueueItem("scenario-2", "results/scenario-2.json"); + runningItem.markAsRunning(); + var completedItem = new ExecutionQueueItem("scenario-3", "results/scenario-3.json"); + completedItem.markAsCompleted(); + + var mixedSpec = new ExecutionQueueSpec("results.json", + List.of(pendingItem, runningItem, completedItem), + "test-benchmark"); + + queue = new ExecutionQueue(mixedSpec, meta); + + // Manually change some item states to test updateStatusFromItems + queue.getSpec().getItems().get(0).markAsRunning(); // scenario-1 now running + + queue.updateStatusFromItems(); + + assertEquals(2, queue.getStatus().getRunningScenarios()); + assertEquals(1, queue.getStatus().getCompletedScenarios()); + assertEquals(0, queue.getStatus().getPendingScenarios()); + assertEquals(Phase.RUNNING, queue.getStatus().getPhase()); + } + + @Test + void shouldUpdateStatusFromItemsWhenAllCompleted() { + var meta = new ObjectMetaBuilder() + .withName("test-queue") + .withNamespace("test-namespace") + .build(); + + queue = new ExecutionQueue(spec, meta); + + // Mark all items as completed + queue.getSpec().getItems().forEach(ExecutionQueueItem::markAsCompleted); + + queue.updateStatusFromItems(); + + assertEquals(0, queue.getStatus().getRunningScenarios()); + assertEquals(3, queue.getStatus().getCompletedScenarios()); + assertEquals(0, queue.getStatus().getPendingScenarios()); + assertEquals(Phase.COMPLETED, queue.getStatus().getPhase()); + assertNotNull(queue.getStatus().getCompletionTime()); + } + + @Test + void shouldUpdateStatusFromItemsWhenAllPending() { + var meta = new ObjectMetaBuilder() + .withName("test-queue") + .withNamespace("test-namespace") + .build(); + + queue = new ExecutionQueue(spec, meta); + + // All items are pending by default + queue.updateStatusFromItems(); + + assertEquals(0, queue.getStatus().getRunningScenarios()); + assertEquals(0, queue.getStatus().getCompletedScenarios()); + assertEquals(3, queue.getStatus().getPendingScenarios()); + assertEquals(Phase.PENDING, queue.getStatus().getPhase()); + } + + @Test + void shouldInitializeStatusIfNullWhenUpdatingFromItems() { + var meta = new ObjectMetaBuilder() + .withName("test-queue") + .withNamespace("test-namespace") + .build(); + + queue = new ExecutionQueue(spec, meta); + queue.setStatus(null); // Manually set status to null + + queue.updateStatusFromItems(); + + assertNotNull(queue.getStatus()); + assertEquals(Phase.PENDING, queue.getStatus().getPhase()); + assertEquals(3, queue.getStatus().getTotalScenarios()); + } + + @Test + void shouldNotUpdateStatusFromItemsWhenSpecIsNull() { + var meta = new ObjectMetaBuilder() + .withName("test-queue") + .withNamespace("test-namespace") + .build(); + + queue = new ExecutionQueue(null, meta); + var initialStatus = queue.getStatus(); + + queue.updateStatusFromItems(); + + assertEquals(initialStatus, queue.getStatus()); // Should remain unchanged + } + + @Test + void shouldNotUpdateStatusFromItemsWhenItemsIsNull() { + var emptySpec = new ExecutionQueueSpec("results.json", null, "test-benchmark"); + var meta = new ObjectMetaBuilder() + .withName("test-queue") + .withNamespace("test-namespace") + .build(); + + queue = new ExecutionQueue(emptySpec, meta); + var initialStatus = queue.getStatus(); + + queue.updateStatusFromItems(); + + assertEquals(initialStatus, queue.getStatus()); // Should remain unchanged + } + + @Test + void shouldUpdateReconcileTimeWhenUpdatingStatusFromItems() { + var meta = new ObjectMetaBuilder() + .withName("test-queue") + .withNamespace("test-namespace") + .build(); + + queue = new ExecutionQueue(spec, meta); + var initialReconcileTime = queue.getStatus().getLastReconcileTime(); + + // Sleep a bit to ensure time difference + try { + Thread.sleep(10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + queue.updateStatusFromItems(); + + assertNotEquals(initialReconcileTime, queue.getStatus().getLastReconcileTime()); + } + + @Test + void shouldGetItemByName() { + var meta = new ObjectMetaBuilder() + .withName("test-queue") + .withNamespace("test-namespace") + .build(); + + queue = new ExecutionQueue(spec, meta); + + var item = queue.getItem("scenario-2"); + + assertNotNull(item); + assertEquals("scenario-2", item.getScenario()); + } + + @Test + void shouldReturnNullForNonExistentItem() { + var meta = new ObjectMetaBuilder() + .withName("test-queue") + .withNamespace("test-namespace") + .build(); + + queue = new ExecutionQueue(spec, meta); + + var item = queue.getItem("non-existent-scenario"); + + assertNull(item); + } + + @Test + void shouldGetNextPendingItem() { + var meta = new ObjectMetaBuilder() + .withName("test-queue") + .withNamespace("test-namespace") + .build(); + + queue = new ExecutionQueue(spec, meta); + + var nextPendingItem = queue.getNextPendingItem(); + + assertTrue(nextPendingItem.isPresent()); + assertEquals("scenario-1", nextPendingItem.get().getScenario()); + assertTrue(nextPendingItem.get().isPending()); + } + + @Test + void shouldReturnEmptyWhenNoPendingItems() { + var meta = new ObjectMetaBuilder() + .withName("test-queue") + .withNamespace("test-namespace") + .build(); + + queue = new ExecutionQueue(spec, meta); + + // Mark all items as completed + queue.getSpec().getItems().forEach(ExecutionQueueItem::markAsCompleted); + + var nextPendingItem = queue.getNextPendingItem(); + + assertFalse(nextPendingItem.isPresent()); + } + + @Test + void shouldReturnTrueForIsDoneWhenAllItemsFinished() { + var meta = new ObjectMetaBuilder() + .withName("test-queue") + .withNamespace("test-namespace") + .build(); + + queue = new ExecutionQueue(spec, meta); + + // Mark all items as completed + queue.getSpec().getItems().forEach(ExecutionQueueItem::markAsCompleted); + + assertTrue(queue.isDone()); + } + + @Test + void shouldReturnFalseForIsDoneWhenSomeItemsNotFinished() { + var meta = new ObjectMetaBuilder() + .withName("test-queue") + .withNamespace("test-namespace") + .build(); + + queue = new ExecutionQueue(spec, meta); + + // Mark only first item as completed + queue.getSpec().getItems().get(0).markAsCompleted(); + + assertFalse(queue.isDone()); + } +} From 9090fc32ba6b0b008b93b533fa49b403fa4dd73f Mon Sep 17 00:00:00 2001 From: Carlos Mendes Date: Sun, 7 Sep 2025 11:14:09 -0300 Subject: [PATCH 3/3] implement enhanced execution queue status tracking --- crd/benchmarks.resiliencebench.io-v1.yml | 24 +++--------- crd/queues.resiliencebench.io-v1.yml | 48 +++++++++++++++++++++++- 2 files changed, 53 insertions(+), 19 deletions(-) diff --git a/crd/benchmarks.resiliencebench.io-v1.yml b/crd/benchmarks.resiliencebench.io-v1.yml index 0585187..2bf425a 100644 --- a/crd/benchmarks.resiliencebench.io-v1.yml +++ b/crd/benchmarks.resiliencebench.io-v1.yml @@ -14,22 +14,14 @@ spec: scope: Namespaced versions: - additionalPrinterColumns: - - jsonPath: .status.completedScenarios - name: Completed - priority: 3 - type: integer + - jsonPath: .status.executionQueueName + name: Execution Queue + priority: 1 + type: string - jsonPath: .status.phase name: Phase priority: 0 type: string - - jsonPath: .status.runningScenarios - name: Running - priority: 2 - type: integer - - jsonPath: .status.totalScenarios - name: Total Scenarios - priority: 1 - type: integer name: v1beta1 schema: openAPIV3Schema: @@ -186,12 +178,12 @@ spec: type: object status: properties: - completedScenarios: - type: integer completionTime: type: string executionId: type: string + executionQueueName: + type: string lastReconcileTime: type: string message: @@ -200,12 +192,8 @@ spec: type: integer phase: type: string - runningScenarios: - type: integer startTime: type: string - totalScenarios: - type: integer type: object type: object served: true diff --git a/crd/queues.resiliencebench.io-v1.yml b/crd/queues.resiliencebench.io-v1.yml index 6264b85..b8acb17 100644 --- a/crd/queues.resiliencebench.io-v1.yml +++ b/crd/queues.resiliencebench.io-v1.yml @@ -13,7 +13,28 @@ spec: singular: queue scope: Namespaced versions: - - name: v1beta1 + - additionalPrinterColumns: + - jsonPath: .status.completedScenarios + name: Completed + priority: 3 + type: integer + - jsonPath: .status.pendingScenarios + name: Pending + priority: 4 + type: integer + - jsonPath: .status.phase + name: Phase + priority: 0 + type: string + - jsonPath: .status.runningScenarios + name: Running + priority: 2 + type: integer + - jsonPath: .status.totalScenarios + name: Total Scenarios + priority: 1 + type: integer + name: v1beta1 schema: openAPIV3Schema: properties: @@ -45,7 +66,32 @@ spec: type: string type: object status: + properties: + completedScenarios: + type: integer + completionTime: + type: string + executionId: + type: string + lastReconcileTime: + type: string + message: + type: string + observedGeneration: + type: integer + pendingScenarios: + type: integer + phase: + type: string + runningScenarios: + type: integer + startTime: + type: string + totalScenarios: + type: integer type: object type: object served: true storage: true + subresources: + status: {}