Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
d537aee
feat(otel): add opentelemety utility functions
AndrewWinterman Jun 28, 2024
d292598
Merge branch 'main' into feat/opentelemetry
AndrewWinterman Jun 28, 2024
75a6aeb
feat(otel): take a stab at wiring otel up
AndrewWinterman Jul 30, 2024
ccf814a
Merge branch 'feat/opentelemetry' of https://github.com/AndrewWinterm…
AndrewWinterman Jul 30, 2024
13a1894
Merge branch 'main' into feat/opentelemetry
AndrewWinterman Jul 30, 2024
e0fa7c6
fix: remove reference to outreach gobox lib
AndrewWinterman Jul 30, 2024
1aeb2d0
Merge branch 'feat/opentelemetry' of https://github.com/AndrewWinterm…
AndrewWinterman Jul 30, 2024
47aa58b
a smidge of polish
AndrewWinterman Aug 19, 2024
135fda6
Add otel inst
dkPranav Feb 19, 2025
9790ae7
Update README.md
dkPranav Mar 20, 2025
f7be3c5
AndrewWinterman + fix
dkPranav Mar 20, 2025
f154228
Merge branch 'otel' of github.com:dkPranav/amqp091-go into otel
dkPranav Mar 20, 2025
4d6ab91
Go fmt
dkPranav Mar 20, 2025
11e87a8
Typo
dkPranav Mar 20, 2025
c42d97b
Merge branch 'feat/opentelemetry' into otel
dkPranav Mar 20, 2025
1a75eac
Fix Rebase issues
dkPranav Mar 20, 2025
626c70d
Attributes for consume operation
dkPranav Jun 11, 2025
4851162
Comments
dkPranav Jun 11, 2025
931eede
Update opentelemetry.go
dkPranav Jun 26, 2025
3009c4c
Refactor delivery attributes and telemetry integration for improved o…
dkPranav Jun 27, 2025
3c9333e
Remove unused DeliveryResponse type and associated methods
dkPranav Jun 27, 2025
cb91b98
Go fmt
dkPranav Jun 27, 2025
a7e5515
Update opentelemetry.go
dkPranav Jul 9, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -1492,7 +1492,7 @@ func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg
/*
PublishWithContext sends a Publishing from the client to an exchange on the server.

NOTE: this function is equivalent to [Channel.Publish]. Context is not honoured.
NOTE: Context termination is not honoured.

When you want a single message to be delivered to a single queue, you can
publish to the default exchange with the routingKey of the queue name. This is
Expand Down Expand Up @@ -1523,8 +1523,9 @@ confirmations start at 1. Exit when all publishings are confirmed.
When Publish does not return an error and the channel is in confirm mode, the
internal counter for DeliveryTags with the first confirmation starts at 1.
*/
func (ch *Channel) PublishWithContext(_ context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error {
return ch.Publish(exchange, key, mandatory, immediate, msg)
func (ch *Channel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error {
_, err := ch.PublishWithDeferredConfirmWithContext(ctx, exchange, key, mandatory, immediate, msg)
return err
Comment on lines -1526 to +1528
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving a note to myself to revisit this change. I think it would be preferable to duplicate some code, instead of executing all the code for the deferred confirm and ignore the return value.

}

/*
Expand Down Expand Up @@ -1583,11 +1584,19 @@ DeferredConfirmation, allowing the caller to wait on the publisher confirmation
for this message. If the channel has not been put into confirm mode,
the DeferredConfirmation will be nil.

NOTE: PublishWithDeferredConfirmWithContext is equivalent to its non-context variant. The context passed
to this function is not honoured.
NOTE: PublishWithDeferredConfirmWithContext is equivalent to its non-context
variant. The termination of the context passed to this function is not
honoured.
*/
func (ch *Channel) PublishWithDeferredConfirmWithContext(_ context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) {
return ch.PublishWithDeferredConfirm(exchange, key, mandatory, immediate, msg)
func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) {
_, msg, endSpanFn := spanForPublication(ctx, msg, exchange, key, immediate)
dc, err := ch.PublishWithDeferredConfirm(exchange, key, mandatory, immediate, msg)
if err != nil {
endSpanFn(err)
return nil, err
}
endSpanFn(nil)
return dc, nil
}

/*
Expand Down
114 changes: 114 additions & 0 deletions delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@
package amqp091

import (
"context"
"errors"
"time"

"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

var ErrDeliveryNotInitialized = errors.New("delivery not initialized. Channel is probably closed")
Expand Down Expand Up @@ -58,6 +62,24 @@ type Delivery struct {
Body []byte
}

// Span returns context and a span that for the delivery
// the resulting span is linked to the publication that created it, if it has
// the appropraite headers set. See [context-propagation] for more details
//
// [context-propagation]: https://opentelemetry.io/docs/concepts/context-propagation/
func (d Delivery) Span(
ctx context.Context,
options ...trace.SpanStartOption,
) (context.Context, trace.Span) {
return spanForDelivery(ctx, &d, options...)
}

// Link returns a link for the delivery. The link points to the publication, if
// the appropriate headers are set.
func (d Delivery) Link(ctx context.Context) trace.Link {
return extractLinkFromDelivery(ctx, &d)
}

func newDelivery(channel *Channel, msg messageWithContent) *Delivery {
props, body := msg.getContent()

Expand Down Expand Up @@ -171,3 +193,95 @@ func (d Delivery) Nack(multiple, requeue bool) error {
}
return d.Acknowledger.Nack(d.DeliveryTag, multiple, requeue)
}

/*
AckWithContext delegates an acknowledgement through the Acknowledger interface that the
client or server has finished work on a delivery, with OpenTelemetry span support.

This method creates a settle span for the acknowledgment operation and properly
handles error recording in the span.

When multiple is true, this delivery and all prior unacknowledged deliveries
on the same channel will be acknowledged.

Either AckWithContext, RejectWithContext or NackWithContext must be called for every delivery that is
not automatically acknowledged when using context-aware operations.
*/
func (d *Delivery) AckWithContext(ctx context.Context, multiple bool) error {
// Start the settle span before performing the acknowledgment operation
_, span := settleAckDelivery(ctx, d, multiple)
defer span.End()

err := d.Ack(multiple)

// Record any error in the span
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}

return err
}

/*
RejectWithContext delegates a negatively acknowledgement through the Acknowledger interface
with OpenTelemetry span support.

This method creates a settle span for the rejection operation and properly
handles error recording in the span.

When requeue is true, queue this message to be delivered to a consumer on a
different channel. When requeue is false or the server is unable to queue this
message, it will be dropped.

Either AckWithContext, RejectWithContext or NackWithContext must be called for every delivery that is
not automatically acknowledged when using context-aware operations.
*/
func (d *Delivery) RejectWithContext(ctx context.Context, requeue bool) error {
// Start the settle span before performing the rejection operation
_, span := settleRejectDelivery(ctx, d, requeue)
defer span.End()

err := d.Reject(requeue)

// Record any error in the span
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}

return err
}

/*
NackWithContext negatively acknowledges the delivery of message(s) identified by the
delivery tag from either the client or server, with OpenTelemetry span support.

This method creates a settle span for the nack operation and properly
handles error recording in the span.

When multiple is true, nack messages up to and including delivered messages up
until the delivery tag delivered on the same channel.

When requeue is true, request the server to deliver this message to a different
consumer. If it is not possible or requeue is false, the message will be
dropped or delivered to a server configured dead-letter queue.

Either AckWithContext, RejectWithContext or NackWithContext must be called for every delivery that is
not automatically acknowledged when using context-aware operations.
*/
func (d *Delivery) NackWithContext(ctx context.Context, multiple, requeue bool) error {
// Start the settle span before performing the nack operation
_, span := settleNackDelivery(ctx, d, multiple, requeue)
defer span.End()

err := d.Nack(multiple, requeue)

// Record any error in the span
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}

return err
}
17 changes: 15 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
module github.com/rabbitmq/amqp091-go

go 1.20
go 1.22.0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What features are we using of Go 1.22 for this bump?

We are quite conservative with go directive bumps, specially since recent versions of Go have set this as a hard requirement for minimum version.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This appears to have come-in as part of the otel-go package https://github.com/open-telemetry/opentelemetry-go/releases/tag/v1.34.0, which requires a minimum go version of 1.22.0.

We will revisit the approach for this PR, aiming to adopt a subpackage-middleware strategy without making major changes to the main package. (discussions around this in the other PR #272 (comment))

Does this make sense @Zerpet ?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Zerpet / @AndrewWinterman - Just to flesh this out a bit more.

The OpenTelemetry packages have a requirement for the latest version of Go, for example the most recent release bumps the requirement to 1.23.

There are 2 possible solutions to including OpenTelemetry into this package:

  1. Simply accept the Go version requirement from the Otel package and add the Otel packages to this packages dependencies.
  2. Introduce a Middleware concept to break the dependancy with Otel and maintain the existing Go version requirement. OpenTelemetry instrumentation would then be implemented as a new middleware module, within this repo but with it's own dependencies.

Option 1 is what's implemented currently in this PR.

Option 2, would add a new interface and methods to set the middleware of a Connection and an otel/ directory with it's own go.md.

I envision that the Middleware would have pre and post functions for all the methods that we had instrumented so far.

type Middleware interface {
    PrePublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing)
    PostPublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing, err Error) // err is result of the call to PublishWithContext.

... // Other methods Pre/Post 
}

and then in the PublishWithContext function

func (ch *Channel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error {
        if ch.middleware != nil {
            ch.middleware.PrePublishWithContext(ctx, exchange, key, mandatory, immediate, msg)
        }
	_, err := ch.PublishWithDeferredConfirmWithContext(ctx, exchange, key, mandatory, immediate, msg)
        if ch.middleware != nil {
            ch.middleware.PostPublishWithContext(ctx, exchange, key, mandatory, immediate, msg, err)
        }

	return err
}

If we think Option 2 is the way to go, We can add a new PR with the proposed Middleware implementation and agree on that before working on the OpenTelemetry middleware.

Comments?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still drinking my coffee, so apologies in advance if I miss something obvious 😴

I'm not sure how option 2 solves the problem. I understand that we can move OTEL bits to its own module e.g. otel/go.mod, however, this module will still require OTEL version X, which forces a go directive of 1.22 or 1.23 to the middleware module. From the code snippet, I understand that the client module amqp091-go will import the middleware module in otel/go.mod, which will in turn force the client module to bump its Go directive. This is somewhat "recent" since the Go toolchain has become very picky about importing modules of higher go directive. In short, I don't think this middleware approach would solve the problem.

Am I missing something?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I wasn't clear enough....

The middleware would be an interface and there would be no explicit dependency between the original ampq091-go module and the new otel module. The new Otel module would have a new class that implemented the methods defined in the Middleware interface.

Any code using the ampq091-go module which didn't want to use otel would not need changing or see an change to the go version required.

Code that wanted otel instrumentation would import the otel module, instantiate the Otel middleware and apply it to the client/channel.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another idea to make option 1 acceptable would be to stay behind on OTEL releases. I personally think that the biggest version bump is to 1.21, because it introduces the toolchain directive, which is not interpreted by earlier versions.

I'm ok to bump to Go 1.22 as part of this PR, and bump to Go 1.23 in a few months time, when there's a valid justification (like supporting OTEL). I'm just on the lookout for Go version bumps for the sake of bumping it 👀

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually like the middleware approach, but I thought we could likely drop down to the protocol level. Likely still need some way to get the span off of a delivery or a return, but that can live in a different package

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I wasn't clear enough....

The middleware would be an interface and there would be no explicit dependency between the original ampq091-go module and the new otel module. The new Otel module would have a new class that implemented the methods defined in the Middleware interface.

Any code using the ampq091-go module which didn't want to use otel would not need changing or see an change to the go version required.

Code that wanted otel instrumentation would import the otel module, instantiate the Otel middleware and apply it to the client/channel.

Thank for the explanation, I think I get the idea now. The "middleware" interface (I think the name should be more specific) will be part of amqp091-go module, and the module with OTEL will implement this interface. Then any user who wants to use automatic instrumentation will initialise the middleware and inject/set it in the Connection or anywhere TBD that makes sense.

If my understanding is correct, I like this idea 👍 and I agree is the right thing to do. FWIW, other data services like Redis also separate the OTEL module from the main library.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really have bandwidth to make a middleware module, but I really think it's a good way to go for this sort of thing. Also would let libraries consistently set e.g. app id, user id, and timestamp headers


require go.uber.org/goleak v1.3.0
toolchain go1.22.5

require (
go.opentelemetry.io/otel v1.34.0
go.opentelemetry.io/otel/trace v1.34.0
go.uber.org/goleak v1.3.0
)

require (
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder what's causing this indirect import 🤔 I was under the impression that libraries should only import APIs and not the SDK, according to OpenTelemetry docs.

go.opentelemetry.io/otel/metric v1.34.0 // indirect
)
21 changes: 20 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,25 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY=
go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ=
go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE=
go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Loading
Loading