Skip to content

Commit 92bae86

Browse files
committed
New queue system, final push.
1 parent 7912f55 commit 92bae86

21 files changed

Lines changed: 380 additions & 555 deletions

OpenFlow/src/Config.ts

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,14 @@ export class Config {
3939
public static domain: string = Config.getEnv("domain", "localhost"); // sent to website and used in baseurl()
4040

4141

42+
public static amqp_force_queue_prefix: boolean = Config.parseBoolean(Config.getEnv("amqp_force_queue_prefix", "true"));
43+
public static amqp_force_exchange_prefix: boolean = Config.parseBoolean(Config.getEnv("amqp_force_exchange_prefix", "true"));
4244
public static amqp_url: string = Config.getEnv("amqp_url", "amqp://localhost"); // used to register queues and by personal nodered
4345
public static amqp_check_for_consumer: boolean = Config.parseBoolean(Config.getEnv("amqp_check_for_consumer", "true"));
4446
public static amqp_default_expiration: number = parseInt(Config.getEnv("amqp_default_expiration", "10000")); // 10 seconds
47+
public static amqp_requeue_time: number = parseInt(Config.getEnv("amqp_requeue_time", "1000")); // 1 seconds
48+
public static amqp_dlx: string = Config.getEnv("amqp_dlx", "openflow-dlx"); // Dead letter exchange, used to pickup dead or timeout messages
49+
4550
// public static amqp_default_expiration: number = parseInt(Config.getEnv("amqp_default_expiration", (60 * 1000).toString())); // 1 min
4651
// public static deadLetterExchange: string = Config.getEnv("deadletterexchange", "openflow-dlx"); // queue used to handle messages, that was not picked up.
4752
// public static dlxmessagettl: number = parseInt(Config.getEnv("dlxmessagettl", "2000")); // time to live for messages in miliseconds
@@ -62,7 +67,7 @@ export class Config {
6267
public static personalnoderedtoken_expires_in: string = Config.getEnv("personalnoderedtoken_expires_in", "365d");
6368

6469
// Used to configure personal nodered's
65-
public static force_queue_prefix: boolean = Config.parseBoolean(Config.getEnv("force_queue_prefix", "true"));
70+
// public static force_queue_prefix: boolean = Config.parseBoolean(Config.getEnv("force_queue_prefix", "true"));
6671
public static nodered_image: string = Config.getEnv("nodered_image", "cloudhack/openflownodered:edge");
6772
public static saml_federation_metadata: string = Config.getEnv("saml_federation_metadata", "");
6873
public static api_ws_url: string = Config.getEnv("api_ws_url", "ws://localhost:3000");
@@ -73,10 +78,10 @@ export class Config {
7378
// Environment variables to set a prefix for RabbitMQs Dead Letter Exchange, Dead Letter Routing Key,
7479
// Dead Letter Queue, and Message Time to Live - to enable timeouts for RabbitMQ messages
7580
// These values must be the same for OpenFlowNodeRED and OpenFlow, or will cause errors when asserting queues
76-
public static amqp_dlx_prefix: string = Config.getEnv("amqp_dlx_prefix", "DLX.");
77-
public static amqp_dlrk_prefix: string = Config.getEnv("amqp_dlrk_prefix", "dlx.");
78-
public static amqp_dlq_prefix: string = Config.getEnv("amqp_dlq_prefix", "dlq.");
79-
public static amqp_message_ttl: number = parseInt(Config.getEnv("amqp_message_ttl", "20000"));
81+
// public static amqp_dlx_prefix: string = Config.getEnv("amqp_dlx_prefix", "DLX.");
82+
// public static amqp_dlrk_prefix: string = Config.getEnv("amqp_dlrk_prefix", "dlx.");
83+
// public static amqp_dlq_prefix: string = Config.getEnv("amqp_dlq_prefix", "dlq.");
84+
// public static amqp_message_ttl: number = parseInt(Config.getEnv("amqp_message_ttl", "20000"));
8085

8186
public static baseurl(): string {
8287
var result: string = "";

OpenFlow/src/DatabaseConnection.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -276,9 +276,12 @@ export class DatabaseConnection {
276276
if (typeof orderby === "string" || orderby instanceof String) {
277277
var neworderby = null;
278278
try {
279-
neworderby = JSON.parse((orderby as string));
280-
mysort = neworderby;
279+
if (orderby.indexOf("{") > -1) {
280+
neworderby = JSON.parse((orderby as string));
281+
mysort = neworderby;
282+
}
281283
} catch (error) {
284+
console.log(error);
282285
}
283286
if (neworderby == null) mysort[(orderby as string)] = 1;
284287
} else {

OpenFlow/src/Messages/Message.ts

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -257,15 +257,16 @@ export class Message {
257257
await amqpwrapper.Instance().send("", msg.queuename, sendthis, expiration, msg.correlationId);
258258
} else {
259259
if (msg.queuename === msg.replyto) {
260-
cli._logger.warn("Ignore reply to self queuename:" + msg.queuename + " correlationId:" + msg.correlationId);
261-
return
260+
throw new Error("Cannot send reply to self queuename:" + msg.queuename + " correlationId:" + msg.correlationId);
261+
// cli._logger.warn("Ignore reply to self queuename:" + msg.queuename + " correlationId:" + msg.correlationId);
262+
// return
262263
}
263264
//var sendthis = { data: msg.data, jwt: cli.jwt, user: cli.user };
264265
var sendthis = msg.data;
265266
var result = await amqpwrapper.Instance().sendWithReplyTo("", msg.queuename, msg.replyto, sendthis, expiration, msg.correlationId);
266267
// var result = await amqpwrapper.Instance().sendWithReply("", msg.queuename, sendthis, expiration, msg.correlationId);
267268

268-
this.replyto = msg.correlationId;
269+
// this.replyto = msg.correlationId;
269270
// await cli.sendQueueReply(msg, expiration);
270271
}
271272
} catch (error) {
@@ -812,9 +813,9 @@ export class Message {
812813
var tuser: TokenUser = new TokenUser(nodereduser);
813814
var nodered_jwt: string = Crypt.createToken(tuser, Config.personalnoderedtoken_expires_in);
814815

815-
if (Config.force_queue_prefix) {
816-
user.nodered.queue_prefix = nodereduser.username;
817-
}
816+
// if (Config.force_queue_prefix) {
817+
// user.nodered.queue_prefix = nodereduser.username;
818+
// }
818819

819820
cli._logger.debug("[" + cli.user.username + "] ensure nodered role " + name + "noderedadmins");
820821
var noderedadmins = await User.ensureRole(cli.jwt, name + "noderedadmins", null);

OpenFlow/src/WebSocketClient.ts

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -116,19 +116,36 @@ export class WebSocketClient {
116116
}
117117
public async CreateConsumer(queuename: string): Promise<string> {
118118
var autoDelete: boolean = false; // Should we keep the queue around ? for robots and roles
119-
if (Util.IsNullEmpty(queuename)) { queuename = "web." + Math.random().toString(36).substr(2, 9); autoDelete = true; }
120-
121-
var queuename = await amqpwrapper.Instance().AddQueueConsumer(queuename, { autoDelete: autoDelete }, async (msg: any, options: QueueMessageOptions, ack: any, done: any) => {
119+
if (Util.IsNullEmpty(queuename)) {
120+
if (this.clientagent == "nodered") {
121+
queuename = "nodered." + Math.random().toString(36).substr(2, 9); autoDelete = true;
122+
} else if (this.clientagent == "webapp") {
123+
queuename = "webapp." + Math.random().toString(36).substr(2, 9); autoDelete = true;
124+
} else if (this.clientagent == "web") {
125+
queuename = "web." + Math.random().toString(36).substr(2, 9); autoDelete = true;
126+
} else {
127+
queuename = "unknown." + Math.random().toString(36).substr(2, 9); autoDelete = true;
128+
}
129+
}
130+
var AssertQueueOptions: any = new Object(amqpwrapper.Instance().AssertQueueOptions);
131+
AssertQueueOptions.autoDelete = autoDelete;
132+
var queuename = await amqpwrapper.Instance().AddQueueConsumer(queuename, AssertQueueOptions, this.jwt, async (msg: any, options: QueueMessageOptions, ack: any, done: any) => {
122133
var _data = msg;
123134
try {
124135
_data = await this.Queue(msg, queuename, options);
125136
ack();
126137
done(_data);
127138
} catch (error) {
128-
// ack(false);
129-
ack(); // just eat the error
130-
done(_data);
131-
console.log(queuename + " failed message queue message, discarding error and message: ", error);
139+
setTimeout(() => {
140+
ack(false);
141+
// ack(); // just eat the error
142+
done(_data);
143+
if (error.message != null && error.message != "") {
144+
console.log(queuename + " failed message queue message, nack and re queue message: ", error.message);
145+
} else {
146+
console.log(queuename + " failed message queue message, nack and re queue message: ", error);
147+
}
148+
}, Config.amqp_requeue_time);
132149
}
133150
});
134151
this.queues[queuename] = queuename;

OpenFlow/src/amqpwrapper.ts

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ import * as winston from "winston";
22
import * as amqplib from "amqplib";
33
import { Util } from "./Util";
44
import { Config } from "./Config";
5+
import { cli } from "winston/lib/winston/config";
6+
import { Crypt } from "./Crypt";
57

68
type QueueOnMessage = (msg: string, options: QueueMessageOptions, ack: any, done: any) => void;
79
interface IHashTable<T> {
@@ -104,6 +106,16 @@ export class amqpwrapper {
104106
// Bad idear ...
105107
// this.AssertExchangeOptions.arguments['alternate-exchange'] = Config.deadLetterExchange;
106108
//}
109+
110+
if (!Util.IsNullEmpty(Config.amqp_dlx)) {
111+
this.AssertQueueOptions.arguments = {};
112+
this.AssertQueueOptions.arguments['x-dead-letter-exchange'] = Config.amqp_dlx;
113+
// // arguments: {
114+
// // 'x-dead-letter-exchange': Config.amqp_dlx_prefix + queue,
115+
// // 'x-dead-letter-routing-key': Config.amqp_dlrk_prefix + queue,
116+
// // 'x-message-ttl': Config.amqp_message_ttl
117+
118+
}
107119
}
108120
private timeout: NodeJS.Timeout = null;
109121
async connect(): Promise<void> {
@@ -131,7 +143,7 @@ export class amqpwrapper {
131143
if (!Util.IsNullEmpty(this.replyqueue)) {
132144
delete this.queues[this.replyqueue];
133145
}
134-
this.replyqueue = await this.AddQueueConsumer("", null, (msg: any, options: QueueMessageOptions, ack: any, done: any) => {
146+
this.replyqueue = await this.AddQueueConsumer("", null, null, (msg: any, options: QueueMessageOptions, ack: any, done: any) => {
135147
if (!Util.IsNullUndefinded(this.activecalls[options.correlationId])) {
136148
this.activecalls[options.correlationId].resolve(msg);
137149
this.activecalls[options.correlationId] = null;
@@ -163,13 +175,13 @@ export class amqpwrapper {
163175
var keys = Object.keys(this.exchanges);
164176
for (var i = 0; i < keys.length; i++) {
165177
var q1: amqpexchange = this.exchanges[keys[i]];
166-
this.AddExchangeConsumer(q1.exchange, q1.algorithm, q1.routingkey, q1.ExchangeOptions, q1.callback);
178+
this.AddExchangeConsumer(q1.exchange, q1.algorithm, q1.routingkey, q1.ExchangeOptions, null, q1.callback);
167179
}
168180
var keys = Object.keys(this.queues);
169181
for (var i = 0; i < keys.length; i++) {
170182
if (keys[i] != this.replyqueue) {
171183
var q2: amqpqueue = this.queues[keys[i]];
172-
this.AddQueueConsumer(q2.queue, q2.QueueOptions, q2.callback);
184+
this.AddQueueConsumer(q2.queue, q2.QueueOptions, null, q2.callback);
173185
}
174186
}
175187
}
@@ -183,8 +195,16 @@ export class amqpwrapper {
183195
await this.channel.cancel(q.consumerTag);
184196
delete this.queues[q.queue];
185197
}
186-
async AddQueueConsumer(queue: string, QueueOptions: any, callback: QueueOnMessage): Promise<string> {
198+
async AddQueueConsumer(queue: string, QueueOptions: any, jwt: string, callback: QueueOnMessage): Promise<string> {
187199
var q: amqpqueue = null;
200+
if (Config.amqp_force_queue_prefix && !Util.IsNullEmpty(jwt)) {
201+
var tuser = Crypt.verityToken(jwt);
202+
var name = tuser.username.split("@").join("").split(".").join("");
203+
name = name.toLowerCase();
204+
var isrole = tuser.roles.filter(x => x._id == queue);
205+
if (isrole.length == 0 && tuser._id != queue) queue = name + queue;
206+
}
207+
188208
if (this.exchanges[queue] != null) {
189209
q = this.queues[queue];
190210
} else {
@@ -211,8 +231,15 @@ export class amqpwrapper {
211231
this.queues[q.queue] = q;
212232
return q.queue;
213233
}
214-
async AddExchangeConsumer(exchange: string, algorithm: string, routingkey: string, ExchangeOptions: any, callback: QueueOnMessage): Promise<void> {
234+
async AddExchangeConsumer(exchange: string, algorithm: string, routingkey: string, ExchangeOptions: any, jwt: string, callback: QueueOnMessage): Promise<void> {
215235
var q: amqpexchange = null;
236+
if (Config.amqp_force_exchange_prefix && !Util.IsNullEmpty(jwt)) {
237+
var tuser = Crypt.verityToken(jwt);
238+
var name = tuser.username.split("@").join("").split(".").join("");
239+
name = name.toLowerCase();
240+
exchange = name + exchange;
241+
}
242+
216243
if (this.exchanges[exchange] != null) {
217244
q = this.exchanges[exchange];
218245
} else {
@@ -224,7 +251,12 @@ export class amqpwrapper {
224251
q.ExchangeOptions = new Object((ExchangeOptions != null ? ExchangeOptions : this.AssertExchangeOptions));
225252
q.exchange = exchange; q.algorithm = algorithm; q.routingkey = routingkey; q.callback = callback;
226253
this._ok = await this.channel.assertExchange(q.exchange, q.algorithm, q.ExchangeOptions);
227-
q.queue = await this.AddQueueConsumer("", null, q.callback);
254+
var AssertQueueOptions = null;
255+
if (!Util.IsNullEmpty(Config.amqp_dlx) && exchange == Config.amqp_dlx) {
256+
AssertQueueOptions = Object.create(this.AssertQueueOptions);
257+
delete AssertQueueOptions.arguments;
258+
}
259+
q.queue = await this.AddQueueConsumer("", AssertQueueOptions, jwt, q.callback);
228260
this.channel.bindQueue(q.queue, q.exchange, q.routingkey);
229261
this._logger.info("[AMQP] Added exchange consumer " + q.exchange);
230262
this.exchanges[exchange] = q;
@@ -254,13 +286,14 @@ export class amqpwrapper {
254286
}
255287
this.channel.ack(msg);
256288
}, (result) => {
257-
if (msg != null && !Util.IsNullEmpty(replyTo)) {
258-
try {
259-
this.channel.sendToQueue(replyTo, Buffer.from(result), { correlationId: msg.properties.correlationId });
260-
} catch (error) {
261-
console.error("Error sending response to " + replyTo + " " + JSON.stringify(error))
262-
}
263-
}
289+
// ROLLBACK
290+
// if (msg != null && !Util.IsNullEmpty(replyTo)) {
291+
// try {
292+
// this.channel.sendToQueue(replyTo, Buffer.from(result), { correlationId: msg.properties.correlationId });
293+
// } catch (error) {
294+
// console.error("Error sending response to " + replyTo + " " + JSON.stringify(error))
295+
// }
296+
// }
264297
});
265298
}
266299
async sendWithReply(exchange: string, queue: string, data: any, expiration: number, correlationId: string): Promise<string> {

OpenFlow/src/index.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import { Role } from "./Role";
1919
import { Config } from "./Config";
2020
import { KubeUtil } from "./KubeUtil";
2121
import { amqpwrapper, QueueMessageOptions } from "./amqpwrapper";
22+
import { Util } from "./Util";
2223

2324
const logger: winston.Logger = Logger.configure();
2425
Config.db = new DatabaseConnection(logger, Config.mongodb_url, Config.mongodb_db);
@@ -32,6 +33,32 @@ async function initamqp() {
3233
amqpwrapper.SetTestInstance(testamqp);
3334
await testamqp.connect();
3435

36+
// Must also consume messages in the dead letter queue, to catch messages that have timed out
37+
await amqp.AddExchangeConsumer(Config.amqp_dlx, "fanout", "", null, null, (msg: any, options: QueueMessageOptions, ack: any, done: any) => {
38+
// This is the function to run when the dead letter (timed out) message is picked up
39+
// var data = JSON.parse(msg.content.toString());
40+
// Change the command and return back to the correct queue (replyTo) to be handled
41+
// Clear x-first-death-reason header
42+
// msg.properties.headers["x-first-death-reason"] = null;
43+
// Set command to timeout to be handled when collected from the node's queue
44+
if (typeof msg === "string" || msg instanceof String) {
45+
try {
46+
msg = JSON.parse((msg as any));
47+
} catch (error) {
48+
}
49+
}
50+
try {
51+
msg.command = "timeout";
52+
// Resend message, this time to the reply queue for the correct node (replyTo)
53+
// this.SendMessage(JSON.stringify(data), msg.properties.replyTo, msg.properties.correlationId, false);
54+
console.log("[DLX][" + options.exchange + "] Send timeout to " + options.replyTo)
55+
amqpwrapper.Instance().sendWithReply("", options.replyTo, msg, 20000, options.correlationId);
56+
} catch (error) {
57+
}
58+
ack();
59+
done();
60+
});
61+
3562
// await amqp.AddExchangeConsumer("testexchange", "fanout", "", null, (msg: any, options: QueueMessageOptions, ack: any, done: any) => {
3663
// console.log("testexchange: " + msg);
3764
// ack();

OpenFlow/src/public/Controllers.ts

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -696,7 +696,8 @@ module openflow {
696696
this.basequery = {};
697697
this.basequery = { $or: ors };
698698
if (!this.showcompleted) {
699-
this.basequery.state = { $ne: "completed" };
699+
// this.basequery.state = { $ne: "completed" };
700+
this.basequery["$and"] = [{ state: { $ne: "completed" } }, { state: { $ne: "failed" } }];
700701
this.basequery.form = { $exists: true };
701702
// this.basequery.$or = ors;
702703
} else {
@@ -1813,9 +1814,12 @@ module openflow {
18131814
if (data.queuename == this.queuename) {
18141815
if (this.instanceid == null && data.data._id != null) {
18151816
this.instanceid = data.data._id;
1816-
this.$location.path("/Form/" + this.id + "/" + this.instanceid);
1817-
if (!this.$scope.$$phase) { this.$scope.$apply(); }
1817+
// this.$location.path("/Form/" + this.id + "/" + this.instanceid);
1818+
// if (!this.$scope.$$phase) { this.$scope.$apply(); }
1819+
this.loadData();
18181820
return;
1821+
} else {
1822+
this.loadData();
18191823
}
18201824
}
18211825
if (!this.$scope.$$phase) { this.$scope.$apply(); }
@@ -2193,6 +2197,18 @@ module openflow {
21932197

21942198
$('#workflowform :button').hide();
21952199
$('input[type="submit"]').hide();
2200+
if (this.model.state == "failed") {
2201+
if (!this.model.payload) {
2202+
this.errormessage = "An unknown error occurred";
2203+
} else if (this.model.payload.message != null && this.model.payload.message != "") {
2204+
this.errormessage = this.model.payload.message;
2205+
} else if (this.model.payload.Message != null && this.model.payload.Message != "") {
2206+
this.errormessage = this.model.payload.Message;
2207+
} else {
2208+
this.errormessage = this.model.payload;
2209+
}
2210+
console.log(this.model.payload);
2211+
}
21962212
}
21972213
if (!this.$scope.$$phase) { this.$scope.$apply(); }
21982214
}

0 commit comments

Comments
 (0)