Skip to content

Commit

Permalink
Use 1.27 semantic convention
Browse files Browse the repository at this point in the history
  • Loading branch information
danschultzer committed Jan 8, 2025
1 parent b6c577b commit a808399
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 151 deletions.
207 changes: 121 additions & 86 deletions instrumentation/opentelemetry_ecto/lib/opentelemetry_ecto.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,16 @@ defmodule OpentelemetryEcto do

require OpenTelemetry.Tracer

alias OpenTelemetry.SemConv.Incubating.DBAttributes
alias OpenTelemetry.SemConv.Incubating.{DBAttributes, Metrics.DBMetrics}
alias OpenTelemetry.SemConv.{ErrorAttributes, ServerAttributes}

@typedoc """
Option that you can pass to `setup/2`.
"""
@typedoc since: "1.3.0"
@type setup_option() ::
{:time_unit, System.time_unit()}
| {:span_prefix, String.t()}
| {:additional_attributes, %{String.t() => term()}}
| {:db_statement, :enabled | :disabled | (String.t() -> String.t())}
{:additional_attributes, %{String.t() => term()}}
| {:db_query, :enabled | :disabled | (String.t() -> String.t())}

@doc """
Attaches the `OpentelemetryEcto` handler to your repo events.
Expand All @@ -56,18 +55,12 @@ defmodule OpentelemetryEcto do
You may also supply the following options in the second argument:
* `:time_unit` - a time unit used to convert the values of query phase
timings, defaults to `:microsecond`. See `System.convert_time_unit/3`.
* `:span_prefix` - the first part of the span name.
Defaults to the concatenation of the event name with periods, such as
`"blog.repo.query"`. This will always be followed with a colon and the
source (the table name for SQL adapters). For example: `"blog.repo.query:users"`.
* `:additional_attributes` - additional attributes to include in the span. If there
are conflicts with default provided attributes, the ones provided with
this config will have precedence.
* `:db_statement` - `:disabled` (default), `:enabled`, or a function.
* `:db_query` - `:disabled` (default), `:enabled`, or a function.
Whether or not to include DB statements in the **span attributes** (as the
`#{DBAttributes.db_statement()}` attribute).
`#{DBAttributes.db_query_text()}` attribute).
Optionally provide a function that takes a query string and returns a
sanitized version of it. This is useful for removing sensitive information from the
query string. Unless this option is `:enabled` or a function,
Expand All @@ -82,9 +75,9 @@ defmodule OpentelemetryEcto do

@doc false
def handle_event(
event,
_event,
measurements,
%{query: query, source: source, result: query_result, repo: repo, type: type},
%{query: query, source: source, result: query_result, repo: repo},
config
) do
# Doing all this even if the span isn't sampled so the sampler
Expand All @@ -94,55 +87,30 @@ defmodule OpentelemetryEcto do
end_time = :opentelemetry.timestamp()
start_time = end_time - total_time
measurements = Map.put(measurements, :total_time, total_time)
database = repo.config()[:database]
repo_config = Keyword.take(repo.config(), [:database, :hostname, :port])

url =
case repo.config()[:url] do
nil ->
# TODO: add port
URI.to_string(%URI{scheme: "ecto", host: repo.config()[:hostname]})

url ->
url
end

span_prefix =
case Keyword.fetch(config, :span_prefix) do
{:ok, prefix} -> prefix
:error -> Enum.join(event, ".")
end

span_suffix = if source != nil, do: ":#{source}", else: ""
span_name = span_prefix <> span_suffix

time_unit = Keyword.get(config, :time_unit, :microsecond)
additional_attributes = Keyword.get(config, :additional_attributes, %{})

db_type =
case type do
:ecto_sql_query -> :sql
_ -> type
end
db_statement_config = Keyword.get(config, :db_query, :disabled)

# TODO: need connection information to complete the required attributes
# net.peer.name or net.peer.ip and net.peer.port
base_attributes = %{
:source => source,
:"db.instance" => database,
:"db.type" => db_type,
unquote(DBAttributes.db_name()) => database,
:"db.url" => url
}

db_statement_config = Keyword.get(config, :db_statement, :disabled)

attributes =
base_attributes
|> add_measurements(measurements, time_unit)
|> maybe_add_db_statement(db_statement_config, query)
|> maybe_add_db_system(repo.__adapter__())
%{
unquote(DBAttributes.db_system()) => db_system(repo.__adapter__()),
unquote(DBAttributes.db_namespace()) => repo_config[:database],
unquote(ServerAttributes.server_address()) => repo_config[:hostname]
}
|> maybe_add_db_collection_name(source)
|> maybe_add_server_port(repo_config)
|> maybe_add_db_operation_name(repo.__adapter__(), query)
|> maybe_add_error_type(repo.__adapter__(), query_result)
|> maybe_add_db_query_text(db_statement_config, query)
|> add_measurements(measurements)
|> add_additional_attributes(additional_attributes)

span_name = span_name(attributes)

parent_context =
case OpentelemetryProcessPropagator.fetch_ctx(self()) do
:undefined ->
Expand Down Expand Up @@ -187,59 +155,126 @@ defmodule OpentelemetryEcto do

defp format_error(_), do: ""

defp add_measurements(attributes, measurements, time_unit) do
measurements
|> Enum.reduce(attributes, fn
{k, v}, acc
when not is_nil(v) and k in [:total_time, :decode_time, :query_time, :queue_time, :idle_time] ->
Map.put(
acc,
String.to_atom("#{k}_#{time_unit}s"),
System.convert_time_unit(v, :native, time_unit)
)

_, acc ->
acc
end)
@db_systems [
{Ecto.Adapters.Postgres, DBAttributes.db_system_values().postgresql},
{Ecto.Adapters.MyXQL, DBAttributes.db_system_values().mysql},
{Ecto.Adapters.SQLite3, DBAttributes.db_system_values().sqlite},
{Ecto.Adapters.Tds, DBAttributes.db_system_values().mssql}
]

for {adapter, system} <- @db_systems do
defp db_system(unquote(adapter)), do: unquote(system)
end

defp maybe_add_db_statement(attributes, :enabled, query) do
Map.put(attributes, unquote(DBAttributes.db_statement()), query)
# NOTE: This is the catch-all clause where we use other_sql as the db.system value, but it may not be a SQL based database.
defp db_system(_), do: unquote(DBAttributes.db_system_values().other_sql)

defp maybe_add_db_collection_name(attributes, nil), do: attributes

defp maybe_add_db_collection_name(attributes, source) do
Map.put(attributes, unquote(DBAttributes.db_collection_name()), source)
end

defp maybe_add_db_statement(attributes, :disabled, _query) do
attributes
defp maybe_add_server_port(attributes, repo_config) do
case Keyword.has_key?(repo_config, :port) do
false -> attributes
true -> Map.put(attributes, unquote(ServerAttributes.server_port()), repo_config[:port])
end
end

defp maybe_add_db_statement(attributes, sanitizer, query) when is_function(sanitizer, 1) do
Map.put(attributes, unquote(DBAttributes.db_statement()), sanitizer.(query))
defp maybe_add_db_operation_name(attributes, adapter, query) do
case get_db_operation_name(adapter, query) do
nil -> attributes
operation_name -> Map.put(attributes, unquote(DBAttributes.db_operation_name()), operation_name)
end
end

defp maybe_add_db_statement(attributes, _, _query) do
attributes
# NOTE: Postgres does set a `:command` attribute on the result, but since there is no command for the
# error struct we will parse it all the same here.
defp get_db_operation_name(Ecto.Adapters.Postgres, query), do: sql_command(query)
defp get_db_operation_name(Ecto.Adapters.MyXQL, query), do: sql_command(query)
defp get_db_operation_name(Ecto.Adapters.SQLite3, query), do: sql_command(query)
defp get_db_operation_name(Ecto.Adapters.Tds, query), do: sql_command(query)
defp get_db_operation_name(_, _), do: nil

defp sql_command(query) when is_binary(query) do
query
|> String.split(" ", trim: true)
|> sql_command()
end

@sql_commands ~w(select insert update delete begin commit)

defp sql_command([raw_command | _rest]) do
case String.downcase(raw_command) do
command when command in @sql_commands -> raw_command
_ -> nil
end
end

defp maybe_add_error_type(attributes, _adapter, {:ok, _}), do: attributes

defp maybe_add_error_type(attributes, adapter, {:error, error}) do
case get_error_type(adapter, error) do
nil -> attributes
error_type -> Map.put(attributes, unquote(ErrorAttributes.error_type()), error_type)
end
end

defp maybe_add_db_system(attributes, Ecto.Adapters.Postgres) do
Map.put(attributes, unquote(DBAttributes.db_system()), :postgresql)
defp get_error_type(Ecto.Adapters.Postgres, %{postgres: %{code: code}}), do: code
defp get_error_type(Ecto.Adapters.MyXQL, %{postgres: %{name: name}}), do: name
# NOTE: Exqlite.Error does not have an error type
# TODO: Normalize error type from the error message?
defp get_error_type(Ecto.Adapters.SQLite3, _), do: nil
defp get_error_type(Ecto.Adapters.Tds, %{mssql: %{number: number}}), do: number
defp get_error_type(_adapter, _), do: nil

@measurements [
idle_time: DBMetrics.db_client_connection_create_time(),
total_time: DBMetrics.db_client_operation_duration(),
queue_time: DBMetrics.db_client_connection_wait_time(),
query_time: DBMetrics.db_client_connection_use_time()
]

defp add_measurements(attributes, measurements) do
Enum.reduce(@measurements, attributes, fn {telemetry_key, attribute_key}, attributes ->
case Map.get(measurements, telemetry_key) do
nil -> attributes
value -> Map.put(attributes, attribute_key, System.convert_time_unit(value, :native, :microsecond) / 1_000_000)
end
end)
end

defp maybe_add_db_system(attributes, Ecto.Adapters.MyXQL) do
Map.put(attributes, unquote(DBAttributes.db_system()), :mysql)
defp maybe_add_db_query_text(attributes, :enabled, query) do
Map.put(attributes, unquote(DBAttributes.db_query_text()), query)
end

defp maybe_add_db_system(attributes, Ecto.Adapters.SQLite3) do
Map.put(attributes, unquote(DBAttributes.db_system()), :sqlite)
defp maybe_add_db_query_text(attributes, :disabled, _query) do
attributes
end

defp maybe_add_db_system(attributes, Ecto.Adapters.Tds) do
Map.put(attributes, unquote(DBAttributes.db_system()), :mssql)
defp maybe_add_db_query_text(attributes, sanitizer, query) when is_function(sanitizer, 1) do
Map.put(attributes, unquote(DBAttributes.db_query_text()), sanitizer.(query))
end

defp maybe_add_db_system(attributes, _) do
defp maybe_add_db_query_text(attributes, _, _query) do
attributes
end

defp add_additional_attributes(attributes, additional_attributes) do
Map.merge(attributes, additional_attributes)
end

# SHOULD be `{db.operation.name} {target}` if there is a (low-cardinality) {db.operation.name} available.
defp span_name(%{unquote(DBAttributes.db_operation_name()) => db_operation_name} = attributes),
do: "#{db_operation_name} #{target(attributes)}"

# If there is no (low-cardinality) `db.operation.name` available, database span names SHOULD be `{target}`.
defp span_name(attributes), do: target(attributes)

# `db.collection.name` SHOULD be used for data manipulation operations or operations on database collections.
defp target(%{unquote(DBAttributes.db_collection_name()) => db_collection_name}), do: db_collection_name

# `db.namespace` SHOULD be used for operations on a specific database namespace.
defp target(%{unquote(DBAttributes.db_namespace()) => db_namespace}), do: db_namespace
end
Loading

0 comments on commit a808399

Please sign in to comment.