Skip to content

Commit 7d867d8

Browse files
committed
improve amqp part 1
1 parent 31d5b78 commit 7d867d8

23 files changed

Lines changed: 554 additions & 385 deletions

OpenFlow/src/Config.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ export class Config {
4040

4141

4242
public static amqp_url: string = Config.getEnv("amqp_url", "amqp://localhost"); // used to register queues and by personal nodered
43+
public static amqp_check_for_consumer: boolean = Config.parseBoolean(Config.getEnv("amqp_check_for_consumer", "true"));
44+
// public static deadLetterExchange: string = Config.getEnv("deadletterexchange", "openflow-dlx"); // queue used to handle messages, that was not picked up.
45+
// public static dlxmessagettl: number = parseInt(Config.getEnv("dlxmessagettl", "2000")); // time to live for messages in miliseconds
46+
// public static dlxmessageexpires: number = parseInt(Config.getEnv("dlxmessageexpires", "1500")); // expire messages after this amount of miliseconds
4347
public static mongodb_url: string = Config.getEnv("mongodb_url", "mongodb://localhost:27017");
4448
public static mongodb_db: string = Config.getEnv("mongodb_db", "openflow");
4549

OpenFlow/src/Messages/Message.ts

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import { DatabaseConnection } from "../DatabaseConnection";
4343
import { CreateWorkflowInstanceMessage } from "./CreateWorkflowInstanceMessage";
4444
import { StripeMessage, EnsureStripeCustomerMessage, Billing, stripe_customer, stripe_base, stripe_list, StripeAddPlanMessage, StripeCancelPlanMessage, stripe_subscription, stripe_subscription_item, stripe_plan, stripe_coupon } from "./StripeMessage";
4545
import { V1ResourceRequirements } from "@kubernetes/client-node";
46+
import { amqpwrapper } from "../amqpwrapper";
4647
var request = require("request");
4748
var got = require("got");
4849

@@ -213,7 +214,7 @@ export class Message {
213214
var msg: RegisterQueueMessage<Base>
214215
try {
215216
msg = RegisterQueueMessage.assign(this.data);
216-
await cli.CreateConsumer(msg.queuename);
217+
msg.queuename = await cli.CreateConsumer(msg.queuename);
217218
} catch (error) {
218219
cli._logger.error(error);
219220
if (Util.IsNullUndefinded(msg)) { (msg as any) = {}; }
@@ -248,15 +249,24 @@ export class Message {
248249
msg.data.jwt = msg.jwt;
249250
}
250251
}
252+
var expiration: number = -1;
253+
if (typeof msg.expiration == 'number') expiration = msg.expiration;
251254
if (Util.IsNullEmpty(msg.replyto)) {
252-
await cli.sendToQueue(msg);
255+
// var sendthis = { data: msg.data, jwt: cli.jwt, user: cli.user };
256+
var sendthis = msg.data;
257+
await amqpwrapper.Instance().send("", msg.queuename, sendthis, expiration, msg.correlationId);
253258
} else {
254259
if (msg.queuename === msg.replyto) {
255260
cli._logger.warn("Ignore reply to self queuename:" + msg.queuename + " correlationId:" + msg.correlationId);
256261
return
257262
}
263+
//var sendthis = { data: msg.data, jwt: cli.jwt, user: cli.user };
264+
var sendthis = msg.data;
265+
var result = await amqpwrapper.Instance().sendWithReplyTo("", msg.queuename, msg.replyto, sendthis, expiration, msg.correlationId);
266+
// var result = await amqpwrapper.Instance().sendWithReply("", msg.queuename, sendthis, expiration, msg.correlationId);
267+
258268
this.replyto = msg.correlationId;
259-
await cli.sendQueueReply(msg);
269+
// await cli.sendQueueReply(msg, expiration);
260270
}
261271
} catch (error) {
262272
cli._logger.error(error);
@@ -1475,10 +1485,10 @@ export class Message {
14751485
var tuser = Crypt.verityToken(msg.jwt);
14761486
msg.jwt = Crypt.createToken(tuser, Config.longtoken_expires_in);
14771487

1478-
if (cli.consumers.length == 0) {
1479-
await cli.CreateConsumer("nodered." + Math.random().toString(36).substr(2, 9));
1480-
// throw new Error("Client not connected to any message queues");
1481-
}
1488+
// if (cli.consumers.length == 0) {
1489+
// await cli.CreateConsumer("nodered." + Math.random().toString(36).substr(2, 9));
1490+
// // throw new Error("Client not connected to any message queues");
1491+
// }
14821492
if (Util.IsNullEmpty(msg.queue)) {
14831493
var workflow: any = null;
14841494
var user: any = null;
@@ -1515,7 +1525,8 @@ export class Message {
15151525

15161526
if (msg.initialrun) {
15171527
var message = { _id: res2._id };
1518-
cli.consumers[0].sendToQueueWithReply(msg.queue, msg.resultqueue, msg.correlationId, message);
1528+
amqpwrapper.Instance().sendWithReplyTo("", msg.queue, msg.resultqueue, message, (60 * (60 * 1000)), msg.correlationId);
1529+
// cli.consumers[0].sendToQueueWithReply(msg.queue, msg.resultqueue, msg.correlationId, message, (60 * (60 * 1000))); // 1 hour
15191530
}
15201531
} catch (error) {
15211532
cli._logger.error(error);

OpenFlow/src/Messages/QueueMessage.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ export class QueueMessage implements IReplyMessage {
88
public replyto: string;
99
public queuename: string;
1010
public data: any;
11+
public expiration: number;
1112
static assign(o: any): QueueMessage {
1213
if (typeof o === "string" || o instanceof String) {
1314
return Object.assign(new QueueMessage(), JSON.parse(o.toString()));

OpenFlow/src/WebSocketClient.ts

Lines changed: 69 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { Message, JSONfn } from "./Messages/Message";
66
import { User } from "./User";
77
import { DatabaseConnection, mapFunc, reduceFunc, finalizeFunc } from "./DatabaseConnection";
88
import { Config } from "./Config";
9-
import { amqp_consumer } from "./amqp_consumer";
9+
// import { amqp_consumer } from "./amqp_consumer";
1010
import { QueueMessage } from "./Messages/QueueMessage";
1111
import { QueryMessage } from "./Messages/QueryMessage";
1212
import { MapReduceMessage } from "./Messages/MapReduceMessage";
@@ -16,6 +16,7 @@ import { DeleteOneMessage } from "./Messages/DeleteOneMessage";
1616
import { Base } from "./base";
1717
import { UpdateManyMessage } from "./Messages/UpdateManyMessage";
1818
import { Util } from "./Util";
19+
import { amqpwrapper } from "./amqpwrapper";
1920

2021
interface IHashTable<T> {
2122
[key: string]: T;
@@ -42,8 +43,11 @@ export class WebSocketClient {
4243
public clientagent: string;
4344
public clientversion: string;
4445

46+
4547
user: User;
46-
public consumers: amqp_consumer[] = [];
48+
// public consumers: amqp_consumer[] = [];
49+
private queues: IHashTable<string> = {};
50+
4751
constructor(logger: winston.Logger, socketObject: WebSocket) {
4852
this._logger = logger;
4953
this._socketObject = socketObject;
@@ -81,88 +85,94 @@ export class WebSocketClient {
8185
}
8286
}
8387
public async CloseConsumers(): Promise<void> {
84-
for (let i = 0; i < this.consumers.length; i++) {
88+
var keys = Object.keys(this.queues);
89+
for (let i = 0; i < keys.length; i++) {
8590
try {
86-
this.consumers[i].OnMessage = null;
87-
await this.consumers[i].close();
91+
await this.CloseConsumer(keys[i]);
8892
} catch (error) {
8993
this._logger.error("WebSocketclient::closeconsumers " + error);
9094
}
9195
}
92-
this.consumers = [];
9396
}
9497
public async Close(): Promise<void> {
98+
await this.CloseConsumers();
9599
if (this._socketObject != null) {
96-
await this.CloseConsumers();
97100
try {
98101
this._socketObject.close();
99102
} catch (error) {
100103
this._logger.error("WebSocketclient::Close " + error);
101104
}
102105
}
103-
104-
}
105-
public async CreateConsumer(queuename: string): Promise<void> {
106-
var autoDelete: boolean = false;
107-
108-
if (Util.IsNullEmpty(queuename)) { queuename = "web." + Math.random().toString(36).substr(2, 9); autoDelete = true; }
109-
var consumer = new amqp_consumer(this._logger, Config.amqp_url, queuename);
110-
consumer.OnMessage = this.OnMessage.bind(this);
111-
this.consumers.push(consumer);
112-
await consumer.connect(false, autoDelete);
113106
}
114107
public async CloseConsumer(queuename: string): Promise<void> {
115-
var index = -1;
116-
for (let i = 0; i < this.consumers.length; i++) {
117-
if (this.consumers[i].queue == queuename) index = i;
118-
}
119-
if (index == -1) return;
120-
var consumer: amqp_consumer = this.consumers[index];
121-
this.consumers = this.consumers.splice(index, 1);
122-
consumer.OnMessage = null;
123-
await consumer.close();
124-
}
125-
public async sendQueueReply(msg: QueueMessage) {
126-
try {
127-
var index = -1;
128-
for (let i = 0; i < this.consumers.length; i++) {
129-
if (this.consumers[i].queue == msg.queuename) index = i;
108+
if (this.queues[queuename] != null) {
109+
try {
110+
await amqpwrapper.Instance().RemoveQueueConsumer(queuename);
111+
delete this.queues[queuename];
112+
} catch (error) {
113+
this._logger.error("WebSocketclient::CloseConsumer " + error);
130114
}
131-
if (index == -1) return;
132-
this.consumers[index].sendToQueue(msg.replyto, msg.correlationId, msg.data);
133-
} catch (error) {
134-
this._logger.error("WebSocketclient::WebSocket error encountered " + error);
135115
}
136116
}
137-
public async sendToQueue(msg: QueueMessage) {
138-
if (Util.IsNullEmpty(msg.queuename)) { throw new Error("sendToQueue, queuename is mandatory") }
139-
if (this.consumers.length === 0) { throw new Error("No consumers for client available to send message through") }
140-
// var result = this.consumers[0].sendToQueue(msg.queuename, msg.correlationId, { payload: msg.data, jwt: this.jwt, user: this.user });
141-
var result = this.consumers[0].sendToQueue(msg.queuename, msg.correlationId, msg.data);
117+
public async CreateConsumer(queuename: string): Promise<string> {
118+
var autoDelete: boolean = false; // Should we keep the queue around ? for robots and roles
119+
if (Util.IsNullEmpty(queuename)) { queuename = "web." + Math.random().toString(36).substr(2, 9); autoDelete = true; }
120+
121+
var queuename = await amqpwrapper.Instance().AddQueueConsumer(queuename, { autoDelete: autoDelete }, async (msg: any, ack: any, correlationId: string, replyTo: string, done: any) => {
122+
var _data = msg;
123+
try {
124+
_data = await this.Queue(msg, replyTo, correlationId, queuename);
125+
ack();
126+
done(_data);
127+
} catch (error) {
128+
ack(false);
129+
done(_data);
130+
}
131+
});
132+
this.queues[queuename] = queuename;
133+
return queuename;
142134
}
135+
// public async sendQueueReply(msg: QueueMessage, expiration: Number) {
136+
// try {
137+
// var index = -1;
138+
// for (let i = 0; i < this.consumers.length; i++) {
139+
// if (this.consumers[i].queue == msg.queuename) index = i;
140+
// }
141+
// if (index == -1) return;
142+
// this.consumers[index].sendToQueue(msg.replyto, msg.correlationId, msg.data, expiration);
143+
// } catch (error) {
144+
// this._logger.error("WebSocketclient::WebSocket error encountered " + error);
145+
// }
146+
// }
147+
// public async sendToQueue(msg: QueueMessage, expiration: Number) {
148+
// if (Util.IsNullEmpty(msg.queuename)) { throw new Error("sendToQueue, queuename is mandatory") }
149+
// if (this.consumers.length === 0) { throw new Error("No consumers for client available to send message through") }
150+
// // var result = this.consumers[0].sendToQueue(msg.queuename, msg.correlationId, { payload: msg.data, jwt: this.jwt, user: this.user });
151+
// var result = this.consumers[0].sendToQueue(msg.queuename, msg.correlationId, msg.data, expiration);
152+
// }
143153
sleep(ms) {
144154
return new Promise(resolve => {
145155
setTimeout(resolve, ms)
146156
})
147157
}
148-
async OnMessage(sender: amqp_consumer, msg: amqplib.ConsumeMessage) {
149-
try {
150-
this._logger.debug("WebSocketclient::WebSocket Send message to socketclient, from " + msg.properties.replyTo + " correlationId: " + msg.properties.correlationId);
151-
var _data = msg.content.toString();
152-
var data = await this.Queue(_data, msg.properties.replyTo, msg.properties.correlationId, sender.queue);
153-
this._logger.debug("WebSocketclient::WebSocket ack message in queue " + sender.queue);
154-
sender.channel.ack(msg);
155-
} catch (error) {
156-
this._logger.error("WebSocketclient::WebSocket error in queue " + sender.queue + " / " + error);
157-
setTimeout(() => {
158-
try {
159-
sender.channel.nack(msg);
160-
} catch (error) {
161-
this._logger.error("WebSocketclient::WebSocket nack message in queue " + sender.queue);
162-
}
163-
}, 2000);
164-
}
165-
}
158+
// async OnMessage(sender: amqp_consumer, msg: amqplib.ConsumeMessage) {
159+
// try {
160+
// this._logger.debug("WebSocketclient::WebSocket Send message to socketclient, from " + msg.properties.replyTo + " correlationId: " + msg.properties.correlationId);
161+
// var _data = msg.content.toString();
162+
// var data = await this.Queue(_data, msg.properties.replyTo, msg.properties.correlationId, sender.queue);
163+
// this._logger.debug("WebSocketclient::WebSocket ack message in queue " + sender.queue);
164+
// sender.channel.ack(msg);
165+
// } catch (error) {
166+
// this._logger.error("WebSocketclient::WebSocket error in queue " + sender.queue + " / " + error);
167+
// setTimeout(() => {
168+
// try {
169+
// sender.channel.nack(msg);
170+
// } catch (error) {
171+
// this._logger.error("WebSocketclient::WebSocket nack message in queue " + sender.queue);
172+
// }
173+
// }, 2000);
174+
// }
175+
// }
166176
public ping(): boolean {
167177
try {
168178
let msg: SocketMessage = SocketMessage.fromcommand("ping");

OpenFlow/src/WebSocketServer.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ export class WebSocketServer {
3636
// });
3737
setInterval(this.pingClients, 10000);
3838
}
39-
private static async pingClients(): Promise<void> {
39+
private static async pingClients(): Promise<void> {
4040
let count: number = WebSocketServer._clients.length;
4141
WebSocketServer._clients = WebSocketServer._clients.filter(function (cli: WebSocketClient): boolean {
4242
try {
@@ -94,7 +94,7 @@ export class WebSocketServer {
9494
console.log(err);
9595
});
9696
}
97-
else if (cli.consumers != null && cli.consumers.length > 0) {
97+
else {
9898
// Should proberly turn this a little down, so we dont update all online users every 10th second
9999
Config.db.db.collection("users").updateOne({ _id: cli.user._id }, { $set: { _heartbeat: new Date(new Date().toISOString()) } }).catch((err) => {
100100
console.log(err);

OpenFlow/src/amqp_consumer.ts

Lines changed: 0 additions & 84 deletions
This file was deleted.

0 commit comments

Comments
 (0)