diff --git a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex index d3272d54..a6af0c08 100644 --- a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex +++ b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban.ex @@ -26,16 +26,10 @@ defmodule OpentelemetryOban do @doc """ Initializes and configures telemetry handlers. - Example: + By default jobs and plugins are traced. If you wish to trace only jobs then + use: - OpentelemetryOban.setup(trace: [:jobs]) - - Options: - - * `:trace` - A list of events to trace. Supported values are `:jobs` and `:plugins`, - defaults to [:jobs, :plugins]. - * `:time_unit` - a time unit used to convert the timing values, defaults - to `:microsecond`. + OpentelemetryOban.setup(trace: [:jobs]) Note that if you don't trace plugins, but inside the plugins, there are spans from other instrumentation libraries (e.g. ecto) then these will still be @@ -45,15 +39,13 @@ defmodule OpentelemetryOban do @spec setup() :: :ok def setup(opts \\ []) do trace = Keyword.get(opts, :trace, [:jobs, :plugins]) - time_unit = Keyword.get(opts, :time_unit, :microsecond) - config = %{time_unit: time_unit} if Enum.member?(trace, :jobs) do - OpentelemetryOban.JobHandler.attach(config) + OpentelemetryOban.JobHandler.attach() end if Enum.member?(trace, :plugins) do - OpentelemetryOban.PluginHandler.attach(config) + OpentelemetryOban.PluginHandler.attach() end :ok diff --git a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/job_handler.ex b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/job_handler.ex index 054cf0c5..19a9200f 100644 --- a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/job_handler.ex +++ b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/job_handler.ex @@ -6,10 +6,10 @@ defmodule OpentelemetryOban.JobHandler do @tracer_id __MODULE__ - def attach(config) do + def attach() do attach_job_start_handler() - attach_job_stop_handler(config) - attach_job_exception_handler(config) + attach_job_stop_handler() + attach_job_exception_handler() end defp attach_job_start_handler() do @@ -21,21 +21,21 @@ defmodule OpentelemetryOban.JobHandler do ) end - defp attach_job_stop_handler(config) do + defp attach_job_stop_handler() do :telemetry.attach( "#{__MODULE__}.job_stop", [:oban, :job, :stop], &__MODULE__.handle_job_stop/4, - config + [] ) end - defp attach_job_exception_handler(config) do + defp attach_job_exception_handler() do :telemetry.attach( "#{__MODULE__}.job_exception", [:oban, :job, :exception], &__MODULE__.handle_job_exception/4, - config + [] ) end @@ -83,8 +83,8 @@ defmodule OpentelemetryOban.JobHandler do }) end - def handle_job_stop(_event, measurements, metadata, config) do - set_measurements_attributes(measurements, config) + def handle_job_stop(_event, measurements, metadata, _config) do + set_measurements_attributes(measurements) OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata) end @@ -92,7 +92,7 @@ defmodule OpentelemetryOban.JobHandler do _event, measurements, %{stacktrace: stacktrace, error: error} = metadata, - config + _config ) do ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, metadata) @@ -100,19 +100,16 @@ defmodule OpentelemetryOban.JobHandler do Span.record_exception(ctx, error, stacktrace) Span.set_status(ctx, OpenTelemetry.status(:error, "")) - set_measurements_attributes(measurements, config) + set_measurements_attributes(measurements) OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata) end - defp set_measurements_attributes(%{duration: duration, queue_time: queue_time}, %{ - time_unit: time_unit - }) do + defp set_measurements_attributes(%{duration: duration, queue_time: queue_time}) do OpenTelemetry.Tracer.set_attributes(%{ - :"messaging.oban.duration_#{time_unit}" => - System.convert_time_unit(duration, :native, time_unit), - :"messaging.oban.queue_time_#{time_unit}" => - System.convert_time_unit(queue_time, :nanosecond, time_unit) + :"messaging.oban.duration_us" => System.convert_time_unit(duration, :native, :microsecond), + :"messaging.oban.queue_time_us" => + System.convert_time_unit(queue_time, :nanosecond, :microsecond) }) end end diff --git a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/plugin_handler.ex b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/plugin_handler.ex index a322b68a..dfc94c12 100644 --- a/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/plugin_handler.ex +++ b/instrumentation/opentelemetry_oban/lib/opentelemetry_oban/plugin_handler.ex @@ -3,10 +3,10 @@ defmodule OpentelemetryOban.PluginHandler do @tracer_id __MODULE__ - def attach(config) do + def attach() do attach_plugin_start_handler() - attach_plugin_stop_handler(config) - attach_plugin_exception_handler(config) + attach_plugin_stop_handler() + attach_plugin_exception_handler() end defp attach_plugin_start_handler() do @@ -18,21 +18,21 @@ defmodule OpentelemetryOban.PluginHandler do ) end - defp attach_plugin_stop_handler(config) do + defp attach_plugin_stop_handler() do :telemetry.attach( "#{__MODULE__}.plugin_stop", [:oban, :plugin, :stop], &__MODULE__.handle_plugin_stop/4, - config + [] ) end - defp attach_plugin_exception_handler(config) do + defp attach_plugin_exception_handler() do :telemetry.attach( "#{__MODULE__}.plugin_exception", [:oban, :plugin, :exception], &__MODULE__.handle_plugin_exception/4, - config + [] ) end @@ -45,16 +45,15 @@ defmodule OpentelemetryOban.PluginHandler do ) end - def handle_plugin_stop(_event, measurements, metadata, config) do - set_measurements_attributes(measurements, config) + def handle_plugin_stop(_event, _measurements, metadata, _config) do OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata) end def handle_plugin_exception( _event, - measurements, + _measurements, %{stacktrace: stacktrace, error: error} = metadata, - config + _config ) do ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, metadata) @@ -62,15 +61,6 @@ defmodule OpentelemetryOban.PluginHandler do Span.record_exception(ctx, error, stacktrace) Span.set_status(ctx, OpenTelemetry.status(:error, "")) - set_measurements_attributes(measurements, config) - OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata) end - - defp set_measurements_attributes(%{duration: duration}, %{time_unit: time_unit}) do - OpenTelemetry.Tracer.set_attributes(%{ - :"messaging.oban.duration_#{time_unit}" => - System.convert_time_unit(duration, :native, time_unit) - }) - end end diff --git a/instrumentation/opentelemetry_oban/test/opentelemetry_oban/plugin_handler_test.exs b/instrumentation/opentelemetry_oban/test/opentelemetry_oban/plugin_handler_test.exs index 270dbb79..81d596ff 100644 --- a/instrumentation/opentelemetry_oban/test/opentelemetry_oban/plugin_handler_test.exs +++ b/instrumentation/opentelemetry_oban/test/opentelemetry_oban/plugin_handler_test.exs @@ -24,110 +24,47 @@ defmodule OpentelemetryOban.PluginHandlerTest do :application.start(:opentelemetry) TestHelpers.remove_oban_handlers() + OpentelemetryOban.setup() :ok end - describe "with the default config" do - setup do - OpentelemetryOban.setup() - end - - test "does not create spans when tracing plugins is disabled" do - TestHelpers.remove_oban_handlers() - OpentelemetryOban.setup(trace: [:jobs]) - - :telemetry.execute( - [:oban, :plugin, :start], - %{system_time: System.system_time()}, - %{plugin: Elixir.Oban.Plugins.Stager} - ) - - :telemetry.execute( - [:oban, :plugin, :stop], - %{duration: 444}, - %{plugin: Elixir.Oban.Plugins.Stager} - ) - - refute_receive {:span, span(name: "Elixir.Oban.Plugins.Stager process")} - end - - test "records span on plugin execution" do - :telemetry.execute( - [:oban, :plugin, :start], - %{system_time: System.system_time()}, - %{plugin: Elixir.Oban.Plugins.Stager} - ) + test "does not create spans when tracing plugins is disabled" do + TestHelpers.remove_oban_handlers() + OpentelemetryOban.setup(trace: [:jobs]) - :telemetry.execute( - [:oban, :plugin, :stop], - %{duration: 444}, - %{plugin: Elixir.Oban.Plugins.Stager} - ) + :telemetry.execute( + [:oban, :plugin, :start], + %{system_time: System.system_time()}, + %{plugin: Elixir.Oban.Plugins.Stager} + ) - assert_receive {:span, - span(name: "Elixir.Oban.Plugins.Stager process", attributes: attributes)} + :telemetry.execute( + [:oban, :plugin, :stop], + %{duration: 444}, + %{plugin: Elixir.Oban.Plugins.Stager} + ) - assert %{ - "messaging.oban.duration_microsecond": _duration - } = :otel_attributes.map(attributes) - end + refute_receive {:span, span(name: "Elixir.Oban.Plugins.Stager process")} + end - test "records span on plugin error" do - :telemetry.execute( - [:oban, :plugin, :start], - %{system_time: System.system_time()}, - %{plugin: Elixir.Oban.Plugins.Stager} - ) + test "records span on plugin execution" do + :telemetry.execute( + [:oban, :plugin, :start], + %{system_time: System.system_time()}, + %{plugin: Elixir.Oban.Plugins.Stager} + ) - :telemetry.execute( - [:oban, :plugin, :exception], - %{duration: 444}, - %{ - plugin: Elixir.Oban.Plugins.Stager, - kind: :error, - stacktrace: [ - {Some, :error, [], []} - ], - error: %UndefinedFunctionError{ - arity: 0, - function: :error, - message: nil, - module: Some, - reason: nil - } - } - ) + :telemetry.execute( + [:oban, :plugin, :stop], + %{duration: 444}, + %{plugin: Elixir.Oban.Plugins.Stager} + ) - expected_status = OpenTelemetry.status(:error, "") - - assert_receive {:span, - span( - name: "Elixir.Oban.Plugins.Stager process", - attributes: attributes, - events: events, - status: ^expected_status - )} - - assert %{ - "messaging.oban.duration_microsecond": _duration - } = :otel_attributes.map(attributes) - - [ - event( - name: "exception", - attributes: event_attributes - ) - ] = :otel_events.list(events) - - assert [:"exception.message", :"exception.stacktrace", :"exception.type"] == - Enum.sort(Map.keys(:otel_attributes.map(event_attributes))) - end + assert_receive {:span, span(name: "Elixir.Oban.Plugins.Stager process")} end - test "can configure time_unit" do - OpentelemetryOban.setup(time_unit: :second) - + test "records span on plugin error" do :telemetry.execute( [:oban, :plugin, :start], %{system_time: System.system_time()}, @@ -135,16 +72,41 @@ defmodule OpentelemetryOban.PluginHandlerTest do ) :telemetry.execute( - [:oban, :plugin, :stop], + [:oban, :plugin, :exception], %{duration: 444}, - %{plugin: Elixir.Oban.Plugins.Stager} + %{ + plugin: Elixir.Oban.Plugins.Stager, + kind: :error, + stacktrace: [ + {Some, :error, [], []} + ], + error: %UndefinedFunctionError{ + arity: 0, + function: :error, + message: nil, + module: Some, + reason: nil + } + } ) + expected_status = OpenTelemetry.status(:error, "") + assert_receive {:span, - span(name: "Elixir.Oban.Plugins.Stager process", attributes: attributes)} + span( + name: "Elixir.Oban.Plugins.Stager process", + events: events, + status: ^expected_status + )} + + [ + event( + name: "exception", + attributes: event_attributes + ) + ] = :otel_events.list(events) - assert %{ - "messaging.oban.duration_second": _duration - } = :otel_attributes.map(attributes) + assert [:"exception.message", :"exception.stacktrace", :"exception.type"] == + Enum.sort(Map.keys(:otel_attributes.map(event_attributes))) end end diff --git a/instrumentation/opentelemetry_oban/test/opentelemetry_oban_test.exs b/instrumentation/opentelemetry_oban/test/opentelemetry_oban_test.exs index 411e52e6..5590966e 100644 --- a/instrumentation/opentelemetry_oban/test/opentelemetry_oban_test.exs +++ b/instrumentation/opentelemetry_oban/test/opentelemetry_oban_test.exs @@ -26,68 +26,41 @@ defmodule OpentelemetryObanTest do :application.start(:opentelemetry) TestHelpers.remove_oban_handlers() + OpentelemetryOban.setup() :ok end - describe "with the default config" do - setup do - OpentelemetryOban.setup() - end - - test "records span on job insertion" do - OpentelemetryOban.insert(TestJob.new(%{})) - assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) - - assert_receive {:span, - span( - name: "TestJob send", - attributes: attributes, - parent_span_id: :undefined, - kind: :producer, - status: :undefined - )} - - assert %{ - "messaging.destination": "events", - "messaging.destination_kind": :queue, - "messaging.oban.job_id": _job_id, - "messaging.oban.max_attempts": 1, - "messaging.oban.priority": 0, - "messaging.oban.worker": "TestJob", - "messaging.system": :oban - } = :otel_attributes.map(attributes) - end + test "records span on job insertion" do + OpentelemetryOban.insert(TestJob.new(%{})) + assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) - test "job creation uses existing trace if present" do - OpenTelemetry.Tracer.with_span "test span" do - ctx = OpenTelemetry.Tracer.current_span_ctx() - root_trace_id = OpenTelemetry.Span.trace_id(ctx) - root_span_id = OpenTelemetry.Span.span_id(ctx) - - OpentelemetryOban.insert(TestJob.new(%{})) - assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) - - assert_receive {:span, - span( - name: "TestJob send", - attributes: _attributes, - trace_id: ^root_trace_id, - parent_span_id: ^root_span_id, - kind: :producer, - status: :undefined - )} - end - end + assert_receive {:span, + span( + name: "TestJob send", + attributes: attributes, + parent_span_id: :undefined, + kind: :producer, + status: :undefined + )} - test "keeps existing meta information" do - OpentelemetryOban.insert(TestJob.new(%{}, meta: %{foo: "bar"})) + assert %{ + "messaging.destination": "events", + "messaging.destination_kind": :queue, + "messaging.oban.job_id": _job_id, + "messaging.oban.max_attempts": 1, + "messaging.oban.priority": 0, + "messaging.oban.worker": "TestJob", + "messaging.system": :oban + } = :otel_attributes.map(attributes) + end - assert [job] = all_enqueued() - assert job.meta["foo"] == "bar" - end + test "job creation uses existing trace if present" do + OpenTelemetry.Tracer.with_span "test span" do + ctx = OpenTelemetry.Tracer.current_span_ctx() + root_trace_id = OpenTelemetry.Span.trace_id(ctx) + root_span_id = OpenTelemetry.Span.span_id(ctx) - test "tracing information is propagated between send and process" do OpentelemetryOban.insert(TestJob.new(%{})) assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) @@ -95,326 +68,329 @@ defmodule OpentelemetryObanTest do span( name: "TestJob send", attributes: _attributes, - trace_id: send_trace_id, - span_id: send_span_id, + trace_id: ^root_trace_id, + parent_span_id: ^root_span_id, kind: :producer, status: :undefined )} + end + end - assert_receive {:span, - span( - name: "TestJob process", - attributes: _attributes, - kind: :consumer, - status: :undefined, - trace_id: process_trace_id, - links: links - )} + test "keeps existing meta information" do + OpentelemetryOban.insert(TestJob.new(%{}, meta: %{foo: "bar"})) - [link(trace_id: ^send_trace_id, span_id: ^send_span_id)] = :otel_links.list(links) + assert [job] = all_enqueued() + assert job.meta["foo"] == "bar" + end - # Process is ran asynchronously so we create a new trace, but still link - # the traces together. - assert send_trace_id != process_trace_id - end + test "tracing information is propagated between send and process" do + OpentelemetryOban.insert(TestJob.new(%{})) + assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) - test "no link is created on process when tracing info was not propagated" do - # Using regular Oban, instead of OpentelemetryOban - Oban.insert(TestJob.new(%{})) - assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) + assert_receive {:span, + span( + name: "TestJob send", + attributes: _attributes, + trace_id: send_trace_id, + span_id: send_span_id, + kind: :producer, + status: :undefined + )} - assert_receive {:span, - span( - name: "TestJob process", - attributes: _attributes, - kind: :consumer, - status: :undefined, - trace_id: _trace_id, - links: links - )} + assert_receive {:span, + span( + name: "TestJob process", + attributes: _attributes, + kind: :consumer, + status: :undefined, + trace_id: process_trace_id, + links: links + )} - assert [] == :otel_links.list(links) - end + [link(trace_id: ^send_trace_id, span_id: ^send_span_id)] = :otel_links.list(links) - test "records spans for successful Oban jobs" do - OpentelemetryOban.insert(TestJob.new(%{})) - assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) + # Process is ran asynchronously so we create a new trace, but still link + # the traces together. + assert send_trace_id != process_trace_id + end - assert_receive {:span, - span( - name: "TestJob process", - attributes: attributes, - kind: :consumer, - status: :undefined - )} + test "no link is created on process when tracing info was not propagated" do + # Using regular Oban, instead of OpentelemetryOban + Oban.insert(TestJob.new(%{})) + assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) - assert %{ - "messaging.destination_kind": :queue, - "messaging.destination": "events", - "messaging.oban.attempt": 1, - "messaging.oban.duration_microsecond": _duration, - "messaging.oban.inserted_at": _inserted_at, - "messaging.oban.job_id": _job_id, - "messaging.oban.max_attempts": 1, - "messaging.oban.priority": 0, - "messaging.oban.queue_time_microsecond": _queue_time, - "messaging.oban.scheduled_at": _scheduled_at, - "messaging.oban.worker": "TestJob", - "messaging.operation": :process, - "messaging.system": :oban - } = :otel_attributes.map(attributes) - end + assert_receive {:span, + span( + name: "TestJob process", + attributes: _attributes, + kind: :consumer, + status: :undefined, + trace_id: _trace_id, + links: links + )} - test "records spans for Oban jobs that stop with {:error, :something}" do - OpentelemetryOban.insert(TestJobThatReturnsError.new(%{})) - assert %{success: 0, discard: 1} = Oban.drain_queue(queue: :events) + assert [] == :otel_links.list(links) + end - expected_status = OpenTelemetry.status(:error, "") + test "records spans for successful Oban jobs" do + OpentelemetryOban.insert(TestJob.new(%{})) + assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) - assert_receive {:span, - span( - name: "TestJobThatReturnsError process", - attributes: attributes, - kind: :consumer, - events: events, - status: ^expected_status - )} + assert_receive {:span, + span( + name: "TestJob process", + attributes: attributes, + kind: :consumer, + status: :undefined + )} - assert %{ - "messaging.destination_kind": :queue, - "messaging.destination": "events", - "messaging.oban.attempt": 1, - "messaging.oban.duration_microsecond": _duration, - "messaging.oban.inserted_at": _inserted_at, - "messaging.oban.job_id": _job_id, - "messaging.oban.max_attempts": 1, - "messaging.oban.priority": 0, - "messaging.oban.queue_time_microsecond": _queue_time, - "messaging.oban.scheduled_at": _scheduled_at, - "messaging.oban.worker": "TestJobThatReturnsError", - "messaging.operation": :process, - "messaging.system": :oban - } = :otel_attributes.map(attributes) - - [ - event( - name: "exception", - attributes: event_attributes - ) - ] = :otel_events.list(events) - - assert [:"exception.message", :"exception.stacktrace", :"exception.type"] == - Enum.sort(Map.keys(:otel_attributes.map(event_attributes))) - end + assert %{ + "messaging.destination_kind": :queue, + "messaging.destination": "events", + "messaging.oban.attempt": 1, + "messaging.oban.duration_us": _duration, + "messaging.oban.inserted_at": _inserted_at, + "messaging.oban.job_id": _job_id, + "messaging.oban.max_attempts": 1, + "messaging.oban.priority": 0, + "messaging.oban.queue_time_us": _queue_time, + "messaging.oban.scheduled_at": _scheduled_at, + "messaging.oban.worker": "TestJob", + "messaging.operation": :process, + "messaging.system": :oban + } = :otel_attributes.map(attributes) + end - test "records spans for each retry" do - OpentelemetryOban.insert(TestJobThatReturnsError.new(%{}, max_attempts: 2)) + test "records spans for Oban jobs that stop with {:error, :something}" do + OpentelemetryOban.insert(TestJobThatReturnsError.new(%{})) + assert %{success: 0, discard: 1} = Oban.drain_queue(queue: :events) - assert %{success: 0, failure: 1, discard: 1} = - Oban.drain_queue(queue: :events, with_scheduled: true, with_recursion: true) + expected_status = OpenTelemetry.status(:error, "") + + assert_receive {:span, + span( + name: "TestJobThatReturnsError process", + attributes: attributes, + kind: :consumer, + events: events, + status: ^expected_status + )} - expected_status = OpenTelemetry.status(:error, "") + assert %{ + "messaging.destination_kind": :queue, + "messaging.destination": "events", + "messaging.oban.attempt": 1, + "messaging.oban.duration_us": _duration, + "messaging.oban.inserted_at": _inserted_at, + "messaging.oban.job_id": _job_id, + "messaging.oban.max_attempts": 1, + "messaging.oban.priority": 0, + "messaging.oban.queue_time_us": _queue_time, + "messaging.oban.scheduled_at": _scheduled_at, + "messaging.oban.worker": "TestJobThatReturnsError", + "messaging.operation": :process, + "messaging.system": :oban + } = :otel_attributes.map(attributes) - assert_receive {:span, - span( - name: "TestJobThatReturnsError send", - trace_id: send_trace_id, - span_id: send_span_id - )} + [ + event( + name: "exception", + attributes: event_attributes + ) + ] = :otel_events.list(events) - assert_receive {:span, - span( - name: "TestJobThatReturnsError process", - status: ^expected_status, - trace_id: first_process_trace_id, - links: job_1_links - )} + assert [:"exception.message", :"exception.stacktrace", :"exception.type"] == + Enum.sort(Map.keys(:otel_attributes.map(event_attributes))) + end - [link(trace_id: ^send_trace_id, span_id: ^send_span_id)] = :otel_links.list(job_1_links) + test "records spans for each retry" do + OpentelemetryOban.insert(TestJobThatReturnsError.new(%{}, max_attempts: 2)) - assert_receive {:span, - span( - name: "TestJobThatReturnsError process", - status: ^expected_status, - trace_id: second_process_trace_id, - links: job_2_links - )} + assert %{success: 0, failure: 1, discard: 1} = + Oban.drain_queue(queue: :events, with_scheduled: true, with_recursion: true) - [link(trace_id: ^send_trace_id, span_id: ^send_span_id)] = :otel_links.list(job_2_links) + expected_status = OpenTelemetry.status(:error, "") - assert first_process_trace_id != second_process_trace_id - end + assert_receive {:span, + span( + name: "TestJobThatReturnsError send", + trace_id: send_trace_id, + span_id: send_span_id + )} - test "records spans for Oban jobs that stop with an exception" do - OpentelemetryOban.insert(TestJobThatThrowsException.new(%{})) - assert %{success: 0, discard: 1} = Oban.drain_queue(queue: :events) + assert_receive {:span, + span( + name: "TestJobThatReturnsError process", + status: ^expected_status, + trace_id: first_process_trace_id, + links: job_1_links + )} - expected_status = OpenTelemetry.status(:error, "") + [link(trace_id: ^send_trace_id, span_id: ^send_span_id)] = :otel_links.list(job_1_links) - assert_receive {:span, - span( - name: "TestJobThatThrowsException process", - attributes: attributes, - kind: :consumer, - events: events, - status: ^expected_status - )} + assert_receive {:span, + span( + name: "TestJobThatReturnsError process", + status: ^expected_status, + trace_id: second_process_trace_id, + links: job_2_links + )} - assert %{ - "messaging.destination": "events", - "messaging.destination_kind": :queue, - "messaging.oban.attempt": 1, - "messaging.oban.inserted_at": _inserted_at, - "messaging.oban.job_id": _job_id, - "messaging.oban.max_attempts": 1, - "messaging.oban.priority": 0, - "messaging.oban.scheduled_at": _scheduled_at, - "messaging.oban.worker": "TestJobThatThrowsException", - "messaging.operation": :process, - "messaging.system": :oban - } = :otel_attributes.map(attributes) - - [ - event( - name: "exception", - attributes: event_attributes - ) - ] = :otel_events.list(events) - - assert [:"exception.message", :"exception.stacktrace", :"exception.type"] == - Enum.sort(Map.keys(:otel_attributes.map(event_attributes))) - end + [link(trace_id: ^send_trace_id, span_id: ^send_span_id)] = :otel_links.list(job_2_links) - test "spans inside the job are associated with the job trace" do - OpentelemetryOban.insert(TestJobWithInnerSpan.new(%{})) - assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) + assert first_process_trace_id != second_process_trace_id + end - assert_receive {:span, - span( - name: "TestJobWithInnerSpan process", - kind: :consumer, - trace_id: trace_id, - span_id: process_span_id - )} + test "records spans for Oban jobs that stop with an exception" do + OpentelemetryOban.insert(TestJobThatThrowsException.new(%{})) + assert %{success: 0, discard: 1} = Oban.drain_queue(queue: :events) - assert_receive {:span, - span( - name: "span inside the job", - kind: :internal, - trace_id: ^trace_id, - parent_span_id: ^process_span_id - )} - end + expected_status = OpenTelemetry.status(:error, "") - test "OpentelemetryOban.insert!/2 returns job on successful insert" do - %Oban.Job{} = OpentelemetryOban.insert!(TestJob.new(%{})) - assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) - assert_receive {:span, span(name: "TestJob send")} - assert_receive {:span, span(name: "TestJob process")} - end + assert_receive {:span, + span( + name: "TestJobThatThrowsException process", + attributes: attributes, + kind: :consumer, + events: events, + status: ^expected_status + )} + + assert %{ + "messaging.destination": "events", + "messaging.destination_kind": :queue, + "messaging.oban.attempt": 1, + "messaging.oban.inserted_at": _inserted_at, + "messaging.oban.job_id": _job_id, + "messaging.oban.max_attempts": 1, + "messaging.oban.priority": 0, + "messaging.oban.scheduled_at": _scheduled_at, + "messaging.oban.worker": "TestJobThatThrowsException", + "messaging.operation": :process, + "messaging.system": :oban + } = :otel_attributes.map(attributes) - test "OpentelemetryOban.insert!/2 raises an error on failed insert" do - assert_raise( - Ecto.InvalidChangesetError, - fn -> OpentelemetryOban.insert!(TestJob.new(%{}, max_attempts: -1)) end + [ + event( + name: "exception", + attributes: event_attributes ) + ] = :otel_events.list(events) - assert %{success: 0, failure: 0} = Oban.drain_queue(queue: :events) + assert [:"exception.message", :"exception.stacktrace", :"exception.type"] == + Enum.sort(Map.keys(:otel_attributes.map(event_attributes))) + end - expected_status = OpenTelemetry.status(:error, "") + test "spans inside the job are associated with the job trace" do + OpentelemetryOban.insert(TestJobWithInnerSpan.new(%{})) + assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) - assert_receive {:span, - span( - name: "TestJob send", - events: events, - status: ^expected_status - )} + assert_receive {:span, + span( + name: "TestJobWithInnerSpan process", + kind: :consumer, + trace_id: trace_id, + span_id: process_span_id + )} - [ - event( - name: "exception", - attributes: event_attributes - ) - ] = :otel_events.list(events) + assert_receive {:span, + span( + name: "span inside the job", + kind: :internal, + trace_id: ^trace_id, + parent_span_id: ^process_span_id + )} + end - assert [:"exception.message", :"exception.stacktrace", :"exception.type"] == - Enum.sort(Map.keys(:otel_attributes.map(event_attributes))) + test "OpentelemetryOban.insert!/2 returns job on successful insert" do + %Oban.Job{} = OpentelemetryOban.insert!(TestJob.new(%{})) + assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) + assert_receive {:span, span(name: "TestJob send")} + assert_receive {:span, span(name: "TestJob process")} + end - refute_received {:span, span(name: "TestJob process")} - end + test "OpentelemetryOban.insert!/2 raises an error on failed insert" do + assert_raise( + Ecto.InvalidChangesetError, + fn -> OpentelemetryOban.insert!(TestJob.new(%{}, max_attempts: -1)) end + ) - test "tracing information is propagated when using insert_all/2" do - OpentelemetryOban.insert_all([ - TestJob.new(%{}), - TestJob.new(%{}) - ]) + assert %{success: 0, failure: 0} = Oban.drain_queue(queue: :events) - assert %{success: 2, failure: 0} = Oban.drain_queue(queue: :events) + expected_status = OpenTelemetry.status(:error, "") - assert_receive {:span, - span( - name: :"Oban bulk insert", - attributes: _attributes, - trace_id: send_trace_id, - span_id: send_span_id, - kind: :producer, - status: :undefined - )} + assert_receive {:span, + span( + name: "TestJob send", + events: events, + status: ^expected_status + )} - assert_receive {:span, - span( - name: "TestJob process", - attributes: _attributes, - kind: :consumer, - status: :undefined, - trace_id: first_process_trace_id, - links: job_1_links - )} + [ + event( + name: "exception", + attributes: event_attributes + ) + ] = :otel_events.list(events) - [link(trace_id: ^send_trace_id, span_id: ^send_span_id)] = :otel_links.list(job_1_links) + assert [:"exception.message", :"exception.stacktrace", :"exception.type"] == + Enum.sort(Map.keys(:otel_attributes.map(event_attributes))) - assert_receive {:span, - span( - name: "TestJob process", - attributes: _attributes, - kind: :consumer, - status: :undefined, - trace_id: second_process_trace_id, - links: job_2_links - )} + refute_received {:span, span(name: "TestJob process")} + end - [link(trace_id: ^send_trace_id, span_id: ^send_span_id)] = :otel_links.list(job_2_links) + test "tracing information is propagated when using insert_all/2" do + OpentelemetryOban.insert_all([ + TestJob.new(%{}), + TestJob.new(%{}) + ]) - # Process is ran asynchronously so we create a new trace, but still link - # the traces together. - assert send_trace_id != first_process_trace_id - assert send_trace_id != second_process_trace_id - assert first_process_trace_id != second_process_trace_id - end + assert %{success: 2, failure: 0} = Oban.drain_queue(queue: :events) - test "works with Oban.Testing.perform_job helper function" do - Oban.Testing.perform_job(TestJob, %{}, repo: TestRepo) + assert_receive {:span, + span( + name: :"Oban bulk insert", + attributes: _attributes, + trace_id: send_trace_id, + span_id: send_span_id, + kind: :producer, + status: :undefined + )} - assert_receive {:span, span(name: "TestJob process")} - end - end + assert_receive {:span, + span( + name: "TestJob process", + attributes: _attributes, + kind: :consumer, + status: :undefined, + trace_id: first_process_trace_id, + links: job_1_links + )} - test "can configure time_unit" do - OpentelemetryOban.setup(time_unit: :second) - OpentelemetryOban.insert(TestJob.new(%{})) - assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :events) + [link(trace_id: ^send_trace_id, span_id: ^send_span_id)] = :otel_links.list(job_1_links) assert_receive {:span, span( name: "TestJob process", - attributes: attributes, + attributes: _attributes, kind: :consumer, - status: :undefined + status: :undefined, + trace_id: second_process_trace_id, + links: job_2_links )} - assert %{ - "messaging.oban.duration_second": _duration, - "messaging.oban.queue_time_second": _queue_time - } = :otel_attributes.map(attributes) + [link(trace_id: ^send_trace_id, span_id: ^send_span_id)] = :otel_links.list(job_2_links) + + # Process is ran asynchronously so we create a new trace, but still link + # the traces together. + assert send_trace_id != first_process_trace_id + assert send_trace_id != second_process_trace_id + assert first_process_trace_id != second_process_trace_id + end + + test "works with Oban.Testing.perform_job helper function" do + Oban.Testing.perform_job(TestJob, %{}, repo: TestRepo) + + assert_receive {:span, span(name: "TestJob process")} end end