-
Notifications
You must be signed in to change notification settings - Fork 358
Development: Improve Hazelcast cluster stability with build agent client mode
#12051
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
461551e
7b5cc5d
2a32682
aac636a
3e316b1
87ecc7a
bdc282a
5014666
26bff1c
4d31e89
3f6c1f5
8bb9786
f1913a3
a98d0cd
a11a7e1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| package de.tum.cit.aet.artemis.buildagent.dto; | ||
|
|
||
| import org.jspecify.annotations.Nullable; | ||
|
|
||
| import de.tum.cit.aet.artemis.programming.domain.build.BuildStatus; | ||
|
|
||
| /** | ||
| * Common interface for build job DTOs, providing access to fields shared between | ||
| * active jobs ({@link BuildJobQueueItem}) and finished jobs ({@link FinishedBuildJobDTO}). | ||
| * <p> | ||
| * This interface enables type-safe handling of build jobs in REST endpoints that | ||
| * may return either type depending on the job's state. | ||
| * <p> | ||
| * Accessors use component-style names without a {@code get} prefix. The three ID accessors | ||
| * return primitive {@code long} values; {@link #status()} is the only accessor annotated | ||
| * {@code @Nullable}. | ||
| */ | ||
| public interface BuildJobDTO { | ||
|
|
||
| /** | ||
| * Returns the unique identifier of the build job. | ||
| * | ||
| * @return the unique identifier of the build job | ||
| */ | ||
| String id(); | ||
|
|
||
| /** | ||
| * Returns the name/description of the build job. | ||
| * | ||
| * @return the name/description of the build job | ||
| */ | ||
| String name(); | ||
|
|
||
| /** | ||
| * Returns the ID of the participation this build job belongs to. | ||
| * | ||
| * @return the ID of the participation this build job belongs to | ||
| */ | ||
| long participationId(); | ||
|
|
||
| /** | ||
| * Returns the ID of the course this build job belongs to. | ||
| * | ||
| * @return the ID of the course this build job belongs to | ||
| */ | ||
| long courseId(); | ||
|
|
||
| /** | ||
| * Returns the ID of the exercise this build job belongs to. | ||
| * | ||
| * @return the ID of the exercise this build job belongs to | ||
| */ | ||
| long exerciseId(); | ||
|
|
||
| /** | ||
| * Returns the current status of the build job. | ||
| * <p> | ||
| * Callers working against this interface must handle a {@code null} result. | ||
| * | ||
| * @return the current status of the build job, or null if not yet determined | ||
| */ | ||
| @Nullable | ||
| BuildStatus status(); | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,8 +14,10 @@ | |
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.ConcurrentSkipListSet; | ||
| import java.util.concurrent.Future; | ||
| import java.util.concurrent.ScheduledFuture; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.TimeoutException; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.locks.ReentrantLock; | ||
| import java.util.function.Supplier; | ||
|
|
||
|
|
@@ -26,6 +28,7 @@ | |
| import org.springframework.beans.factory.annotation.Value; | ||
| import org.springframework.context.annotation.Lazy; | ||
| import org.springframework.context.annotation.Profile; | ||
| import org.springframework.scheduling.TaskScheduler; | ||
| import org.springframework.stereotype.Service; | ||
|
|
||
| import de.tum.cit.aet.artemis.buildagent.BuildAgentConfiguration; | ||
|
|
@@ -34,7 +37,6 @@ | |
| import de.tum.cit.aet.artemis.buildagent.dto.BuildResult; | ||
| import de.tum.cit.aet.artemis.core.exception.LocalCIException; | ||
| import de.tum.cit.aet.artemis.programming.service.localci.DistributedDataAccessService; | ||
| import de.tum.cit.aet.artemis.programming.service.localci.distributed.api.topic.DistributedTopic; | ||
|
|
||
| /** | ||
| * Coordinates submission, tracking, timeout handling, and cancellation of build jobs | ||
|
|
@@ -77,6 +79,12 @@ public class BuildJobManagementService { | |
|
|
||
| private static final Logger log = LoggerFactory.getLogger(BuildJobManagementService.class); | ||
|
|
||
| /** | ||
| * Interval between retries when waiting for cluster connection during startup. | ||
| * Uses the same interval as the availability check in SharedQueueProcessingService for consistency. | ||
| */ | ||
| private static final java.time.Duration CLUSTER_CONNECTION_RETRY_INTERVAL = java.time.Duration.ofSeconds(5); | ||
|
|
||
| private final BuildJobExecutionService buildJobExecutionService; | ||
|
|
||
| private final BuildAgentConfiguration buildAgentConfiguration; | ||
|
|
@@ -87,6 +95,25 @@ public class BuildJobManagementService { | |
|
|
||
| private final BuildLogsMap buildLogsMap; | ||
|
|
||
| private final TaskScheduler taskScheduler; | ||
|
|
||
| /** | ||
| * Scheduled future for retrying cluster connection and initialization. | ||
| * This is used when the build agent starts before any core node is available. | ||
| */ | ||
| private ScheduledFuture<?> connectionRetryFuture; | ||
|
|
||
| /** | ||
| * Flag to track whether initialization has completed successfully. | ||
| * Uses AtomicBoolean to ensure thread-safe access from the retry task. | ||
| */ | ||
| private final AtomicBoolean initialized = new AtomicBoolean(false); | ||
|
|
||
| /** | ||
| * UUID of the cancel build job message listener. Stored to allow removal on reconnection. | ||
| */ | ||
| private java.util.UUID cancelListenerId; | ||
|
|
||
| /** | ||
| * Guards job lifecycle state transitions that must be atomic across multiple data structures: | ||
| * <ul> | ||
|
|
@@ -150,34 +177,108 @@ public class BuildJobManagementService { | |
| private final Set<String> cancelledBuildJobs = new ConcurrentSkipListSet<>(); | ||
|
|
||
| public BuildJobManagementService(DistributedDataAccessService distributedDataAccessService, BuildJobExecutionService buildJobExecutionService, | ||
| BuildAgentConfiguration buildAgentConfiguration, BuildJobContainerService buildJobContainerService, BuildLogsMap buildLogsMap) { | ||
| BuildAgentConfiguration buildAgentConfiguration, BuildJobContainerService buildJobContainerService, BuildLogsMap buildLogsMap, TaskScheduler taskScheduler) { | ||
| this.buildJobExecutionService = buildJobExecutionService; | ||
| this.buildAgentConfiguration = buildAgentConfiguration; | ||
| this.buildJobContainerService = buildJobContainerService; | ||
| this.distributedDataAccessService = distributedDataAccessService; | ||
| this.buildLogsMap = buildLogsMap; | ||
| this.taskScheduler = taskScheduler; | ||
| } | ||
|
|
||
| /** | ||
| * Add a listener to the canceledBuildJobsTopic that cancels the build job for the given buildJobId. | ||
| * It gets broadcast to all nodes in the cluster. Only the node that is running the build job will cancel it. | ||
| * Initialize the service by setting up the cancel listener for build jobs. | ||
| * <p> | ||
| * When running as a Hazelcast client with asyncStart=true, the client may not yet be | ||
| * connected to the cluster when this method is called. In that case, we schedule | ||
| * periodic retries until the connection is established and initialization completes. | ||
| * <p> | ||
| * Additionally, a connection state listener is registered to handle reconnection after | ||
| * a connection loss. When the client reconnects to the cluster, the listener re-initializes | ||
| * the distributed topic listener which may have been lost during the disconnection. | ||
| * <p> | ||
| * EventListener cannot be used here, as the bean is lazy | ||
| * <a href="https://docs.spring.io/spring-framework/reference/core/beans/context-introduction.html#context-functionality-events-annotation">Spring Docs</a> | ||
| */ | ||
| @PostConstruct | ||
| public void init() { | ||
| DistributedTopic<String> canceledBuildJobsTopic = distributedDataAccessService.getCanceledBuildJobsTopic(); | ||
| canceledBuildJobsTopic.addMessageListener(buildJobId -> { | ||
| jobLifecycleLock.lock(); | ||
| try { | ||
| if (runningFutures.containsKey(buildJobId)) { | ||
| cancelBuildJob(buildJobId); | ||
| } | ||
| } | ||
| finally { | ||
| jobLifecycleLock.unlock(); | ||
| // Register a connection state listener to handle both initial connection and reconnection. | ||
| // On reconnection (isInitialConnection=false), the topic listener needs to be re-registered | ||
| // because it may have been lost when the connection was interrupted. | ||
| distributedDataAccessService.addConnectionStateListener(isInitialConnection -> { | ||
| if (!isInitialConnection) { | ||
| // This is a reconnection - reset the initialized flag so listeners are re-registered | ||
| log.info("Hazelcast client reconnected to cluster. Re-initializing BuildJobManagementService listeners."); | ||
| initialized.set(false); | ||
| } | ||
| tryInitialize(); | ||
| }); | ||
|
|
||
| // If already connected, tryInitialize was called by the listener above. | ||
| // If not connected yet, schedule periodic retries as a fallback. | ||
| if (!initialized.get() && !distributedDataAccessService.isConnectedToCluster()) { | ||
| log.info("Hazelcast client not yet connected to cluster. Scheduling periodic initialization retries every {} seconds.", CLUSTER_CONNECTION_RETRY_INTERVAL.toSeconds()); | ||
|
|
||
| connectionRetryFuture = taskScheduler.scheduleAtFixedRate(() -> { | ||
| if (tryInitialize()) { | ||
| // Initialization succeeded - cancel the retry task | ||
| if (connectionRetryFuture != null) { | ||
| connectionRetryFuture.cancel(false); | ||
| } | ||
| } | ||
| }, CLUSTER_CONNECTION_RETRY_INTERVAL); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Attempts to initialize the cancel listener for build jobs. | ||
| * <p> | ||
| * This method checks if the Hazelcast client is connected to the cluster before | ||
| * attempting to access distributed data structures. If not connected, it returns | ||
| * false so the caller can retry later. | ||
| * <p> | ||
| * Any previously registered cancel listener is removed before a new one is added, | ||
| * so repeated successful runs leave at most one listener stored in {@code cancelListenerId}. | ||
| * <p> | ||
| * NOTE(review): this method is invoked both from the connection state listener and from the | ||
| * scheduled retry task set up in {@code init()}. It is not synchronized, and the check of | ||
| * {@code initialized} followed by the update of {@code cancelListenerId} is not atomic. | ||
| * Confirm whether those two callers can overlap; if they can, two listeners may be registered. | ||
| * | ||
| * @return true if the cancel listener is registered (by this call or an earlier one), | ||
| * false if not connected to cluster or if registering the listener threw an exception | ||
| */ | ||
| private boolean tryInitialize() { | ||
| // Fast path: nothing to do if a previous call already registered the listener. | ||
| if (initialized.get()) { | ||
| return true; | ||
| } | ||
|
|
||
| // Do not touch distributed data structures while the client is disconnected. | ||
| if (!distributedDataAccessService.isConnectedToCluster()) { | ||
| log.debug("Cannot initialize BuildJobManagementService: not connected to Hazelcast cluster yet"); | ||
| return false; | ||
| } | ||
|
|
||
| try { | ||
| var canceledBuildJobsTopic = distributedDataAccessService.getCanceledBuildJobsTopic(); | ||
|
|
||
| // Remove old listener if it exists (prevents duplicate listeners on reconnection) | ||
| if (cancelListenerId != null) { | ||
| canceledBuildJobsTopic.removeMessageListener(cancelListenerId); | ||
| cancelListenerId = null; | ||
| } | ||
|
|
||
| // The listener holds jobLifecycleLock for the lookup and the cancellation, and | ||
| // ignores build job IDs that are not present in runningFutures. | ||
| cancelListenerId = canceledBuildJobsTopic.addMessageListener(buildJobId -> { | ||
| jobLifecycleLock.lock(); | ||
| try { | ||
| if (runningFutures.containsKey(buildJobId)) { | ||
| cancelBuildJob(buildJobId); | ||
| } | ||
| } | ||
| finally { | ||
| jobLifecycleLock.unlock(); | ||
| } | ||
| }); | ||
|
|
||
| // Mark as initialized only after the listener was added without an exception. | ||
| initialized.set(true); | ||
| log.info("BuildJobManagementService initialized successfully - cancel listener registered"); | ||
| return true; | ||
| } | ||
| catch (Exception e) { | ||
| // This can happen if the connection is lost between the check and the access | ||
| log.warn("Failed to initialize BuildJobManagementService: {}. Will retry.", e.getMessage()); | ||
| return false; | ||
| } | ||
|
Comment on lines
+233
to
+281
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Prevent concurrent `tryInitialize()` execution. The retry task and the connection listener can both call `tryInitialize()` at the same time.
🔧 Proposed fix:
- private boolean tryInitialize() {
+ private synchronized boolean tryInitialize() {
🤖 Prompt for AI Agents |
||
| } | ||
|
|
||
| /** | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Repository: ls1intum/Artemis
Length of output: 2092
🏁 Script executed:
Repository: ls1intum/Artemis
Length of output: 3638
Interface implementation is complete but has a type contract inconsistency.
Both `BuildJobQueueItem` and `FinishedBuildJobDTO` correctly implement the `BuildJobDTO` interface with all required methods. However, `FinishedBuildJobDTO` declares `status()` as non-nullable while the interface specifies it as `@Nullable`. Although semantically sensible (finished jobs always have a determined status), this violates the interface contract and may cause type safety issues when the interface is used polymorphically. Consider marking `status()` as `@Nullable` in `FinishedBuildJobDTO` for consistency, or document why the stricter contract is intentional.
🤖 Prompt for AI Agents