From aee4c5ca8e08ba94b9e603a78d00d4ff9e7534fc Mon Sep 17 00:00:00 2001 From: "val.istar.guo" Date: Mon, 29 Jan 2024 00:21:54 +0800 Subject: [PATCH] fix: unabled initialize consumer with named kafka connection --- src/decorator/kafka-consume.decorator.ts | 4 +-- .../kafka-consume-metadata.interface.ts | 3 +- .../kafka-consume-options.interface.ts | 7 +++-- .../kafka-module-options-async.interface.ts | 8 +++-- src/kafka-consumer.service.ts | 9 ++++-- src/kafka.module.ts | 29 +++++++++++++++++-- src/utils/get-async-options-provide-name.ts | 3 ++ 7 files changed, 49 insertions(+), 14 deletions(-) create mode 100644 src/utils/get-async-options-provide-name.ts diff --git a/src/decorator/kafka-consume.decorator.ts b/src/decorator/kafka-consume.decorator.ts index b8d4767..22034d1 100644 --- a/src/decorator/kafka-consume.decorator.ts +++ b/src/decorator/kafka-consume.decorator.ts @@ -3,9 +3,9 @@ import { KAFKA_CONSUME } from '../constant' import { KafkaConsumeMetadata } from '../interface/kafka-consume-metadata.interface' import { KafkaConsumeOptions } from '../interface/kafka-consume-options.interface' -export function KafkaConsume(topic: string, options: KafkaConsumeOptions = {}): MethodDecorator { +export function KafkaConsume(topic: string | RegExp | string[] | RegExp[], options: KafkaConsumeOptions = {}): MethodDecorator { return SetMetadata(KAFKA_CONSUME, { - topic, + topics: Array.isArray(topic) ? topic : [topic], ...options, }) } diff --git a/src/interface/kafka-consume-metadata.interface.ts b/src/interface/kafka-consume-metadata.interface.ts index c3a96ee..631d313 100644 --- a/src/interface/kafka-consume-metadata.interface.ts +++ b/src/interface/kafka-consume-metadata.interface.ts @@ -1,4 +1,5 @@ export interface KafkaConsumeMetadata { - topic: string + topics: (string | RegExp)[] json?: boolean + fromBeginning?: boolean } diff --git a/src/interface/kafka-consume-options.interface.ts b/src/interface/kafka-consume-options.interface.ts index 5b5b1e3..a49d91f 100644 --- a/src/interface/kafka-consume-options.interface.ts +++ b/src/interface/kafka-consume-options.interface.ts @@ -1,3 +1,4 @@ -export interface KafkaConsumeOptions { - json?: boolean -} +import { KafkaConsumeMetadata } from './kafka-consume-metadata.interface.js' + + +export type KafkaConsumeOptions = Omit diff --git a/src/interface/kafka-module-options-async.interface.ts b/src/interface/kafka-module-options-async.interface.ts index d481070..3e85ca4 100644 --- a/src/interface/kafka-module-options-async.interface.ts +++ b/src/interface/kafka-module-options-async.interface.ts @@ -1,12 +1,14 @@ -import { InjectionToken, ModuleMetadata, OptionalFactoryDependency } from '@nestjs/common' +import { InjectionToken, OptionalFactoryDependency, Type } from '@nestjs/common' import { KafkaModuleOptions } from './kafka-module-options.interface' -export interface KafkaModuleOptionsAsync extends Pick { +export interface KafkaModuleOptionsAsync { /** * @default "default" */ name?: string - useFactory: (...args: any[]) => Promise | KafkaModuleOptions + useClass?: Type + useExisting?: Type + useFactory?: (...args: any[]) => Promise | KafkaModuleOptions inject?: Array } diff --git a/src/kafka-consumer.service.ts b/src/kafka-consumer.service.ts index 00e5bc9..5ee6cdd 100644 --- a/src/kafka-consumer.service.ts +++ b/src/kafka-consumer.service.ts @@ -1,4 +1,4 @@ -import { OnModuleDestroy, OnModuleInit } from '@nestjs/common' +import { Logger, OnModuleDestroy, OnModuleInit } from '@nestjs/common' import { DiscoveryService, MetadataScanner, Reflector } from '@nestjs/core' import { InstanceWrapper } from '@nestjs/core/injector/instance-wrapper' import { Consumer } from 'kafkajs' @@ -9,6 +9,8 @@ import { KafkaConsumerMetadata } from './interface/kafka-consumer-metadata.inter import { KafkaService } from './kafka.service' export class KafkaConsumerService implements OnModuleInit, OnModuleDestroy { + logger = new Logger(KafkaConsumerService.name) + consumers: Consumer[] = [] subscriber: Record = {} @@ -28,7 +30,7 @@ export class KafkaConsumerService implements OnModuleInit, OnModuleDestroy { } async onModuleInit(): Promise { - this.kafka.consumer({ groupId: 'test' }) + this.kafka.consumer({ groupId: this.options.groupId }) const providers = this.discoveryService.getProviders() .filter((provider) => { @@ -56,7 +58,8 @@ export class KafkaConsumerService implements OnModuleInit, OnModuleDestroy { for (const method of methods) { const options = this.reflector.get(KAFKA_CONSUME, instance[method]) - await consumer.subscribe({ topic: options.topic }) + await consumer.subscribe({ topics: options.topics }) + this.logger.log(`Subscribe ${options.topics.join(',')}`) const messageIndex = this.reflector.get(MESSAGE_ARGUMENT_INDEX, instance[method]) const contextIndex = this.reflector.get(CONTEXT_ARGUMENT_INDEX, instance[method]) diff --git a/src/kafka.module.ts b/src/kafka.module.ts index 07179e1..4269646 100644 --- a/src/kafka.module.ts +++ b/src/kafka.module.ts @@ -8,6 +8,7 @@ import { KafkaModuleForProducerOptions } from './interface/kafka-module-for-prod import { KafkaConsumerService } from './kafka-consumer.service' import { KafkaProducer } from './kafka-producer.service' import { KafkaService } from './kafka.service' +import { getAsyncOptionsProvideName } from './utils/get-async-options-provide-name.js' import { getForProducerOptionsProvideName } from './utils/get-for-producer-options-provide-name.js' import { getKafkaConsumerServiceProvideName } from './utils/get-kafka-consumer-service-provide-name' import { getKafkaProducerServiceProvideName } from './utils/get-kafka-producer-service-provide-name' @@ -65,13 +66,36 @@ export class KafkaModule { } static forRootAsync(options: KafkaModuleOptionsAsync): DynamicModule { + const asyncOptionsProvideName = getAsyncOptionsProvideName(options.name) const optionsProvideName = getOptionsProvideName(options.name) const providers = this.getProviders(options.name) + let asyncOptionsProvide: Provider + + if (options.useFactory) { + asyncOptionsProvide = { + provide: asyncOptionsProvideName, + inject: options.inject, + useFactory: options.useFactory, + } + } else if (options.useClass) { + asyncOptionsProvide = { + provide: asyncOptionsProvideName, + useClass: options.useClass, + } + } else if (options.useExisting) { + asyncOptionsProvide = { + provide: asyncOptionsProvideName, + useExisting: options.useExisting, + } + } else { + throw new Error('Invalid KafkaModuleOptionsAsync: useClass, useExisting or useFactory must be defined') + } + const optionsProvide: Provider = { + inject: [asyncOptionsProvideName], provide: optionsProvideName, - inject: options.inject, - useFactory: options.useFactory, + useFactory: (o: KafkaModuleOptions): KafkaModuleOptions => ({ name: options.name, ...o }), } return { @@ -79,6 +103,7 @@ export class KafkaModule { module: KafkaModule, imports: [DiscoveryModule], providers: [ + asyncOptionsProvide, optionsProvide, ...providers, ], diff --git a/src/utils/get-async-options-provide-name.ts b/src/utils/get-async-options-provide-name.ts new file mode 100644 index 0000000..7922ece --- /dev/null +++ b/src/utils/get-async-options-provide-name.ts @@ -0,0 +1,3 @@ +export function getAsyncOptionsProvideName(name?: string): string { + return `KafkaAsyncOptions.${name || 'default'}` +}