Skip to content

Commit 87a953a

Browse files
committed
add default token for assign_workflow
1 parent f87304a commit 87a953a

12 files changed

Lines changed: 32 additions & 53 deletions

OpenFlow/src/WebSocketServer.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,13 @@ export class WebSocketServer {
2929
})
3030
public static websocket_queue_count = new client.Gauge({
3131
name: 'openflow_websocket_queue_count',
32-
help: 'Total number of registered queues'
32+
help: 'Total number of registered queues',
33+
labelNames: ["clientid"]
3334
})
3435
public static websocket_queue_message_count = new client.Counter({
3536
name: 'openflow_websocket_queue_message_count',
36-
help: 'Total number of queues messages'
37+
help: 'Total number of queues messages',
38+
labelNames: ["queuename"]
3739
})
3840

3941
static configure(logger: winston.Logger, server: http.Server, register: client.Registry): void {

OpenFlow/src/WebSocketServerClient.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ export class WebSocketServerClient {
158158
this._logger.error("WebSocketclient::closeconsumers " + error);
159159
}
160160
}
161-
WebSocketServer.websocket_queue_count.set(this._queues.length);
161+
WebSocketServer.websocket_queue_count.labels(this.id).set(this._queues.length);
162162
semaphore.up();
163163
// return await this.queuesMutex.dispatch(async () => {
164164
// });
@@ -183,13 +183,14 @@ export class WebSocketServerClient {
183183
}
184184
}
185185
public async CloseConsumer(queuename: string): Promise<void> {
186+
var old = this._queues.length;
186187
for (let i = this._queues.length - 1; i >= 0; i--) {
187188
const q = this._queues[i];
188189
if (q.queue == queuename || q.queuename == queuename) {
189190
try {
190191
await amqpwrapper.Instance().RemoveQueueConsumer(this._queues[i]);
191192
this._queues.splice(i, 1);
192-
WebSocketServer.websocket_queue_count.set(this._queues.length);
193+
WebSocketServer.websocket_queue_count.labels(this.id).set(this._queues.length);
193194
} catch (error) {
194195
this._logger.error("WebSocketclient::CloseConsumer " + error);
195196
}
@@ -236,9 +237,8 @@ export class WebSocketServerClient {
236237
}
237238
});
238239
qname = queue.queue;
239-
WebSocketServer.websocket_queue_count.set(this._queues.length);
240240
this._queues.push(queue);
241-
// console.log('_queues.length: ' + this._queues.length);
241+
WebSocketServer.websocket_queue_count.labels(this.id).set(this._queues.length);
242242
} catch (error) {
243243
this._logger.error("WebSocketclient::CreateConsumer " + error);
244244
}

OpenFlow/src/amqpwrapper.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ export class amqpwrapper {
114114
// }
115115
this.replyqueue = await this.AddQueueConsumer("", null, null, (msg: any, options: QueueMessageOptions, ack: any, done: any) => {
116116
WebSocketServer.websocket_queue_message_count.inc();
117+
WebSocketServer.websocket_queue_message_count.labels(this.replyqueue.queue).inc();
117118
if (!NoderedUtil.IsNullUndefinded(this.activecalls[options.correlationId])) {
118119
this.activecalls[options.correlationId].resolve(msg);
119120
this.activecalls[options.correlationId] = null;
@@ -359,6 +360,7 @@ export class amqpwrapper {
359360
throw new Error("No consumer listening at " + queue);
360361
}
361362
WebSocketServer.websocket_queue_message_count.inc();
363+
WebSocketServer.websocket_queue_message_count.labels(queue).inc();
362364
} else {
363365
this.channel.publish(exchange, "", Buffer.from(data), options);
364366
}
@@ -390,6 +392,7 @@ export class amqpwrapper {
390392
throw new Error("No consumer listening at " + queue);
391393
}
392394
WebSocketServer.websocket_queue_message_count.inc();
395+
WebSocketServer.websocket_queue_message_count.labels(queue).inc();
393396
} else {
394397
this.channel.publish(exchange, "", Buffer.from(data), options);
395398
}

OpenFlowNodeRED/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "openflow-nodered",
3-
"version": "1.1.78",
3+
"version": "1.1.81",
44
"description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)",
55
"main": "index.js",
66
"scripts": {

OpenFlowNodeRED/src/nodered/nodes/api_nodes.ts

Lines changed: 2 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ export class api_get_jwt {
6464
if (!NoderedUtil.IsNullEmpty(username) && !NoderedUtil.IsNullEmpty(password)) {
6565
q.username = username; q.password = password;
6666
} else {
67-
if (Config.jwt !== "") {
68-
q.jwt = Config.jwt;
67+
if (!NoderedUtil.IsNullUndefinded(WebSocketClient.instance) && !NoderedUtil.IsNullEmpty(WebSocketClient.instance.jwt)) {
68+
q.jwt = WebSocketClient.instance.jwt;
6969
} else if (Crypt.encryption_key() !== "") {
7070
const user = new TokenUser();
7171
if (NoderedUtil.IsNullEmpty(Config.nodered_sa)) {
@@ -133,9 +133,6 @@ export class api_get {
133133
if (!NoderedUtil.IsNullUndefinded(msg.orderby)) { this.config.orderby = msg.orderby; }
134134
if (!NoderedUtil.IsNullEmpty(msg.top)) { this.config.top = parseInt(msg.top); }
135135
if (!NoderedUtil.IsNullEmpty(msg.skip)) { this.config.skip = parseInt(msg.skip); }
136-
// if (NoderedUtil.IsNullEmpty(msg.jwt) && !NoderedUtil.IsNullEmpty(Config.jwt)) {
137-
// msg.jwt = Config.jwt;
138-
// }
139136

140137
if (NoderedUtil.IsNullEmpty(this.config.top)) { this.config.top = 500; }
141138
if (NoderedUtil.IsNullEmpty(this.config.skip)) { this.config.skip = 0; }
@@ -376,17 +373,12 @@ export class api_update {
376373
async oninput(msg: any) {
377374
try {
378375
this.node.status({});
379-
// if (NoderedUtil.IsNullEmpty(msg.jwt)) { return NoderedUtil.HandleError(this, "Missing jwt token"); }
380-
381376
if (!NoderedUtil.IsNullEmpty(msg.entitytype)) { this.config.entitytype = msg.entitytype; }
382377
if (!NoderedUtil.IsNullEmpty(msg.collection)) { this.config.collection = msg.collection; }
383378
if (!NoderedUtil.IsNullEmpty(msg.inputfield)) { this.config.inputfield = msg.inputfield; }
384379
if (!NoderedUtil.IsNullEmpty(msg.resultfield)) { this.config.resultfield = msg.resultfield; }
385380
if (!NoderedUtil.IsNullEmpty(msg.writeconcern)) { this.config.writeconcern = msg.writeconcern; }
386381
if (!NoderedUtil.IsNullEmpty(msg.journal)) { this.config.journal = msg.journal; }
387-
// if (NoderedUtil.IsNullEmpty(msg.jwt) && !NoderedUtil.IsNullEmpty(Config.jwt)) {
388-
// msg.jwt = Config.jwt;
389-
// }
390382

391383
if ((this.config.writeconcern as any) === undefined || (this.config.writeconcern as any) === null) this.config.writeconcern = 0;
392384
if ((this.config.journal as any) === undefined || (this.config.journal as any) === null) this.config.journal = false;
@@ -398,16 +390,6 @@ export class api_update {
398390
if (data.length === 0) { this.node.warn("input array is empty"); }
399391
} else { this.node.warn("Input data is null"); }
400392

401-
// this.node.status({ fill: "blue", shape: "dot", text: "Inserting items" });
402-
// const Promises: Promise<any>[] = [];
403-
// for (let i: number = 0; i < data.length; i++) {
404-
// const element: any = data[i];
405-
// if (!NoderedUtil.IsNullEmpty(this.config.entitytype)) {
406-
// element._type = this.config.entitytype;
407-
// }
408-
// Promises.push(NoderedUtil.UpdateOne(this.config.collection, null, element, this.config.writeconcern, this.config.journal, msg.jwt));
409-
// }
410-
// data = await Promise.all(Promises.map(p => p.catch(e => e)));
411393
this.node.status({ fill: "blue", shape: "dot", text: "processing ..." });
412394
let Promises: Promise<any>[] = [];
413395
let results: any[] = [];
@@ -678,17 +660,12 @@ export class api_map_reduce {
678660
async oninput(msg: any) {
679661
try {
680662
this.node.status({});
681-
// if (NoderedUtil.IsNullEmpty(msg.jwt)) { return NoderedUtil.HandleError(this, "Missing jwt token"); }
682-
683663
if (!NoderedUtil.IsNullEmpty(msg.collection)) { this.config.collection = msg.collection; }
684664
if (!NoderedUtil.IsNullUndefinded(msg.map)) { this.config.map = msg.map; }
685665
if (!NoderedUtil.IsNullUndefinded(msg.reduce)) { this.config.reduce = msg.reduce; }
686666
if (!NoderedUtil.IsNullUndefinded(msg.finalize)) { this.config.finalize = msg.finalize; }
687667
if (!NoderedUtil.IsNullUndefinded(msg.scope)) { this.config.finalize = msg.scope; }
688668
if (!NoderedUtil.IsNullUndefinded(msg.query)) { this.config.query = msg.query; }
689-
// if (NoderedUtil.IsNullEmpty(msg.jwt) && !NoderedUtil.IsNullEmpty(Config.jwt)) {
690-
// msg.jwt = Config.jwt;
691-
// }
692669

693670
const scope = NoderedUtil.FetchFromObject(msg, this.config.scope);
694671
const _output: any = {};
@@ -846,9 +823,6 @@ export class api_updatedocument {
846823
if (!NoderedUtil.IsNullEmpty(msg.collection)) { collection = msg.collection; }
847824
if (!NoderedUtil.IsNullEmpty(msg.writeconcern)) { writeconcern = msg.writeconcern; }
848825
if (!NoderedUtil.IsNullEmpty(msg.journal)) { journal = msg.journal; }
849-
// if (NoderedUtil.IsNullEmpty(jwt) && !NoderedUtil.IsNullEmpty(Config.jwt)) {
850-
// jwt = Config.jwt;
851-
// }
852826

853827
if ((writeconcern as any) === undefined || (writeconcern as any) === null) writeconcern = 0;
854828
if ((journal as any) === undefined || (journal as any) === null) journal = false;
@@ -1061,9 +1035,6 @@ export class download_file {
10611035

10621036
if (!NoderedUtil.IsNullEmpty(msg.fileid)) { this.config.fileid = msg.fileid; }
10631037
if (!NoderedUtil.IsNullEmpty(msg.filename)) { this.config.filename = msg.filename; }
1064-
// if (NoderedUtil.IsNullEmpty(jwt) && !NoderedUtil.IsNullEmpty(Config.jwt)) {
1065-
// jwt = Config.jwt;
1066-
// }
10671038

10681039
this.node.status({ fill: "blue", shape: "dot", text: "Getting file" });
10691040
const file = await NoderedUtil.GetFile(filename, fileid, jwt);
@@ -1108,10 +1079,6 @@ export class upload_file {
11081079
let mimeType = this.config.mimeType;
11091080
if (!NoderedUtil.IsNullEmpty(msg.filename)) { filename = msg.filename; }
11101081
if (!NoderedUtil.IsNullEmpty(msg.mimeType)) { mimeType = msg.mimeType; }
1111-
// if (NoderedUtil.IsNullEmpty(jwt) && !NoderedUtil.IsNullEmpty(Config.jwt)) {
1112-
// jwt = Config.jwt;
1113-
// }
1114-
11151082

11161083
this.node.status({ fill: "blue", shape: "dot", text: "Saving file" });
11171084
const file = await NoderedUtil.SaveFile(filename, mimeType, msg.metadata, msg.payload, jwt);

OpenFlowNodeRED/src/nodered/nodes/workflow_nodes.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,7 @@ export class assign_workflow_node {
555555

556556
res2[0].state = state;
557557
result = res[0].msg;
558+
if (NoderedUtil.IsNullUndefinded(result)) result = {};
558559
result.payload = data.payload;
559560
result.jwt = data.jwt;
560561
result.user = data.user;
@@ -577,7 +578,6 @@ export class assign_workflow_node {
577578
try {
578579
this.node.status({ fill: "blue", shape: "dot", text: "Processing" });
579580
// if (this.localqueue !== null && this.localqueue !== undefined && this.localqueue !== "") { this.localqueue = Config.queue_prefix + this.localqueue; }
580-
const jwt = msg.jwt;
581581
const workflowid = (!NoderedUtil.IsNullEmpty(this.config.workflowid) ? this.config.workflowid : msg.workflowid);
582582
let name = this.config.name;
583583
if (NoderedUtil.IsNullEmpty(name)) name = msg.name;
@@ -592,7 +592,14 @@ export class assign_workflow_node {
592592
this.node.status({ fill: "red", shape: "dot", text: "workflowid is mandatory" });
593593
return;
594594
}
595+
let jwt = msg.jwt;
595596
const initialrun = (!NoderedUtil.IsNullEmpty(msg.initialrun) ? msg.initialrun : this.config.initialrun);
597+
if (NoderedUtil.IsNullEmpty(jwt) && !NoderedUtil.IsNullUndefinded(WebSocketClient.instance)
598+
&& !NoderedUtil.IsNullEmpty(WebSocketClient.instance.jwt)) {
599+
jwt = WebSocketClient.instance.jwt;
600+
}
601+
602+
596603
msg.jwt = (await NoderedUtil.RenewToken(jwt, true)).jwt;
597604
// Logger.instanse.info("run workflow called with id " + msg._id + " (" + msg.name + ")");
598605
const runnerinstance = new Base();

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.1.78
1+
1.1.81

docker-compose-toolbox.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ services:
4545
- "traefik.http.routers.web.rule=Host(`toolbox.openrpa.dk`)"
4646
- "traefik.http.routers.web.entrypoints=web"
4747
- "traefik.frontend.passHostHeader=true"
48-
image: "cloudhack/openflow:1.1.78"
48+
image: "cloudhack/openflow:1.1.81"
4949
container_name: "web"
5050
environment:
5151
- update_acl_based_on_groups=true
@@ -82,7 +82,7 @@ services:
8282
- "traefik.http.routers.nodered.rule=Host(`nodered1.toolbox.openrpa.dk`)"
8383
- "traefik.http.routers.nodered.entrypoints=web"
8484
- "traefik.http.services.nodered.loadbalancer.server.port=1880"
85-
image: "cloudhack/openflownodered:1.1.78"
85+
image: "cloudhack/openflownodered:1.1.81"
8686
container_name: "nodered"
8787
environment:
8888
# - nodered_id=1

docker-compose-traefik-letsencrypt.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ services:
6969
- "traefik.http.routers.web.entrypoints=web,websecure"
7070
- "traefik.frontend.passHostHeader=true"
7171
- "traefik.http.routers.web.tls.certresolver=myresolver"
72-
image: "cloudhack/openflow:1.1.78"
72+
image: "cloudhack/openflow:1.1.81"
7373
container_name: "web"
7474
environment:
7575
- update_acl_based_on_groups=true
@@ -107,7 +107,7 @@ services:
107107
- "traefik.http.routers.nodered.entrypoints=web,websecure"
108108
- "traefik.http.services.nodered.loadbalancer.server.port=1880"
109109
- "traefik.http.routers.nodered.tls.certresolver=myresolver"
110-
image: "cloudhack/openflownodered:1.1.78"
110+
image: "cloudhack/openflownodered:1.1.81"
111111
container_name: "nodered"
112112
environment:
113113
# - nodered_id=1

docker-compose-traefik.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ services:
4545
- "traefik.http.routers.web.rule=Host(`localhost.openrpa.dk`)"
4646
- "traefik.http.routers.web.entrypoints=web"
4747
- "traefik.frontend.passHostHeader=true"
48-
image: "cloudhack/openflow:1.1.78"
48+
image: "cloudhack/openflow:1.1.81"
4949
container_name: "web"
5050
environment:
5151
- update_acl_based_on_groups=true
@@ -82,7 +82,7 @@ services:
8282
- "traefik.http.routers.nodered.rule=Host(`nodered1.localhost.openrpa.dk`)"
8383
- "traefik.http.routers.nodered.entrypoints=web"
8484
- "traefik.http.services.nodered.loadbalancer.server.port=1880"
85-
image: "cloudhack/openflownodered:1.1.78"
85+
image: "cloudhack/openflownodered:1.1.81"
8686
container_name: "nodered"
8787
environment:
8888
# - nodered_id=1

0 commit comments

Comments
 (0)