-
Notifications
You must be signed in to change notification settings - Fork 156
Add Opentelemetry Support #304
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 18 commits
d537aee
d292598
75a6aeb
ccf814a
13a1894
e0fa7c6
1aeb2d0
47aa58b
135fda6
9790ae7
f7be3c5
f154228
4d6ab91
11e87a8
c42d97b
1a75eac
626c70d
4851162
931eede
3009c4c
3c9333e
cb91b98
a7e5515
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,8 +6,12 @@ | |
| package amqp091 | ||
|
|
||
| import ( | ||
| "errors" | ||
| "time" | ||
| "context" | ||
| "errors" | ||
| "fmt" | ||
| "time" | ||
|
|
||
| "go.opentelemetry.io/otel/trace" | ||
| ) | ||
|
|
||
| var ErrDeliveryNotInitialized = errors.New("delivery not initialized. Channel is probably closed") | ||
|
|
@@ -17,88 +21,106 @@ var ErrDeliveryNotInitialized = errors.New("delivery not initialized. Channel is | |
| // | ||
| // Applications can provide mock implementations in tests of Delivery handlers. | ||
| type Acknowledger interface { | ||
| Ack(tag uint64, multiple bool) error | ||
| Nack(tag uint64, multiple, requeue bool) error | ||
| Reject(tag uint64, requeue bool) error | ||
| Ack(tag uint64, multiple bool) error | ||
| Nack(tag uint64, multiple, requeue bool) error | ||
| Reject(tag uint64, requeue bool) error | ||
| } | ||
|
|
||
| // Delivery captures the fields for a previously delivered message resident in | ||
| // a queue to be delivered by the server to a consumer from Channel.Consume or | ||
| // Channel.Get. | ||
| type Delivery struct { | ||
| Acknowledger Acknowledger // the channel from which this delivery arrived | ||
|
|
||
| Headers Table // Application or header exchange table | ||
|
|
||
| // Properties | ||
| ContentType string // MIME content type | ||
| ContentEncoding string // MIME content encoding | ||
| DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2) | ||
| Priority uint8 // queue implementation use - 0 to 9 | ||
| CorrelationId string // application use - correlation identifier | ||
| ReplyTo string // application use - address to reply to (ex: RPC) | ||
| Expiration string // implementation use - message expiration spec | ||
| MessageId string // application use - message identifier | ||
| Timestamp time.Time // application use - message timestamp | ||
| Type string // application use - message type name | ||
| UserId string // application use - creating user - should be authenticated user | ||
| AppId string // application use - creating application id | ||
|
|
||
| // Valid only with Channel.Consume | ||
| ConsumerTag string | ||
|
|
||
| // Valid only with Channel.Get | ||
| MessageCount uint32 | ||
|
|
||
| DeliveryTag uint64 | ||
| Redelivered bool | ||
| Exchange string // basic.publish exchange | ||
| RoutingKey string // basic.publish routing key | ||
|
|
||
| Body []byte | ||
| Acknowledger Acknowledger // the channel from which this delivery arrived | ||
|
|
||
| Headers Table // Application or header exchange table | ||
|
|
||
| // Properties | ||
| ContentType string // MIME content type | ||
| ContentEncoding string // MIME content encoding | ||
| DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2) | ||
| Priority uint8 // queue implementation use - 0 to 9 | ||
| CorrelationId string // application use - correlation identifier | ||
| ReplyTo string // application use - address to reply to (ex: RPC) | ||
| Expiration string // implementation use - message expiration spec | ||
| MessageId string // application use - message identifier | ||
| Timestamp time.Time // application use - message timestamp | ||
| Type string // application use - message type name | ||
| UserId string // application use - creating user - should be authenticated user | ||
| AppId string // application use - creating application id | ||
|
|
||
| // Valid only with Channel.Consume | ||
| ConsumerTag string | ||
|
|
||
| // Valid only with Channel.Get | ||
| MessageCount uint32 | ||
|
|
||
| DeliveryTag uint64 | ||
| Redelivered bool | ||
| Exchange string // basic.publish exchange | ||
| RoutingKey string // basic.publish routing key | ||
|
|
||
| Body []byte | ||
| } | ||
|
|
||
| func newDelivery(channel *Channel, msg messageWithContent) *Delivery { | ||
| props, body := msg.getContent() | ||
|
|
||
| delivery := Delivery{ | ||
| Acknowledger: channel, | ||
|
|
||
| Headers: props.Headers, | ||
| ContentType: props.ContentType, | ||
| ContentEncoding: props.ContentEncoding, | ||
| DeliveryMode: props.DeliveryMode, | ||
| Priority: props.Priority, | ||
| CorrelationId: props.CorrelationId, | ||
| ReplyTo: props.ReplyTo, | ||
| Expiration: props.Expiration, | ||
| MessageId: props.MessageId, | ||
| Timestamp: props.Timestamp, | ||
| Type: props.Type, | ||
| UserId: props.UserId, | ||
| AppId: props.AppId, | ||
|
|
||
| Body: body, | ||
| } | ||
| // Span returns context and a span that for the delivery | ||
| // the resulting span is linked to the publication that created it, if it has | ||
| // the appropraite headers set. See [context-propagation] for more details | ||
| // | ||
| // [context-propagation]: https://opentelemetry.io/docs/concepts/context-propagation/ | ||
| func (d Delivery) Span( | ||
| ctx context.Context, | ||
| options ...trace.SpanStartOption, | ||
| ) (context.Context, trace.Span) { | ||
| return spanForDelivery(ctx, &d, options...) | ||
| } | ||
|
|
||
| // Properties for the delivery types | ||
| switch m := msg.(type) { | ||
| case *basicDeliver: | ||
| delivery.ConsumerTag = m.ConsumerTag | ||
| delivery.DeliveryTag = m.DeliveryTag | ||
| delivery.Redelivered = m.Redelivered | ||
| delivery.Exchange = m.Exchange | ||
| delivery.RoutingKey = m.RoutingKey | ||
|
|
||
| case *basicGetOk: | ||
| delivery.MessageCount = m.MessageCount | ||
| delivery.DeliveryTag = m.DeliveryTag | ||
| delivery.Redelivered = m.Redelivered | ||
| delivery.Exchange = m.Exchange | ||
| delivery.RoutingKey = m.RoutingKey | ||
| } | ||
| // Link returns a link for the delivery. The link points to the publication, if | ||
| // the appropriate headers are set. | ||
| func (d Delivery) Link(ctx context.Context) trace.Link { | ||
| return extractLinkFromDelivery(ctx, &d) | ||
| } | ||
|
|
||
| return &delivery | ||
| func newDelivery(channel *Channel, msg messageWithContent) *Delivery { | ||
| props, body := msg.getContent() | ||
|
|
||
| delivery := Delivery{ | ||
| Acknowledger: channel, | ||
|
|
||
| Headers: props.Headers, | ||
| ContentType: props.ContentType, | ||
| ContentEncoding: props.ContentEncoding, | ||
| DeliveryMode: props.DeliveryMode, | ||
| Priority: props.Priority, | ||
| CorrelationId: props.CorrelationId, | ||
| ReplyTo: props.ReplyTo, | ||
| Expiration: props.Expiration, | ||
| MessageId: props.MessageId, | ||
| Timestamp: props.Timestamp, | ||
| Type: props.Type, | ||
| UserId: props.UserId, | ||
| AppId: props.AppId, | ||
|
|
||
| Body: body, | ||
| } | ||
|
|
||
| // Properties for the delivery types | ||
| switch m := msg.(type) { | ||
| case *basicDeliver: | ||
| delivery.ConsumerTag = m.ConsumerTag | ||
| delivery.DeliveryTag = m.DeliveryTag | ||
| delivery.Redelivered = m.Redelivered | ||
| delivery.Exchange = m.Exchange | ||
| delivery.RoutingKey = m.RoutingKey | ||
|
|
||
| case *basicGetOk: | ||
| delivery.MessageCount = m.MessageCount | ||
| delivery.DeliveryTag = m.DeliveryTag | ||
| delivery.Redelivered = m.Redelivered | ||
| delivery.Exchange = m.Exchange | ||
| delivery.RoutingKey = m.RoutingKey | ||
| } | ||
|
|
||
| return &delivery | ||
| } | ||
|
|
||
| /* | ||
|
|
@@ -166,8 +188,43 @@ Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every | |
| delivery that is not automatically acknowledged. | ||
| */ | ||
| func (d Delivery) Nack(multiple, requeue bool) error { | ||
| if d.Acknowledger == nil { | ||
| return ErrDeliveryNotInitialized | ||
| } | ||
| return d.Acknowledger.Nack(d.DeliveryTag, multiple, requeue) | ||
| if d.Acknowledger == nil { | ||
| return ErrDeliveryNotInitialized | ||
| } | ||
| return d.Acknowledger.Nack(d.DeliveryTag, multiple, requeue) | ||
| } | ||
|
|
||
| type DeliveryResponse uint8 | ||
|
|
||
| const ( | ||
| Ack DeliveryResponse = iota | ||
| Reject | ||
| Nack | ||
| ) | ||
|
|
||
| func (r DeliveryResponse) Name() string { | ||
| switch r { | ||
| case Ack: | ||
| return "ack" | ||
| case Nack: | ||
| return "nack" | ||
| case Reject: | ||
| return "reject" | ||
| default: | ||
| return "unknown" | ||
| } | ||
| } | ||
|
|
||
| func (d Delivery) Settle(ctx context.Context, response DeliveryResponse, multiple, requeue bool) error { | ||
|
||
| defer settleDelivery(ctx, &d, response, multiple, requeue) | ||
| switch response { | ||
| case Ack: | ||
| return d.Ack(multiple) | ||
| case Nack: | ||
| return d.Nack(multiple, requeue) | ||
| case Reject: | ||
| return d.Reject(requeue) | ||
| default: | ||
| return fmt.Errorf("unknown operation %s", response.Name()) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,18 @@ | ||
| module github.com/rabbitmq/amqp091-go | ||
|
|
||
| go 1.20 | ||
| go 1.22.0 | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What features are we using of Go 1.22 for this bump? We are quite conservative with go directive bumps, specially since recent versions of Go have set this as a hard requirement for minimum version.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This appears to have come-in as part of the otel-go package https://github.com/open-telemetry/opentelemetry-go/releases/tag/v1.34.0, which requires a minimum go version of 1.22.0. We will revisit the approach for this PR, aiming to adopt a subpackage-middleware strategy without making major changes to the main package. (discussions around this in the other PR #272 (comment)) Does this make sense @Zerpet ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Zerpet / @AndrewWinterman - Just to flesh this out a bit more. The OpenTelemetry packages have a requirement for the latest version of Go, for example the most recent release bumps the requirement to 1.23. There are 2 possible solutions to including OpenTelemetry into this package:
Option 1 is what's implemented currently in this PR. Option 2, would add a new interface and methods to set the middleware of a I envision that the Middleware would have pre and post functions for all the methods that we had instrumented so far. type Middleware interface {
PrePublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing)
PostPublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing, err Error) // err is result of the call to PublishWithContext.
... // Other methods Pre/Post
}and then in the func (ch *Channel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error {
if ch.middleware != nil {
ch.middleware.PrePublishWithContext(ctx, exchange, key, mandatory, immediate, msg)
}
_, err := ch.PublishWithDeferredConfirmWithContext(ctx, exchange, key, mandatory, immediate, msg)
if ch.middleware != nil {
ch.middleware.PostPublishWithContext(ctx, exchange, key, mandatory, immediate, msg, err)
}
return err
}If we think Option 2 is the way to go, We can add a new PR with the proposed Middleware implementation and agree on that before working on the OpenTelemetry middleware. Comments?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm still drinking my coffee, so apologies in advance if I miss something obvious 😴 I'm not sure how option 2 solves the problem. I understand that we can move OTEL bits to its own module e.g. Am I missing something? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I wasn't clear enough.... The middleware would be an interface and there would be no explicit dependency between the original ampq091-go module and the new otel module. The new Otel module would have a new class that implemented the methods defined in the Middleware interface. Any code using the ampq091-go module which didn't want to use otel would not need changing or see an change to the go version required. Code that wanted otel instrumentation would import the otel module, instantiate the Otel middleware and apply it to the client/channel.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another idea to make option 1 acceptable would be to stay behind on OTEL releases. I personally think that the biggest version bump is to 1.21, because it introduces the I'm ok to bump to Go 1.22 as part of this PR, and bump to Go 1.23 in a few months time, when there's a valid justification (like supporting OTEL). I'm just on the lookout for Go version bumps for the sake of bumping it 👀
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I actually like the middleware approach, but I thought we could likely drop down to the protocol level. Likely still need some way to get the span off of a delivery or a return, but that can live in a different package
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Thank for the explanation, I think I get the idea now. The "middleware" interface (I think the name should be more specific) will be part of If my understanding is correct, I like this idea 👍 and I agree is the right thing to do. FWIW, other data services like Redis also separate the OTEL module from the main library.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't really have bandwidth to make a middleware module, but I really think it's a good way to go for this sort of thing. Also would let libraries consistently set e.g. app id, user id, and timestamp headers |
||
|
|
||
| require go.uber.org/goleak v1.3.0 | ||
| toolchain go1.22.5 | ||
|
|
||
| require ( | ||
| go.opentelemetry.io/otel v1.34.0 | ||
| go.opentelemetry.io/otel/trace v1.34.0 | ||
| go.uber.org/goleak v1.3.0 | ||
| ) | ||
|
|
||
| require ( | ||
| github.com/go-logr/logr v1.4.2 // indirect | ||
| github.com/go-logr/stdr v1.2.2 // indirect | ||
| go.opentelemetry.io/auto/sdk v1.1.0 // indirect | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder what's causing this indirect import 🤔 I was under the impression that libraries should only import APIs and not the SDK, according to OpenTelemetry docs. |
||
| go.opentelemetry.io/otel/metric v1.34.0 // indirect | ||
| ) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,25 @@ | ||
| github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= | ||
| github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= | ||
| github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= | ||
| github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= | ||
| github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= | ||
| github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= | ||
| github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= | ||
| github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= | ||
| github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= | ||
| github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= | ||
| github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= | ||
| github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= | ||
| github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= | ||
| github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= | ||
| go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= | ||
| go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= | ||
| go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= | ||
| go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= | ||
| go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= | ||
| go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE= | ||
| go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k= | ||
| go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= | ||
| go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= | ||
| go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= | ||
| gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= | ||
| gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leaving a note to myself to revisit this change. I think it would be preferable to duplicate some code, instead of executing all the code for the deferred confirm and ignore the return value.