Skip to content

Commit

Permalink
feat: 支持shardingKey 客户端实现 (#109)
Browse files Browse the repository at this point in the history
closes #16

Co-authored-by: mwj209488 <[email protected]>
  • Loading branch information
orangemi and teambition-miwenjie authored Jul 31, 2023
1 parent 1a7763e commit 32d4d09
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 15 deletions.
6 changes: 6 additions & 0 deletions lib/message/message.js
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,12 @@ class Message {
return this.properties[MessageConst.SYSTEM_PROP_KEY_STARTDELIVERTIME] || 0;
}

get shardingKey() {
return this.properties && this.properties[MessageConst.PROPERTY_SHARDING_KEY];
}
set shardingKey(val) {
this.properties[MessageConst.PROPERTY_SHARDING_KEY] = val;
}
}

module.exports = Message;
1 change: 1 addition & 0 deletions lib/message/message_const.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ exports.PROPERTY_PRODUCER_GROUP = 'PGROUP';
exports.PROPERTY_MIN_OFFSET = 'MIN_OFFSET';
exports.PROPERTY_MAX_OFFSET = 'MAX_OFFSET';
exports.PROPERTY_BUYER_ID = 'BUYER_ID';
exports.PROPERTY_SHARDING_KEY = '__SHARDING_KEY';
exports.PROPERTY_ORIGIN_MESSAGE_ID = 'ORIGIN_MESSAGE_ID';
exports.PROPERTY_TRANSFER_FLAG = 'TRANSFER_FLAG';
exports.PROPERTY_CORRECTION_FLAG = 'CORRECTION_FLAG';
Expand Down
2 changes: 1 addition & 1 deletion lib/producer/mq_producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ class MQProducer extends ClientConfig {
let lastBrokerName;
let sendResult;
for (; times < timesTotal && Date.now() - beginTimestamp < maxTimeout; times++) {
const messageQueue = topicPublishInfo.selectOneMessageQueue(lastBrokerName);
const messageQueue = topicPublishInfo.selectOneMessageQueue(lastBrokerName, msg);
if (!messageQueue) {
continue;
}
Expand Down
49 changes: 35 additions & 14 deletions lib/producer/topic_publish_info.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
'use strict';

const crypto = require('crypto')

class TopicPublishInfo {
constructor() {
this.orderTopic = false;
Expand All @@ -20,27 +22,46 @@ class TopicPublishInfo {
* @param {String} lastBrokerName - last broker name
* @return {Object} queue
*/
selectOneMessageQueue(lastBrokerName) {
selectOneMessageQueue(lastBrokerName, message) {
let index,
pos,
mq;
if (lastBrokerName) {
index = this.sendWhichQueue++;
for (let i = 0, len = this.messageQueueList.length; i < len; i++) {
pos = Math.abs(index++) % len;
mq = this.messageQueueList[pos];
if (mq.brokerName !== lastBrokerName) {
return mq;
}
}
return null;

const shardingKey = message && message.shardingKey;
let queueId = '';
if (shardingKey) {
// 这里不应该是所有可用队列数字,应该是queue总数
// 但这里的 messageQueueList 是所有可写队列, golang和java客户端均为在queue总对恶劣中寻找位置
// 通过寻找所有可写queueId总数计算应该写入的queueId
const queueIds = [];
this.messageQueueList.forEach(queue => {
if (!queueIds.includes(queue.queueId)) queueIds.push(queue.queueId);
});

// 参考golang和java的sdk,没有规范确定的shardingKey找queueId的实现标准
// 若引入其他hash算法就显得比较笨重, 可以使用 nodejs 原生hash方法生成可计算的hash值
// Reference: https://github.com/apache/rocketmq/blob/c11ed78eeb73c23da4cf9a36a6bad493a1279210/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java#L382
// Reference: https://github.com/apache/rocketmq-client-go/blob/896a8a3453be4bcfcb5bfb5bb9139c4df6d4cdab/producer/selector.go#L112
const queueIndex = crypto.createHash('md5').update(shardingKey).digest().readUInt32BE() % queueIds.length;
queueId = queueIds[queueIndex] || ''
}
index = this.sendWhichQueue++;

if (!Number.isSafeInteger(this.sendWhichQueue)) {
this.sendWhichQueue = 0; // 超出安全范围,重置为 0
}
pos = Math.abs(index) % this.messageQueueList.length;
return this.messageQueueList[pos];
index = this.sendWhichQueue++;
for (let i = 0, len = this.messageQueueList.length; i < len; i++) {
pos = Math.abs(index++) % len;
mq = this.messageQueueList[pos];
if (lastBrokerName && mq.brokerName === lastBrokerName) {
continue;
}
if (queueId && mq.queueId !== queueId) {
continue;
}
return mq;
}
return null;
}
}

Expand Down

0 comments on commit 32d4d09

Please sign in to comment.