Skip to content

Commit c8e7b9a

Browse files
committed
add prober addmany
1 parent 0d432dd commit c8e7b9a

5 files changed

Lines changed: 274 additions & 180 deletions

File tree

OpenFlow/src/DatabaseConnection.ts

Lines changed: 174 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -815,19 +815,20 @@ export class DatabaseConnection {
815815
const span: Span = Logger.otel.startSubSpan("db.InsertOne", parent);
816816
try {
817817
if (item === null || item === undefined) { throw Error("Cannot create null item"); }
818+
if (NoderedUtil.IsNullEmpty(jwt)) {
819+
throw new Error("jwt is null");
820+
}
818821
await this.connect(span);
819822
span.addEvent("ensureResource");
823+
span.addEvent("verityToken");
824+
const user: TokenUser = Crypt.verityToken(jwt);
825+
820826
item = this.ensureResource(item);
821827
span.addEvent("traversejsonencode");
822828
DatabaseConnection.traversejsonencode(item);
823-
if (NoderedUtil.IsNullEmpty(jwt)) {
824-
throw new Error("jwt is null");
825-
}
826829
let name = item.name;
827830
if (NoderedUtil.IsNullEmpty(name)) name = item._name;
828831
if (NoderedUtil.IsNullEmpty(name)) name = "Unknown";
829-
span.addEvent("verityToken");
830-
const user: TokenUser = Crypt.verityToken(jwt);
831832
item._createdby = user.name;
832833
item._createdbyid = user._id;
833834
item._created = new Date(new Date().toISOString());
@@ -968,6 +969,174 @@ export class DatabaseConnection {
968969
}
969970
return item;
970971
}
972+
async InsertMany<T extends Base>(items: T[], collectionname: string, w: number, j: boolean, jwt: string, parent: Span): Promise<T[]> {
973+
const span: Span = Logger.otel.startSubSpan("db.InsertOne", parent);
974+
let result: T[] = [];
975+
try {
976+
if (NoderedUtil.IsNullUndefinded(items) || items.length == 0) { throw Error("Cannot create null item"); }
977+
if (NoderedUtil.IsNullEmpty(jwt)) {
978+
throw new Error("jwt is null");
979+
}
980+
await this.connect(span);
981+
const user = Crypt.verityToken(jwt);
982+
span.setAttribute("collection", collectionname);
983+
span.setAttribute("username", user.username);
984+
var bulkInsert = this.db.collection(collectionname).initializeUnorderedBulkOp();
985+
var x = 1000
986+
var counter = 0
987+
var date = new Date()
988+
date.setMonth(date.getMonth() - 1);
989+
let tempresult: any[] = [];
990+
for (var i = 0; i < items.length; i++) {
991+
var item = items[i];
992+
993+
item = this.ensureResource(item);
994+
// span.addEvent("traversejsonencode");
995+
DatabaseConnection.traversejsonencode(item);
996+
let name = item.name;
997+
if (NoderedUtil.IsNullEmpty(name)) name = item._name;
998+
if (NoderedUtil.IsNullEmpty(name)) name = "Unknown";
999+
item._createdby = user.name;
1000+
item._createdbyid = user._id;
1001+
item._created = new Date(new Date().toISOString());
1002+
item._modifiedby = user.name;
1003+
item._modifiedbyid = user._id;
1004+
item._modified = item._created;
1005+
const hasUser: Ace = item._acl.find(e => e._id === user._id);
1006+
if ((hasUser === null || hasUser === undefined)) {
1007+
Base.addRight(item, user._id, user.name, [Rights.full_control]);
1008+
}
1009+
if (collectionname != "audit") { Logger.instanse.silly("[" + user.username + "][" + collectionname + "] Adding " + item._type + " " + name + " to database"); }
1010+
if (!this.hasAuthorization(user, item, Rights.create)) { throw new Error("Access denied, no authorization to InsertOne " + item._type + " " + name + " to database"); }
1011+
1012+
// span.addEvent("encryptentity");
1013+
item = this.encryptentity(item) as T;
1014+
1015+
if (collectionname === "users" && item._type === "user" && item.hasOwnProperty("newpassword")) {
1016+
(item as any).passwordhash = await Crypt.hash((item as any).newpassword);
1017+
delete (item as any).newpassword;
1018+
}
1019+
if (collectionname === "users" && !NoderedUtil.IsNullEmpty(item._type) && !NoderedUtil.IsNullEmpty(item.name)) {
1020+
if ((item._type === "user" || item._type === "role") &&
1021+
(this.WellknownNamesArray.indexOf(item.name) > -1 || this.WellknownNamesArray.indexOf((item as any).username) > -1)) {
1022+
throw new Error("Access denied");
1023+
}
1024+
}
1025+
item._version = 0;
1026+
if (item._id != null) {
1027+
const basehist = await this.query<any>({ id: item._id }, { _version: 1 }, 1, 0, { _version: -1 }, collectionname + "_hist", Crypt.rootToken(), undefined, undefined, span);
1028+
if (basehist.length > 0) {
1029+
item._version = basehist[0]._version;
1030+
}
1031+
if (basehist.length > 0) {
1032+
const org = await this.GetDocumentVersion(collectionname, item._id, item._version, Crypt.rootToken(), span)
1033+
if (org != null) {
1034+
item._createdby = org._createdby;
1035+
item._createdbyid = org._createdbyid;
1036+
item._created = org._created;
1037+
item._modifiedby = org._modifiedby;
1038+
item._modifiedbyid = org._modifiedbyid;
1039+
item._modified = org._modified;
1040+
if (!item._created) item._created = new Date(new Date().toISOString());
1041+
if (!item._createdby) item._createdby = user.name;
1042+
if (!item._createdbyid) item._createdbyid = user._id;
1043+
if (!item._modified) item._modified = new Date(new Date().toISOString());
1044+
if (!item._modifiedby) item._modifiedby = user.name;
1045+
if (!item._modifiedbyid) item._modifiedbyid = user._id;
1046+
if (!item._version) item._version = 0;
1047+
} else {
1048+
item._version++;
1049+
}
1050+
}
1051+
} else {
1052+
item._id = new ObjectID().toHexString();
1053+
}
1054+
span.addEvent("CleanACL");
1055+
item = await this.CleanACL(item, user, span);
1056+
if (item._type === "role" && collectionname === "users") {
1057+
item = await this.Cleanmembers(item as any, null);
1058+
}
1059+
1060+
if (collectionname === "users" && item._type === "user") {
1061+
const u: TokenUser = (item as any);
1062+
if (NoderedUtil.IsNullEmpty(u.username)) { throw new Error("Username is mandatory"); }
1063+
if (NoderedUtil.IsNullEmpty(u.name)) { throw new Error("Name is mandatory"); }
1064+
span.addEvent("FindByUsername");
1065+
const exists = await DBHelper.FindByUsername(u.username, null, span);
1066+
if (exists != null) { throw new Error("Access denied, user '" + u.username + "' already exists"); }
1067+
}
1068+
if (collectionname === "users" && item._type === "role") {
1069+
const r: Role = (item as any);
1070+
if (NoderedUtil.IsNullEmpty(r.name)) { throw new Error("Name is mandatory"); }
1071+
span.addEvent("FindByUsername");
1072+
const exists2 = await DBHelper.FindRoleByName(r.name, span);
1073+
if (exists2 != null) { throw new Error("Access denied, role '" + r.name + "' already exists"); }
1074+
}
1075+
1076+
const options: CollectionInsertOneOptions = {};
1077+
1078+
1079+
1080+
bulkInsert.insert(item);
1081+
counter++
1082+
if (counter % x === 0) {
1083+
const ot_end = Logger.otel.startTimer();
1084+
const mongodbspan: Span = Logger.otel.startSubSpan("mongodb.bulkexecute", span);
1085+
tempresult = tempresult.concat(bulkInsert.execute())
1086+
Logger.otel.endSpan(mongodbspan);
1087+
Logger.otel.endTimer(ot_end, DatabaseConnection.mongodb_insert, { collection: collectionname });
1088+
bulkInsert = this.db.collection(collectionname).initializeUnorderedBulkOp()
1089+
}
1090+
}
1091+
const ot_end = Logger.otel.startTimer();
1092+
const mongodbspan: Span = Logger.otel.startSubSpan("mongodb.bulkexecute", span);
1093+
tempresult = tempresult.concat(bulkInsert.execute())
1094+
Logger.otel.endSpan(mongodbspan);
1095+
Logger.otel.endTimer(ot_end, DatabaseConnection.mongodb_insert, { collection: collectionname });
1096+
1097+
for (var i = 0; i < items.length; i++) {
1098+
var item = items[i];
1099+
if (collectionname === "users" && item._type === "user") {
1100+
Base.addRight(item, item._id, item.name, [Rights.read, Rights.update, Rights.invoke]);
1101+
span.addEvent("FindRoleByNameOrId");
1102+
const users: Role = await DBHelper.FindRoleByNameOrId("users", jwt, span);
1103+
users.AddMember(item);
1104+
span.addEvent("CleanACL");
1105+
item = await this.CleanACL(item, user, span);
1106+
span.addEvent("Save");
1107+
await DBHelper.Save(users, Crypt.rootToken(), span);
1108+
const user2: TokenUser = item as any;
1109+
DBHelper.EnsureNoderedRoles(user2, Crypt.rootToken(), false, span);
1110+
}
1111+
if (collectionname === "users" && item._type === "role") {
1112+
Base.addRight(item, item._id, item.name, [Rights.read]);
1113+
item = await this.CleanACL(item, user, span);
1114+
const ot_end = Logger.otel.startTimer();
1115+
const mongodbspan: Span = Logger.otel.startSubSpan("mongodb.replaceOne", span);
1116+
await this.db.collection(collectionname).replaceOne({ _id: item._id }, item);
1117+
Logger.otel.endSpan(mongodbspan);
1118+
Logger.otel.endTimer(ot_end, DatabaseConnection.mongodb_replace, { collection: collectionname });
1119+
DBHelper.cached_roles = [];
1120+
}
1121+
if (collectionname === "config" && item._type === "oauthclient") {
1122+
if (user.HasRoleName("admins")) {
1123+
setTimeout(() => OAuthProvider.LoadClients(), 1000);
1124+
}
1125+
}
1126+
span.addEvent("traversejsondecode");
1127+
DatabaseConnection.traversejsondecode(item);
1128+
}
1129+
result = items;
1130+
if (Config.log_inserts) Logger.instanse.verbose("[" + user.username + "][" + collectionname + "] inserted " + counter + " items in database");
1131+
} catch (error) {
1132+
span.recordException(error);
1133+
throw error;
1134+
}
1135+
finally {
1136+
Logger.otel.endSpan(span);
1137+
}
1138+
return result;
1139+
}
9711140
synRawUpdateOne(collection: string, query: any, updatedoc: any, measure: boolean, cb: any) {
9721141
let ot_end: any = null;
9731142
if (measure) {

0 commit comments

Comments
 (0)