Skip to content

Commit 0a7f37d

Browse files
feat(): add incoming request & response serializers
1 parent 7f8aa92 commit 0a7f37d

16 files changed

Lines changed: 257 additions & 35 deletions

packages/microservices/client/client-proxy.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import {
1111
} from 'rxjs';
1212
import { map, mergeMap, publish, take } from 'rxjs/operators';
1313
import { CONNECT_EVENT, ERROR_EVENT } from '../constants';
14-
import { IdentityDeserializer } from '../deserializers/identity.deserializer';
14+
import { IncomingResponseDeserializer } from '../deserializers/incoming-response.deserializer';
1515
import { InvalidMessageException } from '../errors/invalid-message.exception';
1616
import {
1717
ClientOptions,
@@ -145,6 +145,6 @@ export abstract class ClientProxy {
145145
| MqttOptions['options']
146146
| TcpClientOptions['options']
147147
| RmqOptions['options']).deserializer) ||
148-
new IdentityDeserializer();
148+
new IncomingResponseDeserializer();
149149
}
150150
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import { isUndefined } from '@nestjs/common/utils/shared.utils';
2+
import {
3+
ConsumerDeserializer,
4+
IncomingEvent,
5+
IncomingRequest,
6+
} from '../interfaces';
7+
8+
export class IncomingRequestDeserializer implements ConsumerDeserializer {
9+
deserialize(
10+
value: any,
11+
options?: Record<string, any>,
12+
): IncomingRequest | IncomingEvent {
13+
return this.isExternal(value) ? this.mapToSchema(value, options) : value;
14+
}
15+
16+
isExternal(value: any): boolean {
17+
if (!value) {
18+
return true;
19+
}
20+
if (
21+
!isUndefined((value as IncomingRequest).pattern) ||
22+
!isUndefined((value as IncomingRequest).data)
23+
) {
24+
return false;
25+
}
26+
return true;
27+
}
28+
29+
mapToSchema(
30+
value: any,
31+
options?: Record<string, any>,
32+
): IncomingRequest | IncomingEvent {
33+
if (!options) {
34+
return {
35+
pattern: undefined,
36+
data: undefined,
37+
};
38+
}
39+
return {
40+
pattern: options.channel,
41+
data: value,
42+
};
43+
}
44+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import { isUndefined } from '@nestjs/common/utils/shared.utils';
2+
import { IncomingResponse, ProducerDeserializer } from '../interfaces';
3+
4+
export class IncomingResponseDeserializer implements ProducerDeserializer {
5+
deserialize(value: any, options?: Record<string, any>): IncomingResponse {
6+
return this.isExternal(value) ? this.mapToSchema(value) : value;
7+
}
8+
9+
isExternal(value: any): boolean {
10+
if (!value) {
11+
return true;
12+
}
13+
if (
14+
!isUndefined((value as IncomingResponse).err) ||
15+
!isUndefined((value as IncomingResponse).response) ||
16+
!isUndefined((value as IncomingResponse).isDisposed)
17+
) {
18+
return false;
19+
}
20+
return true;
21+
}
22+
23+
mapToSchema(value: any): IncomingResponse {
24+
return {
25+
id: value && value.id,
26+
response: value,
27+
isDisposed: true,
28+
};
29+
}
30+
}
Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,15 @@
1-
import { IncomingRequest, IncomingResponse } from './packet.interface';
1+
import {
2+
IncomingEvent,
3+
IncomingRequest,
4+
IncomingResponse,
5+
} from './packet.interface';
26

37
export interface Deserializer<TInput = any, TOutput = any> {
4-
deserialize(value: TInput): TOutput;
8+
deserialize(value: TInput, options?: Record<string, any>): TOutput;
59
}
610

711
export type ProducerDeserializer = Deserializer<any, IncomingResponse>;
8-
export type ConsumerDeserializer = Deserializer<any, IncomingRequest>;
12+
export type ConsumerDeserializer = Deserializer<
13+
any,
14+
IncomingRequest | IncomingEvent
15+
>;

packages/microservices/server/server-mqtt.ts

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,12 @@ import {
88
NO_MESSAGE_HANDLER,
99
} from '../constants';
1010
import { MqttClient } from '../external/mqtt-client.interface';
11-
import { CustomTransportStrategy, PacketId, ReadPacket } from '../interfaces';
11+
import {
12+
CustomTransportStrategy,
13+
IncomingRequest,
14+
PacketId,
15+
ReadPacket,
16+
} from '../interfaces';
1217
import { MqttOptions } from '../interfaces/microservice-configuration.interface';
1318
import { Server } from './server';
1419

@@ -72,18 +77,22 @@ export class ServerMqtt extends Server implements CustomTransportStrategy {
7277
pub: MqttClient,
7378
): Promise<any> {
7479
const rawPacket = this.parseMessage(buffer.toString());
75-
const packet = this.deserializer.deserialize(rawPacket);
76-
if (isUndefined(packet.id)) {
80+
const packet = this.deserializer.deserialize(rawPacket, { channel });
81+
if (isUndefined((packet as IncomingRequest).id)) {
7782
return this.handleEvent(channel, packet);
7883
}
7984
const pattern = channel.replace(/_ack$/, '');
80-
const publish = this.getPublisher(pub, pattern, packet.id);
85+
const publish = this.getPublisher(
86+
pub,
87+
pattern,
88+
(packet as IncomingRequest).id,
89+
);
8190
const handler = this.getHandlerByPattern(pattern);
8291

8392
if (!handler) {
8493
const status = 'error';
8594
const noHandlerPacket = {
86-
id: packet.id,
95+
id: (packet as IncomingRequest).id,
8796
status,
8897
err: NO_MESSAGE_HANDLER,
8998
};

packages/microservices/server/server-nats.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import {
99
import { Client } from '../external/nats-client.interface';
1010
import { CustomTransportStrategy, PacketId } from '../interfaces';
1111
import { NatsOptions } from '../interfaces/microservice-configuration.interface';
12-
import { ReadPacket } from '../interfaces/packet.interface';
12+
import { IncomingRequest, ReadPacket } from '../interfaces/packet.interface';
1313
import { Server } from './server';
1414

1515
let natsPackage: any = {};
@@ -85,16 +85,20 @@ export class ServerNats extends Server implements CustomTransportStrategy {
8585
client: Client,
8686
replyTo: string,
8787
) {
88-
const message = this.deserializer.deserialize(rawMessage);
89-
if (isUndefined(message.id)) {
88+
const message = this.deserializer.deserialize(rawMessage, { channel });
89+
if (isUndefined((message as IncomingRequest).id)) {
9090
return this.handleEvent(channel, message);
9191
}
92-
const publish = this.getPublisher(client, replyTo, message.id);
92+
const publish = this.getPublisher(
93+
client,
94+
replyTo,
95+
(message as IncomingRequest).id,
96+
);
9397
const handler = this.getHandlerByPattern(channel);
9498
if (!handler) {
9599
const status = 'error';
96100
const noHandlerPacket = {
97-
id: message.id,
101+
id: (message as IncomingRequest).id,
98102
status,
99103
err: NO_MESSAGE_HANDLER,
100104
};

packages/microservices/server/server-redis.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import {
1212
RedisClient,
1313
RetryStrategyOptions,
1414
} from '../external/redis.interface';
15-
import { CustomTransportStrategy } from '../interfaces';
15+
import { CustomTransportStrategy, IncomingRequest } from '../interfaces';
1616
import { RedisOptions } from '../interfaces/microservice-configuration.interface';
1717
import { Server } from './server';
1818

@@ -85,18 +85,22 @@ export class ServerRedis extends Server implements CustomTransportStrategy {
8585
pub: RedisClient,
8686
) {
8787
const rawMessage = this.parseMessage(buffer);
88-
const packet = this.deserializer.deserialize(rawMessage);
89-
if (isUndefined(packet.id)) {
88+
const packet = this.deserializer.deserialize(rawMessage, { channel });
89+
if (isUndefined((packet as IncomingRequest).id)) {
9090
return this.handleEvent(channel, packet);
9191
}
9292
const pattern = channel.replace(/_ack$/, '');
93-
const publish = this.getPublisher(pub, pattern, packet.id);
93+
const publish = this.getPublisher(
94+
pub,
95+
pattern,
96+
(packet as IncomingRequest).id,
97+
);
9498
const handler = this.getHandlerByPattern(pattern);
9599

96100
if (!handler) {
97101
const status = 'error';
98102
const noHandlerPacket = {
99-
id: packet.id,
103+
id: (packet as IncomingRequest).id,
100104
status,
101105
err: NO_MESSAGE_HANDLER,
102106
};

packages/microservices/server/server-rmq.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ import {
1212
RQM_DEFAULT_URL,
1313
} from '../constants';
1414
import { CustomTransportStrategy, RmqOptions } from '../interfaces';
15-
import { OutgoingResponse } from '../interfaces/packet.interface';
15+
import {
16+
IncomingRequest,
17+
OutgoingResponse,
18+
} from '../interfaces/packet.interface';
1619
import { Server } from './server';
1720

1821
let rqmPackage: any = {};
@@ -96,15 +99,15 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
9699
? packet.pattern
97100
: JSON.stringify(packet.pattern);
98101

99-
if (isUndefined(packet.id)) {
102+
if (isUndefined((packet as IncomingRequest).id)) {
100103
return this.handleEvent(pattern, packet);
101104
}
102105
const handler = this.getHandlerByPattern(pattern);
103106

104107
if (!handler) {
105108
const status = 'error';
106109
const noHandlerPacket = {
107-
id: packet.id,
110+
id: (packet as IncomingRequest).id,
108111
err: NO_MESSAGE_HANDLER,
109112
status,
110113
};

packages/microservices/server/server-tcp.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import {
1313
import { JsonSocket } from '../helpers/json-socket';
1414
import {
1515
CustomTransportStrategy,
16+
IncomingRequest,
1617
PacketId,
1718
ReadPacket,
1819
WritePacket,
@@ -61,14 +62,14 @@ export class ServerTCP extends Server implements CustomTransportStrategy {
6162
? JSON.stringify(packet.pattern)
6263
: packet.pattern;
6364

64-
if (isUndefined(packet.id)) {
65+
if (isUndefined((packet as IncomingRequest).id)) {
6566
return this.handleEvent(pattern, packet);
6667
}
6768
const handler = this.getHandlerByPattern(pattern);
6869
if (!handler) {
6970
const status = 'error';
7071
const noHandlerPacket = this.serializer.serialize({
71-
id: packet.id,
72+
id: (packet as IncomingRequest).id,
7273
status,
7374
err: NO_MESSAGE_HANDLER,
7475
});
@@ -80,7 +81,7 @@ export class ServerTCP extends Server implements CustomTransportStrategy {
8081

8182
response$ &&
8283
this.send(response$, data => {
83-
Object.assign(data, { id: packet.id });
84+
Object.assign(data, { id: (packet as IncomingRequest).id });
8485
const outgoingResponse = this.serializer.serialize(data as WritePacket &
8586
PacketId);
8687
socket.sendMessage(outgoingResponse);

packages/microservices/server/server.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import {
1010
Subscription,
1111
} from 'rxjs';
1212
import { catchError, finalize, publish } from 'rxjs/operators';
13-
import { IdentityDeserializer } from '../deserializers/identity.deserializer';
13+
import { IncomingRequestDeserializer } from '../deserializers/incoming-request.deserializer';
1414
import {
1515
ClientOptions,
1616
MessageHandler,
@@ -65,7 +65,10 @@ export abstract class Server {
6565
const scheduleOnNextTick = (data: WritePacket) => {
6666
if (!dataBuffer) {
6767
dataBuffer = [data];
68-
process.nextTick(() => dataBuffer.forEach(buffer => respond(buffer)));
68+
process.nextTick(() => {
69+
dataBuffer.forEach(buffer => respond(buffer));
70+
dataBuffer = null;
71+
});
6972
} else if (!data.isDisposed) {
7073
dataBuffer = dataBuffer.concat(data);
7174
} else {
@@ -145,7 +148,7 @@ export abstract class Server {
145148
| MqttOptions['options']
146149
| TcpOptions['options']
147150
| RmqOptions['options']).deserializer) ||
148-
new IdentityDeserializer();
151+
new IncomingRequestDeserializer();
149152
}
150153

151154
private isObservable(input: unknown): input is Observable<any> {

0 commit comments

Comments
 (0)