Skip to content

Commit 88cc4ba

Browse files
committed
Cleanup openrpa instance workaround
1 parent 909af2d commit 88cc4ba

5 files changed

Lines changed: 88 additions & 153 deletions

File tree

OpenFlow/src/DatabaseConnection.ts

Lines changed: 72 additions & 138 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,22 @@ const jsondiffpatch = require('jsondiffpatch').create({
2222
});
2323

2424

25+
const Semaphore = (n) => ({
26+
n,
27+
async down() {
28+
while (this.n <= 0) await this.wait();
29+
this.n--;
30+
},
31+
up() {
32+
this.n++;
33+
},
34+
async wait() {
35+
if (this.n <= 0) return await new Promise((res, req) => {
36+
setImmediate(async () => res(await this.wait()))
37+
});
38+
return;
39+
},
40+
});
2541
Object.defineProperty(Promise, 'retry', {
2642
configurable: true,
2743
writable: true,
@@ -1354,6 +1370,7 @@ export class DatabaseConnection {
13541370
// this.traversejsondecode(item);
13551371
// return item;
13561372
}
1373+
private static InsertOrUpdateOneSemaphore = Semaphore(1);
13571374
/**
13581375
* Insert or Update depending on document allready exists.
13591376
* @param {T} item Item to insert or update
@@ -1365,152 +1382,68 @@ export class DatabaseConnection {
13651382
* @returns Promise<T>
13661383
*/
13671384
async InsertOrUpdateOne<T extends Base>(q: InsertOrUpdateOneMessage, parent: Span): Promise<InsertOrUpdateOneMessage> {
1368-
let query: any = null;
1369-
if (!NoderedUtil.IsNullEmpty(q.uniqeness)) {
1370-
query = {};
1371-
const arr = q.uniqeness.split(",");
1372-
arr.forEach(field => {
1373-
if (field.trim() !== "") {
1374-
query[field.trim()] = q.item[field.trim()];
1385+
const span: Span = Logger.otel.startSubSpan("db.InsertOrUpdateOne", parent);
1386+
try {
1387+
await DatabaseConnection.InsertOrUpdateOneSemaphore.down();
1388+
let query: any = null;
1389+
if (q.uniqeness !== null && q.uniqeness !== undefined && q.uniqeness !== "") {
1390+
query = {};
1391+
const arr = q.uniqeness.split(",");
1392+
arr.forEach(field => {
1393+
if (field.trim() !== "") {
1394+
query[field.trim()] = q.item[field.trim()];
1395+
}
1396+
});
1397+
} else {
1398+
// has no id, and no uniqeness defined, so we assume its a new item we should insert
1399+
if (q.item._id != null) {
1400+
query = { _id: q.item._id };
13751401
}
1376-
});
1377-
} else {
1378-
// has no id, and no uniqeness defined, so we assume its a new item we should insert
1379-
if (q.item._id != null) {
1380-
query = { _id: q.item._id };
13811402
}
1382-
}
1383-
const user: TokenUser = Crypt.verityToken(q.jwt);
1384-
var _query = query;
1385-
if (q.collectionname === "files") { q.collectionname = "fs.files"; }
1386-
if (q.collectionname === "fs.files") {
1387-
_query = { $and: [query, this.getbasequery(q.jwt, "metadata._acl", [Rights.read])] };
1388-
} else {
1389-
_query = { $and: [query, this.getbasequery(q.jwt, "_acl", [Rights.read])] };
1390-
}
1391-
if (q.collectionname == "openrpa_instances") {
1392-
q.item = this.ensureResource(q.item);
1393-
q.item = await this.Cleanmembers(q.item as any, null);
1394-
q.item = this.encryptentity(q.item) as T;
1395-
1396-
if (!q.item._createdby) {
1397-
q.item._created = new Date(new Date().toISOString());
1398-
q.item._createdby = user.name;
1399-
q.item._createdbyid = user._id;
1400-
}
1401-
q.item._modified = new Date(new Date().toISOString());
1402-
q.item._modifiedby = user.name;
1403-
q.item._modifiedbyid = user._id;
1404-
let roup: any = null;
1403+
const user: TokenUser = Crypt.verityToken(q.jwt);
1404+
let exists: Base[] = [];
14051405
if (query != null) {
1406-
const mongodbspan: Span = Logger.otel.startSubSpan("mongodb.replaceOne", parent);
1407-
mongodbspan.setAttribute("collection", q.collectionname);
1408-
mongodbspan.setAttribute("query", JSON.stringify(query));
1409-
const ot_end = Logger.otel.startTimer();
1410-
try {
1411-
roup = await this.db.collection(q.collectionname).replaceOne(_query, q.item, { upsert: true });
1412-
q.opresult = roup;
1413-
q.result = roup.ops[0];
1414-
} catch (error) {
1415-
console.error(error.message ? error.message : error)
1416-
mongodbspan.recordException(error);
1417-
} finally {
1418-
Logger.otel.endSpan(mongodbspan);
1419-
Logger.otel.endTimer(ot_end, DatabaseConnection.mongodb_replace, { collection: q.collectionname });
1406+
// exists = await this.query(query, { name: 1 }, 2, 0, null, q.collectionname, q.jwt);
1407+
exists = await this.query(query, null, 2, 0, null, q.collectionname, q.jwt, undefined, undefined, span);
1408+
}
1409+
if (exists.length == 1) {
1410+
q.item._id = exists[0]._id;
1411+
}
1412+
else if (exists.length > 1) {
1413+
throw JSON.stringify(query) + " is not uniqe, more than 1 item in collection matches this";
1414+
}
1415+
if (!this.hasAuthorization(user, q.item, Rights.update)) { throw new Error("Access denied, no authorization to InsertOrUpdateOne"); }
1416+
if (exists.length == 1) {
1417+
if (Config.log_updates) Logger.instanse.debug("[" + user.username + "][" + q.collectionname + "] InsertOrUpdateOne, Updating found one in database");
1418+
const uq = new UpdateOneMessage();
1419+
// uq.query = query;
1420+
uq.item = q.item; uq.collectionname = q.collectionname; uq.w = q.w; uq.j; uq.jwt = q.jwt;
1421+
const keys = Object.keys(exists[0]);
1422+
for (let i = 0; i < keys.length; i++) {
1423+
let key = keys[i];
1424+
if (key.startsWith("_")) {
1425+
if (NoderedUtil.IsNullUndefinded(uq.item[key])) uq.item[key] = exists[0][key];
1426+
}
14201427
}
1428+
const uqres = await this.UpdateOne(uq, span);
1429+
q.opresult = uqres.opresult;
1430+
q.result = uqres.result;
14211431
} else {
1422-
const mongodbspan: Span = Logger.otel.startSubSpan("mongodb.replaceOne", parent);
1423-
mongodbspan.setAttribute("collection", q.collectionname);
1424-
mongodbspan.setAttribute("query", JSON.stringify(query));
1425-
const ot_end = Logger.otel.startTimer();
1426-
try {
1427-
roup = await this.db.collection(q.collectionname).insertOne(q.item, { upsert: true });
1428-
q.opresult = roup;
1429-
q.result = roup.ops[0];
1430-
} catch (error) {
1431-
console.error(error.message ? error.message : error)
1432-
mongodbspan.recordException(error);
1433-
} finally {
1434-
Logger.otel.endSpan(mongodbspan);
1435-
Logger.otel.endTimer(ot_end, DatabaseConnection.mongodb_replace, { collection: q.collectionname });
1436-
}
1432+
if (Config.log_updates) Logger.instanse.debug("[" + user.username + "][" + q.collectionname + "] InsertOrUpdateOne, Inserting as new in database");
1433+
delete q.item._id;
1434+
q.result = await this.InsertOne(q.item, q.collectionname, q.w, q.j, q.jwt, span);
14371435
}
1438-
q.result = this.decryptentity(q.result);
1439-
DatabaseConnection.traversejsondecode(q.result);
1440-
if (query == null) {
1441-
this.SaveDiff(q.collectionname, null, q.result, parent);
1436+
if (q.collectionname === "users" && q.item._type === "role") {
1437+
DBHelper.cached_roles = [];
14421438
}
14431439
return q;
1444-
} else {
1445-
return this.InsertOrUpdateOneOld(q, parent);
1446-
}
1447-
}
1448-
/**
1449-
* Insert or Update depending on document allready exists.
1450-
* @param {T} item Item to insert or update
1451-
* @param {string} collectionname Collection containing item
1452-
* @param {string} uniqeness List of fields to combine for uniqeness
1453-
* @param {number} w Write Concern ( 0:no acknowledgment, 1:Requests acknowledgment, 2: Requests acknowledgment from 2, 3:Requests acknowledgment from 3)
1454-
* @param {boolean} j Ensure is written to the on-disk journal.
1455-
* @param {string} jwt JWT of user who is doing the update, ensuring rights
1456-
* @returns Promise<T>
1457-
*/
1458-
async InsertOrUpdateOneOld<T extends Base>(q: InsertOrUpdateOneMessage, parent: Span): Promise<InsertOrUpdateOneMessage> {
1459-
const span: Span = Logger.otel.startSubSpan("db.InsertOrUpdateOne", parent);
1460-
let query: any = null;
1461-
if (q.uniqeness !== null && q.uniqeness !== undefined && q.uniqeness !== "") {
1462-
query = {};
1463-
const arr = q.uniqeness.split(",");
1464-
arr.forEach(field => {
1465-
if (field.trim() !== "") {
1466-
query[field.trim()] = q.item[field.trim()];
1467-
}
1468-
});
1469-
} else {
1470-
// has no id, and no uniqeness defined, so we assume its a new item we should insert
1471-
if (q.item._id != null) {
1472-
query = { _id: q.item._id };
1473-
}
1474-
}
1475-
const user: TokenUser = Crypt.verityToken(q.jwt);
1476-
let exists: Base[] = [];
1477-
if (query != null) {
1478-
// exists = await this.query(query, { name: 1 }, 2, 0, null, q.collectionname, q.jwt);
1479-
exists = await this.query(query, null, 2, 0, null, q.collectionname, q.jwt, undefined, undefined, span);
1480-
}
1481-
if (exists.length == 1) {
1482-
q.item._id = exists[0]._id;
1483-
}
1484-
else if (exists.length > 1) {
1485-
throw JSON.stringify(query) + " is not uniqe, more than 1 item in collection matches this";
1486-
}
1487-
if (!this.hasAuthorization(user, q.item, Rights.update)) { throw new Error("Access denied, no authorization to InsertOrUpdateOne"); }
1488-
// if (q.item._id !== null && q.item._id !== undefined && q.item._id !== "") {
1489-
if (exists.length == 1) {
1490-
if (Config.log_updates) Logger.instanse.debug("[" + user.username + "][" + q.collectionname + "] InsertOrUpdateOne, Updating found one in database");
1491-
const uq = new UpdateOneMessage();
1492-
// uq.query = query;
1493-
uq.item = q.item; uq.collectionname = q.collectionname; uq.w = q.w; uq.j; uq.jwt = q.jwt;
1494-
const keys = Object.keys(exists[0]);
1495-
for (let i = 0; i < keys.length; i++) {
1496-
let key = keys[i];
1497-
if (key.startsWith("_")) {
1498-
if (NoderedUtil.IsNullUndefinded(uq.item[key])) uq.item[key] = exists[0][key];
1499-
}
1500-
}
1501-
const uqres = await this.UpdateOne(uq, span);
1502-
q.opresult = uqres.opresult;
1503-
q.result = uqres.result;
1504-
} else {
1505-
if (Config.log_updates) Logger.instanse.debug("[" + user.username + "][" + q.collectionname + "] InsertOrUpdateOne, Inserting as new in database");
1506-
delete q.item._id;
1507-
q.result = await this.InsertOne(q.item, q.collectionname, q.w, q.j, q.jwt, span);
1508-
}
1509-
if (q.collectionname === "users" && q.item._type === "role") {
1510-
DBHelper.cached_roles = [];
1440+
} catch (error) {
1441+
span.recordException(error);
1442+
throw error;
1443+
} finally {
1444+
DatabaseConnection.InsertOrUpdateOneSemaphore.up();
1445+
Logger.otel.endSpan(span);
15111446
}
1512-
Logger.otel.endSpan(span);
1513-
return q;
15141447
}
15151448
private async _DeleteFile(id: string): Promise<void> {
15161449
return new Promise<void>(async (resolve, reject) => {
@@ -2332,4 +2265,5 @@ export class DatabaseConnection {
23322265
Logger.otel.endSpan(span);
23332266
}
23342267
}
2335-
}
2268+
}
2269+

OpenFlow/src/Messages/Message.ts

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -748,7 +748,6 @@ export class Message {
748748
}
749749
this.Send(cli);
750750
}
751-
752751
private async InsertOrUpdateOne(cli: WebSocketServerClient, parent: Span): Promise<void> {
753752
this.Reply();
754753
let msg: InsertOrUpdateOneMessage
@@ -757,30 +756,32 @@ export class Message {
757756
if (NoderedUtil.IsNullEmpty(msg.jwt)) { msg.jwt = cli.jwt; }
758757
if (NoderedUtil.IsNullEmpty(msg.w as any)) { msg.w = 0; }
759758
if (NoderedUtil.IsNullEmpty(msg.j as any)) { msg.j = false; }
760-
msg = await Config.db.InsertOrUpdateOne(msg, parent);
761-
if (NoderedUtil.IsNullUndefinded(this.data)) {
762-
var b = true;
759+
if (msg.collectionname == "openrpa_instances" && msg.item._type == "workflowinstance") {
760+
// Force uniqeness for workflow instances in old versions of openrpa
761+
const versionPadded = version => version.split('.').map((n, i) => n.padStart(3, '0')).join('');
762+
var version: string = versionPadded(cli.clientversion);
763+
if (cli.clientagent == "openrpa" && parseInt(version) <= parseInt("001002040000")) { // 001002040000
764+
msg.uniqeness = "InstanceId,WorkflowId";
765+
}
766+
let state: string = (msg.item as any).state;
767+
// Force removing completed states, for old versions of openrpa
768+
if (msg.item && ["aborted", "failed", "completed"].indexOf(state) > -1) {
769+
delete (msg.item as any).xml;
770+
}
763771
}
772+
msg = await Config.db.InsertOrUpdateOne(msg, parent);
764773
} catch (error) {
765774
if (NoderedUtil.IsNullUndefinded(msg)) { (msg as any) = {}; }
766775
if (error) if (msg !== null && msg !== undefined) msg.error = error.message ? error.message : error;
767776
if (!error) msg.error = "Unknown error";
768777
await handleError(cli, error);
769778
}
770-
if (NoderedUtil.IsNullUndefinded(this.data)) {
771-
var b = true;
772-
}
773-
774779
try {
775780
this.data = JSON.stringify(msg);
776781
} catch (error) {
777782
this.data = "";
778783
await handleError(cli, error);
779784
}
780-
if (NoderedUtil.IsNullUndefinded(this.data)) {
781-
var b = true;
782-
}
783-
784785
this.Send(cli);
785786
}
786787
private async DeleteOne(cli: WebSocketServerClient): Promise<void> {

OpenFlowNodeRED/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@openiap/nodered",
3-
"version": "1.2.87",
3+
"version": "1.2.88",
44
"description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)",
55
"main": "index.js",
66
"scripts": {

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.2.87
1+
1.2.88

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@openiap/openflow",
3-
"version": "1.2.87",
3+
"version": "1.2.88",
44
"description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)",
55
"main": "index.js",
66
"scripts": {

0 commit comments

Comments
 (0)