Skip to content

Commit 528f4e0

Browse files
committed
Add cache experiment
1 parent 7486f3c commit 528f4e0

9 files changed

Lines changed: 198 additions & 145 deletions

File tree

OpenFlow.code-workspace

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
{
44
"name": "OpenFlow",
55
"path": "."
6+
},
7+
{
8+
"path": "..\\openflow-api"
69
}
710
]
811
}

OpenFlowNodeRED/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "openflow-nodered",
3-
"version": "1.1.10",
3+
"version": "1.1.11",
44
"description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)",
55
"main": "index.js",
66
"scripts": {
@@ -39,7 +39,7 @@
3939
"morgan": "^1.10.0",
4040
"node-red": "^1.1.3",
4141
"node-red-node-email": "^1.7.8",
42-
"openflow-api": "^1.0.24",
42+
"openflow-api": "^1.0.25",
4343
"os-service": "^2.2.0",
4444
"passport-saml": "^1.3.4",
4545
"passport-saml-metadata": "^2.3.0",

OpenFlowNodeRED/src/node-red-contrib-openflow-storage.ts

Lines changed: 67 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,9 @@ export class noderedcontribopenflowstorage {
387387
const filename: string = Config.nodered_id + "_flows.json";
388388
await backupStore.set(filename, JSON.stringify(flows));
389389
if (WebSocketClient.instance.isConnected()) {
390+
this.last_reload = new Date();
390391
var result = await NoderedUtil.Query("nodered", { _type: "flow", nodered_id: Config.nodered_id }, null, null, 1, 0, null);
392+
this.last_reload = new Date();
391393
if (result.length === 0) {
392394
var item: any = {
393395
name: "flows for " + Config.nodered_id,
@@ -448,7 +450,9 @@ export class noderedcontribopenflowstorage {
448450
const filename: string = Config.nodered_id + "_credentials";
449451
await backupStore.set(filename, noderedcontribopenflowstorage.encrypt(JSON.stringify(credentials)));
450452
if (WebSocketClient.instance.isConnected()) {
453+
this.last_reload = new Date();
451454
var result = await NoderedUtil.Query("nodered", { _type: "credential", nodered_id: Config.nodered_id }, null, null, 1, 0, null);
455+
this.last_reload = new Date();
452456
var credentialsarray = [];
453457
var orgkeys = Object.keys(credentials);
454458
for (var i = 0; i < orgkeys.length; i++) {
@@ -556,6 +560,7 @@ export class noderedcontribopenflowstorage {
556560
if (error.stdout) this._logger.error("npm install stdout: " + error.stdout);
557561
}
558562
}
563+
this.last_reload = new Date();
559564
this._logger.silly("noderedcontribopenflowstorage::_getSettings: return result");
560565
} catch (error) {
561566
if (error.message) { this._logger.error(error.message); }
@@ -583,35 +588,36 @@ export class noderedcontribopenflowstorage {
583588
try {
584589
await this.CheckUpdates();
585590
} catch (error) {
586-
console.error(error);
591+
this._logger.error(error);
587592
}
588593
await NoderedUtil.Watch("nodered", [{ "$match": { "fullDocument.nodered_id": Config.nodered_id } }], WebSocketClient.instance.jwt, this.onupdate.bind(this));
589594
} else {
590595
setTimeout(this.CheckUpdates.bind(this), Config.flow_refresh_initial_interval);
591596
}
592597
} catch (error) {
593-
console.error(error);
598+
this._logger.error(error);
594599
}
595600
});
596601
}
597602
} catch (error) {
598-
console.error(error);
603+
this._logger.error(error);
599604
}
600605
return settings;
601606
}
602607
public last_reload: Date = new Date();
608+
public bussy: boolean = false;
603609
public async onupdate(msg: any) {
604610
let update: boolean = false;
605611
let entity: Base = msg.fullDocument;
606612

607613
var begin: number = this.last_reload.getTime();
608614
var end: number = new Date().getTime();
609615
var seconds = Math.round((end - begin) / 1000);
610-
if (seconds < 2) {
611-
console.log("**************************************************");
612-
console.log("* " + entity._type);
613-
console.log("* Skip, less than 2 seconds since last update " + seconds);
614-
console.log("**************************************************");
616+
if (seconds < 2 || this.bussy) {
617+
this._logger.info("**************************************************");
618+
this._logger.info("* " + entity._type);
619+
this._logger.info("* Skip, less than 2 seconds since last update " + seconds + " or is bussy");
620+
this._logger.info("**************************************************");
615621
return;
616622
}
617623
if (entity._type == "flow") {
@@ -643,97 +649,72 @@ export class noderedcontribopenflowstorage {
643649
}
644650
let oldsettings: any = null;
645651
if (this._settings != null) {
646-
oldsettings = JSON.parse(JSON.stringify(this._settings));
647-
let newsettings = (entity as any).settings;
648-
newsettings = JSON.parse(newsettings);
652+
this.bussy = true;
653+
try {
654+
oldsettings = JSON.parse(JSON.stringify(this._settings));
655+
let newsettings = (entity as any).settings;
656+
newsettings = JSON.parse(newsettings);
649657

650-
var keys = Object.keys(oldsettings.nodes);
651-
var modules = {};
652-
for (var i = 0; i < keys.length; i++) {
653-
var key = keys[i];
654-
if (key != "node-red") {
655-
var val = oldsettings.nodes[key];
656-
try {
657-
if (newsettings.nodes[key] == null) {
658-
console.log("Remove module " + key + "@" + val.version);
659-
await this.RED.runtime.nodes.removeModule({ user: "admin", module: key, version: val.version });
660-
} else if (newsettings.nodes[key].version != oldsettings.nodes[key].version) {
661-
console.log("Install module " + key + "@" + newsettings.nodes[key].version + " up from " + oldsettings.nodes[key].version);
662-
await this.RED.runtime.nodes.addModule({ user: "admin", module: key, version: newsettings.nodes[key].version });
658+
var keys = Object.keys(oldsettings.nodes);
659+
var modules = {};
660+
for (var i = 0; i < keys.length; i++) {
661+
var key = keys[i];
662+
if (key != "node-red") {
663+
var val = oldsettings.nodes[key];
664+
try {
665+
if (newsettings.nodes[key] == null) {
666+
this._logger.info("Remove module " + key + "@" + val.version);
667+
await this.RED.runtime.nodes.removeModule({ user: "admin", module: key, version: val.version });
668+
} else if (newsettings.nodes[key].version != oldsettings.nodes[key].version) {
669+
this._logger.info("Install module " + key + "@" + newsettings.nodes[key].version + " up from " + oldsettings.nodes[key].version);
670+
await this.RED.runtime.nodes.addModule({ user: "admin", module: key, version: newsettings.nodes[key].version });
671+
}
672+
} catch (error) {
673+
this._logger.error((error.message ? error.message : error));
663674
}
664-
} catch (error) {
665-
console.error((error.message ? error.message : error));
666675
}
667676
}
668-
}
669-
var keys = Object.keys(newsettings.nodes);
670-
for (var i = 0; i < keys.length; i++) {
671-
var key = keys[i];
672-
if (key != "node-red") {
673-
var val = newsettings.nodes[key];
674-
try {
675-
if (oldsettings.nodes[key] == null) {
676-
console.log("Install new module " + key + "@" + val.version);
677-
await this.RED.runtime.nodes.addModule({ user: "admin", module: key, version: val.version });
678-
} else if (newsettings.nodes[key].version != oldsettings.nodes[key].version) {
679-
console.log("Install module " + key + "@" + newsettings.nodes[key].version + " up from " + oldsettings.nodes[key].version);
680-
await this.RED.runtime.nodes.addModule({ user: "admin", module: key, version: val.version });
677+
var keys = Object.keys(newsettings.nodes);
678+
for (var i = 0; i < keys.length; i++) {
679+
var key = keys[i];
680+
if (key != "node-red") {
681+
var val = newsettings.nodes[key];
682+
try {
683+
if (oldsettings.nodes[key] == null) {
684+
this._logger.info("Install new module " + key + "@" + val.version);
685+
await this.RED.runtime.nodes.addModule({ user: "admin", module: key, version: val.version });
686+
} else if (newsettings.nodes[key].version != oldsettings.nodes[key].version) {
687+
this._logger.info("Install module " + key + "@" + newsettings.nodes[key].version + " up from " + oldsettings.nodes[key].version);
688+
await this.RED.runtime.nodes.addModule({ user: "admin", module: key, version: val.version });
689+
}
690+
} catch (error) {
691+
this._logger.error((error.message ? error.message : error));
681692
}
682-
} catch (error) {
683-
console.error((error.message ? error.message : error));
684693
}
685694
}
686-
}
687695

688-
if (this.DiffObjects(newsettings, oldsettings)) {
696+
if (this.DiffObjects(newsettings, oldsettings)) {
697+
update = true;
698+
}
699+
} catch (error) {
700+
this._logger.error(error);
689701
update = true;
690702
}
703+
this.bussy = false;
691704
} else {
692705
update = true;
693706
}
694707
} else {
695-
console.log("**************************************************");
696-
console.log("* Unknown type " + entity._type + " last updated " + seconds + " seconds ago");
697-
console.log("**************************************************");
698-
708+
this._logger.info("**************************************************");
709+
this._logger.info("* Unknown type " + entity._type + " last updated " + seconds + " seconds ago");
710+
this._logger.info("**************************************************");
699711
}
700-
// if (donpm) {
701-
// this._settings = null;
702-
// this._settings = await this.getSettings();
703-
// }
704712
if (update) {
705-
// this.RED.nodes.startFlows();
706-
// this.RED.nodes.stopFlows();
707-
// this.RED.runtime.addModule("")
708-
709-
// try {
710-
// await this.RED.nodes.addModule("test-module");
711-
// } catch (error) {
712-
// console.error(error);
713-
// }
714-
// try {
715-
// var opts = {
716-
// user: "admin",
717-
// module: "node-red-contrib-rate",
718-
// version: "1.4.0"
719-
// }
720-
// await this.RED.runtime.nodes.addModule(opts);
721-
// } catch (error) {
722-
// console.error(error);
723-
// }
724-
// try {
725-
// await this.RED.nodes.load();
726-
// } catch (error) {
727-
// console.error(error);
728-
// }
729-
730-
731-
732713
this.last_reload = new Date();
733-
console.log("**************************************************");
734-
console.log("* " + entity._type);
735-
console.log("* loadFlows last updated " + seconds + " seconds ago");
736-
console.log("**************************************************");
714+
this._logger.info("**************************************************");
715+
this._logger.info("* " + entity._type);
716+
this._logger.info("* loadFlows last updated " + seconds + " seconds ago");
717+
this._logger.info("**************************************************");
737718
await this.RED.nodes.loadFlows(true);
738719
}
739720
}
@@ -743,7 +724,9 @@ export class noderedcontribopenflowstorage {
743724
const filename: string = Config.nodered_id + "_settings";
744725
await backupStore.set(filename, JSON.stringify(settings));
745726
if (WebSocketClient.instance.isConnected()) {
727+
this.last_reload = new Date();
746728
var result = await NoderedUtil.Query("nodered", { _type: "setting", nodered_id: Config.nodered_id }, null, null, 1, 0, null);
729+
this.last_reload = new Date();
747730
if (result.length === 0) {
748731
var item: any = {
749732
name: "settings for " + Config.nodered_id,
@@ -792,7 +775,9 @@ export class noderedcontribopenflowstorage {
792775
const filename: string = Config.nodered_id + "_sessions";
793776
await backupStore.set(filename, JSON.stringify(sessions));
794777
if (WebSocketClient.instance.isConnected()) {
778+
this.last_reload = new Date();
795779
var result = await NoderedUtil.Query("nodered", { _type: "session", nodered_id: Config.nodered_id }, null, null, 1, 0, null);
780+
this.last_reload = new Date();
796781
if (result.length === 0) {
797782
var item: any = {
798783
name: "sessions for " + Config.nodered_id,

OpenFlowNodeRED/src/nodered/nodes/amqp_nodes.ts

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -76,24 +76,30 @@ export class amqp_consumer_node {
7676
public host: string = null;
7777
public localqueue: string = "";
7878
private connection: amqp_connection;
79+
private _onsignedin: any = null;
80+
private _onsocketclose: any = null;
7981
constructor(public config: Iamqp_consumer_node) {
8082
RED.nodes.createNode(this, config);
8183
try {
8284
this.node = this;
8385
this.node.status({});
8486
this.node.on("close", this.onclose);
8587
this.connection = RED.nodes.getNode(this.config.config);
86-
this.websocket().events.on("onsignedin", this.onsignedin.bind(this));
87-
this.websocket().events.on("onclose", this.onsocketclose.bind(this));
88-
this.connect();
88+
this._onsignedin = this.onsignedin.bind(this);
89+
this._onsocketclose = this.onsocketclose.bind(this);
90+
this.websocket().events.on("onsignedin", this._onsignedin);
91+
this.websocket().events.on("onclose", this._onsocketclose);
92+
if (this.websocket().isConnected && this.websocket().user != null) {
93+
this.connect();
94+
}
8995
} catch (error) {
9096
NoderedUtil.HandleError(this, error);
9197
}
9298
}
9399
onsocketclose(message) {
94100
if (message == null) message = "";
95101
this.node.status({ fill: "red", shape: "dot", text: "Disconnected " + message });
96-
this.onclose(false, null);
102+
// this.onclose(false, null);
97103
}
98104
onsignedin() {
99105
this.connect();
@@ -140,6 +146,8 @@ export class amqp_consumer_node {
140146
NoderedUtil.CloseQueue(this.websocket(), this.localqueue);
141147
this.localqueue = "";
142148
}
149+
this.websocket().events.removeListener("onsignedin", this._onsignedin);
150+
this.websocket().events.removeListener("onclose", this._onsocketclose);
143151
if (done != null) done();
144152
}
145153
}
@@ -159,6 +167,8 @@ export class amqp_publisher_node {
159167
public host: string = null;
160168
public localqueue: string = "";
161169
private connection: amqp_connection;
170+
private _onsignedin: any = null;
171+
private _onsocketclose: any = null;
162172
constructor(public config: Iamqp_publisher_node) {
163173
RED.nodes.createNode(this, config);
164174
try {
@@ -168,9 +178,14 @@ export class amqp_publisher_node {
168178
this.node.on("close", this.onclose);
169179

170180
this.connection = RED.nodes.getNode(this.config.config);
171-
this.websocket().events.on("onsignedin", this.onsignedin.bind(this));
172-
this.websocket().events.on("onclose", this.onsocketclose.bind(this));
173-
this.connect();
181+
this._onsignedin = this.onsignedin.bind(this);
182+
this._onsocketclose = this.onsocketclose.bind(this);
183+
this.websocket().events.on("onsignedin", this._onsignedin);
184+
this.websocket().events.on("onclose", this._onsocketclose);
185+
186+
if (this.websocket().isConnected && this.websocket().user != null) {
187+
this.connect();
188+
}
174189
} catch (error) {
175190
NoderedUtil.HandleError(this, error);
176191
}
@@ -181,7 +196,7 @@ export class amqp_publisher_node {
181196
onsocketclose(message) {
182197
if (message == null) message = "";
183198
this.node.status({ fill: "red", shape: "dot", text: "Disconnected " + message });
184-
this.onclose(false, null);
199+
// this.onclose(false, null);
185200
}
186201
websocket(): WebSocketClient {
187202
if (this.connection != null) {
@@ -260,6 +275,9 @@ export class amqp_publisher_node {
260275
NoderedUtil.CloseQueue(this.websocket(), this.localqueue);
261276
this.localqueue = "";
262277
}
278+
this.websocket().events.removeListener("onsignedin", this._onsignedin);
279+
this.websocket().events.removeListener("onclose", this._onsocketclose);
280+
263281
if (done != null) done();
264282
}
265283
}

0 commit comments

Comments
 (0)