diff --git a/packages/beacon-node/src/network/gossip/encoding.ts b/packages/beacon-node/src/network/gossip/encoding.ts index 32cc36d9a2f..8dc84e17b57 100644 --- a/packages/beacon-node/src/network/gossip/encoding.ts +++ b/packages/beacon-node/src/network/gossip/encoding.ts @@ -9,6 +9,7 @@ import {ForkName} from "@lodestar/params"; import {intToBytes} from "@lodestar/utils"; import {MESSAGE_DOMAIN_VALID_SNAPPY} from "./constants.js"; import {Eth2GossipsubMetrics} from "./metrics.js"; +import {getSnappyDecompressor} from "./snappy/index.js"; import {GossipTopicCache, getGossipSSZType} from "./topic.js"; // Load WASM @@ -17,9 +18,8 @@ const xxhash = await xxhashFactory(); // Use salt to prevent msgId from being mined for collisions const h64Seed = BigInt(Math.floor(Math.random() * 1e9)); -// create singleton snappy encoder + decoder +// to compress outgoing data, we always go with snappy-wasm, this is singleton encoder const encoder = new snappyWasm.Encoder(); -const decoder = new snappyWasm.Decoder(); // Shared buffer to convert msgId to string const sharedMsgIdBuf = Buffer.alloc(20); @@ -86,26 +86,46 @@ export class DataTransformSnappy implements DataTransform { * - `outboundTransform()`: compress snappy payload */ inboundTransform(topicStr: string, data: Uint8Array): Uint8Array { - // check uncompressed data length before we actually decompress - const uncompressedDataLength = snappyWasm.decompress_len(data); - if (uncompressedDataLength > this.maxSizePerMessage) { - throw Error(`ssz_snappy decoded data length ${uncompressedDataLength} > ${this.maxSizePerMessage}`); - } - const topic = this.gossipTopicCache.getTopic(topicStr); const sszType = getGossipSSZType(topic); this.metrics?.dataTransform.inbound.inc({type: topic.type}); + // check uncompressed data length before we actually decompress + const decompressor = getSnappyDecompressor(topic.type, data); + const uncompressedDataLength = decompressor.readUncompressedLength(); + if (uncompressedDataLength < 0) { + throw Error( + `ssz_snappy failed to read uncompressed length for topic ${topicStr}, compressed length ${data.length}` + ); + } + + if (uncompressedDataLength > this.maxSizePerMessage) { + throw Error( + `ssz_snappy decoded data length ${uncompressedDataLength} > ${this.maxSizePerMessage} for topic ${topicStr}` + ); + } + if (uncompressedDataLength < sszType.minSize) { - throw Error(`ssz_snappy decoded data length ${uncompressedDataLength} < ${sszType.minSize}`); + throw Error( + `ssz_snappy decoded data length ${uncompressedDataLength} < ${sszType.minSize} for topic ${topicStr}` + ); } + if (uncompressedDataLength > sszType.maxSize) { - throw Error(`ssz_snappy decoded data length ${uncompressedDataLength} > ${sszType.maxSize}`); + throw Error( + `ssz_snappy decoded data length ${uncompressedDataLength} > ${sszType.maxSize} for topic ${topicStr}` + ); } // Only after sanity length checks, we can decompress the data - const uncompressedData = Buffer.allocUnsafe(uncompressedDataLength); - decoder.decompress_into(data, uncompressedData); + // using Buffer.allocUnsafe() caused huge MarkSweepCompact gc on the main thread of sas nodes + const uncompressedData = Buffer.alloc(uncompressedDataLength); + if (!decompressor.uncompressInto(uncompressedData)) { + throw Error( + `ssz_snappy failed to decompress data for topic ${topicStr}, compressed length ${data.length}, expected uncompressed length ${uncompressedDataLength}` + ); + } + return uncompressedData; } @@ -120,7 +140,8 @@ export class DataTransformSnappy implements DataTransform { throw Error(`ssz_snappy encoded data length ${data.length} > ${this.maxSizePerMessage}`); } - const compressedData = Buffer.allocUnsafe(snappyWasm.max_compress_len(data.length)); + // using Buffer.allocUnsafe() caused huge MarkSweepCompact gc on the main thread of sas nodes + const compressedData = Buffer.alloc(snappyWasm.max_compress_len(data.length)); const compressedLen = encoder.compress_into(data, compressedData); return compressedData.subarray(0, compressedLen); } diff --git a/packages/beacon-node/src/network/gossip/snappy/index.ts b/packages/beacon-node/src/network/gossip/snappy/index.ts new file mode 100644 index 00000000000..73706f279fb --- /dev/null +++ b/packages/beacon-node/src/network/gossip/snappy/index.ts @@ -0,0 +1,19 @@ +import {GossipType} from "../interface.js"; +import {ISnappyDecompressor} from "./interface.js"; +import {SnappyDecompressor} from "./snappy-js/decompressor.js"; +import {SnappyWasmDecompressor} from "./snappy-wasm.js"; + +/** + * for decompression, we use different implementations based on topic type + * snappy-wasm is generally better for larger payloads and snappyjs is better for smaller payloads + */ +export function getSnappyDecompressor(topicType: GossipType, data: Uint8Array): ISnappyDecompressor { + switch (topicType) { + case GossipType.beacon_block: + case GossipType.blob_sidecar: + case GossipType.data_column_sidecar: + return new SnappyWasmDecompressor(data); + default: + return new SnappyDecompressor(data); + } +} diff --git a/packages/beacon-node/src/network/gossip/snappy/interface.ts b/packages/beacon-node/src/network/gossip/snappy/interface.ts new file mode 100644 index 00000000000..6bebb37be59 --- /dev/null +++ b/packages/beacon-node/src/network/gossip/snappy/interface.ts @@ -0,0 +1,4 @@ +export interface ISnappyDecompressor { + readUncompressedLength(): number; + uncompressInto(outBuffer: Uint8Array): boolean; +} diff --git a/packages/beacon-node/src/network/gossip/snappy/snappy-js/compressor.ts b/packages/beacon-node/src/network/gossip/snappy/snappy-js/compressor.ts new file mode 100644 index 00000000000..703e59ea4a6 --- /dev/null +++ b/packages/beacon-node/src/network/gossip/snappy/snappy-js/compressor.ts @@ -0,0 +1,212 @@ +const BLOCK_LOG = 16; +const BLOCK_SIZE = 1 << BLOCK_LOG; + +const MAX_HASH_TABLE_BITS = 14; +const globalHashTables = new Array(MAX_HASH_TABLE_BITS + 1); + +export class SnappyCompressor { + constructor(private readonly array: Uint8Array) {} + + maxCompressedLength(): number { + const sourceLen = this.array.length; + return 32 + sourceLen + Math.floor(sourceLen / 6); + } + + compressToBuffer(outBuffer: Uint8Array): number { + const array = this.array; + const length = array.length; + let pos = 0; + let outPos = 0; + + let fragmentSize: number; + + outPos = putVarint(length, outBuffer, outPos); + while (pos < length) { + fragmentSize = Math.min(length - pos, BLOCK_SIZE); + outPos = compressFragment(array, pos, fragmentSize, outBuffer, outPos); + pos += fragmentSize; + } + + return outPos; + } +} + +function hashFunc(key: number, hashFuncShift: number): number { + return (key * 0x1e35a7bd) >>> hashFuncShift; +} + +function load32(array: Uint8Array, pos: number): number { + return array[pos] + (array[pos + 1] << 8) + (array[pos + 2] << 16) + (array[pos + 3] << 24); +} + +function equals32(array: Uint8Array, pos1: number, pos2: number): boolean { + return ( + array[pos1] === array[pos2] && + array[pos1 + 1] === array[pos2 + 1] && + array[pos1 + 2] === array[pos2 + 2] && + array[pos1 + 3] === array[pos2 + 3] + ); +} + +function copyBytes(fromArray: Uint8Array, fromPos: number, toArray: Uint8Array, toPos: number, length: number): void { + for (let i = 0; i < length; i++) { + toArray[toPos + i] = fromArray[fromPos + i]; + } +} + +function emitLiteral(input: Uint8Array, ip: number, len: number, output: Uint8Array, op: number): number { + if (len <= 60) { + output[op] = (len - 1) << 2; + op += 1; + } else if (len < 256) { + output[op] = 60 << 2; + output[op + 1] = len - 1; + op += 2; + } else { + output[op] = 61 << 2; + output[op + 1] = (len - 1) & 0xff; + output[op + 2] = (len - 1) >>> 8; + op += 3; + } + copyBytes(input, ip, output, op, len); + return op + len; +} + +function emitCopyLessThan64(output: Uint8Array, op: number, offset: number, len: number): number { + if (len < 12 && offset < 2048) { + output[op] = 1 + ((len - 4) << 2) + ((offset >>> 8) << 5); + output[op + 1] = offset & 0xff; + return op + 2; + } + output[op] = 2 + ((len - 1) << 2); + output[op + 1] = offset & 0xff; + output[op + 2] = offset >>> 8; + return op + 3; +} + +function emitCopy(output: Uint8Array, op: number, offset: number, len: number): number { + while (len >= 68) { + op = emitCopyLessThan64(output, op, offset, 64); + len -= 64; + } + if (len > 64) { + op = emitCopyLessThan64(output, op, offset, 60); + len -= 60; + } + return emitCopyLessThan64(output, op, offset, len); +} + +function compressFragment(input: Uint8Array, ip: number, inputSize: number, output: Uint8Array, op: number): number { + let hashTableBits = 1; + while (1 << hashTableBits <= inputSize && hashTableBits <= MAX_HASH_TABLE_BITS) { + hashTableBits += 1; + } + hashTableBits -= 1; + const hashFuncShift = 32 - hashTableBits; + + if (typeof globalHashTables[hashTableBits] === "undefined") { + globalHashTables[hashTableBits] = new Uint16Array(1 << hashTableBits); + } + const hashTable = globalHashTables[hashTableBits]; + for (let i = 0; i < hashTable.length; i++) { + hashTable[i] = 0; + } + + const ipEnd = ip + inputSize; + let ipLimit: number; + const baseIp = ip; + let nextEmit = ip; + + let hash: number; + let nextHash: number; + let nextIp: number; + let candidate = 0; + let skip: number; + let bytesBetweenHashLookups: number; + let base: number; + let matched: number; + let offset: number; + let prevHash: number; + let curHash: number; + let flag = true; + + const INPUT_MARGIN = 15; + if (inputSize >= INPUT_MARGIN) { + ipLimit = ipEnd - INPUT_MARGIN; + + ip += 1; + nextHash = hashFunc(load32(input, ip), hashFuncShift); + + while (flag) { + skip = 32; + nextIp = ip; + do { + ip = nextIp; + hash = nextHash; + bytesBetweenHashLookups = skip >>> 5; + skip += 1; + nextIp = ip + bytesBetweenHashLookups; + if (ip > ipLimit) { + flag = false; + break; + } + nextHash = hashFunc(load32(input, nextIp), hashFuncShift); + candidate = baseIp + hashTable[hash]; + hashTable[hash] = ip - baseIp; + } while (!equals32(input, ip, candidate)); + + if (!flag) { + break; + } + + op = emitLiteral(input, nextEmit, ip - nextEmit, output, op); + + do { + base = ip; + matched = 4; + while (ip + matched < ipEnd && input[ip + matched] === input[candidate + matched]) { + matched += 1; + } + ip += matched; + offset = base - candidate; + op = emitCopy(output, op, offset, matched); + + nextEmit = ip; + if (ip >= ipLimit) { + flag = false; + break; + } + prevHash = hashFunc(load32(input, ip - 1), hashFuncShift); + hashTable[prevHash] = ip - 1 - baseIp; + curHash = hashFunc(load32(input, ip), hashFuncShift); + candidate = baseIp + hashTable[curHash]; + hashTable[curHash] = ip - baseIp; + } while (equals32(input, ip, candidate)); + + if (!flag) { + break; + } + + ip += 1; + nextHash = hashFunc(load32(input, ip), hashFuncShift); + } + } + + if (nextEmit < ipEnd) { + op = emitLiteral(input, nextEmit, ipEnd - nextEmit, output, op); + } + + return op; +} + +function putVarint(value: number, output: Uint8Array, op: number): number { + do { + output[op] = value & 0x7f; + value = value >>> 7; + if (value > 0) { + output[op] += 0x80; + } + op += 1; + } while (value > 0); + return op; +} diff --git a/packages/beacon-node/src/network/gossip/snappy/snappy-js/decompressor.ts b/packages/beacon-node/src/network/gossip/snappy/snappy-js/decompressor.ts new file mode 100644 index 00000000000..4135c502119 --- /dev/null +++ b/packages/beacon-node/src/network/gossip/snappy/snappy-js/decompressor.ts @@ -0,0 +1,109 @@ +import {ISnappyDecompressor} from "../interface.js"; + +const WORD_MASK = [0, 0xff, 0xffff, 0xffffff, 0xffffffff]; +function copyBytes(fromArray: Uint8Array, fromPos: number, toArray: Uint8Array, toPos: number, length: number): void { + for (let i = 0; i < length; i++) { + toArray[toPos + i] = fromArray[fromPos + i]; + } +} + +function selfCopyBytes(array: Uint8Array, pos: number, offset: number, length: number): void { + for (let i = 0; i < length; i++) { + array[pos + i] = array[pos - offset + i]; + } +} + +export class SnappyDecompressor implements ISnappyDecompressor { + private pos = 0; + constructor(private readonly array: Uint8Array) {} + + readUncompressedLength(): number { + let result = 0; + let shift = 0; + let c: number; + let val: number; + + while (shift < 32 && this.pos < this.array.length) { + c = this.array[this.pos]; + this.pos += 1; + val = c & 0x7f; + if ((val << shift) >>> shift !== val) { + return -1; + } + result |= val << shift; + if (c < 128) { + return result; + } + shift += 7; + } + return -1; + } + + uncompressInto(outBuffer: Uint8Array): boolean { + const array = this.array; + const arrayLength = array.length; + let pos = this.pos; + let outPos = 0; + + let c: number; + let len = 0; + let smallLen: number; + let offset = 0; + + while (pos < array.length) { + c = array[pos]; + pos += 1; + if ((c & 0x3) === 0) { + // Literal + len = (c >>> 2) + 1; + if (len > 60) { + if (pos + 3 >= arrayLength) { + return false; + } + smallLen = len - 60; + len = array[pos] + (array[pos + 1] << 8) + (array[pos + 2] << 16) + (array[pos + 3] << 24); + len = (len & WORD_MASK[smallLen]) + 1; + pos += smallLen; + } + if (pos + len > arrayLength) { + return false; + } + copyBytes(array, pos, outBuffer, outPos, len); + pos += len; + outPos += len; + } else { + switch (c & 0x3) { + case 1: + len = ((c >>> 2) & 0x7) + 4; + offset = array[pos] + ((c >>> 5) << 8); + pos += 1; + break; + case 2: + if (pos + 1 >= arrayLength) { + return false; + } + len = (c >>> 2) + 1; + offset = array[pos] + (array[pos + 1] << 8); + pos += 2; + break; + case 3: + if (pos + 3 >= arrayLength) { + return false; + } + len = (c >>> 2) + 1; + offset = array[pos] + (array[pos + 1] << 8) + (array[pos + 2] << 16) + (array[pos + 3] << 24); + pos += 4; + break; + default: + break; + } + if (offset === 0 || offset > outPos) { + return false; + } + selfCopyBytes(outBuffer, outPos, offset, len); + outPos += len; + } + } + return true; + } +} diff --git a/packages/beacon-node/src/network/gossip/snappy/snappy-js/error.ts b/packages/beacon-node/src/network/gossip/snappy/snappy-js/error.ts new file mode 100644 index 00000000000..84188e1b17f --- /dev/null +++ b/packages/beacon-node/src/network/gossip/snappy/snappy-js/error.ts @@ -0,0 +1,14 @@ +import {LodestarError} from "@lodestar/utils"; + +export enum SnappyErrorCode { + UNCOMPRESS_EXCEED_MAX_LENGTH = "SNAPPY_ERROR_UNCOMPRESS_EXCEED_MAX_LENGTH", + UNCOMPRESS_CANNOT_EXTRACT_LENGTH = "SNAPPY_ERROR_UNCOMPRESS_CANNOT_EXTRACT_LENGTH", + UNCOMPRESS_BUFFER_TOO_SMALL = "SNAPPY_ERROR_UNCOMPRESS_BUFFER_TOO_SMALL", + UNCOMPRESS_INVALID_BITSTREAM = "SNAPPY_ERROR_UNCOMPRESS_INVALID_BITSTREAM", +} + +export class SnappyError extends LodestarError { + constructor(type: T) { + super(type); + } +} diff --git a/packages/beacon-node/src/network/gossip/snappy/snappy-js/index.ts b/packages/beacon-node/src/network/gossip/snappy/snappy-js/index.ts new file mode 100644 index 00000000000..dcb412596e2 --- /dev/null +++ b/packages/beacon-node/src/network/gossip/snappy/snappy-js/index.ts @@ -0,0 +1,57 @@ +/** Based on snappyjs - https://github.com/zhipeng-jia/snappyjs */ +import {SnappyCompressor} from "./compressor.js"; +import {SnappyDecompressor} from "./decompressor.js"; +import {SnappyError, SnappyErrorCode} from "./error.js"; + +function isNode(): boolean { + if ( + typeof process === "object" && + typeof process.versions === "object" && + typeof process.versions.node !== "undefined" + ) { + return true; + } + return false; +} + +function isUint8Array(object: Uint8Array | Buffer): object is Uint8Array { + return object instanceof Uint8Array && (!isNode() || !Buffer.isBuffer(object)); +} + +export function uncompress(compressed: T, maxLength: number, outBuf?: Uint8Array): T { + const decompressor = new SnappyDecompressor(compressed); + const length = decompressor.readUncompressedLength(); + if (length === -1) { + throw new SnappyError({code: SnappyErrorCode.UNCOMPRESS_CANNOT_EXTRACT_LENGTH}); + } + if (maxLength !== undefined && length > maxLength) { + throw new SnappyError({code: SnappyErrorCode.UNCOMPRESS_EXCEED_MAX_LENGTH}); + } + if (outBuf !== undefined && outBuf.length < length) { + throw new SnappyError({code: SnappyErrorCode.UNCOMPRESS_BUFFER_TOO_SMALL}); + } + + const uncompressed = + outBuf !== undefined + ? outBuf.subarray(0, length) + : isUint8Array(compressed) + ? new Uint8Array(length) + : Buffer.allocUnsafe(length); + + if (!decompressor.uncompressInto(uncompressed)) { + throw new SnappyError({code: SnappyErrorCode.UNCOMPRESS_INVALID_BITSTREAM}); + } + return uncompressed as T; +} + +export function compress(uncompressed: T): T { + const compressor = new SnappyCompressor(uncompressed); + const maxLength = compressor.maxCompressedLength(); + const uint8Mode = isUint8Array(uncompressed); + const compressed = uint8Mode ? new Uint8Array(maxLength) : Buffer.allocUnsafe(maxLength); + const length = compressor.compressToBuffer(compressed); + if (uint8Mode) { + return compressed.subarray(0, length) as T; + } + return compressed.slice(0, length) as T; +} diff --git a/packages/beacon-node/src/network/gossip/snappy/snappy-wasm.ts b/packages/beacon-node/src/network/gossip/snappy/snappy-wasm.ts new file mode 100644 index 00000000000..c3887cb3215 --- /dev/null +++ b/packages/beacon-node/src/network/gossip/snappy/snappy-wasm.ts @@ -0,0 +1,26 @@ +import snappyWasm from "@chainsafe/snappy-wasm"; +import {ISnappyDecompressor} from "./interface.js"; + +// create singleton snappy encoder + decoder +const decoder = new snappyWasm.Decoder(); + +export class SnappyWasmDecompressor implements ISnappyDecompressor { + constructor(private readonly data: Uint8Array) {} + + readUncompressedLength(): number { + try { + return snappyWasm.decompress_len(this.data); + } catch { + return -1; + } + } + + uncompressInto(outBuffer: Uint8Array): boolean { + try { + decoder.decompress_into(this.data, outBuffer); + return true; + } catch { + return false; + } + } +} diff --git a/packages/beacon-node/test/perf/network/gossip/snappy.test.ts b/packages/beacon-node/test/perf/network/gossip/snappy.test.ts index 514ab7fe99f..5900cee65d3 100644 --- a/packages/beacon-node/test/perf/network/gossip/snappy.test.ts +++ b/packages/beacon-node/test/perf/network/gossip/snappy.test.ts @@ -1,8 +1,9 @@ import {randomBytes} from "node:crypto"; -import * as snappyRs from "snappy"; import * as snappyJs from "snappyjs"; import {bench, describe} from "@chainsafe/benchmark"; import snappyWasm from "@chainsafe/snappy-wasm"; +import {SnappyDecompressor} from "../../../../src/network/gossip/snappy/snappy-js/decompressor.js"; +import {SnappyWasmDecompressor} from "../../../../src/network/gossip/snappy/snappy-wasm.js"; describe("network / gossip / snappy", () => { const msgLens = [ @@ -32,16 +33,6 @@ describe("network / gossip / snappy", () => { }, }); - bench({ - id: `${msgLen} bytes - compress - snappy`, - runsFactor: RUNS_FACTOR, - fn: () => { - for (let i = 0; i < RUNS_FACTOR; i++) { - snappyRs.compressSync(uncompressed); - } - }, - }); - bench({ id: `${msgLen} bytes - compress - snappy-wasm`, runsFactor: RUNS_FACTOR, @@ -65,9 +56,8 @@ describe("network / gossip / snappy", () => { }); } }); - describe("uncompress", () => { - const decoder = new snappyWasm.Decoder(); + describe("uncompress", () => { for (const msgLen of msgLens) { const uncompressed = randomBytes(msgLen); const compressed = snappyJs.compress(uncompressed); @@ -78,17 +68,12 @@ describe("network / gossip / snappy", () => { runsFactor: RUNS_FACTOR, fn: () => { for (let i = 0; i < RUNS_FACTOR; i++) { - snappyJs.uncompress(compressed); - } - }, - }); - - bench({ - id: `${msgLen} bytes - uncompress - snappy`, - runsFactor: RUNS_FACTOR, - fn: () => { - for (let i = 0; i < RUNS_FACTOR; i++) { - snappyRs.uncompressSync(compressed); + const snappyJsDecompressor = new SnappyDecompressor(compressed); + const uncompressedDataLength = snappyJsDecompressor.readUncompressedLength(); + const uncompressedData = Buffer.alloc(uncompressedDataLength); + if (!snappyJsDecompressor.uncompressInto(uncompressedData)) { + throw Error("Decompression failed"); + } } }, }); @@ -98,17 +83,12 @@ describe("network / gossip / snappy", () => { runsFactor: RUNS_FACTOR, fn: () => { for (let i = 0; i < RUNS_FACTOR; i++) { - decoder.decompress(compressed); - } - }, - }); - - bench({ - id: `${msgLen} bytes - uncompress - snappy-wasm - prealloc`, - runsFactor: RUNS_FACTOR, - fn: () => { - for (let i = 0; i < RUNS_FACTOR; i++) { - decoder.decompress_into(compressed, Buffer.allocUnsafe(snappyWasm.decompress_len(compressed))); + const snappyWasmDecompressor = new SnappyWasmDecompressor(compressed); + const uncompressedDataLength = snappyWasmDecompressor.readUncompressedLength(); + const uncompressedData = Buffer.alloc(uncompressedDataLength); + if (!snappyWasmDecompressor.uncompressInto(uncompressedData)) { + throw Error("Decompression failed"); + } } }, }); diff --git a/packages/beacon-node/test/unit/network/gossip/snappy.test.ts b/packages/beacon-node/test/unit/network/gossip/snappy.test.ts new file mode 100644 index 00000000000..3ff33c7cf7a --- /dev/null +++ b/packages/beacon-node/test/unit/network/gossip/snappy.test.ts @@ -0,0 +1,32 @@ +import {randomBytes} from "crypto"; +import {describe, expect, it} from "vitest"; +import snappyWasm from "@chainsafe/snappy-wasm"; +import {getSnappyDecompressor} from "../../../../src/network/gossip/snappy/index.js"; +import {GossipType} from "../../../../src/network/index.js"; + +const encoder = new snappyWasm.Encoder(); + +function compress(data: Uint8Array): Uint8Array { + const compressedData = Buffer.allocUnsafe(snappyWasm.max_compress_len(data.length)); + const compressedLen = encoder.compress_into(data, compressedData); + return compressedData.subarray(0, compressedLen); +} + +describe("snappy", () => { + const lengths = [0, 1, 10, 100, 1000, 10000, 100000]; + for (const length of lengths) { + it(`should decompress data of length ${length} compressed by snappy-wasm`, () => { + const buffer = randomBytes(length); + const compressed = compress(buffer); + for (const gossipType of [GossipType.beacon_attestation, GossipType.beacon_block]) { + const decompressor = getSnappyDecompressor(gossipType, compressed); + const uncompressedLength = decompressor.readUncompressedLength(); + expect(uncompressedLength).toBe(length); + const out = new Uint8Array(length); + const success = decompressor.uncompressInto(out); + expect(success).toBe(true); + expect(out).toEqual(new Uint8Array(buffer)); + } + }); + } +});