Skip to content

Commit 1769d8f

Browse files
committed
Add insert many, add skip state on workflow out
1 parent 910c552 commit 1769d8f

14 files changed

Lines changed: 266 additions & 47 deletions

OpenFlow/src/Messages/Message.ts

Lines changed: 50 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import { Readable, Stream } from "stream";
1111
import { GridFSBucket, ObjectID, Db, Cursor, MongoNetworkError } from "mongodb";
1212
import * as path from "path";
1313
import { DatabaseConnection } from "../DatabaseConnection";
14-
import { StripeMessage, EnsureStripeCustomerMessage, NoderedUtil, QueuedMessage, RegisterQueueMessage, QueueMessage, CloseQueueMessage, ListCollectionsMessage, DropCollectionMessage, QueryMessage, AggregateMessage, InsertOneMessage, UpdateOneMessage, Base, UpdateManyMessage, InsertOrUpdateOneMessage, DeleteOneMessage, MapReduceMessage, SigninMessage, TokenUser, User, Rights, EnsureNoderedInstanceMessage, DeleteNoderedInstanceMessage, DeleteNoderedPodMessage, RestartNoderedInstanceMessage, GetNoderedInstanceMessage, GetNoderedInstanceLogMessage, SaveFileMessage, WellknownIds, GetFileMessage, UpdateFileMessage, CreateWorkflowInstanceMessage, RegisterUserMessage, NoderedUser, WatchMessage, GetDocumentVersionMessage, DeleteManyMessage } from "openflow-api";
14+
import { StripeMessage, EnsureStripeCustomerMessage, NoderedUtil, QueuedMessage, RegisterQueueMessage, QueueMessage, CloseQueueMessage, ListCollectionsMessage, DropCollectionMessage, QueryMessage, AggregateMessage, InsertOneMessage, UpdateOneMessage, Base, UpdateManyMessage, InsertOrUpdateOneMessage, DeleteOneMessage, MapReduceMessage, SigninMessage, TokenUser, User, Rights, EnsureNoderedInstanceMessage, DeleteNoderedInstanceMessage, DeleteNoderedPodMessage, RestartNoderedInstanceMessage, GetNoderedInstanceMessage, GetNoderedInstanceLogMessage, SaveFileMessage, WellknownIds, GetFileMessage, UpdateFileMessage, CreateWorkflowInstanceMessage, RegisterUserMessage, NoderedUser, WatchMessage, GetDocumentVersionMessage, DeleteManyMessage, InsertManyMessage } from "openflow-api";
1515
import { Billing, stripe_customer, stripe_base, stripe_list, StripeAddPlanMessage, StripeCancelPlanMessage, stripe_subscription, stripe_subscription_item, stripe_plan, stripe_coupon } from "openflow-api";
1616
import { V1ResourceRequirements, V1Deployment } from "@kubernetes/client-node";
1717
import { amqpwrapper } from "../amqpwrapper";
@@ -113,6 +113,9 @@ export class Message {
113113
case "insertone":
114114
this.InsertOne(cli);
115115
break;
116+
case "insertmany":
117+
this.InsertMany(cli);
118+
break;
116119
case "updateone":
117120
this.UpdateOne(cli);
118121
break;
@@ -373,15 +376,16 @@ export class Message {
373376
// filter out collections that are empty, or we don't have access too
374377
for (let i = 0; i < msg.result.length; i++) {
375378
const collectioname = msg.result[i].name;
376-
if (msg.result[i].name != "entities" && !cli.user.HasRoleName("admins")) {
377-
// cli._logger.debug("Check if user has objects in " + collectioname);
378-
const q = await Config.db.query({}, null, 1, 0, null, collectioname, msg.jwt);
379-
if (q.length > 0) {
380-
result.push(msg.result[i]);
381-
}
382-
} else {
383-
result.push(msg.result[i]);
384-
}
379+
// if (msg.result[i].name != "entities" && !cli.user.HasRoleName("admins")) {
380+
// // cli._logger.debug("Check if user has objects in " + collectioname);
381+
// const q = await Config.db.query({}, { _id: 1 }, 1, 0, null, collectioname, msg.jwt);
382+
// if (q.length > 0) {
383+
// result.push(msg.result[i]);
384+
// }
385+
// } else {
386+
// result.push(msg.result[i]);
387+
// }
388+
result.push(msg.result[i]);
385389
}
386390
if (result.filter(x => x.name == "entities").length == 0) {
387391
result.push({ name: "entities", type: "collection" });
@@ -550,9 +554,9 @@ export class Message {
550554
if (NoderedUtil.IsNullEmpty(msg.jwt)) { msg.jwt = cli.jwt; }
551555
if (NoderedUtil.IsNullEmpty(msg.w as any)) { msg.w = 0; }
552556
if (NoderedUtil.IsNullEmpty(msg.j as any)) { msg.j = false; }
553-
if (NoderedUtil.IsNullEmpty(msg.jwt) && msg.collectionname === "jslog") {
554-
msg.jwt = Crypt.rootToken();
555-
}
557+
// if (NoderedUtil.IsNullEmpty(msg.jwt) && msg.collectionname === "jslog") {
558+
// msg.jwt = Crypt.rootToken();
559+
// }
556560
if (NoderedUtil.IsNullEmpty(msg.jwt)) {
557561
throw new Error("jwt is null and client is not authenticated");
558562
}
@@ -570,6 +574,39 @@ export class Message {
570574
}
571575
this.Send(cli);
572576
}
577+
private async InsertMany(cli: WebSocketServerClient): Promise<void> {
578+
this.Reply();
579+
let msg: InsertManyMessage
580+
try {
581+
msg = InsertManyMessage.assign(this.data);
582+
if (NoderedUtil.IsNullEmpty(msg.jwt)) { msg.jwt = cli.jwt; }
583+
if (NoderedUtil.IsNullEmpty(msg.w as any)) { msg.w = 0; }
584+
if (NoderedUtil.IsNullEmpty(msg.j as any)) { msg.j = false; }
585+
// if (NoderedUtil.IsNullEmpty(msg.jwt) && msg.collectionname === "jslog") {
586+
// msg.jwt = Crypt.rootToken();
587+
// }
588+
if (NoderedUtil.IsNullEmpty(msg.jwt)) {
589+
throw new Error("jwt is null and client is not authenticated");
590+
}
591+
const Promises: Promise<any>[] = [];
592+
for (let i: number = 0; i < msg.items.length; i++) {
593+
Promises.push(Config.db.InsertOne(msg.items[i], msg.collectionname, msg.w, msg.j, msg.jwt));
594+
}
595+
msg.results = await Promise.all(Promises.map(p => p.catch(e => e)));
596+
if (msg.skipresults) msg.results = [];
597+
} catch (error) {
598+
if (NoderedUtil.IsNullUndefinded(msg)) { (msg as any) = {}; }
599+
if (msg !== null && msg !== undefined) msg.error = error.message ? error.message : error;
600+
cli._logger.error(error.message ? error.message : error);
601+
}
602+
try {
603+
this.data = JSON.stringify(msg);
604+
} catch (error) {
605+
this.data = "";
606+
cli._logger.error(error.message ? error.message : error);
607+
}
608+
this.Send(cli);
609+
}
573610
private async UpdateOne(cli: WebSocketServerClient): Promise<void> {
574611
this.Reply();
575612
let msg: UpdateOneMessage

OpenFlowNodeRED/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "openflow-nodered",
3-
"version": "1.1.68",
3+
"version": "1.1.69",
44
"description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)",
55
"main": "index.js",
66
"scripts": {
@@ -36,7 +36,7 @@
3636
"jsonwebtoken": "^8.5.1",
3737
"morgan": "^1.10.0",
3838
"node-red": "^1.2.2",
39-
"openflow-api": "^1.0.46",
39+
"openflow-api": "^1.0.49",
4040
"os-service": "^2.2.0",
4141
"passport-saml": "^1.3.4",
4242
"passport-saml-metadata": "^2.3.0",

OpenFlowNodeRED/src/nodered/nodes/api.html

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,95 @@
230230
</script>
231231

232232

233+
234+
<script type="text/x-red" data-template-name="api add many">
235+
<div class="form-row">
236+
<label><i class="fa fa-tag"></i> Type</label>
237+
<input type="text" id="node-input-entitytype" placeholder="Override new entity type">
238+
</div>
239+
<div class="form-row">
240+
<label><i class="fa fa-tag"></i> Collection</label>
241+
<input type="text" id="node-input-collection" placeholder="Collection">
242+
</div>
243+
<div class="form-row">
244+
<label><i class="fa fa-list"></i> Entity from</label>
245+
<input id="node-input-typed-inputfield" type="text" style="width: 70%">
246+
<input id="node-input-inputfield" type="hidden">
247+
</div>
248+
<div class="form-row">
249+
<label><i class="fa fa-list"></i> Result to</label>
250+
<input id="node-input-typed-resultfield" type="text" style="width: 70%">
251+
<input id="node-input-resultfield" type="hidden">
252+
</div>
253+
<div class="form-row">
254+
<label><i class="fa fa-list"></i> Write Concern</label>
255+
<select id="node-input-writeconcern">
256+
<option value="0">Requests no acknowledgment</option>
257+
<option value="majority">Requests majority</option>
258+
<option value="1">Requests acknowledgment from 1</option>
259+
<option value="2">Requests acknowledgment from 2</option>
260+
<option value="3">Requests acknowledgment from 3</option>
261+
</select>
262+
</div>
263+
<div class="form-row">
264+
<label><i class="fa fa-list"></i> Journal</label>
265+
<select id="node-input-journal">
266+
<option value="false">In memory</option>
267+
<option value="true">Written to the journal</option>
268+
</select>
269+
</div>
270+
<div class="form-row">
271+
<label><i class="fa fa-tag"></i> Skip results</label>
272+
<input type="checkbox" id="node-input-skipresults" style="width: auto;">
273+
</div>
274+
<div class="form-row">
275+
<label><i class="fa fa-tag"></i> Name</label>
276+
<input type="text" id="node-input-name" placeholder="Name">
277+
</div>
278+
</script>
279+
<script type="text/x-red" data-help-name="api add many">
280+
<p>Add payload as new entity in collection, can be an object or array of objects, returns an array of results</p>
281+
</script>
282+
<script type="text/javascript">
283+
RED.nodes.registerType('api add many', {
284+
category: 'api',
285+
color: "#DEBD5C",
286+
paletteLabel: 'add many',
287+
icon: "font-awesome/fa-plus",
288+
defaults: {
289+
name: { value: "" },
290+
entitytype: { value: "" },
291+
writeconcern: { value: "0", required: true },
292+
journal: { value: "false", required: true },
293+
collection: { value: "entities", required: true },
294+
inputfield: { value: "payload", required: true },
295+
resultfield: { value: "payload", required: true },
296+
skipresults: { value: false, required: true }
297+
},
298+
inputs: 1,
299+
outputs: 1,
300+
label: function () {
301+
return this.name || "api add many";
302+
},
303+
labelStyle: function () {
304+
return this.name ? "node_label_italic" : "";
305+
},
306+
307+
oneditprepare: function () {
308+
$("#node-input-typed-inputfield").typedInput({ types: ['msg'] });
309+
$("#node-input-typed-inputfield").typedInput('value', this.inputfield);
310+
$("#node-input-typed-resultfield").typedInput({ types: ['msg'] });
311+
$("#node-input-typed-resultfield").typedInput('value', this.resultfield);
312+
},
313+
oneditsave: function () {
314+
$("#node-input-inputfield").val($("#node-input-typed-inputfield").typedInput('value'));
315+
$("#node-input-resultfield").val($("#node-input-typed-resultfield").typedInput('value'));
316+
}
317+
318+
});
319+
320+
</script>
321+
233322
<script type="text/x-red" data-template-name="api update">
234323
<div class="form-row">
235324
<label><i class="fa fa-tag"></i> Type</label>

OpenFlowNodeRED/src/nodered/nodes/api.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ export = function (RED: Red) {
1414
RED.nodes.registerType("api get jwt", api.api_get_jwt);
1515
RED.nodes.registerType("api get", api.api_get);
1616
RED.nodes.registerType("api add", api.api_add);
17+
RED.nodes.registerType("api add many", api.api_addmany);
1718
RED.nodes.registerType("api update", api.api_update);
1819
RED.nodes.registerType("api addorupdate", api.api_addorupdate);
1920
RED.nodes.registerType("api delete", api.api_delete);

OpenFlowNodeRED/src/nodered/nodes/api_nodes.ts

Lines changed: 81 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -240,11 +240,6 @@ export class api_add {
240240
if (!NoderedUtil.IsNullEmpty(msg.resultfield)) { this.config.resultfield = msg.resultfield; }
241241
if (!NoderedUtil.IsNullEmpty(msg.writeconcern)) { this.config.writeconcern = msg.writeconcern; }
242242
if (!NoderedUtil.IsNullEmpty(msg.journal)) { this.config.journal = msg.journal; }
243-
// if (NoderedUtil.IsNullEmpty(msg.jwt) && !NoderedUtil.IsNullEmpty(Config.jwt)) {
244-
// msg.jwt = Config.jwt;
245-
// }
246-
247-
248243
if ((this.config.writeconcern as any) === undefined || (this.config.writeconcern as any) === null) this.config.writeconcern = 0;
249244
if ((this.config.journal as any) === undefined || (this.config.journal as any) === null) this.config.journal = false;
250245

@@ -255,16 +250,23 @@ export class api_add {
255250
if (data.length === 0) { this.node.warn("input array is empty"); }
256251
} else { this.node.warn("Input data is null"); }
257252

258-
this.node.status({ fill: "blue", shape: "dot", text: "Inserting items" });
259-
const Promises: Promise<any>[] = [];
260-
for (let i: number = 0; i < data.length; i++) {
261-
const element: any = data[i];
262-
if (!NoderedUtil.IsNullEmpty(this.config.entitytype)) {
263-
element._type = this.config.entitytype;
253+
this.node.status({ fill: "blue", shape: "dot", text: "processing " + data.length + " items" });
254+
let Promises: Promise<any>[] = [];
255+
let results: any[] = [];
256+
for (let y: number = 0; y < data.length; y += 50) {
257+
for (let i: number = y; i < (y + 50) && i < data.length; i++) {
258+
const element: any = data[i];
259+
if (!NoderedUtil.IsNullEmpty(this.config.entitytype)) {
260+
element._type = this.config.entitytype;
261+
}
262+
Promises.push(NoderedUtil.InsertOne(this.config.collection, element, this.config.writeconcern, this.config.journal, msg.jwt));
264263
}
265-
Promises.push(NoderedUtil.InsertOne(this.config.collection, element, this.config.writeconcern, this.config.journal, msg.jwt));
264+
this.node.status({ fill: "blue", shape: "dot", text: y + " to " + (y + 49) + " of " + data.length });
265+
const tempresults = await Promise.all(Promises.map(p => p.catch(e => e)));
266+
results = results.concat(tempresults);
267+
Promises = [];
266268
}
267-
data = await Promise.all(Promises.map(p => p.catch(e => e)));
269+
data = results;
268270

269271
const errors = data.filter(result => NoderedUtil.IsString(result) || (result instanceof Error));
270272
if (errors.length > 0) {
@@ -285,6 +287,72 @@ export class api_add {
285287
}
286288

287289

290+
export interface Iapi_addmany {
291+
entitytype: string;
292+
collection: string;
293+
inputfield: string;
294+
resultfield: string;
295+
writeconcern: number;
296+
skipresults: boolean;
297+
journal: boolean;
298+
}
299+
export class api_addmany {
300+
public node: Red = null;
301+
302+
constructor(public config: Iapi_addmany) {
303+
RED.nodes.createNode(this, config);
304+
this.node = this;
305+
this.node.status({});
306+
this.node.on("input", this.oninput);
307+
this.node.on("close", this.onclose);
308+
}
309+
async oninput(msg: any) {
310+
try {
311+
this.node.status({});
312+
// if (NoderedUtil.IsNullEmpty(msg.jwt)) { return NoderedUtil.HandleError(this, "Missing jwt token"); }
313+
314+
if (!NoderedUtil.IsNullEmpty(msg.entitytype)) { this.config.entitytype = msg.entitytype; }
315+
if (!NoderedUtil.IsNullEmpty(msg.collection)) { this.config.collection = msg.collection; }
316+
if (!NoderedUtil.IsNullEmpty(msg.inputfield)) { this.config.inputfield = msg.inputfield; }
317+
if (!NoderedUtil.IsNullEmpty(msg.resultfield)) { this.config.resultfield = msg.resultfield; }
318+
if (!NoderedUtil.IsNullEmpty(msg.writeconcern)) { this.config.writeconcern = msg.writeconcern; }
319+
if (!NoderedUtil.IsNullEmpty(msg.journal)) { this.config.journal = msg.journal; }
320+
if ((this.config.writeconcern as any) === undefined || (this.config.writeconcern as any) === null) this.config.writeconcern = 0;
321+
if ((this.config.journal as any) === undefined || (this.config.journal as any) === null) this.config.journal = false;
322+
323+
let data: any[] = [];
324+
const _data = NoderedUtil.FetchFromObject(msg, this.config.inputfield);
325+
if (!NoderedUtil.IsNullUndefinded(_data)) {
326+
if (!Array.isArray(_data)) { data.push(_data); } else { data = _data; }
327+
if (data.length === 0) { this.node.warn("input array is empty"); }
328+
} else { this.node.warn("Input data is null"); }
329+
330+
this.node.status({ fill: "blue", shape: "dot", text: "processing " + data.length + " items" });
331+
let results: any[] = [];
332+
for (let y: number = 0; y < data.length; y += 50) {
333+
let subitems: any[] = [];
334+
for (let i: number = y; i < (y + 50) && i < data.length; i++) {
335+
const element: any = data[i];
336+
if (!NoderedUtil.IsNullEmpty(this.config.entitytype)) {
337+
element._type = this.config.entitytype;
338+
}
339+
subitems.push(element);
340+
}
341+
this.node.status({ fill: "blue", shape: "dot", text: y + " to " + (y + 49) + " of " + data.length });
342+
results.push(await NoderedUtil.InsertMany(this.config.collection, subitems, this.config.writeconcern, this.config.journal, this.config.skipresults, msg.jwt));
343+
}
344+
data = results;
345+
NoderedUtil.saveToObject(msg, this.config.resultfield, data);
346+
this.node.send(msg);
347+
this.node.status({});
348+
} catch (error) {
349+
NoderedUtil.HandleError(this, error);
350+
}
351+
}
352+
onclose() {
353+
}
354+
}
355+
288356

289357

290358
export interface Iapi_update {

OpenFlowNodeRED/src/nodered/nodes/workflow.html

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@
6565
</select>
6666
<input id="node-input-form" type="hidden">
6767
</div>
68+
<div class="form-row">
69+
<label><i class="fa fa-tag"></i> Remove state data</label>
70+
<input type="checkbox" id="node-input-removestate" style="width: auto;">
71+
</div>
6872
<div class="form-row">
6973
<label ><i class="fa fa-tag"></i> Name</label>
7074
<input type="text" id="node-input-name" placeholder="Node name">
@@ -93,7 +97,8 @@
9397
defaults: {
9498
name: { value: "" },
9599
form: { value: "" },
96-
state: { value: "completed", required: true }
100+
state: { value: "completed", required: true },
101+
removestate: { value: false, required: true }
97102
},
98103
inputs: 1,
99104
outputs: 1,

0 commit comments

Comments
 (0)