Skip to content

Commit 0651d4e

Browse files
Merge pull request #533 from autonomys/perf-improvements
feat: speed up publish-nodes queue with node indexes + safe batch updates
2 parents acc019e + bf26d4e commit 0651d4e

File tree

7 files changed

+185
-47
lines changed

7 files changed

+185
-47
lines changed

apps/backend/__tests__/unit/OnchainPublisher.spec.ts

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,9 @@ describe('OnchainPublisher', () => {
4242
.spyOn(transactionManager, 'submit')
4343
.mockResolvedValue(nodes.map(() => MOCK_PUBLISH_RESULT))
4444

45-
jest.spyOn(nodesRepository, 'getNodeCount').mockResolvedValue({
46-
totalCount: 0,
47-
publishedCount: 0,
48-
archivedCount: 0,
49-
})
45+
jest
46+
.spyOn(nodesRepository, 'getNodesBlockchainDataBatch')
47+
.mockResolvedValue([])
5048

5149
await OnchainPublisher.publishNodes(nodes.map((e) => e.cid))
5250

@@ -76,21 +74,17 @@ describe('OnchainPublisher', () => {
7674
.spyOn(transactionManager, 'submit')
7775
.mockResolvedValue(nodes.map(() => MOCK_PUBLISH_RESULT))
7876

79-
jest.spyOn(nodesRepository, 'getNodeCount').mockImplementation((cid) => {
80-
if (publishedNodes.includes(cid.cid!)) {
81-
return Promise.resolve({
82-
totalCount: 1,
83-
publishedCount: 1,
84-
archivedCount: 0,
85-
})
86-
}
87-
88-
return Promise.resolve({
89-
totalCount: 1,
90-
publishedCount: 0,
91-
archivedCount: 0,
92-
})
93-
})
77+
jest
78+
.spyOn(nodesRepository, 'getNodesBlockchainDataBatch')
79+
.mockResolvedValue(
80+
publishedNodes.map((cid) => ({
81+
cid,
82+
block_published_on: 1,
83+
tx_published_on: '0x123',
84+
piece_index: 0,
85+
piece_offset: 0,
86+
})),
87+
)
9488
await OnchainPublisher.publishNodes(nodes.map((e) => e.cid))
9589

9690
const transactions = nodes
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
'use strict'
2+
3+
var dbm
4+
var type
5+
var seed
6+
var fs = require('fs')
7+
var path = require('path')
8+
var Promise
9+
10+
/**
11+
* We receive the dbmigrate dependency from dbmigrate initially.
12+
* This enables us to not have to rely on NODE_PATH.
13+
*/
14+
exports.setup = function (options, seedLink) {
15+
dbm = options.dbmigrate
16+
type = dbm.dataType
17+
seed = seedLink
18+
Promise = options.Promise
19+
}
20+
21+
exports.up = function (db) {
22+
var filePath = path.join(
23+
__dirname,
24+
'sqls',
25+
'20251118215346-add-nodes-cid-indexes-up.sql',
26+
)
27+
return new Promise(function (resolve, reject) {
28+
fs.readFile(filePath, { encoding: 'utf-8' }, function (err, data) {
29+
if (err) return reject(err)
30+
console.log('received data: ' + data)
31+
32+
resolve(data)
33+
})
34+
}).then(function (data) {
35+
return db.runSql(data)
36+
})
37+
}
38+
39+
exports.down = function (db) {
40+
var filePath = path.join(
41+
__dirname,
42+
'sqls',
43+
'20251118215346-add-nodes-cid-indexes-down.sql',
44+
)
45+
return new Promise(function (resolve, reject) {
46+
fs.readFile(filePath, { encoding: 'utf-8' }, function (err, data) {
47+
if (err) return reject(err)
48+
console.log('received data: ' + data)
49+
50+
resolve(data)
51+
})
52+
}).then(function (data) {
53+
return db.runSql(data)
54+
})
55+
}
56+
57+
exports._meta = {
58+
version: 1,
59+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
-- Remove the indexes added in the up migration.
-- The names are schema-qualified to match the up script, which creates the
-- indexes on public.nodes; an unqualified name is resolved through search_path
-- and, combined with IF EXISTS, would silently skip the drop if public is not on it.
DROP INDEX IF EXISTS public.idx_nodes_cid;
DROP INDEX IF EXISTS public.idx_nodes_root_cid_cid;
4+
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
-- Add indexes for nodes table to improve query performance
2+
-- These indexes optimize queries that filter by cid and (root_cid, cid)
3+
4+
-- Index for cid lookups (used in getNodesByCids, getNodeCount, getNodeBlockchainData, getNodesBlockchainDataBatch)
5+
CREATE INDEX IF NOT EXISTS idx_nodes_cid ON public.nodes USING btree (cid);
6+
7+
-- Composite index for UPDATE queries that filter by root_cid AND cid
8+
-- Used in updateNodeBlockchainData and updateNodesBlockchainDataBatch
9+
CREATE INDEX IF NOT EXISTS idx_nodes_root_cid_cid ON public.nodes USING btree (root_cid, cid);
10+

apps/backend/src/core/objects/nodes.ts

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -272,31 +272,40 @@ const ensureObjectPublished = async (cid: string): Promise<void> => {
272272
}
273273

274274
/**
 * Copies existing blockchain publication data onto the given nodes.
 *
 * Looks up blockchain data for the nodes' CIDs in a single batch query and,
 * for every node whose CID has a match, writes that data to the node's
 * (root_cid, cid) row in a single batch update. Nodes without a match are
 * left untouched.
 *
 * @param nodes - nodes to process; an empty list is a no-op (nothing is
 *   logged and no query is issued)
 */
const handleRepeatedNodes = async (nodes: Node[]): Promise<void> => {
  if (nodes.length === 0) return

  logger.info(
    'Handling repeated nodes (cids=%s)',
    nodes.map((node) => node.cid).join(', '),
  )

  // The same CID can appear under several roots; look each one up only once.
  const uniqueCids = Array.from(new Set(nodes.map((node) => node.cid)))
  const publishedRows =
    await nodesRepository.getNodesBlockchainDataBatch(uniqueCids)

  const blockchainDataByCid = new Map(
    publishedRows.map((row) => [row.cid, row] as const),
  )

  // flatMap drops unmatched nodes and builds the update in one pass, so no
  // nullable intermediate array or type-guard filter is needed.
  const updates = nodes.flatMap((node) => {
    const blockchainData = blockchainDataByCid.get(node.cid)
    if (!blockchainData) return []

    return [{ rootCid: node.root_cid, cid: node.cid, blockchainData }]
  })

  if (updates.length === 0) return

  await nodesRepository.updateNodesBlockchainDataBatch(updates)
}
301310

302311
export const NodesUseCases = {

apps/backend/src/infrastructure/repositories/objects/nodes.ts

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,12 +253,27 @@ const getNodeBlockchainData = async (
253253

254254
return db
255255
.query<NodeBlockchainData>({
256-
text: 'SELECT cid, block_published_on, tx_published_on, piece_index, piece_offset FROM nodes WHERE cid = $1',
256+
text: 'SELECT cid, block_published_on, tx_published_on, piece_index, piece_offset FROM nodes WHERE cid = $1 AND block_published_on IS NOT NULL',
257257
values: [cid],
258258
})
259259
.then((e) => e.rows.at(0))
260260
}
261261

262+
/**
 * Fetches blockchain publication data for many CIDs in one query.
 *
 * Only rows with a non-null `block_published_on` are considered. When several
 * rows share a CID, `DISTINCT ON (cid)` together with the
 * `ORDER BY cid, block_published_on DESC` clause keeps the row with the
 * highest `block_published_on`, so at most one row per CID is returned.
 *
 * @param cids - CIDs to look up; an empty list returns `[]` without touching
 *   the database
 * @returns one row per CID that has blockchain data; CIDs without any are
 *   simply absent from the result
 */
const getNodesBlockchainDataBatch = async (
  cids: string[],
): Promise<NodeBlockchainData[]> => {
  if (cids.length === 0) return []

  const db = await getDatabase()

  const { rows } = await db.query<NodeBlockchainData>({
    text: 'SELECT DISTINCT ON (cid) cid, block_published_on, tx_published_on, piece_index, piece_offset FROM nodes WHERE cid = ANY($1) AND block_published_on IS NOT NULL ORDER BY cid, block_published_on DESC',
    values: [cids],
  })

  return rows
}
276+
262277
const updateNodeBlockchainData = async (
263278
rootCid: string,
264279
cid: string,
@@ -278,6 +293,54 @@ const updateNodeBlockchainData = async (
278293
})
279294
}
280295

296+
/**
 * Writes blockchain publication data to many node rows with one UPDATE.
 *
 * The updates are serialised to JSON, expanded server-side with
 * `jsonb_to_recordset`, and joined to `nodes` on (root_cid, cid).
 *
 * @param updates - target row (`rootCid`, `cid`) and the data to store on it.
 *   If the same target appears more than once, the last entry wins.
 * @returns the result of the underlying query, or `undefined` when `updates`
 *   is empty (no query is issued)
 */
const updateNodesBlockchainDataBatch = async (
  updates: Array<{
    rootCid: string
    cid: string
    blockchainData: NodeBlockchainData
  }>,
) => {
  if (updates.length === 0) return

  const db = await getDatabase()

  // Keep exactly one entry per (rootCid, cid). With UPDATE ... FROM, a target
  // row that matches several source rows is updated from only one of them and
  // which one is not predictable, so duplicates are collapsed here (last wins).
  // The key is a JSON-encoded pair rather than a delimiter-joined string, so
  // two different pairs can never produce the same key.
  const latestByTarget = new Map(
    updates.map((u) => [JSON.stringify([u.rootCid, u.cid]), u] as const),
  )

  const payload = Array.from(latestByTarget.values(), (u) => ({
    block_published_on: u.blockchainData.block_published_on,
    tx_published_on: u.blockchainData.tx_published_on,
    piece_index: u.blockchainData.piece_index,
    piece_offset: u.blockchainData.piece_offset,
    root_cid: u.rootCid,
    cid: u.cid,
  }))

  return db.query({
    text: `
      UPDATE nodes AS n
      SET
        block_published_on = data.block_published_on,
        tx_published_on = data.tx_published_on,
        piece_index = data.piece_index,
        piece_offset = data.piece_offset
      FROM jsonb_to_recordset($1::jsonb) AS data(
        block_published_on INTEGER,
        tx_published_on TEXT,
        piece_index INTEGER,
        piece_offset INTEGER,
        root_cid TEXT,
        cid TEXT
      )
      WHERE n.root_cid = data.root_cid AND n.cid = data.cid
    `,
    values: [JSON.stringify(payload)],
  })
}
343+
281344
const hasEncodedNode = async (cid: string) => {
282345
const db = await getDatabase()
283346
return db
@@ -305,6 +368,8 @@ export const nodesRepository = {
305368
removeNodeByRootCid,
306369
getNodesCountWithoutDataByRootCid,
307370
getNodeBlockchainData,
371+
getNodesBlockchainDataBatch,
308372
updateNodeBlockchainData,
373+
updateNodesBlockchainDataBatch,
309374
hasEncodedNode,
310375
}

apps/backend/src/infrastructure/services/upload/onchainPublisher/index.ts

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { createLogger } from '../../../drivers/logger.js'
22
import { NodesUseCases } from '../../../../core/objects/nodes.js'
33
import { createTransactionManager } from './transactionManager.js'
44
import { compactAddLength } from '@polkadot/util'
5-
import { nodesRepository, Node } from '../../../repositories/objects/nodes.js'
5+
import { nodesRepository } from '../../../repositories/objects/nodes.js'
66

77
const logger = createLogger('upload:onchainPublisher')
88

@@ -12,22 +12,19 @@ const publishNodes = async (cids: string[]) => {
1212
logger.info('Uploading %d nodes', cids.length)
1313

1414
const nodes = await nodesRepository.getNodesByCids(cids)
15-
const nodeCounts = await Promise.all(
16-
nodes.map((e) =>
17-
nodesRepository.getNodeCount({
18-
cid: e.cid,
19-
}),
20-
),
21-
)
2215

23-
const filter = (node: Node, index: number) => {
24-
return nodeCounts[index] && (nodeCounts[index].publishedCount ?? 0) === 0
25-
}
16+
// Optimize: Batch-check which CIDs are already published anywhere in the table
17+
// We consider a node "repeated" if there exists ANY row with the same CID that has blockchain data
18+
const uniqueCids = Array.from(new Set(nodes.map((n) => n.cid)))
19+
const publishedRows =
20+
await nodesRepository.getNodesBlockchainDataBatch(uniqueCids)
21+
const publishedCidSet = new Set(publishedRows.map((r) => r.cid))
2622

27-
const repeatedNodes = nodes.filter((_, index) => !filter(_, index))
23+
const repeatedNodes = nodes.filter((node) => publishedCidSet.has(node.cid))
2824
await NodesUseCases.handleRepeatedNodes(repeatedNodes)
2925

30-
const publishingNodes = nodes.filter(filter)
26+
// Nodes that are not known as published anywhere should be sent for publishing
27+
const publishingNodes = nodes.filter((node) => !publishedCidSet.has(node.cid))
3128

3229
const transactions = publishingNodes.map((node) => {
3330
const buffer = Buffer.from(node.encoded_node, 'base64')

0 commit comments

Comments
 (0)