Skip to content

Commit c376018

Browse files
committed
Adds MessageRequest decorator to define topic for client consumer subscriptions.
1 parent 05fc85d commit c376018

9 files changed

Lines changed: 173 additions & 42 deletions

File tree

integration/microservices/src/kafka/kafka.controller.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@ import {
55
EventPattern,
66
MessagePattern,
77
Transport,
8+
MessageRequest,
89
} from '@nestjs/microservices';
910
import { Logger } from '@nestjs/common/services/logger.service';
1011
import * as util from 'util';
1112

1213
import { Observable } from 'rxjs';
1314
import * as uuid from 'uuid';
15+
import * as Bluebird from 'bluebird';
1416

1517
@Controller()
1618
export class KafkaController implements OnModuleInit {
@@ -69,6 +71,7 @@ export class KafkaController implements OnModuleInit {
6971

7072
@Post()
7173
@HttpCode(200)
74+
@MessageRequest('math.sum', 'math.sum.reply')
7275
async call(
7376
@Query('command') cmd,
7477
@Body() data: number[],
@@ -82,7 +85,9 @@ export class KafkaController implements OnModuleInit {
8285
}),
8386
}).toPromise();
8487

85-
this.logger.error(util.format('@Query math.sum result %o', result));
88+
await Bluebird.delay(30000);
89+
90+
// this.logger.error(util.format('@Query math.sum result %o', result));
8691

8792
const sum = JSON.parse(result.value.toString());
8893

@@ -91,7 +96,7 @@ export class KafkaController implements OnModuleInit {
9196

9297
@MessagePattern('math.sum')
9398
mathSum(data: any){
94-
this.logger.error(util.format('@MessagePattern math.sum data %o', data));
99+
// this.logger.error(util.format('@MessagePattern math.sum data %o', data));
95100

96101
const value = JSON.parse(data.value);
97102

@@ -103,6 +108,7 @@ export class KafkaController implements OnModuleInit {
103108
}
104109

105110
@Post('notify')
111+
@MessageRequest('test.one', 'test.one.reply')
106112
async sendNotification(): Promise<any> {
107113
return this.client.emit('notification', [{
108114
key: 'notify',

packages/microservices/client/client-kafka.ts

Lines changed: 62 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import * as util from 'util';
2-
import { isUndefined } from '@nestjs/common/utils/shared.utils';
2+
import { isUndefined, isNil } from '@nestjs/common/utils/shared.utils';
33
import { Logger } from '@nestjs/common/services/logger.service';
44
import { loadPackage } from '@nestjs/common/utils/load-package.util';
55
import { Observable } from 'rxjs';
@@ -20,7 +20,8 @@ import {
2020
EachMessagePayload,
2121
Message,
2222
KafkaMessage,
23-
logLevel
23+
logLevel,
24+
ConsumerGroupJoinEvent
2425
} from '../external/kafka.interface';
2526
import { KafkaHeaders } from '../enums';
2627

@@ -40,6 +41,10 @@ export class ClientKafka extends ClientProxy {
4041
private readonly clientId: string;
4142
private readonly groupId: string;
4243

44+
private consumerAssignments: {[key: string]: number[]} = {};
45+
46+
private static REPLY_PATTERN_AFFIX: string = '.reply';
47+
4348
constructor(protected readonly options: KafkaOptions['options']) {
4449
super();
4550
this.brokers = this.getOptionsProp(this.options.client, 'brokers') || [KAFKA_DEFAULT_BROKER];
@@ -66,22 +71,44 @@ export class ClientKafka extends ClientProxy {
6671
return this.producer;
6772
}
6873
this.client = this.createClient();
74+
6975
this.producer = this.client.producer(this.options.producer || {});
7076
this.consumer = this.client.consumer(Object.assign(this.options.consumer || {}, {
7177
groupId: this.groupId
7278
}));
7379

80+
// set member assignments on join and rebalance
81+
this.consumer.on(this.consumer.events.GROUP_JOIN, (data: ConsumerGroupJoinEvent) => {
82+
this.consumerAssignments = data.payload.memberAssignment;
83+
});
84+
85+
// connect the producer and consumer
7486
await this.producer.connect();
7587
await this.consumer.connect();
7688

77-
// @TODO: Use descriptors to define the reply topics
78-
// Run the consumer
79-
await this.consumer.subscribe({topic: 'math.sum.reply'});
89+
// bind the topics
90+
await this.bindTopics();
91+
92+
return this.producer;
93+
}
94+
95+
public async bindTopics(): Promise<void> {
96+
const requestPatterns = [...this.requestMap.keys()];
97+
98+
await Promise.all(requestPatterns.map(async requestPattern => {
99+
// get the reply pattern
100+
const replyPattern = this.getReplyPattern(requestPattern, ClientKafka.REPLY_PATTERN_AFFIX);
101+
102+
// subscribe to the pattern of the topic
103+
await this.consumer.subscribe({
104+
topic: replyPattern
105+
});
106+
}));
107+
108+
// run the consumer to start listening on the reply topics
80109
await this.consumer.run(Object.assign(this.options.run || {}, {
81110
eachMessage: this.createResponseCallback()
82111
}));
83-
84-
return this.producer;
85112
}
86113

87114
public createClient<T = any>(): T {
@@ -118,19 +145,10 @@ export class ClientKafka extends ClientProxy {
118145

119146
public createResponseCallback(): (payload: EachMessagePayload) => any {
120147
return (payload: EachMessagePayload) => {
121-
// const { err, response, isDisposed, id } = JSON.parse(
122-
// buffer.toString(),
123-
// ) as WritePacket & PacketId;
124-
125148
const packet = this.deserialize(payload);
126149

127-
this.logger.error(util.format('createResponseCallback() fn() packet: %o', packet));
128-
129150
const callback = this.routingMap.get(packet.id);
130151

131-
this.logger.error(util.format('createResponseCallback() fn() this.routingMap: %o', this.routingMap));
132-
this.logger.error(util.format('createResponseCallback() fn() callback: %o', callback));
133-
134152
if (!callback) {
135153
return undefined;
136154
}
@@ -188,19 +206,37 @@ export class ClientKafka extends ClientProxy {
188206
}, this.options.send || {}));
189207
}
190208

191-
private getReplyTopic(pattern: string): string {
192-
return `${pattern}.reply`;
193-
}
194-
195-
private getReplyPartition(topic: string): string {
209+
private getReplyPartition(topic: string): number {
196210
// this.consumer.describeGroup().then((description) => {
197-
// this.logger.error(util.format('getReplyTopicPartition(): topic: %s groupDescription: %o', topic, description));
211+
// // this.logger.error(util.format('getReplyTopicPartition(): groupDescription: %o', description));
212+
213+
// description.members.forEach((member) => {
214+
// // const memberMetadata = kafkaPackage.AssignerProtocol.MemberMetadata.decode(member.memberMetadata);
215+
// const memberAssignment = kafkaPackage.AssignerProtocol.MemberAssignment.decode(member.memberAssignment);
216+
217+
// this.logger.error(util.format('getReplyTopicPartition(): groupDescription.member[i]: %o', member));
218+
// // this.logger.error(util.format('getReplyTopicPartition(): groupDescription.member[i] metadata: %o', memberMetadata));
219+
// this.logger.error(util.format('getReplyTopicPartition(): groupDescription.member[i] assignment: %o', memberAssignment));
220+
// });
198221
// });
199222

200-
return '0';
201-
}
223+
// return 0;
202224

203-
// private getReplyTopic
225+
// get topic assignment
226+
const topicAssignments = this.consumerAssignments[topic];
227+
228+
// throw error
229+
if (isUndefined(topicAssignments)) {
230+
throw new Error(`Unable to send the message request because the client consumer is not subscribed to the topic (${topic}).`);
231+
}
232+
233+
// if the current member isn't listening to any partitions on the topic then throw an error.
234+
if (isUndefined(topicAssignments[0])) {
235+
throw new Error(`Unable to send the message request because the client consumer subscribed to the topic (${topic}) is not assigned any partitions.`);
236+
}
237+
238+
return topicAssignments[0];
239+
}
204240

205241
protected publish(
206242
partialPacket: ReadPacket,
@@ -209,18 +245,9 @@ export class ClientKafka extends ClientProxy {
209245
try {
210246
const packet = this.assignPacketId(partialPacket);
211247
const pattern = this.normalizePattern(partialPacket.pattern);
212-
const replyTopic = this.getReplyTopic(pattern);
248+
const replyTopic = this.getReplyPattern(pattern, ClientKafka.REPLY_PATTERN_AFFIX);
213249
const replyPartition = this.getReplyPartition(replyTopic);
214250

215-
// subscribe
216-
// this.consumer.subscribe({
217-
// topic: replyTopic
218-
// }).then(() => {
219-
// return this.consumer.run(Object.assign(this.options.run || {}, {
220-
// eachMessage: this.createResponseCallback()
221-
// }));
222-
// });
223-
224251
// set the route
225252
this.routingMap.set(packet.id, callback);
226253

@@ -234,8 +261,6 @@ export class ClientKafka extends ClientProxy {
234261
packet.data.headers[KafkaHeaders.REPLY_TOPIC] = replyTopic;
235262
packet.data.headers[KafkaHeaders.REPLY_PARTITION] = replyPartition;
236263

237-
this.logger.error(util.format('publish() packet %o', packet));
238-
239264
// send through unhandled promise
240265
this.producer.send(Object.assign({
241266
topic: pattern,

packages/microservices/client/client-proxy.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,27 @@ export abstract class ClientProxy {
2525
public abstract close(): any;
2626

2727
protected routingMap = new Map<string, Function>();
28+
protected requestMap = new Map<string, string>();
29+
30+
public addMessageRequest(
31+
requestPattern: any,
32+
replyPattern: any
33+
) {
34+
const request = this.normalizePattern(requestPattern);
35+
const reply = this.normalizePattern(replyPattern);
36+
37+
this.requestMap.set(request, reply);
38+
}
39+
40+
protected getReplyPattern(requestPattern: string, affix: string): string {
41+
const replyPattern = this.requestMap.get(requestPattern);
42+
43+
if (isNil(replyPattern)) {
44+
return `${requestPattern}${affix}`;
45+
}
46+
47+
return replyPattern;
48+
}
2849

2950
public send<TResult = any, TInput = any>(
3051
pattern: any,

packages/microservices/constants.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ export const CLIENT_CONFIGURATION_METADATA = 'microservices:client';
2121
export const PATTERN_HANDLER_METADATA = 'microservices:handler_type';
2222
export const CLIENT_METADATA = 'microservices:is_client_instance';
2323

24+
export const REQUEST_PATTERN_METADATA = 'microservices:request_pattern';
25+
export const REPLY_PATTERN_METADATA = 'microservices:reply_pattern';
26+
2427
export const RQM_DEFAULT_QUEUE = 'default';
2528
export const RQM_DEFAULT_PREFETCH_COUNT = 0;
2629
export const RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT = false;

packages/microservices/decorators/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ export * from './client.decorator';
22
export * from './event-pattern.decorator';
33
export * from './grpc-service.decorator';
44
export * from './message-pattern.decorator';
5+
export * from './message-request.decorator';
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import { REQUEST_PATTERN_METADATA, REPLY_PATTERN_METADATA } from '../constants';
2+
import { PatternMetadata } from '../interfaces/pattern-metadata.interface';
3+
4+
/**
5+
* Makes outgoing to incoming messages that fulfils the chosen patterns.
6+
*/
7+
export const MessageRequest = <T = PatternMetadata | string>(
8+
requestMetadata?: T,
9+
replyMetadata?: T
10+
): MethodDecorator => {
11+
return (
12+
target: any,
13+
key: string | symbol,
14+
descriptor: PropertyDescriptor,
15+
) => {
16+
Reflect.defineMetadata(REQUEST_PATTERN_METADATA, requestMetadata, descriptor.value);
17+
Reflect.defineMetadata(REPLY_PATTERN_METADATA, replyMetadata, descriptor.value);
18+
return descriptor;
19+
};
20+
};
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
export enum PatternHandler {
22
MESSAGE = 1,
3-
EVENT = 2,
3+
EVENT = 2
44
}

packages/microservices/listener-metadata-explorer.ts

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import {
66
CLIENT_METADATA,
77
PATTERN_HANDLER_METADATA,
88
PATTERN_METADATA,
9+
REQUEST_PATTERN_METADATA,
10+
REPLY_PATTERN_METADATA,
911
} from './constants';
1012
import { PatternHandler } from './enums/pattern-handler.enum';
1113
import { ClientOptions } from './interfaces/client-metadata.interface';
@@ -23,6 +25,11 @@ export interface PatternProperties {
2325
targetCallback: (...args: any[]) => any;
2426
}
2527

28+
export interface MessageRequestProperties {
29+
requestPattern: PatternMetadata;
30+
replyPattern: PatternMetadata;
31+
}
32+
2633
export class ListenerMetadataExplorer {
2734
constructor(private readonly metadataScanner: MetadataScanner) {}
2835

@@ -32,12 +39,11 @@ export class ListenerMetadataExplorer {
3239
Controller,
3340
PatternProperties
3441
>(instance, instancePrototype, method =>
35-
this.exploreMethodMetadata(instance, instancePrototype, method),
42+
this.exploreMethodMetadata(instancePrototype, method),
3643
);
3744
}
3845

3946
public exploreMethodMetadata(
40-
instance: object,
4147
instancePrototype: any,
4248
methodKey: string,
4349
): PatternProperties {
@@ -78,4 +84,41 @@ export class ListenerMetadataExplorer {
7884
yield { property, metadata };
7985
}
8086
}
87+
88+
public exploreMessageRequests(
89+
instance: Controller,
90+
): MessageRequestProperties[] {
91+
const instancePrototype = Object.getPrototypeOf(instance);
92+
return this.metadataScanner.scanFromPrototype<
93+
Controller,
94+
MessageRequestProperties
95+
>(instance, instancePrototype, method =>
96+
this.exploreMessageRequestMethodMetadata(instancePrototype, method),
97+
);
98+
}
99+
100+
public exploreMessageRequestMethodMetadata(
101+
instancePrototype: any,
102+
methodKey: string,
103+
): MessageRequestProperties {
104+
const targetCallback = instancePrototype[methodKey];
105+
106+
const requestPattern = Reflect.getMetadata(
107+
REQUEST_PATTERN_METADATA,
108+
targetCallback,
109+
);
110+
const replyPattern = Reflect.getMetadata(
111+
REPLY_PATTERN_METADATA,
112+
targetCallback,
113+
);
114+
115+
if (isUndefined(requestPattern) || isUndefined(replyPattern)) {
116+
return;
117+
}
118+
119+
return {
120+
requestPattern,
121+
replyPattern,
122+
};
123+
}
81124
}

packages/microservices/listeners-controller.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,19 @@ export class ListenersController {
7777
} of this.metadataExplorer.scanForClientHooks(instance)) {
7878
const client = this.clientFactory.create(metadata);
7979

80+
const messageRequests = this.metadataExplorer.exploreMessageRequests(
81+
instance,
82+
);
83+
84+
messageRequests.forEach(messageRequest => {
85+
client.addMessageRequest(
86+
messageRequest.requestPattern,
87+
messageRequest.replyPattern,
88+
);
89+
});
90+
8091
this.clientsContainer.addClient(client);
92+
8193
this.assignClientToInstance(instance, property, client);
8294
}
8395
}

0 commit comments

Comments
 (0)