Skip to content

Commit

Permalink
fix: unabled initialize consumer with named kafka connection
Browse files Browse the repository at this point in the history
  • Loading branch information
Val-istar-Guo committed Jan 28, 2024
1 parent bbc834f commit aee4c5c
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 14 deletions.
4 changes: 2 additions & 2 deletions src/decorator/kafka-consume.decorator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ import { KAFKA_CONSUME } from '../constant'
import { KafkaConsumeMetadata } from '../interface/kafka-consume-metadata.interface'
import { KafkaConsumeOptions } from '../interface/kafka-consume-options.interface'

export function KafkaConsume(topic: string, options: KafkaConsumeOptions = {}): MethodDecorator {
export function KafkaConsume(topic: string | RegExp | string[] | RegExp[], options: KafkaConsumeOptions = {}): MethodDecorator {
return SetMetadata(KAFKA_CONSUME, <KafkaConsumeMetadata>{
topic,
topics: Array.isArray(topic) ? topic : [topic],
...options,
})
}
3 changes: 2 additions & 1 deletion src/interface/kafka-consume-metadata.interface.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export interface KafkaConsumeMetadata {
topic: string
topics: (string | RegExp)[]
json?: boolean
fromBeginning?: boolean
}
7 changes: 4 additions & 3 deletions src/interface/kafka-consume-options.interface.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export interface KafkaConsumeOptions {
json?: boolean
}
import { KafkaConsumeMetadata } from './kafka-consume-metadata.interface.js'


export type KafkaConsumeOptions = Omit<KafkaConsumeMetadata, 'topics'>
8 changes: 5 additions & 3 deletions src/interface/kafka-module-options-async.interface.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import { InjectionToken, ModuleMetadata, OptionalFactoryDependency } from '@nestjs/common'
import { InjectionToken, OptionalFactoryDependency, Type } from '@nestjs/common'
import { KafkaModuleOptions } from './kafka-module-options.interface'


export interface KafkaModuleOptionsAsync extends Pick<ModuleMetadata, 'imports'> {
export interface KafkaModuleOptionsAsync {
/**
* @default "default"
*/
name?: string
useFactory: (...args: any[]) => Promise<KafkaModuleOptions> | KafkaModuleOptions
useClass?: Type<KafkaModuleOptions>
useExisting?: Type<KafkaModuleOptions>
useFactory?: (...args: any[]) => Promise<KafkaModuleOptions> | KafkaModuleOptions
inject?: Array<InjectionToken | OptionalFactoryDependency>
}
9 changes: 6 additions & 3 deletions src/kafka-consumer.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { OnModuleDestroy, OnModuleInit } from '@nestjs/common'
import { Logger, OnModuleDestroy, OnModuleInit } from '@nestjs/common'
import { DiscoveryService, MetadataScanner, Reflector } from '@nestjs/core'
import { InstanceWrapper } from '@nestjs/core/injector/instance-wrapper'
import { Consumer } from 'kafkajs'
Expand All @@ -9,6 +9,8 @@ import { KafkaConsumerMetadata } from './interface/kafka-consumer-metadata.inter
import { KafkaService } from './kafka.service'

export class KafkaConsumerService implements OnModuleInit, OnModuleDestroy {
logger = new Logger(KafkaConsumerService.name)

consumers: Consumer[] = []
subscriber: Record<string, KafkaConsumeOptions> = {}

Expand All @@ -28,7 +30,7 @@ export class KafkaConsumerService implements OnModuleInit, OnModuleDestroy {
}

async onModuleInit(): Promise<void> {
this.kafka.consumer({ groupId: 'test' })
this.kafka.consumer({ groupId: this.options.groupId })

const providers = this.discoveryService.getProviders()
.filter((provider) => {
Expand Down Expand Up @@ -56,7 +58,8 @@ export class KafkaConsumerService implements OnModuleInit, OnModuleDestroy {
for (const method of methods) {
const options = this.reflector.get<KafkaConsumeMetadata>(KAFKA_CONSUME, instance[method])

await consumer.subscribe({ topic: options.topic })
await consumer.subscribe({ topics: options.topics })
this.logger.log(`Subscribe ${options.topics.join(',')}`)

const messageIndex = this.reflector.get(MESSAGE_ARGUMENT_INDEX, instance[method])
const contextIndex = this.reflector.get(CONTEXT_ARGUMENT_INDEX, instance[method])
Expand Down
29 changes: 27 additions & 2 deletions src/kafka.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { KafkaModuleForProducerOptions } from './interface/kafka-module-for-prod
import { KafkaConsumerService } from './kafka-consumer.service'
import { KafkaProducer } from './kafka-producer.service'
import { KafkaService } from './kafka.service'
import { getAsyncOptionsProvideName } from './utils/get-async-options-provide-name.js'
import { getForProducerOptionsProvideName } from './utils/get-for-producer-options-provide-name.js'
import { getKafkaConsumerServiceProvideName } from './utils/get-kafka-consumer-service-provide-name'
import { getKafkaProducerServiceProvideName } from './utils/get-kafka-producer-service-provide-name'
Expand Down Expand Up @@ -65,20 +66,44 @@ export class KafkaModule {
}

static forRootAsync(options: KafkaModuleOptionsAsync): DynamicModule {
const asyncOptionsProvideName = getAsyncOptionsProvideName(options.name)
const optionsProvideName = getOptionsProvideName(options.name)
const providers = this.getProviders(options.name)

let asyncOptionsProvide: Provider

if (options.useFactory) {
asyncOptionsProvide = {
provide: asyncOptionsProvideName,
inject: options.inject,
useFactory: options.useFactory,
}
} else if (options.useClass) {
asyncOptionsProvide = {
provide: asyncOptionsProvideName,
useClass: options.useClass,
}
} else if (options.useExisting) {
asyncOptionsProvide = {
provide: asyncOptionsProvideName,
useExisting: options.useExisting,
}
} else {
throw new Error('Invalid KafkaModuleOptionsAsync: useClass, useExisting or useFactory must be defined')
}

const optionsProvide: Provider = {
inject: [asyncOptionsProvideName],
provide: optionsProvideName,
inject: options.inject,
useFactory: options.useFactory,
useFactory: (o: KafkaModuleOptions): KafkaModuleOptions => ({ name: options.name, ...o }),
}

return {
global: true,
module: KafkaModule,
imports: [DiscoveryModule],
providers: [
asyncOptionsProvide,
optionsProvide,
...providers,
],
Expand Down
3 changes: 3 additions & 0 deletions src/utils/get-async-options-provide-name.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export function getAsyncOptionsProvideName(name?: string): string {
return `KafkaAsyncOptions.${name || 'default'}`
}

0 comments on commit aee4c5c

Please sign in to comment.