diff --git a/lib/kafka-consumer.js b/lib/kafka-consumer.js index ab38396f..d09092d5 100644 --- a/lib/kafka-consumer.js +++ b/lib/kafka-consumer.js @@ -143,6 +143,11 @@ KafkaConsumer.prototype.setDefaultConsumeTimeout = function(timeoutMs) { this._consumeTimeout = timeoutMs; }; +KafkaConsumer.prototype.disableQueueForwarding = function(topicPartition) { + this._client.disableQueueForwarding(topicPartition); + return this; +}; + /** * Set the default sleep delay for the next consume loop after the previous one has timed out. * @param {number} intervalMs - number of milliseconds to sleep after a message fetch has timed out @@ -151,11 +156,6 @@ KafkaConsumer.prototype.setDefaultConsumeLoopTimeoutDelay = function(intervalMs) this._consumeLoopTimeoutDelay = intervalMs; }; -KafkaConsumer.prototype.disableQueueForwarding = function(topicPartition) { - this._client.disableQueueForwarding(topicPartition); - return this; -}; - /** * Get a stream representation of this KafkaConsumer * @@ -403,7 +403,7 @@ KafkaConsumer.prototype.unsubscribe = function() { * @param {KafkaConsumer~readCallback} cb - Callback to return when a message * is fetched. */ - KafkaConsumer.prototype.consume = function(number, topic, partition, cb) { +KafkaConsumer.prototype.consume = function(number, topic, partition, cb) { var timeoutMs = this._consumeTimeout !== undefined ? this._consumeTimeout : DEFAULT_CONSUME_TIME_OUT; var self = this; @@ -419,6 +419,7 @@ KafkaConsumer.prototype.unsubscribe = function() { } else if ((number && typeof number === 'number') || (number && topic)) { // topic is given as the cb cb = topic; + if (cb === undefined) { cb = function() {}; } else if (typeof cb !== 'function') { diff --git a/src/kafka-consumer.cc b/src/kafka-consumer.cc index ea9110fc..c130d8c5 100644 --- a/src/kafka-consumer.cc +++ b/src/kafka-consumer.cc @@ -1172,7 +1172,8 @@ NAN_METHOD(KafkaConsumer::NodeConsumeLoop) { Nan::Callback *callback = new Nan::Callback(cb); - consumer->m_consume_loop = new Workers::KafkaConsumerConsumeLoop(callback, consumer, timeout_ms, timeout_sleep_delay_ms); + consumer->m_consume_loop = new Workers::KafkaConsumerConsumeLoop( + callback, consumer, timeout_ms, timeout_sleep_delay_ms); info.GetReturnValue().Set(Nan::Null()); } @@ -1196,27 +1197,71 @@ NAN_METHOD(KafkaConsumer::NodeConsume) { } if (info[1]->IsNumber()) { - if (!info[2]->IsFunction()) { - return Nan::ThrowError("Need to specify a callback"); - } + if (info[2]->IsString() && info[3]->IsNumber()) { + // Consume per partition + if (!info[4]->IsFunction()) { + return Nan::ThrowError("Need to specify a callback"); + } + + v8::Local numMessagesNumber = info[1].As(); + Nan::Maybe numMessagesMaybe = Nan::To(numMessagesNumber); // NOLINT + + uint32_t numMessages; + if (numMessagesMaybe.IsNothing()) { + return Nan::ThrowError("Parameter must be a number over 0"); + } else { + numMessages = numMessagesMaybe.FromJust(); + } + + // Get string pointer for the topic name + Nan::Utf8String topicUTF8(Nan::To(info[2]).ToLocalChecked()); + std::string topic_name(*topicUTF8); + + // Parse partition + v8::Local partitionNumber = info[3].As(); + Nan::Maybe partitionMaybe = Nan::To(partitionNumber); // NOLINT + + uint32_t partition; + if (partitionMaybe.IsNothing()) { + return Nan::ThrowError("Parameter must be a number equal to or over 0"); + } else { + partition = partitionMaybe.FromJust(); + } - v8::Local numMessagesNumber = info[1].As(); - Nan::Maybe numMessagesMaybe = Nan::To(numMessagesNumber); // NOLINT + // Parse onlyApplyTimeoutToFirstMessage + bool only_apply_timeout_to_first_message; + if (!Nan::To(info[5]).To(&only_apply_timeout_to_first_message)) { + only_apply_timeout_to_first_message = false; + } + + KafkaConsumer* consumer = ObjectWrap::Unwrap(info.This()); - uint32_t numMessages; - if (numMessagesMaybe.IsNothing()) { - return Nan::ThrowError("Parameter must be a number over 0"); + v8::Local cb = info[4].As(); + Nan::Callback *callback = new Nan::Callback(cb); + Nan::AsyncQueueWorker( + new Workers::KafkaConsumerConsumeNumOfPartition(callback, consumer, numMessages, topic_name, partition, timeout_ms, only_apply_timeout_to_first_message)); // NOLINT } else { - numMessages = numMessagesMaybe.FromJust(); - } + if (!info[2]->IsFunction()) { + return Nan::ThrowError("Need to specify a callback"); + } - KafkaConsumer* consumer = ObjectWrap::Unwrap(info.This()); + v8::Local numMessagesNumber = info[1].As(); + Nan::Maybe numMessagesMaybe = Nan::To(numMessagesNumber); // NOLINT - v8::Local cb = info[2].As(); - Nan::Callback *callback = new Nan::Callback(cb); - Nan::AsyncQueueWorker( - new Workers::KafkaConsumerConsumeNum(callback, consumer, numMessages, timeout_ms)); // NOLINT + uint32_t numMessages; + if (numMessagesMaybe.IsNothing()) { + return Nan::ThrowError("Parameter must be a number over 0"); + } else { + numMessages = numMessagesMaybe.FromJust(); + } + KafkaConsumer* consumer = ObjectWrap::Unwrap(info.This()); + + v8::Local cb = info[2].As(); + Nan::Callback *callback = new Nan::Callback(cb); + Nan::AsyncQueueWorker( + new Workers::KafkaConsumerConsumeNum(callback, consumer, numMessages, timeout_ms)); // NOLINT + } } else { if (!info[1]->IsFunction()) { return Nan::ThrowError("Need to specify a callback"); @@ -1269,7 +1314,7 @@ NAN_METHOD(KafkaConsumer::NodeDisconnect) { // cleanup the async worker consumeLoop->WorkComplete(); consumeLoop->Destroy(); - + consumer->m_consume_loop = nullptr; }