Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 6 additions & 18 deletions crd/benchmarks.resiliencebench.io-v1.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,14 @@ spec:
scope: Namespaced
versions:
- additionalPrinterColumns:
- jsonPath: .status.completedScenarios
name: Completed
priority: 3
type: integer
- jsonPath: .status.executionQueueName
name: Execution Queue
priority: 1
type: string
- jsonPath: .status.phase
name: Phase
priority: 0
type: string
- jsonPath: .status.runningScenarios
name: Running
priority: 2
type: integer
- jsonPath: .status.totalScenarios
name: Total Scenarios
priority: 1
type: integer
name: v1beta1
schema:
openAPIV3Schema:
Expand Down Expand Up @@ -186,12 +178,12 @@ spec:
type: object
status:
properties:
completedScenarios:
type: integer
completionTime:
type: string
executionId:
type: string
executionQueueName:
type: string
lastReconcileTime:
type: string
message:
Expand All @@ -200,12 +192,8 @@ spec:
type: integer
phase:
type: string
runningScenarios:
type: integer
startTime:
type: string
totalScenarios:
type: integer
type: object
type: object
served: true
Expand Down
48 changes: 47 additions & 1 deletion crd/queues.resiliencebench.io-v1.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,28 @@ spec:
singular: queue
scope: Namespaced
versions:
- name: v1beta1
- additionalPrinterColumns:
- jsonPath: .status.completedScenarios
name: Completed
priority: 3
type: integer
- jsonPath: .status.pendingScenarios
name: Pending
priority: 4
type: integer
- jsonPath: .status.phase
name: Phase
priority: 0
type: string
- jsonPath: .status.runningScenarios
name: Running
priority: 2
type: integer
- jsonPath: .status.totalScenarios
name: Total Scenarios
priority: 1
type: integer
name: v1beta1
schema:
openAPIV3Schema:
properties:
Expand Down Expand Up @@ -45,7 +66,32 @@ spec:
type: string
type: object
status:
properties:
completedScenarios:
type: integer
completionTime:
type: string
executionId:
type: string
lastReconcileTime:
type: string
message:
type: string
observedGeneration:
type: integer
pendingScenarios:
type: integer
phase:
type: string
runningScenarios:
type: integer
startTime:
type: string
totalScenarios:
type: integer
type: object
type: object
served: true
storage: true
subresources:
status: {}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.util.List;

import io.resiliencebench.execution.QueueExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -22,38 +21,34 @@
@ControllerConfiguration
public class BenchmarkController implements Reconciler<Benchmark> {

private static final Logger logger = LoggerFactory.getLogger(BenchmarkController.class);
private static final Logger logger = LoggerFactory.getLogger(BenchmarkController.class);

private final CustomResourceRepository<Scenario> scenarioRepository;
private final CustomResourceRepository<Scenario> scenarioRepository;
private final CustomResourceRepository<Workload> workloadRepository;
private final CustomResourceRepository<ExecutionQueue> queueRepository;

private final CustomResourceRepository<Workload> workloadRepository;
private final CustomResourceRepository<ExecutionQueue> queueRepository;

private final QueueExecutor queueExecutor;

public BenchmarkController(QueueExecutor queueExecutor,
CustomResourceRepository<Scenario> scenarioRepository,
CustomResourceRepository<Workload> workloadRepository,
CustomResourceRepository<ExecutionQueue> queueRepository) {
this.queueExecutor = queueExecutor;
this.scenarioRepository = scenarioRepository;
this.workloadRepository = workloadRepository;
this.queueRepository = queueRepository;
}
public BenchmarkController(
CustomResourceRepository<Scenario> scenarioRepository,
CustomResourceRepository<Workload> workloadRepository,
CustomResourceRepository<ExecutionQueue> queueRepository) {
this.scenarioRepository = scenarioRepository;
this.workloadRepository = workloadRepository;
this.queueRepository = queueRepository;
}

@Override
public UpdateControl<Benchmark> reconcile(Benchmark benchmark, Context<Benchmark> context) {
var benchmarkName = benchmark.getMetadata().getName();
var namespace = benchmark.getMetadata().getNamespace();

logger.info("Reconciling benchmark: {} in namespace: {}", benchmarkName, namespace);

try {
var currentStatus = benchmark.getStatus();
var currentGeneration = benchmark.getMetadata().getGeneration();
if (currentStatus != null &&
currentStatus.isCompleted() &&

if (currentStatus != null &&
currentStatus.isCompleted() &&
!currentStatus.needsReconciliation(currentGeneration)) {
logger.info("Benchmark {} is already completed and no spec changes detected, skipping reconciliation", benchmarkName);
currentStatus.updateReconcileTime();
Expand All @@ -72,16 +67,15 @@ public UpdateControl<Benchmark> reconcile(Benchmark benchmark, Context<Benchmark
return updateStatusWithError(benchmark, "No scenarios generated");
}

var executionQueue = prepareToRunScenarios(benchmark, scenariosList);
createQueue(benchmark, scenariosList);

var status = createOrUpdateStatus(benchmark, scenariosList.size());
var status = createOrUpdateStatus(benchmark, benchmarkName);
benchmark.setStatus(status);

logger.info("Benchmark reconciled {}. {} scenarios created", benchmarkName, scenariosList.size());

queueExecutor.execute(executionQueue);

return UpdateControl.updateStatus(benchmark);

} catch (Exception e) {
logger.error("Error during reconciliation of benchmark: " + benchmarkName, e);
return updateStatusWithError(benchmark, "Reconciliation error: " + e.getMessage());
Expand All @@ -91,60 +85,46 @@ public UpdateControl<Benchmark> reconcile(Benchmark benchmark, Context<Benchmark
private UpdateControl<Benchmark> updateStatusWithError(Benchmark benchmark, String errorMessage) {
var status = benchmark.getStatus();
if (status == null) {
status = new BenchmarkStatus(0);
status = new BenchmarkStatus(benchmark.getMetadata().getName());
benchmark.setStatus(status);
}
status.markAsFailed(errorMessage);
status.setObservedGeneration(benchmark.getMetadata().getGeneration());
return UpdateControl.updateStatus(benchmark);
}

private BenchmarkStatus createOrUpdateStatus(Benchmark benchmark, int totalScenarios) {
private BenchmarkStatus createOrUpdateStatus(Benchmark benchmark, String queueName) {
var currentStatus = benchmark.getStatus();
var currentGeneration = benchmark.getMetadata().getGeneration();

if (currentStatus == null || currentStatus.needsReconciliation(currentGeneration)) {
var newStatus = new BenchmarkStatus(totalScenarios);
var newStatus = new BenchmarkStatus(queueName);
newStatus.setObservedGeneration(currentGeneration);
logger.info("Created new status for benchmark {} with {} total scenarios",
benchmark.getMetadata().getName(), totalScenarios);
logger.info("Created new status for benchmark {} with execution queue {}",
benchmark.getMetadata().getName(), queueName);
return newStatus;
} else {
currentStatus.updateReconcileTime();
currentStatus.setObservedGeneration(currentGeneration);
currentStatus.setExecutionQueueName(queueName);
logger.info("Updated existing status for benchmark {}", benchmark.getMetadata().getName());
return currentStatus;
}
}

private List<Scenario> createScenarios(Benchmark benchmark, Workload workload) {
var namespace = benchmark.getMetadata().getNamespace();

scenarioRepository.deleteAll(namespace);

var scenariosList = ScenarioFactory.create(benchmark, workload);
scenariosList.forEach(scenarioRepository::create);

logger.info("Created {} scenarios for benchmark: {}", scenariosList.size(), benchmark.getMetadata().getName());
return scenariosList;
}

private ExecutionQueue prepareToRunScenarios(Benchmark benchmark, List<Scenario> scenariosList) {
private void createQueue(Benchmark benchmark, List<Scenario> scenariosList) {
var namespace = benchmark.getMetadata().getNamespace();

scenarioRepository.deleteAll(namespace);
scenariosList.forEach(scenarioRepository::create);

queueRepository.deleteAll(namespace);
var queueCreated = ExecutionQueueFactory.create(benchmark, scenariosList);

if (benchmark.getStatus() != null && benchmark.getStatus().getExecutionId() != null) {
if (queueCreated.getMetadata().getLabels() == null) {
queueCreated.getMetadata().setLabels(new java.util.HashMap<>());
}
queueCreated.getMetadata().getLabels().put("execution-id", benchmark.getStatus().getExecutionId());
}

return queueRepository.create(queueCreated);
queueRepository.create(queueCreated);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package io.resiliencebench;

import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.resiliencebench.execution.QueueExecutor;
import io.resiliencebench.execution.ExecutionQueueStatusUpdater;
import io.resiliencebench.resources.benchmark.Benchmark;
import io.resiliencebench.resources.queue.ExecutionQueue;
import io.resiliencebench.support.CustomResourceRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ControllerConfiguration
public class ExecutionQueueController implements Reconciler<ExecutionQueue> {

private static final Logger logger = LoggerFactory.getLogger(ExecutionQueueController.class);

private final QueueExecutor queueExecutor;
private final ExecutionQueueStatusUpdater statusUpdater;

public ExecutionQueueController(QueueExecutor queueExecutor,
ExecutionQueueStatusUpdater statusUpdater) {
this.queueExecutor = queueExecutor;
this.statusUpdater = statusUpdater;
}

@Override
public UpdateControl<ExecutionQueue> reconcile(ExecutionQueue queue, Context<ExecutionQueue> context) {
logger.info("Reconciling ExecutionQueue: {}", queue.getMetadata().getName());

// Initialize status if needed
if (queue.getStatus() == null) {
queue.updateStatusFromItems();
}

// Update status based on current items state
statusUpdater.updateQueueProgress(queue.getMetadata().getNamespace(), queue.getMetadata().getName());

if (queue.isDone()) {
logger.info("ExecutionQueue {} is already completed", queue.getMetadata().getName());
return UpdateControl.noUpdate();
}

// Only execute if there are pending items and no running items
if (queue.getStatus() != null &&
queue.getStatus().getRunning() == 0 &&
queue.getStatus().getPending() > 0) {
logger.info("Starting execution of next item in queue: {}", queue.getMetadata().getName());
queueExecutor.execute(queue);
} else {
logger.debug("Queue {} is already being processed or has no pending items", queue.getMetadata().getName());
}

return UpdateControl.updateStatus(queue);
}
}
Loading