
Commit

0.6.0
inaiat committed May 17, 2024
1 parent 095fc82 commit 16fa9f8
Showing 4 changed files with 35 additions and 11 deletions.
2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "kafka-crab-js",
"version": "1.0.0",
"version": "0.6.0",
"main": "index.js",
"types": "index.d.ts",
"napi": {
8 changes: 6 additions & 2 deletions src/kafka/consumer/kafka_consumer.rs
@@ -90,7 +90,11 @@ impl KafkaConsumer {
Some(CommitMode::Async) => Some(RdKfafkaCommitMode::Async),
};

- let KafkaConsumerConfiguration { topic, fecth_metadata_timeout, .. } = consumer_configuration.clone();
+ let KafkaConsumerConfiguration {
+   topic,
+   fecth_metadata_timeout,
+   ..
+ } = consumer_configuration.clone();

Ok(KafkaConsumer {
client_config: client_config.clone(),
@@ -225,6 +229,6 @@ fn convert_to_consumer_configuration(config: &KafkaConsumerConfiguration) -> Con
create_topic: config.create_topic,
enable_auto_commit: config.enable_auto_commit,
configuration: config.configuration.clone(),
- fecth_metadata_timeout: Some(DEFAULT_FECTH_METADATA_TIMEOUT)
+ fecth_metadata_timeout: Some(DEFAULT_FECTH_METADATA_TIMEOUT),
}
}
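
The kafka_consumer.rs changes above are formatting-only: the one-line destructuring is reflowed across several lines and a trailing comma is added, with no change in behaviour. For readers less familiar with the pattern, here is a minimal standalone sketch of destructuring with a rest pattern (`..`); the struct and field names are hypothetical, not the crate's real types:

// Sketch only: ConsumerConfig, fetch_timeout_ms and group_id are made-up names.
#[derive(Clone)]
struct ConsumerConfig {
    topic: String,
    fetch_timeout_ms: Option<i64>,
    group_id: String,
}

fn main() {
    let config = ConsumerConfig {
        topic: "orders".to_string(),
        fetch_timeout_ms: Some(2_000),
        group_id: "example-group".to_string(),
    };

    // Pull out only the fields we need; `..` ignores the rest (here, group_id).
    let ConsumerConfig { topic, fetch_timeout_ms, .. } = config.clone();
    println!("topic={topic}, fetch_timeout_ms={fetch_timeout_ms:?}");
}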
32 changes: 25 additions & 7 deletions src/kafka/consumer/kafka_stream_consumer.rs
@@ -49,7 +49,9 @@ impl KafkaStreamConsumer {
client_config: client_config.clone(),
stream_consumer,
fecth_metadata_timeout: Duration::from_millis(
- consumer_configuration.fecth_metadata_timeout.unwrap_or(DEFAULT_FECTH_METADATA_TIMEOUT) as u64,
+ consumer_configuration
+   .fecth_metadata_timeout
+   .unwrap_or(DEFAULT_FECTH_METADATA_TIMEOUT) as u64,
),
})
}
@@ -107,9 +109,14 @@ impl KafkaStreamConsumer {
"Subscribing to topic: {}. Setting all partitions to offset: {:?}",
&item.topic, &all_offsets
);
- set_offset_of_all_partitions(&all_offsets, &self.stream_consumer, &item.topic, self.fecth_metadata_timeout)
-   .map_err(|e| e.convert_to_napi())
-   .unwrap();
+ set_offset_of_all_partitions(
+   &all_offsets,
+   &self.stream_consumer,
+   &item.topic,
+   self.fecth_metadata_timeout,
+ )
+ .map_err(|e| e.convert_to_napi())
+ .unwrap();
} else if let Some(partition_offset) = item.partition_offset.clone() {
info!(
"Subscribing to topic: {} with partition offsets: {:?}",
@@ -120,7 +127,7 @@
Some(partition_offset),
None,
&self.stream_consumer,
- self.fecth_metadata_timeout
+ self.fecth_metadata_timeout,
)
.map_err(|e| e.convert_to_napi())
.unwrap();
@@ -137,15 +144,26 @@
}

#[napi]
- pub fn seek(&self, topic: String, partition: i32, offset_model: OffsetModel, timeout: Option<i64>) -> Result<()> {
+ pub fn seek(
+   &self,
+   topic: String,
+   partition: i32,
+   offset_model: OffsetModel,
+   timeout: Option<i64>,
+ ) -> Result<()> {
let offset = convert_to_rdkafka_offset(&offset_model);
info!(
"Seeking to topic: {}, partition: {}, offset: {:?}",
topic, partition, offset
);
self
.stream_consumer
- .seek(&topic, partition, offset, Duration::from_millis(timeout.unwrap_or(DEFAULT_SEEK_TIMEOUT) as u64))
+ .seek(
+   &topic,
+   partition,
+   offset,
+   Duration::from_millis(timeout.unwrap_or(DEFAULT_SEEK_TIMEOUT) as u64),
+ )
.map_err(|e| {
error!("Error while seeking: {:?}", e);
napi::Error::new(
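
Both the stream-consumer constructor and seek resolve an optional millisecond timeout against a default constant before converting it to a Duration, which is the code being reflowed above. A minimal sketch of that pattern, assuming a made-up default value (the crate's actual DEFAULT_SEEK_TIMEOUT and DEFAULT_FECTH_METADATA_TIMEOUT values are not shown in this diff):

use std::time::Duration;

// Hypothetical default, standing in for the crate's own timeout constants.
const DEFAULT_TIMEOUT_MS: i64 = 5_000;

fn resolve_timeout(timeout_ms: Option<i64>) -> Duration {
    // Fall back to the default when the caller passes no timeout.
    Duration::from_millis(timeout_ms.unwrap_or(DEFAULT_TIMEOUT_MS) as u64)
}

fn main() {
    assert_eq!(resolve_timeout(None), Duration::from_millis(5_000));
    assert_eq!(resolve_timeout(Some(100)), Duration::from_millis(100));
}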
4 changes: 3 additions & 1 deletion src/kafka/kafka_admin.rs
@@ -32,7 +32,9 @@ impl<'a> KafkaAdmin<'a> {
KafkaAdmin {
client_config,
admin_client,
- fetch_metadata_timeout: Duration::from_millis( fetch_metadata_timeout.unwrap_or(DEFAULT_FECTH_METADATA_TIMEOUT) as u64),
+ fetch_metadata_timeout: Duration::from_millis(
+   fetch_metadata_timeout.unwrap_or(DEFAULT_FECTH_METADATA_TIMEOUT) as u64,
+ ),
}
}
