Skip to content

Commit

Permalink
fix: queue module dependency imports (#1033)
Browse files Browse the repository at this point in the history
* fix: queue module dependency imports

* chore: bull module for root
  • Loading branch information
boris-w authored Oct 28, 2024
1 parent ac1626a commit 83e0b8a
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 19 deletions.
35 changes: 24 additions & 11 deletions apps/nestjs-backend/src/app.module.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { BullModule } from '@nestjs/bullmq';
import type { ModuleMetadata } from '@nestjs/common';
import { Module } from '@nestjs/common';
import { ConditionalModule, ConfigService } from '@nestjs/config';
import Redis from 'ioredis';
import type { ICacheConfig } from './configs/cache.config';
import { ConfigModule } from './configs/config.module';
import { AccessTokenModule } from './features/access-token/access-token.module';
import { AggregationOpenApiModule } from './features/aggregation/open-api/aggregation-open-api.module';
import { AttachmentsModule } from './features/attachments/attachments.module';
Expand Down Expand Up @@ -71,17 +75,26 @@ export const appModules = {
imports: [
GlobalModule,
...appModules.imports,
...(process.env.BACKEND_CACHE_REDIS_URI
? [
BullModule.forRoot({
connection: {
lazyConnect: true,
maxRetriesPerRequest: null,
url: process.env.BACKEND_CACHE_REDIS_URI,
},
}),
]
: []),
ConditionalModule.registerWhen(
BullModule.forRootAsync({
imports: [ConfigModule],
useFactory: async (configService: ConfigService) => {
const redisUri = configService.get<ICacheConfig>('cache')?.redis.uri;
if (!redisUri) {
throw new Error('Redis URI is not defined');
}
const redis = new Redis(redisUri, { lazyConnect: true, maxRetriesPerRequest: null });
await redis.connect();
return {
connection: redis,
};
},
inject: [ConfigService],
}),
(env) => {
return Boolean(env.BACKEND_CACHE_REDIS_URI);
}
),
],
controllers: [],
})
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,29 @@
import { BullModule } from '@nestjs/bullmq';
import type { DynamicModule } from '@nestjs/common';
import { Module } from '@nestjs/common';
import { ConditionalModule } from '@nestjs/config';
import { ConfigModule } from '../../configs/config.module';
import { FallbackQueueModule } from './fallback/fallback-queue.module';

@Module({})
@Module({
imports: [ConfigModule],
})
export class EventJobModule {
static registerQueue(name: string): DynamicModule {
const exportModule = process.env.BACKEND_CACHE_REDIS_URI ? BullModule : FallbackQueueModule;
static async registerQueue(name: string): Promise<DynamicModule> {
const [bullQueue, fallbackQueue] = await Promise.all([
ConditionalModule.registerWhen(BullModule.registerQueue({ name }), (env) =>
Boolean(env.BACKEND_CACHE_REDIS_URI)
),
ConditionalModule.registerWhen(
FallbackQueueModule.registerQueue(name),
(env) => !env.BACKEND_CACHE_REDIS_URI
),
]);

return {
module: EventJobModule,
imports: [exportModule.registerQueue(name)],
exports: [exportModule],
imports: [bullQueue, fallbackQueue],
exports: [bullQueue, fallbackQueue],
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export class FallbackQueueService implements OnModuleInit {
) {}

async onModuleInit() {
this.logger.debug('FallbackQueueService init');
this.collectionProcess();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { InjectQueue, Processor } from '@nestjs/bullmq';
import { InjectQueue, Processor, WorkerHost } from '@nestjs/bullmq';
import { Injectable, Logger } from '@nestjs/common';
import { UploadType } from '@teable/openapi';
import { Queue } from 'bullmq';
Expand All @@ -24,12 +24,15 @@ export const ATTACHMENTS_TABLE_QUEUE = 'attachments-table-queue';

@Injectable()
@Processor(ATTACHMENTS_TABLE_QUEUE)
export class AttachmentsTableQueueProcessor {
export class AttachmentsTableQueueProcessor extends WorkerHost {
private logger = new Logger(AttachmentsTableQueueProcessor.name);

constructor(
private readonly attachmentsStorageService: AttachmentsStorageService,
@InjectQueue(ATTACHMENTS_TABLE_QUEUE) public readonly queue: Queue<IRecordImageJob>
) {}
) {
super();
}

public async process(job: Job<IRecordImageJob>) {
const { tableId, attachmentItem } = job.data;
Expand Down

0 comments on commit 83e0b8a

Please sign in to comment.