Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -299,55 +299,45 @@ async function migrateBlobSidecarsFromHotToColdDb(
return migratedWrappedBlobSidecars;
}

// TODO: This function can be simplified further by reducing layers of promises in a loop
async function migrateDataColumnSidecarsFromHotToColdDb(
config: ChainForkConfig,
db: IBeaconDb,
blocks: BlockRootSlot[],
currentEpoch: Epoch
): Promise<number> {
let migratedWrappedDataColumns = 0;
// Only Fulu and newer blocks and within the retention window
const withinDataColumnWindow = (slot: Slot) =>
config.getForkSeq(slot) >= ForkSeq.fulu &&
computeEpochAtSlot(slot) >= currentEpoch - config.MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS;
let migratedDataColumnsCount = 0;

for (let i = 0; i < blocks.length; i += BLOB_SIDECAR_BATCH_SIZE) {
const toIdx = Math.min(i + BLOB_SIDECAR_BATCH_SIZE, blocks.length);
const canonicalBlocks = blocks.slice(i, toIdx);

// processCanonicalBlocks
if (canonicalBlocks.length === 0) break;
const promises = [];

// load Buffer instead of ssz deserialized to improve performance
for (const block of canonicalBlocks) {
const blockSlot = block.slot;
const blockEpoch = computeEpochAtSlot(blockSlot);

if (
config.getForkSeq(blockSlot) < ForkSeq.fulu ||
// if block is out of ${config.MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS}, skip this step
blockEpoch < currentEpoch - config.MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS
) {
continue;
}
if (!withinDataColumnWindow(block.slot)) continue;

const dataColumnSidecarBytes = await db.dataColumnSidecar.valuesBinary(block.root);
if (!dataColumnSidecarBytes) {
const dataColumnSidecars = await db.dataColumnSidecar.valuesBinary(block.root);
if (!dataColumnSidecars || dataColumnSidecars.length === 0) {
throw Error(`No dataColumnSidecars found for slot ${block.slot} root ${toHex(block.root)}`);
}

promises.push(
db.dataColumnSidecarArchive.putManyBinary(
block.slot,
dataColumnSidecarBytes.map((p) => ({key: p.id, value: p.value}))
dataColumnSidecars.map(({id, value}) => ({id, value}))
)
);
migratedWrappedDataColumns += dataColumnSidecarBytes.length;
migratedDataColumnsCount += dataColumnSidecars.length;
}

promises.push(db.dataColumnSidecar.deleteMany(canonicalBlocks.map((block) => block.root)));

// put to blockArchive db and delete block db
await Promise.all(promises);
}

return migratedWrappedDataColumns;
return migratedDataColumnsCount;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ describe("block archiver task", () => {
for (const block of canonicalBlocks) {
expect(dbStub.dataColumnSidecarArchive.putManyBinary).toHaveBeenCalledWith(block.slot, [
{
key: 0,
id: 0,
value: dataColumnBytes,
},
]);
Expand Down
16 changes: 10 additions & 6 deletions packages/db/src/abstractPrefixedRepository.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import {Type} from "@chainsafe/ssz";
import {ChainForkConfig} from "@lodestar/config";
import {BUCKET_LENGTH} from "./const.js";
import {KeyValue} from "./controller/index.js";
import {Db, DbReqOpts, FilterOptions} from "./controller/interface.js";
import {encodeKey} from "./util.js";

type Id = Uint8Array | string | number | bigint;

export interface IdValue<I, V> {
id: I;
value: V;
}

/**
* Repository is a high level kv storage
* This abstract repository is designed in a way to store items with different prefixed
Expand Down Expand Up @@ -100,7 +104,7 @@ export abstract class PrefixedRepository<P, I extends Id, T> {
}

async putMany(prefix: P, items: T[]): Promise<void> {
const batch: KeyValue<Uint8Array, Uint8Array>[] = [];
const batch = [];
for (const item of items) {
const id = this.getId(item);
const key = this.wrapKey(this.encodeKeyRaw(prefix, id));
Expand All @@ -114,10 +118,10 @@ export abstract class PrefixedRepository<P, I extends Id, T> {
await this.db.put(key, bytes, this.dbReqOpts);
}

async putManyBinary(prefix: P, items: KeyValue<I, Uint8Array>[]): Promise<void> {
const batch: KeyValue<Uint8Array, Uint8Array>[] = [];
for (const {key, value} of items) {
batch.push({key: this.wrapKey(this.encodeKeyRaw(prefix, key)), value: value});
async putManyBinary(prefix: P, items: IdValue<I, Uint8Array>[]): Promise<void> {
const batch = [];
for (const {id, value} of items) {
batch.push({key: this.wrapKey(this.encodeKeyRaw(prefix, id)), value: value});
}
await this.db.batchPut(batch, this.dbReqOpts);
}
Expand Down
Loading