Skip to content

Conversation

@mleonidas
Copy link

@mleonidas mleonidas commented Aug 13, 2025

…ally (insecure)

Overview

Fixes #(issue)

Notes for reviewer

Summary by CodeRabbit

  • New Features
    • Added an option to allow insecure TLS for Kafka SSL connections (e.g., self-signed certificates). This enables TLS without SASL when explicitly opted in.
    • Expands connectivity to Kafka deployments using SSL with untrusted/temporary certificates while keeping current defaults.
    • Existing behavior remains unchanged unless the option is enabled; SASL_SSL handling is unaffected.

@mleonidas mleonidas requested a review from a team as a code owner August 13, 2025 19:47
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Aug 13, 2025

📝 Walkthrough

Walkthrough

Introduces a TLSInsecure boolean in KafkaConfiguration and updates Kafka broker config logic to enable TLS for SSL connections when TLSInsecure is true, prior to SASL_SSL handling. No other validation or config creation references were added.

Changes

Cohort / File(s) Summary
Configuration schema update
app/config/ingest.go
Added exported field TLSInsecure bool to KafkaConfiguration; no accompanying validation or config creation logic changes in this file.
Kafka broker TLS handling
openmeter/watermill/driver/kafka/broker.go
In createKafkaConfig, added conditional to enable TLS when SecurityProtocol == "SSL" and TLSInsecure is true; existing SASL_SSL path unchanged.

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~7 minutes

✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type @coderabbitai help to get the list of available commands.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Status, Documentation and Community

  • Visit our Status Page to check the current availability of CodeRabbit.
  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@mleonidas
Copy link
Author

mleonidas commented Aug 13, 2025

Notably, I encountered this issue while attempting to deploy to an internal Kubernetes cluster in AWS, utilizing MSK as the Kafka cluster. We have our cluster configured for TLS, but due to the certs not being locally or in the container, the brokers would fail to connect.

edit: also sorry about the long commit msg, I've never used cz cli, I have a git template that neogit reads in, but the commit hooks would fail until I used cz :/

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🔭 Outside diff range comments (1)
openmeter/watermill/driver/kafka/broker.go (1)

79-94: SASL_SSL path ignores TLSInsecure; set InsecureSkipVerify when requested

In the SASL_SSL branch you always enable TLS and set an empty tls.Config, but you never set InsecureSkipVerify when TLSInsecure is true. This undermines the PR’s purpose for SASL_SSL connections.

Minimal change within this block:

 	if o.KafkaConfig.SecurityProtocol == "SASL_SSL" {
 		config.Net.SASL.Enable = true
 		config.Net.SASL.Handshake = true

 		config.Net.TLS.Enable = true
-		config.Net.TLS.Config = &tls.Config{}
+		config.Net.TLS.Config = &tls.Config{
+			// Note: will be overridden below if TLSInsecure is set globally
+		}

 		switch o.KafkaConfig.SaslMechanisms {

And ensure the global post-block addition applies:

if o.KafkaConfig.TLSInsecure && config.Net.TLS.Enable {
    if config.Net.TLS.Config == nil {
        config.Net.TLS.Config = &tls.Config{}
    }
    config.Net.TLS.Config.InsecureSkipVerify = true
}
🧹 Nitpick comments (2)
openmeter/watermill/driver/kafka/broker.go (2)

48-74: Optional: centralize protocol handling and add warning log for insecure mode

To make this less error-prone, handle protocol selection once, then apply insecure behavior and SASL mechanism specifics. Also log a warning when insecure mode is active.

Example refactor (conceptual):

// Protocol wiring
switch o.KafkaConfig.SecurityProtocol {
case "SSL":
    config.Net.TLS.Enable = true
case "SASL_SSL":
    config.Net.TLS.Enable = true
    config.Net.SASL.Enable = true
    config.Net.SASL.Handshake = true
    // set mechanism...
}

// TLS config
if config.Net.TLS.Enable && config.Net.TLS.Config == nil {
    config.Net.TLS.Config = &tls.Config{}
}
if o.KafkaConfig.TLSInsecure && config.Net.TLS.Enable {
    o.Logger.Warn("TLS insecure mode enabled: skipping certificate and hostname verification")
    config.Net.TLS.Config.InsecureSkipVerify = true
}

48-74: Nit: avoid stringly-typed protocol/mechanism values

Use typed constants or enums for "SSL", "SASL_SSL", "PLAIN" to reduce typo risk and improve discoverability.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9ce6c87 and 22bb247.

📒 Files selected for processing (2)
  • app/config/ingest.go (1 hunks)
  • openmeter/watermill/driver/kafka/broker.go (1 hunks)

Comment on lines 61 to 66
type KafkaConfiguration struct {
Broker string
SecurityProtocol string
TLSInsecure bool
SaslMechanisms string
SaslUsername string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

TLSInsecure is undocumented and not wired into non-sarama (librdkafka) config; add docs, default, and validation

  • Add a clear comment on what “insecure” means (skips cert and hostname verification).
  • Set an explicit Viper default for discoverability: ingest.kafka.tlsInsecure = false.
  • Validate that TLSInsecure is only used with SSL/SASL_SSL; otherwise return an error to avoid silent misconfigurations.
  • If you intend TLSInsecure to apply across both drivers, wire it into the confluent-kafka-go ConfigMap: disable certificate and hostname verification when set.

Apply these diffs and additions:

 type KafkaConfiguration struct {
   Broker           string
   SecurityProtocol string
-  TLSInsecure      bool
+  // TLSInsecure disables TLS certificate and hostname verification.
+  // WARNING: Do not use in production. Intended for local dev/testing with self-signed certs.
+  TLSInsecure      bool
   SaslMechanisms   string
   SaslUsername     string
   SaslPassword     string
 func ConfigureIngest(v *viper.Viper) {
   v.SetDefault("ingest.kafka.broker", "127.0.0.1:29092")
   v.SetDefault("ingest.kafka.securityProtocol", "")
+  v.SetDefault("ingest.kafka.tlsInsecure", false)
   v.SetDefault("ingest.kafka.saslMechanisms", "")
   v.SetDefault("ingest.kafka.saslUsername", "")
   v.SetDefault("ingest.kafka.saslPassword", "")

Outside the changed hunk (CreateKafkaConfig), wire TLSInsecure into librdkafka config:

// Add near where security.protocol is set
if c.SecurityProtocol == "SSL" || c.SecurityProtocol == "SASL_SSL" {
    if c.TLSInsecure {
        // Disable peer cert verification and hostname verification for librdkafka
        // Ref: librdkafka configs: enable.ssl.certificate.verification=false, ssl.endpoint.identification.algorithm=none
        config["enable.ssl.certificate.verification"] = false
        config["ssl.endpoint.identification.algorithm"] = "none"
    }
}

And in validation:

if c.TLSInsecure && c.SecurityProtocol != "SSL" && c.SecurityProtocol != "SASL_SSL" {
    errs = append(errs, errors.New("tlsInsecure requires securityProtocol to be SSL or SASL_SSL"))
}

To confirm TLSInsecure is consistently recognized in config files and envs, run:


🏁 Script executed:

#!/bin/bash
# Verify config keys and usage
rg -n --no-heading $'tlsInsecure|TLSInsecure|enable\\.ssl\\.certificate\\.verification|ssl\\.endpoint\\.identification\\.algorithm'

Length of output: 294


🏁 Script executed:

#!/bin/bash
set -euo pipefail
echo "== ingest.go =="
sed -n '1,240p' app/config/ingest.go || true
echo
echo "== broker.go (context around match) =="
sed -n '1,240p' openmeter/watermill/driver/kafka/broker.go || true

Length of output: 8574


Action required: document TLSInsecure, add default & validation, wire it into librdkafka config, and fix Sarama TLS handling

Verified: TLSInsecure is declared in app/config/ingest.go and referenced only in openmeter/watermill/driver/kafka/broker.go. It is not wired into the confluent-librdkafka config (CreateKafkaConfig). The sarama code currently enables TLS only when TLSInsecure is true (bug). Apply the fixes below.

Files to change:

  • app/config/ingest.go — add docs on TLSInsecure, viper default, validation, and wire into CreateKafkaConfig for librdkafka.
  • openmeter/watermill/driver/kafka/broker.go — enable TLS for SSL/SASL_SSL and set InsecureSkipVerify when TLSInsecure is true; remove the current incorrect conditional.

Suggested diffs:

  1. Add doc to KafkaConfiguration.TLSInsecure
 type KafkaConfiguration struct {
   Broker           string
   SecurityProtocol string
-  TLSInsecure      bool
+  // TLSInsecure disables TLS certificate and hostname verification.
+  // WARNING: Do NOT use in production. Intended only for local dev/testing with self-signed certs.
+  TLSInsecure      bool
   SaslMechanisms   string
   SaslUsername     string
   SaslPassword     string
  1. Set Viper default in ConfigureIngest
 func ConfigureIngest(v *viper.Viper) {
   v.SetDefault("ingest.kafka.broker", "127.0.0.1:29092")
   v.SetDefault("ingest.kafka.securityProtocol", "")
+  v.SetDefault("ingest.kafka.tlsInsecure", false)
   v.SetDefault("ingest.kafka.saslMechanisms", "")
   v.SetDefault("ingest.kafka.saslUsername", "")
   v.SetDefault("ingest.kafka.saslPassword", "")
  1. Validate TLSInsecure usage
 func (c KafkaConfiguration) Validate() error {
   var errs []error
   ...
+  if c.TLSInsecure && c.SecurityProtocol != "SSL" && c.SecurityProtocol != "SASL_SSL" {
+      errs = append(errs, errors.New("tlsInsecure requires securityProtocol to be SSL or SASL_SSL"))
+  }
   return errors.Join(errs...)
 }
  1. Wire TLSInsecure into confluent-kafka-go (CreateKafkaConfig)
 	if c.SecurityProtocol != "" {
 		config["security.protocol"] = c.SecurityProtocol
 	}
+	// If TLSInsecure is enabled, tell librdkafka to skip cert & hostname verification.
+	if c.SecurityProtocol == "SSL" || c.SecurityProtocol == "SASL_SSL" {
+		if c.TLSInsecure {
+			// librdkafka configs:
+			// enable.ssl.certificate.verification=false
+			// ssl.endpoint.identification.algorithm=none
+			config["enable.ssl.certificate.verification"] = false
+			config["ssl.endpoint.identification.algorithm"] = "none"
+		}
+	}
  1. Fix Sarama broker TLS handling (openmeter/watermill/driver/kafka/broker.go)
-	if o.KafkaConfig.SecurityProtocol == "SSL" && o.KafkaConfig.TLSInsecure {
-		config.Net.TLS.Enable = true
-	}
-
-	if o.KafkaConfig.SecurityProtocol == "SASL_SSL" {
-		config.Net.SASL.Enable = true
-		config.Net.SASL.Handshake = true
-
-		config.Net.TLS.Enable = true
-		config.Net.TLS.Config = &tls.Config{}
-
-		switch o.KafkaConfig.SaslMechanisms {
-		case "PLAIN":
-			config.Net.SASL.User = o.KafkaConfig.SaslUsername
-			config.Net.SASL.Password = o.KafkaConfig.SaslPassword
-			config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
-		default:
-			return nil, fmt.Errorf("unsupported SASL mechanism: %s", o.KafkaConfig.SaslMechanisms)
-		}
-	}
+	// Enable TLS for SSL and SASL_SSL protocols.
+	if o.KafkaConfig.SecurityProtocol == "SSL" || o.KafkaConfig.SecurityProtocol == "SASL_SSL" {
+		config.Net.TLS.Enable = true
+		config.Net.TLS.Config = &tls.Config{}
+		if o.KafkaConfig.TLSInsecure {
+			// WARNING: insecure — skips certificate and hostname verification
+			config.Net.TLS.Config.InsecureSkipVerify = true
+		}
+	}
+
+	if o.KafkaConfig.SecurityProtocol == "SASL_SSL" {
+		config.Net.SASL.Enable = true
+		config.Net.SASL.Handshake = true
+
+		switch o.KafkaConfig.SaslMechanisms {
+		case "PLAIN":
+			config.Net.SASL.User = o.KafkaConfig.SaslUsername
+			config.Net.SASL.Password = o.KafkaConfig.SaslPassword
+			config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
+		default:
+			return nil, fmt.Errorf("unsupported SASL mechanism: %s", o.KafkaConfig.SaslMechanisms)
+		}
+	}

Notes:

  • rg search shows TLSInsecure referenced only in app/config/ingest.go and openmeter/watermill/driver/kafka/broker.go — so update both places.
  • The librdkafka config keys used mirror librdkafka options: enable.ssl.certificate.verification and ssl.endpoint.identification.algorithm=none.
  • The sarama change ensures TLS is enabled whenever protocol requests it, and only toggles InsecureSkipVerify when explicitly requested.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
type KafkaConfiguration struct {
Broker string
SecurityProtocol string
TLSInsecure bool
SaslMechanisms string
SaslUsername string
type KafkaConfiguration struct {
Broker string
SecurityProtocol string
// TLSInsecure disables TLS certificate and hostname verification.
// WARNING: Do NOT use in production. Intended only for local dev/testing with self-signed certs.
TLSInsecure bool
SaslMechanisms string
SaslUsername string
🤖 Prompt for AI Agents
In app/config/ingest.go around lines 61-66: add a doc comment on
KafkaConfiguration.TLSInsecure explaining it disables server certificate
verification for TLS connections (unsafe for production), add a Viper default
(false) in ConfigureIngest, and add validation to ensure it's a boolean/allowed
value; then wire TLSInsecure into CreateKafkaConfig by setting the librdkafka
options enable.ssl.certificate.verification (true/false) and when TLSInsecure is
true set ssl.endpoint.identification.algorithm=none. In
openmeter/watermill/driver/kafka/broker.go: change the Sarama TLS logic so TLS
is enabled whenever the SecurityProtocol is SSL or SASL_SSL (not only when
TLSInsecure), and only set TLSConfig.InsecureSkipVerify = true when TLSInsecure
is true; remove the existing conditional that enables TLS only when TLSInsecure.

Comment on lines +75 to +77
if o.KafkaConfig.SecurityProtocol == "SSL" && o.KafkaConfig.TLSInsecure {
config.Net.TLS.Enable = true
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Incorrect gating: TLS for SSL should be enabled regardless of TLSInsecure; also missing InsecureSkipVerify

Currently TLS is only enabled for SecurityProtocol == "SSL" when TLSInsecure is true, which breaks valid SSL setups. Additionally, “insecure” mode doesn’t set InsecureSkipVerify, so it won’t achieve the stated goal for self-signed/local certs.

Apply this diff to always enable TLS for SSL and conditionally set InsecureSkipVerify:

-	if o.KafkaConfig.SecurityProtocol == "SSL" && o.KafkaConfig.TLSInsecure {
-		config.Net.TLS.Enable = true
-	}
+	if o.KafkaConfig.SecurityProtocol == "SSL" {
+		config.Net.TLS.Enable = true
+		if config.Net.TLS.Config == nil {
+			config.Net.TLS.Config = &tls.Config{}
+		}
+	}

Add the following block after the SASL_SSL handling (or immediately after SSL/SASL_SSL wiring) to apply “insecure” consistently:

// Apply insecure TLS settings if requested
if o.KafkaConfig.TLSInsecure && config.Net.TLS.Enable {
    if config.Net.TLS.Config == nil {
        config.Net.TLS.Config = &tls.Config{}
    }
    config.Net.TLS.Config.InsecureSkipVerify = true
}
🤖 Prompt for AI Agents
openmeter/watermill/driver/kafka/broker.go lines 75-77: TLS is only being
enabled when SecurityProtocol == "SSL" AND TLSInsecure is true, and
InsecureSkipVerify is never set; change the logic so TLS is enabled whenever
SecurityProtocol == "SSL" (or SASL_SSL where appropriate) and, if TLSInsecure is
true, set config.Net.TLS.Config.InsecureSkipVerify = true, creating
config.Net.TLS.Config if nil; add the InsecureSkipVerify block after the
SASL_SSL/SSL wiring so insecure mode is applied consistently.

@chrisgacsal chrisgacsal added the kind/feature New feature or request label Aug 14, 2025
@turip
Copy link
Member

turip commented Aug 17, 2025

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

kind/feature New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants