Skip to content

Commit 4bba628

Browse files
committed
Adds generic serialization which won't require users to implement custom serializations between microservices.
1 parent 4480a7c commit 4bba628

5 files changed

Lines changed: 152 additions & 141 deletions

File tree

integration/microservices/e2e/sum-kafka.spec.ts

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,17 +49,17 @@ describe('Kafka transport', () => {
4949
// });
5050
// }).timeout(5000);
5151

52-
// it(`/POST (async event notification)`, done => {
53-
// request(server)
54-
// .post('/notify')
55-
// .send([1, 2, 3, 4, 5])
56-
// .end(() => {
57-
// setTimeout(() => {
58-
// expect(KafkaController.IS_NOTIFIED).to.be.true;
59-
// done();
60-
// }, 4000);
61-
// });
62-
// }).timeout(5000);
52+
it(`/POST (async event notification)`, done => {
53+
request(server)
54+
.post('/notify')
55+
.send([1, 2, 3, 4, 5])
56+
.end(() => {
57+
setTimeout(() => {
58+
expect(KafkaController.IS_NOTIFIED).to.be.true;
59+
done();
60+
}, 4000);
61+
});
62+
}).timeout(5000);
6363

6464
after(`Stopping Kafka app`, async () => {
6565
await app.close();

integration/microservices/src/kafka/kafka.controller.ts

Lines changed: 8 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -34,41 +34,6 @@ export class KafkaController implements OnModuleInit {
3434
await this.client.connect();
3535
}
3636

37-
private parse(data: any) {
38-
data.key = data.key.toString();
39-
data.value = JSON.parse(data.value.toString());
40-
}
41-
42-
// @Post()
43-
// @HttpCode(200)
44-
// async call(
45-
// @Query('command') cmd,
46-
// @Body() data: number[],
47-
// ): Promise<Observable<any>> {
48-
// const key = uuid.v4(); // stick to a single partition
49-
50-
// const result = await this.client.emit('math.sum', data.map((num) => {
51-
// return {
52-
// key,
53-
// value: num.toString(),
54-
// headers: {
55-
// 'correlation-id': key,
56-
// },
57-
// };
58-
// })).toPromise();
59-
60-
// this.logger.error(util.format('@Query math.sum result %o', result));
61-
62-
// return result;
63-
// }
64-
65-
// @EventPattern('math.sum')
66-
// mathSum(data: any){
67-
// this.logger.error(util.format('@EventPattern math.sum data %o', data));
68-
69-
// KafkaController.MATH_SUM += parseFloat(data.value);
70-
// }
71-
7237
@Post()
7338
@HttpCode(200)
7439
@MessageRequest('math.sum', 'math.sum.reply')
@@ -80,48 +45,32 @@ export class KafkaController implements OnModuleInit {
8045

8146
const result = await this.client.send('math.sum', {
8247
key: '1',
83-
value: JSON.stringify({
48+
value: {
8449
numbers: data,
85-
}),
50+
},
8651
}).toPromise();
8752

8853
// await Bluebird.delay(30000);
8954

90-
// this.logger.error(util.format('@Query math.sum result %o', result));
55+
this.logger.error(util.format('@Query math.sum result %o', result));
9156

92-
const sum = JSON.parse(result.value.toString());
93-
94-
return sum.result;
57+
return result.value;
9558
}
9659

9760
@MessagePattern('math.sum')
9861
mathSum(data: any){
99-
// this.logger.error(util.format('@MessagePattern math.sum data %o', data));
100-
101-
const value = JSON.parse(data.value);
62+
this.logger.error(util.format('@MessagePattern math.sum data %o', data));
10263

103-
return {
104-
value: JSON.stringify({
105-
result: (value.numbers || []).reduce((a, b) => a + b),
106-
}),
107-
};
64+
return (data.value.numbers || []).reduce((a, b) => a + b);
10865
}
10966

11067
@Post('notify')
111-
@MessageRequest('test.one', 'test.one.reply')
11268
async sendNotification(): Promise<any> {
113-
return this.client.emit('notification', [{
114-
key: 'notify',
115-
value: JSON.stringify({
116-
notify: true,
117-
}),
118-
}]);
69+
return this.client.emit('notify', {notify: true});
11970
}
12071

121-
@EventPattern('notification')
72+
@EventPattern('notify')
12273
eventHandler(data: any) {
123-
this.parse(data);
124-
12574
KafkaController.IS_NOTIFIED = data.value.notify;
12675
}
12776
}

packages/microservices/client/client-kafka.ts

Lines changed: 28 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
import * as util from 'util';
2-
import { isUndefined, isNil } from '@nestjs/common/utils/shared.utils';
2+
import { isUndefined } from '@nestjs/common/utils/shared.utils';
33
import { Logger } from '@nestjs/common/services/logger.service';
44
import { loadPackage } from '@nestjs/common/utils/load-package.util';
5-
import { Observable } from 'rxjs';
65

76
import {
87
KAFKA_DEFAULT_BROKER,
@@ -12,6 +11,8 @@ import {
1211
import { ReadPacket, KafkaOptions, WritePacket, PacketId } from '../interfaces';
1312
import { ClientProxy } from './client-proxy';
1413

14+
import KafkaSerializer from '../helpers/kafka-serializer';
15+
1516
import {
1617
KafkaConfig,
1718
Kafka,
@@ -145,7 +146,24 @@ export class ClientKafka extends ClientProxy {
145146

146147
public createResponseCallback(): (payload: EachMessagePayload) => any {
147148
return (payload: EachMessagePayload) => {
148-
const packet = this.deserialize(payload);
149+
// create response from a deserialized payload
150+
const response = KafkaSerializer.deserialize<KafkaMessage>(Object.assign(payload.message, {
151+
topic: payload.topic,
152+
partition: payload.partition
153+
}));
154+
155+
// construct packet
156+
const packet = {
157+
id: undefined,
158+
pattern: payload.topic,
159+
response
160+
};
161+
162+
// parse the correlation id
163+
if (!isUndefined(packet.response.headers[KafkaHeaders.CORRELATION_ID])) {
164+
// assign the correlation id as the packet id
165+
packet.id = packet.response.headers[KafkaHeaders.CORRELATION_ID].toString();
166+
}
149167

150168
const callback = this.routingMap.get(packet.id);
151169

@@ -165,38 +183,16 @@ export class ClientKafka extends ClientProxy {
165183
};
166184
}
167185

168-
private deserialize(payload: EachMessagePayload): WritePacket<Message> & PacketId {
169-
// build
170-
const packet = {
171-
id: undefined,
172-
pattern: payload.topic,
173-
response: Object.assign(payload.message, {
174-
topic: payload.topic,
175-
partition: payload.partition
176-
})
177-
};
178-
179-
// parse the correlation id
180-
if (!isUndefined(packet.response.headers[KafkaHeaders.CORRELATION_ID])) {
181-
// assign the correlation id as the packet id
182-
packet.id = packet.response.headers[KafkaHeaders.CORRELATION_ID].toString();
183-
}
184-
185-
return packet;
186-
}
187-
188186
protected dispatchEvent(packet: ReadPacket & PacketId): Promise<any> {
189187
const pattern = this.normalizePattern(packet.pattern);
190188

191189
// assign a packet id
192190
packet = this.assignPacketId(packet);
193191

194-
// create headers if they don't exist
195-
if (isUndefined(packet.data.headers)){
196-
packet.data.headers = {};
197-
}
192+
// serialize for sanity
193+
packet.data = KafkaSerializer.serialize<Message>(packet.data);
198194

199-
// correlate
195+
// correlate via kafka headers
200196
packet.data.headers[KafkaHeaders.CORRELATION_ID] = packet.id;
201197

202198
// send
@@ -243,19 +239,18 @@ export class ClientKafka extends ClientProxy {
243239
callback: (packet: WritePacket) => any,
244240
): Function {
245241
try {
242+
// create packet
246243
const packet = this.assignPacketId(partialPacket);
244+
packet.data = KafkaSerializer.serialize<Message>(packet.data);
245+
246+
// get the response meta
247247
const pattern = this.normalizePattern(partialPacket.pattern);
248248
const replyTopic = this.getReplyPattern(pattern, ClientKafka.REPLY_PATTERN_AFFIX);
249249
const replyPartition = this.getReplyPartition(replyTopic);
250250

251251
// set the route
252252
this.routingMap.set(packet.id, callback);
253253

254-
// create headers if they don't exist
255-
if (isUndefined(packet.data.headers)){
256-
packet.data.headers = {};
257-
}
258-
259254
// correlate
260255
packet.data.headers[KafkaHeaders.CORRELATION_ID] = packet.id;
261256
packet.data.headers[KafkaHeaders.REPLY_TOPIC] = replyTopic;
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import { isUndefined, isNil, isObject, isString, isFunction } from '@nestjs/common/utils/shared.utils';
2+
3+
export default class KafkaSerializer {
4+
public static deserialize<T>(data: any): T {
5+
// parse the value
6+
data.value = this.decode(data.value);
7+
8+
// parse the key
9+
if (!isNil(data.key)) {
10+
data.key = this.decode(data.key);
11+
}
12+
13+
return data;
14+
}
15+
16+
public static decode(value: Buffer): object | string {
17+
if (!isNil(value)) {
18+
// convert to string
19+
let result = value.toString();
20+
21+
// type to parse
22+
try {
23+
result = JSON.parse(result);
24+
} catch (e){}
25+
26+
return result;
27+
}
28+
}
29+
30+
public static serialize<T>(data: any): T {
31+
// wrap the packet in an a kafka message when data is not an object
32+
// when data is an object but key and value are undefined, then the user is not passing a kafka message
33+
if ((isUndefined(data.key) && isUndefined(data.value)) || !isObject(data)) {
34+
data = {
35+
value: data
36+
};
37+
}
38+
39+
// make sure the value is a buffer or string
40+
data.value = this.encode(data.value);
41+
42+
// make sure that if there is a ket then it is a buffer or a string
43+
if (!isNil(data.key)) {
44+
data.key = this.encode(data.key);
45+
}
46+
47+
// create headers if they don't exist
48+
if (isUndefined(data.headers)){
49+
data.headers = {};
50+
}
51+
52+
return data;
53+
}
54+
55+
public static encode(value: any): string {
56+
if (!isNil(value) && !isString(value) && !Buffer.isBuffer(value)) {
57+
if (isObject(value) || Array.isArray(value)) {
58+
// convert to stringified object
59+
return JSON.stringify(value);
60+
} else if (isFunction(value.toString)) {
61+
// convert to string
62+
return value.toString();
63+
}
64+
}
65+
66+
// return the value and hope for the best by default
67+
return value;
68+
}
69+
}

0 commit comments

Comments
 (0)