Skip to content

Commit 0e36de1

Browse files
committed
Add mongodb watch to api and nodered
1 parent 95c70d3 commit 0e36de1

14 files changed

Lines changed: 376 additions & 16 deletions

File tree

OpenFlow/.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@ package-lock.json
22
node_modules
33
logs
44
dist
5-
*grafana-proxy*
5+
*grafana-proxy*
6+
*license*

OpenFlow/src/Config.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ export class Config {
8484
Config.nodered_initial_liveness_delay = parseInt(Config.getEnv("nodered_initial_liveness_delay", "60"));
8585
}
8686
public static db: DatabaseConnection = null;
87+
public static license_key: string = Config.getEnv("license_key", "");
8788
public static version: string = Config.getversion();
8889
public static logpath: string = Config.getEnv("logpath", __dirname);
8990
public static log_queries: boolean = Config.parseBoolean(Config.getEnv("log_queries", "false"));

OpenFlow/src/Crypt.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@ export class Crypt {
8080
return token;
8181
}
8282
static verityToken(token: string): TokenUser {
83+
if (NoderedUtil.IsNullEmpty(token)) {
84+
throw new Error('jwt must be provided');
85+
}
8386
var o: any = jsonwebtoken.verify(token, Crypt.encryption_key);
8487
o.data = TokenUser.assign(o.data);
8588
return o.data;

OpenFlow/src/DatabaseConnection.ts

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import {
2-
ObjectID, Db, Binary, InsertOneWriteOpResult, DeleteWriteOpResultObject, ObjectId, MapReduceOptions, CollectionInsertOneOptions, UpdateWriteOpResult, WriteOpResult, GridFSBucket, ReadPreference
2+
ObjectID, Db, Binary, InsertOneWriteOpResult, DeleteWriteOpResultObject, ObjectId, MapReduceOptions, CollectionInsertOneOptions, UpdateWriteOpResult, WriteOpResult, GridFSBucket, ReadPreference, ChangeStream
33
} from "mongodb";
44
import { MongoClient } from "mongodb";
55
import winston = require("winston");
@@ -414,6 +414,50 @@ export class DatabaseConnection {
414414
DatabaseConnection.traversejsondecode(items);
415415
return items;
416416
}
417+
/**
418+
* Do MongoDB watch
419+
* @param {any} aggregates
420+
* @param {string} collectionname
421+
* @param {string} jwt
422+
* @returns Promise
423+
*/
424+
async watch<T extends Base>(aggregates: object[], collectionname: string, jwt: string): Promise<ChangeStream> {
425+
await this.connect();
426+
427+
var json: any = aggregates;
428+
if (typeof json !== 'string' && !(json instanceof String)) {
429+
json = JSON.stringify(json, (key, value) => {
430+
if (value instanceof RegExp)
431+
return ("__REGEXP " + value.toString());
432+
else
433+
return value;
434+
});
435+
}
436+
437+
aggregates = JSON.parse(json, (key, value) => {
438+
if (typeof value === 'string' && value.match(isoDatePattern)) {
439+
return new Date(value); // isostring, so cast to js date
440+
} else if (value != null && value != undefined && value.toString().indexOf("__REGEXP ") == 0) {
441+
var m = value.split("__REGEXP ")[1].match(/\/(.*)\/(.*)?/);
442+
return new RegExp(m[1], m[2] || "");
443+
} else
444+
return value; // leave any other value as-is
445+
});
446+
447+
// TODO: Should we filter on rights other than read ? should a person with reade be allowed to know when it was updated ?
448+
// a person with read, would beablt to know anyway, so guess read should be enough for now ...
449+
var base = this.getbasequery(jwt, "fullDocument._acl", [Rights.read]);
450+
if (Array.isArray(aggregates)) {
451+
aggregates.unshift({ $match: base });
452+
} else {
453+
if (NoderedUtil.IsNullUndefinded(aggregates)) {
454+
aggregates = [{ $match: base }];
455+
} else {
456+
aggregates = [{ $match: base }, aggregates];
457+
}
458+
}
459+
return await this.db.collection(collectionname).watch(aggregates);
460+
}
417461
/**
418462
* Do MongoDB map reduce
419463
* @param {any} aggregates

OpenFlow/src/Messages/Message.ts

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import { Readable, Stream } from "stream";
1111
import { GridFSBucket, ObjectID, Db, Cursor, MongoNetworkError } from "mongodb";
1212
import * as path from "path";
1313
import { DatabaseConnection } from "../DatabaseConnection";
14-
import { StripeMessage, EnsureStripeCustomerMessage, NoderedUtil, QueuedMessage, RegisterQueueMessage, QueueMessage, CloseQueueMessage, ListCollectionsMessage, DropCollectionMessage, QueryMessage, AggregateMessage, InsertOneMessage, UpdateOneMessage, Base, UpdateManyMessage, InsertOrUpdateOneMessage, DeleteOneMessage, MapReduceMessage, SigninMessage, TokenUser, User, Rights, EnsureNoderedInstanceMessage, DeleteNoderedInstanceMessage, DeleteNoderedPodMessage, RestartNoderedInstanceMessage, GetNoderedInstanceMessage, GetNoderedInstanceLogMessage, SaveFileMessage, WellknownIds, GetFileMessage, UpdateFileMessage, CreateWorkflowInstanceMessage, RegisterUserMessage, NoderedUser } from "openflow-api";
14+
import { StripeMessage, EnsureStripeCustomerMessage, NoderedUtil, QueuedMessage, RegisterQueueMessage, QueueMessage, CloseQueueMessage, ListCollectionsMessage, DropCollectionMessage, QueryMessage, AggregateMessage, InsertOneMessage, UpdateOneMessage, Base, UpdateManyMessage, InsertOrUpdateOneMessage, DeleteOneMessage, MapReduceMessage, SigninMessage, TokenUser, User, Rights, EnsureNoderedInstanceMessage, DeleteNoderedInstanceMessage, DeleteNoderedPodMessage, RestartNoderedInstanceMessage, GetNoderedInstanceMessage, GetNoderedInstanceLogMessage, SaveFileMessage, WellknownIds, GetFileMessage, UpdateFileMessage, CreateWorkflowInstanceMessage, RegisterUserMessage, NoderedUser, WatchMessage } from "openflow-api";
1515
import { Billing, stripe_customer, stripe_base, stripe_list, StripeAddPlanMessage, StripeCancelPlanMessage, stripe_subscription, stripe_subscription_item, stripe_plan, stripe_coupon } from "openflow-api";
1616
import { V1ResourceRequirements, V1Deployment } from "@kubernetes/client-node";
1717
import { amqpwrapper } from "../amqpwrapper";
@@ -90,6 +90,12 @@ export class Message {
9090
case "aggregate":
9191
this.Aggregate(cli);
9292
break;
93+
case "watch":
94+
this.Watch(cli);
95+
break;
96+
case "unwatch":
97+
this.UnWatch(cli);
98+
break;
9399
case "insertone":
94100
this.InsertOne(cli);
95101
break;
@@ -404,7 +410,11 @@ export class Message {
404410
try {
405411
msg = QueryMessage.assign(this.data);
406412
if (NoderedUtil.IsNullEmpty(msg.jwt)) { msg.jwt = cli.jwt; }
407-
msg.result = await Config.db.query(msg.query, msg.projection, msg.top, msg.skip, msg.orderby, msg.collectionname, msg.jwt, msg.queryas);
413+
if (NoderedUtil.IsNullEmpty(msg.jwt)) {
414+
msg.error = "Access denied, not signed in";
415+
} else {
416+
msg.result = await Config.db.query(msg.query, msg.projection, msg.top, msg.skip, msg.orderby, msg.collectionname, msg.jwt, msg.queryas);
417+
}
408418
} catch (error) {
409419
cli._logger.error(error);
410420
if (NoderedUtil.IsNullUndefinded(msg)) { (msg as any) = {}; }
@@ -439,6 +449,48 @@ export class Message {
439449
}
440450
this.Send(cli);
441451
}
452+
private async UnWatch(cli: WebSocketServerClient): Promise<void> {
453+
this.Reply();
454+
var msg: WatchMessage
455+
try {
456+
msg = WatchMessage.assign(this.data);
457+
if (NoderedUtil.IsNullEmpty(msg.jwt)) { msg.jwt = cli.jwt; }
458+
await cli.UnWatch(msg.id, msg.jwt);
459+
msg.result = null;
460+
} catch (error) {
461+
if (NoderedUtil.IsNullUndefinded(msg)) { (msg as any) = {}; }
462+
if (msg !== null && msg !== undefined) msg.error = error.toString();
463+
cli._logger.error(error);
464+
}
465+
try {
466+
this.data = JSON.stringify(msg);
467+
} catch (error) {
468+
this.data = "";
469+
cli._logger.error(error);
470+
}
471+
this.Send(cli);
472+
}
473+
private async Watch(cli: WebSocketServerClient): Promise<void> {
474+
this.Reply();
475+
var msg: WatchMessage
476+
try {
477+
msg = WatchMessage.assign(this.data);
478+
if (NoderedUtil.IsNullEmpty(msg.jwt)) { msg.jwt = cli.jwt; }
479+
msg.id = await cli.Watch(msg.aggregates, msg.collectionname, msg.jwt);
480+
msg.result = null;
481+
} catch (error) {
482+
if (NoderedUtil.IsNullUndefinded(msg)) { (msg as any) = {}; }
483+
if (msg !== null && msg !== undefined) msg.error = error.toString();
484+
cli._logger.error(error);
485+
}
486+
try {
487+
this.data = JSON.stringify(msg);
488+
} catch (error) {
489+
this.data = "";
490+
cli._logger.error(error);
491+
}
492+
this.Send(cli);
493+
}
442494
private async InsertOne(cli: WebSocketServerClient): Promise<void> {
443495
this.Reply();
444496
var msg: InsertOneMessage
@@ -720,7 +772,7 @@ export class Message {
720772
user.device = msg.device;
721773
}
722774
if (msg.validate_only !== true) {
723-
cli._logger.debug(tuser.username + " signed in using " + type);
775+
cli._logger.debug(tuser.username + " signed in using " + type + " " + cli.id + "/" + cli.clientagent);
724776
cli.jwt = msg.jwt;
725777
cli.user = user;
726778
} else {

OpenFlow/src/WebSocketServer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ export class WebSocketServer {
7373
cli.Close();
7474
}
7575
cli.ping();
76-
if (!cli.connected() && cli.queuecount() == 0) {
76+
if (!cli.connected() && cli.queuecount() == 0 && cli.streamcount() == 0) {
7777
if (cli.user != null) {
7878
WebSocketServer._logger.info("removing disconnected client " + cli.id + "/" + cli.user.name + "/" + cli.clientagent);
7979
} else {

OpenFlow/src/WebSocketServerClient.ts

Lines changed: 79 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ import { SocketMessage } from "./SocketMessage";
44
import { Message, JSONfn } from "./Messages/Message";
55
import { Config } from "./Config";
66
import { amqpwrapper, QueueMessageOptions, amqpqueue } from "./amqpwrapper";
7-
import { NoderedUtil, Base, InsertOneMessage, QueueMessage, MapReduceMessage, QueryMessage, UpdateOneMessage, UpdateManyMessage, DeleteOneMessage, User, mapFunc, reduceFunc, finalizeFunc, QueuedMessage, QueuedMessageCallback } from "openflow-api";
7+
import { NoderedUtil, Base, InsertOneMessage, QueueMessage, MapReduceMessage, QueryMessage, UpdateOneMessage, UpdateManyMessage, DeleteOneMessage, User, mapFunc, reduceFunc, finalizeFunc, QueuedMessage, QueuedMessageCallback, WatchEventMessage } from "openflow-api";
8+
import { ChangeStream } from "mongodb";
89
// import { Mutex } from "./Mutex";
910

1011
interface IHashTable<T> {
@@ -39,6 +40,11 @@ const Semaphore = (n) => ({
3940
});
4041
const semaphore = Semaphore(1);
4142

43+
export class clsstream {
44+
public stream: ChangeStream;
45+
public id: string;
46+
public callback: any;
47+
}
4248

4349
export class WebSocketServerClient {
4450
public jwt: string;
@@ -91,13 +97,14 @@ export class WebSocketServerClient {
9197
socketObject.on("close", (e: CloseEvent): void => this.close(e));
9298
}
9399
private open(e: Event): void {
94-
this._logger.info("WebSocket connection opened " + e);
100+
this._logger.info("WebSocket connection opened " + e + " " + this.id);
95101
}
96102
private close(e: CloseEvent): void {
97-
this._logger.info("WebSocket connection closed " + e);
103+
this._logger.info("WebSocket connection closed " + e + " " + this.id + "/" + this.clientagent);
104+
this.Close();
98105
}
99106
private error(e: Event): void {
100-
this._logger.error("WebSocket error encountered " + e);
107+
this._logger.error("WebSocket error encountered " + e + " " + this.id + "/" + this.clientagent);
101108
}
102109
public queuecount(): number {
103110
if (this._queues == null) return 0;
@@ -121,6 +128,9 @@ export class WebSocketServerClient {
121128
if (this.queuecount() > 0) {
122129
this.CloseConsumers();
123130
}
131+
if (this.streamcount() > 0) {
132+
this.CloseConsumers();
133+
}
124134
return;
125135
}
126136
if (this._socketObject.readyState === this._socketObject.CLOSED || this._socketObject.readyState === this._socketObject.CLOSING) {
@@ -172,6 +182,7 @@ export class WebSocketServerClient {
172182
}
173183
public async Close(): Promise<void> {
174184
await this.CloseConsumers();
185+
await this.CloseStreams();
175186
if (this._socketObject != null) {
176187
try {
177188
this._socketObject.removeListener("open", (e: Event): void => this.open(e));
@@ -423,6 +434,70 @@ export class WebSocketServerClient {
423434
var msg: Message = new Message(); msg.command = "deleteone"; msg.data = JSON.stringify(q);
424435
q = await this.Send<DeleteOneMessage>(msg);
425436
}
437+
streams: clsstream[] = [];
438+
public streamcount(): number {
439+
if (this.streams == null) return 0;
440+
return this.streams.length;
441+
}
442+
async CloseStreams(): Promise<void> {
443+
if (this.streams != null && this.streams.length > 0) {
444+
for (var i = this.streams.length - 1; i >= 0; i--) {
445+
try {
446+
if (this.streams[i] != null && this.streams[i].stream != null && !this.streams[i].stream.isClosed()) {
447+
await this.streams[i].stream.close();
448+
}
449+
this.streams.splice(i, 1);
450+
} catch (error) {
451+
this._logger.error("WebSocketclient::CloseStreams " + error + " " + this.id + "/" + this.clientagent);
452+
}
453+
}
454+
}
455+
}
456+
async CloseStream(id: string): Promise<void> {
457+
if (this.streams != null && this.streams.length > 0) {
458+
for (var i = this.streams.length - 1; i >= 0; i--) {
459+
try {
460+
if (this.streams[i] != null && this.streams[i].id == id) {
461+
if (!this.streams[i].stream.isClosed()) await this.streams[i].stream.close();
462+
this.streams.splice(i, 1);
463+
}
464+
} catch (error) {
465+
this._logger.error("WebSocketclient::CloseStream " + error + " " + this.id + "/" + this.clientagent);
466+
}
467+
}
468+
}
469+
}
470+
async UnWatch(id: string, jwt: string): Promise<void> {
471+
this.CloseStream(id);
472+
}
473+
async Watch(aggregates: object[], collectionname: string, jwt: string): Promise<string> {
474+
var stream: clsstream = new clsstream();
475+
stream.id = Math.random().toString(36).substr(2, 9);
476+
stream.stream = await Config.db.watch(aggregates, collectionname, jwt);
477+
this.streams.push(stream);
426478

479+
const options = { fullDocument: "updateLookup" };
480+
const me = this;
481+
try {
482+
(stream.stream as any).on("change", next => {
483+
try {
484+
// me._logger.info(JSON.stringify(next, null, 4));
485+
me._logger.info("Watch: " + JSON.stringify(next.documentKey));
486+
const msg: SocketMessage = SocketMessage.fromcommand("watchevent");
487+
var q = new WatchEventMessage();
488+
q.id = stream.id;
489+
q.result = next;
490+
msg.data = JSON.stringify(q);
491+
me._socketObject.send(msg.tojson());
492+
} catch (error) {
493+
this._logger.error("WebSocketclient::Watch::changeListener " + error + " " + this.id + "/" + this.clientagent);
494+
}
495+
}, options);
496+
return stream.id;
497+
} catch (error) {
498+
this._logger.error("WebSocketclient::Watch " + error + " " + this.id + "/" + this.clientagent);
499+
throw error;
500+
}
501+
}
427502

428503
}

OpenFlow/src/index.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,6 @@ rejectionEmitter.on("rejectionHandled", (error, promise) => {
239239
});
240240
import * as fs from "fs";
241241
import { OAuthProvider } from "./OAuthProvider";
242-
// import { GrafanaProxy } from "./grafana-proxy";
243242
var GrafanaProxy: any = null;
244243
try {
245244
GrafanaProxy = require("./grafana-proxy");

OpenFlowNodeRED/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "openflow-nodered",
3-
"version": "1.0.97",
3+
"version": "1.0.98",
44
"description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)",
55
"main": "index.js",
66
"scripts": {
@@ -37,7 +37,7 @@
3737
"morgan": "^1.10.0",
3838
"node-red": "^1.1.3",
3939
"node-red-node-email": "^1.7.8",
40-
"openflow-api": "^1.0.20",
40+
"openflow-api": "^1.0.21",
4141
"os-service": "^2.2.0",
4242
"passport-saml": "^1.3.4",
4343
"passport-saml-metadata": "^2.3.0",

0 commit comments

Comments
 (0)