Skip to content

Commit 32a09b5

Browse files
committed
Add history_delta_count to save database space
1 parent a65d483 commit 32a09b5

7 files changed

Lines changed: 94 additions & 19 deletions

File tree

OpenFlow/src/Config.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ export class Config {
6868
Config.mongodb_db = Config.getEnv("mongodb_db", "openflow");
6969

7070
Config.skip_history_collections = Config.getEnv("skip_history_collections", "");
71+
Config.history_delta_count = parseInt(Config.getEnv("history_delta_count", "1000"));
7172
Config.allow_skiphistory = Config.parseBoolean(Config.getEnv("allow_skiphistory", "true"));
7273

7374
Config.saml_issuer = Config.getEnv("saml_issuer", "the-issuer"); // define uri of STS, also sent to personal nodereds
@@ -140,6 +141,7 @@ export class Config {
140141
public static mongodb_db: string = Config.getEnv("mongodb_db", "openflow");
141142

142143
public static skip_history_collections: string = Config.getEnv("skip_history_collections", "");
144+
public static history_delta_count = parseInt(Config.getEnv("history_delta_count", "1000"));
143145
public static allow_skiphistory: boolean = Config.parseBoolean(Config.getEnv("allow_skiphistory", "true"));
144146

145147
public static saml_issuer: string = Config.getEnv("saml_issuer", "the-issuer"); // define uri of STS, also sent to personal nodereds

OpenFlow/src/DatabaseConnection.ts

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,13 @@ import { DBHelper } from "./DBHelper";
1010
// tslint:disable-next-line: typedef
1111
const safeObjectID = (s: string | number | ObjectID) => ObjectID.isValid(s) ? new ObjectID(s) : null;
1212
const isoDatePattern = new RegExp(/\d{4}-[01]\d-[0-3]\dT[0-2]\d:[0-5]\d:[0-5]\d\.\d+([+-][0-2]\d:[0-5]\d|Z)/);
13+
var jsondiffpatch = require('jsondiffpatch').create({
14+
objectHash: function (obj, index) {
15+
// try to find an id property, otherwise just use the index in the array
16+
return obj.name || obj.id || obj._id || '$$index:' + index;
17+
}
18+
});
19+
1320

1421
Object.defineProperty(Promise, 'retry', {
1522
configurable: true,
@@ -355,6 +362,34 @@ export class DatabaseConnection {
355362
if (Config.log_queries) this._logger.debug("[" + user.username + "][" + collectionname + "] query gave " + arr.length + " results ");
356363
return arr;
357364
}
365+
async GetDocumentVersion<T extends Base>(collectionname: string, id: string, version: number, jwt: string): Promise<T> {
366+
var roundDown = function (num, precision): number {
367+
num = parseFloat(num);
368+
if (!precision) return num;
369+
return (Math.floor(num / precision) * precision);
370+
};
371+
372+
var result: T = await this.getbyid<T>(id, collectionname, jwt);
373+
if (result == null) return result;
374+
if (result._version > version) {
375+
var rootjwt = Crypt.rootToken()
376+
// var baseversion = roundDown(version, Config.history_delta_count);
377+
var basehist = await this.query<any>({ id: id, item: { $exists: true, $ne: null }, "_version": { $lte: version } }, null, 1, 0, { _version: -1 }, collectionname + "_hist", rootjwt);
378+
result = basehist[0].item;
379+
var baseversion = basehist[0]._version;
380+
381+
var history = await this.query<T>({ id: id, "_version": { $gt: baseversion, $lte: version } }, null, Config.history_delta_count, 0, { _version: 1 }, collectionname + "_hist", rootjwt);
382+
383+
for (var i = 0; i < history.length; i++) {
384+
var delta = (history[i] as any).delta;
385+
if (delta != null) {
386+
result = jsondiffpatch.patch(result, delta);
387+
}
388+
}
389+
}
390+
return result;
391+
}
392+
358393
/**
359394
* Get a single item based on id
360395
* @param {string} id Id to search for
@@ -1435,6 +1470,11 @@ export class DatabaseConnection {
14351470
}
14361471
}
14371472
async SaveDiff(collectionname: string, original: any, item: any) {
1473+
var roundDown = function (num, precision): number {
1474+
num = parseFloat(num);
1475+
if (!precision) return num;
1476+
return (Math.floor(num / precision) * precision);
1477+
};
14381478
if (item._type == 'instance' && collectionname == 'workflows') return 0;
14391479
if (item._type == 'instance' && collectionname == 'workflows') return 0;
14401480
delete item._skiphistory;
@@ -1461,12 +1501,6 @@ export class DatabaseConnection {
14611501
_version = original._version + 1;
14621502
}
14631503
}
1464-
var jsondiffpatch = require('jsondiffpatch').create({
1465-
objectHash: function (obj, index) {
1466-
// try to find an id property, otherwise just use the index in the array
1467-
return obj.name || obj.id || obj._id || '$$index:' + index;
1468-
}
1469-
});
14701504
var delta: any = null;
14711505
// for backward comp, we cannot assume all objects have an history
14721506
// we create diff from version 0
@@ -1503,11 +1537,15 @@ export class DatabaseConnection {
15031537
_createdbyid: _modifiedbyid,
15041538
name: item.name,
15051539
id: item._id,
1506-
item: original,
1540+
item: undefined,
15071541
delta: delta,
15081542
_version: _version,
15091543
reason: reason
15101544
}
1545+
var baseversion = roundDown(_version, Config.history_delta_count);
1546+
if (baseversion == _version) {
1547+
deltahist.item = original;
1548+
}
15111549
await this.db.collection(collectionname + '_hist').insertOne(deltahist);
15121550
}
15131551
} else {

OpenFlow/src/Messages/Message.ts

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import { Readable, Stream } from "stream";
1111
import { GridFSBucket, ObjectID, Db, Cursor, MongoNetworkError } from "mongodb";
1212
import * as path from "path";
1313
import { DatabaseConnection } from "../DatabaseConnection";
14-
import { StripeMessage, EnsureStripeCustomerMessage, NoderedUtil, QueuedMessage, RegisterQueueMessage, QueueMessage, CloseQueueMessage, ListCollectionsMessage, DropCollectionMessage, QueryMessage, AggregateMessage, InsertOneMessage, UpdateOneMessage, Base, UpdateManyMessage, InsertOrUpdateOneMessage, DeleteOneMessage, MapReduceMessage, SigninMessage, TokenUser, User, Rights, EnsureNoderedInstanceMessage, DeleteNoderedInstanceMessage, DeleteNoderedPodMessage, RestartNoderedInstanceMessage, GetNoderedInstanceMessage, GetNoderedInstanceLogMessage, SaveFileMessage, WellknownIds, GetFileMessage, UpdateFileMessage, CreateWorkflowInstanceMessage, RegisterUserMessage, NoderedUser, WatchMessage } from "openflow-api";
14+
import { StripeMessage, EnsureStripeCustomerMessage, NoderedUtil, QueuedMessage, RegisterQueueMessage, QueueMessage, CloseQueueMessage, ListCollectionsMessage, DropCollectionMessage, QueryMessage, AggregateMessage, InsertOneMessage, UpdateOneMessage, Base, UpdateManyMessage, InsertOrUpdateOneMessage, DeleteOneMessage, MapReduceMessage, SigninMessage, TokenUser, User, Rights, EnsureNoderedInstanceMessage, DeleteNoderedInstanceMessage, DeleteNoderedPodMessage, RestartNoderedInstanceMessage, GetNoderedInstanceMessage, GetNoderedInstanceLogMessage, SaveFileMessage, WellknownIds, GetFileMessage, UpdateFileMessage, CreateWorkflowInstanceMessage, RegisterUserMessage, NoderedUser, WatchMessage, GetDocumentVersionMessage } from "openflow-api";
1515
import { Billing, stripe_customer, stripe_base, stripe_list, StripeAddPlanMessage, StripeCancelPlanMessage, stripe_subscription, stripe_subscription_item, stripe_plan, stripe_coupon } from "openflow-api";
1616
import { V1ResourceRequirements, V1Deployment } from "@kubernetes/client-node";
1717
import { amqpwrapper } from "../amqpwrapper";
@@ -87,6 +87,9 @@ export class Message {
8787
case "query":
8888
this.Query(cli);
8989
break;
90+
case "getdocumentversion":
91+
this.GetDocumentVersion(cli);
92+
break;
9093
case "aggregate":
9194
this.Aggregate(cli);
9295
break;
@@ -403,7 +406,6 @@ export class Message {
403406
}
404407
this.Send(cli);
405408
}
406-
407409
private async Query(cli: WebSocketServerClient): Promise<void> {
408410
this.Reply();
409411
var msg: QueryMessage
@@ -429,6 +431,32 @@ export class Message {
429431
}
430432
this.Send(cli);
431433
}
434+
private async GetDocumentVersion(cli: WebSocketServerClient): Promise<void> {
435+
this.Reply();
436+
var msg: GetDocumentVersionMessage
437+
try {
438+
msg = GetDocumentVersionMessage.assign(this.data);
439+
if (NoderedUtil.IsNullEmpty(msg.jwt)) { msg.jwt = cli.jwt; }
440+
if (NoderedUtil.IsNullEmpty(msg.jwt)) {
441+
msg.error = "Access denied, not signed in";
442+
} else {
443+
msg.result = await Config.db.GetDocumentVersion(msg.collectionname, msg._id, msg.version, msg.jwt);
444+
}
445+
} catch (error) {
446+
cli._logger.error(error);
447+
if (NoderedUtil.IsNullUndefinded(msg)) { (msg as any) = {}; }
448+
if (msg !== null && msg !== undefined) msg.error = error.toString();
449+
cli._logger.error(error);
450+
}
451+
try {
452+
this.data = JSON.stringify(msg);
453+
} catch (error) {
454+
this.data = "";
455+
cli._logger.error(error);
456+
}
457+
this.Send(cli);
458+
}
459+
432460
private async Aggregate(cli: WebSocketServerClient): Promise<void> {
433461
this.Reply();
434462
var msg: AggregateMessage

OpenFlow/src/public/Controllers.ts

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2676,11 +2676,16 @@ export class HistoryCtrl extends entitiesCtrl<Base> {
26762676
modal.modal()
26772677
// var delta = jsondiffpatch.diff(this.model, model.item);
26782678
if (model.item == null) {
2679-
var items = await NoderedUtil.Query(this.collection + "_hist", { _id: model._id }, null, this.orderby, 100, 0, null);
2680-
if (items.length > 0) {
2681-
model.item = items[0].item;
2682-
model.delta = items[0].delta;
2683-
}
2679+
// var items = await NoderedUtil.Query(this.collection + "_hist", { _id: model._id }, null, this.orderby, 100, 0, null);
2680+
// if (items.length > 0) {
2681+
// model.item = items[0].item;
2682+
// model.delta = items[0].delta;
2683+
// }
2684+
var item = await NoderedUtil.GetDocumentVersion(this.collection, this.id, model._version, null);
2685+
if (item != null) model.item = item;
2686+
}
2687+
if (model.item == null) {
2688+
document.getElementById('visual').innerHTML = "Failed loading item version " + model._version;
26842689
}
26852690
var keys = Object.keys(model.item);
26862691
keys.forEach(key => {
@@ -2702,7 +2707,8 @@ export class HistoryCtrl extends entitiesCtrl<Base> {
27022707
}
27032708
var modal: any = $("#exampleModal");
27042709
modal.modal();
2705-
document.getElementById('visual').innerHTML = jsondiffpatch.formatters.html.format(model.delta, model.item);
2710+
// document.getElementById('visual').innerHTML = jsondiffpatch.formatters.html.format(model.delta, model.item);
2711+
document.getElementById('visual').innerHTML = jsondiffpatch.formatters.html.format(model.delta, {});
27062712
}
27072713
async RevertTo(model) {
27082714
if (model.item == null) {
@@ -3979,6 +3985,7 @@ export class CredentialCtrl extends entityCtrl<Base> {
39793985
]
39803986
}
39813987
, null, { _type: -1, name: 1 }, 5, 0, null));
3988+
39823989
// this.searchFilteredList = await NoderedUtil.Query("users",
39833990
// {
39843991
// $and: [

OpenFlowNodeRED/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "openflow-nodered",
3-
"version": "1.1.08",
3+
"version": "1.1.09",
44
"description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)",
55
"main": "index.js",
66
"scripts": {

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.1.08
1+
1.1.09

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "openiap",
3-
"version": "1.1.08",
3+
"version": "1.1.09",
44
"description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)",
55
"main": "index.js",
66
"scripts": {
@@ -62,7 +62,7 @@
6262
"multer": "^1.4.2",
6363
"multer-gridfs-storage": "^4.2.0",
6464
"oauth2-server": "^3.1.1",
65-
"openflow-api": "^1.0.22",
65+
"openflow-api": "file:../openflow-api",
6666
"os-service": "^2.2.0",
6767
"passport": "^0.4.1",
6868
"passport-google-oauth20": "^2.0.0",

0 commit comments

Comments
 (0)