Skip to content

Commit e2b5a42

Browse files
authored
feat: publishing tasks for nodes failing to be published into error queues (#433)
* feat: publishing failing nodes task * refactor: update eventRouter to include frontendErrorPublishedQueue and improve import structure * chore: update yarn.lock to reflect dependency version changes and add new bun packages for cross-platform support * fix: enhance error logging in onchainPublisher to include maxRetries from config * refactor: improve import structure in eventRouter by adding downloadErrorPublishedQueue
1 parent a56bc7e commit e2b5a42

File tree

6 files changed

+207
-67
lines changed

6 files changed

+207
-67
lines changed

backend/src/infrastructure/eventRouter/index.ts

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
import { Rabbit } from '../drivers/rabbit.js'
2-
import { processDownloadTask } from './processors/download.js'
3-
import { processFrontendTask } from './processors/frontend.js'
2+
import {
3+
downloadErrorPublishedQueue,
4+
processDownloadTask,
5+
} from './processors/download.js'
6+
import {
7+
frontendErrorPublishedQueue,
8+
processFrontendTask,
9+
} from './processors/frontend.js'
410
import { Task } from './tasks.js'
511

612
export const EventRouter = {
@@ -19,6 +25,9 @@ export const EventRouter = {
1925
Rabbit.publish(getTargetQueueByTask(tasks), tasks)
2026
}
2127
},
28+
publishFailedTask: (task: Task) => {
29+
Rabbit.publish(getFailedTaskQueue(task), task)
30+
},
2231
}
2332

2433
const getTargetQueueByTask = (task: Task) => {
@@ -30,3 +39,14 @@ const getTargetQueueByTask = (task: Task) => {
3039
return 'task-manager'
3140
}
3241
}
42+
43+
const getFailedTaskQueue = (task: Task) => {
44+
switch (getTargetQueueByTask(task)) {
45+
case 'task-manager':
46+
return frontendErrorPublishedQueue
47+
case 'download-manager':
48+
return downloadErrorPublishedQueue
49+
default:
50+
throw new Error(`Unknown task: ${task.id}`)
51+
}
52+
}

backend/src/infrastructure/eventRouter/processors/download.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import { createHandlerWithRetries } from '../utils.js'
66

77
const logger = createLogger('eventRouter:processor:download')
88

9+
export const downloadErrorPublishedQueue = 'download-errors'
10+
911
export const processDownloadTask = createHandlerWithRetries(
1012
({ id, params }: Task) => {
1113
if (id === 'async-download-created') {

backend/src/infrastructure/eventRouter/processors/frontend.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import { UploadsUseCases } from '../../../core/uploads/uploads.js'
44
import { Task } from '../tasks.js'
55
import { createHandlerWithRetries } from '../utils.js'
66

7+
export const frontendErrorPublishedQueue = 'frontend-errors'
8+
79
export const processFrontendTask = createHandlerWithRetries(
810
({ id, params, retriesLeft }: Task) => {
911
if (id === 'migrate-upload-nodes') {
@@ -19,6 +21,6 @@ export const processFrontendTask = createHandlerWithRetries(
1921
}
2022
},
2123
{
22-
errorPublishQueue: 'frontend-errors',
24+
errorPublishQueue: frontendErrorPublishedQueue,
2325
},
2426
)

backend/src/infrastructure/services/upload/onchainPublisher/index.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import { createTransactionManager } from './transactionManager.js'
55
import { compactAddLength } from '@polkadot/util'
66
import { nodesRepository } from '../../../repositories/objects/nodes.js'
77
import { EventRouter } from '../../../eventRouter/index.js'
8+
import { MAX_RETRIES } from '../../../eventRouter/tasks.js'
9+
import { config } from '../../../../config.js'
810

911
const logger = createLogger('upload:onchainPublisher')
1012

@@ -72,6 +74,20 @@ const republishNodes = safeCallback(
7274
nodes,
7375
},
7476
})
77+
} else {
78+
logger.error(
79+
'Failed to publish nodes after %d retries',
80+
config.services.taskManager.maxRetries,
81+
)
82+
logger.info('Publishing failed task to event router')
83+
logger.debug('Nodes: %s', nodes.join(', '))
84+
EventRouter.publishFailedTask({
85+
id: 'publish-nodes',
86+
retriesLeft: MAX_RETRIES,
87+
params: {
88+
nodes,
89+
},
90+
})
7591
}
7692
},
7793
)

submodules/files-gateway

Submodule files-gateway updated 55 files

0 commit comments

Comments
 (0)