Skip to content

Commit 110a34b

Browse files
committed
Working on Kafka Events.
1 parent 1ee760d commit 110a34b

10 files changed

Lines changed: 448 additions & 389 deletions

File tree

integration/docker-compose.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,11 @@ services:
4343
- "15672:15672"
4444
- "5672:5672"
4545
tty: true
46+
kafka:
47+
container_name: test-kafka
48+
hostname: kafka
49+
image: wurstmeister/kafka
50+
ports:
51+
- "9092:9092"
52+
53+

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,6 @@
8383
"@types/cors": "2.8.5",
8484
"@types/express": "4.17.0",
8585
"@types/fastify-cors": "2.1.0",
86-
"@types/kafka-node": "2.0.8",
8786
"@types/mocha": "5.2.7",
8887
"@types/node": "10.14.8",
8988
"@types/redis": "2.8.13",
@@ -113,6 +112,7 @@
113112
"husky": "1.3.1",
114113
"imports-loader": "0.8.0",
115114
"json-loader": "0.5.7",
115+
"kafkajs": "^1.8.0",
116116
"lerna": "3.14.1",
117117
"lint-staged": "8.2.0",
118118
"memory-usage": "1.2.1",

packages/microservices/constants.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,5 +36,5 @@ export const DISCONNECTED_RMQ_MESSAGE = `Disconnected from RMQ. Trying to reconn
3636
export const GRPC_DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH = 4 * 1024 * 1024;
3737
export const GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH = 4 * 1024 * 1024;
3838

39-
export const KAFKA_DEFAULT_CLIENT = 'nestjs';
40-
export const KAFKA_DEFAULT_GROUP = 'nestjs';
39+
export const KAFKA_DEFAULT_CLIENT = 'nestjs-consumer';
40+
export const KAFKA_DEFAULT_GROUP = 'nestjs-group';

packages/microservices/decorators/message-pattern.decorator.ts

Lines changed: 75 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { PATTERN_HANDLER_METADATA, PATTERN_METADATA } from '../constants';
22
import { PatternHandler } from '../enums/pattern-handler.enum';
33
import { PatternMetadata } from '../interfaces/pattern-metadata.interface';
4+
import { ConsumerConfig } from '../external/kafka.interface';
45

56
export enum GrpcMethodStreamingType {
67
NO_STREAMING = 'no_stream',
@@ -40,7 +41,7 @@ export function GrpcMethod(service: string, method?: string): MethodDecorator {
4041
key: string | symbol,
4142
descriptor: PropertyDescriptor,
4243
) => {
43-
const metadata = createMethodMetadata(target, key, service, method);
44+
const metadata = createGrpcMethodMetadata(target, key, service, method);
4445
return MessagePattern(metadata)(target, key, descriptor);
4546
};
4647
}
@@ -62,7 +63,7 @@ export function GrpcStreamMethod(service: string, method?: string) {
6263
key: string | symbol,
6364
descriptor: PropertyDescriptor,
6465
) => {
65-
const metadata = createMethodMetadata(
66+
const metadata = createGrpcMethodMetadata(
6667
target,
6768
key,
6869
service,
@@ -90,7 +91,7 @@ export function GrpcStreamCall(service: string, method?: string) {
9091
key: string | symbol,
9192
descriptor: PropertyDescriptor,
9293
) => {
93-
const metadata = createMethodMetadata(
94+
const metadata = createGrpcMethodMetadata(
9495
target,
9596
key,
9697
service,
@@ -101,7 +102,7 @@ export function GrpcStreamCall(service: string, method?: string) {
101102
};
102103
}
103104

104-
export function createMethodMetadata(
105+
export function createGrpcMethodMetadata(
105106
target: any,
106107
key: string | symbol,
107108
service: string | undefined,
@@ -124,3 +125,73 @@ export function createMethodMetadata(
124125
}
125126
return { service, rpc: method, streaming };
126127
}
128+
129+
export enum KafkaConsumerHandlerType {
130+
EACH_MESSAGE = 'eachMessage',
131+
EACH_BATCH = 'eachBatch',
132+
}
133+
134+
export interface KafkaConsumerSubscriptionOptions {
135+
topic: string | RegExp;
136+
fromBeginning?: boolean;
137+
}
138+
139+
export interface KafkaConsumerRunOptions {
140+
autoCommit?: boolean;
141+
autoCommitInterval?: number | null;
142+
autoCommitThreshold?: number | null;
143+
eachBatchAutoResolve?: boolean;
144+
partitionsConsumedConcurrently?: number;
145+
}
146+
147+
/**
148+
* Registers a consumer handler for processing each message of a topic individually.
149+
*
150+
* @param subscription The options for the subscription to the topic.
151+
* @param run The options for the consumption of messages from the topic.
152+
*/
153+
export function KafkaConsumer(subscription: KafkaConsumerSubscriptionOptions): MethodDecorator;
154+
export function KafkaConsumer(subscription: KafkaConsumerSubscriptionOptions, run?: KafkaConsumerRunOptions): MethodDecorator;
155+
export function KafkaConsumer(subscription: KafkaConsumerSubscriptionOptions, run?: KafkaConsumerRunOptions): MethodDecorator {
156+
return (
157+
target: any,
158+
key: string | symbol,
159+
descriptor: PropertyDescriptor,
160+
) => {
161+
const metadata = createKafkaMethodMetadata(target, key, subscription, run);
162+
return MessagePattern(metadata)(target, key, descriptor);
163+
};
164+
}
165+
166+
/**
167+
* Registers a consumer handler for processing each message of a topic in bitch.
168+
*
169+
* @param subscription The options for the subscription to the topic.
170+
* @param run The options for the consumption of messages from the topic.
171+
*/
172+
export function KafkaBatchConsumer(subscription: KafkaConsumerSubscriptionOptions): MethodDecorator;
173+
export function KafkaBatchConsumer(subscription: KafkaConsumerSubscriptionOptions, run?: KafkaConsumerRunOptions): MethodDecorator;
174+
export function KafkaBatchConsumer(subscription: KafkaConsumerSubscriptionOptions, run?: KafkaConsumerRunOptions): MethodDecorator {
175+
return (
176+
target: any,
177+
key: string | symbol,
178+
descriptor: PropertyDescriptor,
179+
) => {
180+
const metadata = createKafkaMethodMetadata(target, key, subscription, run, KafkaConsumerHandlerType.EACH_BATCH);
181+
return MessagePattern(metadata)(target, key, descriptor);
182+
};
183+
}
184+
185+
export function createKafkaMethodMetadata(
186+
target: any,
187+
key: string | symbol,
188+
subscription: KafkaConsumerSubscriptionOptions,
189+
run: KafkaConsumerRunOptions | undefined,
190+
handler = KafkaConsumerHandlerType.EACH_MESSAGE
191+
) {
192+
return {
193+
subscription,
194+
run,
195+
handler
196+
};
197+
}

0 commit comments

Comments
 (0)