From 46d462b3761cf0a65e603f50d68ddbd7327ddceb Mon Sep 17 00:00:00 2001 From: tulios Date: Tue, 15 May 2018 11:30:34 +0200 Subject: [PATCH 01/10] Add support to signed and unsigned varint32 --- src/protocol/decoder.js | 42 ++++++++++++++++++++++++++++++++++++ src/protocol/encoder.js | 23 ++++++++++++++++++++ src/protocol/encoder.spec.js | 21 ++++++++++++++++++ 3 files changed, 86 insertions(+) create mode 100644 src/protocol/encoder.spec.js diff --git a/src/protocol/decoder.js b/src/protocol/decoder.js index b7c7416fd..7d640e7f7 100644 --- a/src/protocol/decoder.js +++ b/src/protocol/decoder.js @@ -5,6 +5,9 @@ const INT16_SIZE = 2 const INT32_SIZE = 4 const INT64_SIZE = 8 +const MOST_SIGNIFICANT_BIT = 0x80 // 128 +const OTHER_BITS = 0x7f // 127 + module.exports = class Decoder { static int32Size() { return INT32_SIZE @@ -120,6 +123,45 @@ module.exports = class Decoder { return array } + // https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/util/Varint.java#L207 + readUnsignedVarInt32() { + let value = 0 + let i = 0 + let lastByte = 0 + + while (true) { + const currentByte = this.buffer[this.offset++] + lastByte = currentByte + const isLastByte = (currentByte & MOST_SIGNIFICANT_BIT) === 0 + + if (isLastByte) break + + // Concatenate the octets (sum the numbers) + value = value | ((currentByte & OTHER_BITS) << i) + i += 7 + + if (i > 35) { + throw new Error('Variable length quantity is too long') + } + } + + return value | (lastByte << i) + } + + // https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/util/Varint.java#L197 + readSignedVarInt32() { + const raw = this.readUnsignedVarInt32() + + // This undoes the trick in writeSignedVarInt() + // https://developers.google.com/protocol-buffers/docs/encoding?csw=1#types + const temp = (((raw << 31) >> 31) ^ raw) >> 1 + + // This extra step lets us deal with the largest signed values by treating + // negative results from read unsigned methods as like unsigned values. + // Must re-flip the top bit if the original read value had it set. + return temp ^ (raw & (1 << 31)) + } + slice(size) { return new Decoder(this.buffer.slice(this.offset, this.offset + size)) } diff --git a/src/protocol/encoder.js b/src/protocol/encoder.js index 144380060..4201ce19b 100644 --- a/src/protocol/encoder.js +++ b/src/protocol/encoder.js @@ -5,6 +5,10 @@ const INT16_SIZE = 2 const INT32_SIZE = 4 const INT64_SIZE = 8 +const MOST_SIGNIFICANT_BIT = 0x80 // 128 +const OTHER_BITS = 0x7f // 127 +const UNSIGNED_INT32_MAX_NUMBER = 0xffffff80 + module.exports = class Encoder { constructor() { this.buffer = Buffer.alloc(0) @@ -109,6 +113,25 @@ module.exports = class Encoder { return this } + // Based on: + // https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/util/Varint.java#L106 + writeUnsignedVarInt32(value) { + const byteArray = [] + while ((value & UNSIGNED_INT32_MAX_NUMBER) !== 0) { + byteArray.push((value & OTHER_BITS) | MOST_SIGNIFICANT_BIT) + value >>>= 7 + } + + byteArray.push(value & OTHER_BITS) + this.buffer = Buffer.concat([this.buffer, Buffer.from(byteArray)]) + return this + } + + // https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/util/Varint.java#L95 + writeSignedVarInt32(value) { + return this.writeUnsignedVarInt32((value << 1) ^ (value >> 31)) + } + size() { return Buffer.byteLength(this.buffer) } diff --git a/src/protocol/encoder.spec.js b/src/protocol/encoder.spec.js new file mode 100644 index 000000000..452463221 --- /dev/null +++ b/src/protocol/encoder.spec.js @@ -0,0 +1,21 @@ +const Encoder = require('./encoder') +const Decoder = require('./decoder') + +describe('Protocol > Encoder', () => { + const unsigned32 = number => new Encoder().writeUnsignedVarInt32(number).buffer + const signed32 = number => new Encoder().writeSignedVarInt32(number).buffer + const decodeUnsigned32 = buffer => new Decoder(buffer).readUnsignedVarInt32() + const decodeSigned32 = buffer => new Decoder(buffer).readSignedVarInt32() + + describe('varint', () => { + test('encode numbers', () => { + expect(decodeUnsigned32(unsigned32(0))).toEqual(0) + expect(decodeUnsigned32(unsigned32(4))).toEqual(4) + expect(decodeUnsigned32(unsigned32(300))).toEqual(300) + + expect(decodeSigned32(signed32(0))).toEqual(0) + expect(decodeSigned32(signed32(4))).toEqual(4) + expect(decodeSigned32(signed32(-2))).toEqual(-2) + }) + }) +}) From 0167812b3aff44aa84e1d5858b5c73b97c9cef99 Mon Sep 17 00:00:00 2001 From: Tommy Brunn Date: Tue, 15 May 2018 14:45:17 +0200 Subject: [PATCH 02/10] Ignore vscode directory --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 4793235a5..fbf4bba9a 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ test-report.xml junit.xml coverage/ yarn-error.log +.vscode \ No newline at end of file From e4f25cf8f410c0983517f808e23a6cd6146e7c1e Mon Sep 17 00:00:00 2001 From: Tommy Brunn Date: Tue, 15 May 2018 16:01:43 +0200 Subject: [PATCH 03/10] Test encoding/decoding of int32 varint --- src/protocol/encoder.spec.js | 69 ++++++++++++++++++++++++++++++++---- 1 file changed, 63 insertions(+), 6 deletions(-) diff --git a/src/protocol/encoder.spec.js b/src/protocol/encoder.spec.js index 452463221..63f9ca6f8 100644 --- a/src/protocol/encoder.spec.js +++ b/src/protocol/encoder.spec.js @@ -1,6 +1,9 @@ const Encoder = require('./encoder') const Decoder = require('./decoder') +const MAX_SAFE_POSITIVE_SIGNED_INT = 2147483647 +const MIN_SAFE_NEGATIVE_SIGNED_INT = -2147483648 + describe('Protocol > Encoder', () => { const unsigned32 = number => new Encoder().writeUnsignedVarInt32(number).buffer const signed32 = number => new Encoder().writeSignedVarInt32(number).buffer @@ -8,14 +11,68 @@ describe('Protocol > Encoder', () => { const decodeSigned32 = buffer => new Decoder(buffer).readSignedVarInt32() describe('varint', () => { - test('encode numbers', () => { - expect(decodeUnsigned32(unsigned32(0))).toEqual(0) - expect(decodeUnsigned32(unsigned32(4))).toEqual(4) - expect(decodeUnsigned32(unsigned32(300))).toEqual(300) + test('encode signed int32 numbers', () => { + expect(signed32(0)).toEqual(Buffer.from([0x00])) + expect(signed32(1)).toEqual(Buffer.from([0x02])) + expect(signed32(63)).toEqual(Buffer.from([0x7e])) + expect(signed32(64)).toEqual(Buffer.from([0x80, 0x01])) + expect(signed32(8191)).toEqual(Buffer.from([0xfe, 0x7f])) + expect(signed32(8192)).toEqual(Buffer.from([0x80, 0x80, 0x01])) + expect(signed32(1048575)).toEqual(Buffer.from([0xfe, 0xff, 0x7f])) + expect(signed32(1048576)).toEqual(Buffer.from([0x80, 0x80, 0x80, 0x01])) + expect(signed32(134217727)).toEqual(Buffer.from([0xfe, 0xff, 0xff, 0x7f])) + expect(signed32(134217728)).toEqual(Buffer.from([0x80, 0x80, 0x80, 0x80, 0x01])) + + expect(signed32(-1)).toEqual(Buffer.from([0x01])) + expect(signed32(-64)).toEqual(Buffer.from([0x7f])) + expect(signed32(-65)).toEqual(Buffer.from([0x81, 0x01])) + expect(signed32(-8192)).toEqual(Buffer.from([0xff, 0x7f])) + expect(signed32(-8193)).toEqual(Buffer.from([0x81, 0x80, 0x01])) + expect(signed32(-1048576)).toEqual(Buffer.from([0xff, 0xff, 0x7f])) + expect(signed32(-1048577)).toEqual(Buffer.from([0x81, 0x80, 0x80, 0x01])) + expect(signed32(-134217728)).toEqual(Buffer.from([0xff, 0xff, 0xff, 0x7f])) + expect(signed32(-134217729)).toEqual(Buffer.from([0x81, 0x80, 0x80, 0x80, 0x01])) + }) + + test('encode signed int32 boundaries', () => { + expect(signed32(MAX_SAFE_POSITIVE_SIGNED_INT)).toEqual( + Buffer.from([0xfe, 0xff, 0xff, 0xff, 0x0f]) + ) + expect(signed32(MIN_SAFE_NEGATIVE_SIGNED_INT)).toEqual( + Buffer.from([0xff, 0xff, 0xff, 0xff, 0x0f]) + ) + }) + test('decode int32 numbers', () => { expect(decodeSigned32(signed32(0))).toEqual(0) - expect(decodeSigned32(signed32(4))).toEqual(4) - expect(decodeSigned32(signed32(-2))).toEqual(-2) + expect(decodeSigned32(signed32(1))).toEqual(1) + expect(decodeSigned32(signed32(63))).toEqual(63) + expect(decodeSigned32(signed32(64))).toEqual(64) + expect(decodeSigned32(signed32(8191))).toEqual(8191) + expect(decodeSigned32(signed32(8192))).toEqual(8192) + expect(decodeSigned32(signed32(1048575))).toEqual(1048575) + expect(decodeSigned32(signed32(1048576))).toEqual(1048576) + expect(decodeSigned32(signed32(134217727))).toEqual(134217727) + expect(decodeSigned32(signed32(134217728))).toEqual(134217728) + + expect(decodeSigned32(signed32(-1))).toEqual(-1) + expect(decodeSigned32(signed32(-64))).toEqual(-64) + expect(decodeSigned32(signed32(-65))).toEqual(-65) + expect(decodeSigned32(signed32(-8192))).toEqual(-8192) + expect(decodeSigned32(signed32(-8193))).toEqual(-8193) + expect(decodeSigned32(signed32(-1048576))).toEqual(-1048576) + expect(decodeSigned32(signed32(-1048577))).toEqual(-1048577) + expect(decodeSigned32(signed32(-134217728))).toEqual(-134217728) + expect(decodeSigned32(signed32(-134217729))).toEqual(-134217729) + }) + + test('decode signed int32 boundaries', () => { + expect(decodeSigned32(signed32(MAX_SAFE_POSITIVE_SIGNED_INT))).toEqual( + MAX_SAFE_POSITIVE_SIGNED_INT + ) + expect(decodeSigned32(signed32(MIN_SAFE_NEGATIVE_SIGNED_INT))).toEqual( + MIN_SAFE_NEGATIVE_SIGNED_INT + ) }) }) }) From bb9124bb28a923d7d778a5d65646007cd9568011 Mon Sep 17 00:00:00 2001 From: tulios Date: Tue, 15 May 2018 16:32:59 +0200 Subject: [PATCH 04/10] Update to long v4.0.0 --- package.json | 2 +- yarn.lock | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/package.json b/package.json index cfe9403e6..432c96477 100644 --- a/package.json +++ b/package.json @@ -47,7 +47,7 @@ "prettier": "^1.7.0" }, "dependencies": { - "long": "^3.2.0" + "long": "^4.0.0" }, "lint-staged": { "*.js": [ diff --git a/yarn.lock b/yarn.lock index 4a563c90c..b43899d38 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2287,9 +2287,9 @@ log-update@^1.0.2: ansi-escapes "^1.0.0" cli-cursor "^1.0.2" -long@^3.2.0: - version "3.2.0" - resolved "https://registry.yarnpkg.com/long/-/long-3.2.0.tgz#d821b7138ca1cb581c172990ef14db200b5c474b" +long@^4.0.0: + version "4.0.0" + resolved "https://registry.yarnpkg.com/long/-/long-4.0.0.tgz#9a7b71cfb7d361a194ea555241c92f7468d5bf28" longest@^1.0.1: version "1.0.1" From 47a92311eded1eb40484db2514d829dcdf2b9eab Mon Sep 17 00:00:00 2001 From: Tommy Brunn Date: Tue, 15 May 2018 16:38:29 +0200 Subject: [PATCH 05/10] Encode int64 --- src/protocol/encoder.js | 29 ++++++++++ src/protocol/encoder.spec.js | 101 +++++++++++++++++++++++++++++++++++ 2 files changed, 130 insertions(+) diff --git a/src/protocol/encoder.js b/src/protocol/encoder.js index 4201ce19b..4eabc1a86 100644 --- a/src/protocol/encoder.js +++ b/src/protocol/encoder.js @@ -8,6 +8,7 @@ const INT64_SIZE = 8 const MOST_SIGNIFICANT_BIT = 0x80 // 128 const OTHER_BITS = 0x7f // 127 const UNSIGNED_INT32_MAX_NUMBER = 0xffffff80 +const UNSIGNED_INT64_MAX_NUMBER = Long.fromBytes([-1, -1, -1, -1, -1, -1, -1, -128]) module.exports = class Encoder { constructor() { @@ -132,6 +133,34 @@ module.exports = class Encoder { return this.writeUnsignedVarInt32((value << 1) ^ (value >> 31)) } + writeUnsignedVarInt64(value) { + const byteArray = [] + let longValue = Long.fromValue(value) + + while (longValue.and(UNSIGNED_INT64_MAX_NUMBER).notEquals(Long.fromInt(0))) { + byteArray.push( + longValue + .and(OTHER_BITS) + .or(MOST_SIGNIFICANT_BIT) + .toInt() + ) + longValue = longValue.shiftRightUnsigned(7) + } + + byteArray.push(longValue.toInt()) + + this.buffer = Buffer.concat([this.buffer, Buffer.from(byteArray)]) + return this + } + + writeSignedVarInt64(value) { + let longValue = Long.fromValue(value) + const unsignedLong = longValue.shiftLeft(1).xor(longValue.shiftRight(63)) + this.writeUnsignedVarInt64(unsignedLong) + + return this + } + size() { return Buffer.byteLength(this.buffer) } diff --git a/src/protocol/encoder.spec.js b/src/protocol/encoder.spec.js index 63f9ca6f8..e7b8a12dd 100644 --- a/src/protocol/encoder.spec.js +++ b/src/protocol/encoder.spec.js @@ -1,3 +1,5 @@ +const Long = require('long') + const Encoder = require('./encoder') const Decoder = require('./decoder') @@ -10,6 +12,8 @@ describe('Protocol > Encoder', () => { const decodeUnsigned32 = buffer => new Decoder(buffer).readUnsignedVarInt32() const decodeSigned32 = buffer => new Decoder(buffer).readSignedVarInt32() + const signed64 = number => new Encoder().writeSignedVarInt64(number).buffer + describe('varint', () => { test('encode signed int32 numbers', () => { expect(signed32(0)).toEqual(Buffer.from([0x00])) @@ -75,4 +79,101 @@ describe('Protocol > Encoder', () => { ) }) }) + + describe('varlong', () => { + test('encode signed int64 number', () => { + expect(signed64(0)).toEqual(Buffer.from([0x00])) + expect(signed64(1)).toEqual(Buffer.from([0x02])) + expect(signed64(63)).toEqual(Buffer.from([0x7e])) + expect(signed64(64)).toEqual(Buffer.from([0x80, 0x01])) + expect(signed64(8191)).toEqual(Buffer.from([0xfe, 0x7f])) + expect(signed64(8192)).toEqual(Buffer.from([0x80, 0x80, 0x01])) + expect(signed64(1048575)).toEqual(Buffer.from([0xfe, 0xff, 0x7f])) + expect(signed64(1048576)).toEqual(Buffer.from([0x80, 0x80, 0x80, 0x01])) + expect(signed64(134217727)).toEqual(Buffer.from([0xfe, 0xff, 0xff, 0x7f])) + expect(signed64(134217728)).toEqual(Buffer.from([0x80, 0x80, 0x80, 0x80, 0x01])) + expect(signed64(MAX_SAFE_POSITIVE_SIGNED_INT)).toEqual( + Buffer.from([0xfe, 0xff, 0xff, 0xff, 0x0f]) + ) + expect(signed64(Long.fromString('17179869183'))).toEqual( + Buffer.from([0xfe, 0xff, 0xff, 0xff, 0x7f]) + ) + expect(signed64(Long.fromString('17179869184'))).toEqual( + Buffer.from([0x80, 0x80, 0x80, 0x80, 0x80, 0x01]) + ) + expect(signed64(Long.fromString('2199023255551'))).toEqual( + Buffer.from([0xfe, 0xff, 0xff, 0xff, 0xff, 0x7f]) + ) + expect(signed64(Long.fromString('2199023255552'))).toEqual( + Buffer.from([0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x01]) + ) + expect(signed64(Long.fromString('281474976710655'))).toEqual( + Buffer.from([0xfe, 0xff, 0xff, 0xff, 0xff, 0xff, 0x7f]) + ) + expect(signed64(Long.fromString('281474976710656'))).toEqual( + Buffer.from([0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x01]) + ) + expect(signed64(Long.fromString('36028797018963967'))).toEqual( + Buffer.from([0xfe, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x7f]) + ) + expect(signed64(Long.fromString('36028797018963968'))).toEqual( + Buffer.from([0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x01]) + ) + expect(signed64(Long.fromString('4611686018427387903'))).toEqual( + Buffer.from([0xfe, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x7f]) + ) + expect(signed64(Long.fromString('4611686018427387904'))).toEqual( + Buffer.from([0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x01]) + ) + expect(signed64(Long.MAX_VALUE)).toEqual( + Buffer.from([0xfe, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x01]) + ) + + expect(signed64(-1)).toEqual(Buffer.from([0x01])) + expect(signed64(-64)).toEqual(Buffer.from([0x7f])) + expect(signed64(-65)).toEqual(Buffer.from([0x81, 0x01])) + expect(signed64(-8192)).toEqual(Buffer.from([0xff, 0x7f])) + expect(signed64(-8193)).toEqual(Buffer.from([0x81, 0x80, 0x01])) + expect(signed64(-1048576)).toEqual(Buffer.from([0xff, 0xff, 0x7f])) + expect(signed64(-1048577)).toEqual(Buffer.from([0x81, 0x80, 0x80, 0x01])) + expect(signed64(-134217728)).toEqual(Buffer.from([0xff, 0xff, 0xff, 0x7f])) + expect(signed64(-134217729)).toEqual(Buffer.from([0x81, 0x80, 0x80, 0x80, 0x01])) + expect(signed64(MIN_SAFE_NEGATIVE_SIGNED_INT)).toEqual( + Buffer.from([0xff, 0xff, 0xff, 0xff, 0x0f]) + ) + expect(signed64(Long.fromString('-17179869184'))).toEqual( + Buffer.from([0xff, 0xff, 0xff, 0xff, 0x7f]) + ) + expect(signed64(Long.fromString('-17179869185'))).toEqual( + Buffer.from([0x81, 0x80, 0x80, 0x80, 0x80, 0x01]) + ) + expect(signed64(Long.fromString('-2199023255552'))).toEqual( + Buffer.from([0xff, 0xff, 0xff, 0xff, 0xff, 0x7f]) + ) + expect(signed64(Long.fromString('-2199023255553'))).toEqual( + Buffer.from([0x81, 0x80, 0x80, 0x80, 0x80, 0x80, 0x01]) + ) + expect(signed64(Long.fromString('-281474976710656'))).toEqual( + Buffer.from([0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x7f]) + ) + expect(signed64(Long.fromString('-281474976710657'))).toEqual( + Buffer.from([0x81, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 1]) + ) + expect(signed64(Long.fromString('-36028797018963968'))).toEqual( + Buffer.from([0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x7f]) + ) + expect(signed64(Long.fromString('-36028797018963969'))).toEqual( + Buffer.from([0x81, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x01]) + ) + expect(signed64(Long.fromString('-4611686018427387904'))).toEqual( + Buffer.from([0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x7f]) + ) + expect(signed64(Long.fromString('-4611686018427387905'))).toEqual( + Buffer.from([0x81, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x01]) + ) + expect(signed64(Long.MIN_VALUE)).toEqual( + Buffer.from([0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x01]) + ) + }) + }) }) From c7fd49b734f39e746719c1b8ab55da6004e337d9 Mon Sep 17 00:00:00 2001 From: Tommy Brunn Date: Tue, 15 May 2018 16:51:46 +0200 Subject: [PATCH 06/10] WIP: (Almost) decoding varlongs --- src/protocol/decoder.js | 22 ++++++++++++++++ src/protocol/encoder.spec.js | 50 ++++++++++++++++++++++++++++++++++-- 2 files changed, 70 insertions(+), 2 deletions(-) diff --git a/src/protocol/decoder.js b/src/protocol/decoder.js index 7d640e7f7..957d79c7c 100644 --- a/src/protocol/decoder.js +++ b/src/protocol/decoder.js @@ -162,6 +162,28 @@ module.exports = class Decoder { return temp ^ (raw & (1 << 31)) } + readUnsignedVarInt64() { + let value = Long.fromInt(0) + let i = 0 + let currentByte + + while (((currentByte = this.buffer[this.offset++]) & MOST_SIGNIFICANT_BIT) !== 0) { + value = value.or((currentByte & OTHER_BITS) << i) + i += 7 + + if (i > 63) { + throw new Error('Variable length quantity is too long') + } + } + + return value.or(currentByte << i) + } + + readSignedVarInt64() { + const longValue = this.readUnsignedVarInt64() + return longValue.shiftRightUnsigned(1).xor(longValue.and(1).multiply(-1)) + } + slice(size) { return new Decoder(this.buffer.slice(this.offset, this.offset + size)) } diff --git a/src/protocol/encoder.spec.js b/src/protocol/encoder.spec.js index e7b8a12dd..2532a07da 100644 --- a/src/protocol/encoder.spec.js +++ b/src/protocol/encoder.spec.js @@ -7,12 +7,11 @@ const MAX_SAFE_POSITIVE_SIGNED_INT = 2147483647 const MIN_SAFE_NEGATIVE_SIGNED_INT = -2147483648 describe('Protocol > Encoder', () => { - const unsigned32 = number => new Encoder().writeUnsignedVarInt32(number).buffer const signed32 = number => new Encoder().writeSignedVarInt32(number).buffer - const decodeUnsigned32 = buffer => new Decoder(buffer).readUnsignedVarInt32() const decodeSigned32 = buffer => new Decoder(buffer).readSignedVarInt32() const signed64 = number => new Encoder().writeSignedVarInt64(number).buffer + const decode64 = buffer => new Decoder(buffer).readSignedVarInt64() describe('varint', () => { test('encode signed int32 numbers', () => { @@ -175,5 +174,52 @@ describe('Protocol > Encoder', () => { Buffer.from([0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x01]) ) }) + + test('decode signed int64 number', () => { + expect(decode64(signed64(0))).toEqual(Long.fromInt(0)) + expect(decode64(signed64(1))).toEqual(Long.fromInt(1)) + expect(decode64(signed64(63))).toEqual(Long.fromInt(63)) + expect(decode64(signed64(64))).toEqual(Long.fromInt(64)) + expect(decode64(signed64(8191))).toEqual(Long.fromInt(8191)) + expect(decode64(signed64(8192))).toEqual(Long.fromInt(8192)) + expect(decode64(signed64(1048575))).toEqual(Long.fromInt(1048575)) + expect(decode64(signed64(1048576))).toEqual(Long.fromInt(1048576)) + expect(decode64(signed64(134217727))).toEqual(Long.fromInt(134217727)) + expect(decode64(signed64(134217728))).toEqual(Long.fromInt(134217728)) + expect(decode64(signed64(MAX_SAFE_POSITIVE_SIGNED_INT))).toEqual( + Long.fromInt(MAX_SAFE_POSITIVE_SIGNED_INT) + ) + expect(decode64(signed64(Long.fromString('17179869183')))).toEqual( + Long.fromString('17179869183') + ) + expect(decode64(signed64(Long.fromString('17179869184')))).toEqual( + Long.fromString('17179869184') + ) + expect(decode64(signed64(Long.fromString('2199023255551')))).toEqual( + Long.fromString('2199023255551') + ) + expect(decode64(signed64(Long.fromString('2199023255552')))).toEqual( + Long.fromString('2199023255552') + ) + expect(decode64(signed64(Long.fromString('281474976710655')))).toEqual( + Long.fromString('281474976710655') + ) + expect(decode64(signed64(Long.fromString('281474976710656')))).toEqual( + Long.fromString('281474976710656') + ) + expect(decode64(signed64(Long.fromString('36028797018963967')))).toEqual( + Long.fromString('36028797018963967') + ) + expect(decode64(signed64(Long.fromString('36028797018963968')))).toEqual( + Long.fromString('36028797018963968') + ) + expect(decode64(signed64(Long.fromString('4611686018427387903')))).toEqual( + Long.fromString('4611686018427387903') + ) + expect(decode64(signed64(Long.fromString('4611686018427387904')))).toEqual( + Long.fromString('4611686018427387904') + ) + expect(decode64(signed64(Long.MAX_VALUE))).toEqual(Long.MAX_VALUE) + }) }) }) From ab8584fe5e0a4211cd37586a409d51f6b625d05a Mon Sep 17 00:00:00 2001 From: tulios Date: Tue, 15 May 2018 23:36:59 +0200 Subject: [PATCH 07/10] Fix varlong decode --- src/protocol/decoder.js | 20 ++++++--------- src/protocol/encoder.spec.js | 48 +++++++++++++++++------------------- 2 files changed, 30 insertions(+), 38 deletions(-) diff --git a/src/protocol/decoder.js b/src/protocol/decoder.js index 957d79c7c..ea4b18a71 100644 --- a/src/protocol/decoder.js +++ b/src/protocol/decoder.js @@ -163,25 +163,21 @@ module.exports = class Decoder { } readUnsignedVarInt64() { - let value = Long.fromInt(0) - let i = 0 let currentByte + let result = Long.fromInt(0) + let i = 0 - while (((currentByte = this.buffer[this.offset++]) & MOST_SIGNIFICANT_BIT) !== 0) { - value = value.or((currentByte & OTHER_BITS) << i) + do { + currentByte = this.buffer[this.offset++] + result = result.add(Long.fromInt(currentByte & OTHER_BITS).shiftLeft(i)) i += 7 - - if (i > 63) { - throw new Error('Variable length quantity is too long') - } - } - - return value.or(currentByte << i) + } while (currentByte >= MOST_SIGNIFICANT_BIT) + return result } readSignedVarInt64() { const longValue = this.readUnsignedVarInt64() - return longValue.shiftRightUnsigned(1).xor(longValue.and(1).multiply(-1)) + return longValue.shiftRightUnsigned(1).xor(longValue.and(Long.fromInt(1)).negate()) } slice(size) { diff --git a/src/protocol/encoder.spec.js b/src/protocol/encoder.spec.js index 2532a07da..f8f78caf4 100644 --- a/src/protocol/encoder.spec.js +++ b/src/protocol/encoder.spec.js @@ -8,7 +8,7 @@ const MIN_SAFE_NEGATIVE_SIGNED_INT = -2147483648 describe('Protocol > Encoder', () => { const signed32 = number => new Encoder().writeSignedVarInt32(number).buffer - const decodeSigned32 = buffer => new Decoder(buffer).readSignedVarInt32() + const decode32 = buffer => new Decoder(buffer).readSignedVarInt32() const signed64 = number => new Encoder().writeSignedVarInt64(number).buffer const decode64 = buffer => new Decoder(buffer).readSignedVarInt64() @@ -47,35 +47,31 @@ describe('Protocol > Encoder', () => { }) test('decode int32 numbers', () => { - expect(decodeSigned32(signed32(0))).toEqual(0) - expect(decodeSigned32(signed32(1))).toEqual(1) - expect(decodeSigned32(signed32(63))).toEqual(63) - expect(decodeSigned32(signed32(64))).toEqual(64) - expect(decodeSigned32(signed32(8191))).toEqual(8191) - expect(decodeSigned32(signed32(8192))).toEqual(8192) - expect(decodeSigned32(signed32(1048575))).toEqual(1048575) - expect(decodeSigned32(signed32(1048576))).toEqual(1048576) - expect(decodeSigned32(signed32(134217727))).toEqual(134217727) - expect(decodeSigned32(signed32(134217728))).toEqual(134217728) + expect(decode32(signed32(0))).toEqual(0) + expect(decode32(signed32(1))).toEqual(1) + expect(decode32(signed32(63))).toEqual(63) + expect(decode32(signed32(64))).toEqual(64) + expect(decode32(signed32(8191))).toEqual(8191) + expect(decode32(signed32(8192))).toEqual(8192) + expect(decode32(signed32(1048575))).toEqual(1048575) + expect(decode32(signed32(1048576))).toEqual(1048576) + expect(decode32(signed32(134217727))).toEqual(134217727) + expect(decode32(signed32(134217728))).toEqual(134217728) - expect(decodeSigned32(signed32(-1))).toEqual(-1) - expect(decodeSigned32(signed32(-64))).toEqual(-64) - expect(decodeSigned32(signed32(-65))).toEqual(-65) - expect(decodeSigned32(signed32(-8192))).toEqual(-8192) - expect(decodeSigned32(signed32(-8193))).toEqual(-8193) - expect(decodeSigned32(signed32(-1048576))).toEqual(-1048576) - expect(decodeSigned32(signed32(-1048577))).toEqual(-1048577) - expect(decodeSigned32(signed32(-134217728))).toEqual(-134217728) - expect(decodeSigned32(signed32(-134217729))).toEqual(-134217729) + expect(decode32(signed32(-1))).toEqual(-1) + expect(decode32(signed32(-64))).toEqual(-64) + expect(decode32(signed32(-65))).toEqual(-65) + expect(decode32(signed32(-8192))).toEqual(-8192) + expect(decode32(signed32(-8193))).toEqual(-8193) + expect(decode32(signed32(-1048576))).toEqual(-1048576) + expect(decode32(signed32(-1048577))).toEqual(-1048577) + expect(decode32(signed32(-134217728))).toEqual(-134217728) + expect(decode32(signed32(-134217729))).toEqual(-134217729) }) test('decode signed int32 boundaries', () => { - expect(decodeSigned32(signed32(MAX_SAFE_POSITIVE_SIGNED_INT))).toEqual( - MAX_SAFE_POSITIVE_SIGNED_INT - ) - expect(decodeSigned32(signed32(MIN_SAFE_NEGATIVE_SIGNED_INT))).toEqual( - MIN_SAFE_NEGATIVE_SIGNED_INT - ) + expect(decode32(signed32(MAX_SAFE_POSITIVE_SIGNED_INT))).toEqual(MAX_SAFE_POSITIVE_SIGNED_INT) + expect(decode32(signed32(MIN_SAFE_NEGATIVE_SIGNED_INT))).toEqual(MIN_SAFE_NEGATIVE_SIGNED_INT) }) }) From 01423d746be5ffbd210fd82c21b69508e169ab23 Mon Sep 17 00:00:00 2001 From: tulios Date: Tue, 15 May 2018 23:49:25 +0200 Subject: [PATCH 08/10] Update readUnsignedVarInt32 error to an unretriable error --- src/protocol/decoder.js | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/protocol/decoder.js b/src/protocol/decoder.js index ea4b18a71..9fc3d024b 100644 --- a/src/protocol/decoder.js +++ b/src/protocol/decoder.js @@ -1,4 +1,5 @@ const Long = require('long') +const { KafkaJSNonRetriableError } = require('../errors') const INT8_SIZE = 1 const INT16_SIZE = 2 @@ -141,7 +142,9 @@ module.exports = class Decoder { i += 7 if (i > 35) { - throw new Error('Variable length quantity is too long') + throw new KafkaJSNonRetriableError( + `Failed to decode varint, variable length quantity is too long (i > 25) i = ${i}` + ) } } From 973d86f87688f5164a222ce4bd2347e81367385c Mon Sep 17 00:00:00 2001 From: tulios Date: Wed, 16 May 2018 00:00:25 +0200 Subject: [PATCH 09/10] Make encoder tests easier to read --- src/protocol/encoder.spec.js | 259 +++++++++++++++-------------------- 1 file changed, 107 insertions(+), 152 deletions(-) diff --git a/src/protocol/encoder.spec.js b/src/protocol/encoder.spec.js index f8f78caf4..3d7f6bb80 100644 --- a/src/protocol/encoder.spec.js +++ b/src/protocol/encoder.spec.js @@ -13,37 +13,36 @@ describe('Protocol > Encoder', () => { const signed64 = number => new Encoder().writeSignedVarInt64(number).buffer const decode64 = buffer => new Decoder(buffer).readSignedVarInt64() + const B = (...args) => Buffer.from(args) + const L = value => Long.fromString(`${value}`) + describe('varint', () => { test('encode signed int32 numbers', () => { - expect(signed32(0)).toEqual(Buffer.from([0x00])) - expect(signed32(1)).toEqual(Buffer.from([0x02])) - expect(signed32(63)).toEqual(Buffer.from([0x7e])) - expect(signed32(64)).toEqual(Buffer.from([0x80, 0x01])) - expect(signed32(8191)).toEqual(Buffer.from([0xfe, 0x7f])) - expect(signed32(8192)).toEqual(Buffer.from([0x80, 0x80, 0x01])) - expect(signed32(1048575)).toEqual(Buffer.from([0xfe, 0xff, 0x7f])) - expect(signed32(1048576)).toEqual(Buffer.from([0x80, 0x80, 0x80, 0x01])) - expect(signed32(134217727)).toEqual(Buffer.from([0xfe, 0xff, 0xff, 0x7f])) - expect(signed32(134217728)).toEqual(Buffer.from([0x80, 0x80, 0x80, 0x80, 0x01])) + expect(signed32(0)).toEqual(B(0x00)) + expect(signed32(1)).toEqual(B(0x02)) + expect(signed32(63)).toEqual(B(0x7e)) + expect(signed32(64)).toEqual(B(0x80, 0x01)) + expect(signed32(8191)).toEqual(B(0xfe, 0x7f)) + expect(signed32(8192)).toEqual(B(0x80, 0x80, 0x01)) + expect(signed32(1048575)).toEqual(B(0xfe, 0xff, 0x7f)) + expect(signed32(1048576)).toEqual(B(0x80, 0x80, 0x80, 0x01)) + expect(signed32(134217727)).toEqual(B(0xfe, 0xff, 0xff, 0x7f)) + expect(signed32(134217728)).toEqual(B(0x80, 0x80, 0x80, 0x80, 0x01)) - expect(signed32(-1)).toEqual(Buffer.from([0x01])) - expect(signed32(-64)).toEqual(Buffer.from([0x7f])) - expect(signed32(-65)).toEqual(Buffer.from([0x81, 0x01])) - expect(signed32(-8192)).toEqual(Buffer.from([0xff, 0x7f])) - expect(signed32(-8193)).toEqual(Buffer.from([0x81, 0x80, 0x01])) - expect(signed32(-1048576)).toEqual(Buffer.from([0xff, 0xff, 0x7f])) - expect(signed32(-1048577)).toEqual(Buffer.from([0x81, 0x80, 0x80, 0x01])) - expect(signed32(-134217728)).toEqual(Buffer.from([0xff, 0xff, 0xff, 0x7f])) - expect(signed32(-134217729)).toEqual(Buffer.from([0x81, 0x80, 0x80, 0x80, 0x01])) + expect(signed32(-1)).toEqual(B(0x01)) + expect(signed32(-64)).toEqual(B(0x7f)) + expect(signed32(-65)).toEqual(B(0x81, 0x01)) + expect(signed32(-8192)).toEqual(B(0xff, 0x7f)) + expect(signed32(-8193)).toEqual(B(0x81, 0x80, 0x01)) + expect(signed32(-1048576)).toEqual(B(0xff, 0xff, 0x7f)) + expect(signed32(-1048577)).toEqual(B(0x81, 0x80, 0x80, 0x01)) + expect(signed32(-134217728)).toEqual(B(0xff, 0xff, 0xff, 0x7f)) + expect(signed32(-134217729)).toEqual(B(0x81, 0x80, 0x80, 0x80, 0x01)) }) test('encode signed int32 boundaries', () => { - expect(signed32(MAX_SAFE_POSITIVE_SIGNED_INT)).toEqual( - Buffer.from([0xfe, 0xff, 0xff, 0xff, 0x0f]) - ) - expect(signed32(MIN_SAFE_NEGATIVE_SIGNED_INT)).toEqual( - Buffer.from([0xff, 0xff, 0xff, 0xff, 0x0f]) - ) + expect(signed32(MAX_SAFE_POSITIVE_SIGNED_INT)).toEqual(B(0xfe, 0xff, 0xff, 0xff, 0x0f)) + expect(signed32(MIN_SAFE_NEGATIVE_SIGNED_INT)).toEqual(B(0xff, 0xff, 0xff, 0xff, 0x0f)) }) test('decode int32 numbers', () => { @@ -77,144 +76,100 @@ describe('Protocol > Encoder', () => { describe('varlong', () => { test('encode signed int64 number', () => { - expect(signed64(0)).toEqual(Buffer.from([0x00])) - expect(signed64(1)).toEqual(Buffer.from([0x02])) - expect(signed64(63)).toEqual(Buffer.from([0x7e])) - expect(signed64(64)).toEqual(Buffer.from([0x80, 0x01])) - expect(signed64(8191)).toEqual(Buffer.from([0xfe, 0x7f])) - expect(signed64(8192)).toEqual(Buffer.from([0x80, 0x80, 0x01])) - expect(signed64(1048575)).toEqual(Buffer.from([0xfe, 0xff, 0x7f])) - expect(signed64(1048576)).toEqual(Buffer.from([0x80, 0x80, 0x80, 0x01])) - expect(signed64(134217727)).toEqual(Buffer.from([0xfe, 0xff, 0xff, 0x7f])) - expect(signed64(134217728)).toEqual(Buffer.from([0x80, 0x80, 0x80, 0x80, 0x01])) - expect(signed64(MAX_SAFE_POSITIVE_SIGNED_INT)).toEqual( - Buffer.from([0xfe, 0xff, 0xff, 0xff, 0x0f]) - ) - expect(signed64(Long.fromString('17179869183'))).toEqual( - Buffer.from([0xfe, 0xff, 0xff, 0xff, 0x7f]) - ) - expect(signed64(Long.fromString('17179869184'))).toEqual( - Buffer.from([0x80, 0x80, 0x80, 0x80, 0x80, 0x01]) - ) - expect(signed64(Long.fromString('2199023255551'))).toEqual( - Buffer.from([0xfe, 0xff, 0xff, 0xff, 0xff, 0x7f]) - ) - expect(signed64(Long.fromString('2199023255552'))).toEqual( - Buffer.from([0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x01]) - ) - expect(signed64(Long.fromString('281474976710655'))).toEqual( - Buffer.from([0xfe, 0xff, 0xff, 0xff, 0xff, 0xff, 0x7f]) - ) - expect(signed64(Long.fromString('281474976710656'))).toEqual( - Buffer.from([0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x01]) - ) - expect(signed64(Long.fromString('36028797018963967'))).toEqual( - Buffer.from([0xfe, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x7f]) - ) - expect(signed64(Long.fromString('36028797018963968'))).toEqual( - Buffer.from([0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x01]) - ) - expect(signed64(Long.fromString('4611686018427387903'))).toEqual( - Buffer.from([0xfe, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x7f]) - ) - expect(signed64(Long.fromString('4611686018427387904'))).toEqual( - Buffer.from([0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x01]) + expect(signed64(0)).toEqual(B(0x00)) + expect(signed64(1)).toEqual(B(0x02)) + expect(signed64(63)).toEqual(B(0x7e)) + expect(signed64(64)).toEqual(B(0x80, 0x01)) + expect(signed64(8191)).toEqual(B(0xfe, 0x7f)) + expect(signed64(8192)).toEqual(B(0x80, 0x80, 0x01)) + expect(signed64(1048575)).toEqual(B(0xfe, 0xff, 0x7f)) + expect(signed64(1048576)).toEqual(B(0x80, 0x80, 0x80, 0x01)) + expect(signed64(134217727)).toEqual(B(0xfe, 0xff, 0xff, 0x7f)) + expect(signed64(134217728)).toEqual(B(0x80, 0x80, 0x80, 0x80, 0x01)) + expect(signed64(MAX_SAFE_POSITIVE_SIGNED_INT)).toEqual(B(0xfe, 0xff, 0xff, 0xff, 0x0f)) + expect(signed64(L('17179869183'))).toEqual(B(0xfe, 0xff, 0xff, 0xff, 0x7f)) + expect(signed64(L('17179869184'))).toEqual(B(0x80, 0x80, 0x80, 0x80, 0x80, 0x01)) + expect(signed64(L('2199023255551'))).toEqual(B(0xfe, 0xff, 0xff, 0xff, 0xff, 0x7f)) + expect(signed64(L('2199023255552'))).toEqual(B(0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x01)) + expect(signed64(L('281474976710655'))).toEqual(B(0xfe, 0xff, 0xff, 0xff, 0xff, 0xff, 0x7f)) + expect(signed64(L('281474976710656'))).toEqual( + B(0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x01) + ) + expect(signed64(L('36028797018963967'))).toEqual( + B(0xfe, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x7f) + ) + expect(signed64(L('36028797018963968'))).toEqual( + B(0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x01) + ) + expect(signed64(L('4611686018427387903'))).toEqual( + B(0xfe, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x7f) + ) + expect(signed64(L('4611686018427387904'))).toEqual( + B(0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x01) ) expect(signed64(Long.MAX_VALUE)).toEqual( - Buffer.from([0xfe, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x01]) + B(0xfe, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x01) ) - expect(signed64(-1)).toEqual(Buffer.from([0x01])) - expect(signed64(-64)).toEqual(Buffer.from([0x7f])) - expect(signed64(-65)).toEqual(Buffer.from([0x81, 0x01])) - expect(signed64(-8192)).toEqual(Buffer.from([0xff, 0x7f])) - expect(signed64(-8193)).toEqual(Buffer.from([0x81, 0x80, 0x01])) - expect(signed64(-1048576)).toEqual(Buffer.from([0xff, 0xff, 0x7f])) - expect(signed64(-1048577)).toEqual(Buffer.from([0x81, 0x80, 0x80, 0x01])) - expect(signed64(-134217728)).toEqual(Buffer.from([0xff, 0xff, 0xff, 0x7f])) - expect(signed64(-134217729)).toEqual(Buffer.from([0x81, 0x80, 0x80, 0x80, 0x01])) - expect(signed64(MIN_SAFE_NEGATIVE_SIGNED_INT)).toEqual( - Buffer.from([0xff, 0xff, 0xff, 0xff, 0x0f]) - ) - expect(signed64(Long.fromString('-17179869184'))).toEqual( - Buffer.from([0xff, 0xff, 0xff, 0xff, 0x7f]) - ) - expect(signed64(Long.fromString('-17179869185'))).toEqual( - Buffer.from([0x81, 0x80, 0x80, 0x80, 0x80, 0x01]) - ) - expect(signed64(Long.fromString('-2199023255552'))).toEqual( - Buffer.from([0xff, 0xff, 0xff, 0xff, 0xff, 0x7f]) - ) - expect(signed64(Long.fromString('-2199023255553'))).toEqual( - Buffer.from([0x81, 0x80, 0x80, 0x80, 0x80, 0x80, 0x01]) - ) - expect(signed64(Long.fromString('-281474976710656'))).toEqual( - Buffer.from([0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x7f]) - ) - expect(signed64(Long.fromString('-281474976710657'))).toEqual( - Buffer.from([0x81, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 1]) - ) - expect(signed64(Long.fromString('-36028797018963968'))).toEqual( - Buffer.from([0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x7f]) - ) - expect(signed64(Long.fromString('-36028797018963969'))).toEqual( - Buffer.from([0x81, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x01]) - ) - expect(signed64(Long.fromString('-4611686018427387904'))).toEqual( - Buffer.from([0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x7f]) - ) - expect(signed64(Long.fromString('-4611686018427387905'))).toEqual( - Buffer.from([0x81, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x01]) + expect(signed64(-1)).toEqual(B(0x01)) + expect(signed64(-64)).toEqual(B(0x7f)) + expect(signed64(-65)).toEqual(B(0x81, 0x01)) + expect(signed64(-8192)).toEqual(B(0xff, 0x7f)) + expect(signed64(-8193)).toEqual(B(0x81, 0x80, 0x01)) + expect(signed64(-1048576)).toEqual(B(0xff, 0xff, 0x7f)) + expect(signed64(-1048577)).toEqual(B(0x81, 0x80, 0x80, 0x01)) + expect(signed64(-134217728)).toEqual(B(0xff, 0xff, 0xff, 0x7f)) + expect(signed64(-134217729)).toEqual(B(0x81, 0x80, 0x80, 0x80, 0x01)) + expect(signed64(MIN_SAFE_NEGATIVE_SIGNED_INT)).toEqual(B(0xff, 0xff, 0xff, 0xff, 0x0f)) + expect(signed64(L('-17179869184'))).toEqual(B(0xff, 0xff, 0xff, 0xff, 0x7f)) + expect(signed64(L('-17179869185'))).toEqual(B(0x81, 0x80, 0x80, 0x80, 0x80, 0x01)) + expect(signed64(L('-2199023255552'))).toEqual(B(0xff, 0xff, 0xff, 0xff, 0xff, 0x7f)) + expect(signed64(L('-2199023255553'))).toEqual(B(0x81, 0x80, 0x80, 0x80, 0x80, 0x80, 0x01)) + expect(signed64(L('-281474976710656'))).toEqual(B(0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x7f)) + expect(signed64(L('-281474976710657'))).toEqual( + B(0x81, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 1) + ) + expect(signed64(L('-36028797018963968'))).toEqual( + B(0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x7f) + ) + expect(signed64(L('-36028797018963969'))).toEqual( + B(0x81, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x01) + ) + expect(signed64(L('-4611686018427387904'))).toEqual( + B(0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x7f) + ) + expect(signed64(L('-4611686018427387905'))).toEqual( + B(0x81, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x01) ) expect(signed64(Long.MIN_VALUE)).toEqual( - Buffer.from([0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x01]) + B(0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x01) ) }) test('decode signed int64 number', () => { - expect(decode64(signed64(0))).toEqual(Long.fromInt(0)) - expect(decode64(signed64(1))).toEqual(Long.fromInt(1)) - expect(decode64(signed64(63))).toEqual(Long.fromInt(63)) - expect(decode64(signed64(64))).toEqual(Long.fromInt(64)) - expect(decode64(signed64(8191))).toEqual(Long.fromInt(8191)) - expect(decode64(signed64(8192))).toEqual(Long.fromInt(8192)) - expect(decode64(signed64(1048575))).toEqual(Long.fromInt(1048575)) - expect(decode64(signed64(1048576))).toEqual(Long.fromInt(1048576)) - expect(decode64(signed64(134217727))).toEqual(Long.fromInt(134217727)) - expect(decode64(signed64(134217728))).toEqual(Long.fromInt(134217728)) + expect(decode64(signed64(0))).toEqual(L(0)) + expect(decode64(signed64(1))).toEqual(L(1)) + expect(decode64(signed64(63))).toEqual(L(63)) + expect(decode64(signed64(64))).toEqual(L(64)) + expect(decode64(signed64(8191))).toEqual(L(8191)) + expect(decode64(signed64(8192))).toEqual(L(8192)) + expect(decode64(signed64(1048575))).toEqual(L(1048575)) + expect(decode64(signed64(1048576))).toEqual(L(1048576)) + expect(decode64(signed64(134217727))).toEqual(L(134217727)) + expect(decode64(signed64(134217728))).toEqual(L(134217728)) expect(decode64(signed64(MAX_SAFE_POSITIVE_SIGNED_INT))).toEqual( - Long.fromInt(MAX_SAFE_POSITIVE_SIGNED_INT) - ) - expect(decode64(signed64(Long.fromString('17179869183')))).toEqual( - Long.fromString('17179869183') - ) - expect(decode64(signed64(Long.fromString('17179869184')))).toEqual( - Long.fromString('17179869184') - ) - expect(decode64(signed64(Long.fromString('2199023255551')))).toEqual( - Long.fromString('2199023255551') - ) - expect(decode64(signed64(Long.fromString('2199023255552')))).toEqual( - Long.fromString('2199023255552') - ) - expect(decode64(signed64(Long.fromString('281474976710655')))).toEqual( - Long.fromString('281474976710655') - ) - expect(decode64(signed64(Long.fromString('281474976710656')))).toEqual( - Long.fromString('281474976710656') - ) - expect(decode64(signed64(Long.fromString('36028797018963967')))).toEqual( - Long.fromString('36028797018963967') - ) - expect(decode64(signed64(Long.fromString('36028797018963968')))).toEqual( - Long.fromString('36028797018963968') - ) - expect(decode64(signed64(Long.fromString('4611686018427387903')))).toEqual( - Long.fromString('4611686018427387903') - ) - expect(decode64(signed64(Long.fromString('4611686018427387904')))).toEqual( - Long.fromString('4611686018427387904') - ) + L(MAX_SAFE_POSITIVE_SIGNED_INT) + ) + expect(decode64(signed64(L('17179869183')))).toEqual(L('17179869183')) + expect(decode64(signed64(L('17179869184')))).toEqual(L('17179869184')) + expect(decode64(signed64(L('2199023255551')))).toEqual(L('2199023255551')) + expect(decode64(signed64(L('2199023255552')))).toEqual(L('2199023255552')) + expect(decode64(signed64(L('281474976710655')))).toEqual(L('281474976710655')) + expect(decode64(signed64(L('281474976710656')))).toEqual(L('281474976710656')) + expect(decode64(signed64(L('36028797018963967')))).toEqual(L('36028797018963967')) + expect(decode64(signed64(L('36028797018963968')))).toEqual(L('36028797018963968')) + expect(decode64(signed64(L('4611686018427387903')))).toEqual(L('4611686018427387903')) + expect(decode64(signed64(L('4611686018427387904')))).toEqual(L('4611686018427387904')) expect(decode64(signed64(Long.MAX_VALUE))).toEqual(Long.MAX_VALUE) }) }) From f26657ae4045f844aba56d510c27cb6951581148 Mon Sep 17 00:00:00 2001 From: tulios Date: Wed, 16 May 2018 23:27:36 +0200 Subject: [PATCH 10/10] Rewrite read varint to match varlong implementation and remove the unsigned version of the varint/long --- src/protocol/decoder.js | 52 +++++++++++------------------------------ src/protocol/encoder.js | 30 +++++++++++------------- 2 files changed, 28 insertions(+), 54 deletions(-) diff --git a/src/protocol/decoder.js b/src/protocol/decoder.js index 9fc3d024b..f7cad260c 100644 --- a/src/protocol/decoder.js +++ b/src/protocol/decoder.js @@ -1,5 +1,4 @@ const Long = require('long') -const { KafkaJSNonRetriableError } = require('../errors') const INT8_SIZE = 1 const INT16_SIZE = 2 @@ -124,48 +123,25 @@ module.exports = class Decoder { return array } - // https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/util/Varint.java#L207 - readUnsignedVarInt32() { - let value = 0 + readSignedVarInt32() { + let currentByte + let result = 0 let i = 0 - let lastByte = 0 - - while (true) { - const currentByte = this.buffer[this.offset++] - lastByte = currentByte - const isLastByte = (currentByte & MOST_SIGNIFICANT_BIT) === 0 - - if (isLastByte) break - // Concatenate the octets (sum the numbers) - value = value | ((currentByte & OTHER_BITS) << i) + do { + currentByte = this.buffer[this.offset++] + result += (currentByte & OTHER_BITS) << i i += 7 + } while (currentByte >= MOST_SIGNIFICANT_BIT) - if (i > 35) { - throw new KafkaJSNonRetriableError( - `Failed to decode varint, variable length quantity is too long (i > 25) i = ${i}` - ) - } - } - - return value | (lastByte << i) + return this.decodeZigZag(result) } - // https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/util/Varint.java#L197 - readSignedVarInt32() { - const raw = this.readUnsignedVarInt32() - - // This undoes the trick in writeSignedVarInt() - // https://developers.google.com/protocol-buffers/docs/encoding?csw=1#types - const temp = (((raw << 31) >> 31) ^ raw) >> 1 - - // This extra step lets us deal with the largest signed values by treating - // negative results from read unsigned methods as like unsigned values. - // Must re-flip the top bit if the original read value had it set. - return temp ^ (raw & (1 << 31)) + decodeZigZag(value) { + return (value >>> 1) ^ -(value & 1) } - readUnsignedVarInt64() { + readSignedVarInt64() { let currentByte let result = Long.fromInt(0) let i = 0 @@ -175,11 +151,11 @@ module.exports = class Decoder { result = result.add(Long.fromInt(currentByte & OTHER_BITS).shiftLeft(i)) i += 7 } while (currentByte >= MOST_SIGNIFICANT_BIT) - return result + + return this.decodeZigZag64(result) } - readSignedVarInt64() { - const longValue = this.readUnsignedVarInt64() + decodeZigZag64(longValue) { return longValue.shiftRightUnsigned(1).xor(longValue.and(Long.fromInt(1)).negate()) } diff --git a/src/protocol/encoder.js b/src/protocol/encoder.js index 4eabc1a86..4c4f2df5a 100644 --- a/src/protocol/encoder.js +++ b/src/protocol/encoder.js @@ -116,26 +116,27 @@ module.exports = class Encoder { // Based on: // https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/util/Varint.java#L106 - writeUnsignedVarInt32(value) { + writeSignedVarInt32(value) { const byteArray = [] - while ((value & UNSIGNED_INT32_MAX_NUMBER) !== 0) { - byteArray.push((value & OTHER_BITS) | MOST_SIGNIFICANT_BIT) - value >>>= 7 + let encodedValue = this.encodeZigZag(value) + + while ((encodedValue & UNSIGNED_INT32_MAX_NUMBER) !== 0) { + byteArray.push((encodedValue & OTHER_BITS) | MOST_SIGNIFICANT_BIT) + encodedValue >>>= 7 } - byteArray.push(value & OTHER_BITS) + byteArray.push(encodedValue & OTHER_BITS) this.buffer = Buffer.concat([this.buffer, Buffer.from(byteArray)]) return this } - // https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/util/Varint.java#L95 - writeSignedVarInt32(value) { - return this.writeUnsignedVarInt32((value << 1) ^ (value >> 31)) + encodeZigZag(value) { + return (value << 1) ^ (value >> 31) } - writeUnsignedVarInt64(value) { + writeSignedVarInt64(value) { const byteArray = [] - let longValue = Long.fromValue(value) + let longValue = this.encodeZigZag64(value) while (longValue.and(UNSIGNED_INT64_MAX_NUMBER).notEquals(Long.fromInt(0))) { byteArray.push( @@ -153,12 +154,9 @@ module.exports = class Encoder { return this } - writeSignedVarInt64(value) { - let longValue = Long.fromValue(value) - const unsignedLong = longValue.shiftLeft(1).xor(longValue.shiftRight(63)) - this.writeUnsignedVarInt64(unsignedLong) - - return this + encodeZigZag64(value) { + const longValue = Long.fromValue(value) + return longValue.shiftLeft(1).xor(longValue.shiftRight(63)) } size() {