Skip to content

Commit ce9ac97

Browse files
authored
add force_flush and documentation to support running on AWS Lambda (#114)
1 parent d8d71c6 commit ce9ac97

File tree

4 files changed

+184
-0
lines changed

4 files changed

+184
-0
lines changed

src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@
6969
//! All environment variables supported by the Rust Opentelemetry SDK are also supported by the
7070
//! Logfire SDK.
7171
//!
72+
//! ## Usage Guide
73+
//!
74+
//! See the [usage guide][usage] for more detailed information about how to use this SDK to its full potential.
75+
//!
7276
//! # Examples
7377
//!
7478
//! See [examples][usage::examples] subchapter of this documentation.

src/logfire.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,22 @@ impl Logfire {
9595
}
9696
}
9797

98+
/// Forcibly flush the current data captured by Logfire.
99+
///
100+
/// Note: this will block until data is flushed. If called from an async context,
101+
/// consider using `tokio::task::spawn_blocking` or similar to avoid blocking the
102+
/// async runtime.
103+
///
104+
/// # Errors
105+
///
106+
/// This will error if the underlying OpenTelemetry SDK fails to flush data.
107+
pub fn force_flush(&self) -> Result<(), opentelemetry_sdk::error::OTelSdkError> {
108+
self.tracer_provider.force_flush()?;
109+
self.meter_provider.force_flush()?;
110+
self.logger_provider.force_flush()?;
111+
Ok(())
112+
}
113+
98114
/// Shuts down the Logfire instance.
99115
///
100116
/// This will flush all data to the opentelemetry exporters and then close all

src/usage/lambda.md

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
# Instrumenting AWS Lambda with Pydantic Logfire
2+
3+
When running on AWS Lambda, extra care must be taken to ensure all telemetry is successfully exported to Logfire.
4+
5+
The AWS Lambda runtime will freeze lambda processes as soon as the response is delivered.
6+
This means that background threads (such as those exporting telemetry to Logfire) will be paused.
7+
To ensure that telemetry is exported successfully, it's necessary to flush Logfire before completing the Lambda invocation.
8+
9+
The following example demonstrates using Logfire with the `lambda_runtime` crate to instrument a Lambda function.
10+
A `tower::Layer` is used to ensure that Logfire is flushed at the end of every invocation.
11+
12+
```rust,ignore
13+
use std::{
14+
future::Future,
15+
task::{Context, Poll},
16+
};
17+
18+
use lambda_runtime::{service_fn, Error, LambdaEvent};
19+
use logfire::{config::ConsoleOptions, Logfire};
20+
use pin_project::pin_project;
21+
use serde::{Deserialize, Serialize};
22+
use tower::{Layer, Service};
23+
24+
/// A `tower::Layer` that will be used to introduce flushing to the lambda function.
25+
pub struct LogfireFlushLayer {
26+
logfire: Logfire,
27+
}
28+
29+
impl<S> Layer<S> for LogfireFlushLayer {
30+
type Service = LogfireFlushService<S>;
31+
32+
fn layer(&self, service: S) -> Self::Service {
33+
LogfireFlushService {
34+
logfire: self.logfire.clone(),
35+
service,
36+
}
37+
}
38+
}
39+
40+
/// A `tower::Service` which wraps an inner service to flush Logfire when the service
41+
/// finishes executing.
42+
pub struct LogfireFlushService<S> {
43+
logfire: Logfire,
44+
service: S,
45+
}
46+
47+
impl<S, Request> Service<Request> for LogfireFlushService<S>
48+
where
49+
S: Service<Request>,
50+
{
51+
type Response = S::Response;
52+
type Error = S::Error;
53+
type Future = LogfireFlushFuture<S::Future>;
54+
55+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
56+
self.service.poll_ready(cx)
57+
}
58+
59+
fn call(&mut self, request: Request) -> Self::Future {
60+
LogfireFlushFuture {
61+
inner: Some(self.service.call(request)),
62+
logfire: self.logfire.clone(),
63+
}
64+
}
65+
}
66+
67+
/// The future produced when calling the `LogfireFlushService`. The future is
68+
/// responsible for driving the inner future and then flushing logfire when
69+
/// the inner future completes.
70+
#[pin_project]
71+
pub struct LogfireFlushFuture<F> {
72+
#[pin]
73+
inner: Option<F>,
74+
logfire: Logfire,
75+
}
76+
77+
impl<F, T, E> Future for LogfireFlushFuture<F>
78+
where
79+
F: Future<Output = Result<T, E>>,
80+
{
81+
type Output = Result<T, E>;
82+
83+
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
84+
let mut this = self.project();
85+
let Some(inner) = this.inner.as_mut().as_pin_mut() else {
86+
panic!("`LogfireFlushFuture` polled after completion");
87+
};
88+
match inner.poll(cx) {
89+
Poll::Ready(result) => {
90+
// Drop the inner future so that any spans it holds are dropped before flushing
91+
this.inner.set(None);
92+
// Flush logfire before returning.
93+
// Note that this is a blocking function. In the context of the current lambda
94+
// invocation that should not be a problem.
95+
let _ = this.logfire.force_flush();
96+
Poll::Ready(result)
97+
}
98+
Poll::Pending => Poll::Pending,
99+
}
100+
}
101+
}
102+
103+
/// Example lambda request payload.
104+
#[derive(Deserialize)]
105+
pub(crate) struct IncomingMessage {
106+
command: String,
107+
}
108+
109+
/// Example lambda response payload.
110+
#[derive(Serialize)]
111+
pub(crate) struct OutgoingMessage {
112+
req_id: String,
113+
msg: String,
114+
}
115+
116+
/// Main body of the lambda function.
117+
#[tracing::instrument(skip_all)]
118+
pub(crate) async fn function_handler(
119+
event: LambdaEvent<IncomingMessage>,
120+
) -> Result<OutgoingMessage, Error> {
121+
122+
// Change this logic to be whatever your lambda function needs to do.
123+
124+
Ok(OutgoingMessage {
125+
req_id: event.context.request_id,
126+
msg: format!("Command {}.", event.payload.command),
127+
})
128+
}
129+
130+
/// Main function for the lambda process.
131+
#[tokio::main]
132+
async fn main() -> Result<(), Error> {
133+
// 1. Configure logfire on startup
134+
let logfire = logfire::configure()
135+
.with_console(Some(ConsoleOptions::default()))
136+
.finish()?;
137+
logfire::info!("Starting up");
138+
139+
// 2. Lambda processes require special termination logic. Logfire's
140+
// shutdown guard can be passed into `lambda_runtime`'s graceful
141+
// shutdown handler to ensure that telemetry is flushed when
142+
// idle lambda processes are shutdown.
143+
let shutdown_guard = logfire.clone().shutdown_guard();
144+
lambda_runtime::spawn_graceful_shutdown_handler(|| async move {
145+
logfire::info!("Shutting down");
146+
let _ = shutdown_guard.shutdown();
147+
})
148+
.await;
149+
150+
// 3. Prepare the main `lambda_runtime::Runtime`
151+
lambda_runtime::Runtime::new(service_fn(function_handler))
152+
// 4. Add a `TracingLayer` before the logfire layer
153+
.layer(lambda_runtime::layers::TracingLayer::new())
154+
// 5. Add the flushing layer after; this way the spans created
155+
// by the `TracingLayer` will be closed before logfire is flushed.
156+
.layer(LogfireFlushLayer { logfire })
157+
// 6. And finally, run the process.
158+
.run()
159+
.await
160+
}
161+
162+
```

src/usage/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,4 +108,6 @@
108108
//! See [examples] subchapter of this documentation.
109109
110110
pub mod examples;
111+
#[doc = include_str!("./lambda.md")]
112+
pub mod lambda {}
111113
pub mod metrics;

0 commit comments

Comments
 (0)