From 83e0b8a521ca0872fd6c46bac070d329c16804d7 Mon Sep 17 00:00:00 2001 From: Boris Date: Mon, 28 Oct 2024 14:30:24 +0800 Subject: [PATCH] fix: queue module dependency imports (#1033) * fix: queue module dependency imports * chore: bull module for root --- apps/nestjs-backend/src/app.module.ts | 35 +++++++++++++------ .../event-job/event-job.module.ts | 23 +++++++++--- .../fallback/fallback-queue.service.ts | 1 + .../attachments-table.processor.ts | 9 +++-- 4 files changed, 49 insertions(+), 19 deletions(-) diff --git a/apps/nestjs-backend/src/app.module.ts b/apps/nestjs-backend/src/app.module.ts index 9ca6a3e06..258940acb 100644 --- a/apps/nestjs-backend/src/app.module.ts +++ b/apps/nestjs-backend/src/app.module.ts @@ -1,6 +1,10 @@ import { BullModule } from '@nestjs/bullmq'; import type { ModuleMetadata } from '@nestjs/common'; import { Module } from '@nestjs/common'; +import { ConditionalModule, ConfigService } from '@nestjs/config'; +import Redis from 'ioredis'; +import type { ICacheConfig } from './configs/cache.config'; +import { ConfigModule } from './configs/config.module'; import { AccessTokenModule } from './features/access-token/access-token.module'; import { AggregationOpenApiModule } from './features/aggregation/open-api/aggregation-open-api.module'; import { AttachmentsModule } from './features/attachments/attachments.module'; @@ -71,17 +75,26 @@ export const appModules = { imports: [ GlobalModule, ...appModules.imports, - ...(process.env.BACKEND_CACHE_REDIS_URI - ? [ - BullModule.forRoot({ - connection: { - lazyConnect: true, - maxRetriesPerRequest: null, - url: process.env.BACKEND_CACHE_REDIS_URI, - }, - }), - ] - : []), + ConditionalModule.registerWhen( + BullModule.forRootAsync({ + imports: [ConfigModule], + useFactory: async (configService: ConfigService) => { + const redisUri = configService.get('cache')?.redis.uri; + if (!redisUri) { + throw new Error('Redis URI is not defined'); + } + const redis = new Redis(redisUri, { lazyConnect: true, maxRetriesPerRequest: null }); + await redis.connect(); + return { + connection: redis, + }; + }, + inject: [ConfigService], + }), + (env) => { + return Boolean(env.BACKEND_CACHE_REDIS_URI); + } + ), ], controllers: [], }) diff --git a/apps/nestjs-backend/src/event-emitter/event-job/event-job.module.ts b/apps/nestjs-backend/src/event-emitter/event-job/event-job.module.ts index b892c0db4..6c0ac312c 100644 --- a/apps/nestjs-backend/src/event-emitter/event-job/event-job.module.ts +++ b/apps/nestjs-backend/src/event-emitter/event-job/event-job.module.ts @@ -1,16 +1,29 @@ import { BullModule } from '@nestjs/bullmq'; import type { DynamicModule } from '@nestjs/common'; import { Module } from '@nestjs/common'; +import { ConditionalModule } from '@nestjs/config'; +import { ConfigModule } from '../../configs/config.module'; import { FallbackQueueModule } from './fallback/fallback-queue.module'; -@Module({}) +@Module({ + imports: [ConfigModule], +}) export class EventJobModule { - static registerQueue(name: string): DynamicModule { - const exportModule = process.env.BACKEND_CACHE_REDIS_URI ? BullModule : FallbackQueueModule; + static async registerQueue(name: string): Promise { + const [bullQueue, fallbackQueue] = await Promise.all([ + ConditionalModule.registerWhen(BullModule.registerQueue({ name }), (env) => + Boolean(env.BACKEND_CACHE_REDIS_URI) + ), + ConditionalModule.registerWhen( + FallbackQueueModule.registerQueue(name), + (env) => !env.BACKEND_CACHE_REDIS_URI + ), + ]); + return { module: EventJobModule, - imports: [exportModule.registerQueue(name)], - exports: [exportModule], + imports: [bullQueue, fallbackQueue], + exports: [bullQueue, fallbackQueue], }; } } diff --git a/apps/nestjs-backend/src/event-emitter/event-job/fallback/fallback-queue.service.ts b/apps/nestjs-backend/src/event-emitter/event-job/fallback/fallback-queue.service.ts index 4f2631def..95f8e7f34 100644 --- a/apps/nestjs-backend/src/event-emitter/event-job/fallback/fallback-queue.service.ts +++ b/apps/nestjs-backend/src/event-emitter/event-job/fallback/fallback-queue.service.ts @@ -16,6 +16,7 @@ export class FallbackQueueService implements OnModuleInit { ) {} async onModuleInit() { + this.logger.debug('FallbackQueueService init'); this.collectionProcess(); } diff --git a/apps/nestjs-backend/src/features/attachments/attachments-table.processor.ts b/apps/nestjs-backend/src/features/attachments/attachments-table.processor.ts index 6dae16fa5..e6cba97ae 100644 --- a/apps/nestjs-backend/src/features/attachments/attachments-table.processor.ts +++ b/apps/nestjs-backend/src/features/attachments/attachments-table.processor.ts @@ -1,4 +1,4 @@ -import { InjectQueue, Processor } from '@nestjs/bullmq'; +import { InjectQueue, Processor, WorkerHost } from '@nestjs/bullmq'; import { Injectable, Logger } from '@nestjs/common'; import { UploadType } from '@teable/openapi'; import { Queue } from 'bullmq'; @@ -24,12 +24,15 @@ export const ATTACHMENTS_TABLE_QUEUE = 'attachments-table-queue'; @Injectable() @Processor(ATTACHMENTS_TABLE_QUEUE) -export class AttachmentsTableQueueProcessor { +export class AttachmentsTableQueueProcessor extends WorkerHost { private logger = new Logger(AttachmentsTableQueueProcessor.name); + constructor( private readonly attachmentsStorageService: AttachmentsStorageService, @InjectQueue(ATTACHMENTS_TABLE_QUEUE) public readonly queue: Queue - ) {} + ) { + super(); + } public async process(job: Job) { const { tableId, attachmentItem } = job.data;