Skip to content

Commit 74f5200

Browse files
committed
add more test, minor fixes
1 parent 8425d31 commit 74f5200

19 files changed

Lines changed: 417 additions & 135 deletions

OpenFlow/src/Auth.ts

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -72,22 +72,38 @@ export class Auth {
7272
}
7373
}
7474
public static async RemoveUser(key: string, type: string): Promise<void> {
75-
await semaphore.down();
75+
await Auth.semaphore.down();
7676
if (!NoderedUtil.IsNullUndefinded(this.authorizationCache[key + type])) {
7777
Logger.instanse.silly("Delete user with key " + key + " from cache");
7878
delete this.authorizationCache[key + type];
7979
}
80-
semaphore.up();
80+
Auth.semaphore.up();
8181
}
8282
public static async AddUser(user: User, key: string, type: string): Promise<void> {
83-
await semaphore.down();
83+
await Auth.semaphore.down();
8484
if (NoderedUtil.IsNullUndefinded(this.authorizationCache[key + type])) {
8585
Logger.instanse.silly("Adding user " + user.name + " to cache with key " + key);
8686
var cuser: CachedUser = new CachedUser(user, user._id, type);
8787
this.authorizationCache[key + type] = cuser;
8888
}
89-
semaphore.up();
89+
Auth.semaphore.up();
9090
}
91+
public static Semaphore = (n) => ({
92+
n,
93+
async down() {
94+
while (this.n <= 0) await this.wait();
95+
this.n--;
96+
},
97+
up() {
98+
this.n++;
99+
},
100+
async wait() {
101+
if (this.n <= 0) return new Promise((res, req) => {
102+
setImmediate(async () => res(await this.wait()))
103+
});
104+
},
105+
});
106+
public static semaphore = Auth.Semaphore(1);
91107
}
92108
export class CachedUser {
93109
public firstsignin: Date;
@@ -102,19 +118,3 @@ export class CachedUser {
102118
interface HashTable<T> {
103119
[key: string]: T;
104120
}
105-
const Semaphore = (n) => ({
106-
n,
107-
async down() {
108-
while (this.n <= 0) await this.wait();
109-
this.n--;
110-
},
111-
up() {
112-
this.n++;
113-
},
114-
async wait() {
115-
if (this.n <= 0) return new Promise((res, req) => {
116-
setImmediate(async () => res(await this.wait()))
117-
});
118-
},
119-
});
120-
const semaphore = Semaphore(1);

OpenFlow/src/Config.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ export class Config {
2727
Config.log_deletes = Config.parseBoolean(Config.getEnv("log_deletes", "false"));
2828
Config.log_otel_times = Config.parseBoolean(Config.getEnv("log_otel_times", "false"));
2929
Config.log_openflow_amqp = Config.parseBoolean(Config.getEnv("log_openflow_amqp", "false"));
30+
Config.log_amqp = Config.parseBoolean(Config.getEnv("log_amqp", "true"));
3031
Config.openflow_uniqueid = Config.getEnv("openflow_uniqueid", "");
3132
Config.enable_openflow_amqp = Config.parseBoolean(Config.getEnv("enable_openflow_amqp", "false"));
3233
Config.openflow_amqp_expiration = parseInt(Config.getEnv("openflow_amqp_expiration", (60 * 1000 * 25).toString())); // 25 min
@@ -168,6 +169,7 @@ export class Config {
168169
public static log_deletes: boolean = Config.parseBoolean(Config.getEnv("log_deletes", "false"));
169170
public static log_otel_times: boolean = Config.parseBoolean(Config.getEnv("log_otel_times", "false"));
170171
public static log_openflow_amqp: boolean = Config.parseBoolean(Config.getEnv("log_openflow_amqp", "false"));
172+
public static log_amqp: boolean = Config.parseBoolean(Config.getEnv("log_amqp", "true"));
171173
public static openflow_uniqueid: string = Config.getEnv("openflow_uniqueid", "");
172174
public static enable_openflow_amqp: boolean = Config.parseBoolean(Config.getEnv("enable_openflow_amqp", "false"));
173175
public static openflow_amqp_expiration: number = parseInt(Config.getEnv("openflow_amqp_expiration", (60 * 1000 * 25).toString())); // 25 min

OpenFlow/src/Crypt.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ export class Crypt {
2121
public static async SetPassword(user: User, password: string, parent: Span): Promise<void> {
2222
const span: Span = Logger.otel.startSubSpan("Crypt.SetPassword", parent);
2323
try {
24+
if (NoderedUtil.IsNullUndefinded(user)) throw new Error("user is mandatody")
25+
if (NoderedUtil.IsNullEmpty(password)) throw new Error("password is mandatody")
2426
user.passwordhash = await Crypt.hash(password);
2527
if (!(this.ValidatePassword(user, password, span))) { throw new Error("Failed validating password after hasing"); }
2628
} catch (error) {
@@ -33,6 +35,8 @@ export class Crypt {
3335
public static async ValidatePassword(user: User, password: string, parent: Span): Promise<boolean> {
3436
const span: Span = Logger.otel.startSubSpan("Crypt.ValidatePassword", parent);
3537
try {
38+
if (NoderedUtil.IsNullUndefinded(user)) throw new Error("user is mandatody")
39+
if (NoderedUtil.IsNullEmpty(password)) throw new Error("password is mandatody")
3640
return await Crypt.compare(password, user.passwordhash, span);
3741
} catch (error) {
3842
span.recordException(error);

OpenFlow/src/DatabaseConnection.ts

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,21 +24,6 @@ const jsondiffpatch = require('jsondiffpatch').create({
2424
});
2525

2626

27-
const Semaphore = (n) => ({
28-
n,
29-
async down() {
30-
while (this.n <= 0) await this.wait();
31-
this.n--;
32-
},
33-
up() {
34-
this.n++;
35-
},
36-
async wait() {
37-
if (this.n <= 0) return new Promise((res, req) => {
38-
setImmediate(async () => res(await this.wait()))
39-
});
40-
},
41-
});
4227
Object.defineProperty(Promise, 'retry', {
4328
configurable: true,
4429
writable: true,
@@ -67,6 +52,7 @@ export class DatabaseConnection {
6752
public static mongodb_replace: ValueRecorder;
6853
public static mongodb_delete: ValueRecorder;
6954
public static mongodb_deletemany: ValueRecorder;
55+
public static semaphore = Auth.Semaphore(1);
7056
constructor(mongodburl: string, dbname: string) {
7157
this._dbname = dbname;
7258
this.mongodburl = mongodburl;
@@ -586,8 +572,8 @@ export class DatabaseConnection {
586572
const baseversion = basehist[0]._version;
587573
const history = await this.query<T>({ id: id, "_version": { $gt: baseversion, $lte: version } }, null, Config.history_delta_count, 0, { _version: 1 }, collectionname + "_hist", rootjwt, undefined, undefined, span);
588574
for (let delta of history) {
589-
if (delta != null) {
590-
result = jsondiffpatch.patch(result, delta);
575+
if (delta != null && (delta as any).delta != null) {
576+
result = jsondiffpatch.patch(result, (delta as any).delta);
591577
}
592578
}
593579
}
@@ -1734,7 +1720,22 @@ export class DatabaseConnection {
17341720
Logger.otel.endSpan(span);
17351721
}
17361722
}
1737-
private static InsertOrUpdateOneSemaphore = Semaphore(1);
1723+
public static Semaphore = (n) => ({
1724+
n,
1725+
async down() {
1726+
while (this.n <= 0) await this.wait();
1727+
this.n--;
1728+
},
1729+
up() {
1730+
this.n++;
1731+
},
1732+
async wait() {
1733+
if (this.n <= 0) return new Promise((res, req) => {
1734+
setImmediate(async () => res(await this.wait()))
1735+
});
1736+
},
1737+
});
1738+
private static InsertOrUpdateOneSemaphore = DatabaseConnection.Semaphore(1);
17381739
/**
17391740
* Insert or Update depending on document allready exists.
17401741
* @param {T} item Item to insert or update
@@ -2550,7 +2551,7 @@ export class DatabaseConnection {
25502551
Logger.otel.endTimer(ot_end, DatabaseConnection.mongodb_insert, { collection: collectionname + '_hist' });
25512552
});
25522553
}
2553-
if (original != null && original._version > 0) {
2554+
if (original != null && original._version >= 0) {
25542555
delta = jsondiffpatch.diff(original, item);
25552556
if (NoderedUtil.IsNullUndefinded(delta)) return 0;
25562557
const keys = Object.keys(delta);

OpenFlow/src/Logger.ts

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,7 @@ export class Logger {
6565
Logger.License = new _lic_require.LicenseFile();
6666
} else {
6767
Logger.License = {} as any;
68-
Logger.License.ofid = function () {
69-
if (!NoderedUtil.IsNullEmpty(this._ofid)) return this._ofid;
70-
var crypto = require('crypto');
71-
const openflow_uniqueid = Config.openflow_uniqueid || crypto.createHash('md5').update(Config.domain).digest("hex");
72-
Config.openflow_uniqueid = openflow_uniqueid;
73-
this._ofid = openflow_uniqueid;
74-
return openflow_uniqueid;
75-
};
68+
Logger.License.ofid = Logger.ofid;
7669
}
7770

7871

@@ -119,4 +112,14 @@ export class Logger {
119112
return logger;
120113
}
121114
static instanse: winston.Logger = null;
115+
private static _ofid = null;
116+
static ofid() {
117+
if (!NoderedUtil.IsNullEmpty(Logger._ofid)) return Logger._ofid;
118+
var crypto = require('crypto');
119+
const openflow_uniqueid = Config.openflow_uniqueid || crypto.createHash('md5').update(Config.domain).digest("hex");
120+
Config.openflow_uniqueid = openflow_uniqueid;
121+
Logger._ofid = openflow_uniqueid;
122+
return openflow_uniqueid;
123+
}
124+
122125
}

OpenFlow/src/Messages/Message.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import { Span } from "@opentelemetry/api";
2525
import { Logger } from "../Logger";
2626
import Dockerode = require("dockerode");
2727
import { QueueClient } from "../QueueClient";
28+
import { use } from "passport";
2829
const got = require("got");
2930
const { RateLimiterMemory } = require('rate-limiter-flexible')
3031
const BaseRateLimiter = new RateLimiterMemory({
@@ -508,7 +509,8 @@ export class Message {
508509
break;
509510
case "selectcustomer":
510511
this.EnsureJWT(cli);
511-
await this.SelectCustomer(span);
512+
var user = await this.SelectCustomer(span);
513+
if (user != null) cli.user.selectedcustomerid = user.selectedcustomerid;
512514
this.ReloadUserToken(cli, span);
513515
cli.Send(this);
514516
break;
@@ -4450,7 +4452,8 @@ export class Message {
44504452
}
44514453
Logger.otel.endSpan(span);
44524454
}
4453-
async SelectCustomer(parent: Span) {
4455+
async SelectCustomer(parent: Span): Promise<TokenUser> {
4456+
let user: TokenUser = null;
44544457
this.Reply();
44554458
let msg: SelectCustomerMessage;
44564459
try {
@@ -4459,7 +4462,7 @@ export class Message {
44594462
var customer = await Config.db.getbyid<Customer>(msg.customerid, "users", this.jwt, parent)
44604463
if (customer == null) msg.customerid = null;
44614464
}
4462-
const user: TokenUser = User.assign(Crypt.verityToken(this.jwt));
4465+
user = User.assign(Crypt.verityToken(this.jwt));
44634466
if (Config.db.WellknownIdsArray.indexOf(user._id) != -1) throw new Error("Builtin entities cannot select a company")
44644467

44654468
if (NoderedUtil.IsNullEmpty(msg.customerid)) {
@@ -4487,6 +4490,7 @@ export class Message {
44874490
this.data = "";
44884491
await handleError(null, error);
44894492
}
4493+
return user;
44904494
}
44914495

44924496
}

OpenFlow/src/WebSocketServerClient.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import * as WebSocket from "ws";
22
import { SocketMessage } from "./SocketMessage";
33
import { Message, JSONfn } from "./Messages/Message";
44
import { Config } from "./Config";
5-
import { amqpwrapper, QueueMessageOptions, amqpqueue, amqpexchange } from "./amqpwrapper";
5+
import { amqpwrapper, QueueMessageOptions, amqpqueue, amqpexchange, exchangealgorithm } from "./amqpwrapper";
66
import { NoderedUtil, Base, InsertOneMessage, QueueMessage, MapReduceMessage, QueryMessage, UpdateOneMessage, UpdateManyMessage, DeleteOneMessage, User, mapFunc, reduceFunc, finalizeFunc, QueuedMessage, QueuedMessageCallback, WatchEventMessage, QueueClosedMessage, ExchangeClosedMessage } from "@openiap/openflow-api";
77
import { ChangeStream } from "mongodb";
88
import { WebSocketServer } from "./WebSocketServer";
@@ -245,7 +245,7 @@ export class WebSocketServerClient {
245245
Logger.otel.endSpan(span);
246246
}
247247
}
248-
public async RegisterExchange(exchangename: string, algorithm: "direct" | "fanout" | "topic" | "header", routingkey: string = "", parent: Span): Promise<RegisterExchangeResponse> {
248+
public async RegisterExchange(exchangename: string, algorithm: exchangealgorithm, routingkey: string = "", parent: Span): Promise<RegisterExchangeResponse> {
249249
const span: Span = Logger.otel.startSubSpan("WebSocketServerClient.CreateConsumer", parent);
250250
try {
251251
let exclusive: boolean = false; // Should we keep the queue around ? for robots and roles

0 commit comments

Comments
 (0)