Skip to content

Add support for oauthbearer_token_refresh_cb events #1122

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ The library currently supports the following callbacks:
* `event_cb`
* `rebalance_cb` (see [Rebalancing](#rebalancing))
* `offset_commit_cb` (see [Commits](#commits))
* `oauthbearer_token_refresh_cb`

### Librdkafka Methods

Expand Down
2 changes: 2 additions & 0 deletions ci/librdkafka-defs-generator.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ function processItem(configItem) {
return { ...configItem, type: 'boolean | Function' };
case 'rebalance_cb':
return { ...configItem, type: 'boolean | Function' };
case 'oauthbearer_token_refresh_cb':
return { ...configItem, type: 'boolean | Function' };
case 'offset_commit_cb':
return { ...configItem, type: 'boolean | Function' };
}
Expand Down
2 changes: 1 addition & 1 deletion config.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ export interface GlobalConfig {
/**
* SASL/OAUTHBEARER token refresh callback (set with rd_kafka_conf_set_oauthbearer_token_refresh_cb(), triggered by rd_kafka_poll(), et.al. This callback will be triggered when it is time to refresh the client's OAUTHBEARER token. Also see `rd_kafka_conf_enable_sasl_queue()`.
*/
"oauthbearer_token_refresh_cb"?: any;
"oauthbearer_token_refresh_cb"?: boolean | Function;

/**
* Set to "default" or "oidc" to control which login method to be used. If set to "oidc", the following properties must also be be specified: `sasl.oauthbearer.client.id`, `sasl.oauthbearer.client.secret`, and `sasl.oauthbearer.token.endpoint.url`.
Expand Down
28 changes: 28 additions & 0 deletions e2e/consumer.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -382,5 +382,33 @@ describe('Consumer', function() {
eventListener(consumer);
});
});

// OAuthBearer integration tests for the consumer.
// NOTE(review): the two gcfg assignments below run at describe() definition
// time (when the file is loaded), not when these tests run — so they mutate
// the shared `gcfg` for every test defined afterwards, not just this suite.
// Consider moving them into a before() hook; confirm against the other suites.
describe('oauthbearer', function () {
gcfg['security.protocol'] = 'SASL_SSL';
gcfg['sasl.mechanisms'] = 'OAUTHBEARER';

// With oauthbearer_token_refresh_cb set to `true`, the client wraps the
// librdkafka callback and re-emits it as an 'oauthbearer.tokenrefresh' event.
it('should emit token refresh event', function (cb) {
gcfg.oauthbearer_token_refresh_cb = true;

consumer = new KafkaConsumer(gcfg, {});

consumer.on('oauthbearer.tokenrefresh', function (oauthbearer_config) {
consumer.disconnect(cb);
});

consumer.connect({ timeout: 2000 });
});

// A function-valued oauthbearer_token_refresh_cb should be invoked directly
// when librdkafka requests a token.
// NOTE(review): unlike the test above, this one never disconnects the
// consumer before calling cb() — verify an afterEach/cleanup hook covers it.
it('should invoke oauthbearer_token_refresh_cb', function (cb) {
gcfg.oauthbearer_token_refresh_cb = (oauthbearer_config) => {
cb();
};

consumer = new KafkaConsumer(gcfg, {});
consumer.connect({ timeout: 2000 });
});
});


});
});
20 changes: 10 additions & 10 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ export interface Metadata {
brokers: BrokerMetadata[];
}

export interface WatermarkOffsets{
export interface WatermarkOffsets {
lowOffset: number;
highOffset: number;
}
Expand All @@ -72,7 +72,7 @@ export interface TopicPartition {
partition: number;
}

export interface TopicPartitionOffset extends TopicPartition{
export interface TopicPartitionOffset extends TopicPartition {
offset: number;
}

Expand Down Expand Up @@ -142,7 +142,7 @@ export interface ConsumerStream extends Readable {
close(cb?: () => void): void;
}

export type KafkaClientEvents = 'disconnected' | 'ready' | 'connection.failure' | 'event.error' | 'event.stats' | 'event.log' | 'event.event' | 'event.throttle';
export type KafkaClientEvents = 'disconnected' | 'ready' | 'connection.failure' | 'event.error' | 'event.stats' | 'event.log' | 'event.event' | 'event.throttle' | 'oauthbearer.tokenrefresh';
export type KafkaConsumerEvents = 'data' | 'partition.eof' | 'rebalance' | 'rebalance.error' | 'subscribed' | 'unsubscribed' | 'unsubscribe' | 'offset.commit' | KafkaClientEvents;
export type KafkaProducerEvents = 'delivery-report' | KafkaClientEvents;

Expand Down Expand Up @@ -182,7 +182,7 @@ export abstract class Client<Events extends string> extends EventEmitter {

connect(metadataOptions?: MetadataOptions, cb?: (err: LibrdKafkaError, data: Metadata) => any): this;

setOauthBearerToken(tokenStr: string): this;
setOauthBearerToken(tokenStr: string, lifetimeMs?: number): this;

getClient(): any;

Expand Down Expand Up @@ -286,13 +286,13 @@ export class Producer extends Client<KafkaProducerEvents> {
}

export class HighLevelProducer extends Producer {
produce(topic: string, partition: NumberNullUndefined, message: any, key: any, timestamp: NumberNullUndefined, callback: (err: any, offset?: NumberNullUndefined) => void): any;
produce(topic: string, partition: NumberNullUndefined, message: any, key: any, timestamp: NumberNullUndefined, headers: MessageHeader[], callback: (err: any, offset?: NumberNullUndefined) => void): any;
produce(topic: string, partition: NumberNullUndefined, message: any, key: any, timestamp: NumberNullUndefined, callback: (err: any, offset?: NumberNullUndefined) => void): any;
produce(topic: string, partition: NumberNullUndefined, message: any, key: any, timestamp: NumberNullUndefined, headers: MessageHeader[], callback: (err: any, offset?: NumberNullUndefined) => void): any;

setKeySerializer(serializer: (key: any, cb: (err: any, key: MessageKey) => void) => void): void;
setKeySerializer(serializer: (key: any) => MessageKey | Promise<MessageKey>): void;
setValueSerializer(serializer: (value: any, cb: (err: any, value: MessageValue) => void) => void): void;
setValueSerializer(serializer: (value: any) => MessageValue | Promise<MessageValue>): void;
setKeySerializer(serializer: (key: any, cb: (err: any, key: MessageKey) => void) => void): void;
setKeySerializer(serializer: (key: any) => MessageKey | Promise<MessageKey>): void;
setValueSerializer(serializer: (value: any, cb: (err: any, value: MessageValue) => void) => void): void;
setValueSerializer(serializer: (value: any) => MessageValue | Promise<MessageValue>): void;
}

export const features: string[];
Expand Down
11 changes: 8 additions & 3 deletions lib/admin.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,23 @@ AdminClient.prototype.disconnect = function() {

/**
 * Refresh OAuthBearer token, initially provided in factory method.
 * Expiry is set to the maximum value when lifetimeMs is not provided;
 * rdkafka continues using a token even after it expires, so client code
 * must refresh the token by calling this method again when needed.
 *
 * @param {string} tokenStr - OAuthBearer token string
 * @param {number} [lifetimeMs] - Optional token lifetime in milliseconds
 * @throws {Error} If tokenStr is missing/not a string, or lifetimeMs is
 *   provided but not a number.
 * @see connection.cc
 */
AdminClient.prototype.refreshOauthBearerToken = function (tokenStr, lifetimeMs) {
  if (!tokenStr || typeof tokenStr !== 'string') {
    throw new Error("OAuthBearer token is undefined/empty or not a string");
  }

  // Reject any provided non-number lifetime (the old truthiness check let
  // falsy non-numbers such as '' or false slip through to the native layer).
  if (lifetimeMs != null && typeof lifetimeMs !== 'number') {
    throw new Error("OAuthBearer lifetimeMs is not a number");
  }

  // Only forward the lifetime when it was actually provided: the native
  // setToken binding validates info[1] with IsNumber(), and an explicitly
  // passed `undefined` second argument counts toward info.Length() and
  // would be rejected there instead of falling back to the default expiry.
  if (lifetimeMs == null) {
    this._client.setToken(tokenStr);
  } else {
    this._client.setToken(tokenStr, lifetimeMs);
  }
};

/**
Expand Down
12 changes: 8 additions & 4 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -231,20 +231,24 @@ Client.prototype.connect = function(metadataOptions, cb) {

/**
 * Set initial token before any connection is established for oauthbearer authentication flow.
 * Expiry is set to the maximum value when lifetimeMs is not provided;
 * rdkafka continues using a token even after it expires, so call this
 * method again to refresh the token when needed.
 *
 * @param {string} tokenStr - OAuthBearer token string
 * @param {number} [lifetimeMs] - Optional token lifetime in milliseconds
 * @throws {Error} If tokenStr is missing/not a string, or lifetimeMs is
 *   provided but not a number.
 * @see connection.cc
 * @return {Client} - Returns itself.
 */
Client.prototype.setOauthBearerToken = function (tokenStr, lifetimeMs) {
  if (!tokenStr || typeof tokenStr !== 'string') {
    throw new Error("OAuthBearer token is undefined/empty or not a string");
  }

  // Reject any provided non-number lifetime (the old truthiness check let
  // falsy non-numbers such as '' or false slip through to the native layer).
  if (lifetimeMs != null && typeof lifetimeMs !== 'number') {
    throw new Error("OAuthBearer lifetime is not a number");
  }

  // Only forward the lifetime when it was actually provided: the native
  // setToken binding validates info[1] with IsNumber(), and an explicitly
  // passed `undefined` second argument counts toward info.Length() and
  // would be rejected there instead of falling back to the default expiry.
  if (lifetimeMs == null) {
    this._client.setToken(tokenStr);
  } else {
    this._client.setToken(tokenStr, lifetimeMs);
  }
  return this;
};

Expand Down
8 changes: 4 additions & 4 deletions lib/kafka-consumer-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,14 @@ function KafkaConsumerStream(consumer, options) {

/**
 * Refresh the OAuthBearer token originally supplied via the factory method.
 * Expiry defaults to the maximum value when no lifetime is given.
 *
 * Thin passthrough to the underlying consumer's setOauthBearerToken.
 *
 * @param {string} token - OAuthBearer token string
 * @param {number} [lifetime] - Optional token lifetime in milliseconds
 * @see connection.cc
 */
KafkaConsumerStream.prototype.refreshOauthBearerToken = function (token, lifetime) {
  this.consumer.setOauthBearerToken(token, lifetime);
};

/**
Expand Down
13 changes: 13 additions & 0 deletions lib/kafka-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,19 @@ function KafkaConsumer(conf, topicConf) {
};
}

// Bridge the oauthbearer_token_refresh_cb config option into an emitted
// 'oauthbearer.tokenrefresh' event, mirroring the rebalance_cb and
// offset_commit_cb handling in this constructor.
var onOauthbearerTokenRefresh = conf.oauthbearer_token_refresh_cb;
if (onOauthbearerTokenRefresh && typeof onOauthbearerTokenRefresh === 'boolean') {
  // `true` just enables the native callback; our wrapper re-emits the
  // oauthbearer config string for event listeners.
  conf.oauthbearer_token_refresh_cb = function(oauthbearer_config) {
    self.emit('oauthbearer.tokenrefresh', oauthbearer_config);
  };
} else if (onOauthbearerTokenRefresh && typeof onOauthbearerTokenRefresh === 'function') {
  // BUG FIX: this wrapper was previously assigned to conf.offset_commit_cb,
  // which both clobbered the offset-commit slot (before the offset_commit_cb
  // handling below overwrote it again) and left the user's token-refresh
  // callback unregistered. Assign it to the correct config key.
  conf.oauthbearer_token_refresh_cb = function(oauthbearer_config) {
    // Emit the event
    self.emit('oauthbearer.tokenrefresh', oauthbearer_config);
    onOauthbearerTokenRefresh.call(self, oauthbearer_config);
  };
}

// Same treatment for offset_commit_cb
var onOffsetCommit = conf.offset_commit_cb;

Expand Down
31 changes: 31 additions & 0 deletions src/callbacks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,37 @@ void Partitioner::SetCallback(v8::Local<v8::Function> cb) {
callback(cb);
}

// OAuthBearerTokenRefresh callback

// Store the latest oauthbearer config string for later delivery to the JS
// callbacks in Flush(). Guarded by async_lock so it is safe to call from the
// librdkafka callback thread. Note: only the most recent value is kept — a
// second Add() before Flush() overwrites the first.
void OAuthBearerTokenRefreshDispatcher::Add(
const std::string &oauthbearer_config) {
scoped_mutex_lock lock(async_lock);
m_oauthbearer_config = oauthbearer_config;
}

// Deliver the stored oauthbearer config string to the registered JS
// callbacks. The value is copied out and cleared under the lock so the
// librdkafka thread can post a new one while Dispatch() runs.
// NOTE(review): presumably invoked on the v8 main thread via the base
// Dispatcher's async machinery — confirm against the other dispatchers.
void OAuthBearerTokenRefreshDispatcher::Flush() {
Nan::HandleScope scope;

const unsigned int argc = 1;

std::string oauthbearer_config;
{
scoped_mutex_lock lock(async_lock);
oauthbearer_config = m_oauthbearer_config;
m_oauthbearer_config.clear();
}

// Single argument: the oauthbearer config string from librdkafka.
v8::Local<v8::Value> argv[argc] = {};
argv[0] = Nan::New<v8::String>(oauthbearer_config.c_str()).ToLocalChecked();

Dispatch(argc, argv);
}

// librdkafka token-refresh hook (runs on a librdkafka thread): stash the
// config string and trigger the dispatcher so the JS callback fires on the
// main thread. The `handle` parameter is intentionally unused.
void OAuthBearerTokenRefresh::oauthbearer_token_refresh_cb(
RdKafka::Handle *handle, const std::string &oauthbearer_config) {
dispatcher.Add(oauthbearer_config);
dispatcher.Execute();
}

} // end namespace Callbacks

Expand Down
17 changes: 17 additions & 0 deletions src/callbacks.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,23 @@ class Partitioner : public RdKafka::PartitionerCb {
static unsigned int random(const RdKafka::Topic*, int32_t);
};

// Dispatcher that hands the oauthbearer config string from the librdkafka
// callback thread to JS callbacks on the main thread. Add() stores the value
// (last writer wins); Flush() delivers and clears it. See callbacks.cc.
class OAuthBearerTokenRefreshDispatcher : public Dispatcher {
 public:
OAuthBearerTokenRefreshDispatcher() {}
~OAuthBearerTokenRefreshDispatcher() {}
void Add(const std::string &oauthbearer_config);
void Flush();

 private:
// Pending config string; access is guarded by the base class's async_lock.
std::string m_oauthbearer_config;
};

// RdKafka callback adapter: implements OAuthBearerTokenRefreshCb and forwards
// each invocation through its dispatcher to the registered JS callbacks.
class OAuthBearerTokenRefresh : public RdKafka::OAuthBearerTokenRefreshCb {
 public:
void oauthbearer_token_refresh_cb(RdKafka::Handle *, const std::string &);
OAuthBearerTokenRefreshDispatcher dispatcher;
};

} // namespace Callbacks

} // namespace NodeKafka
Expand Down
13 changes: 13 additions & 0 deletions src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,19 @@ void Conf::ConfigureCallback(const std::string &string_key, const v8::Local<v8::
this->m_offset_commit_cb->dispatcher.RemoveCallback(cb);
}
}

} else if (string_key.compare("oauthbearer_token_refresh_cb") == 0) {
if (add) {
if (this->m_oauthbearer_token_refresh_cb == NULL) {
this->m_oauthbearer_token_refresh_cb = new NodeKafka::Callbacks::OAuthBearerTokenRefresh();
}
this->m_oauthbearer_token_refresh_cb->dispatcher.AddCallback(cb);
this->set(string_key, this->m_oauthbearer_token_refresh_cb, errstr);
} else {
if (this->m_oauthbearer_token_refresh_cb != NULL) {
this->m_oauthbearer_token_refresh_cb->dispatcher.RemoveCallback(cb);
}
}
}
}

Expand Down
1 change: 1 addition & 0 deletions src/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class Conf : public RdKafka::Conf {
protected:
NodeKafka::Callbacks::Rebalance * m_rebalance_cb = NULL;
NodeKafka::Callbacks::OffsetCommit * m_offset_commit_cb = NULL;
NodeKafka::Callbacks::OAuthBearerTokenRefresh * m_oauthbearer_token_refresh_cb = NULL;
};

} // namespace NodeKafka
Expand Down
27 changes: 21 additions & 6 deletions src/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -246,19 +246,34 @@ NAN_METHOD(Connection::NodeSetToken)
return;
}

if (info.Length() > 1 && !info[1]->IsNumber()) {
Nan::ThrowError("2nd parameter must be a lifetime_ms number");
return;
}

Nan::Utf8String tk(info[0]);
std::string token = *tk;
// we always set expiry to maximum value in ms, as we don't use refresh callback,
// rdkafka continues sending a token even if it expired. Client code must
// handle token refreshing by calling 'setToken' again when needed.
int64_t expiry = (std::numeric_limits<int64_t>::max)() / 100000;

int64_t lifetime_ms;

if (info.Length() > 1) {
lifetime_ms = Nan::To<int64_t>(info[1]).FromJust();
}
else
{
// set expiry to maximum value in ms if it's not passed.
// rdkafka continues sending a token even if it expired. Client code must
// handle token refreshing by calling 'setToken' again when needed.
lifetime_ms = (std::numeric_limits<int64_t>::max)() / 100000;
}

Connection* obj = ObjectWrap::Unwrap<Connection>(info.This());
RdKafka::Handle* handle = obj->m_client;

if (!handle) {
scoped_shared_write_lock lock(obj->m_connection_lock);
obj->m_init_oauthToken = std::make_unique<OauthBearerToken>(
OauthBearerToken{token, expiry});
OauthBearerToken{token, lifetime_ms});
info.GetReturnValue().Set(Nan::Null());
return;
}
Expand All @@ -267,7 +282,7 @@ NAN_METHOD(Connection::NodeSetToken)
scoped_shared_write_lock lock(obj->m_connection_lock);
std::string errstr;
std::list<std::string> emptyList;
RdKafka::ErrorCode err = handle->oauthbearer_set_token(token, expiry,
RdKafka::ErrorCode err = handle->oauthbearer_set_token(token, lifetime_ms,
"", emptyList, errstr);

if (err != RdKafka::ERR_NO_ERROR) {
Expand Down