
Conversation

@chetan-rns chetan-rns commented Nov 13, 2025

What does this PR do / why we need it:

The operation field is modified by both the agent/principal and the application controller. We need to ensure that they don't overwrite each other. For example, when an operation is still terminating on the agent, the spec update events from the principal should not update the operation field.

  • Update EventWriter to use a FIFO queue that processes events in order, so they no longer overwrite each other.
  • Harden the existing e2e tests: increase timeouts and remove finalizers from leftover resources.
  • Add unit tests.
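A minimal sketch of the guard behind the first point, using simplified stand-in types (the real helper, `operationToUse`, operates on Argo CD's `*v1alpha1.Application` and its sync-phase constants, so this is illustrative only):

```go
package main

import "fmt"

// OperationTerminating is a stand-in for Argo CD's synccommon.OperationTerminating.
const OperationTerminating = "Terminating"

type Operation struct{ Info string }

type OperationState struct{ Phase string }

type App struct {
	Operation *Operation
	Status    struct{ OperationState *OperationState }
}

// operationToUse picks which Operation survives when an incoming update is
// merged into the existing Application. Sketch only: the actual helper lives
// in internal/manager/application/application.go.
func operationToUse(existing, incoming *App) *Operation {
	// No incoming operation: keep whatever is already set.
	if incoming.Operation == nil {
		return existing.Operation
	}
	// An operation still terminating on the agent must not be overwritten
	// by a spec update that happens to carry an operation.
	if existing.Status.OperationState != nil &&
		existing.Status.OperationState.Phase == OperationTerminating {
		return existing.Operation
	}
	return incoming.Operation
}

func main() {
	existing := &App{Operation: &Operation{Info: "terminating sync"}}
	existing.Status.OperationState = &OperationState{Phase: OperationTerminating}
	incoming := &App{Operation: &Operation{Info: "fresh sync"}}
	fmt.Println(operationToUse(existing, incoming).Info) // keeps "terminating sync"
}
```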

Which issue(s) this PR fixes:

Fixes #?

How to test changes / Special notes to the reviewer:

Checklist

  • Documentation update is required by this PR (and has been updated) OR no documentation update is required.

Summary by CodeRabbit

  • Bug Fixes

    • Prevented unintended operation propagation during termination, skipped updates for already-deleted applications, and treated immutable-field deletion errors as non-retryable.
  • Improvements

    • More reliable event delivery with per-resource queues, coalescing, and exponential-retry handling.
    • Callback events now distinguish termination vs spec updates and include operation info only when appropriate.
    • Better logging and conflict-aware retry behavior during termination.
  • Tests

    • Expanded unit and e2e tests for operation handling, event payloads, retry semantics, and extended timeouts.


coderabbitai bot commented Nov 13, 2025

Walkthrough

Centralizes guarded propagation of Application.Operation via a new helper, preserves existing operations for terminating resources, refactors callback emission to send TerminateOperation or cleaned SpecUpdate events, replaces event writer with per-resource queues and retry/backoff, adds immutable-field error handling, and extends e2e cleanup/retry logic and tests.

Changes

  • Operation mirroring & updates (internal/manager/application/application.go): Adds an operationToUse(existing, incoming) helper; uses it in UpdateManagedApp, UpdateAutonomousApp, and UpdateOperation to decide whether to preserve the existing Operation or apply the incoming one. Guards DeletionTimestamp/GracePeriod propagation and adjusts patch target construction.
  • Event callback logic (principal/callbacks.go, principal/callbacks_test.go): Builds a CloudEvents ev variable; emits TerminateOperation when the operation is terminating, otherwise emits SpecUpdate with a deep-copied/mutated Application (clearing Operation when appropriate). Tests decode JSON payloads and add subtests validating operation inclusion and terminate-event emission.
  • Event delivery & retry (internal/event/event.go, internal/event/event_test.go): Replaces the single-latest-event model with a per-resource eventQueue and sentEvents map; implements a send/retry lifecycle with exponential backoff, retry scheduling, coalescing, DELETE prioritization, and ACK handling. Tests expanded to cover queue behavior, retries, coalescing, delete handling, concurrency, and cancellation.
  • Kube client immutable-field handling (internal/kube/client.go): Adds isDeletionImmutableError(err) and treats immutable-field errors (immutable deletion fields) as non-retryable in IsRetryableError.
  • Principal update guards (principal/event.go): Skips update attempts when the existing Application has a DeletionTimestamp (on the create AlreadyExists and SpecUpdate paths) to avoid immutable-field update errors; adds logging for skipped updates.
  • E2E fixture finalizer cleanup (test/e2e/fixture/fixture.go): Adds removeJobFinalizers to clear batch/v1 Job finalizers in the guestbook namespace and invokes it during CleanUp for both managed and autonomous agent clients.
  • E2E sync tests robustness (test/e2e/sync_test.go): Waits for running sync operations, wraps TerminateOperation calls in conflict-retry loops with logging, increases timeouts, adjusts hook names, and aligns termination patterns across managed/autonomous contexts.
  • Agent inbound logging (agent/inbound.go): Adds a trace log entry when receiving a TerminateOperation event in processIncomingApplication.

Sequence Diagram(s)

```mermaid
sequenceDiagram
  rect rgba(240,248,255,0.6)
    participant Client as Incoming Update
    participant Manager as Application Manager
    participant OpGuard as operationToUse()
    participant Patch as Patch Builder
    participant Callbacks as Callback Emitter
    participant EventQ as EventWriter (queues)
  end

  Client->>Manager: UpdateManagedApp(existing, incoming)
  Manager->>OpGuard: determine operationToUse(existing, incoming)
  OpGuard-->>Manager: selected Operation (existing or incoming)
  Manager->>Patch: build patch target (use selected Operation, guarded deletion fields)
  Patch-->>Manager: patch ready
  Manager->>Callbacks: trigger callback with cleaned copy
  alt incoming is terminating
    Callbacks->>EventQ: emit TerminateOperation event
  else
    Callbacks->>EventQ: emit SpecUpdate (Operation cleared as needed)
  end
  EventQ->>EventQ: enqueue per-resource (coalesce, handle DELETE)
  EventQ->>EventQ: sendUnsentEvent / retrySentEvent (backoff/retry)
```

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

  • Pay extra attention to:
    • operationToUse edge cases across Managed vs Autonomous flows and DeletionTimestamp handling.
    • EventWriter concurrency, sent/unsent queue transitions, retry/backoff correctness and scheduling.
    • Tests that exercise concurrency/cancellation and any added timeouts or retry loops in e2e tests.
    • Immutable-field detection correctness in isDeletionImmutableError.

Suggested reviewers

  • jannfis
  • jgwest
  • mikeshng

Poem

🐇 A rabbit hops with cautious cheer,
Preserving ops when endings near,
Events queued, retried, then sent,
Finalizers cleared before they're spent.
Hooray — neat hops, no rabbit fear!

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage ⚠️ Warning — Docstring coverage is 10.00%, below the required threshold of 80.00%. Run @coderabbitai generate docstrings to improve coverage.
✅ Passed checks (2 passed)
  • Title check ✅ Passed — The title 'fix: race condition while updating operation field' accurately describes the primary purpose of the pull request, which addresses a race condition in updates to the operation field.
  • Description check ✅ Passed — Check skipped because CodeRabbit's high-level summary is enabled.

codecov-commenter commented Nov 13, 2025

Codecov Report

❌ Patch coverage is 75.11521% with 54 lines in your changes missing coverage. Please review.
✅ Project coverage is 46.33%. Comparing base (b317965) to head (21e8202).

Files with missing lines Patch % Lines
internal/event/event.go 85.88% 16 Missing and 7 partials ⚠️
internal/manager/application/application.go 42.85% 10 Missing and 2 partials ⚠️
principal/event.go 25.00% 8 Missing and 4 partials ⚠️
internal/kube/client.go 0.00% 6 Missing ⚠️
agent/inbound.go 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #648      +/-   ##
==========================================
+ Coverage   45.62%   46.33%   +0.71%     
==========================================
  Files          90       90              
  Lines        9991    10152     +161     
==========================================
+ Hits         4558     4704     +146     
- Misses       4965     4970       +5     
- Partials      468      478      +10     


jannfis commented Nov 16, 2025

Hi @chetan-rns, the related e2e tests still seem to be flaky.

@chetan-rns
Collaborator Author

Hi @chetan-rns, the related e2e tests still seem to be flaky.

@jannfis I think the recent test run is failing for a different reason. After investigating the logs, I found that the terminate operation never reaches the agent. This is due to the event writer's behavior of always sending only the most recent event, which can cause issues. For example:

  1. The principal adds the terminate operation event to the event writer.
  2. The event writer sends the events asynchronously.
  3. At the same time, the informer could add a regular spec update event, which overwrites the terminate event, so only the spec update is sent to the agent.

The spec-churn goroutine in the failing e2e test should have amplified this. Surprisingly, it didn't fail even once when I ran it continuously for more than 5 hours.

To fix the issue, we could update the event writer logic to keep the terminate event and only overwrite the spec update events.
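The fix described here can be sketched as a per-resource FIFO queue that coalesces only consecutive events of the same type, so a spec update can never displace a pending terminate event. The type and method names below echo those mentioned in the review (eventQueue, SpecUpdate, TerminateOperation) but the structure is a simplified assumption, not the actual EventWriter implementation:

```go
package main

import "fmt"

type eventType int

const (
	SpecUpdate eventType = iota
	TerminateOperation
	Delete
)

type event struct {
	typ     eventType
	payload string
}

// eventQueue is a FIFO of pending events for a single resource.
type eventQueue struct{ events []event }

// add enqueues ev. Only a tail event of the *same* type is coalesced, so
// differently-typed events (e.g. TerminateOperation then SpecUpdate) are
// both preserved in order instead of overwriting each other.
func (q *eventQueue) add(ev event) {
	// A DELETE supersedes everything queued for this resource.
	if ev.typ == Delete {
		q.events = []event{ev}
		return
	}
	if n := len(q.events); n > 0 && q.events[n-1].typ == ev.typ {
		q.events[n-1] = ev // coalesce: keep only the latest of the same type
		return
	}
	q.events = append(q.events, ev)
}

// pop dequeues the oldest event, if any.
func (q *eventQueue) pop() (event, bool) {
	if len(q.events) == 0 {
		return event{}, false
	}
	ev := q.events[0]
	q.events = q.events[1:]
	return ev, true
}

func main() {
	var q eventQueue
	q.add(event{TerminateOperation, "terminate"})
	q.add(event{SpecUpdate, "spec-v1"})
	q.add(event{SpecUpdate, "spec-v2"}) // coalesces with spec-v1
	for ev, ok := q.pop(); ok; ev, ok = q.pop() {
		fmt.Println(ev.payload) // prints "terminate", then "spec-v2"
	}
}
```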

Assisted-by: Cursor
Signed-off-by: Chetan Banavikalmutt <[email protected]>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

🧹 Nitpick comments (6)
internal/kube/client.go (1)

147-153: String-based error detection is pragmatic but fragile.

The string matching approach works but may break if Kubernetes changes error message format in future versions. Consider adding a comment noting this dependency on error message format.

That said, this is a reasonable approach since Kubernetes doesn't expose structured error types for this specific case.

test/e2e/fixture/fixture.go (1)

436-463: Good addition for test reliability.

The job finalizer removal helps prevent cleanup failures that could cause test flakiness. Using EnsureUpdate with retry logic is the right approach for handling conflicts.

Note: The deployment finalizer removal above (lines 427-433) uses direct Update without retry, while this uses EnsureUpdate. Consider aligning both to use EnsureUpdate for consistency and robustness.

 for _, deployment := range deploymentList.Items {
 	if len(deployment.Finalizers) > 0 {
-		deployment.Finalizers = nil
-		if err := managedAgentClient.Update(ctx, &deployment, metav1.UpdateOptions{}); err != nil {
+		err := EnsureUpdate(ctx, managedAgentClient, &deployment, func(obj KubeObject) {
+			obj.SetFinalizers(nil)
+		})
+		if err != nil {
 			return err
 		}
 	}
 }
principal/event.go (1)

190-199: Guard against updating apps being deleted is correct.

This prevents the immutable field error when trying to update an application that's already marked for deletion.

Minor: The error message "Application should exist before attempting to update" at line 193 could be more accurate since the Get could fail for reasons other than non-existence (e.g., network issues). Consider:

-logCtx.WithError(err).Errorf("Application should exist before attempting to update")
+logCtx.WithError(err).Errorf("Failed to retrieve existing application")
internal/event/event.go (1)

893-904: Resource ID collection handles both queues correctly.

The code properly collects resource IDs from both unsentEvents and sentEvents, avoiding duplicates when a resource appears in both. This ensures all events are processed in the send loop.

Minor optimization: consider using a map for deduplication instead of the exists check:

-			resourceIDs := make([]string, 0, len(ew.unsentEvents)+len(ew.sentEvents))
+			resourceIDSet := make(map[string]struct{}, len(ew.unsentEvents)+len(ew.sentEvents))
 			for resID := range ew.unsentEvents {
-				resourceIDs = append(resourceIDs, resID)
+				resourceIDSet[resID] = struct{}{}
 			}
 			for resID := range ew.sentEvents {
-				// Only add if not already in the list
-				if _, exists := ew.unsentEvents[resID]; !exists {
-					resourceIDs = append(resourceIDs, resID)
-				}
+				resourceIDSet[resID] = struct{}{}
 			}
+			resourceIDs := make([]string, 0, len(resourceIDSet))
+			for resID := range resourceIDSet {
+				resourceIDs = append(resourceIDs, resID)
+			}

This is cleaner and avoids the O(n) map lookup in the hot path.

internal/event/event_test.go (1)

78-86: Test logic is correct but variable naming could be clearer.

The test correctly creates a new event with a different eventID and verifies that removing it doesn't affect the original event in the queue. However, the variable name newEv is created but could be used more consistently throughout the test.

Consider using the newEv variable in the subsequent assertions for clarity:

 		// Try removing an event with the same resourceID but different eventID.
 		app1.ResourceVersion = "3"
 		newEv = es.ApplicationEvent(Update, app1)
 		evSender.Remove(newEv)
 
-		// The old event should not removed from the queue.
-		latestEvent = evSender.Get(ResourceID(newEv))
+		// The old event should not be removed from the queue.
+		latestEvent = evSender.Get(ResourceID(ev))
internal/manager/application/application.go (1)

371-380: Consider including Labels and Annotations in the patch source for consistency.

The source object in this patchFn omits Labels and Annotations, while UpdateManagedApp's patchFn (lines 243-250) includes them. This inconsistency could lead to:

  • Larger, less efficient patches (showing label/annotation changes as additions rather than replacements)
  • Maintenance confusion when comparing similar functions

Apply this diff to align with UpdateManagedApp's pattern:

 source := &v1alpha1.Application{
 	ObjectMeta: v1.ObjectMeta{
+		Labels:                     existing.Labels,
+		Annotations:                existing.Annotations,
 		DeletionTimestamp:          existing.DeletionTimestamp,
 		DeletionGracePeriodSeconds: existing.DeletionGracePeriodSeconds,
 		Finalizers:                 existing.Finalizers,
 	},
 	Spec:      existing.Spec,
 	Status:    existing.Status,
 	Operation: existing.Operation,
 }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3811288 and 91002a9.

📒 Files selected for processing (10)
  • agent/inbound.go (1 hunks)
  • internal/event/event.go (8 hunks)
  • internal/event/event_test.go (6 hunks)
  • internal/kube/client.go (2 hunks)
  • internal/manager/application/application.go (7 hunks)
  • principal/callbacks.go (2 hunks)
  • principal/callbacks_test.go (4 hunks)
  • principal/event.go (2 hunks)
  • test/e2e/fixture/fixture.go (2 hunks)
  • test/e2e/sync_test.go (7 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • principal/callbacks_test.go
🧰 Additional context used
🧬 Code graph analysis (8)
test/e2e/fixture/fixture.go (1)
test/e2e/fixture/kubeclient.go (2)
  • KubeClient (67-73)
  • KubeObject (49-52)
agent/inbound.go (1)
internal/logging/logging.go (1)
  • Trace (285-287)
principal/event.go (2)
internal/logging/logging.go (1)
  • Trace (285-287)
internal/backend/interface.go (1)
  • Application (83-93)
internal/event/event.go (3)
internal/logging/logging.go (1)
  • Trace (285-287)
internal/checkpoint/checkpoint.go (1)
  • Step (27-31)
pkg/api/grpc/eventstreamapi/eventstream.pb.go (3)
  • Event (41-47)
  • Event (62-62)
  • Event (77-79)
test/e2e/sync_test.go (2)
internal/logging/logfields/logfields.go (3)
  • Status (130-130)
  • Name (59-59)
  • Message (132-132)
internal/event/event.go (1)
  • TerminateOperation (61-61)
internal/manager/application/application.go (4)
internal/logging/logfields/logfields.go (2)
  • Status (130-130)
  • Application (49-49)
internal/backend/interface.go (1)
  • Application (83-93)
internal/backend/mocks/Application.go (1)
  • Application (18-20)
principal/mocks/Application.go (1)
  • Application (16-18)
internal/event/event_test.go (1)
internal/event/event.go (5)
  • Update (57-57)
  • NewEventWriter (744-751)
  • Create (55-55)
  • Delete (56-56)
  • EventID (635-642)
principal/callbacks.go (2)
internal/event/event.go (3)
  • Event (112-115)
  • TerminateOperation (61-61)
  • SpecUpdate (58-58)
pkg/api/grpc/eventstreamapi/eventstream.pb.go (3)
  • Event (41-47)
  • Event (62-62)
  • Event (77-79)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: Build and push image
  • GitHub Check: Run unit tests
  • GitHub Check: Build & cache Go code
  • GitHub Check: Run end-to-end tests
  • GitHub Check: Lint Go code
  • GitHub Check: Analyze (go)
🔇 Additional comments (20)
agent/inbound.go (1)

181-187: LGTM!

The trace log addition improves observability for terminate operation events, which is valuable for debugging the race condition scenarios this PR addresses.

internal/kube/client.go (1)

125-130: Early exit for immutable deletion errors is correct.

Properly prevents futile retries when a resource is being deleted. The short-circuit placement before other checks is appropriate.

principal/event.go (1)

214-223: Consistent deletion guard for SpecUpdate path.

Same pattern as above, correctly skipping updates for apps being deleted. The same minor suggestion about error message applies here.

principal/callbacks.go (1)

112-131: Core fix for the race condition - operation field handling is correct.

The logic properly addresses the race condition:

  1. DeepCopy (line 122) correctly avoids mutating the informer's cached object.
  2. Operation clearing (lines 123-125): Only preserves Operation on nil→non-nil transitions (principal-initiated syncs), preventing the operation field from being echoed back on regular updates.
  3. TerminateOperation events are now properly separated from SpecUpdate events.

This ensures that when an operation is terminating on the agent, spec updates from the principal won't overwrite the operation field.
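The clearing rule described in point 2 can be sketched as follows. The function name cleanForSpecUpdate and the flat App type are hypothetical; the real code in principal/callbacks.go deep-copies the informer's cached *v1alpha1.Application before mutating it:

```go
package main

import "fmt"

type Operation struct{ Name string }

type App struct {
	Name      string
	Operation *Operation
}

// cleanForSpecUpdate returns the copy of the app that a SpecUpdate event
// should carry. The Operation field survives only on a nil -> non-nil
// transition (a principal-initiated sync); otherwise it is cleared so the
// agent's in-flight operation is not echoed back and overwritten.
func cleanForSpecUpdate(old, updated *App) *App {
	cleaned := *updated // stand-in for DeepCopy: never mutate the cache
	if !(old.Operation == nil && updated.Operation != nil) {
		cleaned.Operation = nil
	}
	return &cleaned
}

func main() {
	old := &App{Name: "guestbook"}
	updated := &App{Name: "guestbook", Operation: &Operation{Name: "sync"}}
	fmt.Println(cleanForSpecUpdate(old, updated).Operation != nil)     // true: nil -> non-nil kept
	fmt.Println(cleanForSpecUpdate(updated, updated).Operation != nil) // false: cleared
}
```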

test/e2e/sync_test.go (2)

347-354: Good addition: waiting for sync to start on principal before terminating.

This ensures the operation is actually running on the principal before attempting to terminate, reducing race conditions in the test.


462-470: Good test hardening: waiting for sync on both sides.

Waiting for the sync operation to start on both the principal and autonomous agent before terminating improves test reliability.

internal/event/event.go (6)

708-742: Good architectural improvement for race condition handling.

The refactoring from a single latest event to separate unsentEvents queue and sentEvents map provides a clear separation between pending events and in-flight events awaiting ACK. The addition of retryCount enables proper max retry enforcement.


782-794: DELETE event prioritization is correctly implemented.

The logic properly clears all existing events (both sent and unsent) and ensures only the DELETE event remains in the queue. This prevents any race conditions where spec updates could be sent after a deletion.


960-970: Verify the event loss behavior after max retries.

When retries are exhausted, the event is removed from sentEvents to unblock the queue, but the event is permanently lost. This prevents the queue from being blocked indefinitely, but it means the event will never be delivered.

Consider whether this is acceptable behavior or if you need:

  1. Dead letter queue for failed events
  2. Alerting/logging when events are dropped
  3. Different handling for critical event types (e.g., DELETE, TerminateOperation)

Additionally, verify this behavior aligns with your reliability requirements and document the trade-off in the code comments.


1032-1037: Critical race prevention: moving to sentEvents before unlock.

Correctly moves the event to sentEvents before releasing the lock. This prevents a race where an ACK could arrive after the unlock but before the event is tracked, which would cause the ACK to be ignored and lead to unnecessary retries.


1148-1170: Event coalescing correctly preserves event type ordering.

The add method only coalesces events at the tail when the new event type matches the tail event type. This preserves the ordering of different event types in the queue (e.g., [Create, Update, Update] + Update[Create, Update], not [Update]). This is the correct behavior for maintaining event causality.


823-832: Correct prioritization in Get: sent before unsent.

The method correctly returns sent events (awaiting ACK) before checking unsent events. This ensures that status queries and retry logic operate on the event that's currently in-flight, not on newer events still in the queue.

internal/event/event_test.go (2)

150-442: Excellent test coverage for the new event queue semantics.

The new test cases comprehensively cover:

  • DELETE event prioritization and queue clearing
  • Concurrent operations (race condition prevention)
  • Event lifecycle (unsent → sent → retry → exhaustion)
  • Exponential backoff and max retry behavior
  • ACK event special handling
  • Edge cases (empty IDs, non-existent resources)
  • Get method prioritization (sent before unsent)
  • Context cancellation
  • Event coalescing

This test suite validates the race condition fixes and provides strong confidence in the refactored implementation.


256-286: Retry test validates exponential backoff correctly.

The test properly verifies:

  1. Event moves to sentEvents after first send
  2. retryAfter timestamp is set for backoff
  3. Retry respects timing (doesn't retry before retryAfter)
  4. Retry works after retryAfter passes
  5. retryCount increments correctly

This covers the critical retry logic introduced in the refactoring.

internal/manager/application/application.go (6)

208-208: LGTM: Correctly prevents operation overwrites in managed mode.

The use of operationToUse in both the update and patch paths ensures that spec updates from the principal won't overwrite operations on a managed agent, especially when they're terminating. This directly addresses the race condition described in the PR objectives.

Also applies to: 241-241


347-347: Verify that UpdateAutonomousApp intentionally doesn't use operationToUse.

Unlike UpdateManagedApp (line 208) and UpdateOperation (line 492), this method directly assigns incoming.Operation without the race-condition guards.

This might be correct since in autonomous mode the agent is authoritative and the principal should accept its operation state. However, this creates asymmetry in the codebase and could be confusing.

Please confirm:

  1. Is this intentional because the agent has authority in autonomous mode?
  2. Are there race scenarios where the principal's existing operation should be preserved even when receiving updates from an autonomous agent?

Note: The AI summary states "UpdateAutonomousApp's patch path continues to set Operation using operationToUse(existing, incoming)" but the code at lines 347 and 365 still uses incoming.Operation directly.

Also applies to: 365-365


338-343: LGTM: Deletion timestamp guards prevent unintended overwrites.

The conditional checks ensure DeletionTimestamp and DeletionGracePeriodSeconds are only set when not already present on the existing resource. This defensive approach prevents clearing or overwriting deletion state that's already in progress.

Also applies to: 367-370


492-492: LGTM: Operation updates now protected against race conditions.

Using operationToUse in both paths ensures that operation updates on autonomous agents preserve existing operations during termination, preventing the race condition where spec updates overwrite terminating operations.

Also applies to: 505-505


525-539: Critical test coverage gaps require adding unit tests before merging.

Verification confirms that operationToUse lacks direct unit test coverage for all four guard scenarios:

  1. Incoming operation is nil — No test
  2. Existing operation in OperationTerminating phase — No test
  3. Normal case — Partially tested (indirectly via UpdateManagedApp/UpdateOperation, but not verifying guard logic)
  4. Existing.Status.OperationState is nil — No test

The function is used at lines 208, 241 (UpdateManagedApp) and 492, 505 (UpdateOperation), but existing tests only cover the happy path with both operations non-nil. No test validates that the OperationTerminating guard prevents the race condition.

The implementation is sound, but the 40% patch coverage indicates tests must be added to verify all guard branches before deployment. This is critical given the race condition complexity being fixed.


525-539: Remove references to non-existent event writer component; focus on missing direct unit tests.

The review contains claims about an event writer component and queue mechanism that don't exist in internal/manager/application/application.go. The actual PR refactors duplicated operation-handling logic into a centralized operationToUse() function called 4 times, which is a valid improvement.

The legitimate concern is that operationToUse() lacks direct unit tests despite being called in multiple code paths. While the function logic is sound (preserving terminating state, handling nil operations), add unit tests covering:

  • Incoming operation is nil
  • Existing app is in terminating state
  • Normal operation update scenario

Signed-off-by: Chetan Banavikalmutt <[email protected]>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
internal/event/event.go (1)

893-903: Minor: duplicate check could be simplified with a set.

The logic correctly avoids duplicate resource IDs when merging keys from both maps. For clarity, you could use a map[string]struct{} as a set, but the current approach works correctly.

 ew.mu.RLock()
 // Collect all resource IDs that have either unsent or sent events
-resourceIDs := make([]string, 0, len(ew.unsentEvents)+len(ew.sentEvents))
-for resID := range ew.unsentEvents {
-    resourceIDs = append(resourceIDs, resID)
-}
-for resID := range ew.sentEvents {
-    // Only add if not already in the list
-    if _, exists := ew.unsentEvents[resID]; !exists {
-        resourceIDs = append(resourceIDs, resID)
-    }
-}
+resourceIDSet := make(map[string]struct{}, len(ew.unsentEvents)+len(ew.sentEvents))
+for resID := range ew.unsentEvents {
+    resourceIDSet[resID] = struct{}{}
+}
+for resID := range ew.sentEvents {
+    resourceIDSet[resID] = struct{}{}
+}
 ew.mu.RUnlock()
+
+for resID := range resourceIDSet {
+    ew.sendEvent(resID)
+}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 91002a9 and 21e8202.

📒 Files selected for processing (4)
  • internal/event/event.go (8 hunks)
  • internal/event/event_test.go (7 hunks)
  • principal/callbacks.go (2 hunks)
  • test/e2e/sync_test.go (7 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
test/e2e/sync_test.go (2)
internal/logging/logfields/logfields.go (3)
  • Status (130-130)
  • Name (59-59)
  • Message (132-132)
internal/event/event.go (1)
  • TerminateOperation (61-61)
principal/callbacks.go (2)
internal/event/event.go (3)
  • Event (112-115)
  • TerminateOperation (61-61)
  • SpecUpdate (58-58)
pkg/api/grpc/eventstreamapi/eventstream.pb.go (3)
  • Event (41-47)
  • Event (62-62)
  • Event (77-79)
internal/event/event.go (2)
internal/logging/logfields/logfields.go (2)
  • EventID (33-33)
  • Event (34-34)
pkg/api/grpc/eventstreamapi/eventstream.pb.go (3)
  • Event (41-47)
  • Event (62-62)
  • Event (77-79)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: Run unit tests
  • GitHub Check: Run end-to-end tests
  • GitHub Check: Lint Go code
  • GitHub Check: Build & cache Go code
  • GitHub Check: Build and push image
  • GitHub Check: Analyze (go)
🔇 Additional comments (12)
principal/callbacks.go (2)

112-130: Operation handling logic looks correct for the race condition fix.

The conditional logic properly:

  1. Sends TerminateOperation events when the operation is being terminated
  2. Deep-copies before mutation to avoid corrupting informer cache
  3. Only allows Operation to propagate on nil→non-nil transitions (principal-initiated sync)
  4. Clears Operation on subsequent spec updates to prevent overwriting agent-side operation state

This correctly addresses the race condition described in the PR objectives.


678-683: LGTM!

The isTerminateOperation function correctly identifies when an operation transitions to the terminating phase with proper nil guards.
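A sketch of such a nil-guarded check, with simplified stand-in types (the real function inspects Argo CD's *v1alpha1.Application and sync-phase constants):

```go
package main

import "fmt"

// OperationTerminating stands in for Argo CD's synccommon.OperationTerminating.
const OperationTerminating = "Terminating"

type OperationState struct{ Phase string }

type Application struct {
	Status struct{ OperationState *OperationState }
}

// isTerminateOperation reports whether the app's current operation has
// entered the terminating phase, guarding every dereference against nil.
func isTerminateOperation(app *Application) bool {
	return app != nil &&
		app.Status.OperationState != nil &&
		app.Status.OperationState.Phase == OperationTerminating
}

func main() {
	var app Application
	fmt.Println(isTerminateOperation(&app)) // false: no operation state yet
	app.Status.OperationState = &OperationState{Phase: OperationTerminating}
	fmt.Println(isTerminateOperation(&app)) // true
}
```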

test/e2e/sync_test.go (2)

371-387: Good improvement: retry pattern now handles errors gracefully.

The retry logic correctly:

  1. Retries on conflict errors with logging
  2. Logs other errors instead of failing immediately with requires.NoError
  3. Returns false to allow Eventually to retry until timeout

This addresses the previous review feedback about assertion failures inside Eventually callbacks.


463-471: Good addition: ensures sync operation is running before termination.

Waiting for the sync operation to start on both the principal and agent sides before calling TerminateOperation reduces test flakiness by ensuring the preconditions are properly established.

internal/event/event_test.go (3)

68-69: Good fix: now correctly uses newEv for the updated event.

The test now properly adds the new event with ResourceVersion: "2" instead of re-adding the original event. This was flagged in a previous review.


151-180: Excellent test coverage for DELETE event prioritization.

This test verifies the critical behavior that DELETE events clear all pending events for a resource, ensuring proper cleanup semantics. The assertions confirm both unsent and sent event maps are properly cleaned.


182-235: Good concurrency smoke test.

This test verifies that concurrent Add and Remove operations don't cause panics or race conditions. Consider running tests with -race flag in CI to catch potential data races.

internal/event/event.go (5)

782-794: DELETE event handling correctly clears all pending events.

The logic properly:

  1. Removes any sent event awaiting ACK
  2. Replaces the unsent queue with only the DELETE event
  3. Ensures DELETE is never coalesced or overwritten

This prevents stale events from being sent after a resource is deleted.


945-951: Lock/unlock ordering on the early return is correct.

sentMsg.mu is locked at line 945, the retryAfter condition is checked at line 948, and the mutex is unlocked at line 949 before the early return at line 950, so no unlock is missed.


1032-1037: Good design: move to sentEvents before unlock prevents ACK race.

Moving the event to sentEvents before releasing ew.mu ensures that if an ACK arrives immediately after the send, the event will be found and properly removed. This is a correct pattern for preventing race conditions.


1129-1151: Event coalescing logic is correct.

The tail-coalescing approach ensures that consecutive events of the same type (e.g., multiple spec updates) are merged, keeping only the latest. This reduces unnecessary network traffic while preserving event ordering for different event types.


777-780: Good defensive check for empty resource ID.

The early return when resID is empty prevents adding events that cannot be properly tracked or sent.

