Skip to content

Commit 3da993e

Browse files
committed
Added RMQ Client and Server
1 parent e1abf80 commit 3da993e

10 files changed

Lines changed: 5845 additions & 7960 deletions

File tree

package-lock.json

Lines changed: 5675 additions & 7959 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
"@nestjs/microservices": "5.0.0",
3636
"@nestjs/testing": "5.0.0",
3737
"@nestjs/websockets": "5.0.0",
38+
"@types/amqplib": "^0.5.7",
39+
"amqplib": "^0.5.2",
3840
"axios": "^0.17.1",
3941
"class-transformer": "^0.1.8",
4042
"class-validator": "^0.8.1",

packages/microservices/client/client-proxy-factory.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { Closeable } from '../interfaces/closeable.interface';
77
import { ClientNats } from './client-nats';
88
import { ClientMqtt } from './client-mqtt';
99
import { ClientGrpcProxy } from './client-grpc';
10+
import { ClientRMQ } from './client-rmq';
1011

1112
export class ClientProxyFactory {
1213
public static create(options: ClientOptions): ClientProxy & Closeable {
@@ -20,6 +21,8 @@ export class ClientProxyFactory {
2021
return new ClientMqtt(options);
2122
case Transport.GRPC:
2223
return new ClientGrpcProxy(options);
24+
case Transport.RMQ:
25+
return new ClientRMQ(options);
2326
default:
2427
return new ClientTCP(options);
2528
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import { Channel, Connection } from 'amqplib';
2+
import { CONNECT_EVENT, ERROR_EVENT, MESSAGE_EVENT, RQM_DEFAULT_URL, SUBSCRIBE, RQM_DEFAULT_QUEUE } from './../constants';
3+
import { Logger } from '@nestjs/common/services/logger.service';
4+
import { loadPackage } from '@nestjs/common/utils/load-package.util';
5+
import { ClientProxy } from './client-proxy';
6+
import { ClientOptions, RmqOptions } from '../interfaces';
7+
8+
let rqmPackage: any = {};
9+
10+
export class ClientRMQ extends ClientProxy {
11+
private readonly logger = new Logger(ClientProxy.name);
12+
private client: Connection = null;
13+
private channel: Channel = null;
14+
private url: string;
15+
private queue: string;
16+
17+
constructor(
18+
private readonly options: ClientOptions) {
19+
super();
20+
this.url =
21+
this.getOptionsProp<RmqOptions>(this.options, 'url') || RQM_DEFAULT_URL;
22+
this.queue =
23+
this.getOptionsProp<RmqOptions>(this.options, 'queue') || RQM_DEFAULT_QUEUE;
24+
rqmPackage = loadPackage('amqplib', ClientRMQ.name);
25+
this.connect();
26+
}
27+
28+
protected async publish(messageObj, callback: (err, result, disposed?: boolean) => void) {
29+
try {
30+
if (!this.client) {
31+
await this.connect();
32+
}
33+
this.channel.assertQueue('', { exclusive: true }).then(responseQ => {
34+
messageObj.replyTo = responseQ.queue;
35+
this.channel.consume(responseQ.queue, (message) => {
36+
this.handleMessage(message, this.client, callback)
37+
}, { noAck: true });
38+
this.channel.sendToQueue(this.queue, Buffer.from(JSON.stringify(messageObj)));
39+
});
40+
} catch (err) {
41+
console.log(err);
42+
callback(err, null);
43+
}
44+
}
45+
46+
private async handleMessage(message, server, callback): Promise<void> {
47+
if(message) {
48+
const { content, fields } = message;
49+
const { err, response, isDisposed } = JSON.parse(content.toString());
50+
if (isDisposed || err) {
51+
callback({
52+
err,
53+
response: null,
54+
isDisposed: true,
55+
});
56+
this.channel.deleteQueue(fields.routingKey);
57+
}
58+
callback({
59+
err,
60+
response,
61+
});
62+
}
63+
}
64+
65+
public close(): void {
66+
this.channel && this.channel.close();
67+
this.client && this.client.close();
68+
}
69+
70+
public handleError(client: Connection): void {
71+
client.addListener(ERROR_EVENT, err => this.logger.error(err));
72+
}
73+
74+
public async connect(): Promise<any> {
75+
return new Promise(async (resolve, reject) => {
76+
this.client = await rqmPackage.connect(this.url);
77+
this.channel = await this.client.createChannel();
78+
this.channel.assertQueue(this.queue, { durable: false });
79+
resolve();
80+
});
81+
}
82+
}

packages/microservices/constants.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ export const REDIS_DEFAULT_URL = 'redis://localhost:6379';
44
export const NATS_DEFAULT_URL = 'nats://localhost:4222';
55
export const MQTT_DEFAULT_URL = 'mqtt://localhost:1883';
66
export const GRPC_DEFAULT_URL = 'localhost:5000';
7+
export const RQM_DEFAULT_URL = 'localhost';
8+
export const RQM_DEFAULT_QUEUE = 'default';
79

810
export const CONNECT_EVENT = 'connect';
911
export const MESSAGE_EVENT = 'message';

packages/microservices/enums/transport.enum.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@ export enum Transport {
44
NATS,
55
MQTT,
66
GRPC,
7+
RMQ,
78
}

packages/microservices/interfaces/client-metadata.interface.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,16 @@ import {
55
NatsOptions,
66
MqttOptions,
77
GrpcOptions,
8+
RmqOptions,
89
} from './microservice-configuration.interface';
910

1011
export type ClientOptions =
1112
| RedisOptions
1213
| NatsOptions
1314
| MqttOptions
1415
| GrpcOptions
15-
| TcpClientOptions;
16+
| TcpClientOptions
17+
| RmqOptions;
1618

1719
export interface TcpClientOptions {
1820
transport: Transport.TCP;

packages/microservices/interfaces/microservice-configuration.interface.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ export type MicroserviceOptions =
99
| RedisOptions
1010
| NatsOptions
1111
| MqttOptions
12+
| RmqOptions
1213
| CustomStrategy;
1314

1415
export interface CustomStrategy {
@@ -65,3 +66,11 @@ export interface NatsOptions {
6566
tls?: any;
6667
};
6768
}
69+
70+
export interface RmqOptions {
71+
transport?: Transport.RMQ;
72+
options?: {
73+
url?: string;
74+
queue?: string;
75+
};
76+
}

packages/microservices/server/server-factory.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { ServerMqtt } from './server-mqtt';
66
import { ServerNats } from './server-nats';
77
import { ServerRedis } from './server-redis';
88
import { ServerTCP } from './server-tcp';
9+
import { ServerRMQ } from './server-rqm';
910

1011
export class ServerFactory {
1112
public static create(
@@ -21,6 +22,8 @@ export class ServerFactory {
2122
return new ServerMqtt(options);
2223
case Transport.GRPC:
2324
return new ServerGrpc(options);
25+
case Transport.RMQ:
26+
return new ServerRMQ(options);
2427
default:
2528
return new ServerTCP(options);
2629
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import { Server } from './server';
2+
import { Channel, Connection } from 'amqplib';
3+
import { RQM_DEFAULT_URL , RQM_DEFAULT_QUEUE } from './../constants';
4+
import { CustomTransportStrategy, RmqOptions } from './../interfaces';
5+
import { MicroserviceOptions } from '../interfaces/microservice-configuration.interface';
6+
import { loadPackage } from '@nestjs/common/utils/load-package.util';
7+
import { Observable } from 'rxjs';
8+
9+
let rqmPackage: any = {};
10+
11+
export class ServerRMQ extends Server implements CustomTransportStrategy {
12+
private server: Connection = null;
13+
private channel: Channel = null;
14+
private url: string;
15+
private queue: string;
16+
17+
constructor(private readonly options: MicroserviceOptions) {
18+
super();
19+
this.url =
20+
this.getOptionsProp<RmqOptions>(this.options, 'url') || RQM_DEFAULT_URL;
21+
this.queue =
22+
this.getOptionsProp<RmqOptions>(this.options, 'queue') || RQM_DEFAULT_QUEUE;
23+
rqmPackage = loadPackage('amqplib', ServerRMQ.name);
24+
}
25+
26+
public async listen(callback: () => void): Promise<void> {
27+
await this.start(callback);
28+
this.channel.consume(this.queue, (msg) => this.handleMessage(msg) , {
29+
noAck: true,
30+
});
31+
}
32+
33+
private async start(callback?: () => void) {
34+
try {
35+
this.server = await rqmPackage.connect(this.url);
36+
this.channel = await this.server.createChannel();
37+
this.channel.assertQueue(this.queue, { durable: false });
38+
} catch (err) {
39+
this.logger.error(err);
40+
}
41+
}
42+
43+
public close(): void {
44+
this.channel && this.channel.close();
45+
this.server && this.server.close();
46+
}
47+
48+
private async handleMessage(message): Promise<void> {
49+
const { content } = message;
50+
const messageObj = JSON.parse(content.toString());
51+
const handlers = this.getHandlers();
52+
const pattern = JSON.stringify(messageObj.pattern);
53+
if (!this.messageHandlers[pattern]) {
54+
return;
55+
}
56+
const handler = this.messageHandlers[pattern];
57+
const response$ = this.transformToObservable(await handler(messageObj.data)) as Observable<any>;
58+
response$ && this.send(response$, (data) => this.sendMessage(data, messageObj.replyTo));
59+
}
60+
61+
private sendMessage(message, replyTo): void {
62+
const buffer = Buffer.from(JSON.stringify(message));
63+
this.channel.sendToQueue(replyTo, buffer);
64+
}
65+
}

0 commit comments

Comments
 (0)