Skip to content

Commit

Permalink
Introduce SpanContext for distributed tracing (#74)
Browse files Browse the repository at this point in the history
* Introduce SpanContext for distributed tracing

* Send SpanContext to the Sender along with the spans

* Remove Logger messages that would commonly appear in testing

* Don't wait for as long for absence of traces

* Send a Trace to the Sender instead of a list of Spans and a SpanContext
  • Loading branch information
GregMefford authored and zachdaniel committed Aug 29, 2018
1 parent 89f852b commit 42dd125
Show file tree
Hide file tree
Showing 11 changed files with 154 additions and 108 deletions.
4 changes: 3 additions & 1 deletion lib/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ defmodule Spandex.Adapter do
The callbacks required to implement the Spandex.Adapter behaviour.
"""

@callback distributed_context(Plug.Conn.t(), Keyword.t()) :: {:ok, term} | {:error, term}
@callback distributed_context(Plug.Conn.t(), Keyword.t()) ::
{:ok, Spandex.SpanContext.t()}
| {:error, atom()}
@callback trace_id() :: Spandex.id()
@callback span_id() :: Spandex.id()
@callback now() :: Spandex.timestamp()
Expand Down
7 changes: 3 additions & 4 deletions lib/plug/start_trace.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Spandex.Plug.StartTrace do
@behaviour Plug

alias Spandex.Plug.Utils
alias Spandex.SpanContext

@init_opts Optimal.schema(
opts: [
Expand Down Expand Up @@ -55,14 +56,12 @@ defmodule Spandex.Plug.StartTrace do
tracer_opts = opts[:tracer_opts]

case tracer.distributed_context(conn, tracer_opts) do
{:ok, %{trace_id: trace_id, parent_id: parent_id}} ->
tracer.continue_trace("request", trace_id, parent_id, tracer_opts)

{:ok, %SpanContext{} = span_context} ->
tracer.continue_trace("request", span_context, tracer_opts)
Utils.trace(conn, true)

{:error, :no_distributed_trace} ->
tracer.start_trace(opts[:span_name], tracer_opts)

Utils.trace(conn, true)

_ ->
Expand Down
6 changes: 3 additions & 3 deletions lib/span.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,17 @@ defmodule Spandex.Span do
env: :string,
error: :keyword,
http: :keyword,
id: :integer,
id: :any,
name: :string,
parent_id: :integer,
parent_id: :any,
private: :keyword,
resource: [:atom, :string],
service: :atom,
services: :keyword,
sql_query: :keyword,
start: :integer,
tags: :keyword,
trace_id: :integer,
trace_id: :any,
type: :atom
],
defaults: [
Expand Down
23 changes: 23 additions & 0 deletions lib/span_context.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
defmodule Spandex.SpanContext do
@moduledoc """
From the [OpenTracing specification]:
> Each SpanContext encapsulates the following state:
> * Any OpenTracing-implementation-dependent state (for example, trace and span ids) needed to refer to a distinct Span across a process boundary
> * Baggage Items, which are just key:value pairs that cross process boundaries
[OpenTracing specification]: https://github.com/opentracing/specification/blob/master/specification.md
"""

@typedoc @moduledoc
@type t :: %__MODULE__{
trace_id: Spandex.id(),
parent_id: Spandex.id(),
priority: integer(),
baggage: Keyword.t()
}

defstruct trace_id: nil,
parent_id: nil,
priority: 1,
baggage: []
end
56 changes: 37 additions & 19 deletions lib/spandex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Spandex do

alias Spandex.{
Span,
SpanContext,
Trace,
Tracer
}
Expand Down Expand Up @@ -46,7 +47,6 @@ defmodule Spandex do

case strategy.get_trace(opts[:trace_key]) do
{:error, :no_trace_context} = error ->
Logger.error("Tried to start a span without an active trace.")
error

{:error, _} = error ->
Expand All @@ -71,11 +71,9 @@ defmodule Spandex do

case strategy.get_trace(opts[:trace_key]) do
{:error, :no_trace_context} = error ->
Logger.error("Tried to update a span without an active trace.")
error

{:ok, %Trace{stack: []}} ->
Logger.error("Tried to update a span without an active span.")
{:error, :no_span_context}

{:ok, trace} ->
Expand Down Expand Up @@ -107,7 +105,6 @@ defmodule Spandex do

case strategy.get_trace(opts[:trace_key]) do
{:error, :no_trace_context} = error ->
Logger.error("Tried to update a span without an active trace.")
error

{:ok, %Trace{stack: stack, spans: spans} = trace} ->
Expand Down Expand Up @@ -136,10 +133,11 @@ defmodule Spandex do
Logger.error("Tried to finish a trace without an active trace.")
error

{:ok, %Trace{spans: spans, stack: stack}} ->
{:ok, %Trace{spans: spans, stack: stack} = trace} ->
unfinished_spans = Enum.map(stack, &ensure_completion_time_set(&1, adapter))
sender = opts[:sender] || adapter.default_sender()
sender.send_spans(spans ++ unfinished_spans)
# TODO: We need to define a behaviour for the Sender API.
sender.send_trace(%Trace{trace | spans: spans ++ unfinished_spans, stack: []})
strategy.delete_trace(opts[:trace_key])

{:error, _} = error ->
Expand All @@ -161,7 +159,6 @@ defmodule Spandex do

case strategy.get_trace(opts[:trace_key]) do
{:error, :no_trace_context} = error ->
Logger.error("Tried to finish a span without an active trace.")
error

{:ok, %Trace{stack: []}} ->
Expand Down Expand Up @@ -244,25 +241,36 @@ defmodule Spandex do
end
end

@spec continue_trace(String.t(), Spandex.id(), Spandex.id(), Keyword.t()) ::
@spec continue_trace(String.t(), SpanContext.t(), Keyword.t()) ::
{:ok, %Trace{}}
| {:error, :disabled}
| {:error, :trace_already_present}
| {:error, [Optimal.error()]}
def continue_trace(_, _, _, :disabled), do: {:error, :disabled}
def continue_trace(_, _, :disabled), do: {:error, :disabled}

def continue_trace(name, trace_id, span_id, opts) do
def continue_trace(name, %SpanContext{} = span_context, opts) do
strategy = opts[:strategy]
opts = Keyword.put(opts, :parent_id, span_id)

if strategy.trace_active?(opts[:trace_key]) do
Logger.error("Tried to continue a trace over top of another trace.")
{:error, :trace_already_present}
else
do_continue_trace(name, trace_id, opts)
do_continue_trace(name, span_context, opts)
end
end

@spec continue_trace(String.t(), Spandex.id(), Spandex.id(), Keyword.t()) ::
{:ok, %Trace{}}
| {:error, :disabled}
| {:error, :trace_already_present}
| {:error, [Optimal.error()]}
@deprecated "Use continue_trace/3 instead"
def continue_trace(_, _, _, :disabled), do: {:error, :disabled}

def continue_trace(name, trace_id, span_id, opts) do
continue_trace(name, %SpanContext{trace_id: trace_id, parent_id: span_id}, opts)
end

@spec continue_trace_from_span(String.t(), Span.t(), Tracer.opts()) ::
{:ok, %Trace{}}
| {:error, :disabled}
Expand Down Expand Up @@ -294,12 +302,19 @@ defmodule Spandex do

# Private Helpers

defp do_continue_trace(name, trace_id, opts) do
defp do_continue_trace(name, span_context, opts) do
strategy = opts[:strategy]
adapter = opts[:adapter]

with {:ok, top_span} <- span(name, opts, trace_id, adapter) do
trace = %Trace{id: trace_id, stack: [top_span], spans: []}
with {:ok, top_span} <- span(name, opts, span_context, adapter) do
trace = %Trace{
id: span_context.trace_id,
priority: span_context.priority,
baggage: span_context.baggage,
stack: [top_span],
spans: []
}

strategy.put_trace(opts[:trace_key], trace)
end
end
Expand Down Expand Up @@ -328,8 +343,9 @@ defmodule Spandex do
defp do_start_span(name, %Trace{stack: [], id: trace_id} = trace, opts) do
strategy = opts[:strategy]
adapter = opts[:adapter]
span_context = %SpanContext{trace_id: trace_id}

with {:ok, span} <- span(name, opts, trace_id, adapter),
with {:ok, span} <- span(name, opts, span_context, adapter),
{:ok, _trace} <- strategy.put_trace(opts[:trace_key], %{trace | stack: [span]}) do
Logger.metadata(span_id: span.id)
{:ok, span}
Expand All @@ -340,8 +356,9 @@ defmodule Spandex do
strategy = opts[:strategy]
adapter = opts[:adapter]
trace_id = adapter.trace_id()
span_context = %SpanContext{trace_id: trace_id}

with {:ok, span} <- span(name, opts, trace_id, adapter) do
with {:ok, span} <- span(name, opts, span_context, adapter) do
Logger.metadata(trace_id: trace_id, span_id: span.id)
trace = %Trace{spans: [], stack: [span], id: trace_id}
strategy.put_trace(opts[:trace_key], trace)
Expand Down Expand Up @@ -373,10 +390,11 @@ defmodule Spandex do

defp ensure_completion_time_set(%Span{} = span, _adapter), do: span

defp span(name, opts, trace_id, adapter) do
defp span(name, opts, span_context, adapter) do
opts
|> Keyword.put_new(:name, name)
|> Keyword.put(:trace_id, trace_id)
|> Keyword.put(:trace_id, span_context.trace_id)
|> Keyword.put(:parent_id, span_context.parent_id)
|> Keyword.put(:start, adapter.now())
|> Keyword.put(:id, adapter.span_id())
|> Span.new()
Expand Down
21 changes: 15 additions & 6 deletions lib/trace.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,24 @@ defmodule Spandex.Trace do
@moduledoc """
A representation of an ongoing trace.
`stack` represents all parent spans, while `spans` represents
all completed spans.
* `baggage`: Key-value metadata about the overall trace (propagated across distributed service)
* `id`: The trace ID, which consistently refers to this trace across distributed services
* `priority`: The trace sampling priority for this trace (propagated across distributed services)
* `spans`: The set of completed spans for this trace from this proces
* `stack`: The stack of active parent spans
"""
defstruct [:stack, :spans, :id, :start]
defstruct baggage: [],
id: nil,
priority: 1,
spans: [],
stack: []

@typedoc @moduledoc
@type t :: %__MODULE__{
stack: [Spandex.Span.t()],
baggage: Keyword.t(),
id: Spandex.id(),
priority: integer(),
spans: [Spandex.Span.t()],
id: Spandex.id()
stack: [Spandex.Span.t()]
}
end
15 changes: 9 additions & 6 deletions lib/tracer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@ defmodule Spandex.Tracer do
```
"""

alias Spandex.Trace
alias Spandex.Span
alias Spandex.{
Span,
SpanContext,
Trace
}

@type tagged_tuple(arg) :: {:ok, arg} | {:error, term}
@type tagged_tuple(arg) :: {:ok, arg} | {:error, term()}
@type span_name() :: String.t()
@type opts :: Keyword.t() | :disabled

Expand All @@ -26,7 +29,7 @@ defmodule Spandex.Tracer do
@callback finish_trace(opts) :: tagged_tuple(Trace.t())
@callback finish_span(opts) :: tagged_tuple(Span.t())
@callback span_error(error :: Exception.t(), stacktrace :: [term], opts) :: tagged_tuple(Span.t())
@callback continue_trace(span_name, trace_id :: term, span_id :: term, opts) :: tagged_tuple(Trace.t())
@callback continue_trace(span_name :: String.t(), trace_context :: SpanContext.t(), opts) :: tagged_tuple(Trace.t())
@callback continue_trace_from_span(span_name, span :: term, opts) :: tagged_tuple(Trace.t())
@callback current_trace_id(opts) :: nil | Spandex.id()
@callback current_span_id(opts) :: nil | Spandex.id()
Expand Down Expand Up @@ -193,8 +196,8 @@ defmodule Spandex.Tracer do
end

@impl Spandex.Tracer
def continue_trace(span_name, trace_id, span_id, opts \\ []) do
Spandex.continue_trace(span_name, trace_id, span_id, config(opts, @otp_app))
def continue_trace(span_name, span_context, opts \\ []) do
Spandex.continue_trace(span_name, span_context, config(opts, @otp_app))
end

@impl Spandex.Tracer
Expand Down
Loading

0 comments on commit 42dd125

Please sign in to comment.