Skip to content

Commit cf362d3

Browse files
feat(microservices): support nats v2
1 parent 1f8560a commit cf362d3

20 files changed

Lines changed: 561 additions & 352 deletions

integration/microservices/e2e/broadcast-nats.spec.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@ describe('NATS transport', () => {
1919
app.connectMicroservice({
2020
transport: Transport.NATS,
2121
options: {
22-
url: 'nats://0.0.0.0:4222',
22+
servers: 'nats://0.0.0.0:4222',
2323
},
2424
});
2525
app.connectMicroservice({
2626
transport: Transport.NATS,
2727
options: {
28-
url: 'nats://0.0.0.0:4222',
28+
servers: 'servers://0.0.0.0:4222',
2929
},
3030
});
3131
await app.startAllMicroservices();

integration/microservices/e2e/disconnected-client.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ describe('Disconnected client', () => {
4646
.send({
4747
transport: Transport.NATS,
4848
options: {
49-
url: 'nats://localhost:4224',
49+
servers: 'nats://localhost:4224',
5050
},
5151
})
5252
.expect(408);

integration/microservices/e2e/sum-nats.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ describe('NATS transport', () => {
2222
app.connectMicroservice({
2323
transport: Transport.NATS,
2424
options: {
25-
url: 'nats://0.0.0.0:4222',
25+
servers: 'nats://0.0.0.0:4222',
2626
},
2727
});
2828
await app.startAllMicroservices();

integration/microservices/src/disconnected.controller.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@ export class DisconnectedClientController {
1818
.send<number, number[]>({ cmd: 'none' }, [1, 2, 3])
1919
.pipe(
2020
/*tap(
21-
console.log.bind(console, 'data'),
22-
console.error.bind(console, 'error'),
23-
),*/
21+
console.log.bind(console, 'data'),
22+
console.error.bind(console, 'error'),
23+
),*/
2424
catchError(error => {
2525
const { code } = error || { code: 'CONN_ERR' };
2626
return throwError(
27-
code === 'ECONNREFUSED' || code === 'CONN_ERR'
27+
code === 'ECONNREFUSED' ||
28+
code === 'CONN_ERR' ||
29+
code === 'CONNECTION_REFUSED'
2830
? new RequestTimeoutException('ECONNREFUSED')
2931
: new InternalServerErrorException(),
3032
);

integration/microservices/src/nats/nats-broadcast.controller.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,12 @@ import { scan, take } from 'rxjs/operators';
1010

1111
@Controller()
1212
export class NatsBroadcastController {
13-
@Client({ transport: Transport.NATS })
13+
@Client({
14+
transport: Transport.NATS,
15+
options: {
16+
servers: 'nats://localhost:4222',
17+
},
18+
})
1419
client: ClientProxy;
1520

1621
@Get('broadcast')

integration/microservices/src/nats/nats.controller.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ export class NatsController {
2424
client: ClientProxy = ClientProxyFactory.create({
2525
transport: Transport.NATS,
2626
options: {
27-
url: 'nats://localhost:4222',
27+
servers: 'nats://localhost:4222',
2828
},
2929
});
3030

package-lock.json

Lines changed: 22 additions & 28 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@
149149
"mqtt": "4.2.6",
150150
"multer": "1.4.2",
151151
"mysql": "2.18.1",
152-
"nats": "1.4.12",
152+
"nats": "2.0.0",
153153
"nodemon": "2.0.7",
154154
"nyc": "15.1.0",
155155
"point-of-view": "4.13.0",
Lines changed: 60 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,68 +1,75 @@
11
import { Logger } from '@nestjs/common/services/logger.service';
22
import { loadPackage } from '@nestjs/common/utils/load-package.util';
3-
import { share } from 'rxjs/operators';
4-
import { CONN_ERR, ERROR_EVENT, NATS_DEFAULT_URL } from '../constants';
5-
import { Client } from '../external/nats-client.interface';
3+
import { NATS_DEFAULT_URL } from '../constants';
4+
import { NatsResponseJSONDeserializer } from '../deserializers/nats-response-json.deserializer';
5+
import { Client, NatsMsg } from '../external/nats-client.interface';
66
import { NatsOptions, PacketId, ReadPacket, WritePacket } from '../interfaces';
7+
import { NatsJSONSerializer } from '../serializers/nats-json.serializer';
78
import { ClientProxy } from './client-proxy';
89

9-
let natsPackage: any = {};
10+
let natsPackage = {} as any;
1011

1112
export class ClientNats extends ClientProxy {
1213
protected readonly logger = new Logger(ClientProxy.name);
13-
protected readonly url: string;
1414
protected natsClient: Client;
15-
protected connection: Promise<any>;
1615

1716
constructor(protected readonly options: NatsOptions['options']) {
1817
super();
19-
this.url = this.getOptionsProp(this.options, 'url') || NATS_DEFAULT_URL;
2018
natsPackage = loadPackage('nats', ClientNats.name, () => require('nats'));
2119

2220
this.initializeSerializer(options);
2321
this.initializeDeserializer(options);
2422
}
2523

26-
public close() {
27-
this.natsClient && this.natsClient.close();
24+
public async close() {
25+
await this.natsClient?.close();
2826
this.natsClient = null;
29-
this.connection = null;
3027
}
3128

3229
public async connect(): Promise<any> {
3330
if (this.natsClient) {
34-
return this.connection;
31+
return this.natsClient;
3532
}
36-
this.natsClient = this.createClient();
37-
this.handleError(this.natsClient);
38-
39-
this.connection = await this.connect$(this.natsClient)
40-
.pipe(share())
41-
.toPromise();
42-
return this.connection;
33+
this.natsClient = await this.createClient();
34+
this.handleStatusUpdates(this.natsClient);
35+
return this.natsClient;
4336
}
4437

45-
public createClient(): Client {
38+
public createClient(): Promise<Client> {
4639
const options: any = this.options || ({} as NatsOptions);
4740
return natsPackage.connect({
41+
servers: NATS_DEFAULT_URL,
4842
...options,
49-
url: this.url,
50-
json: true,
5143
});
5244
}
5345

54-
public handleError(client: Client) {
55-
client.addListener(
56-
ERROR_EVENT,
57-
(err: any) => err.code !== CONN_ERR && this.logger.error(err),
58-
);
46+
public async handleStatusUpdates(client: Client) {
47+
for await (const status of client.status()) {
48+
const data =
49+
status.data && typeof status.data === 'object'
50+
? JSON.stringify(status.data)
51+
: status.data;
52+
if (status.type === 'disconnect' || status.type === 'error') {
53+
this.logger.error(
54+
`NatsError: type: "${status.type}", data: "${data}".`,
55+
);
56+
} else {
57+
this.logger.log(`NatsStatus: type: "${status.type}", data: "${data}".`);
58+
}
59+
}
5960
}
6061

6162
public createSubscriptionHandler(
6263
packet: ReadPacket & PacketId,
6364
callback: (packet: WritePacket) => any,
64-
): Function {
65-
return (rawPacket: unknown) => {
65+
) {
66+
return (error: unknown | undefined, natsMsg: NatsMsg) => {
67+
if (error) {
68+
return callback({
69+
err: error,
70+
});
71+
}
72+
const rawPacket = natsMsg.data;
6673
const message = this.deserializer.deserialize(rawPacket);
6774
if (message.id && message.id !== packet.id) {
6875
return undefined;
@@ -95,12 +102,14 @@ export class ClientNats extends ClientProxy {
95102
packet,
96103
callback,
97104
);
98-
const subscriptionId = this.natsClient.request(
99-
channel,
100-
serializedPacket as any,
101-
subscriptionHandler,
102-
);
103-
return () => this.natsClient.unsubscribe(subscriptionId);
105+
this.natsClient.publish(channel, serializedPacket, {
106+
reply: packet.id,
107+
});
108+
const subscription = this.natsClient.subscribe(packet.id, {
109+
callback: subscriptionHandler,
110+
});
111+
112+
return () => subscription.unsubscribe();
104113
} catch (err) {
105114
callback({ err });
106115
}
@@ -110,10 +119,22 @@ export class ClientNats extends ClientProxy {
110119
const pattern = this.normalizePattern(packet.pattern);
111120
const serializedPacket = this.serializer.serialize(packet);
112121

113-
return new Promise<void>((resolve, reject) =>
114-
this.natsClient.publish(pattern, serializedPacket as any, err =>
115-
err ? reject(err) : resolve(),
116-
),
117-
);
122+
return new Promise<void>((resolve, reject) => {
123+
try {
124+
this.natsClient.publish(pattern, serializedPacket);
125+
resolve();
126+
} catch (err) {
127+
reject(err);
128+
}
129+
});
130+
}
131+
132+
protected initializeSerializer(options: NatsOptions['options']) {
133+
this.serializer = options?.serializer ?? new NatsJSONSerializer();
134+
}
135+
136+
protected initializeDeserializer(options: NatsOptions['options']) {
137+
this.deserializer =
138+
options?.deserializer ?? new NatsResponseJSONDeserializer();
118139
}
119140
}

packages/microservices/ctx-host/nats.context.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { BaseRpcContext } from './base-rpc.context';
22

3-
type NatsContextArgs = [string];
3+
type NatsContextArgs = [string, any];
44

55
export class NatsContext extends BaseRpcContext<NatsContextArgs> {
66
constructor(args: NatsContextArgs) {
@@ -13,4 +13,11 @@ export class NatsContext extends BaseRpcContext<NatsContextArgs> {
1313
getSubject() {
1414
return this.args[0];
1515
}
16+
17+
/**
18+
* Returns message headers (if exist).
19+
*/
20+
getHeaders() {
21+
return this.args[1];
22+
}
1623
}

0 commit comments

Comments
 (0)