diff --git a/packages/beacon-node/src/chain/archiveStore/utils/archiveBlocks.ts b/packages/beacon-node/src/chain/archiveStore/utils/archiveBlocks.ts index f6b86ac9606f..355e14967c16 100644 --- a/packages/beacon-node/src/chain/archiveStore/utils/archiveBlocks.ts +++ b/packages/beacon-node/src/chain/archiveStore/utils/archiveBlocks.ts @@ -299,55 +299,45 @@ async function migrateBlobSidecarsFromHotToColdDb( return migratedWrappedBlobSidecars; } -// TODO: This function can be simplified further by reducing layers of promises in a loop async function migrateDataColumnSidecarsFromHotToColdDb( config: ChainForkConfig, db: IBeaconDb, blocks: BlockRootSlot[], currentEpoch: Epoch ): Promise { - let migratedWrappedDataColumns = 0; + // Only Fulu and newer blocks and within the retention window + const withinRetentionWindow = (slot: Slot) => + config.getForkSeq(slot) >= ForkSeq.fulu && + computeEpochAtSlot(slot) >= currentEpoch - config.MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS; + let migratedDataColumnsCount = 0; + for (let i = 0; i < blocks.length; i += BLOB_SIDECAR_BATCH_SIZE) { const toIdx = Math.min(i + BLOB_SIDECAR_BATCH_SIZE, blocks.length); const canonicalBlocks = blocks.slice(i, toIdx); - - // processCanonicalBlocks - if (canonicalBlocks.length === 0) break; const promises = []; - // load Buffer instead of ssz deserialized to improve performance for (const block of canonicalBlocks) { - const blockSlot = block.slot; - const blockEpoch = computeEpochAtSlot(blockSlot); - - if ( - config.getForkSeq(blockSlot) < ForkSeq.fulu || - // if block is out of ${config.MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS}, skip this step - blockEpoch < currentEpoch - config.MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS - ) { - continue; - } + if (!withinRetentionWindow(block.slot)) continue; - const dataColumnSidecarBytes = await db.dataColumnSidecar.valuesBinary(block.root); - if (!dataColumnSidecarBytes) { + const dataColumnSidecars = await db.dataColumnSidecar.valuesBinary(block.root); + if (!dataColumnSidecars || dataColumnSidecars.length === 0) { throw Error(`No dataColumnSidecars found for slot ${block.slot} root ${toHex(block.root)}`); } + promises.push( db.dataColumnSidecarArchive.putManyBinary( block.slot, - dataColumnSidecarBytes.map((p) => ({key: p.id, value: p.value})) + dataColumnSidecars.map(({id, value}) => ({id, value})) ) ); - migratedWrappedDataColumns += dataColumnSidecarBytes.length; + migratedDataColumnsCount += dataColumnSidecars.length; } promises.push(db.dataColumnSidecar.deleteMany(canonicalBlocks.map((block) => block.root))); - // put to blockArchive db and delete block db await Promise.all(promises); } - - return migratedWrappedDataColumns; + return migratedDataColumnsCount; } /** diff --git a/packages/beacon-node/test/unit/chain/archiveStore/blockArchiver.test.ts b/packages/beacon-node/test/unit/chain/archiveStore/blockArchiver.test.ts index 0a6427de3034..339a3578e4f1 100644 --- a/packages/beacon-node/test/unit/chain/archiveStore/blockArchiver.test.ts +++ b/packages/beacon-node/test/unit/chain/archiveStore/blockArchiver.test.ts @@ -141,7 +141,7 @@ describe("block archiver task", () => { for (const block of canonicalBlocks) { expect(dbStub.dataColumnSidecarArchive.putManyBinary).toHaveBeenCalledWith(block.slot, [ { - key: 0, + id: 0, value: dataColumnBytes, }, ]); diff --git a/packages/db/src/abstractPrefixedRepository.ts b/packages/db/src/abstractPrefixedRepository.ts index 84abda31878a..1f41e113a17f 100644 --- a/packages/db/src/abstractPrefixedRepository.ts +++ b/packages/db/src/abstractPrefixedRepository.ts @@ -1,12 +1,16 @@ import {Type} from "@chainsafe/ssz"; import {ChainForkConfig} from "@lodestar/config"; import {BUCKET_LENGTH} from "./const.js"; -import {KeyValue} from "./controller/index.js"; import {Db, DbReqOpts, FilterOptions} from "./controller/interface.js"; import {encodeKey} from "./util.js"; type Id = Uint8Array | string | number | bigint; +export interface IdValue { + id: I; + value: V; +} + /** * Repository is a high level kv storage * This abstract repository is designed in a way to store items with different prefixed @@ -100,7 +104,7 @@ export abstract class PrefixedRepository { } async putMany(prefix: P, items: T[]): Promise { - const batch: KeyValue[] = []; + const batch = []; for (const item of items) { const id = this.getId(item); const key = this.wrapKey(this.encodeKeyRaw(prefix, id)); @@ -114,10 +118,10 @@ export abstract class PrefixedRepository { await this.db.put(key, bytes, this.dbReqOpts); } - async putManyBinary(prefix: P, items: KeyValue[]): Promise { - const batch: KeyValue[] = []; - for (const {key, value} of items) { - batch.push({key: this.wrapKey(this.encodeKeyRaw(prefix, key)), value: value}); + async putManyBinary(prefix: P, items: IdValue[]): Promise { + const batch = []; + for (const {id, value} of items) { + batch.push({key: this.wrapKey(this.encodeKeyRaw(prefix, id)), value: value}); } await this.db.batchPut(batch, this.dbReqOpts); }