Skip to content

Commit 1fb1976

Browse files
committed
fix: passing custom oban attributes to span
1 parent 086ffc0 commit 1fb1976

File tree

7 files changed

+245
-39
lines changed

7 files changed

+245
-39
lines changed

instrumentation/opentelemetry_oban/CHANGELOG.md

+32
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,37 @@
11
# Changelog
22

3+
## 1.1.0
4+
5+
### Changed
6+
7+
* Improve `OpentelemetryOban.PluginHandler` Tracer span attributes.
8+
The Plugin's span introduce a set of attributes prefixed with `oban.`.
9+
Previously, no attributes were added to the span. The new attributes are:
10+
11+
* All Plugin:
12+
* `oban.plugin`
13+
* `Oban.Plugins.Cron` Plugin:
14+
* `oban.jobs_count`
15+
* `Oban.Plugins.Gossip` Plugin:
16+
* `oban.gossip_count`
17+
* `Oban.Plugins.Lifeline` Plugin:
18+
* `oban.discarded_count`
19+
* `oban.rescued_count`
20+
* `Oban.Plugins.Pruner` Plugin:
21+
* `oban.pruned_count`
22+
* `Oban.Pro.Plugins.DynamicCron` Plugin:
23+
* `oban.jobs_count`
24+
* `Oban.Pro.Plugins.DynamicLifeline` Plugin:
25+
* `oban.discarded_count`
26+
* `oban.rescued_count`
27+
* `Oban.Pro.Plugins.DynamicPrioritizer` Plugin:
28+
* `oban.reprioritized_count`
29+
* `Oban.Pro.Plugins.DynamicPruner` Plugin:
30+
* `oban.pruned_count`
31+
* `Oban.Pro.Plugins.DynamicScaler` Plugin:
32+
* `oban.scaler.last_scaled_to`
33+
* `oban.scaler.last_scaled_at`
34+
335
## 1.0.0
436

537
### Changed

instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex

+4-4
Original file line numberDiff line numberDiff line change
@@ -133,15 +133,15 @@ defmodule OpentelemetryOban do
133133
Trace.messaging_system() => :oban,
134134
Trace.messaging_destination() => queue,
135135
Trace.messaging_destination_kind() => :queue,
136-
:"messaging.oban.worker" => worker
136+
:"oban.job.worker" => worker
137137
}
138138
end
139139

140140
defp attributes_after_insert(job) do
141141
%{
142-
"messaging.oban.job_id": job.id,
143-
"messaging.oban.priority": job.priority,
144-
"messaging.oban.max_attempts": job.max_attempts
142+
"oban.job.job_id": job.id,
143+
"oban.job.priority": job.priority,
144+
"oban.job.max_attempts": job.max_attempts
145145
}
146146
end
147147
end

instrumentation/opentelemetry_oban/lib/opentelemetry_oban/job_handler.ex

+7-8
Original file line numberDiff line numberDiff line change
@@ -64,14 +64,13 @@ defmodule OpentelemetryOban.JobHandler do
6464
Trace.messaging_destination() => queue,
6565
Trace.messaging_destination_kind() => :queue,
6666
Trace.messaging_operation() => :process,
67-
:"messaging.oban.job_id" => id,
68-
:"messaging.oban.worker" => worker,
69-
:"messaging.oban.priority" => priority,
70-
:"messaging.oban.attempt" => attempt,
71-
:"messaging.oban.max_attempts" => max_attempts,
72-
:"messaging.oban.inserted_at" =>
73-
if(inserted_at, do: DateTime.to_iso8601(inserted_at), else: nil),
74-
:"messaging.oban.scheduled_at" => DateTime.to_iso8601(scheduled_at)
67+
:"oban.job.job_id" => id,
68+
:"oban.job.worker" => worker,
69+
:"oban.job.priority" => priority,
70+
:"oban.job.attempt" => attempt,
71+
:"oban.job.max_attempts" => max_attempts,
72+
:"oban.job.inserted_at" => DateTime.to_iso8601(inserted_at),
73+
:"oban.job.scheduled_at" => DateTime.to_iso8601(scheduled_at)
7574
}
7675

7776
span_name = "#{worker} process"

instrumentation/opentelemetry_oban/lib/opentelemetry_oban/plugin_handler.ex

+53-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
defmodule OpentelemetryOban.PluginHandler do
2+
alias OpenTelemetry.Tracer
23
alias OpenTelemetry.Span
34

45
@tracer_id __MODULE__
@@ -41,11 +42,12 @@ defmodule OpentelemetryOban.PluginHandler do
4142
@tracer_id,
4243
"#{plugin} process",
4344
metadata,
44-
%{}
45+
%{attributes: %{"oban.plugin": plugin}}
4546
)
4647
end
4748

4849
def handle_plugin_stop(_event, _measurements, metadata, _config) do
50+
Tracer.set_attributes(end_span_plugin_attrs(metadata))
4951
OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
5052
end
5153

@@ -63,4 +65,54 @@ defmodule OpentelemetryOban.PluginHandler do
6365

6466
OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
6567
end
68+
69+
defp end_span_plugin_attrs(%{plugin: Oban.Plugins.Cron} = metadata) do
70+
%{"oban.plugins.cron.jobs_count": length(metadata[:jobs])}
71+
end
72+
73+
defp end_span_plugin_attrs(%{plugin: Oban.Plugins.Gossip} = metadata) do
74+
%{"oban.plugins.gossip.gossip_count": metadata[:gossip_count]}
75+
end
76+
77+
defp end_span_plugin_attrs(%{plugin: Oban.Plugins.Lifeline} = metadata) do
78+
%{
79+
"oban.plugins.lifeline.discarded_count": metadata[:discarded_count],
80+
"oban.plugins.lifeline.rescued_count": metadata[:rescued_count]
81+
}
82+
end
83+
84+
defp end_span_plugin_attrs(%{plugin: Oban.Plugins.Pruner} = metadata) do
85+
%{"oban.plugins.pruner.pruned_count": metadata[:pruned_count]}
86+
end
87+
88+
defp end_span_plugin_attrs(%{plugin: Oban.Pro.Plugins.DynamicCron} = metadata) do
89+
%{"oban.pro.plugins.dynamic_cron.jobs_count": length(metadata[:jobs])}
90+
end
91+
92+
defp end_span_plugin_attrs(%{plugin: Oban.Pro.Plugins.DynamicLifeline} = metadata) do
93+
%{
94+
"oban.pro.plugins.dynamic_lifeline.discarded_count": metadata[:discarded_count],
95+
"oban.pro.plugins.dynamic_lifeline.rescued_count": metadata[:rescued_count]
96+
}
97+
end
98+
99+
defp end_span_plugin_attrs(%{plugin: Oban.Pro.Plugins.DynamicPrioritizer} = metadata) do
100+
%{"oban.pro.plugins.dynamic_prioritizer.reprioritized_count": metadata[:reprioritized_count]}
101+
end
102+
103+
defp end_span_plugin_attrs(%{plugin: Oban.Pro.Plugins.DynamicPruner} = metadata) do
104+
%{"oban.pro.plugins.dynamic_pruner.pruned_count": metadata[:pruned_count]}
105+
end
106+
107+
defp end_span_plugin_attrs(%{plugin: Oban.Pro.Plugins.DynamicScaler} = metadata) do
108+
%{
109+
"oban.pro.plugins.dynamic_scaler.scaler.last_scaled_to": metadata[:scaler][:last_scaled_to],
110+
"oban.pro.plugins.dynamic_scaler.scaler.last_scaled_at":
111+
DateTime.to_iso8601(metadata[:scaler][:last_scaled_at])
112+
}
113+
end
114+
115+
defp end_span_plugin_attrs(_) do
116+
%{}
117+
end
66118
end

instrumentation/opentelemetry_oban/mix.exs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
defmodule OpentelemetryOban.MixProject do
22
use Mix.Project
33

4-
@version "1.0.0"
4+
@version "1.1.0"
55

66
def project do
77
[

instrumentation/opentelemetry_oban/test/opentelemetry_oban/plugin_handler_test.exs

+123
Original file line numberDiff line numberDiff line change
@@ -109,4 +109,127 @@ defmodule OpentelemetryOban.PluginHandlerTest do
109109
assert [:"exception.message", :"exception.stacktrace", :"exception.type"] ==
110110
Enum.sort(Map.keys(:otel_attributes.map(event_attributes)))
111111
end
112+
113+
describe "[:oban, :plugin, :stop] spans" do
114+
test "Oban.Plugins.Cron plugin" do
115+
execute_plugin(Oban.Plugins.Cron, %{jobs: [1, 3, 4]})
116+
117+
assert %{
118+
"oban.plugin": Elixir.Oban.Plugins.Cron,
119+
"oban.plugins.cron.jobs_count": 3
120+
} ==
121+
receive_span_attrs(Oban.Plugins.Cron)
122+
end
123+
124+
test "Oban.Plugins.Gossip plugin" do
125+
execute_plugin(Oban.Plugins.Gossip, %{gossip_count: 3})
126+
127+
assert %{
128+
"oban.plugin": Elixir.Oban.Plugins.Gossip,
129+
"oban.plugins.gossip.gossip_count": 3
130+
} ==
131+
receive_span_attrs(Oban.Plugins.Gossip)
132+
end
133+
134+
test "Oban.Plugins.Lifeline plugin" do
135+
execute_plugin(Oban.Plugins.Lifeline, %{discarded_count: 3, rescued_count: 2})
136+
137+
assert %{
138+
"oban.plugin": Elixir.Oban.Plugins.Lifeline,
139+
"oban.plugins.lifeline.discarded_count": 3,
140+
"oban.plugins.lifeline.rescued_count": 2
141+
} ==
142+
receive_span_attrs(Oban.Plugins.Lifeline)
143+
end
144+
145+
test "Oban.Plugins.Pruner plugin" do
146+
execute_plugin(Oban.Plugins.Pruner, %{pruned_count: 3})
147+
148+
assert %{
149+
"oban.plugin": Elixir.Oban.Plugins.Pruner,
150+
"oban.plugins.pruner.pruned_count": 3
151+
} ==
152+
receive_span_attrs(Oban.Plugins.Pruner)
153+
end
154+
155+
test "Oban.Pro.Plugins.DynamicCron plugin" do
156+
execute_plugin(Oban.Pro.Plugins.DynamicCron, %{jobs: [1, 3, 4]})
157+
158+
assert %{
159+
"oban.plugin": Elixir.Oban.Pro.Plugins.DynamicCron,
160+
"oban.pro.plugins.dynamic_cron.jobs_count": 3
161+
} ==
162+
receive_span_attrs(Oban.Pro.Plugins.DynamicCron)
163+
end
164+
165+
test "Oban.Pro.Plugins.DynamicLifeline plugin" do
166+
execute_plugin(Oban.Pro.Plugins.DynamicLifeline, %{discarded_count: 3, rescued_count: 2})
167+
168+
assert %{
169+
"oban.plugin": Elixir.Oban.Pro.Plugins.DynamicLifeline,
170+
"oban.pro.plugins.dynamic_lifeline.discarded_count": 3,
171+
"oban.pro.plugins.dynamic_lifeline.rescued_count": 2
172+
} ==
173+
receive_span_attrs(Oban.Pro.Plugins.DynamicLifeline)
174+
end
175+
176+
test "Oban.Pro.Plugins.DynamicPrioritizer plugin" do
177+
execute_plugin(Oban.Pro.Plugins.DynamicPrioritizer, %{reprioritized_count: 3})
178+
179+
assert %{
180+
"oban.plugin": Elixir.Oban.Pro.Plugins.DynamicPrioritizer,
181+
"oban.pro.plugins.dynamic_prioritizer.reprioritized_count": 3
182+
} ==
183+
receive_span_attrs(Oban.Pro.Plugins.DynamicPrioritizer)
184+
end
185+
186+
test "Oban.Pro.Plugins.DynamicPruner plugin" do
187+
execute_plugin(Oban.Pro.Plugins.DynamicPruner, %{pruned_count: 3})
188+
189+
assert %{
190+
"oban.plugin": Elixir.Oban.Pro.Plugins.DynamicPruner,
191+
"oban.pro.plugins.dynamic_pruner.pruned_count": 3
192+
} ==
193+
receive_span_attrs(Oban.Pro.Plugins.DynamicPruner)
194+
end
195+
196+
test "Oban.Pro.Plugins.DynamicScaler plugin" do
197+
execute_plugin(Oban.Pro.Plugins.DynamicScaler, %{
198+
scaler: %{last_scaled_to: 3, last_scaled_at: ~U[2021-08-01 12:00:00Z]}
199+
})
200+
201+
assert %{
202+
"oban.plugin": Elixir.Oban.Pro.Plugins.DynamicScaler,
203+
"oban.pro.plugins.dynamic_scaler.scaler.last_scaled_to": 3,
204+
"oban.pro.plugins.dynamic_scaler.scaler.last_scaled_at": "2021-08-01T12:00:00Z"
205+
} ==
206+
receive_span_attrs(Oban.Pro.Plugins.DynamicScaler)
207+
end
208+
end
209+
210+
defp receive_span_attrs(name) do
211+
name = "#{name} process"
212+
213+
assert_receive(
214+
{:span, span(name: ^name, attributes: attributes)},
215+
100,
216+
"expected span with name #{name} to be received"
217+
)
218+
219+
elem(attributes, 4)
220+
end
221+
222+
defp execute_plugin(plugin_name, metadata) do
223+
:telemetry.execute(
224+
[:oban, :plugin, :start],
225+
%{system_time: System.system_time()},
226+
%{plugin: plugin_name}
227+
)
228+
229+
:telemetry.execute(
230+
[:oban, :plugin, :stop],
231+
%{duration: 42069},
232+
Map.merge(metadata, %{plugin: plugin_name})
233+
)
234+
end
112235
end

instrumentation/opentelemetry_oban/test/opentelemetry_oban_test.exs

+25-25
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,10 @@ defmodule OpentelemetryObanTest do
4747
assert %{
4848
"messaging.destination": "events",
4949
"messaging.destination_kind": :queue,
50-
"messaging.oban.job_id": _job_id,
51-
"messaging.oban.max_attempts": 1,
52-
"messaging.oban.priority": 0,
53-
"messaging.oban.worker": "TestJob",
50+
"oban.job.job_id": _job_id,
51+
"oban.job.max_attempts": 1,
52+
"oban.job.priority": 0,
53+
"oban.job.worker": "TestJob",
5454
"messaging.system": :oban
5555
} = :otel_attributes.map(attributes)
5656
end
@@ -147,13 +147,13 @@ defmodule OpentelemetryObanTest do
147147
assert %{
148148
"messaging.destination": "events",
149149
"messaging.destination_kind": :queue,
150-
"messaging.oban.attempt": 1,
151-
"messaging.oban.inserted_at": _inserted_at,
152-
"messaging.oban.job_id": _job_id,
153-
"messaging.oban.max_attempts": 1,
154-
"messaging.oban.priority": 0,
155-
"messaging.oban.scheduled_at": _scheduled_at,
156-
"messaging.oban.worker": "TestJob",
150+
"oban.job.attempt": 1,
151+
"oban.job.inserted_at": _inserted_at,
152+
"oban.job.job_id": _job_id,
153+
"oban.job.max_attempts": 1,
154+
"oban.job.priority": 0,
155+
"oban.job.scheduled_at": _scheduled_at,
156+
"oban.job.worker": "TestJob",
157157
"messaging.operation": :process,
158158
"messaging.system": :oban
159159
} = :otel_attributes.map(attributes)
@@ -177,13 +177,13 @@ defmodule OpentelemetryObanTest do
177177
assert %{
178178
"messaging.destination": "events",
179179
"messaging.destination_kind": :queue,
180-
"messaging.oban.attempt": 1,
181-
"messaging.oban.inserted_at": _inserted_at,
182-
"messaging.oban.job_id": _job_id,
183-
"messaging.oban.max_attempts": 1,
184-
"messaging.oban.priority": 0,
185-
"messaging.oban.scheduled_at": _scheduled_at,
186-
"messaging.oban.worker": "TestJobThatReturnsError",
180+
"oban.job.attempt": 1,
181+
"oban.job.inserted_at": _inserted_at,
182+
"oban.job.job_id": _job_id,
183+
"oban.job.max_attempts": 1,
184+
"oban.job.priority": 0,
185+
"oban.job.scheduled_at": _scheduled_at,
186+
"oban.job.worker": "TestJobThatReturnsError",
187187
"messaging.operation": :process,
188188
"messaging.system": :oban
189189
} = :otel_attributes.map(attributes)
@@ -255,13 +255,13 @@ defmodule OpentelemetryObanTest do
255255
assert %{
256256
"messaging.destination": "events",
257257
"messaging.destination_kind": :queue,
258-
"messaging.oban.attempt": 1,
259-
"messaging.oban.inserted_at": _inserted_at,
260-
"messaging.oban.job_id": _job_id,
261-
"messaging.oban.max_attempts": 1,
262-
"messaging.oban.priority": 0,
263-
"messaging.oban.scheduled_at": _scheduled_at,
264-
"messaging.oban.worker": "TestJobThatThrowsException",
258+
"oban.job.attempt": 1,
259+
"oban.job.inserted_at": _inserted_at,
260+
"oban.job.job_id": _job_id,
261+
"oban.job.max_attempts": 1,
262+
"oban.job.priority": 0,
263+
"oban.job.scheduled_at": _scheduled_at,
264+
"oban.job.worker": "TestJobThatThrowsException",
265265
"messaging.operation": :process,
266266
"messaging.system": :oban
267267
} = :otel_attributes.map(attributes)

0 commit comments

Comments
 (0)