Skip to content

Commit

Permalink
Implement and plug ingest source v2 (#3812)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Sep 18, 2023
1 parent 2de3fb9 commit 08ab948
Show file tree
Hide file tree
Showing 65 changed files with 2,606 additions and 857 deletions.
370 changes: 180 additions & 190 deletions quickwit/Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion quickwit/quickwit-actors/src/actor_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ impl<A: Actor> ActorHandle<A> {
return self.wait_for_observable_state_callback(oneshot_rx).await;
} else {
error!(
- actor_id = self.actor_context.actor_instance_id(),
+ actor_id=%self.actor_context.actor_instance_id(),
"Failed to send observe message"
);
}
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ quickwit-actors = { workspace = true }
quickwit-cluster = { workspace = true }
quickwit-common = { workspace = true }
quickwit-config = { workspace = true }
- quickwit-index-management = { workspace = true }
quickwit-directories = { workspace = true }
quickwit-doc-mapper = { workspace = true }
+ quickwit-index-management = { workspace = true }
quickwit-indexing = { workspace = true }
+ quickwit-ingest = { workspace = true }
quickwit-metastore = { workspace = true }
quickwit-proto = { workspace = true }
quickwit-rest-client = { workspace = true }
Expand Down
14 changes: 7 additions & 7 deletions quickwit/quickwit-cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,31 +75,31 @@ impl RunCliCommand {

pub async fn execute(&self) -> anyhow::Result<()> {
debug!(args = ?self, "run-service");
- let mut config = load_node_config(&self.config_uri).await?;
+ let mut node_config = load_node_config(&self.config_uri).await?;
let (storage_resolver, metastore_resolver) =
- get_resolvers(&config.storage_configs, &config.metastore_configs);
+ get_resolvers(&node_config.storage_configs, &node_config.metastore_configs);
crate::busy_detector::set_enabled(true);

if let Some(services) = &self.services {
tracing::info!(services = %services.iter().join(", "), "Setting services from override.");
- config.enabled_services = services.clone();
+ node_config.enabled_services = services.clone();
}
let telemetry_handle_opt =
- quickwit_telemetry::start_telemetry_loop(quickwit_telemetry_info(&config));
+ quickwit_telemetry::start_telemetry_loop(quickwit_telemetry_info(&node_config));
quickwit_telemetry::send_telemetry_event(TelemetryEvent::RunCommand).await;
// TODO move in serve quickwit?
let runtimes_config = RuntimesConfig::default();
- start_actor_runtimes(runtimes_config, &config.enabled_services)?;
+ start_actor_runtimes(runtimes_config, &node_config.enabled_services)?;
let shutdown_signal = Box::pin(async move {
signal::ctrl_c()
.await
.expect("Registering a signal handler for SIGINT should not fail.");
});
let serve_result = serve_quickwit(
- config,
+ node_config,
runtimes_config,
- storage_resolver,
metastore_resolver,
+ storage_resolver,
shutdown_signal,
)
.await;
Expand Down
3 changes: 3 additions & 0 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use quickwit_indexing::models::{
DetachIndexingPipeline, DetachMergePipeline, IndexingStatistics, SpawnPipeline,
};
use quickwit_indexing::IndexingPipeline;
+ use quickwit_ingest::IngesterPool;
use quickwit_metastore::Metastore;
use quickwit_proto::search::SearchResponse;
use quickwit_search::{single_node_search, SearchResponseRest};
Expand Down Expand Up @@ -457,6 +458,7 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
cluster,
metastore,
None,
+ IngesterPool::default(),
storage_resolver,
)
.await?;
Expand Down Expand Up @@ -586,6 +588,7 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> {
cluster,
metastore,
None,
+ IngesterPool::default(),
storage_resolver,
)
.await?;
Expand Down
27 changes: 15 additions & 12 deletions quickwit/quickwit-config/src/source_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ pub const INGEST_API_SOURCE_ID: &str = "_ingest-api-source";
/// Reserved source ID used for native Quickwit ingest.
pub const INGEST_SOURCE_ID: &str = "_ingest-source";

+ pub const RESERVED_SOURCE_IDS: &[&str] =
+ &[CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID, INGEST_SOURCE_ID];

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[serde(into = "VersionedSourceConfig")]
#[serde(try_from = "VersionedSourceConfig")]
Expand Down Expand Up @@ -122,8 +125,8 @@ impl SourceConfig {
pub fn ingest_default() -> Self {
Self {
source_id: INGEST_SOURCE_ID.to_string(),
- max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(),
- desired_num_pipelines: NonZeroUsize::new(1).unwrap(),
+ max_num_pipelines_per_indexer: NonZeroUsize::new(1).expect("1 should be non-zero"),
+ desired_num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"),
enabled: false,
source_params: SourceParams::Ingest,
transform_config: None,
Expand All @@ -135,8 +138,8 @@ impl SourceConfig {
pub fn ingest_api_default() -> Self {
Self {
source_id: INGEST_API_SOURCE_ID.to_string(),
- max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(),
- desired_num_pipelines: NonZeroUsize::new(1).unwrap(),
+ max_num_pipelines_per_indexer: NonZeroUsize::new(1).expect("1 should be non-zero"),
+ desired_num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"),
enabled: true,
source_params: SourceParams::IngestApi,
transform_config: None,
Expand All @@ -148,8 +151,8 @@ impl SourceConfig {
pub fn cli_ingest_source() -> Self {
Self {
source_id: CLI_INGEST_SOURCE_ID.to_string(),
- max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(),
- desired_num_pipelines: NonZeroUsize::new(1).unwrap(),
+ max_num_pipelines_per_indexer: NonZeroUsize::new(1).expect("1 should be non-zero"),
+ desired_num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"),
enabled: true,
source_params: SourceParams::IngestCli,
transform_config: None,
Expand All @@ -161,8 +164,8 @@ impl SourceConfig {
pub fn for_test(source_id: &str, source_params: SourceParams) -> Self {
Self {
source_id: source_id.to_string(),
- max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(),
- desired_num_pipelines: NonZeroUsize::new(1).unwrap(),
+ max_num_pipelines_per_indexer: NonZeroUsize::new(1).expect("1 should be non-zero"),
+ desired_num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"),
enabled: true,
source_params,
transform_config: None,
Expand Down Expand Up @@ -658,8 +661,8 @@ mod tests {
load_source_config_from_user_config(config_format, file_content.as_bytes()).unwrap();
let expected_source_config = SourceConfig {
source_id: "hdfs-logs-kinesis-source".to_string(),
- max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(),
- desired_num_pipelines: NonZeroUsize::new(1).unwrap(),
+ max_num_pipelines_per_indexer: NonZeroUsize::new(1).expect("1 should be non-zero"),
+ desired_num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"),
enabled: true,
source_params: SourceParams::Kinesis(KinesisSourceParams {
stream_name: "emr-cluster-logs".to_string(),
Expand Down Expand Up @@ -1066,8 +1069,8 @@ mod tests {
let source_config: SourceConfig = ConfigFormat::Json.parse(&file_content).unwrap();
let expected_source_config = SourceConfig {
source_id: INGEST_API_SOURCE_ID.to_string(),
- max_num_pipelines_per_indexer: NonZeroUsize::new(1).unwrap(),
- desired_num_pipelines: NonZeroUsize::new(1).unwrap(),
+ max_num_pipelines_per_indexer: NonZeroUsize::new(1).expect("1 should be non-zero"),
+ desired_num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"),
enabled: true,
source_params: SourceParams::IngestApi,
transform_config: Some(TransformConfig {
Expand Down
9 changes: 3 additions & 6 deletions quickwit/quickwit-config/src/source_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,8 @@ use std::num::NonZeroUsize;
use anyhow::bail;
use serde::{Deserialize, Serialize};

- use super::TransformConfig;
- use crate::{
- validate_identifier, ConfigFormat, SourceConfig, SourceInputFormat, SourceParams,
- CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID,
- };
+ use super::{TransformConfig, RESERVED_SOURCE_IDS};
+ use crate::{validate_identifier, ConfigFormat, SourceConfig, SourceInputFormat, SourceParams};

type SourceConfigForSerialization = SourceConfigV0_6;

Expand Down Expand Up @@ -72,7 +69,7 @@ impl SourceConfigForSerialization {
///
/// TODO refactor #1065
fn validate_and_build(self) -> anyhow::Result<SourceConfig> {
- if self.source_id != CLI_INGEST_SOURCE_ID && self.source_id != INGEST_API_SOURCE_ID {
+ if !RESERVED_SOURCE_IDS.contains(&self.source_id.as_str()) {
validate_identifier("Source ID", &self.source_id)?;
}
let desired_num_pipelines = NonZeroUsize::new(self.desired_num_pipelines)
Expand Down
Loading

0 comments on commit 08ab948

Please sign in to comment.