Skip to content

Commit 3aaa9fb

Browse files
committed
Merge branch 'master' into halfmoon
2 parents b731e94 + 5aba3d4 commit 3aaa9fb

6 files changed

Lines changed: 380 additions & 188 deletions

File tree

OpenFlow/src/DatabaseConnection.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -625,6 +625,7 @@ export class DatabaseConnection {
625625
return value;
626626
});
627627
}
628+
if (collectionname == "files") collectionname = "fs.files";
628629
let myhint: Object = {};
629630
if (hint) {
630631
if (typeof hint === "string" || hint instanceof String) {
@@ -660,7 +661,12 @@ export class DatabaseConnection {
660661
const aggregatesjson = JSON.stringify(aggregates, null, 2)
661662

662663
span.addEvent("getbasequery");
663-
const base = this.getbasequery(jwt, "_acl", [Rights.read]);
664+
let base: object;
665+
if (collectionname == "fs.files") {
666+
base = this.getbasequery(jwt, "metadata._acl", [Rights.read]);
667+
} else {
668+
base = this.getbasequery(jwt, "_acl", [Rights.read]);
669+
}
664670
if (Array.isArray(aggregates)) {
665671
aggregates.unshift({ $match: base });
666672
} else {
@@ -1858,8 +1864,8 @@ export class DatabaseConnection {
18581864
}
18591865
const ot_end = Logger.otel.startTimer();
18601866
const mongodbspan: Span = Logger.otel.startSubSpan("mongodb.bulkexecute", span);
1861-
bulkInsert.execute()
1862-
bulkRemove.execute()
1867+
if (bulkInsert.length > 0) bulkInsert.execute()
1868+
if (bulkRemove.length > 0) bulkRemove.execute()
18631869
Logger.otel.endSpan(mongodbspan);
18641870
Logger.otel.endTimer(ot_end, DatabaseConnection.mongodb_deletemany, { collection: collectionname });
18651871
if (Config.log_deletes) Logger.instanse.verbose("[" + user.username + "][" + collectionname + "] deleted " + counter + " items in database");

OpenFlow/src/Messages/Message.ts

Lines changed: 123 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,9 @@ export class Message {
136136
case "deletenoderedpod":
137137
await this.DeleteNoderedPod(span);
138138
break;
139+
case "getnoderedinstance":
140+
await this.GetNoderedInstance(span);
141+
break;
139142
default:
140143
span.recordException("Unknown command " + this.command);
141144
this.UnknownCommand();
@@ -431,7 +434,13 @@ export class Message {
431434
await this.GetKubeNodeLabels(cli);
432435
break;
433436
case "getnoderedinstance":
434-
await this.GetNoderedInstance(cli, span);
437+
this.EnsureJWT(cli);
438+
if (Config.enable_openflow_amqp) {
439+
cli.Send(await QueueClient.SendForProcessing(this, this.priority));
440+
} else {
441+
await this.RestartNoderedInstance(span);
442+
cli.Send(this);
443+
}
435444
break;
436445
case "getnoderedinstancelog":
437446
await this.GetNoderedInstanceLog(cli, span);
@@ -487,6 +496,8 @@ export class Message {
487496
break;
488497
case "ensurecustomer":
489498
await this.EnsureCustomer(cli, span);
499+
case "housekeeping":
500+
await this.Housekeeping(span);
490501
break;
491502
default:
492503
span.recordException("Unknown command " + command);
@@ -2379,22 +2390,24 @@ export class Message {
23792390
}
23802391
private static detectdocker: boolean = true;
23812392
private static usedocker: boolean = false;
2382-
private async GetNoderedInstance(cli: WebSocketServerClient, parent: Span): Promise<void> {
2393+
private async GetNoderedInstance(parent: Span): Promise<void> {
23832394
await this.DetectDocker();
23842395
if (Message.usedocker) {
2385-
this.dockerGetNoderedInstance(cli, parent);
2396+
this.dockerGetNoderedInstance(parent);
23862397
} else {
2387-
this.KubeGetNoderedInstance(cli, parent);
2398+
this.KubeGetNoderedInstance(parent);
23882399
}
23892400
}
2390-
private async dockerGetNoderedInstance(cli: WebSocketServerClient, parent: Span): Promise<void> {
2401+
private async dockerGetNoderedInstance(parent: Span): Promise<void> {
23912402
this.Reply();
23922403
let msg: GetNoderedInstanceMessage;
23932404
const span: Span = Logger.otel.startSubSpan("message.GetNoderedInstance", parent);
23942405
try {
2395-
Logger.instanse.debug("[" + cli.user.username + "] GetNoderedInstance");
2406+
const _tuser = Crypt.verityToken(this.jwt);
2407+
2408+
Logger.instanse.debug("[" + _tuser.username + "] GetNoderedInstance");
23962409
msg = GetNoderedInstanceMessage.assign(this.data);
2397-
const name = await this.GetInstanceName(msg._id, cli.user._id, cli.user.username, cli.jwt, span);
2410+
const name = await this.GetInstanceName(msg._id, _tuser._id, _tuser.username, this.jwt, span);
23982411

23992412
span.addEvent("init Docker()");
24002413
const docker = new Docker();
@@ -2430,27 +2443,27 @@ export class Message {
24302443
} catch (error) {
24312444
span.recordException(error);
24322445
this.data = "";
2433-
await handleError(cli, error);
2446+
await handleError(null, error);
24342447
if (msg !== null && msg !== undefined) msg.error = error.message ? error.message : error
24352448
}
24362449
try {
24372450
this.data = JSON.stringify(msg);
24382451
} catch (error) {
24392452
span.recordException(error);
24402453
this.data = "";
2441-
await handleError(cli, error);
2454+
await handleError(null, error);
24422455
}
24432456
Logger.otel.endSpan(span);
2444-
this.Send(cli);
24452457
}
2446-
private async KubeGetNoderedInstance(cli: WebSocketServerClient, parent: Span): Promise<void> {
2458+
private async KubeGetNoderedInstance(parent: Span): Promise<void> {
24472459
this.Reply();
24482460
let msg: GetNoderedInstanceMessage;
24492461
const span: Span = Logger.otel.startSubSpan("message.GetNoderedInstance", parent);
24502462
try {
2451-
Logger.instanse.debug("[" + cli.user.username + "] GetNoderedInstance");
2463+
const _tuser = Crypt.verityToken(this.jwt);
2464+
Logger.instanse.debug("[" + _tuser.username + "] GetNoderedInstance");
24522465
msg = GetNoderedInstanceMessage.assign(this.data);
2453-
const name = await this.GetInstanceName(msg._id, cli.user._id, cli.user.username, cli.jwt, span);
2466+
const name = await this.GetInstanceName(msg._id, _tuser._id, _tuser.username, this.jwt, span);
24542467
const namespace = Config.namespace;
24552468
const list = await KubeUtil.instance().CoreV1Api.listNamespacedPod(namespace);
24562469
msg.result = null;
@@ -2474,7 +2487,7 @@ export class Message {
24742487
if ((image.indexOf("openflownodered") > -1 || image.indexOf("openiap/nodered") > -1) && !NoderedUtil.IsNullEmpty(userid)) {
24752488
try {
24762489
if (billed != "true" && diffhours > 24) {
2477-
Logger.instanse.debug("[" + cli.user.username + "] Remove un billed nodered instance " + itemname + " that has been running for " + diffhours + " hours");
2490+
Logger.instanse.debug("[" + _tuser.username + "] Remove un billed nodered instance " + itemname + " that has been running for " + diffhours + " hours");
24782491
await this._DeleteNoderedInstance(userid, rootjwt, span);
24792492
}
24802493
} catch (error) {
@@ -2487,7 +2500,7 @@ export class Message {
24872500
}
24882501
}
24892502
}
2490-
if (!NoderedUtil.IsNullEmpty(msg.name) && item.metadata.name == msg.name && cli.user.HasRoleName("admins")) {
2503+
if (!NoderedUtil.IsNullEmpty(msg.name) && item.metadata.name == msg.name && _tuser.HasRoleName("admins")) {
24912504
found = item;
24922505
var metrics: any = null;
24932506
try {
@@ -2500,7 +2513,7 @@ export class Message {
25002513
found = item;
25012514
if (item.status.phase != "Failed") {
25022515
msg.result = item;
2503-
Logger.instanse.debug("[" + cli.user.username + "] GetNoderedInstance: " + name + " found one");
2516+
Logger.instanse.debug("[" + _tuser.username + "] GetNoderedInstance: " + name + " found one");
25042517
}
25052518
var metrics: any = null;
25062519
try {
@@ -2513,23 +2526,22 @@ export class Message {
25132526
}
25142527
if (msg.result == null) msg.result = found;
25152528
} else {
2516-
Logger.instanse.warn("[" + cli.user.username + "] GetNoderedInstance: found NO Namespaced Pods ???");
2529+
Logger.instanse.warn("[" + _tuser.username + "] GetNoderedInstance: found NO Namespaced Pods ???");
25172530
}
25182531
} catch (error) {
25192532
span.recordException(error);
25202533
this.data = "";
2521-
await handleError(cli, error);
2534+
await handleError(null, error);
25222535
if (msg !== null && msg !== undefined) msg.error = error.message ? error.message : error
25232536
}
25242537
try {
25252538
this.data = JSON.stringify(msg);
25262539
} catch (error) {
25272540
span.recordException(error);
25282541
this.data = "";
2529-
await handleError(cli, error);
2542+
await handleError(null, error);
25302543
}
25312544
Logger.otel.endSpan(span);
2532-
this.Send(cli);
25332545
}
25342546
private async GetNoderedInstanceLog(cli: WebSocketServerClient, parent: Span): Promise<void> {
25352547
await this.DetectDocker();
@@ -3801,6 +3813,97 @@ export class Message {
38013813
}
38023814
Logger.otel.endSpan(span);
38033815
this.Send(cli);
3816+
private async Housekeeping(parent: Span): Promise<void> {
3817+
const span: Span = Logger.otel.startSubSpan("message.QueueMessage", parent);
3818+
try {
3819+
await this.GetNoderedInstance(span)
3820+
} catch (error) {
3821+
}
3822+
try {
3823+
await Config.db.ensureindexes(span);
3824+
} catch (error) {
3825+
}
3826+
try {
3827+
const jwt: string = Crypt.rootToken();
3828+
const timestamp = new Date(new Date().toISOString());
3829+
timestamp.setUTCHours(0, 0, 0, 0);
3830+
const collections = await Config.db.ListCollections(jwt);
3831+
for (let col of collections) {
3832+
Config.db.db.collection("dbusage").deleteMany({ timestamp: timestamp, collection: col.name });
3833+
let aggregates: any = [
3834+
{
3835+
"$project": {
3836+
"_modifiedbyid": 1,
3837+
"_modifiedby": 1,
3838+
"object_size": { "$bsonSize": "$$ROOT" }
3839+
}
3840+
},
3841+
{
3842+
"$group": {
3843+
"_id": "$_modifiedbyid",
3844+
"size": { "$sum": "$object_size" },
3845+
"name": { "$max": "$_modifiedby" }
3846+
}
3847+
},
3848+
{ $addFields: { "userid": "$_id" } },
3849+
{ $unset: "_id" },
3850+
{ $addFields: { "collection": col.name } },
3851+
{ $addFields: { timestamp: timestamp.toISOString() } },
3852+
];
3853+
if (col.name == "fs.files") {
3854+
aggregates = [
3855+
{
3856+
"$project": {
3857+
"_modifiedbyid": "$metadata._modifiedbyid",
3858+
"_modifiedby": "$metadata._modifiedby",
3859+
"object_size": "$length"
3860+
}
3861+
},
3862+
{
3863+
"$group": {
3864+
"_id": "$_modifiedbyid",
3865+
"size": { "$sum": "$object_size" },
3866+
"name": { "$max": "$_modifiedby" }
3867+
}
3868+
},
3869+
{ $addFields: { "userid": "$_id" } },
3870+
{ $unset: "_id" },
3871+
{ $addFields: { "collection": col.name } },
3872+
{ $addFields: { timestamp: timestamp.toISOString() } },
3873+
]
3874+
}
3875+
if (col.name == "fs.files") {
3876+
aggregates = [
3877+
{
3878+
"$project": {
3879+
"userid": 1,
3880+
"name": 1,
3881+
"object_size": { "$bsonSize": "$$ROOT" }
3882+
}
3883+
},
3884+
{
3885+
"$group": {
3886+
"_id": "$userid",
3887+
"size": { "$sum": "$object_size" },
3888+
"name": { "$max": "$name" }
3889+
}
3890+
},
3891+
{ $addFields: { "userid": "$_id" } },
3892+
{ $unset: "_id" },
3893+
{ $addFields: { "collection": col.name } },
3894+
{ $addFields: { timestamp: timestamp.toISOString() } },
3895+
]
3896+
}
3897+
3898+
const items: any[] = await Config.db.db.collection(col.name).aggregate(aggregates).toArray();
3899+
let bulkInsert = Config.db.db.collection("dbusage").initializeUnorderedBulkOp();
3900+
items.forEach(item => bulkInsert.insert(item));
3901+
bulkInsert.execute();
3902+
}
3903+
} catch (error) {
3904+
3905+
}
3906+
Logger.otel.endSpan(span);
38043907
}
38053908
}
38063909

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import * as RED from "node-red";
2+
export class Util {
3+
public static EvaluateNodeProperty<T>(node: any, msg: any, name: string) {
4+
return new Promise<T>((resolve, reject) => {
5+
RED.util.evaluateNodeProperty(node.config[name], node.config[name + "type"], node, msg, (err, value) => {
6+
if (err) {
7+
reject(err);
8+
} else {
9+
resolve(value);
10+
}
11+
})
12+
});
13+
}
14+
public static SetMessageProperty(msg: any, name: string, value: any) {
15+
RED.util.setMessageProperty(msg, name, value);
16+
}
17+
18+
}

0 commit comments

Comments
 (0)