Skip to content

Conversation

@mmatczuk
Copy link
Collaborator

Add OpenTelemetry Protocol (OTLP) input components supporting both gRPC
and HTTP transports for ingesting traces, logs, and metrics.

  • Add otlp_grpc input on default port 4317
  • Add otlp_http input on default port 4318 with /v1/traces, /v1/logs, and
      /v1/metrics endpoints
  • Convert OTLP protobuf format to Redpanda OTEL v1 protobuf messages
  • Unbatch export requests into individual messages (one per span/log/metric)
  • Support optional rate limiting via rate limit resources
  • Add TLS configuration for both transports
  • Add TCP socket configuration (SO_REUSEADDR, SO_REUSEPORT)

@mmatczuk
Copy link
Collaborator Author

Requires #3893 merged first.

@mmatczuk mmatczuk force-pushed the mmt/otelinput branch 6 times, most recently from 6602493 to 6894c8d Compare January 13, 2026 12:00
shutSig *shutdown.Signaller
}

func makeOTLPInput(mgr *service.Resources, rateLimit string) otlpInput {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why makeOTLPInput instead of following the Go convention of newOTLPInput?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, given the type uses pointer receivers for its functions, would it be better to follow the convention of making this function return a pointer to indicate that it has pointer semantics?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thing is this is only for embedding - the pointer receiver works on addresses embedded in another struct.
I can sure rename it to new.


// maybeWaitForAccess blocks until the rate limiter grants access or the
// context/shutdown signals. If no rate limit is configured, it returns
// immediately. It must be called before calling [sendMessage].
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be sendMessageBatch?

Suggested change
// immediately. It must be called before calling [sendMessage].
// immediately. It must be called before calling [sendMessageBatch].

err = rerr
}
if err != nil {
o.log.Errorf("Rate limit error: %v\n", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
o.log.Errorf("Rate limit error: %v\n", err)
o.log.Errorf("Rate limit error: %v", err)

I don't think we need the newline here, or was this intentional?

}
}

const gracefulShutdownTimeout = 5 * time.Second
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor (non-blocking): given this is only used in the input_grpc.go file, would it be worth moving it to the top of that file as opposed to in here at the bottom?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved above Close.

)

type grpcInputConfig struct {
Address string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do these fields need to be exported?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair point, those are exported fields of not exported type but it's on purpose for config type. It can make your life easier with tests or in separate package otlp_test.

)

// Parse gRPC-specific config
conf.Address, err = pConf.FieldString(giFieldAddress)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor (non-blocking): Very subjective so feel free to ignore, but these seem like a good candidate for inlining, ie:

if conf.Address, err = pConf.FieldString(giFieldAddress); err != nil {
		return nil, err
}


s.maybeWaitForAccess(ctx)

traces := req.Traces()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor (non-blocking): I don't see us using traces variable later, perhaps just call req.Traces().SpanCount() instead of assigning to a variable?

@mmatczuk mmatczuk force-pushed the mmt/otelinput branch 3 times, most recently from 1746df2 to c6d96f2 Compare January 13, 2026 14:04
Add OpenTelemetry Protocol (OTLP) input components supporting both gRPC
and HTTP transports for ingesting traces, logs, and metrics.

- Add otlp_grpc input on default port 4317
- Add otlp_http input on default port 4318 with /v1/traces, /v1/logs, and
  /v1/metrics endpoints
- Convert OTLP protobuf format to Redpanda OTEL v1 protobuf messages
- Unbatch export requests into individual messages (one per span/log/metric)
- Support optional rate limiting via rate limit resources
- Add TLS configuration for both transports
- Add TCP socket configuration (SO_REUSEADDR, SO_REUSEPORT)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants