-
-
Notifications
You must be signed in to change notification settings - Fork 405
feat: add lmdb database from lodestar-bun #8483
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: unstable
Are you sure you want to change the base?
Conversation
Summary of ChangesHello @wemeetagain, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the database flexibility of the beacon node by integrating LMDB as an alternative to the existing LevelDB. The change introduces a new Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces support for LMDB as a database backend, adding a --dbType
flag to switch between LevelDB and LMDB. The changes primarily involve plumbing the new dbType
option through various parts of the application and providing two implementations for LmdbController
: a functional one for the Bun runtime and a stub for Node.js. The overall approach is solid. I've identified a typing issue in the Node.js stub and significant code duplication in the Bun implementation, for which I've provided suggestions to improve maintainability and correctness.
keysStream(opts: FilterOptions<Uint8Array> = {}): AsyncIterable<Uint8Array> { | ||
const tx = transactionBegin(this.db); | ||
const db = 0 as Database; | ||
|
||
const iterator = databaseCursor(tx, db); | ||
|
||
const metrics = this.metrics; | ||
const bucket = opts.bucketId ?? BUCKET_ID_UNKNOWN; | ||
metrics?.dbReadReq.inc({bucket}, 1); | ||
let itemsRead = 0; | ||
|
||
return (async function* () { | ||
try { | ||
if (opts.gt) { | ||
cursorSeek(iterator, opts.gt); | ||
const first = cursorGoToNext(iterator); | ||
if (!first) return; | ||
itemsRead++; | ||
yield first.slice(); | ||
} else if (opts.gte) { | ||
const first = cursorSeek(iterator, opts.gte); | ||
if (!first) return; | ||
itemsRead++; | ||
yield first.slice(); | ||
} else { | ||
const first = cursorGoToFirst(iterator); | ||
if (!first) return; | ||
itemsRead++; | ||
yield first.slice(); | ||
} | ||
|
||
while (true) { | ||
const key = cursorGoToNext(iterator); | ||
if (!key) return; | ||
if (opts.lt && Buffer.compare(key, opts.lt) >= 0) break; | ||
if (opts.lte && Buffer.compare(key, opts.lte) > 0) break; | ||
itemsRead++; | ||
yield key.slice(); | ||
} | ||
} finally { | ||
metrics?.dbReadItems.inc({bucket}, itemsRead); | ||
cursorDeinit(iterator); | ||
transactionAbort(tx); | ||
} | ||
})(); | ||
} | ||
|
||
async keys(opts: FilterOptions<Uint8Array> = {}): Promise<Uint8Array[]> { | ||
const tx = transactionBegin(this.db); | ||
const db = 0 as Database; | ||
const iterator = databaseCursor(tx, db); | ||
this.metrics?.dbReadReq.inc({bucket: opts?.bucketId ?? BUCKET_ID_UNKNOWN}, 1); | ||
const keys: Uint8Array[] = []; | ||
try { | ||
if (opts.gt) { | ||
cursorSeek(iterator, opts.gt); | ||
const first = cursorGoToNext(iterator); | ||
if (!first) return keys; | ||
keys.push(first.slice()); | ||
} else if (opts.gte) { | ||
const first = cursorSeek(iterator, opts.gte); | ||
if (!first) return keys; | ||
keys.push(first.slice()); | ||
} else { | ||
const first = cursorGoToFirst(iterator); | ||
if (!first) return keys; | ||
keys.push(first.slice()); | ||
} | ||
while (true) { | ||
const key = cursorGoToNext(iterator); | ||
if (!key) break; | ||
if (opts.lt && Buffer.compare(key, opts.lt) >= 0) break; | ||
if (opts.lte && Buffer.compare(key, opts.lte) > 0) break; | ||
keys.push(key.slice()); | ||
} | ||
return keys; | ||
} finally { | ||
this.metrics?.dbReadItems.inc({bucket: opts?.bucketId ?? BUCKET_ID_UNKNOWN}, keys.length); | ||
cursorDeinit(iterator); | ||
transactionAbort(tx); | ||
} | ||
} | ||
|
||
valuesStream(opts: FilterOptions<Uint8Array> = {}): AsyncIterable<Uint8Array> { | ||
const tx = transactionBegin(this.db); | ||
const db = 0 as Database; | ||
const iterator = databaseCursor(tx, db); | ||
const bucket = opts.bucketId ?? BUCKET_ID_UNKNOWN; | ||
const metrics = this.metrics; | ||
metrics?.dbReadReq.inc({bucket}, 1); | ||
let itemsRead = 0; | ||
return (async function* () { | ||
try { | ||
if (opts.gt) { | ||
cursorSeek(iterator, opts.gt); | ||
const first = cursorGoToNext(iterator); | ||
if (!first) return; | ||
const value = cursorGetCurrentValue(iterator); | ||
itemsRead++; | ||
yield value.slice(); | ||
} else if (opts.gte) { | ||
const first = cursorSeek(iterator, opts.gte); | ||
if (!first) return; | ||
const value = cursorGetCurrentValue(iterator); | ||
itemsRead++; | ||
yield value.slice(); | ||
} else { | ||
const first = cursorGoToFirst(iterator); | ||
if (!first) return; | ||
const value = cursorGetCurrentValue(iterator); | ||
itemsRead++; | ||
yield value.slice(); | ||
} | ||
while (true) { | ||
const key = cursorGoToNext(iterator); | ||
if (!key) return; | ||
if (opts.lt && Buffer.compare(key, opts.lt) >= 0) break; | ||
if (opts.lte && Buffer.compare(key, opts.lte) > 0) break; | ||
const value = cursorGetCurrentValue(iterator); | ||
itemsRead++; | ||
yield value.slice(); | ||
} | ||
} finally { | ||
metrics?.dbReadItems.inc({bucket}, itemsRead); | ||
cursorDeinit(iterator); | ||
transactionAbort(tx); | ||
} | ||
})(); | ||
} | ||
|
||
async values(opts: FilterOptions<Uint8Array> = {}): Promise<Uint8Array[]> { | ||
const tx = transactionBegin(this.db); | ||
const db = 0 as Database; | ||
const iterator = databaseCursor(tx, db); | ||
const values: Uint8Array[] = []; | ||
this.metrics?.dbReadReq.inc({bucket: opts?.bucketId ?? BUCKET_ID_UNKNOWN}, 1); | ||
try { | ||
if (opts.gt) { | ||
cursorSeek(iterator, opts.gt); | ||
const first = cursorGoToNext(iterator); | ||
if (!first) return values; | ||
const value = cursorGetCurrentValue(iterator); | ||
values.push(value.slice()); | ||
} else if (opts.gte) { | ||
const first = cursorSeek(iterator, opts.gte); | ||
if (!first) return values; | ||
const value = cursorGetCurrentValue(iterator); | ||
values.push(value.slice()); | ||
} else { | ||
const first = cursorGoToFirst(iterator); | ||
if (!first) return values; | ||
const value = cursorGetCurrentValue(iterator); | ||
values.push(value.slice()); | ||
} | ||
while (true) { | ||
const key = cursorGoToNext(iterator); | ||
if (!key) break; | ||
if (opts.lt && Buffer.compare(key, opts.lt) >= 0) break; | ||
if (opts.lte && Buffer.compare(key, opts.lte) > 0) break; | ||
const value = cursorGetCurrentValue(iterator); | ||
values.push(value.slice()); | ||
} | ||
return values; | ||
} finally { | ||
this.metrics?.dbReadItems.inc({bucket: opts?.bucketId ?? BUCKET_ID_UNKNOWN}, values.length); | ||
cursorDeinit(iterator); | ||
transactionAbort(tx); | ||
} | ||
} | ||
|
||
entriesStream(opts: FilterOptions<Uint8Array> = {}): AsyncIterable<KeyValue<Uint8Array, Uint8Array>> { | ||
const tx = transactionBegin(this.db); | ||
const db = 0 as Database; | ||
const iterator = databaseCursor(tx, db); | ||
const bucket = opts.bucketId ?? BUCKET_ID_UNKNOWN; | ||
const metrics = this.metrics; | ||
metrics?.dbReadReq.inc({bucket}, 1); | ||
let itemsRead = 0; | ||
return (async function* () { | ||
try { | ||
if (opts.gt) { | ||
cursorSeek(iterator, opts.gt); | ||
const first = cursorGoToNext(iterator); | ||
if (!first) return; | ||
const value = cursorGetCurrentValue(iterator); | ||
itemsRead++; | ||
yield {key: first.slice(), value: value.slice()}; | ||
} else if (opts.gte) { | ||
const first = cursorSeek(iterator, opts.gte); | ||
if (!first) return; | ||
const value = cursorGetCurrentValue(iterator); | ||
itemsRead++; | ||
yield {key: first.slice(), value: value.slice()}; | ||
} else { | ||
const first = cursorGoToFirst(iterator); | ||
if (!first) return; | ||
const value = cursorGetCurrentValue(iterator); | ||
itemsRead++; | ||
yield {key: first.slice(), value: value.slice()}; | ||
} | ||
while (true) { | ||
const key = cursorGoToNext(iterator); | ||
if (!key) return; | ||
if (opts.lt && Buffer.compare(key, opts.lt) >= 0) break; | ||
if (opts.lte && Buffer.compare(key, opts.lte) > 0) break; | ||
const value = cursorGetCurrentValue(iterator); | ||
itemsRead++; | ||
yield {key: key.slice(), value: value.slice()}; | ||
} | ||
} finally { | ||
metrics?.dbReadItems.inc({bucket}, itemsRead); | ||
cursorDeinit(iterator); | ||
transactionAbort(tx); | ||
} | ||
})(); | ||
} | ||
|
||
async entries(opts: FilterOptions<Uint8Array> = {}): Promise<KeyValue<Uint8Array, Uint8Array>[]> { | ||
const tx = transactionBegin(this.db); | ||
const db = 0 as Database; | ||
const iterator = databaseCursor(tx, db); | ||
const entries: KeyValue<Uint8Array, Uint8Array>[] = []; | ||
this.metrics?.dbReadReq.inc({bucket: opts?.bucketId ?? BUCKET_ID_UNKNOWN}, 1); | ||
try { | ||
if (opts.gt) { | ||
cursorSeek(iterator, opts.gt); | ||
const first = cursorGoToNext(iterator); | ||
if (!first) return entries; | ||
const value = cursorGetCurrentValue(iterator); | ||
entries.push({key: first.slice(), value: value.slice()}); | ||
} else if (opts.gte) { | ||
const first = cursorSeek(iterator, opts.gte); | ||
if (!first) return entries; | ||
const value = cursorGetCurrentValue(iterator); | ||
entries.push({key: first.slice(), value: value.slice()}); | ||
} else { | ||
const first = cursorGoToFirst(iterator); | ||
if (!first) return entries; | ||
const value = cursorGetCurrentValue(iterator); | ||
entries.push({key: first.slice(), value: value.slice()}); | ||
} | ||
while (true) { | ||
const key = cursorGoToNext(iterator); | ||
if (!key) break; | ||
if (opts.lt && Buffer.compare(key, opts.lt) >= 0) break; | ||
if (opts.lte && Buffer.compare(key, opts.lte) > 0) break; | ||
const value = cursorGetCurrentValue(iterator); | ||
entries.push({key: key.slice(), value: value.slice()}); | ||
} | ||
return entries; | ||
} finally { | ||
this.metrics?.dbReadItems.inc({bucket: opts?.bucketId ?? BUCKET_ID_UNKNOWN}, entries.length); | ||
cursorDeinit(iterator); | ||
transactionAbort(tx); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is significant code duplication across the keys
, values
, entries
methods and their ...Stream
counterparts. This can be refactored to improve maintainability.
You could introduce two private helper methods:
- A private async generator
_iterate
to handle the core cursor logic and yield key-value pairs. The...Stream
methods can then wrap this generator to transform the output. - A private
_collect
method that uses the same iteration logic to build and return an array. Thekeys
,values
, andentries
methods can then use this helper.
This will centralize the iteration and filtering logic, making the code cleaner and easier to maintain.
private async* _iterate(opts: FilterOptions<Uint8Array> = {}): AsyncIterable<{key: Uint8Array; value: Uint8Array}> {
const tx = transactionBegin(this.db);
const db = 0 as Database;
const iterator = databaseCursor(tx, db);
const bucket = opts.bucketId ?? BUCKET_ID_UNKNOWN;
const metrics = this.metrics;
metrics?.dbReadReq.inc({bucket}, 1);
let itemsRead = 0;
try {
let key: Uint8Array | null;
if (opts.gt) {
cursorSeek(iterator, opts.gt);
key = cursorGoToNext(iterator);
} else if (opts.gte) {
key = cursorSeek(iterator, opts.gte);
} else {
key = cursorGoToFirst(iterator);
}
if (key) {
do {
if (opts.lt && Buffer.compare(key, opts.lt) >= 0) break;
if (opts.lte && Buffer.compare(key, opts.lte) > 0) break;
const value = cursorGetCurrentValue(iterator);
itemsRead++;
yield {key: key.slice(), value: value.slice()};
} while ((key = cursorGoToNext(iterator)));
}
} finally {
metrics?.dbReadItems.inc({bucket}, itemsRead);
cursorDeinit(iterator);
transactionAbort(tx);
}
}
private async _collect<T>(
opts: FilterOptions<Uint8Array> = {},
transform: (key: Uint8Array, value: Uint8Array) => T
): Promise<T[]> {
const tx = transactionBegin(this.db);
const db = 0 as Database;
const iterator = databaseCursor(tx, db);
this.metrics?.dbReadReq.inc({bucket: opts?.bucketId ?? BUCKET_ID_UNKNOWN}, 1);
const results: T[] = [];
try {
let key: Uint8Array | null;
if (opts.gt) {
cursorSeek(iterator, opts.gt);
key = cursorGoToNext(iterator);
} else if (opts.gte) {
key = cursorSeek(iterator, opts.gte);
} else {
key = cursorGoToFirst(iterator);
}
if (key) {
do {
if (opts.lt && Buffer.compare(key, opts.lt) >= 0) break;
if (opts.lte && Buffer.compare(key, opts.lte) > 0) break;
const value = cursorGetCurrentValue(iterator);
results.push(transform(key.slice(), value.slice()));
} while ((key = cursorGoToNext(iterator)));
}
return results;
} finally {
this.metrics?.dbReadItems.inc({bucket: opts?.bucketId ?? BUCKET_ID_UNKNOWN}, results.length);
cursorDeinit(iterator);
transactionAbort(tx);
}
}
async* keysStream(opts: FilterOptions<Uint8Array> = {}): AsyncIterable<Uint8Array> {
for await (const {key} of this._iterate(opts)) {
yield key;
}
}
async keys(opts: FilterOptions<Uint8Array> = {}): Promise<Uint8Array[]> {
return this._collect(opts, (key) => key);
}
async* valuesStream(opts: FilterOptions<Uint8Array> = {}): AsyncIterable<Uint8Array> {
for await (const {value} of this._iterate(opts)) {
yield value;
}
}
async values(opts: FilterOptions<Uint8Array> = {}): Promise<Uint8Array[]> {
return this._collect(opts, (_, value) => value);
}
entriesStream(opts: FilterOptions<Uint8Array> = {}): AsyncIterable<KeyValue<Uint8Array, Uint8Array>> {
return this._iterate(opts);
}
async entries(opts: FilterOptions<Uint8Array> = {}): Promise<KeyValue<Uint8Array, Uint8Array>[]> {
return this._collect(opts, (key, value) => ({key, value}));
}
}
if (options.db.type === "level") { | ||
controller = await LevelDbController.create(options.db, {metrics: null, logger}); | ||
logger.info("Connected to LevelDB database", {path: options.db.name}); | ||
} else if (options.db.type === "lmdb") { | ||
controller = await LmdbController.create(options.db, {metrics: null, logger}); | ||
logger.info("Connected to LMDB database", {path: options.db.name}); | ||
} else { | ||
throw new Error(`Unsupported db type: ${options.db.type}`); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That logic might go well as static method inside BeaconDb
as BeaconDB.init
. This will help us reuse it.
metrics?: LevelDbControllerMetrics | null; | ||
}; | ||
|
||
export class LmdbController implements DatabaseController<Uint8Array, Uint8Array> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code must be covered with unit tests. May be using same fixture to test different implementations of db controllers.
It is important as if something missed goes wrong here make it very difficult to debug on higher layers.
Performance Report🚀🚀 Significant benchmark improvement detected
Full benchmark results
|
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## unstable #8483 +/- ##
============================================
- Coverage 52.25% 52.24% -0.02%
============================================
Files 852 852
Lines 64977 64992 +15
Branches 4768 4769 +1
============================================
- Hits 33956 33955 -1
- Misses 30952 30968 +16
Partials 69 69 🚀 New features to boost your workflow:
|
|
Motivation
Description
LmdbController
-- uses the lmdb binding from lodestar-bun in bun, empty implementation in nodejs--dbType
hidden flag to select"level" | "lmdb"