diff --git a/lib/message/message.js b/lib/message/message.js index fce9d13..c2b7d97 100644 --- a/lib/message/message.js +++ b/lib/message/message.js @@ -148,6 +148,12 @@ class Message { return this.properties[MessageConst.SYSTEM_PROP_KEY_STARTDELIVERTIME] || 0; } + get shardingKey() { + return this.properties && this.properties[MessageConst.PROPERTY_SHARDING_KEY]; + } + set shardingKey(val) { + this.properties[MessageConst.PROPERTY_SHARDING_KEY] = val; + } } module.exports = Message; diff --git a/lib/message/message_const.js b/lib/message/message_const.js index 75bf1a0..fff50e5 100644 --- a/lib/message/message_const.js +++ b/lib/message/message_const.js @@ -18,6 +18,7 @@ exports.PROPERTY_PRODUCER_GROUP = 'PGROUP'; exports.PROPERTY_MIN_OFFSET = 'MIN_OFFSET'; exports.PROPERTY_MAX_OFFSET = 'MAX_OFFSET'; exports.PROPERTY_BUYER_ID = 'BUYER_ID'; +exports.PROPERTY_SHARDING_KEY = '__SHARDING_KEY'; exports.PROPERTY_ORIGIN_MESSAGE_ID = 'ORIGIN_MESSAGE_ID'; exports.PROPERTY_TRANSFER_FLAG = 'TRANSFER_FLAG'; exports.PROPERTY_CORRECTION_FLAG = 'CORRECTION_FLAG'; diff --git a/lib/producer/mq_producer.js b/lib/producer/mq_producer.js index 4708414..295ebcc 100644 --- a/lib/producer/mq_producer.js +++ b/lib/producer/mq_producer.js @@ -204,7 +204,7 @@ class MQProducer extends ClientConfig { let lastBrokerName; let sendResult; for (; times < timesTotal && Date.now() - beginTimestamp < maxTimeout; times++) { - const messageQueue = topicPublishInfo.selectOneMessageQueue(lastBrokerName); + const messageQueue = topicPublishInfo.selectOneMessageQueue(lastBrokerName, msg); if (!messageQueue) { continue; } diff --git a/lib/producer/topic_publish_info.js b/lib/producer/topic_publish_info.js index a67f8d4..f83db1f 100644 --- a/lib/producer/topic_publish_info.js +++ b/lib/producer/topic_publish_info.js @@ -1,5 +1,7 @@ 'use strict'; +const crypto = require('crypto') + class TopicPublishInfo { constructor() { this.orderTopic = false; @@ -20,27 +22,46 @@ class TopicPublishInfo { * @param {String} lastBrokerName - last broker name * @return {Object} queue */ - selectOneMessageQueue(lastBrokerName) { + selectOneMessageQueue(lastBrokerName, message) { let index, pos, mq; - if (lastBrokerName) { - index = this.sendWhichQueue++; - for (let i = 0, len = this.messageQueueList.length; i < len; i++) { - pos = Math.abs(index++) % len; - mq = this.messageQueueList[pos]; - if (mq.brokerName !== lastBrokerName) { - return mq; - } - } - return null; + + const shardingKey = message && message.shardingKey; + let queueId = ''; + if (shardingKey) { + // 这里不应该是所有可用队列数字,应该是queue总数 + // 但这里的 messageQueueList 是所有可写队列, golang和java客户端均为在queue总对恶劣中寻找位置 + // 通过寻找所有可写queueId总数计算应该写入的queueId + const queueIds = []; + this.messageQueueList.forEach(queue => { + if (!queueIds.includes(queue.queueId)) queueIds.push(queue.queueId); + }); + + // 参考golang和java的sdk,没有规范确定的shardingKey找queueId的实现标准 + // 若引入其他hash算法就显得比较笨重, 可以使用 nodejs 原生hash方法生成可计算的hash值 + // Reference: https://github.com/apache/rocketmq/blob/c11ed78eeb73c23da4cf9a36a6bad493a1279210/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java#L382 + // Reference: https://github.com/apache/rocketmq-client-go/blob/896a8a3453be4bcfcb5bfb5bb9139c4df6d4cdab/producer/selector.go#L112 + const queueIndex = crypto.createHash('md5').update(shardingKey).digest().readUInt32BE() % queueIds.length; + queueId = queueIds[queueIndex] || '' } - index = this.sendWhichQueue++; + if (!Number.isSafeInteger(this.sendWhichQueue)) { this.sendWhichQueue = 0; // 超出安全范围,重置为 0 } - pos = Math.abs(index) % this.messageQueueList.length; - return this.messageQueueList[pos]; + index = this.sendWhichQueue++; + for (let i = 0, len = this.messageQueueList.length; i < len; i++) { + pos = Math.abs(index++) % len; + mq = this.messageQueueList[pos]; + if (lastBrokerName && mq.brokerName === lastBrokerName) { + continue; + } + if (queueId && mq.queueId !== queueId) { + continue; + } + return mq; + } + return null; } }