Skip to content

Commit 626220f

Browse files
committed
Add House Keeping
1 parent 3aaa9fb commit 626220f

9 files changed

Lines changed: 279 additions & 72 deletions

File tree

OpenFlow/src/Config.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ export class Config {
3131
Config.amqp_prefetch = parseInt(Config.getEnv("amqp_prefetch", "50"));
3232
Config.trace_dashboardauth = Config.parseBoolean(Config.getEnv("trace_dashboardauth", "true"));
3333
Config.enable_entity_restriction = Config.parseBoolean(Config.getEnv("enable_entity_restriction", "false"));
34+
Config.auto_hourly_housekeeping = Config.parseBoolean(Config.getEnv("auto_hourly_housekeeping", "true"));
3435

3536

3637
Config.getting_started_url = Config.getEnv("getting_started_url", "");
@@ -163,6 +164,7 @@ export class Config {
163164
public static amqp_prefetch: number = parseInt(Config.getEnv("amqp_prefetch", "50"));
164165
public static trace_dashboardauth: boolean = Config.parseBoolean(Config.getEnv("trace_dashboardauth", "true"));
165166
public static enable_entity_restriction: boolean = Config.parseBoolean(Config.getEnv("enable_entity_restriction", "false"));
167+
public static auto_hourly_housekeeping: boolean = Config.parseBoolean(Config.getEnv("auto_hourly_housekeeping", "true"));
166168

167169
public static getting_started_url: string = Config.getEnv("getting_started_url", "");
168170

OpenFlow/src/DatabaseConnection.ts

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1249,7 +1249,6 @@ export class DatabaseConnection {
12491249
}
12501250
}
12511251
}
1252-
12531252
if (q.collectionname != "fs.files") {
12541253
q.item._modifiedby = user.name;
12551254
q.item._modifiedbyid = user._id;
@@ -1266,6 +1265,12 @@ export class DatabaseConnection {
12661265
q.item[key] = original[key];
12671266
}
12681267
}
1268+
if (key == "dbusage" && q.collectionname == "users" && q.item._type == "user") {
1269+
if (!user.HasRoleName("admins")) {
1270+
q.item[key] = original[key];
1271+
}
1272+
}
1273+
12691274
if (key === "_created") {
12701275
q.item[key] = new Date(original[key]);
12711276
} else if (key === "_createdby" || key === "_createdbyid") {
@@ -1446,6 +1451,14 @@ export class DatabaseConnection {
14461451
(q.item["$set"])._modified = new Date(new Date().toISOString());
14471452
if ((q.item["$inc"]) === undefined) { (q.item["$inc"]) = {} };
14481453
(q.item["$inc"])._version = 1;
1454+
if (q.collectionname == "users") {
1455+
['$inc', '$mul', '$set', '$unset'].forEach(t => {
1456+
if (q.item[t] !== undefined) {
1457+
delete q.item[t].username;
1458+
delete q.item[t].dbusage;
1459+
}
1460+
})
1461+
}
14491462
const ot_end = Logger.otel.startTimer();
14501463
const mongodbspan: Span = Logger.otel.startSubSpan("mongodb.updateOne", span);
14511464
q.opresult = await this.db.collection(q.collectionname).updateOne(_query, q.item, options);
@@ -1545,6 +1558,14 @@ export class DatabaseConnection {
15451558
(q.item["$set"])._modifiedbyid = user._id;
15461559
(q.item["$set"])._modified = new Date(new Date().toISOString());
15471560

1561+
if (q.collectionname == "users") {
1562+
['$inc', '$mul', '$set', '$unset'].forEach(t => {
1563+
if (q.item[t] !== undefined) {
1564+
delete q.item[t].username;
1565+
delete q.item[t].dbusage;
1566+
}
1567+
})
1568+
}
15481569

15491570
Logger.instanse.silly("[" + user.username + "][" + q.collectionname + "] UpdateMany " + (q.item.name || q.item._name) + " in database");
15501571

OpenFlow/src/Messages/Message.ts

Lines changed: 157 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,9 @@ export class Message {
139139
case "getnoderedinstance":
140140
await this.GetNoderedInstance(span);
141141
break;
142+
case "housekeeping":
143+
await this.Housekeeping(false, false, false, span);
144+
break;
142145
default:
143146
span.recordException("Unknown command " + this.command);
144147
this.UnknownCommand();
@@ -497,7 +500,13 @@ export class Message {
497500
case "ensurecustomer":
498501
await this.EnsureCustomer(cli, span);
499502
case "housekeeping":
500-
await this.Housekeeping(span);
503+
this.EnsureJWT(cli);
504+
if (Config.enable_openflow_amqp) {
505+
cli.Send(await QueueClient.SendForProcessing(this, this.priority));
506+
} else {
507+
await this.DeleteNoderedPod(span);
508+
cli.Send(this);
509+
}
501510
break;
502511
default:
503512
span.recordException("Unknown command " + command);
@@ -742,14 +751,17 @@ export class Message {
742751
span.setAttribute("cache size", keys.length + 1);
743752
msg.result = result;
744753
}
745-
if (Config.enable_entity_restriction) {
754+
const _tuser = Crypt.verityToken(this.jwt);
755+
if (Config.enable_entity_restriction && !_tuser.HasRoleId("admins")) {
746756
await Config.db.loadEntityRestrictions(span);
747-
const tuser = Crypt.verityToken(this.jwt);
748-
const authorized = Config.db.EntityRestrictions.filter(x => x.IsAuthorized(tuser));
749-
const allall = authorized.filter(x => x.collection == "");
750-
if (allall.length == 0) {
751-
const names = authorized.map(x => x.collection);
752-
msg.result = msg.result.filter(x => names.indexOf(x.name) > -1);
757+
if (Config.db.EntityRestrictions.length > 1) {
758+
const tuser = Crypt.verityToken(this.jwt);
759+
const authorized = Config.db.EntityRestrictions.filter(x => x.IsAuthorized(tuser));
760+
const allall = authorized.filter(x => x.collection == "");
761+
if (allall.length == 0) {
762+
const names = authorized.map(x => x.collection);
763+
msg.result = msg.result.filter(x => names.indexOf(x.name) > -1);
764+
}
753765
}
754766
} else {
755767
var b = true;
@@ -3813,95 +3825,171 @@ export class Message {
38133825
}
38143826
Logger.otel.endSpan(span);
38153827
this.Send(cli);
3816-
private async Housekeeping(parent: Span): Promise<void> {
3828+
}
3829+
formatBytes(bytes, decimals = 2) {
3830+
if (bytes === 0) return '0 Bytes';
3831+
3832+
const k = 1024;
3833+
const dm = decimals < 0 ? 0 : decimals;
3834+
const sizes = ['Bytes', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'];
3835+
3836+
const i = Math.floor(Math.log(bytes) / Math.log(k));
3837+
3838+
return parseFloat((bytes / Math.pow(k, i)).toFixed(dm)) + ' ' + sizes[i];
3839+
}
3840+
public async Housekeeping(skipNodered: boolean, skipCalculateSize: boolean, skipUpdateUserSize: boolean, parent: Span): Promise<void> {
38173841
const span: Span = Logger.otel.startSubSpan("message.QueueMessage", parent);
38183842
try {
3819-
await this.GetNoderedInstance(span)
3843+
if (!skipNodered) await this.GetNoderedInstance(span)
38203844
} catch (error) {
38213845
}
38223846
try {
38233847
await Config.db.ensureindexes(span);
38243848
} catch (error) {
38253849
}
3850+
const timestamp = new Date(new Date().toISOString());
3851+
timestamp.setUTCHours(0, 0, 0, 0);
38263852
try {
3827-
const jwt: string = Crypt.rootToken();
3828-
const timestamp = new Date(new Date().toISOString());
3829-
timestamp.setUTCHours(0, 0, 0, 0);
3830-
const collections = await Config.db.ListCollections(jwt);
3831-
for (let col of collections) {
3832-
Config.db.db.collection("dbusage").deleteMany({ timestamp: timestamp, collection: col.name });
3833-
let aggregates: any = [
3834-
{
3835-
"$project": {
3836-
"_modifiedbyid": 1,
3837-
"_modifiedby": 1,
3838-
"object_size": { "$bsonSize": "$$ROOT" }
3839-
}
3840-
},
3841-
{
3842-
"$group": {
3843-
"_id": "$_modifiedbyid",
3844-
"size": { "$sum": "$object_size" },
3845-
"name": { "$max": "$_modifiedby" }
3846-
}
3847-
},
3848-
{ $addFields: { "userid": "$_id" } },
3849-
{ $unset: "_id" },
3850-
{ $addFields: { "collection": col.name } },
3851-
{ $addFields: { timestamp: timestamp.toISOString() } },
3852-
];
3853-
if (col.name == "fs.files") {
3854-
aggregates = [
3853+
if (!skipCalculateSize) {
3854+
3855+
const user = Crypt.rootUser();
3856+
const tuser = TokenUser.From(user);
3857+
const jwt: string = Crypt.rootToken();
3858+
const collections = await Config.db.ListCollections(jwt);
3859+
let totalusage = 0;
3860+
let index = 0;
3861+
for (let col of collections) {
3862+
if (col.name == "fs.chunks") continue;
3863+
index++;
3864+
Config.db.db.collection("dbusage").deleteMany({ timestamp: timestamp, collection: col.name });
3865+
let aggregates: any = [
38553866
{
38563867
"$project": {
3857-
"_modifiedbyid": "$metadata._modifiedbyid",
3858-
"_modifiedby": "$metadata._modifiedby",
3859-
"object_size": "$length"
3868+
"_modifiedbyid": 1,
3869+
"_modifiedby": 1,
3870+
"object_size": { "$bsonSize": "$$ROOT" }
38603871
}
38613872
},
38623873
{
38633874
"$group": {
38643875
"_id": "$_modifiedbyid",
38653876
"size": { "$sum": "$object_size" },
3866-
"name": { "$max": "$_modifiedby" }
3877+
"name": { "$first": "$_modifiedby" }
38673878
}
38683879
},
38693880
{ $addFields: { "userid": "$_id" } },
38703881
{ $unset: "_id" },
38713882
{ $addFields: { "collection": col.name } },
38723883
{ $addFields: { timestamp: timestamp.toISOString() } },
3873-
]
3884+
];
3885+
if (col.name == "fs.files") {
3886+
aggregates = [
3887+
{
3888+
"$project": {
3889+
"_modifiedbyid": "$metadata._modifiedbyid",
3890+
"_modifiedby": "$metadata._modifiedby",
3891+
"object_size": "$length"
3892+
}
3893+
},
3894+
{
3895+
"$group": {
3896+
"_id": "$_modifiedbyid",
3897+
"size": { "$sum": "$object_size" },
3898+
"name": { "$first": "$_modifiedby" }
3899+
}
3900+
},
3901+
{ $addFields: { "userid": "$_id" } },
3902+
{ $unset: "_id" },
3903+
{ $addFields: { "collection": col.name } },
3904+
{ $addFields: { timestamp: timestamp.toISOString() } },
3905+
]
3906+
}
3907+
if (col.name == "audit") {
3908+
aggregates = [
3909+
{
3910+
"$project": {
3911+
"userid": 1,
3912+
"name": 1,
3913+
"object_size": { "$bsonSize": "$$ROOT" }
3914+
}
3915+
},
3916+
{
3917+
"$group": {
3918+
"_id": "$userid",
3919+
"size": { "$sum": "$object_size" },
3920+
"name": { "$first": "$name" }
3921+
}
3922+
},
3923+
{ $addFields: { "userid": "$_id" } },
3924+
{ $unset: "_id" },
3925+
{ $addFields: { "collection": col.name } },
3926+
{ $addFields: { timestamp: timestamp.toISOString() } },
3927+
]
3928+
}
3929+
3930+
const items: any[] = await Config.db.db.collection(col.name).aggregate(aggregates).toArray();
3931+
let usage = 0;
3932+
if (items.length > 0) {
3933+
let bulkInsert = Config.db.db.collection("dbusage").initializeUnorderedBulkOp();
3934+
for (var _item of items) {
3935+
let item = Config.db.ensureResource(_item);
3936+
item = await Config.db.CleanACL(item, tuser, span);
3937+
delete item._id;
3938+
item._type = "usage";
3939+
item._createdby = "root";
3940+
item._createdbyid = WellknownIds.root;
3941+
item._created = new Date(new Date().toISOString());
3942+
item._modifiedby = "root";
3943+
item._modifiedbyid = WellknownIds.root;
3944+
item._modified = item._created;
3945+
usage += item.size;
3946+
DatabaseConnection.traversejsonencode(item);
3947+
3948+
bulkInsert.insert(item);
3949+
}
3950+
3951+
totalusage += usage;
3952+
try {
3953+
await bulkInsert.execute();
3954+
if (items.length > 0) Logger.instanse.debug("[housekeeping][" + col.name + "][" + index + "/" + collections.length + "] add " + items.length + " items with a usage of " + this.formatBytes(usage));
3955+
3956+
} catch (error) {
3957+
Logger.instanse.error(error);
3958+
span.recordException(error);
3959+
}
3960+
}
38743961
}
3875-
if (col.name == "fs.files") {
3876-
aggregates = [
3877-
{
3878-
"$project": {
3879-
"userid": 1,
3880-
"name": 1,
3881-
"object_size": { "$bsonSize": "$$ROOT" }
3882-
}
3883-
},
3884-
{
3885-
"$group": {
3886-
"_id": "$userid",
3887-
"size": { "$sum": "$object_size" },
3888-
"name": { "$max": "$name" }
3889-
}
3890-
},
3891-
{ $addFields: { "userid": "$_id" } },
3892-
{ $unset: "_id" },
3893-
{ $addFields: { "collection": col.name } },
3894-
{ $addFields: { timestamp: timestamp.toISOString() } },
3962+
Logger.instanse.debug("[housekeeping] Add stats from " + collections.length + " collections with a total usage of " + this.formatBytes(totalusage));
3963+
}
3964+
3965+
} catch (error) {
3966+
Logger.instanse.error(error);
3967+
span.recordException(error);
3968+
}
3969+
try {
3970+
if (!skipUpdateUserSize) {
3971+
let index = 0;
3972+
const usercount = await Config.db.db.collection("users").aggregate([{ "$match": { "_type": "user" } }, { $count: "userCount" }]).toArray();
3973+
Logger.instanse.debug("[housekeeping] Begin updating all users (" + usercount[0].userCount + ") dbusage field");
3974+
3975+
const cursor = Config.db.db.collection("users").find({ "_type": "user" })
3976+
for await (const u of cursor) {
3977+
index++;
3978+
const pipe = [
3979+
{ "$match": { "userid": u._id, timestamp: timestamp } },
3980+
{ "$group": { "_id": "$userid", "size": { "$sum": "$size" } } }
38953981
]
3982+
const items: any[] = await Config.db.db.collection("dbusage").aggregate(pipe).toArray();
3983+
if (items.length > 0) {
3984+
await Config.db.db.collection("users").updateOne({ _id: u._id }, { $set: { "dbusage": items[0].size } });
3985+
}
3986+
if (index % 100 == 0) Logger.instanse.debug("[housekeeping][" + index + "/" + usercount[0].userCount + "] Processing");
38963987
}
3897-
3898-
const items: any[] = await Config.db.db.collection(col.name).aggregate(aggregates).toArray();
3899-
let bulkInsert = Config.db.db.collection("dbusage").initializeUnorderedBulkOp();
3900-
items.forEach(item => bulkInsert.insert(item));
3901-
bulkInsert.execute();
3988+
Logger.instanse.debug("[housekeeping] Completed updating all users dbusage field");
39023989
}
39033990
} catch (error) {
3904-
3991+
Logger.instanse.error(error);
3992+
span.recordException(error);
39053993
}
39063994
Logger.otel.endSpan(span);
39073995
}

OpenFlow/src/index.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { DBHelper } from "./DBHelper";
1111
import { OAuthProvider } from "./OAuthProvider";
1212
import { Span } from "@opentelemetry/api";
1313
import { QueueClient } from "./QueueClient";
14+
import { Message } from "./Messages/Message";
1415

1516
Logger.configure();
1617

@@ -298,6 +299,24 @@ async function initDatabase(): Promise<boolean> {
298299
await Config.db.ensureindexes(span);
299300
Logger.otel.endSpan(span);
300301

302+
if (Config.auto_hourly_housekeeping) {
303+
housekeeping = setInterval(async () => {
304+
try {
305+
var dt = new Date(new Date().toISOString());
306+
var msg = new Message(); msg.jwt = Crypt.rootToken();
307+
var updateUsage: boolean = !(dt.getHours() == 1 || dt.getHours() == 13);
308+
await msg.Housekeeping(false, updateUsage, updateUsage, null);
309+
} catch (error) {
310+
}
311+
}, 3600000);
312+
setTimeout(async () => {
313+
var dt = new Date(new Date().toISOString());
314+
var msg = new Message(); msg.jwt = Crypt.rootToken();
315+
var updateUsage: boolean = !(dt.getHours() == 1 || dt.getHours() == 13);
316+
updateUsage = false;
317+
await msg.Housekeeping(false, updateUsage, updateUsage, null);
318+
}, 5000);
319+
}
301320
return true;
302321
} catch (error) {
303322
span.recordException(error);
@@ -350,9 +369,16 @@ var signals = {
350369
'SIGINT': 2,
351370
'SIGTERM': 15
352371
};
372+
var housekeeping = null;
353373
function handle(signal, value) {
354374
console.trace(`process received a ${signal} signal with value ${value}`);
355375
try {
376+
if (housekeeping != null) {
377+
try {
378+
clearInterval(housekeeping);
379+
} catch (error) {
380+
}
381+
}
356382
setTimeout(() => {
357383
process.exit(128 + value);
358384
}, 1000);

0 commit comments

Comments
 (0)