
Conversation


@keynslug keynslug commented Mar 24, 2025

Meaning:

  1. Annotate pods with DS-related conditions.
  2. Ensure safe scale-down of DS enabled StatefulSets.
  3. Correctly assign DS replication sites during scaling and migration of StatefulSets.

Summary by CodeRabbit

  • New Features

    • Introduced advanced DS replication monitoring with detailed status tracking for improved database replication insights.
    • Integrated refined configuration management that streamlines dashboard and service settings, while enabling graceful node lifecycle handling.
    • Added support for new constants and structures to enhance EMQX status tracking and replication management.
  • Refactor

    • Enhanced pod readiness, scaling, and reconciliation processes for more stable and predictable deployments.
    • Simplified configuration handling and error management across various components.
  • Documentation

    • Updated API references to reflect the new replication and node configuration details, providing clearer guidance.
    • Added new sections in documentation for DSDBReplicationStatus and DSReplicationStatus structures.
  • Tests

    • Restructured end-to-end and unit tests to validate the updated functionality and ensure robust behavior.
    • Introduced new helper functions in tests to streamline setup and improve clarity.


coderabbitai bot commented Mar 24, 2025

Walkthrough

The changes introduce extensive enhancements in the EMQX operator. New constants and status types for DS replication (e.g., DSReplicationStatus and DSDBReplicationStatus) are added to track database replication. Configuration handling is refactored to use a dedicated config.Conf object with updated methods for dashboard and listener port retrieval. Controller logic is updated with revised function signatures, new lifecycle hooks (PreStop), DS replica set updates, and pod condition reconciliation. The CRD schema and documentation have been extended to include new pod and DS replication fields. Several obsolete functions and tests have been removed, and dependencies adjusted accordingly.

Changes

File(s) Change Summary
apis/apps/v2beta1/const.go Added DSReplicationSite constant for indicating DS replication responsibility.
apis/apps/v2beta1/status.go, .../zz_generated.deepcopy.go Introduced new types DSReplicationStatus and DSDBReplicationStatus, with accompanying DeepCopy methods; updated EMQXStatus and reordered fields in EMQXNode.
apis/apps/v2beta1/util.go, apis/apps/v2beta1/util_test.go Removed functions for dashboard/listener port mapping and associated tests; added FindPodCondition for pod condition retrieval.
config/crd/bases/apps.emqx.io_emqxes.yaml Updated CRD schema: added podName and nested dsReplication (with fields such as dbs, minReplicas, maxReplicas, etc.).
Controllers (e.g., add_bootstrap_resource.go, add_emqx_core.go, add_emqx_repl.go, add_svc.go, config/emqx.go, files under ds/, ds_reflect_pod_condition.go, ds_update_replica_sets.go, emqx_controller.go, rebalance_controller.go, requester.go) Refactored controller logic to use config.Conf; updated function signatures; introduced new lifecycle hooks (PreStop) for clean pod termination; added DS replication management and pod condition update/reconciliation mechanisms; streamlined requester creation.
Sync functions (e.g., sync_emqx_config.go, sync_emqx_config_test.go [deleted], sync_pods.go, sync_pods_suite_test.go) Removed legacy merge function; incorporated syncPodsReconciliation for RS/STS scaling and refined scale-down logic.
controllers/apps/v2beta1/update_emqx_status.go, update_pod_conditions.go, util.go (in controllers) Modified status updates and condition checks, improved pod condition patching, and removed obsolete helper functions.
Documentation (docs/en_US/reference/v2beta1-reference.md, docs/zh_CN/reference/v2beta1-reference.md) Added sections for DS replication structures and updated EMQX node fields.
End-to-end tests (e2e/v2beta1/e2e_rebalance_test.go, e2e/v2beta1/e2e_test.go, e2e/v2beta1/suite_test.go) Reworked test setups: updated instance initialization, introduced helper functions (e.g. createNamespace, createInstance, refreshInstance), and refined assertions.
go.mod Added dependency on github.com/lithammer/dedent v1.1.0 and removed unused indirect dependencies.
Internal packages (internal/requester/requester.go, internal/test/util.go) Enhanced requester interface by adding SwitchHost and removing GetHost; added test helper HaveCondition for condition matching.

Sequence Diagram(s)

sequenceDiagram
  participant O as EMQXReconciler
  participant C as Config (config.Conf)
  participant R as Requester
  participant DS as DS API
  participant P as Pod Updater

  O->>C: LoadEMQXConf(instance)
  O->>R: Create requester with config
  R->>DS: GetReplicationStatus()
  DS-->>R: Return DS replication data
  O->>P: Update pod condition (DSReplication)
  alt Replica Set update required
    O->>DS: Trigger UpdateReplicaSet()
  end

Poem

I hopped through lines of code so spry,
DS replication now leaps high,
With fresh configs and hooks that bind,
Pods updated, perfectly aligned,
A rabbit’s cheer in every byte,
Coding with joy from morning till night! 🐇


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between aff73e9 and 692d857.

📒 Files selected for processing (4)
  • controllers/apps/v2beta1/add_bootstrap_resource_suite_test.go (1 hunks)
  • controllers/apps/v2beta1/add_emqx_core_suite_test.go (4 hunks)
  • controllers/apps/v2beta1/add_emqx_repl_suite_test.go (1 hunks)
  • controllers/apps/v2beta1/sync_pods_suite_test.go (15 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • controllers/apps/v2beta1/add_emqx_repl_suite_test.go
  • controllers/apps/v2beta1/add_bootstrap_resource_suite_test.go
  • controllers/apps/v2beta1/add_emqx_core_suite_test.go
🧰 Additional context used
🧬 Code Definitions (1)
controllers/apps/v2beta1/sync_pods_suite_test.go (1)
apis/apps/v2beta1/util.go (3)
  • CloneAndAddLabel (124-136)
  • DefaultCoreLabels (35-41)
  • DefaultReplicantLabels (43-49)
⏰ Context from checks skipped due to timeout of 90000ms (22)
  • GitHub Check: deployment (false, helm, true, EMQX, emqx, config/samples/emqx/v2beta1/emqx-full.yaml)
  • GitHub Check: deployment (helm, EMQX, emqx, config/samples/emqx/v2beta1/emqx-full.yaml, true, true)
  • GitHub Check: deployment (helm, EMQX, emqx, config/samples/emqx/v2beta1/emqx-full.yaml, false, true)
  • GitHub Check: deployment (helm, EMQX, emqx, config/samples/emqx/v2alpha1/emqx-full.yaml, true, true)
  • GitHub Check: deployment (helm, EMQX, emqx, config/samples/emqx/v2alpha1/emqx-full.yaml, false, true)
  • GitHub Check: deployment (helm, EmqxEnterprise, emqx-ee, config/samples/emqx/v1beta4/emqxenterprise-full.yaml, ...
  • GitHub Check: deployment (helm, EmqxEnterprise, emqx-ee, config/samples/emqx/v1beta4/emqxenterprise-full.yaml, ...
  • GitHub Check: deployment (helm, EmqxEnterprise, emqx-ee, config/samples/emqx/v1beta3/emqxenterprise-full.yaml, ...
  • GitHub Check: deployment (helm, EmqxEnterprise, emqx-ee, config/samples/emqx/v1beta3/emqxenterprise-slim.yaml, ...
  • GitHub Check: deployment (static, EMQX, emqx, config/samples/emqx/v2beta1/emqx-full.yaml, false, true)
  • GitHub Check: deployment (static, EMQX, emqx, config/samples/emqx/v2beta1/emqx-slim.yaml, false, true)
  • GitHub Check: deployment (static, EMQX, emqx, config/samples/emqx/v2alpha1/emqx-full.yaml, false, true)
  • GitHub Check: deployment (static, EMQX, emqx, config/samples/emqx/v2alpha1/emqx-slim.yaml, false, true)
  • GitHub Check: deployment (static, EmqxEnterprise, emqx-ee, config/samples/emqx/v1beta4/emqxenterprise-full.yaml...
  • GitHub Check: upgrade (EMQX, emqx, config/samples/emqx/v2alpha1/emqx-full.yaml)
  • GitHub Check: deployment (static, EmqxEnterprise, emqx-ee, config/samples/emqx/v1beta4/emqxenterprise-slim.yaml...
  • GitHub Check: upgrade (EMQX, emqx, config/samples/emqx/v2alpha1/emqx-slim.yaml)
  • GitHub Check: unit-test
  • GitHub Check: deployment (static, EmqxEnterprise, emqx-ee, config/samples/emqx/v1beta3/emqxenterprise-full.yaml...
  • GitHub Check: upgrade (EmqxEnterprise, emqx-ee, config/samples/emqx/v1beta3/emqxenterprise-slim.yaml)
  • GitHub Check: upgrade (EmqxBroker, emqx, config/samples/emqx/v1beta3/emqxbroker-slim.yaml)
  • GitHub Check: deployment (static, EmqxEnterprise, emqx-ee, config/samples/emqx/v1beta3/emqxenterprise-slim.yaml...
🔇 Additional comments (9)
controllers/apps/v2beta1/sync_pods_suite_test.go (9)

30-30: Good addition: Created StatefulSet pod variable to balance with ReplicaSet pod

Adding currentStsPod alongside currentRsPod establishes a consistent pattern for managing both StatefulSet and ReplicaSet pods in the tests, which aligns with the PR objective of ensuring safe scale-down of StatefulSets.


88-92: Improved code readability with variable extraction

Extracting label creation into updateStsLabels reduces code duplication and improves readability by centralizing the label construction logic.


132-136: Consistent pattern for label management

The introduction of updateRsLabels follows the same pattern as updateStsLabels, promoting consistency in how labels are managed for both StatefulSets and ReplicaSets.


176-194: Good implementation: Added StatefulSet pod creation

This addition correctly creates a pod for the StatefulSet with appropriate ownership references, which is necessary for testing pod-related operations. The pod ownership pattern matches the existing ReplicaSet pod implementation.


307-311: Added necessary test setup for StatefulSet pod

Properly setting up a StatefulSet pod with the correct labels and ownership references is critical for testing the DS-related functionality being added in this PR. The implementation follows the same pattern as used for the ReplicaSet pod.

Also applies to: 336-359


363-369: Refactored to use structured admission decisions

The refactoring from direct boolean checks to structured admission results enables more detailed feedback on why a scale-down operation might be allowed or rejected. This is important for Durable Storage considerations during scale-down operations.

The new syncPodsReconciliation pattern provides a more structured approach to testing compared to direct method calls, making test failures more informative.

Also applies to: 374-380, 393-404, 421-428, 441-448, 460-467


383-383: Improved test description clarity

Changing "the replicaSet didn't ready" to "replicants replicaSet is not ready" improves test case readability and provides more specific information about the test condition.


524-529: Consistent use of syncPodsReconciliation for ReplicaSet tests

The refactoring consistently applies the syncPodsReconciliation pattern to ReplicaSet tests, ensuring a uniform approach to both StatefulSet and ReplicaSet scale-down testing.

Also applies to: 534-539, 548-553, 572-578, 592-602, 614-624


269-269: Variable naming improvement

Changing fakeR to fakeReq improves code readability by using a more descriptive name for the fake requester.


@keynslug keynslug force-pushed the feat/ds-awareness branch from 679746a to a380fbe on March 25, 2025 at 18:56
keynslug added 20 commits March 25, 2025 20:05
Meaning:
1. Annotate pods with DS-related conditions.
2. Ensure safe scale-down of DS enabled StatefulSets.
3. Correctly assign DS replication sites during scaling of StatefulSets.

Signed-off-by: Andrew Maiorov <[email protected]>
Because managing StatefulSet replicas is now the `syncPods` reconciler's
responsibility, we need to reach the `Ready` condition sooner. Otherwise,
`syncPods` will not be able to scale the StatefulSet up and down.

Signed-off-by: Andrew Maiorov <[email protected]>
Signed-off-by: Andrew Maiorov <[email protected]>
Also better separate existing test groups and define some util functions
for better readability.

Signed-off-by: Andrew Maiorov <[email protected]>
Also make internal APIs a bit richer for better insights in test
failures.

Signed-off-by: Andrew Maiorov <[email protected]>
Signed-off-by: Andrew Maiorov <[email protected]>
While newer versions support `ignore_readonly` in the API, they are
still unaware that Durable Sessions / Durable Storage settings are not
meant to be changed via the API.

Signed-off-by: Andrew Maiorov <[email protected]>
Not just core nodes, but also replicants. The latter is not supposed
to happen, but it is still possible during manual user intervention.

Signed-off-by: Andrew Maiorov <[email protected]>
Instead of the "oldest" node, which is the current default. Without
this change, there's a risk of losing actual DS replication condition
information.

Signed-off-by: Andrew Maiorov <[email protected]>
@keynslug keynslug force-pushed the feat/ds-awareness branch from a380fbe to b4dea42 on March 25, 2025 at 19:06
@keynslug keynslug marked this pull request as ready for review March 25, 2025 19:15
@keynslug keynslug requested review from a team and Rory-Z as code owners March 25, 2025 19:15

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

🧹 Nitpick comments (39)
internal/requester/requester.go (1)

53-62: The SwitchHost method handles host changes but ignores potential errors.

The implementation of SwitchHost in the Requester struct ignores the error returned by net.SplitHostPort. This could lead to unexpected behavior if the current host doesn't include a port or has an invalid format.

Consider updating the implementation to handle the error case:

func (r *Requester) SwitchHost(host string) RequesterInterface {
-	_, port, _ := net.SplitHostPort(r.GetHost())
-	requester := &Requester{
-		Schema:   r.GetSchema(),
-		Host:     net.JoinHostPort(host, port),
-		Username: r.GetUsername(),
-		Password: r.GetPassword(),
-	}
+	var newHost string
+	_, port, err := net.SplitHostPort(r.GetHost())
+	if err != nil {
+		// Default to using the host as-is if there's no port
+		newHost = host
+	} else {
+		newHost = net.JoinHostPort(host, port)
+	}
+	requester := &Requester{
+		Schema:   r.GetSchema(),
+		Host:     newHost,
+		Username: r.GetUsername(),
+		Password: r.GetPassword(),
+	}
	return requester
}
config/crd/bases/apps.emqx.io_emqxes.yaml (2)

13900-13927: Consider adding documentation for the dsReplication fields

While the structure is well-designed, adding documentation comments for each field would enhance the understanding of their purpose. For example, explaining what numTransitions represents or the significance of maxReplicas vs minReplicas would be beneficial for users and developers.

  dsReplication:
    properties:
      dbs:
        items:
          properties:
            maxReplicas:
+             description: "Maximum number of replicas that can be created for this database"
              format: int32
              type: integer
            minReplicas:
+             description: "Minimum number of replicas required for this database to function properly"
              format: int32
              type: integer
            name:
+             description: "Name of the database being replicated"
              type: string
            numShards:
+             description: "Number of shards the database is divided into"
              format: int32
              type: integer
            numTransitions:
+             description: "Number of state transitions the database replication has undergone"
              format: int32
              type: integer

13900-13927: Consider adding validation rules for replication parameters

It would be beneficial to add validation rules for the numeric fields to ensure they have reasonable values. For example, ensuring that minReplicas is not greater than maxReplicas, or that numShards is greater than zero.

  dsReplication:
    properties:
      dbs:
        items:
          properties:
            maxReplicas:
              format: int32
              type: integer
+             minimum: 1
            minReplicas:
              format: int32
              type: integer
+             minimum: 1
            name:
              type: string
            numShards:
              format: int32
              type: integer
+             minimum: 1
            numTransitions:
              format: int32
              type: integer
+             minimum: 0
          required:
          - maxReplicas
          - minReplicas
          - name
          - numShards
          - numTransitions
          type: object
+         x-kubernetes-validations:
+         - message: "minReplicas cannot be greater than maxReplicas"
+           rule: "self.minReplicas <= self.maxReplicas"
        type: array
    type: object
controllers/apps/v2beta1/requester.go (1)

18-63: New function to create a requester for EMQX API access.

The newRequester function is well-structured with the following key features:

  • It reuses the bootstrap API key retrieval logic
  • It determines the protocol (http/https) and port from the configuration
  • It selects a suitable pod for communication (ready, not terminating, with an IP)
  • It properly sorts pods by creation timestamp for consistency
  • It returns a properly configured requester object when successful

One improvement to consider: when no suitable pod is found and the function returns nil (line 62), add a debug log or return a more descriptive error to aid troubleshooting.
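
A minimal sketch of the error-returning option; the variable names and the (RequesterInterface, error) return shape are assumptions, since the function body is not quoted in this comment:

// Hypothetical tail of newRequester: fail loudly instead of returning nil.
if chosenPod == nil {
	return nil, fmt.Errorf(
		"no ready, non-terminating pod with an assigned IP found for %s/%s",
		instance.GetNamespace(), instance.GetName(),
	)
}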

e2e/v2beta1/e2e_test.go (4)

23-24: Avoid dot imports.
Using dot imports (e.g., import . "github.com/emqx/emqx-operator/internal/test") may introduce namespace conflicts and reduce clarity. Consider importing this package with a named or aliased import to avoid potential collisions.
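
For example, an aliased import keeps call sites explicit (the alias name is arbitrary):

import emqxtest "github.com/emqx/emqx-operator/internal/test"

// Call sites then read unambiguously, e.g.:
// Expect(instance).Should(emqxtest.HaveCondition(/* ... */))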


43-48: Validate namespace creation.
The function creates a namespace using emqx.coresOnly.GetNamespace(). Make sure this returns a valid, non-empty name to avoid errors in dynamic test environments.


50-57: Better error logging could help.
Both the already-exists path and successful creation return nil with no message. Consider adding logs for these cases to make debugging easier in dynamic test environments.


69-75: Consider logging retrieval errors.
refreshInstance() returns nil if k8sClient.Get fails, without logging any errors. Logging them would help troubleshoot transient failures.
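
A possible shape, assuming the helper keeps its current signature (an illustration only, not the actual body):

func refreshInstance() *appsv2beta1.EMQX {
	if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(instance), instance); err != nil {
		// Surface transient Get failures in the test output instead of
		// silently returning nil.
		GinkgoWriter.Printf("refreshInstance: failed to get EMQX instance: %v\n", err)
		return nil
	}
	return instance
}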

apis/apps/v2beta1/status.go (2)

38-38: Ensure DS replication field is properly documented.
Adding DSReplication DSReplicationStatus to EMQXStatus is a logical extension. However, consider adding a concise comment to clarify how other controllers or code sections should use and update this field.


168-179: Check numeric bounds for transitions and replicas.
The IsStable method’s logic is straightforward and appears correct. However, verify that no negative or nonsensical values (e.g., negative NumTransitions) surface, as they might break the stability conditions. If necessary, validate or clamp these values at assignment time.
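
A hedged sketch of clamping at assignment time; the field names come from the DSDBReplicationStatus type added in this PR, while the helper itself is hypothetical:

func sanitizeDBStatus(db *DSDBReplicationStatus) {
	if db.NumTransitions < 0 {
		db.NumTransitions = 0 // a negative transition count is meaningless
	}
	if db.MinReplicas > db.MaxReplicas {
		db.MinReplicas = db.MaxReplicas // keep the bounds ordered
	}
}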

controllers/apps/v2beta1/ds_update_replica_sets.go (2)

17-19: Confirm that embedding EMQXReconciler meets the single-responsibility principle.
Embedding *EMQXReconciler gives direct access to shared logic but can also couple the DS update logic tightly to the main reconciler. Ensure that any DS-specific concerns are not leaking into unrelated reconcilers or requiring cyclical dependencies.


21-99: Revisit error handling and logging.
Overall, the new reconcile method covers the DS update workflow well. Consider these improvement points:

  • Logging: When skipping steps (e.g., due to r == nil or DS is disabled), log a debug-level message for easier troubleshooting.
  • Error context: You wrap errors with emperror.Wrap. Ensure these error messages are concise yet sufficiently informative.
  • Early returns: The method returns promptly in many scenarios. This is good for readability, but ensure that partial progress does not leave the system in an inconsistent state.
controllers/apps/v2beta1/update_pod_conditions.go (2)

49-49: Skip logic is correct, but consider logging.
Skipping pods with no controller reference or a deletion timestamp is expected. For diagnostic purposes, consider logging these pod names at a debug level.


110-110: Validate version-based condition logic.
The code calls checkRebalanceStatus if version ≥ 5.0.3. This is correct but might be more robust if we also check for specific EMQX edition capabilities. If future Enterprise versions remove or rename the endpoint, you may need additional fallback logic.

controllers/apps/v2beta1/emqx_controller.go (4)

24-24: Use a more descriptive import alias for clarity
Although functional, consider using a more verbose alias (e.g., configPkg) to clearly distinguish it from other packages.


56-56: Field naming & visibility
If conf is meant for internal usage only, consider unexporting it or renaming to match existing naming patterns, ensuring consistency across the codebase.


99-101: Add supplementary logging for config errors
Raising a warning event is helpful, but adding a debug log or error log can assist in faster diagnosis when the config is invalid.


151-158: Enhance debugging for LoadEMQXConf
Consider adding debug logs with the final merged config to facilitate rapid troubleshooting of user configurations.

controllers/apps/v2beta1/ds_reflect_pod_condition.go (5)

1-2: Add file-level documentation
A brief GoDoc comment describing high-level intent makes maintenance easier.


18-20: Consider embedding implications
Embedding *EMQXReconciler is convenient, but keep an eye on potential method collisions if additional embedded structs are introduced in the future.


22-83: Add optional debug logs
The logic to skip non-Enterprise pods and update DSReplicationSite status looks correct. Adding debug logs for each update can simplify troubleshooting DS site issues.


86-98: Potential performance optimization
For larger clusters, a map-based node lookup might be more performant than iterating. For smaller clusters, this loop is fine.


100-109: Clarify method naming
Renaming getPod to something like fetchPod or retrievePod can more explicitly indicate a network or API call is involved.

controllers/apps/v2beta1/config/emqx.go (5)

20-27: Validate the raw HOCON config in EMQXConf.
Currently, there is no preliminary check to ensure the provided config string is non-empty or valid HOCON before parsing. Although LoadEMQXConf ultimately handles parsing errors, adding an early sanity check could provide clearer error messages and fail faster.

 func EMQXConf(config string) (*Conf, error) {
+   if strings.TrimSpace(config) == "" {
+       return nil, fmt.Errorf("empty config string")
+   }
    c := &Conf{}
    err := c.LoadEMQXConf(config)
    ...
 }

42-59: Ensure coverage for newly introduced read-only keys.
In StripReadOnlyConfig, the code selectively removes read-only keys (like “durable_storage”, “durable_sessions”, etc.). If new read-only keys are introduced in the future (e.g., “nodeDiscovery”), they should be appended here with corresponding tests.


83-122: Consider strconv.ParseInt to handle edge cases for dashboard port.
In GetDashboardPortMap, the code uses strconv.Atoi without fully handling parsing errors. If an invalid port string is provided (e.g., “abc”), port will be 0, silently disabling the dashboard. For clarity, explicitly check the parsing error and log or handle it to avoid confusion.

 port, err := strconv.Atoi(strPort)
-if port != 0 {
+if err == nil && port != 0 {
    portMap["dashboard"] = port
} else {
    delete(portMap, "dashboard")
}

144-232: Refactor GetListenersServicePorts for maintainability.
This function handles multiple nested loops for zones and gateways. Consider breaking it into smaller helper functions (e.g., one function for listeners parsing, another for gateway). This would enhance readability and reduce complexity.
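
One possible decomposition, with hypothetical helper names and an assumed []corev1.ServicePort return type:

func (c *Conf) GetListenersServicePorts() []corev1.ServicePort {
	ports := c.plainListenerPorts()                    // top-level listeners
	ports = append(ports, c.zoneListenerPorts()...)    // per-zone listener overrides
	ports = append(ports, c.gatewayListenerPorts()...) // gateway listeners
	return ports
}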


250-266: Check for unexpected type usage in byDefault.
byDefault handles known types (bool, string, int) and expects a matching hocon.Value. If new data types are introduced in the future (e.g., arrays), consider extending or adding warnings for unsupported types.

controllers/apps/v2beta1/ds/ds.go (5)

11-14: Increase visibility of DSReplicationStatus fields.
DBs []DSDBReplicationStatus is clear, but any custom JSON tags or documentation for the DS DB structure can help future maintainers quickly learn the DS layout.


16-21: Validate DSDBReplicationStatus fields upon retrieval.
When fetching DB statuses, the code expects Name, Shards, etc. If any DB is missing these fields, partial or inconsistent data could cause logic errors. Consider adding defensive checks or logs when a DB returns incomplete or malformed data.


70-95: Optimize concurrency in GetReplicationStatus.
A for-loop fetches each DB status sequentially. For large DS deployments, this may be slow. Consider using parallel requests if the DS API can handle concurrent calls. On error, you could cancel the entire set.
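
A hedged sketch using golang.org/x/sync/errgroup; dbNames and fetchDBStatus stand in for whatever the file actually uses to enumerate and query databases:

g, gctx := errgroup.WithContext(ctx)
statuses := make([]DSDBReplicationStatus, len(dbNames))
for i, name := range dbNames {
	i, name := i, name // capture loop variables (needed before Go 1.22)
	g.Go(func() error {
		st, err := fetchDBStatus(gctx, name)
		if err != nil {
			return err // the first error cancels gctx for the remaining fetches
		}
		statuses[i] = st
		return nil
	})
}
if err := g.Wait(); err != nil {
	return nil, err
}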


122-148: Consider caching DS cluster info.
GetCluster repeatedly queries DS sites. If calls to GetCluster are frequent, caching site data and invalidating it on changes might improve performance and reduce API load.


189-200: Avoid ignoring request headers.
apiRequest's signature supports sending headers but always passes nil. If any DS endpoints require tokens or special headers, ensure they are properly propagated.

controllers/apps/v2beta1/sync_pods.go (6)

25-32: Increase syncPodsReconciliation extensibility.
The struct references multiple *appsv1.*Set fields. If the operator later manages more resource types (e.g., Deployment-based workloads), consider a more flexible or modular approach for different resource controllers.


34-37: Clarify scaleDownAdmission usage with comments.
This struct is essential for controlling the scale-down decision. Adding docstrings about the Pod and Reason fields’ semantics can help new contributors.
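
A possible shape for those docstrings; the field semantics are inferred from the surrounding review and tests, so treat them as assumptions:

// scaleDownAdmission is the outcome of deciding whether one pod may be
// removed during a scale-down pass.
type scaleDownAdmission struct {
	// Pod is the pod admitted for deletion; nil means no pod may be
	// removed in this reconciliation pass.
	Pod *corev1.Pod
	// Reason explains why scale-down was admitted or rejected, for use in
	// events and test failure messages.
	Reason string
}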


39-47: Skip logic early if DS is unavailable or nil.
In reconcile, the code checks if r == nil and the Available condition. Consider also verifying DS existence if DS features must be present for safe scale-down.


144-159: migrationTargetNodes for replicant vs. core logic.
The function’s distinction between replicant and core is clear. Consider how ephemeral pods might be handled if none remain to serve as a target.


192-279: Respect partial node evacuation states in canScaleDownReplicaSet.
When multiple nodes evacuate at once, ensure that the logic can handle overlapping evacuations or partial sessions on multiple nodes. Currently, it picks a single node from NodeEvacuationsStatus.


382-413: Robust error handling for startEvacuationByAPI.
The code already handles a 400 response with “already_started”. If the DS system returns other 4xx codes, consider logging more details to streamline troubleshooting.
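
A hedged sketch of broader 4xx handling; the resp and body shapes are assumptions, not the file's actual requester API:

switch {
case resp.StatusCode == http.StatusOK:
	return nil
case resp.StatusCode == http.StatusBadRequest && bytes.Contains(body, []byte("already_started")):
	return nil // evacuation is already in progress; treat as success
case resp.StatusCode >= 400 && resp.StatusCode < 500:
	// Include the body so operators can see why the DS API rejected the call.
	return fmt.Errorf("start evacuation rejected with status %d: %s", resp.StatusCode, body)
default:
	return fmt.Errorf("start evacuation: unexpected status %d", resp.StatusCode)
}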

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 99f4f1d and aff73e9.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (41)
  • apis/apps/v2beta1/const.go (1 hunks)
  • apis/apps/v2beta1/status.go (4 hunks)
  • apis/apps/v2beta1/util.go (1 hunks)
  • apis/apps/v2beta1/util_test.go (0 hunks)
  • apis/apps/v2beta1/zz_generated.deepcopy.go (2 hunks)
  • config/crd/bases/apps.emqx.io_emqxes.yaml (3 hunks)
  • controllers/apps/v2beta1/add_bootstrap_resource.go (3 hunks)
  • controllers/apps/v2beta1/add_bootstrap_resource_suite_test.go (1 hunks)
  • controllers/apps/v2beta1/add_bootstrap_resource_test.go (2 hunks)
  • controllers/apps/v2beta1/add_emqx_core.go (7 hunks)
  • controllers/apps/v2beta1/add_emqx_core_suite_test.go (4 hunks)
  • controllers/apps/v2beta1/add_emqx_core_test.go (6 hunks)
  • controllers/apps/v2beta1/add_emqx_repl.go (5 hunks)
  • controllers/apps/v2beta1/add_emqx_repl_suite_test.go (1 hunks)
  • controllers/apps/v2beta1/add_emqx_repl_test.go (6 hunks)
  • controllers/apps/v2beta1/add_svc.go (6 hunks)
  • controllers/apps/v2beta1/add_svc_test.go (9 hunks)
  • controllers/apps/v2beta1/config/emqx.go (1 hunks)
  • controllers/apps/v2beta1/config/emqx_test.go (1 hunks)
  • controllers/apps/v2beta1/ds/ds.go (1 hunks)
  • controllers/apps/v2beta1/ds_reflect_pod_condition.go (1 hunks)
  • controllers/apps/v2beta1/ds_update_replica_sets.go (1 hunks)
  • controllers/apps/v2beta1/emqx_controller.go (5 hunks)
  • controllers/apps/v2beta1/rebalance_controller.go (3 hunks)
  • controllers/apps/v2beta1/requester.go (1 hunks)
  • controllers/apps/v2beta1/status_machine.go (1 hunks)
  • controllers/apps/v2beta1/sync_emqx_config.go (4 hunks)
  • controllers/apps/v2beta1/sync_emqx_config_test.go (0 hunks)
  • controllers/apps/v2beta1/sync_pods.go (2 hunks)
  • controllers/apps/v2beta1/sync_pods_suite_test.go (15 hunks)
  • controllers/apps/v2beta1/update_emqx_status.go (4 hunks)
  • controllers/apps/v2beta1/update_pod_conditions.go (5 hunks)
  • controllers/apps/v2beta1/util.go (3 hunks)
  • docs/en_US/reference/v2beta1-reference.md (3 hunks)
  • docs/zh_CN/reference/v2beta1-reference.md (3 hunks)
  • e2e/v2beta1/e2e_rebalance_test.go (1 hunks)
  • e2e/v2beta1/e2e_test.go (14 hunks)
  • e2e/v2beta1/suite_test.go (6 hunks)
  • go.mod (1 hunks)
  • internal/requester/requester.go (5 hunks)
  • internal/test/util.go (1 hunks)
💤 Files with no reviewable changes (2)
  • controllers/apps/v2beta1/sync_emqx_config_test.go
  • apis/apps/v2beta1/util_test.go
🧰 Additional context used
🧬 Code Definitions (23)
controllers/apps/v2beta1/add_bootstrap_resource_suite_test.go (1)
controllers/apps/v2beta1/add_bootstrap_resource.go (3)
  • a (25-46)
  • a (48-70)
  • a (72-86)
e2e/v2beta1/e2e_rebalance_test.go (7)
e2e/v2beta1/e2e_test.go (1)
  • instance (40-40)
controllers/apps/v2beta1/add_emqx_repl_suite_test.go (1)
  • instance (19-19)
controllers/apps/v2beta1/add_emqx_core_suite_test.go (1)
  • instance (20-20)
controllers/apps/v2beta1/sync_pods_suite_test.go (2)
  • instance (25-25)
  • instance (270-270)
controllers/apps/v2beta1/sync_sets_suite_test.go (1)
  • instance (22-22)
e2e/v2beta1/suite_test.go (1)
  • emqx (60-60)
controllers/apps/v2beta1/suite_test.go (1)
  • emqx (57-69)
controllers/apps/v2beta1/status_machine.go (2)
controllers/apps/v2beta1/sync_pods_suite_test.go (1)
  • updateSts (28-28)
controllers/apps/v2beta1/suite_test.go (1)
  • emqx (57-69)
controllers/apps/v2beta1/add_emqx_repl_suite_test.go (7)
controllers/apps/v2beta1/add_emqx_core_suite_test.go (2)
  • a (18-18)
  • instance (20-20)
controllers/apps/v2beta1/add_emqx_core.go (2)
  • a (30-115)
  • a (117-132)
controllers/apps/v2beta1/add_emqx_repl.go (2)
  • a (29-105)
  • a (107-122)
controllers/apps/v2beta1/add_bootstrap_resource_suite_test.go (2)
  • a (17-17)
  • instance (16-16)
controllers/apps/v2beta1/sync_pods_suite_test.go (2)
  • instance (25-25)
  • instance (270-270)
e2e/v2beta1/e2e_test.go (1)
  • instance (40-40)
controllers/apps/v2beta1/sync_sets_suite_test.go (1)
  • instance (22-22)
controllers/apps/v2beta1/add_emqx_core_suite_test.go (3)
controllers/apps/v2beta1/add_emqx_core.go (2)
  • a (30-115)
  • a (117-132)
controllers/apps/v2beta1/add_svc.go (2)
  • a (22-53)
  • a (55-68)
apis/apps/v2beta1/util.go (1)
  • DefaultCoreLabels (35-41)
controllers/apps/v2beta1/sync_emqx_config.go (3)
controllers/apps/v2beta1/config/emqx.go (2)
  • MergeDefaults (234-246)
  • EMQXConf (20-27)
apis/apps/v2beta1/emqx_types.go (1)
  • Config (122-128)
apis/apps/v2beta1/const.go (1)
  • AnnotationsLastEMQXConfigKey (19-19)
controllers/apps/v2beta1/add_emqx_repl_test.go (2)
controllers/apps/v2beta1/config/emqx.go (2)
  • EMQXConf (20-27)
  • MergeDefaults (234-246)
controllers/apps/v2beta1/add_emqx_repl.go (1)
  • getNewReplicaSet (124-150)
controllers/apps/v2beta1/update_emqx_status.go (4)
controllers/apps/v2beta1/emqx_controller.go (4)
  • err (152-152)
  • r (84-149)
  • r (151-158)
  • r (161-171)
controllers/apps/v2beta1/update_pod_conditions.go (3)
  • u (23-98)
  • u (100-117)
  • u (119-152)
apis/apps/v2beta1/status.go (2)
  • DSReplicationStatus (60-62)
  • DSDBReplicationStatus (64-70)
controllers/apps/v2beta1/ds/ds.go (3)
  • DSReplicationStatus (12-14)
  • DSDBReplicationStatus (18-21)
  • GetReplicationStatus (70-95)
controllers/apps/v2beta1/add_emqx_core_test.go (2)
apis/apps/v2beta1/emqx_types.go (1)
  • Config (122-128)
controllers/apps/v2beta1/add_emqx_core.go (1)
  • getNewStatefulSet (134-159)
controllers/apps/v2beta1/rebalance_controller.go (2)
controllers/apps/v2beta1/config/emqx.go (3)
  • Conf (16-18)
  • EMQXConf (20-27)
  • MergeDefaults (234-246)
controllers/apps/v2beta1/emqx_controller.go (4)
  • err (152-152)
  • r (84-149)
  • r (151-158)
  • r (161-171)
controllers/apps/v2beta1/add_svc.go (1)
controllers/apps/v2beta1/config/emqx.go (2)
  • EMQXConf (20-27)
  • Conf (16-18)
controllers/apps/v2beta1/update_pod_conditions.go (3)
controllers/apps/v2beta1/util.go (1)
  • updatePodCondition (113-126)
controllers/apps/v2beta1/ds_update_replica_sets.go (1)
  • u (21-99)
internal/requester/requester.go (2)
  • r (53-62)
  • RequesterInterface (19-25)
apis/apps/v2beta1/util.go (4)
controllers/apps/v2beta1/add_emqx_core_suite_test.go (1)
  • _ (17-105)
controllers/apps/v2beta1/add_emqx_repl_suite_test.go (1)
  • _ (17-236)
controllers/apps/v2beta1/sync_pods_suite_test.go (2)
  • _ (22-265)
  • _ (267-626)
e2e/v2beta1/e2e_test.go (1)
  • _ (38-935)
controllers/apps/v2beta1/sync_pods_suite_test.go (1)
apis/apps/v2beta1/util.go (3)
  • CloneAndAddLabel (124-136)
  • DefaultCoreLabels (35-41)
  • DefaultReplicantLabels (43-49)
controllers/apps/v2beta1/ds_update_replica_sets.go (5)
controllers/apps/v2beta1/emqx_controller.go (6)
  • EMQXReconciler (54-60)
  • r (84-149)
  • r (151-158)
  • r (161-171)
  • subResult (44-47)
  • err (152-152)
controllers/apps/v2beta1/ds_reflect_pod_condition.go (4)
  • u (22-84)
  • u (86-98)
  • u (100-109)
  • u (111-127)
controllers/apps/v2beta1/sync_pods.go (9)
  • r (69-77)
  • r (79-87)
  • r (90-121)
  • r (124-136)
  • r (139-159)
  • r (162-190)
  • r (192-279)
  • r (281-356)
  • err (193-193)
controllers/apps/v2beta1/util.go (1)
  • getStateFulSetList (27-49)
controllers/apps/v2beta1/ds/ds.go (4)
  • GetCluster (122-148)
  • GetReplicationStatus (70-95)
  • APIErrorUnavailable (171-171)
  • UpdateReplicaSet (159-163)
controllers/apps/v2beta1/add_emqx_core.go (5)
controllers/apps/v2beta1/add_emqx_core_suite_test.go (3)
  • instance (20-20)
  • a (18-18)
  • _ (17-105)
controllers/apps/v2beta1/add_emqx_repl.go (2)
  • a (29-105)
  • a (107-122)
controllers/apps/v2beta1/add_svc.go (2)
  • a (22-53)
  • a (55-68)
controllers/apps/v2beta1/add_bootstrap_resource.go (3)
  • a (25-46)
  • a (48-70)
  • a (72-86)
controllers/apps/v2beta1/util.go (2)
  • getStateFulSetList (27-49)
  • justCheckPodTemplate (148-177)
controllers/apps/v2beta1/add_bootstrap_resource.go (7)
controllers/apps/v2beta1/add_bootstrap_resource_suite_test.go (2)
  • instance (16-16)
  • a (17-17)
controllers/apps/v2beta1/add_emqx_core_suite_test.go (2)
  • instance (20-20)
  • a (18-18)
controllers/apps/v2beta1/add_emqx_repl_suite_test.go (2)
  • instance (19-19)
  • a (18-18)
controllers/apps/v2beta1/sync_pods_suite_test.go (2)
  • instance (25-25)
  • instance (270-270)
controllers/apps/v2beta1/sync_sets_suite_test.go (1)
  • instance (22-22)
apis/apps/v2beta1/emqx_types.go (1)
  • EMQX (33-43)
controllers/apps/v2beta1/config/emqx.go (1)
  • Conf (16-18)
controllers/apps/v2beta1/emqx_controller.go (2)
controllers/apps/v2beta1/config/emqx.go (3)
  • Conf (16-18)
  • EMQXConf (20-27)
  • MergeDefaults (234-246)
controllers/apps/v2beta1/requester.go (1)
  • newRequester (18-63)
e2e/v2beta1/e2e_test.go (5)
apis/apps/v2beta1/emqx_types.go (3)
  • EMQX (33-43)
  • EMQXSpec (46-100)
  • Config (122-128)
e2e/v2beta1/e2e_rebalance_test.go (4)
  • instance (44-44)
  • instance (310-310)
  • _ (43-307)
  • _ (309-520)
apis/apps/v2beta1/names.go (8)
  • instance (25-30)
  • instance (32-37)
  • instance (39-44)
  • instance (46-51)
  • instance (53-58)
  • instance (60-65)
  • instance (67-72)
  • instance (74-79)
internal/test/util.go (1)
  • HaveCondition (10-18)
apis/apps/v2beta1/util.go (2)
  • DefaultLabels (28-33)
  • CloneAndAddLabel (124-136)
controllers/apps/v2beta1/ds_reflect_pod_condition.go (7)
controllers/apps/v2beta1/emqx_controller.go (6)
  • EMQXReconciler (54-60)
  • r (84-149)
  • r (151-158)
  • r (161-171)
  • subResult (44-47)
  • err (152-152)
controllers/apps/v2beta1/ds_update_replica_sets.go (1)
  • u (21-99)
controllers/apps/v2beta1/update_pod_conditions.go (3)
  • u (23-98)
  • u (100-117)
  • u (119-152)
internal/requester/requester.go (2)
  • r (53-62)
  • RequesterInterface (19-25)
controllers/apps/v2beta1/ds/ds.go (2)
  • GetCluster (122-148)
  • APIErrorUnavailable (171-171)
apis/apps/v2beta1/const.go (1)
  • DSReplicationSite (26-26)
controllers/apps/v2beta1/util.go (1)
  • updatePodCondition (113-126)
controllers/apps/v2beta1/util.go (3)
apis/apps/v2beta1/names.go (8)
  • instance (25-30)
  • instance (32-37)
  • instance (39-44)
  • instance (46-51)
  • instance (53-58)
  • instance (60-65)
  • instance (67-72)
  • instance (74-79)
apis/apps/v2beta1/emqx_types.go (1)
  • EMQX (33-43)
apis/apps/v2beta1/util.go (1)
  • DefaultLabels (28-33)
controllers/apps/v2beta1/ds/ds.go (1)
apis/apps/v2beta1/status.go (8)
  • DSDBReplicationStatus (64-70)
  • DSReplicationStatus (60-62)
  • s (120-131)
  • s (133-141)
  • s (143-151)
  • s (153-159)
  • s (161-167)
  • s (169-179)
controllers/apps/v2beta1/sync_pods.go (3)
controllers/apps/v2beta1/sync_pods_suite_test.go (8)
  • instance (25-25)
  • instance (270-270)
  • updateSts (28-28)
  • updateRs (29-29)
  • s (23-23)
  • s (268-268)
  • _ (22-265)
  • _ (267-626)
controllers/apps/v2beta1/status_machine.go (11)
  • s (73-101)
  • s (103-105)
  • s (107-109)
  • s (115-132)
  • s (138-152)
  • s (158-179)
  • s (185-204)
  • s (210-223)
  • s (229-251)
  • s (257-282)
  • status (27-29)
controllers/apps/v2beta1/util.go (4)
  • getStateFulSetList (27-49)
  • getReplicaSetList (51-75)
  • checkInitialDelaySecondsReady (128-135)
  • listPodsManagedBy (77-92)
🪛 golangci-lint (1.64.8)
controllers/apps/v2beta1/add_bootstrap_resource_suite_test.go

25-25: Error return value of a.LoadEMQXConf is not checked

(errcheck)

🪛 GitHub Check: lint
controllers/apps/v2beta1/add_bootstrap_resource_suite_test.go

[failure] 25-25:
Error return value of a.LoadEMQXConf is not checked (errcheck)

⏰ Context from checks skipped due to timeout of 90000ms (5)
  • GitHub Check: deployment (static, EMQX, emqx, config/samples/emqx/v2beta1/emqx-full.yaml, false, true)
  • GitHub Check: deployment (static, EMQX, emqx, config/samples/emqx/v2alpha1/emqx-full.yaml, false, true)
  • GitHub Check: deployment (static, EmqxEnterprise, emqx-ee, config/samples/emqx/v1beta4/emqxenterprise-full.yaml...
  • GitHub Check: deployment (static, EmqxEnterprise, emqx-ee, config/samples/emqx/v1beta4/emqxenterprise-slim.yaml...
  • GitHub Check: deployment (static, EmqxEnterprise, emqx-ee, config/samples/emqx/v1beta3/emqxenterprise-slim.yaml...
🔇 Additional comments (198)
apis/apps/v2beta1/util.go (1)

167-174: Well-implemented utility function for finding pod conditions.

This new utility function provides a clean way to find a specific condition in a Pod's status by its type. It follows standard Go idioms and will be useful for the new DS replication features mentioned in the PR objectives.
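
The body is not quoted in this review; a shape consistent with the description would be:

// FindPodCondition returns the pod condition of the given type, or nil if
// the pod does not carry it.
func FindPodCondition(pod *corev1.Pod, condType corev1.PodConditionType) *corev1.PodCondition {
	for i := range pod.Status.Conditions {
		if pod.Status.Conditions[i].Type == condType {
			return &pod.Status.Conditions[i]
		}
	}
	return nil
}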

apis/apps/v2beta1/const.go (1)

25-26: Good addition of the DS replication site constant.

This constant properly defines a new Pod condition type for tracking pods responsible for Durable Storage replication, which directly supports the PR objectives. The naming follows the established convention and includes a descriptive comment.

controllers/apps/v2beta1/add_emqx_repl_suite_test.go (1)

59-59: Proper configuration loading with error check.

The addition correctly loads the EMQX configuration and verifies success using Ginkgo's assertion pattern, which is consistent with the test suite's approach.

internal/test/util.go (1)

10-18: Well-designed test helper for condition validation

This new function provides a clean way to test for specific conditions in EMQX instances using Gomega matchers. It will be particularly useful for testing the new DS (Durable Storage) pod conditions mentioned in the PR objectives.
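
A usage sketch; only the helper's name and purpose come from this PR, and the argument shape is an assumption:

// Assumed: HaveCondition takes a condition type and returns a Gomega matcher.
Eventually(refreshInstance).Should(HaveCondition(appsv2beta1.Ready))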

e2e/v2beta1/e2e_rebalance_test.go (1)

48-48: Updated test instance initialization to use predefined cores-only configuration

The change to use emqx.coresOnly.DeepCopy() instead of a generic generator makes the test more specific and focused on core nodes, which aligns with testing reliable scale operations for StatefulSets with Durable Storage enabled.

controllers/apps/v2beta1/status_machine.go (1)

142-142: Improved core nodes readiness detection for Durable Storage support

This change modifies how core node readiness is determined, now comparing against UpdateReplicas from status rather than the static spec replicas count. This is an important change for safely managing StatefulSets with Durable Storage, as it allows for more accurate status tracking during scale operations where the actual ready count may temporarily differ from desired state.

go.mod (1)

9-9: Added dedent dependency for improved configuration string handling

The addition of the dedent package is appropriate for handling multiline configuration strings in a cleaner way. This likely supports the configuration handling needed for Durable Storage features.

controllers/apps/v2beta1/add_svc_test.go (3)

7-7: LGTM! Added import for the new config package.

The new import for the config package is correctly added and will be used for structured configuration handling.


68-71: Well-designed configuration loading helper function.

The loadConf helper function elegantly abstracts the config loading and merging process, resulting in cleaner test cases.


99-99: Clean transition to structured configuration.

The test cases have been updated to use the new loadConf helper function consistently throughout the file, replacing direct string inputs. This approach is more maintainable and aligns with the refactoring of configuration handling in the operator.

Also applies to: 121-121, 131-131, 141-143, 156-159, 172-175, 194-197

e2e/v2beta1/suite_test.go (4)

26-26: LGTM! Added necessary imports.

The dedent package will help with clean multi-line configuration formatting, and the ptr package is used for pointer creation.

Also applies to: 35-35


60-65: Good structural enhancement for test specs.

The new emqxSpecs struct provides a more organized way to manage different EMQX deployment configurations in tests (cores-only and cores-replicants), which will make testing DS replication scenarios more straightforward.


Initialization method updated and timeout reduced.

The test now uses the initSpecs() method to initialize configurations, and the timeout has been reduced from 5 to 3 minutes.


152-203: Robust configuration initialization with DS-related config.

The initSpecs method creates a comprehensive test configuration that includes:

  1. A detailed LWM2M gateway configuration
  2. Separate configurations for cores-only and cores-replicants deployments
  3. Properly set replica counts for both deployment types

This will provide a solid foundation for testing Durable Storage capabilities mentioned in the PR objectives.

controllers/apps/v2beta1/add_emqx_core_suite_test.go (6)

7-7: LGTM! Added import for internal test utilities.

Good addition of internal test utilities import which will provide helpful test matchers and functions.


19-20: Variable declarations cleaned up.

The variable declarations have been simplified, which makes the code cleaner.


34-34: Improved initialization with configuration loading.

The test now properly initializes the instance using DeepCopy() and loads the EMQX configuration, ensuring tests accurately represent real-world scenarios.

Also applies to: 49-49


56-58: Explicit test for EMQX CR creation.

Adding a dedicated test step for creating the EMQX custom resource improves test clarity and follows the principle of one assertion per test.


70-71: Enhanced test assertions with matcher patterns.

The use of HaveField matcher improves assertion readability and provides better error messages when tests fail. The test now explicitly verifies multiple StatefulSets with different image references.

Also applies to: 85-88


90-98: Comprehensive condition validation for core nodes.

The test now properly validates that when a new StatefulSet is created (due to image change), the EMQX instance's status conditions correctly reflect the transition state:

  1. The last true condition should be CoreNodesProgressing
  2. The Ready, Available, and CoreNodesReady conditions should be nil

This change aligns with the PR objective to properly annotate pods with Durable Storage conditions.

controllers/apps/v2beta1/add_emqx_repl_test.go (3)

7-7: LGTM! Added import for the config package.

The config package import is correctly added to support the new structured configuration approach.


50-51: Updated function calls with structured configuration.

All test cases now properly load and pass the configuration object to getNewReplicaSet. This consistent approach ensures that the replicaSet generation has access to the full configuration context.

Also applies to: 64-65, 87-88, 112-113, 137-138


108-111: Improved configuration readability with multi-line strings.

The switch to multi-line string format for configuration data improves readability and maintainability. This is particularly important as the complexity of configuration grows with Durable Storage support.

Also applies to: 133-136

controllers/apps/v2beta1/add_bootstrap_resource.go (3)

15-15: Added import for structured config management.

The addition of the config package import aligns with the shift towards a more structured configuration management approach for EMQX operator.


32-32: Updated function call with configuration parameter.

The call to generateNodeCookieSecret now includes a.conf as a parameter, aligning with the function's new signature that accepts a configuration object.


107-108: Enhanced function signature and implementation with structured configuration.

The function now accepts a *config.Conf parameter and uses its GetNodeCookie() method to retrieve the node cookie. This is a better approach than the previous implementation which likely parsed the configuration string directly, improving maintainability and consistency across the codebase.

controllers/apps/v2beta1/rebalance_controller.go (3)

40-40: Added import for structured config management.

The addition of the config package import aligns with the shift towards a more structured configuration management approach.


54-54: Refactored RebalanceReconciler struct to use config.Conf.

The struct field type has been changed from *rest.Config to *config.Conf, which is more appropriate for configuration management in this context and aligns with the PR's objective of improving EMQX Durable Storage management.


177-182: Enhanced configuration handling for requester creation.

This change implements a more structured approach to configuration handling by:

  1. Creating a configuration object from EMQX instance data
  2. Validating the configuration (with error handling)
  3. Passing the configuration to the requester

This improves error handling and provides a more consistent approach to configuration management across the codebase, which will be important for Durable Storage support.

controllers/apps/v2beta1/add_emqx_core_test.go (6)

7-7: Added import for structured config management in tests.

The addition of the config package import allows the tests to use the new configuration structure, maintaining consistency with the implementation code.


47-48: Updated test to use structured configuration approach.

Test now creates a configuration object using EMQXConf and passes it to getNewStatefulSet, aligning with the function's new signature and ensuring the test properly validates the updated implementation.


61-62: Updated test to use structured configuration approach.

Similar to the previous test case, this test now properly creates and passes a configuration object to match the updated function signature.


84-85: Updated test for HTTP port with structured configuration.

Test now correctly creates a configuration object from the test instance's HTTP port configuration and passes it to the function being tested.


105-110: Improved configuration test format and implementation.

The test configuration has been reformatted from a single-line string to a more readable multi-line format. The test now properly creates a configuration object from this data and passes it to the function being tested.


130-135: Improved multi-port configuration test format and implementation.

Similar to the previous test case, the configuration data has been reformatted to a more readable multi-line format, and the test now correctly creates and uses a configuration object.

controllers/apps/v2beta1/add_bootstrap_resource_test.go (3)

8-8: Added import for structured config management in tests.

The addition of the config package import allows the tests to use the new configuration structure, maintaining consistency with the implementation code.


26-27: Updated test to use structured configuration approach.

Test now creates a configuration object using EMQXConf and passes it to generateNodeCookieSecret, aligning with the function's new signature and ensuring the test properly validates the updated implementation.


35-36: Updated pre-configured node cookie test with structured approach.

This test case now correctly creates a configuration object from a test instance with a pre-configured cookie value and passes it to the function, ensuring all code paths are properly tested with the new implementation.

internal/requester/requester.go (2)

81-81: Good addition of a timeout to the HTTP client.

Adding a timeout prevents requests from hanging indefinitely and helps ensure the system remains responsive even when external services are slow or unresponsive.
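
For reference, a client-level timeout looks like the following (the duration is illustrative, not necessarily the value chosen in the PR):

client := &http.Client{
	Timeout: 10 * time.Second, // bounds the whole request, including reading the body
}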


138-138: Mock implementation of SwitchHost is appropriate for testing.

The implementation returns the same instance without modifications, which is a clean approach for the mock object.

apis/apps/v2beta1/zz_generated.deepcopy.go (3)

65-78: Appropriate implementation of DeepCopy methods for DSDBReplicationStatus.

The generated code correctly implements the DeepCopyInto and DeepCopy methods following the standard pattern used throughout the file.


81-98: Correct handling of slices in DSReplicationStatus DeepCopy methods.

The implementation properly checks for nil DBs slice and correctly performs deep copying if it exists.


475-475: Proper integration of DSReplication deep copying in EMQXStatus.

The EMQXStatus.DeepCopyInto method has been correctly updated to include the new DSReplication field, ensuring that this data is properly preserved during object copying.

config/crd/bases/apps.emqx.io_emqxes.yaml (4)

13868-13869: Good addition of the podName field to coreNodes

The introduction of the podName field to the coreNodes status structure allows for better tracking and identification of pods within the EMQX cluster. This will be particularly useful for operations related to Durable Storage, where understanding the mapping between pod instances and storage is crucial.


13987-13988: Good addition of the podName field to replicantNodes

Adding the podName field to replicantNodes mirrors the same change made to coreNodes, providing consistency across both node types. This consistent approach will simplify management and monitoring of all pods in the EMQX cluster.


13900-13927: Well-structured Durable Storage replication status

The new dsReplication status field with its nested structure properly captures the state of database replication for Durable Storage. The required fields (maxReplicas, minReplicas, name, numShards, and numTransitions) provide a comprehensive view of the replication configuration.

This structure provides the necessary information for:

  1. Monitoring replication status across the cluster
  2. Ensuring safe scale-down operations
  3. Tracking database replication sites during scaling and migration

The implementation follows Kubernetes CRD schema best practices with proper type definitions and required field specifications.


13900-13927:

❓ Verification inconclusive

Verify integration with controller logic

Ensure that the controller logic properly populates these status fields as database replication status changes. This is essential for providing accurate information to operators and ensuring the safe scale-down of StatefulSets with Durable Storage enabled.

Run the following script to verify the controller logic that manages these fields:


🏁 Script executed:

#!/bin/bash
# Check for controller code that populates the new dsReplication status fields

echo "Looking for code that updates dsReplication status:"
rg -A 5 "dsReplication" --type go

echo -e "\nLooking for scale-down related logic that uses dsReplication:"
rg -A 10 "scale.*down|scaledown" --type go | grep -E "dsReplication|Durable"

echo -e "\nLooking for StatefulSet management code:"
rg -A 3 "StatefulSet.*scale|scale.*StatefulSet" --type go

Length of output: 3355


Integration Check on dsReplication Status and Safe Scale-Down

  • Verified that the controller properly updates the dsReplication status fields in controllers/apps/v2beta1/update_emqx_status.go (using ds.GetReplicationStatus and iterating through DBs), and that the CRD in config/crd/bases/apps.emqx.io_emqxes.yaml correctly defines these fields.
  • Confirmed that StatefulSet management (and scale-down operations) is handled in controllers/apps/v2beta1/sync_pods.go; however, there’s no explicit reference to dsReplication within the scale-down logic.
  • Action Required: Please double-check that the scale-down process for StatefulSets with Durable Storage enabled takes the updated dsReplication status into account. If additional checks are needed to prevent unsafe scale-down when the replication status isn’t optimal, these should be integrated accordingly.

controllers/apps/v2beta1/update_emqx_status.go (5)

11-11: Adding DS package import for Durable Storage integration.

Good addition to support the Durable Storage replication status functionality.


121-125: Improved naming for clarity: isEnterpriser → isEnterprise.

This rename improves code readability while maintaining the same functionality.


137-171: New logic for tracking Durable Storage replication status.

This section enhances the EMQX operator by gathering and surfacing detailed information about database replication status, including:

  • Number of database shards
  • Minimum and maximum replica counts
  • Number of transitions

The implementation correctly initializes the status structure for both enterprise and non-enterprise editions, with detailed processing when enterprise features are available.


198-199: Adding pod name tracking for core nodes.

Setting the PodName field ensures more complete pod information is available in the status.


208-209: Adding pod name tracking for replicant nodes.

Consistent with the change for core nodes, this ensures PodName is properly set for replicant nodes as well.

docs/en_US/reference/v2beta1-reference.md (4)

54-72: Documentation for new DSDBReplicationStatus type.

This section clearly documents the new DSDBReplicationStatus structure which tracks replication statistics for individual databases, including:

  • Name of the database
  • Number of shards
  • Number of transitions
  • Minimum and maximum replicas

The documentation format follows the established pattern of the reference document.


74-88: Documentation for new DSReplicationStatus type.

This section properly documents the DSReplicationStatus structure that acts as a container for the database replication statuses, with a clear description that it provides a "Summary of DS replication status per database."


199-201: Added podName field to EMQXNode structure.

This documentation update reflects the addition of the podName field to the EMQXNode structure, which matches the code changes in the implementation.


339-339: Added dsReplication field to EMQXStatus structure.

This documentation update reflects the addition of the dsReplication field to the EMQXStatus structure, which corresponds to the implementation changes.

docs/zh_CN/reference/v2beta1-reference.md (4)

54-72: Consistency update: Chinese documentation for DSDBReplicationStatus.

Consistent with the English documentation, it clearly records the DSDBReplicationStatus structure, including the database name, number of shards, number of transitions, and the minimum and maximum replica counts.


74-88: Consistency update: Chinese documentation for DSReplicationStatus.

Consistent with the English documentation, it properly records the DSReplicationStatus structure, described as a "summary of DS replication status per database."


199-201: Added the podName field to the EMQXNode structure.

This documentation update reflects the addition of the podName field to the EMQXNode structure, matching the code implementation.


339-339: Added the dsReplication field to the EMQXStatus structure.

This documentation update reflects the addition of the dsReplication field to the EMQXStatus structure, matching the code implementation.

controllers/apps/v2beta1/requester.go (1)

65-86: Utility function to retrieve bootstrap API keys from Kubernetes secrets.

The getBootstrapAPIKey function is well-implemented with proper error handling:

  • It retrieves the secret containing the API key
  • It parses the key data to extract username and password
  • It returns clear error messages when the key can't be found
  • It uses named return values appropriately

This implementation properly follows the pattern of checking for errors early and returning descriptive error messages.
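
A rough sketch of that flow, assuming the secret stores "key:secret" lines under a single data key (the secret layout, data key, and helper name here are illustrative, not the actual requester.go code):

import (
    "context"
    "fmt"
    "strings"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/types"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

func getBootstrapAPIKeySketch(ctx context.Context, c client.Client, ns, name string) (username, password string, err error) {
    secret := &corev1.Secret{}
    if err = c.Get(ctx, types.NamespacedName{Namespace: ns, Name: name}, secret); err != nil {
        return "", "", err
    }
    // Each line is expected to look like "key:secret"; take the first well-formed one.
    for _, line := range strings.Split(string(secret.Data["bootstrap_api_key"]), "\n") {
        if k, v, ok := strings.Cut(line, ":"); ok {
            return k, v, nil
        }
    }
    return "", "", fmt.Errorf("bootstrap API key not found in secret %s/%s", ns, name)
}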

e2e/v2beta1/e2e_test.go (67)

20-21: Check Go version compatibility for slices package.
The slices package was introduced in Go 1.21. Ensure that the build environment targets Go 1.21 or above to avoid compile-time errors.


30-30: Aliased imports are fine.
The alias k8sErrors for k8s.io/apimachinery/pkg/api/errors is standard practice. No immediate issues here.


32-32: Using intstr utilities.
Importing "k8s.io/apimachinery/pkg/util/intstr" is standard for managing int-or-string fields. No concerns.


40-42: Globals in E2E tests.
Declaring instance and instanceKey at this scope is typical for Ginkgo tests but be aware these could cause test interference if parallel tests were introduced in the future. Currently, it’s fine for serial E2E tests.


77-81: BeforeAll usage.
This BeforeAll block sets up shared references for tests. Ginkgo’s BeforeAll is still somewhat experimental. Ensure it behaves as expected in your CI environment, especially if tests run in parallel modes.


82-83: New test context for core-only cluster.
Establishing a separate Context block for core-only scenarios is a good approach to keep test flow clear.


84-85: Quick check for namespace creation.
Straightforward test verifying createNamespace(). No issues found here.


89-90: Instance creation & readiness check.
Good practice to create and then immediately check readiness. Ensure your test environment has enough time to spin up pods so this doesn’t cause flakiness.


92-94: Validating readiness conditions.
Confirming HaveCondition(appsv2beta1.Ready...) and the expected replicas is a solid approach for verifying operator logic.


102-103: Ensuring no replicant nodes.
The checks for Status.ReplicantNodes and Status.ReplicantNodesStatus being nil or zero confirm a pure core-only cluster, which aligns with your test scenario.


124-124: Retrieving instance before scale-up.
Fetching the instance to ensure it’s the latest revision is vital for safe scaling. Good step.


128-128: Retry pattern for scale-up.
Using retry.RetryOnConflict is recommended for concurrency issues in K8s updates. Good usage here.


136-138: Verifying scale-up readiness.
Ensuring ReadyReplicas matches the new replica count is essential. This looks consistent.


169-169: Fetching instance before scale-down.
Same pattern as scale-up. Continues the best practice of retrieving the current instance.


173-173: Conflicting updates for scale-down.
RetryOnConflict again is good, preventing out-of-date resource version conflicts.


180-184: Confirming scale-down readiness.
Similar checks as scale-up, ensuring the operator adjusts core nodes properly.


214-214: Retrieving instance for image change.
Consistent approach for performing an update. Good job.


218-218: Handle concurrency again.
Same pattern—no issues. Good usage of retry.RetryOnConflict.


225-228: Validating updated revision for new image.
Ensures that the operator has created a new revision for the changed image. Good practice.


258-258: Verifying old StatefulSets scale down.
This ensures smooth rolling upgrades. A correct approach to confirm old STS replicas are zero.


293-293: Ensuring new port is exposed in the Service.
Checking the updated port 11883. This thoroughly tests dynamic config changes.


301-301: Endpoints check for 11883.
Endpoints are updated as well. This is consistent with the new port. Good synergy test.


316-318: Reusing the same namespace creation.
Checks are repeated for a new scenario with replicant nodes. This is consistent.


321-322: Switching to the replicant scenario.
createInstance(emqx.coresReplicants.Spec) ensures an instance with replicant nodes. Clear naming.


324-325: Confirming Ready condition with replicant.
Ensures the operator recognized and started both core & replicant nodes. Straightforward check.


333-334: Replicant nodes status check.
Verifies replicant replica counts and readiness. Essential for multi-node scenarios.


354-354: Scaling up replicant nodes.
Same approach as core node scale operations, consistent usage of Get and Update.


358-358: Retry for replicant scale-up.
Maintaining concurrency protection is good.


365-368: Validating replicant scale-up.
Ensures replicant count and readiness match the new total. Implementation looks consistent.


377-378: Confirm updated replicant nodes in status.
Checks new replicant node length and readiness. Good coverage.


399-400: Scaling down replicant to zero.
A typical test scenario to confirm the operator tears down replicant pods properly.


403-403: Retrieving instance - replicant scale-down.
Again, consistent pattern.


410-413: Confirming replicant scale-down readiness.
Checks readiness conditions become zero. Good approach.


422-423: HaveLen to confirm zero replicant pods.
Verifies the final replicant state after teardown.


444-445: Retrieving instance for image change to enterprise version.
This test ensures enterprise images handle core+replicant setups. Good scenario coverage.


448-448: retry.RetryOnConflict for enterprise image.
Consistent concurrency pattern.


452-452: Set replicant replicas to 2.
Tests partial scale while changing image. Nicely combined scenario.


456-459: Ensuring readiness with new enterprise image.
Checks new revision for core nodes, replicant nodes. Great combined test coverage.


468-468: Replicant status check with new revision.
Confirms replicant rolled to the new image. All good.


488-489: Fetching instance for revert to EMQX 5.
Swapping from the enterprise image to a community image for the zero-replicant scenario.


492-492: Retry pattern remains.
No issues in consistency.


500-503: Validating readiness after image + scale-down.
Ensures the final combination of changes is correct.


511-512: Check replicant node status is zero.
Ensures replicant tear-down is successful on new image version.


531-533: Waiting for old ReplicaSets to scale down.
Ensures no leftover RS from prior configuration. Great final check for replicant updates.


539-540: Using slices.DeleteFunc(...).
Leverages the new slices package to remove current revision. Straightforward approach, but ensure Go version is correct (as noted previously).
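
For anyone new to the API, a self-contained example of the call (illustrative data, not the test's):

package main

import (
    "fmt"
    "slices" // standard library since Go 1.21
)

func main() {
    revisions := []string{"rev-a", "rev-b", "rev-current"}
    // DeleteFunc drops every element matching the predicate, leaving
    // only the old revisions to assert against.
    old := slices.DeleteFunc(revisions, func(r string) bool {
        return r == "rev-current"
    })
    fmt.Println(old) // [rev-a rev-b]
}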


545-546: Expect old RS to have zero replicas.
Confirms successful transition from old to new revision.


554-555: Same approach for old StatefulSets.
Maintains consistent logic as with RS.


561-562: Deleting old STS with slices.DeleteFunc(...).
Again, consistent usage of the slices package.


568-569: Verifying zero replicas on old STS.
Completes a thorough check for core+replicant STS changes.


577-578: Updating config to new port.
No issues. Good demonstration of RetryOnConflict again.


589-589: Service port validation.
Ensuring the updated port is exposed. Consistent with previous approaches.


597-597: Endpoint port check again.
All consistent for verifying changed ports.


608-610: Revert to default port.
Reapplying the default 1883. This is standard test coverage.


611-615: Context for DS-enabled cluster.
New scenario for durable sessions. Great to isolate DS logic in separate tests.


617-627: Appending DS config to emqx.coresReplicants.Spec.
Proper approach to testing DS by modifying Spec.Config.Data to enable the built-in Raft backend.


628-658: Verifying DS readiness & DB replication.
Checks the DSReplication.DBs field, ensuring shards, min/max replicas, etc. This is thorough DS coverage.


660-667: Post-creation checks.
Ensures services, pods, endpoints align with DS changes.


669-679: Scaling up EMQX core with DS.
Uses the same pattern plus checks DS replication adjustments. This is thorough coverage.


681-711: Confirm DS replication fields reflect new replica count.
Ensures DS shard replication updates to 3. Nicely validated.


714-721: Verifying services, pods, endpoints manager.
Same checks for environment consistency after scale.


723-734: Scaling core down with DS.
Checks that DS transitions min/max replicas to 1. Good test scenario coverage.


736-764: Ensuring DS replication sync with fewer core nodes.
Correctly verifies MinReplicas = 1, MaxReplicas = 1. Looks good.


767-774: Final endpoint checks after scale-down.
Ensures the environment remains consistent.


776-828: Smooth configuration changes for DS.
Good investment of test coverage in the operator's ability to handle environment variables, custom ports, and DS replication. Well done.


831-906: Verifying final endpoints.
ConsistOf checks ensure the correct set of ports are published.


910-923: Disabling DS.
Checks that DS replication becomes empty after removing DS config. A nice tear-down scenario.


987-993: Pod readiness checks with custom conditions.
The test ensures pods move to PodOnServing and Ready. Great coverage for operator-defined statuses.

controllers/apps/v2beta1/add_svc.go (8)

9-9: New import for config package.
This introduction centralizes EMQX configuration handling. Good for maintainability.


36-39: Loading EMQX config object.
Switching from raw string to structured config is beneficial. The error handling gracefully stops reconciliation if loading fails.


40-40: Generate dashboard service with structured config.
This is a clearer approach than manually parsing the config string. Good design improvement.


43-43: Generate listener service from config.
Same methodology for the listener. Reduces duplication and potential parsing errors. Nicely done.


70-70: generateDashboardService now uses conf.
Passing *config.Conf instead of a string clarifies usage, reduces string manipulations.


80-80: Retrieving dashboard port from config.
Method call conf.GetDashboardServicePort() is more robust than manual extraction.


104-104: generateListenerService with new config param.
Enhances consistency with the new config approach.


113-113: GetListenersServicePorts() usage.
Leverages the config struct to gather ports. Clear and maintainable.

controllers/apps/v2beta1/sync_emqx_config.go (6)

11-11: Importing new config package.
Transition from raw merges to structured config is a major step in readability and debugging.


26-31: Merge defaults and parse via config.EMQXConf.
Combining user config with operator defaults is more maintainable. Immediate parse also gives quick feedback if config is invalid.


52-53: Track last config in annotations.
Using AnnotationsLastEMQXConfigKey helps detect changes. This is a reliable pattern for partial or full config updates.


66-68: Short-circuit if config differs but core not ready.
Prevents partial updates if the cluster isn’t in a stable state. Good call to wait until it's ready.


72-80: Stripping read-only configs.
Abstracting the logic in conf.StripReadOnlyConfig() is clean. Emitting an event clarifies what's being dropped.


82-83: Putting config via API.
putEMQXConfigsByAPI updates EMQX with the final config. Clear separation of concerns.

apis/apps/v2beta1/status.go (1)

59-70: Validate struct design and potential future expansions.
The DSReplicationStatus struct and its embedded slice of DSDBReplicationStatus look structurally sound. The design is flexible for storing multi-database replication metadata. If there's a plan for additional fields (e.g., replication lag or site-level metrics), consider how these expansions might affect the structure.
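
For readers without the diff open, the shape implied by the CRD schema is roughly the following (field types and JSON tags are inferred from the schema, so treat them as approximate):

// DSDBReplicationStatus tracks replication of a single DS database.
type DSDBReplicationStatus struct {
    Name           string `json:"name"`
    NumShards      int32  `json:"numShards"`
    NumTransitions int32  `json:"numTransitions"`
    MinReplicas    int32  `json:"minReplicas"`
    MaxReplicas    int32  `json:"maxReplicas"`
}

// DSReplicationStatus summarizes DS replication status per database.
type DSReplicationStatus struct {
    DBs []DSDBReplicationStatus `json:"dbs,omitempty"`
}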

controllers/apps/v2beta1/ds_update_replica_sets.go (1)

101-112: Validate pod name parsing.
getPodIndex correctly splits on - and parses the final chunk. Keep an eye out for edge cases when pods do not follow name-index patterns, or if the string includes additional dashes. The logic returning -1 is fine for now, but consider logging a warning to help trace unexpected naming.
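
A sketch of the suggested hardening; the logger wiring is hypothetical and the body mirrors the description above rather than the exact source:

import (
    "strconv"
    "strings"

    "github.com/go-logr/logr"
)

func getPodIndex(logger logr.Logger, podName string) int {
    i := strings.LastIndex(podName, "-")
    if i < 0 {
        logger.Info("pod name has no ordinal suffix", "pod", podName)
        return -1
    }
    idx, err := strconv.Atoi(podName[i+1:])
    if err != nil {
        // Extra dashes are fine (LastIndex skips past them); a non-numeric
        // final chunk is the case worth surfacing in logs.
        logger.Info("pod name has a non-numeric ordinal suffix", "pod", podName)
        return -1
    }
    return idx
}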

controllers/apps/v2beta1/update_pod_conditions.go (3)

91-94: Check concurrency for pod condition updates.
The code updates the pod conditions by calling updatePodCondition. This is correct, but quickly updating many pods could create concurrency spikes if run in parallel. Verify your logic to ensure you won’t flood the Kubernetes API.

Please confirm that no other part of the code is continuously invoking this function in short intervals. If you feel uncertain, I can generate a shell script to grep for repeated calls to updatePodCondition or the reconcile method to ensure we do not have repeated rapid calls.


119-119: Confirm method signature alignment.
Removing the instance *appsv2beta1.EMQX parameter from checkRebalanceStatus simplifies the method’s interface. Just ensure you do not need the instance’s additional context for expansions (e.g., different DS ports or proprietary logic).


124-124: Ensure mapped ports are valid.
u.conf.GetDashboardPortMap() outlines ports for dashboard endpoints. Confirm that these values exist within the config and reflect the actual pods/cluster environment. Mismatched ports may yield indefinite ConditionUnknown states.

Let me know if you'd like a shell script to quickly parse the code for how dashboard-https or dashboard are set or overridden.

controllers/apps/v2beta1/sync_pods_suite_test.go (8)

24-24: Improve code consistency with variable renaming

The change of fakeR to fakeReq (line 269) and the addition of currentStsPod (line 30) improve readability and consistency. The more descriptive variable names make the code easier to maintain.

Also applies to: 30-30


88-92: Improved label management with reusable variables

Using a dedicated variable for labels (updateStsLabels) reduces code duplication and makes the code more maintainable. This is an improvement over repeatedly calling appsv2beta1.CloneAndAddLabel.


132-136: Improved label management with reusable variables

Similar to the StatefulSet labels, using a dedicated variable for ReplicaSet labels (updateRsLabels) improves code consistency and maintainability.


176-194: Added StatefulSet pod creation for comprehensive testing

This addition of currentStsPod creation complements the existing currentRsPod creation, providing more complete test coverage for both StatefulSet and ReplicaSet pods, which is essential for the Durable Storage feature.


307-311: Refactored label creation for improved readability

The same pattern of using a dedicated variable for labels is used consistently throughout the test file, making the code more readable and maintainable.


363-364: Improved test structure with dedicated reconciliation object

The tests now use a structured syncPodsReconciliation object instead of direct function calls, improving code organization and clarity. The function has also been renamed from canBeScaleDownSts to canScaleDownStatefulSet for better readability.

Also applies to: 374-375, 393-394


366-369: Enhanced test assertions for admission object

The test assertions now check for specific fields (Reason and Pod) in the returned admission object, providing more detailed validation compared to the previous boolean checks. This makes the tests more thorough and expressive.

Also applies to: 377-380


408-409: Improved FakeRequester usage with consistent naming

The fakeReq variable is now used consistently throughout the test file, improving readability. The tests also now use the same pattern of creating a syncPodsReconciliation object for testing.

Also applies to: 421-422

controllers/apps/v2beta1/add_emqx_core.go (9)

12-12: Added config import for enhanced configuration management

The addition of the config package import enables the use of a structured configuration object, which will provide better organization and type safety for configuration parameters.


31-31: Updated function call to use configuration object

The call to getNewStatefulSet now includes a.conf to pass the configuration object, which improves the function's flexibility and aligns with the objective of better handling Durable Storage configurations.


35-52: Refactored StatefulSet creation logic for clarity

The logic for determining whether to create a new StatefulSet has been simplified using a needCreate flag with clear conditions and logging. This makes the code more readable and maintainable.


54-80: Improved error handling and status updating

The error handling for StatefulSet creation has been streamlined with better return statements and status updates. This ensures that the EMQX instance status is correctly updated based on the creation result.


82-93: Simplified StatefulSet update process

Direct assignments from the existing StatefulSet to the new one replace deep copying, and the patch calculation now explicitly ignores replicas since scaling is managed separately. This approach better handles StatefulSets with Durable Storage enabled.


95-112: Improved logging and error handling for StatefulSet updates

Enhanced logging with clear reasons and better error handling improves the operator's ability to manage StatefulSets, especially when debugging issues with Durable Storage.


134-136: Refactored service port retrieval using configuration object

Using conf.GetDashboardServicePort() centralizes configuration access and improves maintainability by replacing direct access to instance configurations.


167-181: Added crucial PreStop hook for clean cluster leaving

This is a significant improvement that adds a PreStop hook to execute emqx ctl cluster leave when a pod is stopped. This is essential for maintaining cluster integrity, especially with Durable Storage Raft enabled, preventing leftover records in the cluster metadata.
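
In pod-spec terms the hook looks roughly like this; the command is taken from this review, while the surrounding wiring is a sketch:

import corev1 "k8s.io/api/core/v1"

var lifecycle = &corev1.Lifecycle{
    PreStop: &corev1.LifecycleHandler{
        Exec: &corev1.ExecAction{
            // Ask the node to leave the cluster before the container stops,
            // so no stale member records linger in the cluster metadata.
            Command: []string{"emqx", "ctl", "cluster", "leave"},
        },
    },
}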


287-287: Updated lifecycle assignment in container spec

The container's lifecycle field is now set to the configured lifecycle that includes the PreStop hook, ensuring pods leave the cluster cleanly during scale-down operations.

controllers/apps/v2beta1/add_emqx_repl.go (5)

11-11: Added config import for enhanced configuration management

The addition of the config package import enables the use of a structured configuration object for replicant nodes, consistent with the core nodes implementation.


37-37: Updated function call to use configuration object

The call to getNewReplicaSet now includes a.conf to pass the configuration object, ensuring consistent configuration handling between core and replicant nodes.


124-125: Refactored service port retrieval using configuration object

Using conf.GetDashboardServicePort() for replicant nodes matches the approach in core nodes, maintaining consistency and improving maintainability.


158-172: Added crucial PreStop hook for clean cluster leaving

This addition mirrors the change in the core nodes, ensuring that replicant nodes also leave the cluster cleanly when stopped. This is essential for maintaining cluster integrity, especially with Durable Storage Raft enabled.


272-272: Updated lifecycle assignment in container spec

The container's lifecycle field is now set to the configured lifecycle that includes the PreStop hook, ensuring replicant pods also leave the cluster cleanly during scale-down operations.

controllers/apps/v2beta1/util.go (4)

77-92: Added focused Pod retrieval function

The new listPodsManagedBy function efficiently retrieves pods managed by a specific controller UID, which is more targeted than the previous approach. This improves the operator's ability to track pods by their controller, helping with Durable Storage management.


113-126: Added Pod condition update function

The updatePodCondition function is crucial for annotating pods with conditions related to Durable Storage, directly supporting one of the main objectives of the PR. It efficiently applies a strategic merge patch to update pod status conditions.
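
A sketch of the strategic-merge-patch mechanics being described (the helper name and details are illustrative; pod conditions merge by their type key, which is what makes a single-condition patch safe):

import (
    "context"
    "encoding/json"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/types"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

func patchPodCondition(ctx context.Context, c client.Client, pod *corev1.Pod, cond corev1.PodCondition) error {
    raw, err := json.Marshal(map[string]any{
        "status": map[string]any{
            "conditions": []corev1.PodCondition{cond},
        },
    })
    if err != nil {
        return err
    }
    // Strategic merge replaces only the condition whose Type matches.
    return c.Status().Patch(ctx, pod, client.RawPatch(types.StrategicMergePatchType, raw))
}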


179-194: Added StatefulSet replicas ignore function for patching

The ignoreStatefulSetReplicas function ensures that replicas count is not considered during StatefulSet patching, supporting the safe scale-down of StatefulSets with Durable Storage enabled by delegating scaling to the dedicated reconciler.


196-209: Added helper function for StatefulSet replicas field filtering

The filterStatefulSetReplicasField function provides the implementation details for the ignoreStatefulSetReplicas function, ensuring consistent handling of replicas during patching by setting the value to 1.

controllers/apps/v2beta1/emqx_controller.go (4)

104-104: Validate r.conf ahead of usage
Ensure LoadEMQXConf was called before creating a requester so r.conf is never nil. Future changes might inadvertently skip the load call.


117-117: Check the ordering of syncConfig
Confirm that synchronizing config before certain reconcilers does not introduce visibility delays or partial states for subsequent steps.


125-126: Review ds reconcilers sequence
dsUpdateReplicaSets and dsReflectPodCondition come at the end of the chain. Verify no DS operations are required earlier (e.g., before standard status updates).


144-145: Guard against uninitialized DS replication
If instance.Status.DSReplication is ever nil, calling IsStable() may panic. Ensure DS replication status is always populated.
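
One defensive option, assuming IsStable is (or becomes) a pointer-receiver method; the stability criterion below is a guess based on the transition counters in the new status type:

func (s *DSReplicationStatus) IsStable() bool {
    if s == nil {
        return true // nothing tracked yet, nothing to wait for
    }
    for _, db := range s.DBs {
        if db.NumTransitions > 0 {
            return false // replica transitions still in flight
        }
    }
    return true
}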

controllers/apps/v2beta1/ds_reflect_pod_condition.go (1)

3-16: Validate all imports
Check for any unused or duplicate imports that might sneak in.

controllers/apps/v2beta1/config/emqx_test.go (9)

1-16: File-level overview
This test suite is extensive and appears to cover essential aspects of EMQX configuration handling.


19-26: Imports
All required libraries for assertions and Kubernetes APIs are included. Good choice of testify’s assert package.


28-49: Node cookie coverage
Thoroughly tests empty, normal, and special character cookies, ensuring robust coverage of edge cases.


51-79: Read-only config stripping
Excellent test for verifying that read-only config keys are recognized and removed as expected.


81-117: Durable sessions enabled
Covers multiple scenarios in different zones, providing high confidence in IsDSEnabled() correctness.


119-192: Dashboard port detection
Tests standard, disabled, and IPv4/IPv6 scenarios thoroughly, covering many real-world configurations.


194-236: Dashboard service port retrieval
Comprehensive coverage should minimize regressions in how the dashboard service port is computed.


238-326: Listeners service ports
Verifies multiple protocols—including QUIC and gateway workloads—ensuring they map to service ports as expected.


328-355: Merging default config
These scenarios confirm user-provided config overrides default values correctly—a crucial feature.

controllers/apps/v2beta1/config/emqx.go (3)

16-18: Consider adding dedicated unit tests for the Conf struct.
The Conf struct is central to configuration management. Having focused tests ensures that all HOCON parsing logic is consistently validated, preventing regressions in core configuration loading and transformation.

Would you like me to draft a test file for these functionalities?


65-81: Potential fallback scenario in IsDSEnabled().
The function correctly checks both the global durable_sessions.enable setting and zones. However, consider logging or returning more context if multiple zones have conflicting DS settings. This could aid debugging.
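
One reading of the check, with plain maps standing in for the parsed HOCON tree (a sketch; the real method walks the config object):

func isDSEnabled(global bool, zones map[string]bool) bool {
    if global {
        return true
    }
    for _, enabled := range zones {
        if enabled {
            // A single zone with durable_sessions.enable = true turns DS on,
            // even when the global flag is off.
            return true
        }
    }
    return false
}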


234-246: Validate merged defaults against user-provided overrides.
MergeDefaults merges a template with user config while allowing user overrides to take precedence. Ensure environment variables or external config that might conflict with these defaults are consistently validated at a higher level.

controllers/apps/v2beta1/ds/ds.go (3)

59-68: Confirm DS API path correctness.
IsDSAvailable calls apiGet(r, "api/v5/ds/sites"). Ensure consistent version usage elsewhere (e.g., api/v5/ds/storages) to reduce risk of mismatch or unexpected 404 errors.


97-120: Edge case for leaving transitions in TargetSites.
The function sets set[transition.Site] = false when transition.Transition is "leaving". If multiple transitions reference the same site, a subsequent "joining" transition could re-add the site. Confirm that logic ordering is correct if a site is both "joining" and "leaving" in the same iteration.
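
To make the ordering concern concrete, a sketch of the fold (type and field names echo the description above, not the exact ds.go source):

type transition struct {
    Site       string
    Transition string // "joining" or "leaving"
}

func targetSites(assigned []string, transitions []transition) map[string]bool {
    set := map[string]bool{}
    for _, s := range assigned {
        set[s] = true
    }
    for _, t := range transitions {
        switch t.Transition {
        case "joining":
            // A "joining" after an earlier "leaving" re-adds the site,
            // which is exactly the ordering question raised above.
            set[t.Site] = true
        case "leaving":
            set[t.Site] = false
        }
    }
    return set
}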


159-163: Check response codes for partial success on UpdateReplicaSet.
Only a single condition is returned: any non-2xx code is treated as an error. If the DS API could return partial success (e.g., 207), consider how to handle that scenario or confirm it’s not applicable.

controllers/apps/v2beta1/sync_pods.go (5)

69-77: reconcileReplicaSets short-circuits effectively.
This method is concise and easy to follow. Approving the approach of early returns if updateRs == nil or currentRs == nil, or if the replica sets share the same UID.


79-87: Split scale and migrate logic for clarity.
reconcileStatefulSets calls migrateStatefulSet if the UIDs differ, else calls scaleStatefulSet. This structure is neat. Just ensure future expansions (like forced migrations) do not overshadow this simple branching.


89-121: Pod deletion cost logic looks correct.
Adjusting pod-deletion-cost allows orchestrating an older Pod’s removal first. This approach is aligned with K8s best practices for gracefully draining out-of-date pods.
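
For context, the standard lever here is the controller.kubernetes.io/pod-deletion-cost annotation; a sketch of setting it (helper name and cost value are illustrative):

import (
    "context"

    corev1 "k8s.io/api/core/v1"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

func markForEarlyDeletion(ctx context.Context, c client.Client, pod *corev1.Pod) error {
    patch := client.MergeFrom(pod.DeepCopy())
    if pod.Annotations == nil {
        pod.Annotations = map[string]string{}
    }
    // Pods with a lower deletion cost are removed first on ReplicaSet scale-down.
    pod.Annotations["controller.kubernetes.io/pod-deletion-cost"] = "-1"
    return c.Patch(ctx, pod, patch)
}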


123-136: migrateStatefulSet uses offset scale-down technique.
The method retrieves a scale-down admission decision, then decrements replicas. It’s straightforward and presumably ensures only one Pod is removed at a time.


358-380: Ensure API path consistency for node queries.
getEMQXNodeInfoByAPI uses api/v5/nodes/..., which must remain consistent with the DS logic if they share references or versions.

Comment on lines +59 to +67
createInstance := func(spec appsv2beta1.EMQXSpec) error {
	err := k8sClient.Get(ctx, instanceKey, instance)
	if k8sErrors.IsNotFound(err) {
		instance.Spec = *spec.DeepCopy()
		return k8sClient.Create(ctx, instance)
	}
	instance.Spec = *spec.DeepCopy()
	return k8sClient.Update(ctx, instance)
}
⚠️ Potential issue

Handle unexpected errors when getting the instance.
Currently, if k8sClient.Get returns an error other than NotFound, the function ignores it and proceeds to overwrite instance.Spec. You might want to handle or log non-NotFound errors before continuing.

 if err != nil && !k8sErrors.IsNotFound(err) {
-    instance.Spec = *spec.DeepCopy()
-    return k8sClient.Update(ctx, instance)
+    return err
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
createInstance := func(spec appsv2beta1.EMQXSpec) error {
	err := k8sClient.Get(ctx, instanceKey, instance)
	if k8sErrors.IsNotFound(err) {
		instance.Spec = *spec.DeepCopy()
		return k8sClient.Create(ctx, instance)
	}
+	if err != nil && !k8sErrors.IsNotFound(err) {
+		return err
+	}
	instance.Spec = *spec.DeepCopy()
	return k8sClient.Update(ctx, instance)
}

Comment on lines +111 to +127
func (u *dsReflectPodCondition) getSuitableRequester(
	ctx context.Context,
	instance *appsv2beta1.EMQX,
	r req.RequesterInterface,
) req.RequesterInterface {
	// Prefer Enterprise node which is part of "update" StatefulSet (if any).
	for _, core := range instance.Status.CoreNodes {
		if core.Edition == "Enterprise" && strings.Contains(core.PodName, instance.Status.CoreNodesStatus.UpdateRevision) {
			pod, err := u.getPod(ctx, instance, core.PodName)
			if err == nil && pod.DeletionTimestamp == nil && pod.Status.PodIP != "" {
				return r.SwitchHost(pod.Status.PodIP)
			}
		}
	}
	// If no suitable pod found, return the original requester.
	return r
}
🛠️ Refactor suggestion

Ensure readiness for host switching
When switching to an Enterprise node’s IP, consider verifying the pod is fully ready. Otherwise, DS calls could fail if the container isn’t fully started.
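
A possible guard, checking the standard Ready condition before calling SwitchHost (sketch only; where to plug it in is up to the author):

import corev1 "k8s.io/api/core/v1"

func isPodReady(pod *corev1.Pod) bool {
    for _, cond := range pod.Status.Conditions {
        if cond.Type == corev1.PodReady {
            return cond.Status == corev1.ConditionTrue
        }
    }
    return false
}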

Comment on lines +161 to 190
// Scale up or down the existing statefulSet.
func (r *syncPodsReconciliation) scaleStatefulSet(ctx context.Context, req req.RequesterInterface) subResult {
sts := r.currentSts
desiredReplicas := *r.instance.Spec.CoreTemplate.Spec.Replicas
currentReplicas := *sts.Spec.Replicas

if currentReplicas < desiredReplicas {
*sts.Spec.Replicas = desiredReplicas
if err := r.Client.Update(ctx, sts); err != nil {
return subResult{err: emperror.Wrap(err, "failed to scale up statefulSet")}
}
return subResult{}
}

if updateSts != nil && currentSts != nil && updateSts.UID != currentSts.UID {
canBeScaledDown, err := s.canBeScaleDownSts(ctx, instance, r, currentSts, targetedEMQXNodesName)
if currentReplicas > desiredReplicas {
admission, err := r.canScaleDownStatefulSet(ctx, req)
if err != nil {
return subResult{err: emperror.Wrap(err, "failed to check if sts can be scale down")}
return subResult{err: emperror.Wrap(err, "failed to check if statefulSet can be scaled down")}
}
if canBeScaledDown {
// https://github.com/emqx/emqx-operator/issues/1105
currentSts.Spec.Replicas = ptr.To(int32(*currentSts.Spec.Replicas - 1))
if err := s.Client.Update(ctx, currentSts); err != nil {
return subResult{err: emperror.Wrap(err, "failed to scale down old statefulSet")}
if admission.Pod != nil {
*sts.Spec.Replicas = *sts.Spec.Replicas - 1
if err := r.Client.Update(ctx, sts); err != nil {
return subResult{err: emperror.Wrap(err, "failed to scale down statefulSet")}
}
return subResult{}
}
return subResult{}
}

return subResult{}
}
🛠️ Refactor suggestion

Check concurrency while scaling StatefulSet.
scaleStatefulSet updates sts.Spec.Replicas directly. If multiple reconcile loops run concurrently, another update could race to also change replicas. Ensure you rely on final K8s states and re-check if the replica count changed.

Comment on lines +281 to 356
func (r *syncPodsReconciliation) canScaleDownStatefulSet(ctx context.Context, req req.RequesterInterface) (scaleDownAdmission, error) {
	// Disallow scaling down the statefulSet if the replicant replicaSet is still updating.
	status := r.instance.Status
	if appsv2beta1.IsExistReplicant(r.instance) {
		if status.ReplicantNodesStatus.CurrentRevision != status.ReplicantNodesStatus.UpdateRevision {
			return scaleDownAdmission{Reason: "replicant replicaSet is still updating"}, nil
		}
	}

-	if !checkInitialDelaySecondsReady(instance) {
-		return false, nil
+	if !checkInitialDelaySecondsReady(r.instance) {
+		return scaleDownAdmission{Reason: "instance is not ready"}, nil
	}

-	if len(instance.Status.NodeEvacuationsStatus) > 0 {
-		if instance.Status.NodeEvacuationsStatus[0].State != "prohibiting" {
-			return false, nil
+	if len(status.NodeEvacuationsStatus) > 0 {
+		if status.NodeEvacuationsStatus[0].State != "prohibiting" {
+			return scaleDownAdmission{Reason: "node evacuation is still in progress"}, nil
		}
	}

-	shouldDeletePod = &corev1.Pod{}
-	_ = s.Client.Get(ctx, types.NamespacedName{
-		Namespace: instance.Namespace,
-		Name:      fmt.Sprintf("%s-%d", oldSts.Name, *oldSts.Spec.Replicas-1),
-	}, shouldDeletePod)
+	// Get the pod to be scaled down next.
+	scaleDownPod := &corev1.Pod{}
+	err := r.Client.Get(ctx, types.NamespacedName{
+		Namespace: r.instance.Namespace,
+		Name:      fmt.Sprintf("%s-%d", r.currentSts.Name, *r.currentSts.Spec.Replicas-1),
+	}, scaleDownPod)

-	if shouldDeletePod.DeletionTimestamp != nil {
-		return false, nil
+	// No more pods, no need to scale down.
+	if err != nil && k8sErrors.IsNotFound(err) {
+		return scaleDownAdmission{Reason: "no more pods"}, nil
	}

-	shouldDeletePodInfo, err = getEMQXNodeInfoByAPI(r, fmt.Sprintf("emqx@%s.%s.%s.svc.cluster.local", shouldDeletePod.Name, oldSts.Spec.ServiceName, oldSts.Namespace))
+	// Disallow scaling down the pod that is already being deleted.
+	if scaleDownPod.DeletionTimestamp != nil {
+		return scaleDownAdmission{Reason: "pod deletion in progress"}, nil
+	}
+
+	// Disallow scaling down the pod that is still a DS replication site.
+	// Only if DS is enabled in the current, most recent EMQX config.
+	// Otherwise, if the user has disabled DS, the data apparently no longer
+	// needs to be preserved.
+	if r.conf.IsDSEnabled() {
+		dsCondition := appsv2beta1.FindPodCondition(scaleDownPod, appsv2beta1.DSReplicationSite)
+		if dsCondition != nil && dsCondition.Status != corev1.ConditionFalse {
+			return scaleDownAdmission{Reason: "pod is still a DS replication site"}, nil
+		}
+	}
+
+	// Get the node info of the pod to be scaled down.
+	scaleDownNodeName := fmt.Sprintf("emqx@%s.%s.%s.svc.cluster.local", scaleDownPod.Name, r.currentSts.Spec.ServiceName, r.currentSts.Namespace)
+	scaleDownNode, err := getEMQXNodeInfoByAPI(req, scaleDownNodeName)
	if err != nil {
-		return false, emperror.Wrap(err, "failed to get node info by API")
+		return scaleDownAdmission{}, emperror.Wrap(err, "failed to get node info by API")
	}

-	if shouldDeletePodInfo.NodeStatus == "stopped" {
-		return true, nil
+	// Scale down the node that is already stopped.
+	if scaleDownNode.NodeStatus == "stopped" {
+		return scaleDownAdmission{Pod: scaleDownPod, Reason: "node is already stopped"}, nil
	}

-	if shouldDeletePodInfo.Edition == "Enterprise" && shouldDeletePodInfo.Session > 0 {
-		if err := startEvacuationByAPI(r, instance, targetedEMQXNodesName, shouldDeletePodInfo.Node); err != nil {
-			return false, emperror.Wrap(err, "failed to start node evacuation")
+	// Disallow scaling down the node that is Enterprise and has at least one session.
+	if scaleDownNode.Edition == "Enterprise" && scaleDownNode.Session > 0 {
+		migrateTo := r.migrationTargetNodes()
+		if err := startEvacuationByAPI(req, r.instance, migrateTo, scaleDownNode.Node); err != nil {
+			return scaleDownAdmission{}, emperror.Wrap(err, "failed to start node evacuation")
		}
-		s.EventRecorder.Event(instance, corev1.EventTypeNormal, "NodeEvacuation", fmt.Sprintf("Node %s is being evacuated", shouldDeletePodInfo.Node))
-		return false, nil
+		r.EventRecorder.Event(r.instance, corev1.EventTypeNormal, "NodeEvacuation", fmt.Sprintf("Node %s is being evacuated", scaleDownNode.Node))
+		return scaleDownAdmission{Reason: "node needs to be evacuated"}, nil
	}

	// Open Source or Enterprise with no session
-	if !checkWaitTakeoverReady(instance, getEventList(ctx, s.Clientset, oldSts)) {
-		return false, nil
+	if !checkWaitTakeoverReady(r.instance, getEventList(ctx, r.Clientset, r.currentSts)) {
+		return scaleDownAdmission{Reason: "node evacuation just finished"}, nil
	}
-	return true, nil

+	return scaleDownAdmission{Pod: scaleDownPod}, nil
}
⚠️ Potential issue

Potential race with direct Pod reference in canScaleDownStatefulSet.
Reading the Pod object and then using Client.Update on the StatefulSet might be vulnerable to versioning conflicts if the Pod is updated or replaced mid-process. Use resourceVersion checks or consistent fetches to avoid errors.

Signed-off-by: Andrew Maiorov <[email protected]>
@Rory-Z
Member

Rory-Z commented Mar 28, 2025

Hi @keynslug, since this is a new feature, could you please open this PR against the main-2.3 branch instead of the main branch?

In the main-2.3 branch I changed the kubebuilder version to v4, so there may be a lot of changes to fix.

I also think this PR could be split into two PRs: one that moves the conf code, and one that adds the new feature.

Also, I don't know much about EMQX Durable Storage; do we have any documentation about removing an EMQX node from an EMQX cluster?
