Skip to content

Commit 0db6747

Browse files
committed
feat: :on_prepare_payload option
1 parent ea5f751 commit 0db6747

File tree

5 files changed

+91
-3
lines changed

5 files changed

+91
-3
lines changed

ex/lib/logflare_ex.ex

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,22 @@ defmodule LogflareEx do
6060
def send_events(%Client{source_token: nil, source_name: nil}, _batch), do: {:error, :no_source}
6161

6262
def send_events(client, [%{} | _] = batch) do
63-
body = Bertex.encode(%{"batch" => batch, "source" => client.source_token})
63+
on_prepare_payload = Map.get(client, :on_prepare_payload)
64+
65+
prepared_batch =
66+
if on_prepare_payload do
67+
Enum.map(batch, fn event ->
68+
case on_prepare_payload do
69+
{m, f, 1} -> apply(m, f, [event])
70+
cb when is_function(cb) -> cb.(event)
71+
_ -> event
72+
end
73+
end)
74+
else
75+
batch
76+
end
77+
78+
body = Bertex.encode(%{"batch" => prepared_batch, "source" => client.source_token})
6479

6580
case Tesla.post(client.tesla_client, "/api/logs", body) do
6681
{:ok, %Tesla.Env{status: status, body: body}} when status < 300 ->

ex/lib/logflare_ex/client.ex

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ defmodule LogflareEx.Client do
4141
- `:source_token`: Source UUID. Mutually exclusive with `:source_name`
4242
- `:source_name`: Source name. Mutually exclusive with `:source_token`
4343
- `:on_error`: mfa callback for handling API errors. Must be 1 arity.
44+
- `:on_prepare_payload`: mfa callback or anonymous function for preparing the final payload before sending to API. Must be 1 arity.
4445
- `:auto_flush`: Used for batching. Enables automatic flushing. If disabled, `LogflareEx.flush/1` must be called.
4546
- `:flush_interval`: Used for batching. Flushes cached events at the provided interval.
4647
- `:batch_size`: Used for batching. It is the maximum number of events send per API request.
@@ -61,6 +62,7 @@ defmodule LogflareEx.Client do
6162
field(:source_token, String.t())
6263
field(:source_name, String.t())
6364
field(:on_error, list() | mfa(), default: nil)
65+
field(:on_prepare_payload, list() | mfa(), default: nil)
6466
# batching
6567
field(:auto_flush, :boolean, default: true)
6668
field(:flush_interval, non_neg_integer(), default: @default_flush_interval)
@@ -79,6 +81,7 @@ defmodule LogflareEx.Client do
7981
source_name: get_config_value(:source_name),
8082
tesla_client: nil,
8183
on_error: get_config_value(:on_error),
84+
on_prepare_payload: get_config_value(:on_prepare_payload),
8285
flush_interval: get_config_value(:flush_interval) || @default_flush_interval,
8386
batch_size: get_config_value(:batch_size) || @default_batch_size
8487
})

ex/lib/logflare_ex/telemetry_reporter.ex

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,16 @@ defmodule LogflareEx.TelemetryReporter do
6060
def handle_attach(event, measurements, metadata, config) when is_list(config) do
6161
# merge configuration
6262
config_file_opts = (Application.get_env(:logflare_ex, __MODULE__) || []) |> Map.new()
63-
opts = Enum.into(config, config_file_opts)
63+
64+
opts =
65+
Enum.into(config, config_file_opts)
6466

6567
payload = %{metadata: metadata, measurements: measurements}
6668
to_include = Map.get(opts, :include, [])
6769

6870
filtered_payload =
6971
for path <- to_include,
70-
String.starts_with?(path, "measurements.") or String.starts_with?(path, "metadata."),
72+
String.starts_with?(path, "measurements") or String.starts_with?(path, "metadata"),
7173
reduce: %{} do
7274
acc -> put_path(acc, path, get_path(payload, path))
7375
end

ex/test/logflare_ex_test.exs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,38 @@ defmodule LogflareExTest do
8585
end
8686
end
8787

88+
describe "on_prepare_payload" do
89+
test "triggered before payload is sent" do
90+
pid = self()
91+
92+
Tesla
93+
|> expect(:post, 3, fn _client, _path, body ->
94+
%{"batch" => [event]} = Bertex.decode(body)
95+
send(pid, {event.ref, event})
96+
{:ok, %Tesla.Env{status: 500, body: "some server error"}}
97+
end)
98+
99+
LogflareEx.TestUtils
100+
|> expect(:stub_function, 2, fn data ->
101+
%{different: "value", ref: data.ref}
102+
end)
103+
104+
for cb <- [
105+
{LogflareEx.TestUtils, :stub_function, 1},
106+
&LogflareEx.TestUtils.stub_function/1,
107+
fn data -> %{different: "value", ref: data.ref} end
108+
] do
109+
client = LogflareEx.client(api_key: "123", source_token: "123", on_prepare_payload: cb)
110+
ref = make_ref()
111+
112+
assert {:error, %Tesla.Env{}} =
113+
LogflareEx.send_events(client, [%{some: "event", ref: ref}])
114+
115+
assert_receive {^ref, %{different: "value", ref: _}}
116+
end
117+
end
118+
end
119+
88120
describe "batching" do
89121
setup do
90122
pid = start_supervised!(BatcherSup)

ex/test/telemetry_reporter_test.exs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,42 @@ defmodule LogflareEx.TelemetryReporterTest do
119119
assert event[:measurements][:latency] == [123, 223]
120120
refute event[:measurements][:other]
121121
end
122+
123+
test "handle_attach/4 with :on_prepare_payload with anonymous function" do
124+
pid = self()
125+
ref = make_ref()
126+
127+
Tesla
128+
|> expect(:post, fn _client, _path, body ->
129+
decoded = Bertex.decode(body)
130+
send(pid, {ref, decoded})
131+
{:ok, %Tesla.Env{status: 201, body: Jason.encode!(%{"message" => "server msg"})}}
132+
end)
133+
134+
:telemetry.attach("my-id", [:some, :event], &TelemetryReporter.handle_attach/4,
135+
auto_flush: true,
136+
flush_interval: 50,
137+
include: ["measurements"],
138+
on_prepare_payload: fn payload ->
139+
payload
140+
|> Map.put(:message, "hello!")
141+
|> Map.put(:test, payload.measurements.other)
142+
end
143+
)
144+
145+
:telemetry.execute([:some, :event], %{latency: [123, 223], other: "value"}, %{
146+
some: "metadata",
147+
to_exclude: "this field"
148+
})
149+
150+
Process.sleep(300)
151+
152+
assert_received {^ref, %{"batch" => [event]}}
153+
# other fields will be included
154+
assert event[:message] == "hello!"
155+
assert event[:test] == "value"
156+
assert event[:measurements][:latency] == [123, 223]
157+
end
122158
end
123159

124160
# reporter

0 commit comments

Comments
 (0)