
Commit 365fb00

Lowercase options before they're passed through to object store
This also introduces the S3_OUTPUT_URL environment variable
1 parent bdba2c6 · commit 365fb00

File tree

3 files changed: +40 -3 lines changed

Cargo.toml
README.adoc
src/sink/parquet.rs

Cargo.toml

Lines changed: 1 addition & 1 deletion

@@ -1,6 +1,6 @@
 [package]
 name = "hotdog"
-version = "1.0.1"
+version = "1.0.2"
 authors = ["R. Tyler Croy <rtyler@buoyantdata.com>"]
 edition = "2024"

README.adoc

Lines changed: 23 additions & 0 deletions

@@ -226,6 +226,29 @@ configured Kafka brokers.
 "default topic" for the <<action-forward, Forward action>>.
 
 
+[[yml-parquet]]
+==== Parquet
+
+The link:https://parquet.apache.org[Apache Parquet] sink allows for directly
+writing to an
+link:https://docs.rs/object_store/latest/object_store/index.html[object_store]
+supported `url`
+
+[source,yaml]
+----
+global:
+  parquet:
+    url: 's3://hotdog/streams/'
+    # Bytes to buffer
+    buffer: 1024000
+    flush_ms: 60000
+----
+
+[TIP]
+====
+The `url` can be omitted from the configuration and specified in the environment via `S3_OUTPUT_URL`
+====
+
 [[yml-metrics]]
 ==== Metrics
 
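Concretely, the TIP above means a deployment can drop the `url` key from the YAML entirely when the environment provides the value. A sketch of such a configuration, using the same keys as the example in the diff:

[source,yaml]
----
global:
  parquet:
    # url omitted: the sink falls back to the S3_OUTPUT_URL environment
    # variable, and fails at startup if neither is provided
    buffer: 1024000
    flush_ms: 60000
----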

src/sink/parquet.rs

Lines changed: 16 additions & 2 deletions

@@ -14,6 +14,7 @@ use parquet::arrow::async_writer::{AsyncArrowWriter, ParquetObjectWriter};
 use smol::stream::StreamExt;
 use tracing::log::*;
 use tracing::{Level, span};
+use url::Url;
 use uuid::Uuid;
 
 use std::collections::HashMap;

@@ -44,7 +45,11 @@ impl Sink for Parquet {
 
     fn new(config: Self::Config, _stats: InputQueueScope) -> Self {
         let (tx, rx) = bounded(1024);
-        let opts: HashMap<String, String> = HashMap::from_iter(std::env::vars());
+        // [object_store] largely expects environment variables to be all lowercased for
+        // consideration as options
+        let opts: HashMap<String, String> = HashMap::from_iter(
+            std::env::vars().map(|(k, v)| (k.to_ascii_lowercase(), v))
+        );
         let (store, _path) = object_store::parse_url_opts(&config.url, opts)
             .expect("Failed to parse the Parquet sink URL");
         let store = Arc::new(store);
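The rationale for the lowercasing pass, per the comment in the hunk above: object_store considers option keys in lowercase form (e.g. aws_access_key_id), while shell exports are conventionally uppercase (AWS_ACCESS_KEY_ID), so the raw std::env::vars() map was not being taken up as options. A minimal standalone sketch of the same transformation, assuming the object_store crate with its aws feature enabled and a hypothetical bucket URL:

[source,rust]
----
use std::collections::HashMap;
use url::Url;

fn main() {
    // Shell exports are uppercase (AWS_ACCESS_KEY_ID, AWS_ENDPOINT, ...);
    // object_store option keys are lowercase (aws_access_key_id, ...).
    let opts: HashMap<String, String> = std::env::vars()
        .map(|(k, v)| (k.to_ascii_lowercase(), v))
        .collect();

    // Hypothetical bucket URL, mirroring the README example.
    let url = Url::parse("s3://hotdog/streams/").expect("a valid URL");
    let (store, path) =
        object_store::parse_url_opts(&url, opts).expect("Failed to parse the sink URL");
    println!("resolved {store} at {path}");
}
----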
@@ -181,8 +186,9 @@ fn flush_to_parquet(store: ObjectStoreRef, destination: &str, buffer: Vec<String
 /// Configuration for [Parquet] sink
 #[derive(Clone, Debug, Deserialize, PartialEq)]
 pub struct Config {
+    #[serde(default = "parquet_url_default")]
     /// Expected to be an S3 compatible URL
-    pub url: url::Url,
+    pub url: Url,
     /// Minimum number of bytes to buffer into each parquet file
     #[serde(default = "parquet_buffer_default")]
     pub buffer: usize,

@@ -191,6 +197,14 @@ pub struct Config {
     pub flush_ms: usize,
 }
 
+/// Retrieves a URL from the environment for parquet if no [Url] has been specified
+fn parquet_url_default() -> Url {
+    Url::parse(&std::env::var("S3_OUTPUT_URL").expect(
+        "There is no url: defined for the parquet sink and no S3_OUTPUT_URL in the environment!",
+    ))
+    .expect("The S3_OUTPUT_URL could not be parsed as a valid URL")
+}
+
 /// Default number of log lines per parquet file
 fn parquet_buffer_default() -> usize {
     1_024 * 1_024 * 100
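To illustrate how the new serde default behaves, here is a hedged, self-contained sketch: deserializing a config that omits `url:` invokes parquet_url_default(), which reads S3_OUTPUT_URL or panics with the message above. serde_yaml and the url crate's serde feature are assumptions of this example, not part of the commit.

[source,rust]
----
use serde::Deserialize;
use url::Url;

#[derive(Debug, Deserialize)]
struct Config {
    // Mirrors src/sink/parquet.rs: an absent `url:` key falls back to
    // the function named in the serde attribute.
    #[serde(default = "parquet_url_default")]
    url: Url,
}

fn parquet_url_default() -> Url {
    Url::parse(&std::env::var("S3_OUTPUT_URL").expect("no url: and no S3_OUTPUT_URL"))
        .expect("S3_OUTPUT_URL could not be parsed as a valid URL")
}

fn main() {
    // set_var is unsafe as of edition 2024, which this crate targets.
    unsafe { std::env::set_var("S3_OUTPUT_URL", "s3://hotdog/streams/") };
    // An empty YAML mapping: no `url:` key, so the default function runs.
    let config: Config = serde_yaml::from_str("{}").expect("a deserializable config");
    assert_eq!(config.url.as_str(), "s3://hotdog/streams/");
}
----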
