Skip to content

Commit 0e5b4db

Browse files
authored
fix: big embeddings ingestion (#21)
1 parent 89539b7 commit 0e5b4db

File tree

2 files changed

+44
-28
lines changed

2 files changed

+44
-28
lines changed

packages/agents/src/db/postgresVectorStore.ts

Lines changed: 43 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import { VectorStoreConfig, DocumentSource } from '../types';
55
import pg, { Pool, PoolClient } from 'pg';
66
import { DatabaseError as PgError } from 'pg';
77

8-
98
/**
109
* Custom error class for handling Postgres-specific errors
1110
*/
@@ -106,7 +105,7 @@ export class VectorStore {
106105
): Promise<VectorStore> {
107106
if (!VectorStore.instance) {
108107
const pool = new Pool({
109-
user: config.POSTGRES_USER,
108+
user: config.POSTGRES_USER,
110109
host: config.POSTGRES_HOST,
111110
database: config.POSTGRES_DB,
112111
password: config.POSTGRES_PASSWORD,
@@ -122,7 +121,7 @@ export class VectorStore {
122121
logger.info('Connected to PostgreSQL');
123122

124123
const tableName = 'documents';
125-
124+
126125
// Create instance first, then initialize DB
127126
VectorStore.instance = new VectorStore(pool, embeddings, tableName);
128127
await VectorStore.instance.initializeDb();
@@ -152,7 +151,7 @@ export class VectorStore {
152151
UNIQUE(uniqueId)
153152
);
154153
`);
155-
154+
156155
// Create index on source for filtering
157156
await client.query(`
158157
CREATE INDEX IF NOT EXISTS idx_${this.tableName}_source ON ${this.tableName} (source);
@@ -191,8 +190,8 @@ export class VectorStore {
191190

192191
// Build SQL query
193192
let sql = `
194-
SELECT
195-
content,
193+
SELECT
194+
content,
196195
metadata,
197196
1 - (embedding <=> $1) as similarity
198197
FROM ${this.tableName}
@@ -223,7 +222,7 @@ export class VectorStore {
223222
const client = await this.pool.connect();
224223
try {
225224
const result = await client.query(sql, values);
226-
225+
227226
// Convert to DocumentInterface format
228227
return result.rows.map((row) => ({
229228
pageContent: row.content,
@@ -244,15 +243,32 @@ export class VectorStore {
244243
* @param options - Optional settings; `options.ids` is an optional array of unique IDs for the documents
245244
* @returns Promise<void>
246245
*/
247-
async addDocuments(documents: DocumentInterface[], options?: { ids?: string[] }): Promise<void> {
246+
async addDocuments(
247+
documents: DocumentInterface[],
248+
options?: { ids?: string[] },
249+
): Promise<void> {
248250
logger.info(`Adding ${documents.length} documents to the vector store`);
249-
251+
250252
if (documents.length === 0) return;
251253

252254
try {
253-
// Generate embeddings for all documents
254-
const texts = documents.map((doc) => doc.pageContent);
255-
const embeddings = await this.embeddings.embedDocuments(texts);
255+
// Generate embeddings in batches by content length
256+
const documentBatches = documents.reduce((batches: string[][], doc) => {
257+
const batch = batches[batches.length - 1] || [];
258+
const totalLength = batch.reduce((sum, text) => sum + text.length, 0);
259+
totalLength + doc.pageContent.length > 100000 && batch.length > 0
260+
? batches.push([doc.pageContent])
261+
: batch.push(doc.pageContent);
262+
return batches.length === 0 ? [[doc.pageContent]] : batches;
263+
}, []);
264+
265+
// Process all batches
266+
const batchEmbeddings = await Promise.all(
267+
documentBatches.map((batch) => this.embeddings.embedDocuments(batch)),
268+
);
269+
270+
// Merge all embeddings
271+
const embeddings = batchEmbeddings.flat();
256272

257273
const client = await this.pool.connect();
258274
try {
@@ -267,7 +283,7 @@ export class VectorStore {
267283
const query = `
268284
INSERT INTO ${this.tableName} (content, metadata, embedding, uniqueId, contentHash, source)
269285
VALUES ($1, $2, $3, $4, $5, $6)
270-
ON CONFLICT (uniqueId)
286+
ON CONFLICT (uniqueId)
271287
DO UPDATE SET
272288
content = EXCLUDED.content,
273289
metadata = EXCLUDED.metadata,
@@ -281,13 +297,13 @@ export class VectorStore {
281297
JSON.stringify(embeddings[i]),
282298
uniqueId,
283299
contentHash,
284-
source
300+
source,
285301
]);
286302
});
287303

288304
await Promise.all(insertPromises);
289305
await client.query('COMMIT');
290-
306+
291307
logger.info(`Successfully added ${documents.length} documents`);
292308
} catch (error) {
293309
await client.query('ROLLBACK');
@@ -312,16 +328,16 @@ export class VectorStore {
312328
try {
313329
const result = await client.query(
314330
`SELECT content, metadata, contentHash FROM ${this.tableName} WHERE uniqueId = $1`,
315-
[name]
331+
[name],
316332
);
317-
333+
318334
if (result.rows.length > 0) {
319335
const row = result.rows[0];
320336
return {
321-
metadata: {
322-
_id: name,
337+
metadata: {
338+
_id: name,
323339
contentHash: row.contentHash,
324-
...JSON.parse(row.metadata)
340+
...JSON.parse(row.metadata),
325341
},
326342
pageContent: row.content,
327343
};
@@ -356,7 +372,7 @@ export class VectorStore {
356372
WHERE uniqueId = ANY($1)
357373
AND source = $2
358374
`;
359-
375+
360376
await client.query(query, [uniqueIds, source]);
361377
logger.info(`Removed ${uniqueIds.length} pages from source ${source}`);
362378
} finally {
@@ -381,9 +397,9 @@ export class VectorStore {
381397
try {
382398
const result = await client.query(
383399
`SELECT uniqueId, contentHash FROM ${this.tableName} WHERE source = $1`,
384-
[source]
400+
[source],
385401
);
386-
402+
387403
return result.rows.map((row) => ({
388404
uniqueId: row.uniqueid,
389405
contentHash: row.contenthash,
@@ -434,14 +450,14 @@ export class VectorStore {
434450
private async transaction<T = any>(queries: Query[]): Promise<T[]> {
435451
const client = await this.pool.connect();
436452
let result;
437-
453+
438454
try {
439455
await client.query('BEGIN');
440-
456+
441457
for (const q of queries) {
442458
result = await client.query(q.query, q.values);
443459
}
444-
460+
445461
await client.query('COMMIT');
446462
return result ? result.rows : [];
447463
} catch (error) {
@@ -451,4 +467,4 @@ export class VectorStore {
451467
client.release();
452468
}
453469
}
454-
}
470+
}

packages/ingester/asciidoc/oz-playbook.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ content:
1010
sources:
1111
- url: https://github.com/OpenZeppelin/cairo-contracts
1212
branches:
13-
- docs-v*
13+
- docs-v2.0.0
1414
start_path: docs
1515
ui:
1616
bundle:

0 commit comments

Comments
 (0)