Skip to content

Commit 3893571

Browse files
committed
queue improvements, ui improvements
1 parent f934a36 commit 3893571

15 files changed

Lines changed: 107 additions & 38 deletions

OpenFlow/src/Config.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,12 +108,12 @@ export class Config {
108108
public static domain: string = Config.getEnv("domain", "localhost"); // sent to website and used in baseurl()
109109

110110

111-
public static amqp_reply_expiration: number = parseInt(Config.getEnv("amqp_reply_expiration", "10000")); // 10 seconds
111+
public static amqp_reply_expiration: number = parseInt(Config.getEnv("amqp_reply_expiration", (60 * 1000).toString())); // 1 min
112112
public static amqp_force_queue_prefix: boolean = Config.parseBoolean(Config.getEnv("amqp_force_queue_prefix", "true"));
113113
public static amqp_force_exchange_prefix: boolean = Config.parseBoolean(Config.getEnv("amqp_force_exchange_prefix", "true"));
114114
public static amqp_url: string = Config.getEnv("amqp_url", "amqp://localhost"); // used to register queues and by personal nodered
115115
public static amqp_check_for_consumer: boolean = Config.parseBoolean(Config.getEnv("amqp_check_for_consumer", "true"));
116-
public static amqp_default_expiration: number = parseInt(Config.getEnv("amqp_default_expiration", "10000")); // 10 seconds
116+
public static amqp_default_expiration: number = parseInt(Config.getEnv("amqp_default_expiration", (60 * 1000).toString())); // 1 min
117117
public static amqp_requeue_time: number = parseInt(Config.getEnv("amqp_requeue_time", "1000")); // 1 seconds
118118
public static amqp_dlx: string = Config.getEnv("amqp_dlx", "openflow-dlx"); // Dead letter exchange, used to pickup dead or timeout messages
119119

OpenFlow/src/Messages/Message.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,27 @@ export class Message {
252252
}
253253
var expiration: number = Config.amqp_default_expiration;
254254
if (typeof msg.expiration == 'number') expiration = msg.expiration;
255+
if (typeof msg.data === 'string' || msg.data instanceof String) {
256+
try {
257+
msg.data = JSON.parse((msg.data as any));
258+
} catch (error) {
259+
}
260+
}
261+
try {
262+
if (!Util.IsNullEmpty(msg.data) && Util.IsNullEmpty(msg.data.jwt)) {
263+
if (!Util.IsNullEmpty(msg.jwt)) {
264+
msg.data.jwt = msg.jwt;
265+
} else {
266+
msg.data.jwt = cli.jwt;
267+
}
268+
}
269+
} catch (error) {
270+
cli._logger.error(error);
271+
}
272+
if (!Util.IsNullEmpty(msg.data) && !Util.IsNullEmpty(msg.data.jwt)) {
273+
var tuser = Crypt.verityToken(msg.data.jwt);
274+
msg.data.user = tuser;
275+
}
255276
if (Util.IsNullEmpty(msg.replyto)) {
256277
// var sendthis = { data: msg.data, jwt: cli.jwt, user: cli.user };
257278
var sendthis = msg.data;

OpenFlow/src/amqpwrapper.ts

Lines changed: 53 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -201,12 +201,56 @@ export class amqpwrapper {
201201
if (this.channel == null || this.conn == null) throw new Error("Cannot Add new Queue Consumer, not connected to rabbitmq");
202202
if (queue == null) queue = "";
203203
var q: amqpqueue = null;
204-
if (Config.amqp_force_queue_prefix && !Util.IsNullEmpty(jwt)) {
205-
var tuser = Crypt.verityToken(jwt);
206-
var name = tuser.username.split("@").join("").split(".").join("");
207-
name = name.toLowerCase();
204+
if (Config.amqp_force_queue_prefix && !Util.IsNullEmpty(jwt) && !Util.IsNullEmpty(queue)) {
205+
if (queue.length == 24) {
206+
var tuser = Crypt.verityToken(jwt);
207+
var name = tuser.username.split("@").join("").split(".").join("");
208+
name = name.toLowerCase();
209+
var isrole = tuser.roles.filter(x => x._id == queue);
210+
if (isrole.length == 0 && tuser._id != queue) {
211+
var skip: boolean = false;
212+
var arr = await Config.db.query({ _id: queue }, { name: 1 }, 1, 0, null, "users", jwt);
213+
if (arr.length == 0) skip = true;
214+
if (!skip) {
215+
var arr = await Config.db.query({ _id: queue }, { name: 1 }, 1, 0, null, "openrpa", jwt);
216+
if (arr.length == 0) skip = true;
217+
}
218+
if (!skip) {
219+
var arr = await Config.db.query({ _id: queue }, { name: 1 }, 1, 0, null, "workflow", jwt);
220+
if (arr.length == 0) skip = true;
221+
}
222+
if (!skip) {
223+
queue = name + queue;
224+
} else {
225+
this._logger.info("[SKIP] skipped force prefix for " + queue);
226+
}
227+
} else {
228+
this._logger.info("[SKIP] skipped force prefix for " + queue);
229+
}
230+
} else {
231+
var tuser = Crypt.verityToken(jwt);
232+
var name = tuser.username.split("@").join("").split(".").join("");
233+
name = name.toLowerCase();
234+
queue = name + queue;
235+
}
236+
} else if (queue.length == 24) {
208237
var isrole = tuser.roles.filter(x => x._id == queue);
209-
if (isrole.length == 0 && tuser._id != queue) queue = name + queue;
238+
if (isrole.length == 0 && tuser._id != queue) {
239+
var skip: boolean = false;
240+
var arr = await Config.db.query({ _id: queue }, { name: 1 }, 1, 0, null, "users", jwt);
241+
if (arr.length == 0) skip = true;
242+
if (!skip) {
243+
var arr = await Config.db.query({ _id: queue }, { name: 1 }, 1, 0, null, "openrpa", jwt);
244+
if (arr.length == 0) skip = true;
245+
}
246+
if (!skip) {
247+
var arr = await Config.db.query({ _id: queue }, { name: 1 }, 1, 0, null, "workflow", jwt);
248+
if (arr.length == 0) skip = true;
249+
}
250+
if (!skip) {
251+
throw new Error("Access denied creating consumer for " + queue);
252+
}
253+
}
210254
}
211255
// if (!await amqpwrapper.TestInstance().checkQueue(queue)) {
212256
// if (amqpwrapper.TestInstance().conn == null || amqpwrapper.TestInstance().channel == null) {
@@ -340,9 +384,8 @@ export class amqpwrapper {
340384
options.replyTo = replyTo;
341385
if (Util.IsNullEmpty(correlationId)) correlationId = this.generateUuid();
342386
if (!Util.IsNullEmpty(correlationId)) options.correlationId = correlationId;
343-
if (!Util.IsNullEmpty(expiration)) {
344-
if (expiration > 0) options.expiration = expiration.toString();
345-
}
387+
if (expiration < 1) expiration = Config.amqp_default_expiration;
388+
options.expiration = expiration.toString();
346389
if (Util.IsNullEmpty(exchange)) {
347390
if (!await this.checkQueue(queue)) {
348391
throw new Error("No consumer listening at " + queue);
@@ -364,9 +407,8 @@ export class amqpwrapper {
364407
this._logger.info("send to queue: " + queue + " exchange: " + exchange);
365408
var options: any = { mandatory: true };
366409
if (!Util.IsNullEmpty(correlationId)) options.correlationId = correlationId;
367-
if (!Util.IsNullEmpty(expiration)) {
368-
if (expiration > 0) options.expiration = expiration.toString();
369-
}
410+
if (expiration < 1) expiration = Config.amqp_default_expiration;
411+
options.expiration = expiration.toString();
370412
if (Util.IsNullEmpty(exchange)) {
371413
if (!await this.checkQueue(queue)) {
372414
throw new Error("No consumer listening at " + queue);

OpenFlow/src/public/Controllers.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1966,7 +1966,13 @@ module openflow {
19661966
}
19671967
// console.debug("SendOne: " + this.workflow._id + " / " + this.workflow.queue);
19681968
this.model.payload._id = this.instanceid;
1969-
await this.SendOne(this.workflow.queue, this.model.payload);
1969+
try {
1970+
await this.SendOne(this.workflow.queue, this.model.payload);
1971+
} catch (error) {
1972+
this.errormessage = error;
1973+
if (!this.$scope.$$phase) { this.$scope.$apply(); }
1974+
console.error(this.errormessage);
1975+
}
19701976
this.loadData();
19711977
}
19721978
traversecomponentsPostProcess(components: any[], data: any) {

OpenFlowNodeRED/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "openflow-nodered",
3-
"version": "1.0.40",
3+
"version": "1.0.41",
44
"description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)",
55
"main": "index.js",
66
"scripts": {
@@ -46,4 +46,4 @@
4646
"unhandled-rejection": "^1.0.0",
4747
"winston": "^3.3.3"
4848
}
49-
}
49+
}

OpenFlowNodeRED/src/nodeclient/Message.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,11 @@ export class Message {
536536
handled = true;
537537
}
538538
}
539+
if (!handled) {
540+
this.Reply("error");
541+
this.data = "nack message";
542+
await this.Send(cli);
543+
}
539544
// ROLLBACK
540545
// if (!handled) {
541546
// if (!NoderedUtil.IsNullEmpty(msg.correlationId)) {

OpenFlowNodeRED/src/nodered/nodes/amqp_nodes.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ export class amqp_consumer_node {
108108
this.OnMessage(msg, ack);
109109
});
110110
this.websocket._logger.info("registed amqp consumer as " + this.localqueue);
111-
this.node.status({ fill: "green", shape: "dot", text: "Connected" });
111+
this.node.status({ fill: "green", shape: "dot", text: "Connected " + this.localqueue });
112112
} catch (error) {
113113
NoderedUtil.HandleError(this, error);
114114
}
@@ -198,7 +198,7 @@ export class amqp_publisher_node {
198198
});
199199
console.log(this.localqueue);
200200
this.websocket._logger.info("registed amqp published return queue as " + this.localqueue);
201-
this.node.status({ fill: "green", shape: "dot", text: "Connected" });
201+
this.node.status({ fill: "green", shape: "dot", text: "Connected " + this.localqueue });
202202

203203
} catch (error) {
204204
NoderedUtil.HandleError(this, error);
@@ -238,7 +238,7 @@ export class amqp_publisher_node {
238238
this.node.status({ fill: "blue", shape: "dot", text: "Sending message ..." });
239239
await NoderedUtil.QueueMessage(this.websocket, queue, this.localqueue, data, null, expiration);
240240
// this.con.SendMessage(JSON.stringify(data), this.config.queue, null, true);
241-
this.node.status({ fill: "green", shape: "dot", text: "Connected" });
241+
this.node.status({ fill: "green", shape: "dot", text: "Connected " + this.localqueue });
242242
} catch (error) {
243243
NoderedUtil.HandleError(this, error);
244244
}

OpenFlowNodeRED/src/nodered/nodes/rpa.html

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,6 @@
66
</select>
77
<input id="node-input-queue" type="hidden">
88
</div>
9-
<div class="form-row">
10-
<label><i class="fa fa-tag"></i> Auto acknowledgment</label>
11-
<input type="checkbox" id="node-input-noack" style="width: auto;">
12-
</div>
139
<div class="form-row">
1410
<label ><i class="fa fa-tag"></i> Name</label>
1511
<input type="text" id="node-input-name" placeholder="Node name">
@@ -26,7 +22,6 @@
2622
icon: "font-awesome/fa-signal",
2723
defaults: {
2824
queue: { value: "", required: true },
29-
noack: { value: true, required: true },
3025
name: { value: "" }
3126
},
3227
inputs: 0,

OpenFlowNodeRED/src/nodered/nodes/rpa_nodes.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import { QueueMessage } from "../../nodeclient/Message";
88

99
export interface Irpa_detector_node {
1010
queue: string;
11-
noack: boolean;
1211
}
1312
export class rpa_detector_node {
1413
public node: Red = null;
@@ -28,7 +27,7 @@ export class rpa_detector_node {
2827
WebSocketClient.instance.events.on("onclose", (message) => {
2928
if (message == null) message = "";
3029
this.node.status({ fill: "red", shape: "dot", text: "Disconnected " + message });
31-
this.onclose();
30+
this.onclose(null);
3231
});
3332
this.connect();
3433
} catch (error) {
@@ -70,11 +69,12 @@ export class rpa_detector_node {
7069
NoderedUtil.HandleError(this, error);
7170
}
7271
}
73-
onclose() {
72+
async onclose(done: any) {
7473
if (!NoderedUtil.IsNullEmpty(this.localqueue)) {
75-
NoderedUtil.CloseQueue(WebSocketClient.instance, this.localqueue);
74+
await NoderedUtil.CloseQueue(WebSocketClient.instance, this.localqueue);
7675
this.localqueue = "";
7776
}
77+
if (done != null) done();
7878
}
7979
}
8080

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.0.40
1+
1.0.41

0 commit comments

Comments
 (0)