Skip to content

Commit ec209ae

Browse files
committed
Fix issue with Global Watch
1 parent d8c5243 commit ec209ae

1 file changed

Lines changed: 38 additions & 29 deletions

File tree

OpenFlow/src/DatabaseConnection.ts

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,13 @@ export class DatabaseConnection extends events.EventEmitter {
8787
public static mongodb_delete: ValueRecorder;
8888
public static mongodb_deletemany: ValueRecorder;
8989
public static semaphore = Auth.Semaphore(1);
90-
constructor(mongodburl: string, dbname: string) {
90+
91+
public registerGlobalWatches: boolean = true;
92+
constructor(mongodburl: string, dbname: string, registerGlobalWatches: boolean) {
9193
super();
9294
this._dbname = dbname;
9395
this.mongodburl = mongodburl;
96+
if (!NoderedUtil.IsNullEmpty(registerGlobalWatches)) this.registerGlobalWatches = registerGlobalWatches;
9497

9598
if (!NoderedUtil.IsNullUndefinded(Logger.otel)) {
9699
DatabaseConnection.mongodb_query = Logger.otel.meter.createValueRecorder('openflow_mongodb_query_seconds', {
@@ -221,6 +224,7 @@ export class DatabaseConnection extends events.EventEmitter {
221224
this.emit("connected");
222225
}
223226
registerGlobalWatch(collectionname: string, parent: Span) {
227+
if (!this.registerGlobalWatches) return;
224228
const span: Span = Logger.otel.startSubSpan("registerGlobalWatch", parent);
225229
try {
226230
span?.setAttribute("collectionname", collectionname);
@@ -239,33 +243,6 @@ export class DatabaseConnection extends events.EventEmitter {
239243
});
240244
(stream.stream as any).on("change", async (next) => {
241245
try {
242-
if (collectionname == "mq") {
243-
Auth.clearCache("watch detected change in " + collectionname + " collection for a " + _type + " " + item.name);
244-
}
245-
if (collectionname == "users" && (_type == "user" || _type == "role" || _type == "customer")) {
246-
Auth.clearCache("watch detected change in " + collectionname + " collection for a " + _type + " " + item.name);
247-
}
248-
if (collectionname == "config" && (_type == "provider" || _type == "restriction" || _type == "resource")) {
249-
Auth.clearCache("watch detected change in " + collectionname + " collection for a " + _type + " " + item.name);
250-
}
251-
if (collectionname == "config" && _type == "provider") {
252-
await LoginProvider.RegisterProviders(WebServer.app, Config.baseurl());
253-
}
254-
let doContinue: boolean = false;
255-
for (let i = 0; i < WebSocketServer._clients.length; i++) {
256-
let client = WebSocketServer._clients[i];
257-
if (NoderedUtil.IsNullUndefinded(client.user)) continue;
258-
let ids = Object.keys(client.watches);
259-
for (let y = 0; y < ids.length; y++) {
260-
var stream = client.watches[ids[y]];
261-
if (stream.collectionname != collectionname) continue;
262-
doContinue = true;
263-
break;
264-
}
265-
if (doContinue == true) break;
266-
}
267-
if (!doContinue) return;
268-
269246
var _id = next.documentKey._id;
270247
if (next.operationType == 'update' && collectionname == "users") {
271248
if (next.updateDescription.updatedFields.hasOwnProperty("_heartbeat")) return;
@@ -277,14 +254,46 @@ export class DatabaseConnection extends events.EventEmitter {
277254
// console.log(next.updateDescription.updatedFields);
278255
}
279256
var item = next.fullDocument;
257+
var _type = "";
258+
if (!NoderedUtil.IsNullUndefinded(item)) {
259+
_type = item._type;
260+
261+
if (collectionname == "mq") {
262+
Auth.clearCache("watch detected change in " + collectionname + " collection for a " + _type + " " + item.name);
263+
}
264+
if (collectionname == "users" && (_type == "user" || _type == "role" || _type == "customer")) {
265+
Auth.clearCache("watch detected change in " + collectionname + " collection for a " + _type + " " + item.name);
266+
}
267+
if (collectionname == "config" && (_type == "provider" || _type == "restriction" || _type == "resource")) {
268+
Auth.clearCache("watch detected change in " + collectionname + " collection for a " + _type + " " + item.name);
269+
}
270+
if (collectionname == "config" && _type == "provider") {
271+
await LoginProvider.RegisterProviders(WebServer.app, Config.baseurl());
272+
}
273+
}
274+
let doContinue: boolean = false;
275+
if (WebSocketServer._clients)
276+
for (let i = 0; i < WebSocketServer._clients.length; i++) {
277+
let client = WebSocketServer._clients[i];
278+
if (NoderedUtil.IsNullUndefinded(client.user)) continue;
279+
let ids = Object.keys(client.watches);
280+
for (let y = 0; y < ids.length; y++) {
281+
var stream = client.watches[ids[y]];
282+
if (stream.collectionname != collectionname) continue;
283+
doContinue = true;
284+
break;
285+
}
286+
if (doContinue == true) break;
287+
}
288+
if (!doContinue) return;
289+
280290
if (NoderedUtil.IsNullEmpty(item)) item = await this.GetLatestDocumentVersion({ collectionname, id: _id, jwt: Crypt.rootToken() }, null);
281291
if (NoderedUtil.IsNullEmpty(item)) {
282292
Logger.instanse.error("Missing fullDocument and could not find historic version for " + _id + " in " + collectionname);
283293
return;
284294
} else {
285295
if (Config.log_watches) Logger.instanse.verbose("[" + collectionname + "][" + next.operationType + "] " + _id + " " + item.name);
286296
}
287-
var _type = item._type;
288297
try {
289298
for (var i = 0; i < WebSocketServer._clients.length; i++) {
290299
var client = WebSocketServer._clients[i];

0 commit comments

Comments
 (0)