Skip to content

Commit 8d4e895

Browse files
committed
add validate_emails_disposable option
1 parent 349f8bd commit 8d4e895

7 files changed

Lines changed: 161 additions & 28 deletions

File tree

OpenFlow/src/Config.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ export class Config {
8989
Config.smtp_pass = Config.getEnv("smtp_service", "");
9090
Config.smtp_url = Config.getEnv("smtp_url", "");
9191
Config.debounce_lookup = Config.parseBoolean(Config.getEnv("debounce_lookup", "false"));
92+
Config.validate_emails_disposable = Config.parseBoolean(Config.getEnv("validate_emails_disposable", "false"));
93+
9294

9395

9496
Config.tls_crt = Config.getEnv("tls_crt", "");
@@ -279,6 +281,7 @@ export class Config {
279281
public static smtp_pass: string = Config.getEnv("smtp_pass", "");
280282
public static smtp_url: string = Config.getEnv("smtp_url", "");
281283
public static debounce_lookup: boolean = Config.parseBoolean(Config.getEnv("debounce_lookup", "false"));
284+
public static validate_emails_disposable: boolean = Config.parseBoolean(Config.getEnv("validate_emails_disposable", "false"));
282285

283286
public static tls_crt: string = Config.getEnv("tls_crt", "");
284287
public static tls_key: string = Config.getEnv("tls_key", "");

OpenFlow/src/DBHelper.ts

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ export class DBHelper {
257257
let item = await this.memoryCache.wrap("queuename_" + name, () => {
258258
if (jwt === null || jwt == undefined || jwt == "") { jwt = Crypt.rootToken(); }
259259
Logger.instanse.debug("DBHelper", "FindQueueByName", "Add queue to cache : " + name);
260-
return Config.db.getbyname<User>(name, "mq", jwt, true, span);
260+
return Config.db.GetOne<User>({ query: { name }, collectionname: "mq", jwt }, span);
261261
});
262262
if (NoderedUtil.IsNullUndefinded(item)) return null;
263263
return this.DecorateWithRoles(User.assign(item), span);
@@ -295,7 +295,7 @@ export class DBHelper {
295295
let item = await this.memoryCache.wrap("exchangename_" + name, () => {
296296
if (jwt === null || jwt == undefined || jwt == "") { jwt = Crypt.rootToken(); }
297297
Logger.instanse.debug("DBHelper", "FindExchangeByName", "Add exchange to cache : " + name);
298-
return Config.db.getbyname<User>(name, "mq", jwt, true, span);
298+
return Config.db.GetOne<User>({ query: { name }, collectionname: "mq", jwt }, span);
299299
});
300300
if (NoderedUtil.IsNullUndefinded(item)) return null;
301301
return this.DecorateWithRoles(User.assign(item), span);
@@ -344,6 +344,29 @@ export class DBHelper {
344344
Logger.otel.endSpan(span);
345345
}
346346
}
347+
public async GetDisposableDomain(domain: string, parent: Span): Promise<Base> {
348+
await this.init();
349+
if (domain.indexOf("@") > -1) {
350+
domain = domain.substr(domain.indexOf("@") + 1);
351+
}
352+
const span: Span = Logger.otel.startSubSpan("dbhelper.FindByUsername", parent);
353+
try {
354+
if (NoderedUtil.IsNullEmpty(domain)) return null;
355+
let item = await this.memoryCache.wrap("disposable_" + domain, () => {
356+
const jwt = Crypt.rootToken();
357+
Logger.instanse.debug("DBHelper", "IsDisposableDomain", "Add to cache : " + domain);
358+
const query = { name: domain, "_type": "disposable" };
359+
return Config.db.GetOne<Base>({ query, collectionname: "domains", jwt }, span);
360+
});
361+
if (NoderedUtil.IsNullUndefinded(item)) return null;
362+
return item;
363+
} catch (error) {
364+
span?.recordException(error);
365+
throw error;
366+
} finally {
367+
Logger.otel.endSpan(span);
368+
}
369+
}
347370
public async FindByUsernameOrFederationid(username: string, issuer: string, parent: Span): Promise<User> {
348371
await this.init();
349372
const span: Span = Logger.otel.startSubSpan("dbhelper.FindByUsername", parent);

OpenFlow/src/DatabaseConnection.ts

Lines changed: 72 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -958,18 +958,21 @@ export class DatabaseConnection extends events.EventEmitter {
958958
Logger.otel.endSpan(span);
959959
}
960960
}
961-
/**
961+
/**
962962
* Get a single item based on id
963963
* @param {string} id Id to search for
964964
* @param {string} collectionname Collection to search
965965
* @param {string} jwt JWT of user who is making the query, to limit results based on permissions
966966
* @returns Promise<T>
967967
*/
968-
async getbyid<T extends Base>(id: string, collectionname: string, jwt: string, decrypt: boolean, parent: Span): Promise<T> {
969-
const span: Span = Logger.otel.startSubSpan("db.getbyid", parent);
968+
async GetOne<T extends Base>(options: { query?: object, collectionname: string, orderby?: object, jwt?: string, decrypt?: boolean }, parent: Span): Promise<T> {
969+
const span: Span = Logger.otel.startSubSpan("db.GetOne", parent);
970+
if (NoderedUtil.IsNullUndefinded(options.jwt)) options.jwt = Crypt.rootToken();
971+
if (NoderedUtil.IsNullUndefinded(options.decrypt)) options.decrypt = true;
972+
if (NoderedUtil.IsNullUndefinded(options.query)) options.query = {};
973+
const { query, collectionname, orderby, jwt, decrypt } = options;
970974
try {
971-
if (id === null || id === undefined || id === "") { throw Error("Id cannot be null"); }
972-
const arr: T[] = await this.query<T>({ query: { _id: id }, top: 1, collectionname, jwt, decrypt }, span);
975+
const arr: T[] = await this.query<T>({ query, collectionname, orderby, jwt, decrypt }, span);
973976
if (arr === null || arr.length === 0) { return null; }
974977
return arr[0];
975978
} catch (error) {
@@ -980,19 +983,18 @@ export class DatabaseConnection extends events.EventEmitter {
980983
}
981984
}
982985
/**
983-
* Get a single item based on name
984-
* @param {string} name Name to search for
986+
* Get a single item based on id
987+
* @param {string} id Id to search for
985988
* @param {string} collectionname Collection to search
986989
* @param {string} jwt JWT of user who is making the query, to limit results based on permissions
987990
* @returns Promise<T>
988991
*/
989-
async getbyname<T extends Base>(name: string, collectionname: string, jwt: string, decrypt: boolean, parent: Span): Promise<T> {
992+
async getbyid<T extends Base>(id: string, collectionname: string, jwt: string, decrypt: boolean, parent: Span): Promise<T> {
990993
const span: Span = Logger.otel.startSubSpan("db.getbyid", parent);
991994
try {
992-
if (name === null || name === undefined || name === "") { throw Error("Name cannot be null"); }
993-
const arr: T[] = await this.query<T>({ query: { name }, top: 1, collectionname, jwt, decrypt }, span);
994-
if (arr === null || arr.length === 0) { return null; }
995-
return arr[0];
995+
if (id === null || id === undefined || id === "") { throw Error("Id cannot be null"); }
996+
const query = { _id: id };
997+
return this.GetOne({ query, collectionname, jwt, decrypt }, span)
996998
} catch (error) {
997999
span?.recordException(error);
9981000
throw error;
@@ -1259,7 +1261,7 @@ export class DatabaseConnection extends events.EventEmitter {
12591261
if (user.dblocked && !user.HasRoleName("admins")) throw new Error("Access denied (db locked) could be due to hitting quota limit for " + user.username);
12601262
span?.addEvent("ensureResource");
12611263
item = this.ensureResource(item, collectionname);
1262-
if (!await this.CheckEntityRestriction(user, collectionname, item, span)) {
1264+
if (user._id != WellknownIds.root && !await this.CheckEntityRestriction(user, collectionname, item, span)) {
12631265
throw Error("Create " + item._type + " access denied");
12641266
}
12651267
span?.addEvent("traversejsonencode");
@@ -1882,7 +1884,7 @@ export class DatabaseConnection extends events.EventEmitter {
18821884
q.item._version = original._version;
18831885
}
18841886
q.item = this.ensureResource(q.item, q.collectionname);
1885-
if (original._type != q.item._type && !await this.CheckEntityRestriction(user, q.collectionname, q.item, span)) {
1887+
if (user._id != WellknownIds.root && original._type != q.item._type && !await this.CheckEntityRestriction(user, q.collectionname, q.item, span)) {
18861888
throw Error("Create " + q.item._type + " access denied");
18871889
}
18881890
const hasUser: Ace = q.item._acl.find(e => e._id === user._id);
@@ -2286,6 +2288,9 @@ export class DatabaseConnection extends events.EventEmitter {
22862288
});
22872289
},
22882290
});
2291+
async InsertOrUpdateOne<T extends Base>(q: InsertOrUpdateOneMessage, parent: Span): Promise<InsertOrUpdateOneMessage> {
2292+
return this._InsertOrUpdateOne(q, parent);
2293+
}
22892294
private static InsertOrUpdateOneSemaphore = DatabaseConnection.Semaphore(1);
22902295
/**
22912296
* Insert or Update depending on document allready exists.
@@ -2297,7 +2302,7 @@ export class DatabaseConnection extends events.EventEmitter {
22972302
* @param {string} jwt JWT of user who is doing the update, ensuring rights
22982303
* @returns Promise<T>
22992304
*/
2300-
async InsertOrUpdateOne<T extends Base>(q: InsertOrUpdateOneMessage, parent: Span): Promise<InsertOrUpdateOneMessage> {
2305+
async _InsertOrUpdateOne<T extends Base>(q: InsertOrUpdateOneMessage, parent: Span): Promise<InsertOrUpdateOneMessage> {
23012306
const span: Span = Logger.otel.startSubSpan("db.InsertOrUpdateOne", parent);
23022307
try {
23032308
await DatabaseConnection.InsertOrUpdateOneSemaphore.down();
@@ -2316,7 +2321,13 @@ export class DatabaseConnection extends events.EventEmitter {
23162321
query = { _id: q.item._id };
23172322
}
23182323
}
2319-
const user: TokenUser = await Crypt.verityToken(q.jwt);
2324+
let user: TokenUser = (q as any).user;
2325+
if (NoderedUtil.IsNullUndefinded(user)) {
2326+
user = await Crypt.verityToken(q.jwt);
2327+
} else {
2328+
delete (q as any).user;
2329+
}
2330+
23202331
if (user.dblocked && !user.HasRoleName("admins")) throw new Error("Access denied (db locked) could be due to hitting quota limit for " + user.username);
23212332
let exists: Base[] = [];
23222333
if (query != null) {
@@ -2375,6 +2386,51 @@ export class DatabaseConnection extends events.EventEmitter {
23752386
Logger.otel.endSpan(span);
23762387
}
23772388
}
2389+
async InsertOrUpdateMany<T extends Base>(items: T[], collectionname: string, uniqeness: string, skipresults: boolean, w: number, j: boolean, jwt: string, parent: Span): Promise<T[]> {
2390+
const span: Span = Logger.otel.startSubSpan("db.InsertOrUpdateMany", parent);
2391+
let result: T[] = [];
2392+
try {
2393+
if (NoderedUtil.IsNullUndefinded(items) || items.length == 0) { throw Error("Cannot create null item"); }
2394+
if (NoderedUtil.IsNullEmpty(jwt)) {
2395+
throw new Error("jwt is null");
2396+
}
2397+
await this.connect(span);
2398+
const user = await Crypt.verityToken(jwt);
2399+
if (user.dblocked && !user.HasRoleName("admins")) throw new Error("Access denied (db locked) could be due to hitting quota limit for " + user.username);
2400+
span?.setAttribute("collection", collectionname);
2401+
span?.setAttribute("username", user.username);
2402+
let inserted = 0;
2403+
let updated = 0;
2404+
for (var i = 0; i < items.length; i++) {
2405+
const item = items[i];
2406+
var q = new InsertOrUpdateOneMessage();
2407+
(q as any).user = user;
2408+
q.collectionname = collectionname; q.item = item; q.j = j;
2409+
q.w = w; q.jwt = jwt; q.uniqeness = uniqeness;
2410+
var res = await this._InsertOrUpdateOne(q, span);
2411+
if (res && res.opresult) {
2412+
if (res.opresult.modifiedCount == 1) {
2413+
updated++;
2414+
} else {
2415+
inserted++;
2416+
}
2417+
} else {
2418+
if (item._id != res.result._id) inserted++;
2419+
if (item._id == res.result._id) updated++;
2420+
}
2421+
if (!skipresults) result.push(res.result);
2422+
}
2423+
Logger.instanse.info("DatabaseConnection", "InsertOrUpdateMany", "[" + user.username + "][" + collectionname + "] inserted " + inserted + " items and updated " + updated + " items in database");
2424+
} catch (error) {
2425+
Logger.instanse.error("DatabaseConnection", "InsertOrUpdateMany", error);
2426+
span?.recordException(error);
2427+
throw error;
2428+
}
2429+
finally {
2430+
Logger.otel.endSpan(span);
2431+
}
2432+
return result;
2433+
}
23782434
private async _DeleteFile(id: string): Promise<void> {
23792435
return new Promise<void>(async (resolve, reject) => {
23802436
try {

OpenFlow/src/LoginProvider.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -690,6 +690,12 @@ export class LoginProvider {
690690
throw new Error("Please use a valid and non temporary email address");
691691
}
692692
}
693+
if (Config.validate_emails_disposable) {
694+
var domain = Logger.DBHelper.GetDisposableDomain(email, span);
695+
if (domain != null) {
696+
throw new Error("Please use a valid and non temporary email address");
697+
}
698+
}
693699
}
694700

695701
if (email.indexOf("@") > -1) {

OpenFlow/src/Messages/Message.ts

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { Readable, Stream } from "stream";
1010
import { GridFSBucket, ObjectID, Cursor } from "mongodb";
1111
import * as path from "path";
1212
import { DatabaseConnection } from "../DatabaseConnection";
13-
import { StripeMessage, NoderedUtil, QueuedMessage, RegisterQueueMessage, QueueMessage, CloseQueueMessage, ListCollectionsMessage, DropCollectionMessage, QueryMessage, AggregateMessage, InsertOneMessage, UpdateOneMessage, Base, UpdateManyMessage, InsertOrUpdateOneMessage, DeleteOneMessage, MapReduceMessage, SigninMessage, TokenUser, User, Rights, EnsureNoderedInstanceMessage, DeleteNoderedInstanceMessage, DeleteNoderedPodMessage, RestartNoderedInstanceMessage, GetNoderedInstanceMessage, GetNoderedInstanceLogMessage, SaveFileMessage, WellknownIds, GetFileMessage, UpdateFileMessage, NoderedUser, WatchMessage, GetDocumentVersionMessage, DeleteManyMessage, InsertManyMessage, RegisterExchangeMessage, EnsureCustomerMessage, Customer, stripe_tax_id, Role, SelectCustomerMessage, Rolemember, ResourceUsage, Resource, ResourceVariant, stripe_subscription, GetNextInvoiceMessage, stripe_invoice, stripe_price, stripe_plan, stripe_invoice_line, GetKubeNodeLabelsMessage, CreateWorkflowInstanceMessage, WorkitemFile } from "@openiap/openflow-api";
13+
import { StripeMessage, NoderedUtil, QueuedMessage, RegisterQueueMessage, QueueMessage, CloseQueueMessage, ListCollectionsMessage, DropCollectionMessage, QueryMessage, AggregateMessage, InsertOneMessage, UpdateOneMessage, Base, UpdateManyMessage, InsertOrUpdateOneMessage, DeleteOneMessage, MapReduceMessage, SigninMessage, TokenUser, User, Rights, EnsureNoderedInstanceMessage, DeleteNoderedInstanceMessage, DeleteNoderedPodMessage, RestartNoderedInstanceMessage, GetNoderedInstanceMessage, GetNoderedInstanceLogMessage, SaveFileMessage, WellknownIds, GetFileMessage, UpdateFileMessage, NoderedUser, WatchMessage, GetDocumentVersionMessage, DeleteManyMessage, InsertManyMessage, RegisterExchangeMessage, EnsureCustomerMessage, Customer, stripe_tax_id, Role, SelectCustomerMessage, Rolemember, ResourceUsage, Resource, ResourceVariant, stripe_subscription, GetNextInvoiceMessage, stripe_invoice, stripe_price, stripe_plan, stripe_invoice_line, GetKubeNodeLabelsMessage, CreateWorkflowInstanceMessage, WorkitemFile, InsertOrUpdateManyMessage } from "@openiap/openflow-api";
1414
import { stripe_customer, stripe_list, StripeAddPlanMessage, StripeCancelPlanMessage, stripe_subscription_item, stripe_coupon } from "@openiap/openflow-api";
1515
import { amqpwrapper, QueueMessageOptions } from "../amqpwrapper";
1616
import { WebSocketServerClient } from "../WebSocketServerClient";
@@ -98,6 +98,9 @@ export class Message {
9898
case "insertorupdateone":
9999
await this.InsertOrUpdateOne(span);
100100
break;
101+
case "Insertorupdatemany":
102+
await this.InsertOrUpdateMany(span);
103+
break;
101104
case "deleteone":
102105
await this.DeleteOne(span);
103106
break;
@@ -393,6 +396,18 @@ export class Message {
393396
cli.Send(this);
394397
}
395398
break;
399+
case "insertorupdatemany":
400+
if (!this.EnsureJWT(cli)) {
401+
Logger.instanse.debug("Message", "Process", "Discard " + command + " due to missing jwt, and respond with error, for client at " + cli.remoteip + " " + cli.clientagent + " " + cli.clientversion);
402+
break;
403+
}
404+
if (Config.enable_openflow_amqp) {
405+
cli.Send(await QueueClient.SendForProcessing(this, this.priority));
406+
} else {
407+
await this.InsertOrUpdateMany(span);
408+
cli.Send(this);
409+
}
410+
break;
396411
case "deleteone":
397412
if (!this.EnsureJWT(cli)) {
398413
Logger.instanse.debug("Message", "Process", "Discard " + command + " due to missing jwt, and respond with error, for client at " + cli.remoteip + " " + cli.clientagent + " " + cli.clientversion);
@@ -1504,6 +1519,36 @@ export class Message {
15041519
await handleError(null, error);
15051520
}
15061521
}
1522+
private async InsertOrUpdateMany(parent: Span): Promise<void> {
1523+
this.Reply();
1524+
const span: Span = Logger.otel.startSubSpan("message.InsertOrUpdateMany", parent);
1525+
let msg: InsertOrUpdateManyMessage
1526+
try {
1527+
msg = InsertOrUpdateManyMessage.assign(this.data);
1528+
if (NoderedUtil.IsNullEmpty(msg.jwt)) { msg.jwt = this.jwt; }
1529+
if (NoderedUtil.IsNullEmpty(msg.w as any)) { msg.w = 0; }
1530+
if (NoderedUtil.IsNullEmpty(msg.j as any)) { msg.j = false; }
1531+
if (NoderedUtil.IsNullEmpty(msg.jwt)) {
1532+
throw new Error("jwt is null and client is not authenticated");
1533+
}
1534+
msg.results = await Config.db.InsertOrUpdateMany(msg.items, msg.collectionname, msg.uniqeness, msg.skipresults, msg.w, msg.j, msg.jwt, span);
1535+
if (msg.skipresults) msg.results = [];
1536+
delete msg.items;
1537+
} catch (error) {
1538+
span?.recordException(error);
1539+
if (NoderedUtil.IsNullUndefinded(msg)) { (msg as any) = {}; }
1540+
if (msg !== null && msg !== undefined) msg.error = error.message ? error.message : error;
1541+
await handleError(null, error);
1542+
}
1543+
try {
1544+
this.data = JSON.stringify(msg);
1545+
} catch (error) {
1546+
span?.recordException(error);
1547+
this.data = "";
1548+
await handleError(null, error);
1549+
}
1550+
Logger.otel.endSpan(span);
1551+
}
15071552
private async DeleteOne(parent: Span): Promise<void> {
15081553
this.Reply();
15091554
let msg: DeleteOneMessage

0 commit comments

Comments
 (0)