Skip to content

Commit 941933d

Browse files
committed
implement enhanced execution queue status tracking
1 parent 9fcc393 commit 941933d

File tree

13 files changed

+1155
-529
lines changed

13 files changed

+1155
-529
lines changed

resilience-bench/operator/src/main/java/io/resiliencebench/BenchmarkController.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public UpdateControl<Benchmark> reconcile(Benchmark benchmark, Context<Benchmark
6969

7070
createQueue(benchmark, scenariosList);
7171

72-
var status = createOrUpdateStatus(benchmark, scenariosList.size());
72+
var status = createOrUpdateStatus(benchmark, benchmarkName);
7373
benchmark.setStatus(status);
7474

7575
logger.info("Benchmark reconciled {}. {} scenarios created", benchmarkName, scenariosList.size());
@@ -85,27 +85,28 @@ public UpdateControl<Benchmark> reconcile(Benchmark benchmark, Context<Benchmark
8585
private UpdateControl<Benchmark> updateStatusWithError(Benchmark benchmark, String errorMessage) {
8686
var status = benchmark.getStatus();
8787
if (status == null) {
88-
status = new BenchmarkStatus(0);
88+
status = new BenchmarkStatus(benchmark.getMetadata().getName());
8989
benchmark.setStatus(status);
9090
}
9191
status.markAsFailed(errorMessage);
9292
status.setObservedGeneration(benchmark.getMetadata().getGeneration());
9393
return UpdateControl.updateStatus(benchmark);
9494
}
9595

96-
private BenchmarkStatus createOrUpdateStatus(Benchmark benchmark, int totalScenarios) {
96+
private BenchmarkStatus createOrUpdateStatus(Benchmark benchmark, String queueName) {
9797
var currentStatus = benchmark.getStatus();
9898
var currentGeneration = benchmark.getMetadata().getGeneration();
9999

100100
if (currentStatus == null || currentStatus.needsReconciliation(currentGeneration)) {
101-
var newStatus = new BenchmarkStatus(totalScenarios);
101+
var newStatus = new BenchmarkStatus(queueName);
102102
newStatus.setObservedGeneration(currentGeneration);
103-
logger.info("Created new status for benchmark {} with {} total scenarios",
104-
benchmark.getMetadata().getName(), totalScenarios);
103+
logger.info("Created new status for benchmark {} with execution queue {}",
104+
benchmark.getMetadata().getName(), queueName);
105105
return newStatus;
106106
} else {
107107
currentStatus.updateReconcileTime();
108108
currentStatus.setObservedGeneration(currentGeneration);
109+
currentStatus.setExecutionQueueName(queueName);
109110
logger.info("Updated existing status for benchmark {}", benchmark.getMetadata().getName());
110111
return currentStatus;
111112
}

resilience-bench/operator/src/main/java/io/resiliencebench/ExecutionQueueController.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
66
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
77
import io.resiliencebench.execution.QueueExecutor;
8+
import io.resiliencebench.execution.ExecutionQueueStatusUpdater;
9+
import io.resiliencebench.resources.benchmark.Benchmark;
810
import io.resiliencebench.resources.queue.ExecutionQueue;
11+
import io.resiliencebench.support.CustomResourceRepository;
912
import org.slf4j.Logger;
1013
import org.slf4j.LoggerFactory;
1114

@@ -15,22 +18,35 @@ public class ExecutionQueueController implements Reconciler<ExecutionQueue> {
1518
private static final Logger logger = LoggerFactory.getLogger(ExecutionQueueController.class);
1619

1720
private final QueueExecutor queueExecutor;
21+
private final ExecutionQueueStatusUpdater statusUpdater;
1822

19-
public ExecutionQueueController(QueueExecutor queueExecutor) {
23+
public ExecutionQueueController(QueueExecutor queueExecutor,
24+
ExecutionQueueStatusUpdater statusUpdater) {
2025
this.queueExecutor = queueExecutor;
26+
this.statusUpdater = statusUpdater;
2127
}
2228

2329
@Override
2430
public UpdateControl<ExecutionQueue> reconcile(ExecutionQueue queue, Context<ExecutionQueue> context) {
2531
logger.info("Reconciling ExecutionQueue: {}", queue.getMetadata().getName());
32+
33+
// Initialize status if needed
34+
if (queue.getStatus() == null) {
35+
queue.updateStatusFromItems();
36+
}
37+
38+
// Update status based on current items state
39+
statusUpdater.updateQueueProgress(queue.getMetadata().getNamespace(), queue.getMetadata().getName());
2640

2741
if (queue.isDone()) {
2842
logger.info("ExecutionQueue {} is already completed", queue.getMetadata().getName());
2943
return UpdateControl.noUpdate();
3044
}
3145

3246
// Only execute if there are pending items and no running items
33-
if (queue.getStatus().getRunning() == 0 && queue.getStatus().getPending() > 0) {
47+
if (queue.getStatus() != null &&
48+
queue.getStatus().getRunning() == 0 &&
49+
queue.getStatus().getPending() > 0) {
3450
logger.info("Starting execution of next item in queue: {}", queue.getMetadata().getName());
3551
queueExecutor.execute(queue);
3652
} else {

resilience-bench/operator/src/main/java/io/resiliencebench/execution/BenchmarkStatusUpdater.java

Lines changed: 0 additions & 101 deletions
This file was deleted.
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package io.resiliencebench.execution;
2+
3+
import io.resiliencebench.resources.queue.ExecutionQueue;
4+
import io.resiliencebench.resources.queue.ExecutionQueueItem;
5+
import io.resiliencebench.support.CustomResourceRepository;
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
import org.springframework.stereotype.Service;
9+
10+
@Service
11+
public class ExecutionQueueStatusUpdater {
12+
13+
private static final Logger logger = LoggerFactory.getLogger(ExecutionQueueStatusUpdater.class);
14+
15+
private final CustomResourceRepository<ExecutionQueue> queueRepository;
16+
17+
public ExecutionQueueStatusUpdater(CustomResourceRepository<ExecutionQueue> queueRepository) {
18+
this.queueRepository = queueRepository;
19+
}
20+
21+
/**
22+
* Updates the execution queue status based on the current queue items state
23+
*/
24+
public void updateQueueProgress(String namespace, String queueName) {
25+
try {
26+
var queueOpt = queueRepository.find(namespace, queueName);
27+
28+
if (queueOpt.isEmpty()) {
29+
logger.warn("Execution queue not found: {}/{}", namespace, queueName);
30+
return;
31+
}
32+
33+
var queue = queueOpt.get();
34+
updateQueueStatus(queue);
35+
queueRepository.updateStatus(queue);
36+
37+
logger.debug("Updated queue {} progress: {} running, {} completed, {} pending, {} total",
38+
queueName, queue.getStatus().getRunning(), queue.getStatus().getCompletedScenarios(),
39+
queue.getStatus().getPending(), queue.getStatus().getTotalScenarios());
40+
41+
} catch (Exception e) {
42+
logger.error("Error updating execution queue status for: " + namespace + "/" + queueName, e);
43+
}
44+
}
45+
46+
/**
47+
* Marks a specific scenario as started in the execution queue
48+
*/
49+
public void markScenarioAsStarted(String namespace, String queueName, String scenarioName) {
50+
try {
51+
logger.info("Marking scenario {} as started for queue {}/{}", scenarioName, namespace, queueName);
52+
var queueOpt = queueRepository.find(namespace, queueName);
53+
54+
if (queueOpt.isEmpty()) {
55+
logger.warn("Execution queue not found: {}/{}", namespace, queueName);
56+
return;
57+
}
58+
59+
var queue = queueOpt.get();
60+
var queueItem = queue.getItem(scenarioName);
61+
62+
if (queueItem != null && queueItem.isPending()) {
63+
queueItem.markAsRunning();
64+
updateQueueStatus(queue);
65+
queueRepository.update(queue);
66+
}
67+
68+
} catch (Exception e) {
69+
logger.error("Error marking scenario as started: " + scenarioName, e);
70+
}
71+
}
72+
73+
/**
74+
* Marks a specific scenario as completed in the execution queue
75+
*/
76+
public void markScenarioAsCompleted(String namespace, String queueName, String scenarioName) {
77+
try {
78+
logger.info("Marking scenario {} as completed for queue {}/{}", scenarioName, namespace, queueName);
79+
var queueOpt = queueRepository.find(namespace, queueName);
80+
81+
if (queueOpt.isEmpty()) {
82+
logger.warn("Execution queue not found: {}/{}", namespace, queueName);
83+
return;
84+
}
85+
86+
var queue = queueOpt.get();
87+
var queueItem = queue.getItem(scenarioName);
88+
89+
if (queueItem != null && queueItem.isRunning()) {
90+
queueItem.markAsCompleted();
91+
updateQueueStatus(queue);
92+
queueRepository.update(queue);
93+
}
94+
95+
} catch (Exception e) {
96+
logger.error("Error marking scenario as completed: " + scenarioName, e);
97+
}
98+
}
99+
100+
/**
101+
* Updates the execution queue status based on current items state
102+
*/
103+
private void updateQueueStatus(ExecutionQueue queue) {
104+
queue.updateStatusFromItems();
105+
106+
// Set observed generation if available
107+
if (queue.getMetadata() != null && queue.getMetadata().getGeneration() != null) {
108+
queue.getStatus().setObservedGeneration(queue.getMetadata().getGeneration());
109+
}
110+
}
111+
}

resilience-bench/operator/src/main/java/io/resiliencebench/execution/steps/UpdateStatusQueueStep.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import io.resiliencebench.resources.queue.ExecutionQueue;
88
import io.resiliencebench.resources.scenario.Scenario;
99
import io.resiliencebench.support.CustomResourceRepository;
10-
import io.resiliencebench.execution.BenchmarkStatusUpdater;
10+
import io.resiliencebench.execution.ExecutionQueueStatusUpdater;
1111
import org.springframework.stereotype.Service;
1212

1313
import static java.time.Duration.ofSeconds;
@@ -16,13 +16,13 @@
1616
public class UpdateStatusQueueStep extends ExecutorStep {
1717

1818
private final CustomResourceRepository<ExecutionQueue> executionRepository;
19-
private final BenchmarkStatusUpdater statusUpdater;
19+
private final ExecutionQueueStatusUpdater statusUpdater;
2020

2121
private final RetryConfig retryConfig;
2222

2323
public UpdateStatusQueueStep(KubernetesClient kubernetesClient,
2424
CustomResourceRepository<ExecutionQueue> executionRepository,
25-
BenchmarkStatusUpdater statusUpdater) {
25+
ExecutionQueueStatusUpdater statusUpdater) {
2626
super(kubernetesClient);
2727
this.executionRepository = executionRepository;
2828
this.statusUpdater = statusUpdater;
@@ -54,21 +54,21 @@ private void updateQueueItem(String queueName, String scenarioName, String names
5454

5555
@Override
5656
public void internalExecute(Scenario scenario, ExecutionQueue executionQueue) {
57-
var benchmarkName = executionQueue.getMetadata().getName();
57+
var queueName = executionQueue.getMetadata().getName();
5858
var scenarioName = scenario.getMetadata().getName();
5959
var namespace = scenario.getMetadata().getNamespace();
6060
Retry.of("updateQueueItem", retryConfig)
61-
.executeRunnable(() -> updateStatus(benchmarkName, scenarioName, namespace));
61+
.executeRunnable(() -> updateStatus(queueName, scenarioName, namespace));
6262
}
6363

64-
private void updateStatus(String benchmarkName, String scenarioName, String namespace) {
65-
updateQueueItem(benchmarkName, scenarioName, namespace);
66-
var queue = executionRepository.get(namespace, benchmarkName);
64+
private void updateStatus(String queueName, String scenarioName, String namespace) {
65+
updateQueueItem(queueName, scenarioName, namespace);
66+
var queue = executionRepository.get(namespace, queueName);
6767
var queueItem = queue.getItem(scenarioName);
68-
if (queueItem.isRunning()) {
69-
statusUpdater.markScenarioAsStarted(namespace, benchmarkName, scenarioName);
68+
if (queueItem.isPending()) {
69+
statusUpdater.markScenarioAsStarted(namespace, queueName, scenarioName);
7070
} else if (queueItem.isFinished()) {
71-
statusUpdater.markScenarioAsCompleted(namespace, benchmarkName, scenarioName);
71+
statusUpdater.markScenarioAsCompleted(namespace, queueName, scenarioName);
7272
}
7373
}
7474
}

0 commit comments

Comments
 (0)