diff --git a/instrumentation/opentelemetry_ecto/lib/opentelemetry_ecto.ex b/instrumentation/opentelemetry_ecto/lib/opentelemetry_ecto.ex index 6ec1be78..6ba74651 100644 --- a/instrumentation/opentelemetry_ecto/lib/opentelemetry_ecto.ex +++ b/instrumentation/opentelemetry_ecto/lib/opentelemetry_ecto.ex @@ -20,17 +20,16 @@ defmodule OpentelemetryEcto do require OpenTelemetry.Tracer - alias OpenTelemetry.SemConv.Incubating.DBAttributes + alias OpenTelemetry.SemConv.Incubating.{DBAttributes, Metrics.DBMetrics} + alias OpenTelemetry.SemConv.{ErrorAttributes, ServerAttributes} @typedoc """ Option that you can pass to `setup/2`. """ @typedoc since: "1.3.0" @type setup_option() :: - {:time_unit, System.time_unit()} - | {:span_prefix, String.t()} - | {:additional_attributes, %{String.t() => term()}} - | {:db_statement, :enabled | :disabled | (String.t() -> String.t())} + {:additional_attributes, %{String.t() => term()}} + | {:db_query, :enabled | :disabled | (String.t() -> String.t())} @doc """ Attaches the `OpentelemetryEcto` handler to your repo events. @@ -56,18 +55,12 @@ defmodule OpentelemetryEcto do You may also supply the following options in the second argument: - * `:time_unit` - a time unit used to convert the values of query phase - timings, defaults to `:microsecond`. See `System.convert_time_unit/3`. - * `:span_prefix` - the first part of the span name. - Defaults to the concatenation of the event name with periods, such as - `"blog.repo.query"`. This will always be followed with a colon and the - source (the table name for SQL adapters). For example: `"blog.repo.query:users"`. * `:additional_attributes` - additional attributes to include in the span. If there are conflicts with default provided attributes, the ones provided with this config will have precedence. - * `:db_statement` - `:disabled` (default), `:enabled`, or a function. + * `:db_query` - `:disabled` (default), `:enabled`, or a function. Whether or not to include DB statements in the **span attributes** (as the - `#{DBAttributes.db_statement()}` attribute). + `#{DBAttributes.db_query_text()}` attribute). Optionally provide a function that takes a query string and returns a sanitized version of it. This is useful for removing sensitive information from the query string. Unless this option is `:enabled` or a function, @@ -82,9 +75,9 @@ defmodule OpentelemetryEcto do @doc false def handle_event( - event, + _event, measurements, - %{query: query, source: source, result: query_result, repo: repo, type: type}, + %{query: query, source: source, result: query_result, repo: repo}, config ) do # Doing all this even if the span isn't sampled so the sampler @@ -94,55 +87,30 @@ defmodule OpentelemetryEcto do end_time = :opentelemetry.timestamp() start_time = end_time - total_time measurements = Map.put(measurements, :total_time, total_time) - database = repo.config()[:database] + repo_config = Keyword.take(repo.config(), [:database, :hostname, :port]) - url = - case repo.config()[:url] do - nil -> - # TODO: add port - URI.to_string(%URI{scheme: "ecto", host: repo.config()[:hostname]}) - - url -> - url - end - - span_prefix = - case Keyword.fetch(config, :span_prefix) do - {:ok, prefix} -> prefix - :error -> Enum.join(event, ".") - end - - span_suffix = if source != nil, do: ":#{source}", else: "" - span_name = span_prefix <> span_suffix - - time_unit = Keyword.get(config, :time_unit, :microsecond) additional_attributes = Keyword.get(config, :additional_attributes, %{}) - db_type = - case type do - :ecto_sql_query -> :sql - _ -> type - end - - # TODO: need connection information to complete the required attributes - # net.peer.name or net.peer.ip and net.peer.port - base_attributes = %{ - :source => source, - :"db.instance" => database, - :"db.type" => db_type, - unquote(DBAttributes.db_name()) => database, - :"db.url" => url - } - - db_statement_config = Keyword.get(config, :db_statement, :disabled) + db_statement_config = Keyword.get(config, :db_query, :disabled) attributes = - base_attributes - |> add_measurements(measurements, time_unit) - |> maybe_add_db_statement(db_statement_config, query) - |> maybe_add_db_system(repo.__adapter__()) + # TODO: need connection information to complete the required attributes + # net.peer.name or net.peer.ip and net.peer.port + %{ + unquote(DBAttributes.db_system()) => db_system(repo.__adapter__()), + unquote(DBAttributes.db_namespace()) => repo_config[:database], + unquote(ServerAttributes.server_address()) => repo_config[:hostname] + } + |> maybe_add_db_collection_name(source) + |> maybe_add_server_port(repo_config) + |> maybe_add_db_operation_name(repo.__adapter__(), query) + |> maybe_add_error_type(repo.__adapter__(), query_result) + |> maybe_add_db_query_text(db_statement_config, query) + |> add_measurements(measurements) |> add_additional_attributes(additional_attributes) + span_name = span_name(attributes) + parent_context = case OpentelemetryProcessPropagator.fetch_ctx(self()) do :undefined -> @@ -187,59 +155,125 @@ defmodule OpentelemetryEcto do defp format_error(_), do: "" - defp add_measurements(attributes, measurements, time_unit) do - measurements - |> Enum.reduce(attributes, fn - {k, v}, acc - when not is_nil(v) and k in [:total_time, :decode_time, :query_time, :queue_time, :idle_time] -> - Map.put( - acc, - String.to_atom("#{k}_#{time_unit}s"), - System.convert_time_unit(v, :native, time_unit) - ) - - _, acc -> - acc - end) + @db_systems [ + {Ecto.Adapters.Postgres, DBAttributes.db_system_values().postgresql}, + {Ecto.Adapters.MyXQL, DBAttributes.db_system_values().mysql}, + {Ecto.Adapters.SQLite3, DBAttributes.db_system_values().sqlite}, + {Ecto.Adapters.Tds, DBAttributes.db_system_values().mssql} + ] + + for {adapter, system} <- @db_systems do + defp db_system(unquote(adapter)), do: unquote(system) end - defp maybe_add_db_statement(attributes, :enabled, query) do - Map.put(attributes, unquote(DBAttributes.db_statement()), query) + # NOTE: This is the catch-all clause where we use other_sql as the db.system value, but it may not be a SQL based database. + defp db_system(_), do: unquote(DBAttributes.db_system_values().other_sql) + + defp maybe_add_db_collection_name(attributes, nil), do: attributes + + defp maybe_add_db_collection_name(attributes, source) do + Map.put(attributes, unquote(DBAttributes.db_collection_name()), source) end - defp maybe_add_db_statement(attributes, :disabled, _query) do - attributes + defp maybe_add_server_port(attributes, repo_config) do + case Keyword.has_key?(repo_config, :port) do + false -> attributes + true -> Map.put(attributes, unquote(ServerAttributes.server_port()), repo_config[:port]) + end end - defp maybe_add_db_statement(attributes, sanitizer, query) when is_function(sanitizer, 1) do - Map.put(attributes, unquote(DBAttributes.db_statement()), sanitizer.(query)) + defp maybe_add_db_operation_name(attributes, adapter, query) do + case get_db_operation_name(adapter, query) do + nil -> attributes + operation_name -> Map.put(attributes, unquote(DBAttributes.db_operation_name()), operation_name) + end end - defp maybe_add_db_statement(attributes, _, _query) do - attributes + # NOTE: Postgres does set a `:command` attribute on the result, but since there is no command for the + # error struct we will parse it all the same here. + defp get_db_operation_name(Ecto.Adapters.Postgres, query), do: sql_command(query) + defp get_db_operation_name(Ecto.Adapters.MyXQL, query), do: sql_command(query) + defp get_db_operation_name(Ecto.Adapters.SQLite3, query), do: sql_command(query) + defp get_db_operation_name(Ecto.Adapters.Tds, query), do: sql_command(query) + defp get_db_operation_name(_, _), do: nil + + defp sql_command(query) when is_binary(query) do + query + |> String.split(" ", trim: true) + |> sql_command() + end + + @sql_commands ~w(select insert update delete begin commit) + + defp sql_command([raw_command | _rest]) do + case String.downcase(raw_command) do + command when command in @sql_commands -> raw_command + _ -> nil + end + end + + defp maybe_add_error_type(attributes, _adapter, {:ok, _}), do: attributes + + defp maybe_add_error_type(attributes, adapter, {:error, error}) do + case get_error_type(adapter, error) do + nil -> attributes + error_type -> Map.put(attributes, unquote(ErrorAttributes.error_type()), error_type) + end end - defp maybe_add_db_system(attributes, Ecto.Adapters.Postgres) do - Map.put(attributes, unquote(DBAttributes.db_system()), :postgresql) + defp get_error_type(Ecto.Adapters.Postgres, %{postgres: %{code: code}}), do: code + defp get_error_type(Ecto.Adapters.MyXQL, %{postgres: %{name: name}}), do: name + # NOTE: Exqlite.Error does not have an error type + # TODO: Normalize error type from the error message? + defp get_error_type(Ecto.Adapters.SQLite3, _), do: nil + defp get_error_type(Ecto.Adapters.Tds, %{mssql: %{number: number}}), do: number + defp get_error_type(_adapter, _), do: nil + + @measurements [ + idle_time: DBMetrics.db_client_connection_create_time(), + total_time: DBMetrics.db_client_operation_duration(), + queue_time: DBMetrics.db_client_connection_wait_time(), + query_time: DBMetrics.db_client_connection_use_time(), + ] + + defp add_measurements(attributes, measurements) do + Enum.reduce(@measurements, attributes, fn {telemetry_key, attribute_key}, attributes -> + case Map.get(measurements, telemetry_key) do + nil -> attributes + value -> Map.put(attributes, attribute_key, System.convert_time_unit(value, :native, :microsecond) / 1_000_000) + end + end) end - defp maybe_add_db_system(attributes, Ecto.Adapters.MyXQL) do - Map.put(attributes, unquote(DBAttributes.db_system()), :mysql) + defp maybe_add_db_query_text(attributes, :enabled, query) do + Map.put(attributes, unquote(DBAttributes.db_query_text()), query) end - defp maybe_add_db_system(attributes, Ecto.Adapters.SQLite3) do - Map.put(attributes, unquote(DBAttributes.db_system()), :sqlite) + defp maybe_add_db_query_text(attributes, :disabled, _query) do + attributes end - defp maybe_add_db_system(attributes, Ecto.Adapters.Tds) do - Map.put(attributes, unquote(DBAttributes.db_system()), :mssql) + defp maybe_add_db_query_text(attributes, sanitizer, query) when is_function(sanitizer, 1) do + Map.put(attributes, unquote(DBAttributes.db_query_text()), sanitizer.(query)) end - defp maybe_add_db_system(attributes, _) do + defp maybe_add_db_query_text(attributes, _, _query) do attributes end defp add_additional_attributes(attributes, additional_attributes) do Map.merge(attributes, additional_attributes) end + + # SHOULD be `{db.operation.name} {target}` if there is a (low-cardinality) {db.operation.name} available. + defp span_name(%{unquote(DBAttributes.db_operation_name()) => db_operation_name} = attributes), do: "#{db_operation_name} #{target(attributes)}" + + # If there is no (low-cardinality) `db.operation.name` available, database span names SHOULD be `{target}`. + defp span_name(attributes), do: target(attributes) + + # `db.collection.name` SHOULD be used for data manipulation operations or operations on database collections. + defp target(%{unquote(DBAttributes.db_collection_name()) => db_collection_name}), do: db_collection_name + + # `db.namespace` SHOULD be used for operations on a specific database namespace. + defp target(%{unquote(DBAttributes.db_namespace()) => db_namespace}), do: db_namespace end diff --git a/instrumentation/opentelemetry_ecto/test/opentelemetry_ecto_test.exs b/instrumentation/opentelemetry_ecto/test/opentelemetry_ecto_test.exs index 14555909..58954723 100644 --- a/instrumentation/opentelemetry_ecto/test/opentelemetry_ecto_test.exs +++ b/instrumentation/opentelemetry_ecto/test/opentelemetry_ecto_test.exs @@ -43,22 +43,28 @@ defmodule OpentelemetryEctoTest do assert_receive {:span, span( - name: "opentelemetry_ecto.test_repo.query:users", + name: "select users", attributes: attributes, kind: :client )} + attributes = :otel_attributes.map(attributes) + assert %{ - "db.system": :postgresql, - "db.instance": "opentelemetry_ecto_test", - "db.type": :sql, - "db.url": "ecto://localhost", - decode_time_microseconds: _, - query_time_microseconds: _, - queue_time_microseconds: _, - source: "users", - total_time_microseconds: _ - } = :otel_attributes.map(attributes) + "db.client.operation.duration": _, + "db.client.connection.wait_time": _, + "db.client.connection.use_time": _, + } = attributes + + attributes = Map.drop(attributes, ~w(db.client.operation.duration db.client.connection.wait_time db.client.connection.use_time)a) + + assert attributes == %{ + "db.system": :postgresql, + "db.collection.name": "users", + "db.namespace": "opentelemetry_ecto_test", + "server.address": "localhost", + "db.operation.name": "select" + } end test "exclude unsantized query" do @@ -66,72 +72,40 @@ defmodule OpentelemetryEctoTest do Repo.all(User) assert_receive {:span, span(attributes: attributes)} - assert !Map.has_key?(:otel_attributes.map(attributes), :"db.statement") + assert !Map.has_key?(:otel_attributes.map(attributes), :"db.query.text") end test "include unsanitized query when enabled" do - attach_handler(db_statement: :enabled) + attach_handler(db_query: :enabled) Repo.all(User) assert_receive {:span, span(attributes: attributes)} - assert %{"db.statement": "SELECT u0.\"id\", u0.\"email\" FROM \"users\" AS u0"} = + assert %{"db.query.text": "SELECT u0.\"id\", u0.\"email\" FROM \"users\" AS u0"} = :otel_attributes.map(attributes) end test "include sanitized query with sanitizer function" do - attach_handler(db_statement: fn str -> String.replace(str, "SELECT", "") end) + attach_handler(db_query: fn str -> String.replace(str, "SELECT", "") end) Repo.all(User) assert_receive {:span, span(attributes: attributes)} - assert %{"db.statement": " u0.\"id\", u0.\"email\" FROM \"users\" AS u0"} = + assert %{"db.query.text": " u0.\"id\", u0.\"email\" FROM \"users\" AS u0"} = :otel_attributes.map(attributes) end test "include additional_attributes" do - attach_handler(additional_attributes: %{"config.attribute": "special value", "db.instance": "my_instance"}) + attach_handler(additional_attributes: %{"config.attribute": "special value", "db.system": "my_system"}) Repo.all(User) assert_receive {:span, span(attributes: attributes)} - assert %{"config.attribute": "special value", "db.instance": "my_instance"} = + assert %{"config.attribute": "special value", "db.system": "my_system"} = :otel_attributes.map(attributes) end - test "changes the time unit" do - attach_handler(time_unit: :millisecond) - - Repo.all(Post) - - assert_receive {:span, - span( - name: "opentelemetry_ecto.test_repo.query:posts", - attributes: attributes - )} - - assert %{ - "db.system": :postgresql, - "db.instance": "opentelemetry_ecto_test", - "db.type": :sql, - "db.url": "ecto://localhost", - decode_time_milliseconds: _, - query_time_milliseconds: _, - queue_time_milliseconds: _, - source: "posts", - total_time_milliseconds: _ - } = :otel_attributes.map(attributes) - end - - test "changes the span name prefix" do - attach_handler(span_prefix: "Ecto") - - Repo.all(User) - - assert_receive {:span, span(name: "Ecto:users")} - end - test "collects multiple spans" do user = Repo.insert!(%User{email: "opentelemetry@erlang.org"}) Repo.insert!(%Post{body: "We got traced!", user: user}) @@ -142,8 +116,8 @@ defmodule OpentelemetryEctoTest do |> Repo.all() |> Repo.preload([:posts]) - assert_receive {:span, span(name: "opentelemetry_ecto.test_repo.query:users")} - assert_receive {:span, span(name: "opentelemetry_ecto.test_repo.query:posts")} + assert_receive {:span, span(name: "select users")} + assert_receive {:span, span(name: "select posts")} end test "sets error message on error" do @@ -157,11 +131,14 @@ defmodule OpentelemetryEctoTest do assert_receive {:span, span( - name: "opentelemetry_ecto.test_repo.query:users", - status: {:status, :error, message} + name: "select users", + status: {:status, :error, message}, + attributes: attributes )} assert message =~ "non_existent_field does not exist" + + assert %{"error.type": :undefined_column} = :otel_attributes.map(attributes) end test "preloads in sequence are tied to the parent span" do @@ -180,19 +157,19 @@ defmodule OpentelemetryEctoTest do assert_receive {:span, span( parent_span_id: ^root_span_id, - name: "opentelemetry_ecto.test_repo.query:users" + name: "select users" )} assert_receive {:span, span( parent_span_id: ^root_span_id, - name: "opentelemetry_ecto.test_repo.query:posts" + name: "select posts" )} assert_receive {:span, span( parent_span_id: ^root_span_id, - name: "opentelemetry_ecto.test_repo.query:comments" + name: "select comments" )} end @@ -212,19 +189,19 @@ defmodule OpentelemetryEctoTest do assert_receive {:span, span( parent_span_id: ^root_span_id, - name: "opentelemetry_ecto.test_repo.query:users" + name: "select users" )} assert_receive {:span, span( parent_span_id: ^root_span_id, - name: "opentelemetry_ecto.test_repo.query:posts" + name: "select posts" )} assert_receive {:span, span( parent_span_id: ^root_span_id, - name: "opentelemetry_ecto.test_repo.query:comments" + name: "select comments" )} end @@ -246,34 +223,34 @@ defmodule OpentelemetryEctoTest do assert_receive {:span, span( parent_span_id: ^root_span_id, - name: "opentelemetry_ecto.test_repo.query:users" + name: "select users" )} # comments preload assert_receive {:span, span( parent_span_id: ^root_span_id, - name: "opentelemetry_ecto.test_repo.query:comments" + name: "select comments" )} # users preload assert_receive {:span, span( parent_span_id: ^root_span_id, - name: "opentelemetry_ecto.test_repo.query:users" + name: "select users" )} # preloads of user assert_receive {:span, span( parent_span_id: ^root_span_id, - name: "opentelemetry_ecto.test_repo.query:posts" + name: "select posts" )} assert_receive {:span, span( parent_span_id: ^root_span_id, - name: "opentelemetry_ecto.test_repo.query:comments" + name: "select comments" )} end @@ -301,7 +278,7 @@ defmodule OpentelemetryEctoTest do assert_receive {:span, span( parent_span_id: ^parent_span_id, - name: "opentelemetry_ecto.test_repo.query:users" + name: "select users" )} end