
Commit e6c20c6

Unify task retry handling (#479)
* feat: add ensureObjectPublished function to validate and publish nodes by root CID; update TaskSchema to include new task type
* update: unify how errors in tasks are managed
* feat: add handler for ensureObjectPublished task in frontend processor
* remove maxUploadNodesPerBatch parameter from config
* refactor: remove safeCallback utility from publishNodes function for improved clarity
1 parent eb335e9 commit e6c20c6

File tree

6 files changed: +78 -95 lines changed

apps/backend/src/config.ts

Lines changed: 0 additions & 1 deletion
@@ -61,7 +61,6 @@ export const config = {
   },
   params: {
     maxConcurrentUploads: Number(env('MAX_CONCURRENT_UPLOADS', '40')),
-    maxUploadNodesPerBatch: Number(env('MAX_UPLOAD_NODES_PER_BATCH', '20')),
     maxAnonymousDownloadSize: Number(
       env('MAX_ANONYMOUS_DOWNLOAD_SIZE', ONE_HUNDRED_MiB.toString()),
     ),
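
Note: with publish tasks now created one per node (see scheduleNodesPublish in uploads.ts below), there is no batch size left to configure, so MAX_UPLOAD_NODES_PER_BATCH and config.params.maxUploadNodesPerBatch are dropped.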

apps/backend/src/core/objects/nodes.ts

Lines changed: 11 additions & 0 deletions
@@ -27,6 +27,7 @@ import { createLogger } from '../../infrastructure/drivers/logger.js'
 import { Node } from '../../infrastructure/repositories/objects/nodes.js'
 import { EventRouter } from '../../infrastructure/eventRouter/index.js'
 import { createTask, Task } from '../../infrastructure/eventRouter/tasks.js'
+import { OnchainPublisher } from '../../infrastructure/services/upload/onchainPublisher/index.js'

 const logger = createLogger('useCases:objects:nodes')

@@ -259,6 +260,15 @@ const setPublishedOn = async (
   return
 }

+const ensureObjectPublished = async (cid: string): Promise<void> => {
+  const nodes = await nodesRepository.getNodesByRootCid(cid)
+  if (nodes.length === 0) {
+    throw new Error(`Nodes not found for ${cid}`)
+  }
+
+  await OnchainPublisher.publishNodes(nodes.map((node) => node.cid))
+}
+
 export const NodesUseCases = {
   getNode,
   saveNode,
@@ -270,4 +280,5 @@ export const NodesUseCases = {
   getNodesByCids,
   scheduleNodeArchiving,
   setPublishedOn,
+  ensureObjectPublished,
 }
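
Note: ensureObjectPublished loads every node stored under the given root CID and hands their CIDs to OnchainPublisher.publishNodes. It throws when no nodes are found, and publishNodes now throws when a transaction fails, so a failed run surfaces to the task layer and its retry handling instead of being swallowed.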

apps/backend/src/core/uploads/uploads.ts

Lines changed: 8 additions & 14 deletions
@@ -20,7 +20,6 @@ import { ObjectUseCases } from '../objects/object.js'
 import { cidToString, FileUploadOptions } from '@autonomys/auto-dag-data'
 import { EventRouter } from '../../infrastructure/eventRouter/index.js'
 import { createTask, Task } from '../../infrastructure/eventRouter/tasks.js'
-import { chunkArray } from '../../shared/utils/misc.js'
 import { config } from '../../config.js'
 import { blockstoreRepository } from '../../infrastructure/repositories/uploads/blockstore.js'
 import { BlockstoreUseCases } from './blockstore.js'
@@ -310,19 +309,14 @@ const removeUploadArtifacts = async (uploadId: string): Promise<void> => {
 const scheduleNodesPublish = async (cid: string): Promise<void> => {
   const nodes = await NodesUseCases.getCidsByRootCid(cid)

-  const tasks: Task[] = chunkArray(
-    nodes,
-    config.params.maxUploadNodesPerBatch,
-  ).map((nodes) =>
-    createTask({
-      id: 'publish-nodes',
-      params: {
-        nodes,
-      },
-    }),
-  )
-
-  EventRouter.publish(tasks)
+  nodes.forEach((node) => {
+    EventRouter.publish(
+      createTask({
+        id: 'publish-nodes',
+        params: { nodes: [node] },
+      }),
+    )
+  })
 }

 const scheduleUploadTagging = async (cid: string): Promise<void> => {
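
Note: scheduleNodesPublish now emits one 'publish-nodes' task per node instead of chunked batches, so a failed transaction only retries that node's task rather than re-submitting a whole batch.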

apps/backend/src/infrastructure/eventRouter/processors/frontend.ts

Lines changed: 4 additions & 2 deletions
@@ -7,15 +7,17 @@ import { createHandlerWithRetries } from '../utils.js'
 export const frontendErrorPublishedQueue = 'frontend-errors'

 export const processFrontendTask = createHandlerWithRetries(
-  ({ id, params, retriesLeft }: Task) => {
+  ({ id, params }: Task) => {
     if (id === 'migrate-upload-nodes') {
       return UploadsUseCases.processMigration(params.uploadId)
     } else if (id === 'archive-objects') {
       return NodesUseCases.processNodeArchived(params.objects)
     } else if (id === 'publish-nodes') {
-      return OnchainPublisher.publishNodes(params.nodes, retriesLeft)
+      return OnchainPublisher.publishNodes(params.nodes)
     } else if (id === 'tag-upload') {
       return UploadsUseCases.tagUpload(params.cid)
+    } else if (id === 'ensure-object-published') {
+      return NodesUseCases.ensureObjectPublished(params.cid)
     } else {
       throw new Error(`Received task ${id} but no handler found.`)
     }
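
Note: handlers no longer receive retriesLeft; each branch just returns a promise and lets any error propagate. The retry policy is concentrated in createHandlerWithRetries (defined in ../utils.js, which this commit does not touch). The code below is only a rough sketch of that kind of wrapper, with simplified inline types and the router passed in explicitly, to illustrate the unified flow; it is not the repository's actual implementation.

// Sketch only: simplified stand-ins for the real Task type and EventRouter.
type SketchTask = { id: string; retriesLeft: number; params: Record<string, unknown> }

type SketchRouter = {
  publish: (task: SketchTask) => void
  publishFailedTask: (task: SketchTask) => void
}

// Wrap a handler so a thrown error re-enqueues the task with one fewer retry,
// and routes it to the failure queue once retries are exhausted.
const createHandlerWithRetriesSketch =
  (handler: (task: SketchTask) => Promise<unknown>, router: SketchRouter) =>
  async (task: SketchTask): Promise<void> => {
    try {
      await handler(task)
    } catch {
      if (task.retriesLeft > 0) {
        router.publish({ ...task, retriesLeft: task.retriesLeft - 1 })
      } else {
        router.publishFailedTask(task)
      }
    }
  }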

apps/backend/src/infrastructure/eventRouter/tasks.ts

Lines changed: 19 additions & 0 deletions
@@ -48,6 +48,13 @@ export const TaskSchema = z.discriminatedUnion('id', [
       cid: z.string(),
     }),
   }),
+  z.object({
+    id: z.literal('ensure-object-published'),
+    retriesLeft: z.number().default(MAX_RETRIES),
+    params: z.object({
+      cid: z.string(),
+    }),
+  }),
 ])

 export type MigrateUploadTask = z.infer<typeof TaskSchema>
@@ -90,6 +97,12 @@ type TaskCreateParams =
         cid: string
       }
     }
+  | {
+      id: 'ensure-object-published'
+      params: {
+        cid: string
+      }
+    }

 export const createTask = (task: TaskCreateParams): Task => {
   switch (task.id) {
@@ -129,6 +142,12 @@ export const createTask = (task: TaskCreateParams): Task => {
         params: task.params,
         retriesLeft: MAX_RETRIES,
       }
+    case 'ensure-object-published':
+      return {
+        id: task.id,
+        params: task.params,
+        retriesLeft: MAX_RETRIES,
+      }
     default:
       return exhaustiveCheck(task)
   }
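
Note: nothing in this commit enqueues the new task; the schema, createTask case, and frontend handler only define how it is processed. As an illustration only, a producer could schedule it the same way the other call sites in this commit do (import paths shown as they would look from a module under src/core, e.g. core/uploads; the helper name is made up):

import { EventRouter } from '../../infrastructure/eventRouter/index.js'
import { createTask } from '../../infrastructure/eventRouter/tasks.js'

// Hypothetical helper: ask the task system to verify that every node under
// this root CID ends up published, with the standard retry budget.
const scheduleEnsureObjectPublished = (cid: string): void => {
  EventRouter.publish(
    createTask({
      id: 'ensure-object-published',
      params: { cid },
    }),
  )
}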

apps/backend/src/infrastructure/services/upload/onchainPublisher/index.ts

Lines changed: 36 additions & 78 deletions
@@ -1,96 +1,54 @@
 import { createLogger } from '../../../drivers/logger.js'
 import { NodesUseCases } from '../../../../core/objects/nodes.js'
-import { safeCallback } from '../../../../shared/utils/safe.js'
 import { createTransactionManager } from './transactionManager.js'
 import { compactAddLength } from '@polkadot/util'
 import { nodesRepository } from '../../../repositories/objects/nodes.js'
-import { EventRouter } from '../../../eventRouter/index.js'
-import { MAX_RETRIES } from '../../../eventRouter/tasks.js'
-import { config } from '../../../../config.js'

 const logger = createLogger('upload:onchainPublisher')

 export const transactionManager = createTransactionManager()

-const REPUBLISH_DELAY = 10_000
+const publishNodes = async (cids: string[]) => {
+  logger.info('Uploading %d nodes', cids.length)

-const publishNodes = safeCallback(
-  async (cids: string[], retriesLeft: number) => {
-    logger.info('Uploading %d nodes', cids.length)
-
-    const nodes = await nodesRepository.getNodesByCids(cids)
-    const nodeCounts = await Promise.all(
-      nodes.map((e) =>
-        nodesRepository.getNodeCount({
-          cid: e.cid,
-        }),
-      ),
-    )
-
-    const filteredNodes = nodes.filter(
-      (_, index) =>
-        nodeCounts[index] && (nodeCounts[index].publishedCount ?? 0) === 0,
-    )
-
-    const transactions = filteredNodes.map((node) => {
-      const buffer = Buffer.from(node.encoded_node, 'base64')
-
-      return {
-        module: 'system',
-        method: 'remark',
-        params: [compactAddLength(buffer)],
-      }
-    })
+  const nodes = await nodesRepository.getNodesByCids(cids)
+  const nodeCounts = await Promise.all(
+    nodes.map((e) =>
+      nodesRepository.getNodeCount({
+        cid: e.cid,
+      }),
+    ),
+  )

-    const results = await transactionManager.submit(transactions)
-    const someNodeFailed = results.some((result) => !result.success)
-    if (someNodeFailed) {
-      republishNodes(
-        filteredNodes
-          .filter((_, index) => !results[index].success)
-          .map((node) => node.cid),
-        retriesLeft,
-      )
-    }
+  const filteredNodes = nodes.filter(
+    (_, index) =>
+      nodeCounts[index] && (nodeCounts[index].publishedCount ?? 0) === 0,
+  )

-    await Promise.all(
-      filteredNodes.map((node, index) => {
-        const isSuccess = results[index].success
-        if (!isSuccess) return null
-        return NodesUseCases.setPublishedOn(node.cid, results[index])
-      }),
-    )
-  },
-)
+  const transactions = filteredNodes.map((node) => {
+    const buffer = Buffer.from(node.encoded_node, 'base64')

-const republishNodes = safeCallback(
-  async (nodes: string[], retriesLeft: number) => {
-    await new Promise((resolve) => setTimeout(resolve, REPUBLISH_DELAY))
-    if (retriesLeft > 0) {
-      EventRouter.publish({
-        id: 'publish-nodes',
-        retriesLeft: retriesLeft - 1,
-        params: {
-          nodes,
-        },
-      })
-    } else {
-      logger.error(
-        'Failed to publish nodes after %d retries',
-        config.services.taskManager.maxRetries,
-      )
-      logger.info('Publishing failed task to event router')
-      logger.debug('Nodes: %s', nodes.join(', '))
-      EventRouter.publishFailedTask({
-        id: 'publish-nodes',
-        retriesLeft: MAX_RETRIES,
-        params: {
-          nodes,
-        },
-      })
+    return {
+      module: 'system',
+      method: 'remark',
+      params: [compactAddLength(buffer)],
     }
-  },
-)
+  })
+
+  const results = await transactionManager.submit(transactions)
+  const someNodeFailed = results.some((result) => !result.success)
+  if (someNodeFailed) {
+    throw new Error('Failed to publish nodes')
+  }
+
+  await Promise.all(
+    filteredNodes.map((node, index) => {
+      const isSuccess = results[index].success
+      if (!isSuccess) return null
+      return NodesUseCases.setPublishedOn(node.cid, results[index])
+    }),
+  )
+}

 export const OnchainPublisher = {
   publishNodes,
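
Note: publishNodes is a plain async function again: rather than scheduling its own delayed republish through safeCallback and republishNodes, it throws when any submitted transaction fails and leaves retrying (or routing to the failure queue) to the createHandlerWithRetries wrapper around the task handler.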
