Skip to content

Commit

Permalink
Moved CommitType enum to proto definition. (#3682)
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton authored Jul 25, 2023
1 parent ccb4b21 commit c737ea9
Show file tree
Hide file tree
Showing 12 changed files with 157 additions and 139 deletions.
1 change: 1 addition & 0 deletions quickwit/quickwit-codegen/src/codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ impl Codegen {
"DocBatch.doc_buffer",
"#[schema(value_type = String, format = Binary)]",
)
.enum_attribute(".", "#[serde(rename_all=\"snake_case\")]")
.service_generator(service_generator)
.out_dir(out_dir);

Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/source/ingest_api_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ mod tests {
}
IngestRequest {
doc_batches,
commit: commit_type as u32,
commit: commit_type.into(),
}
}

Expand Down
154 changes: 95 additions & 59 deletions quickwit/quickwit-ingest/src/codegen/ingest_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ pub struct DropQueueRequest {
pub struct IngestRequest {
#[prost(message, repeated, tag = "1")]
pub doc_batches: ::prost::alloc::vec::Vec<DocBatch>,
#[prost(uint32, tag = "2")]
pub commit: u32,
#[prost(enumeration = "CommitType", tag = "2")]
pub commit: i32,
}
#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down Expand Up @@ -74,19 +74,19 @@ pub struct DocBatch {
#[prost(uint32, repeated, tag = "3")]
pub doc_lengths: ::prost::alloc::vec::Vec<u32>,
}
/// / Suggest to truncate the queue.
/// /
/// / This function allows the queue to remove all records up to and
/// / including `up_to_offset_included`.
/// /
/// / The role of this truncation is to release memory and disk space.
/// /
/// / There are no guarantees that the record will effectively be removed.
/// / Nothing might happen, or the truncation might be partial.
/// /
/// / In other words, truncating from a position, and fetching records starting
/// / earlier than this position can yield undefined result:
/// / the truncated records may or may not be returned.
/// Suggest to truncate the queue.
///
/// This function allows the queue to remove all records up to and
/// including `up_to_offset_included`.
///
/// The role of this truncation is to release memory and disk space.
///
/// There are no guarantees that the record will effectively be removed.
/// Nothing might happen, or the truncation might be partial.
///
/// In other words, truncating from a position, and fetching records starting
/// earlier than this position can yield undefined result:
/// the truncated records may or may not be returned.
#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -114,6 +114,42 @@ pub struct ListQueuesResponse {
#[prost(string, repeated, tag = "1")]
pub queues: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
/// Specifies if the ingest request should block waiting for the records to be committed.
#[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
#[serde(rename_all = "snake_case")]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum CommitType {
/// The request doesn't wait for commit
Auto = 0,
/// The request waits for the next scheduled commit to finish.
WaitFor = 1,
/// The request forces an immediate commit after the last document in the batch and waits for
/// it to finish.
Force = 2,
}
impl CommitType {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
CommitType::Auto => "Auto",
CommitType::WaitFor => "WaitFor",
CommitType::Force => "Force",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"Auto" => Some(Self::Auto),
"WaitFor" => Some(Self::WaitFor),
"Force" => Some(Self::Force),
_ => None,
}
}
}
/// BEGIN quickwit-codegen
use tower::{Layer, Service, ServiceExt};
#[cfg_attr(any(test, feature = "testsuite"), mockall::automock)]
Expand Down Expand Up @@ -751,13 +787,13 @@ pub mod ingest_service_grpc_client {
self.inner = self.inner.max_encoding_message_size(limit);
self
}
/// / Ingests document in a given queue.
/// /
/// / Upon any kind of error, the client should
/// / - retry to get at least once delivery.
/// / - not retry to get at most once delivery.
/// /
/// / Exactly once delivery is not supported yet.
/// Ingests document in a given queue.
///
/// Upon any kind of error, the client should
/// - retry to get at least once delivery.
/// - not retry to get at most once delivery.
///
/// Exactly once delivery is not supported yet.
pub async fn ingest(
&mut self,
request: impl tonic::IntoRequest<super::IngestRequest>,
Expand All @@ -780,16 +816,16 @@ pub mod ingest_service_grpc_client {
.insert(GrpcMethod::new("ingest_service.IngestService", "Ingest"));
self.inner.unary(req, path, codec).await
}
/// / Fetches record from a given queue.
/// /
/// / Records are returned in order.
/// /
/// / The returned `FetchResponse` object is meant to be read with the
/// / `crate::iter_records` function.
/// /
/// / Fetching does not necessarily return all of the available records.
/// / If returning all records would exceed `FETCH_PAYLOAD_LIMIT` (2MB),
/// / the response will be partial.
/// Fetches record from a given queue.
///
/// Records are returned in order.
///
/// The returned `FetchResponse` object is meant to be read with the
/// `crate::iter_records` function.
///
/// Fetching does not necessarily return all of the available records.
/// If returning all records would exceed `FETCH_PAYLOAD_LIMIT` (2MB),
/// the response will be partial.
pub async fn fetch(
&mut self,
request: impl tonic::IntoRequest<super::FetchRequest>,
Expand All @@ -812,11 +848,11 @@ pub mod ingest_service_grpc_client {
.insert(GrpcMethod::new("ingest_service.IngestService", "Fetch"));
self.inner.unary(req, path, codec).await
}
/// / Returns a batch containing the last records.
/// /
/// / It returns the last documents, from the newest
/// / to the oldest, and stops as soon as `FETCH_PAYLOAD_LIMIT` (2MB)
/// / is exceeded.
/// Returns a batch containing the last records.
///
/// It returns the last documents, from the newest
/// to the oldest, and stops as soon as `FETCH_PAYLOAD_LIMIT` (2MB)
/// is exceeded.
pub async fn tail(
&mut self,
request: impl tonic::IntoRequest<super::TailRequest>,
Expand Down Expand Up @@ -848,36 +884,36 @@ pub mod ingest_service_grpc_server {
/// Generated trait containing gRPC methods that should be implemented for use with IngestServiceGrpcServer.
#[async_trait]
pub trait IngestServiceGrpc: Send + Sync + 'static {
/// / Ingests document in a given queue.
/// /
/// / Upon any kind of error, the client should
/// / - retry to get at least once delivery.
/// / - not retry to get at most once delivery.
/// /
/// / Exactly once delivery is not supported yet.
/// Ingests document in a given queue.
///
/// Upon any kind of error, the client should
/// - retry to get at least once delivery.
/// - not retry to get at most once delivery.
///
/// Exactly once delivery is not supported yet.
async fn ingest(
&self,
request: tonic::Request<super::IngestRequest>,
) -> std::result::Result<tonic::Response<super::IngestResponse>, tonic::Status>;
/// / Fetches record from a given queue.
/// /
/// / Records are returned in order.
/// /
/// / The returned `FetchResponse` object is meant to be read with the
/// / `crate::iter_records` function.
/// /
/// / Fetching does not necessarily return all of the available records.
/// / If returning all records would exceed `FETCH_PAYLOAD_LIMIT` (2MB),
/// / the response will be partial.
/// Fetches record from a given queue.
///
/// Records are returned in order.
///
/// The returned `FetchResponse` object is meant to be read with the
/// `crate::iter_records` function.
///
/// Fetching does not necessarily return all of the available records.
/// If returning all records would exceed `FETCH_PAYLOAD_LIMIT` (2MB),
/// the response will be partial.
async fn fetch(
&self,
request: tonic::Request<super::FetchRequest>,
) -> std::result::Result<tonic::Response<super::FetchResponse>, tonic::Status>;
/// / Returns a batch containing the last records.
/// /
/// / It returns the last documents, from the newest
/// / to the oldest, and stops as soon as `FETCH_PAYLOAD_LIMIT` (2MB)
/// / is exceeded.
/// Returns a batch containing the last records.
///
/// It returns the last documents, from the newest
/// to the oldest, and stops as soon as `FETCH_PAYLOAD_LIMIT` (2MB)
/// is exceeded.
async fn tail(
&self,
request: tonic::Request<super::TailRequest>,
Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-ingest/src/ingest_api_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ impl IngestApiService {
.queues
.append_batch(&doc_batch.index_id, records_it, ctx)
.await?;
let commit = CommitType::from(request.commit);
let commit = request.commit();
if let Some(max_position) = max_position {
if commit != CommitType::Auto {
if commit == CommitType::Force {
Expand Down Expand Up @@ -429,7 +429,7 @@ mod tests {
doc_lengths: vec![1, 3, 2],
},
],
commit: CommitType::Auto as u32,
commit: CommitType::Auto.into(),
};
assert_eq!(ingest_request.cost(), 9);
}
Expand Down Expand Up @@ -458,7 +458,7 @@ mod tests {

let ingest_request = IngestRequest {
doc_batches: vec![batch.build()],
commit: CommitType::Force as u32,
commit: CommitType::Force.into(),
};
let ingest_response = ingest_api_service
.send_message(ingest_request)
Expand Down Expand Up @@ -517,7 +517,7 @@ mod tests {

let ingest_request = IngestRequest {
doc_batches: vec![batch.build()],
commit: CommitType::WaitFor as u32,
commit: CommitType::WaitFor.into(),
};
let ingest_response = ingest_api_service
.send_message(ingest_request)
Expand Down
84 changes: 47 additions & 37 deletions quickwit/quickwit-ingest/src/ingest_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,32 +23,32 @@ syntax = "proto3";
package ingest_service;

service IngestService {
/// Ingests document in a given queue.
///
/// Upon any kind of error, the client should
/// - retry to get at least once delivery.
/// - not retry to get at most once delivery.
///
/// Exactly once delivery is not supported yet.
// Ingests document in a given queue.
//
// Upon any kind of error, the client should
// - retry to get at least once delivery.
// - not retry to get at most once delivery.
//
// Exactly once delivery is not supported yet.
rpc Ingest(IngestRequest) returns (IngestResponse);

/// Fetches record from a given queue.
///
/// Records are returned in order.
///
/// The returned `FetchResponse` object is meant to be read with the
/// `crate::iter_records` function.
///
/// Fetching does not necessarily return all of the available records.
/// If returning all records would exceed `FETCH_PAYLOAD_LIMIT` (2MB),
/// the response will be partial.
// Fetches record from a given queue.
//
// Records are returned in order.
//
// The returned `FetchResponse` object is meant to be read with the
// `crate::iter_records` function.
//
// Fetching does not necessarily return all of the available records.
// If returning all records would exceed `FETCH_PAYLOAD_LIMIT` (2MB),
// the response will be partial.
rpc Fetch(FetchRequest) returns (FetchResponse);

/// Returns a batch containing the last records.
///
/// It returns the last documents, from the newest
/// to the oldest, and stops as soon as `FETCH_PAYLOAD_LIMIT` (2MB)
/// is exceeded.
// Returns a batch containing the last records.
//
// It returns the last documents, from the newest
// to the oldest, and stops as soon as `FETCH_PAYLOAD_LIMIT` (2MB)
// is exceeded.
rpc Tail(TailRequest) returns (FetchResponse);
}

Expand All @@ -68,9 +68,20 @@ message DropQueueRequest {
string queue_id = 1;
}

// Specifies if the ingest request should block waiting for the records to be committed.
enum CommitType {
// The request doesn't wait for commit
Auto = 0;
// The request waits for the next scheduled commit to finish.
WaitFor = 1;
// The request forces an immediate commit after the last document in the batch and waits for
// it to finish.
Force = 2;
}

message IngestRequest {
repeated DocBatch doc_batches = 1;
uint32 commit = 2;
CommitType commit = 2;
}

message IngestResponse {
Expand All @@ -94,19 +105,19 @@ message DocBatch {
repeated uint32 doc_lengths = 3;
}

/// Suggest to truncate the queue.
///
/// This function allows the queue to remove all records up to and
/// including `up_to_offset_included`.
///
/// The role of this truncation is to release memory and disk space.
///
/// There are no guarantees that the record will effectively be removed.
/// Nothing might happen, or the truncation might be partial.
///
/// In other words, truncating from a position, and fetching records starting
/// earlier than this position can yield undefined result:
/// the truncated records may or may not be returned.
// Suggest to truncate the queue.
//
// This function allows the queue to remove all records up to and
// including `up_to_offset_included`.
//
// The role of this truncation is to release memory and disk space.
//
// There are no guarantees that the record will effectively be removed.
// Nothing might happen, or the truncation might be partial.
//
// In other words, truncating from a position, and fetching records starting
// earlier than this position can yield undefined result:
// the truncated records may or may not be returned.
message SuggestTruncateRequest {
string index_id = 1;
uint64 up_to_position_included = 2;
Expand All @@ -116,7 +127,6 @@ message TailRequest {
string index_id = 1;
}


message ListQueuesRequest {
}

Expand Down
Loading

0 comments on commit c737ea9

Please sign in to comment.