Skip to content

Commit 96f35f3

Browse files
committed
Add global watches to support delete
1 parent 9310f06 commit 96f35f3

10 files changed

Lines changed: 304 additions & 90 deletions

File tree

OpenFlow/src/Config.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ export class Config {
2929
Config.log_openflow_amqp = Config.parseBoolean(Config.getEnv("log_openflow_amqp", "false"));
3030
Config.log_amqp = Config.parseBoolean(Config.getEnv("log_amqp", "true"));
3131
Config.log_index_mngt = Config.parseBoolean(Config.getEnv("log_index_mngt", "true"));
32+
Config.log_watches = Config.parseBoolean(Config.getEnv("log_watches", "false"));
33+
Config.log_watches_notify = Config.parseBoolean(Config.getEnv("log_watches_notify", "true"));
34+
3235
Config.openflow_uniqueid = Config.getEnv("openflow_uniqueid", "");
3336
Config.enable_openflow_amqp = Config.parseBoolean(Config.getEnv("enable_openflow_amqp", "false"));
3437
Config.openflow_amqp_expiration = parseInt(Config.getEnv("openflow_amqp_expiration", (60 * 1000 * 25).toString())); // 25 min
@@ -193,6 +196,8 @@ export class Config {
193196
public static log_openflow_amqp: boolean = Config.parseBoolean(Config.getEnv("log_openflow_amqp", "false"));
194197
public static log_amqp: boolean = Config.parseBoolean(Config.getEnv("log_amqp", "true"));
195198
public static log_index_mngt: boolean = Config.parseBoolean(Config.getEnv("log_index_mngt", "true"));
199+
public static log_watches: boolean = Config.parseBoolean(Config.getEnv("log_watches", "false"));
200+
public static log_watches_notify: boolean = Config.parseBoolean(Config.getEnv("log_watches_notify", "true"));
196201
public static openflow_uniqueid: string = Config.getEnv("openflow_uniqueid", "");
197202
public static enable_openflow_amqp: boolean = Config.parseBoolean(Config.getEnv("enable_openflow_amqp", "false"));
198203
public static openflow_amqp_expiration: number = parseInt(Config.getEnv("openflow_amqp_expiration", (60 * 1000 * 25).toString())); // 25 min

OpenFlow/src/DatabaseConnection.ts

Lines changed: 209 additions & 8 deletions
Large diffs are not rendered by default.

OpenFlow/src/Messages/Message.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1444,6 +1444,9 @@ export class Message {
14441444
} else if (tuser != null) {
14451445
Logger.instanse.info(tuser.username + " successfully signed in");
14461446
Audit.LoginSuccess(tuser, type, "websocket", cli.remoteip, cli.clientagent, cli.clientversion, span);
1447+
const updatedoc = { _heartbeat: new Date(new Date().toISOString()), lastseen: new Date(new Date().toISOString()) };
1448+
Config.db.synRawUpdateOne("users", { _id: cli.user._id }, { $set: updatedoc, }, Config.prometheus_measure_onlineuser, null);
1449+
14471450
}
14481451
} catch (error) {
14491452
Logger.instanse.error(error);

OpenFlow/src/WebSocketServer.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ export class WebSocketServer {
3535
WebSocketServer.mongodb_watch_count.clear();
3636
for (let i = WebSocketServer._clients.length - 1; i >= 0; i--) {
3737
const cli: WebSocketServerClient = WebSocketServer._clients[i];
38-
WebSocketServer.mongodb_watch_count.bind({ ...Logger.otel.defaultlabels, clientid: cli.id, agent: cli.clientagent }).update(cli.streamcount());
38+
// WebSocketServer.mongodb_watch_count.bind({ ...Logger.otel.defaultlabels, clientid: cli.id, agent: cli.clientagent }).update(cli.streamcount());
3939
}
4040
}
4141
static configure(server: http.Server): void {
@@ -122,7 +122,7 @@ export class WebSocketServer {
122122
cli.Close();
123123
}
124124
cli.ping(span);
125-
if (!cli.connected() && cli.queuecount() == 0 && cli.streamcount() == 0) {
125+
if (!cli.connected() && cli.queuecount() == 0) { // && cli.streamcount() == 0
126126
if (cli.user != null) {
127127
Logger.instanse.info("removing disconnected client " + cli.id + "/" + cli.user.name + "/" + cli.clientagent);
128128
span?.addEvent("removing disconnected client " + cli.id + "/" + cli.user.name + "/" + cli.clientagent);
@@ -154,36 +154,36 @@ export class WebSocketServer {
154154
p_all[cli.clientagent] += 1;
155155
}
156156
}
157-
// Lets assume only robots register queues ( not true )
157+
const updatedoc = { _heartbeat: new Date(new Date().toISOString()), lastseen: new Date(new Date().toISOString()) };
158158
if (cli.clientagent == "openrpa") {
159159
Config.db.synRawUpdateOne("users", { _id: cli.user._id },
160-
{ $set: { _rpaheartbeat: new Date(new Date().toISOString()), _heartbeat: new Date(new Date().toISOString()) } },
160+
{ $set: { ...updatedoc, _rpaheartbeat: new Date(new Date().toISOString()) } },
161161
Config.prometheus_measure_onlineuser, null);
162162
}
163163
if (cli.clientagent == "nodered") {
164164
Config.db.synRawUpdateOne("users", { _id: cli.user._id },
165-
{ $set: { _noderedheartbeat: new Date(new Date().toISOString()), _heartbeat: new Date(new Date().toISOString()) } },
165+
{ $set: { ...updatedoc, _noderedheartbeat: new Date(new Date().toISOString()) } },
166166
Config.prometheus_measure_onlineuser, null);
167167
}
168168
if (cli.clientagent == "webapp" || cli.clientagent == "aiotwebapp") {
169169
Config.db.synRawUpdateOne("users", { _id: cli.user._id },
170-
{ $set: { _webheartbeat: new Date(new Date().toISOString()), _heartbeat: new Date(new Date().toISOString()) } },
170+
{ $set: { ...updatedoc, _webheartbeat: new Date(new Date().toISOString()) } },
171171
Config.prometheus_measure_onlineuser, null);
172172
}
173173
if (cli.clientagent == "powershell") {
174174
Config.db.synRawUpdateOne("users", { _id: cli.user._id },
175-
{ $set: { _powershellheartbeat: new Date(new Date().toISOString()), _heartbeat: new Date(new Date().toISOString()) } },
175+
{ $set: { ...updatedoc, _powershellheartbeat: new Date(new Date().toISOString()) } },
176176
Config.prometheus_measure_onlineuser, null);
177177
}
178178
if (cli.clientagent == "mobileapp" || cli.clientagent == "aiotmobileapp") {
179179
Config.db.synRawUpdateOne("users", { _id: cli.user._id },
180-
{ $set: { _webheartbeat: new Date(new Date().toISOString()), _mobilheartbeat: new Date(new Date().toISOString()), _heartbeat: new Date(new Date().toISOString()) } },
180+
{ $set: { ...updatedoc, _webheartbeat: new Date(new Date().toISOString()), _mobilheartbeat: new Date(new Date().toISOString()) } },
181181
Config.prometheus_measure_onlineuser, null);
182182
}
183183
else {
184184
// Should proberly turn this a little down, so we dont update all online users every 10th second
185185
Config.db.synRawUpdateOne("users", { _id: cli.user._id },
186-
{ $set: { _heartbeat: new Date(new Date().toISOString()) } },
186+
{ $set: updatedoc, },
187187
Config.prometheus_measure_onlineuser, null);
188188
}
189189
}

OpenFlow/src/WebSocketServerClient.ts

Lines changed: 73 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,12 @@ export class clsstream {
3939
public stream: ChangeStream;
4040
public id: string;
4141
public callback: any;
42+
aggregates: object[];
43+
collectionname: string;
4244
}
4345
export class WebSocketServerClient {
4446
public jwt: string;
45-
private _socketObject: WebSocket;
47+
public _socketObject: WebSocket;
4648
private _receiveQueue: SocketMessage[];
4749
private _sendQueue: SocketMessage[];
4850
public messageQueue: IHashTable<QueuedMessage> = {};
@@ -165,9 +167,9 @@ export class WebSocketServerClient {
165167
if (this.queuecount() > 0) {
166168
this.CloseConsumers(span);
167169
}
168-
if (this.streamcount() > 0) {
169-
this.CloseConsumers(span);
170-
}
170+
// if (this.streamcount() > 0) {
171+
// this.CloseStreams();
172+
// }
171173
return;
172174
}
173175
if (this._socketObject.readyState === this._socketObject.CLOSED || this._socketObject.readyState === this._socketObject.CLOSING) {
@@ -228,7 +230,7 @@ export class WebSocketServerClient {
228230
const span: Span = Logger.otel.startSpan("WebSocketServerClient.Close");
229231
try {
230232
await this.CloseConsumers(span);
231-
await this.CloseStreams();
233+
// await this.CloseStreams();
232234
if (this._socketObject != null) {
233235
try {
234236
this._socketObject.removeAllListeners();
@@ -579,85 +581,87 @@ export class WebSocketServerClient {
579581
const msg: Message = new Message(); msg.command = "deleteone"; msg.data = JSON.stringify(q);
580582
await this.Send<DeleteOneMessage>(msg);
581583
}
582-
streams: clsstream[] = [];
583-
public streamcount(): number {
584-
if (this.streams == null) return 0;
585-
return this.streams.length;
586-
}
587-
async CloseStreams(): Promise<void> {
588-
if (this.streams != null && this.streams.length > 0) {
589-
for (let i = this.streams.length - 1; i >= 0; i--) {
590-
try {
591-
if (this.streams[i] != null && this.streams[i].stream != null && !this.streams[i].stream.isClosed()) {
592-
await this.streams[i].stream.close();
593-
}
594-
this.streams.splice(i, 1);
595-
} catch (error) {
596-
Logger.instanse.error("WebSocketclient::CloseStreams " + error + " " + this.id + "/" + this.clientagent);
597-
}
598-
}
599-
}
600-
}
601-
async CloseStream(id: string): Promise<void> {
602-
if (this.streams != null && this.streams.length > 0) {
603-
for (let i = this.streams.length - 1; i >= 0; i--) {
604-
try {
605-
if (this.streams[i] != null && this.streams[i].id == id) {
606-
if (!this.streams[i].stream.isClosed()) await this.streams[i].stream.close();
607-
this.streams.splice(i, 1);
608-
}
609-
} catch (error) {
610-
Logger.instanse.error("WebSocketclient::CloseStream " + error + " " + this.id + "/" + this.clientagent);
611-
}
612-
}
613-
}
614-
WebSocketServer.update_mongodb_watch_count(this);
615-
}
584+
// streams: clsstream[] = [];
585+
// public streamcount(): number {
586+
// if (this.streams == null) return 0;
587+
// return this.streams.length;
588+
// }
589+
// async CloseStreams(): Promise<void> {
590+
// if (this.streams != null && this.streams.length > 0) {
591+
// for (let i = this.streams.length - 1; i >= 0; i--) {
592+
// try {
593+
// if (this.streams[i] != null && this.streams[i].stream != null && !this.streams[i].stream.isClosed()) {
594+
// await this.streams[i].stream.close();
595+
// }
596+
// this.streams.splice(i, 1);
597+
// } catch (error) {
598+
// Logger.instanse.error("WebSocketclient::CloseStreams " + error + " " + this.id + "/" + this.clientagent);
599+
// }
600+
// }
601+
// }
602+
// }
603+
// async CloseStream(id: string): Promise<void> {
604+
// if (this.streams != null && this.streams.length > 0) {
605+
// for (let i = this.streams.length - 1; i >= 0; i--) {
606+
// try {
607+
// if (this.streams[i] != null && this.streams[i].id == id) {
608+
// if (!this.streams[i].stream.isClosed()) await this.streams[i].stream.close();
609+
// this.streams.splice(i, 1);
610+
// }
611+
// } catch (error) {
612+
// Logger.instanse.error("WebSocketclient::CloseStream " + error + " " + this.id + "/" + this.clientagent);
613+
// }
614+
// }
615+
// }
616+
// WebSocketServer.update_mongodb_watch_count(this);
617+
// }
616618
async UnWatch(id: string, jwt: string): Promise<void> {
617619
if (this.watches[id]) {
618-
this.CloseStream(this.watches[id].streamid);
620+
// this.CloseStream(this.watches[id].streamid);
619621
delete this.watches[id];
620622
}
621623
}
622624
public watches: IHashTable<ClientWatch> = {};
623625
async Watch(aggregates: object[], collectionname: string, jwt: string, id: string = null): Promise<string> {
624626
const stream: clsstream = new clsstream();
625627
stream.id = NoderedUtil.GetUniqueIdentifier();
626-
stream.stream = await Config.db.watch(aggregates, collectionname, jwt);
627-
this.streams.push(stream);
628+
stream.collectionname = collectionname;
629+
stream.aggregates = aggregates;
630+
// stream.stream = await Config.db.watch(aggregates, collectionname, jwt);
631+
// this.streams.push(stream);
628632
if (id == null) id = NoderedUtil.GetUniqueIdentifier();
629633

630-
const options = { fullDocument: "updateLookup" };
631-
const me = this;
632-
try {
633-
(stream.stream as any).on("error", err => {
634-
console.error(err);
635-
});
636-
(stream.stream as any).on("change", next => {
637-
try {
638-
Logger.instanse.info("Watch: " + JSON.stringify(next.documentKey));
639-
const msg: SocketMessage = SocketMessage.fromcommand("watchevent");
640-
const q = new WatchEventMessage();
641-
q.id = id;
642-
q.result = next;
643-
if (q.result && q.result.fullDocument) {
644-
q.result.fullDocument = Config.db.decryptentity(q.result.fullDocument);
645-
}
646-
msg.data = JSON.stringify(q);
647-
me._socketObject.send(msg.tojson());
648-
} catch (error) {
649-
Logger.instanse.error("WebSocketclient::Watch::changeListener " + error + " " + this.id + "/" + this.clientagent);
650-
}
651-
}, options);
634+
// const options = { fullDocument: "updateLookup" };
635+
// const me = this;
636+
// try {
637+
// (stream.stream as any).on("error", err => {
638+
// console.error(err);
639+
// });
640+
// (stream.stream as any).on("change", next => {
641+
// try {
642+
// // Logger.instanse.info("Watch: " + JSON.stringify(next.documentKey));
643+
// // const msg: SocketMessage = SocketMessage.fromcommand("watchevent");
644+
// // const q = new WatchEventMessage();
645+
// // q.id = id;
646+
// // q.result = next;
647+
// // if (q.result && q.result.fullDocument) {
648+
// // q.result.fullDocument = Config.db.decryptentity(q.result.fullDocument);
649+
// // }
650+
// // msg.data = JSON.stringify(q);
651+
// // me._socketObject.send(msg.tojson());
652+
// } catch (error) {
653+
// Logger.instanse.error("WebSocketclient::Watch::changeListener " + error + " " + this.id + "/" + this.clientagent);
654+
// }
655+
// }, options);
652656
WebSocketServer.update_mongodb_watch_count(this);
653657
this.watches[id] = {
654-
aggregates, collectionname, streamid: stream.id
658+
aggregates, collectionname //, streamid: stream.id
655659
} as ClientWatch;
656660
return id;
657-
} catch (error) {
658-
Logger.instanse.error("WebSocketclient::Watch " + error + " " + this.id + "/" + this.clientagent);
659-
throw error;
660-
}
661+
// } catch (error) {
662+
// Logger.instanse.error("WebSocketclient::Watch " + error + " " + this.id + "/" + this.clientagent);
663+
// throw error;
664+
// }
661665
}
662666
}
663667
export class ClientWatch {

OpenFlow/src/public/Controllers.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { WebSocketClientService } from "./WebSocketClientService";
66
import * as jsondiffpatch from "jsondiffpatch";
77
import * as ofurl from "./formsio_of_provider";
88

9+
910
declare let $: any;
1011

1112
function treatAsUTC(date): number {

OpenFlowNodeRED/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@openiap/nodered",
3-
"version": "1.3.96",
3+
"version": "1.3.97",
44
"description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)",
55
"main": "index.js",
66
"scripts": {

OpenFlowNodeRED/src/nodered/nodes/api.html

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1643,7 +1643,7 @@
16431643
icon: "font-awesome/fa-bar-chart",
16441644
defaults: {
16451645
name: { value: "" },
1646-
aggregates: { value: '[{ "$match": { "fullDocument._type": "test" } }]' },
1646+
aggregates: { value: "[\"$.[?(@._type == 'test')]\"]" },
16471647
collection: { value: "entities" }
16481648
},
16491649
inputs: 0,

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.3.96
1+
1.3.97

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@openiap/openflow",
3-
"version": "1.3.96",
3+
"version": "1.3.97",
44
"description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)",
55
"main": "index.js",
66
"scripts": {

0 commit comments

Comments
 (0)