Skip to content

Commit

Permalink
fix: update consume
Browse files Browse the repository at this point in the history
  • Loading branch information
martijnimhoff committed Nov 6, 2024
1 parent 0e4ec00 commit 88aa290
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 23 deletions.
13 changes: 7 additions & 6 deletions lib/kafka-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ KafkaConsumer.prototype.setDefaultConsumeTimeout = function(timeoutMs) {
this._consumeTimeout = timeoutMs;
};

KafkaConsumer.prototype.disableQueueForwarding = function(topicPartition) {
this._client.disableQueueForwarding(topicPartition);
return this;
};

/**
* Set the default sleep delay for the next consume loop after the previous one has timed out.
* @param {number} intervalMs - number of milliseconds to sleep after a message fetch has timed out
Expand All @@ -151,11 +156,6 @@ KafkaConsumer.prototype.setDefaultConsumeLoopTimeoutDelay = function(intervalMs)
this._consumeLoopTimeoutDelay = intervalMs;
};

KafkaConsumer.prototype.disableQueueForwarding = function(topicPartition) {
this._client.disableQueueForwarding(topicPartition);
return this;
};

/**
* Get a stream representation of this KafkaConsumer
*
Expand Down Expand Up @@ -403,7 +403,7 @@ KafkaConsumer.prototype.unsubscribe = function() {
* @param {KafkaConsumer~readCallback} cb - Callback to return when a message
* is fetched.
*/
KafkaConsumer.prototype.consume = function(number, topic, partition, cb) {
KafkaConsumer.prototype.consume = function(number, topic, partition, cb) {
var timeoutMs = this._consumeTimeout !== undefined ? this._consumeTimeout : DEFAULT_CONSUME_TIME_OUT;
var self = this;

Expand All @@ -419,6 +419,7 @@ KafkaConsumer.prototype.unsubscribe = function() {
} else if ((number && typeof number === 'number') || (number && topic)) {
// topic is given as the cb
cb = topic;

if (cb === undefined) {
cb = function() {};
} else if (typeof cb !== 'function') {
Expand Down
79 changes: 62 additions & 17 deletions src/kafka-consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1172,7 +1172,8 @@ NAN_METHOD(KafkaConsumer::NodeConsumeLoop) {

Nan::Callback *callback = new Nan::Callback(cb);

consumer->m_consume_loop = new Workers::KafkaConsumerConsumeLoop(callback, consumer, timeout_ms, timeout_sleep_delay_ms);
consumer->m_consume_loop = new Workers::KafkaConsumerConsumeLoop(
callback, consumer, timeout_ms, timeout_sleep_delay_ms);

info.GetReturnValue().Set(Nan::Null());
}
Expand All @@ -1196,27 +1197,71 @@ NAN_METHOD(KafkaConsumer::NodeConsume) {
}

if (info[1]->IsNumber()) {
if (!info[2]->IsFunction()) {
return Nan::ThrowError("Need to specify a callback");
}
if (info[2]->IsString() && info[3]->IsNumber()) {
// Consume per partition
if (!info[4]->IsFunction()) {
return Nan::ThrowError("Need to specify a callback");
}

v8::Local<v8::Number> numMessagesNumber = info[1].As<v8::Number>();
Nan::Maybe<uint32_t> numMessagesMaybe = Nan::To<uint32_t>(numMessagesNumber); // NOLINT

uint32_t numMessages;
if (numMessagesMaybe.IsNothing()) {
return Nan::ThrowError("Parameter must be a number over 0");
} else {
numMessages = numMessagesMaybe.FromJust();
}

// Get string pointer for the topic name
Nan::Utf8String topicUTF8(Nan::To<v8::String>(info[2]).ToLocalChecked());
std::string topic_name(*topicUTF8);

// Parse partition
v8::Local<v8::Number> partitionNumber = info[3].As<v8::Number>();
Nan::Maybe<uint32_t> partitionMaybe = Nan::To<uint32_t>(partitionNumber); // NOLINT

uint32_t partition;
if (partitionMaybe.IsNothing()) {
return Nan::ThrowError("Parameter must be a number equal to or over 0");
} else {
partition = partitionMaybe.FromJust();
}

v8::Local<v8::Number> numMessagesNumber = info[1].As<v8::Number>();
Nan::Maybe<uint32_t> numMessagesMaybe = Nan::To<uint32_t>(numMessagesNumber); // NOLINT
// Parse onlyApplyTimeoutToFirstMessage
bool only_apply_timeout_to_first_message;
if (!Nan::To<bool>(info[5]).To(&only_apply_timeout_to_first_message)) {
only_apply_timeout_to_first_message = false;
}

KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());

uint32_t numMessages;
if (numMessagesMaybe.IsNothing()) {
return Nan::ThrowError("Parameter must be a number over 0");
v8::Local<v8::Function> cb = info[4].As<v8::Function>();
Nan::Callback *callback = new Nan::Callback(cb);
Nan::AsyncQueueWorker(
new Workers::KafkaConsumerConsumeNumOfPartition(callback, consumer, numMessages, topic_name, partition, timeout_ms, only_apply_timeout_to_first_message)); // NOLINT
} else {
numMessages = numMessagesMaybe.FromJust();
}
if (!info[2]->IsFunction()) {
return Nan::ThrowError("Need to specify a callback");
}

KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
v8::Local<v8::Number> numMessagesNumber = info[1].As<v8::Number>();
Nan::Maybe<uint32_t> numMessagesMaybe = Nan::To<uint32_t>(numMessagesNumber); // NOLINT

v8::Local<v8::Function> cb = info[2].As<v8::Function>();
Nan::Callback *callback = new Nan::Callback(cb);
Nan::AsyncQueueWorker(
new Workers::KafkaConsumerConsumeNum(callback, consumer, numMessages, timeout_ms)); // NOLINT
uint32_t numMessages;
if (numMessagesMaybe.IsNothing()) {
return Nan::ThrowError("Parameter must be a number over 0");
} else {
numMessages = numMessagesMaybe.FromJust();
}

KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());

v8::Local<v8::Function> cb = info[2].As<v8::Function>();
Nan::Callback *callback = new Nan::Callback(cb);
Nan::AsyncQueueWorker(
new Workers::KafkaConsumerConsumeNum(callback, consumer, numMessages, timeout_ms)); // NOLINT
}
} else {
if (!info[1]->IsFunction()) {
return Nan::ThrowError("Need to specify a callback");
Expand Down Expand Up @@ -1269,7 +1314,7 @@ NAN_METHOD(KafkaConsumer::NodeDisconnect) {
// cleanup the async worker
consumeLoop->WorkComplete();
consumeLoop->Destroy();

consumer->m_consume_loop = nullptr;
}

Expand Down

0 comments on commit 88aa290

Please sign in to comment.