Skip to content

Commit 9fbbb68

Browse files
committed
Add writeconcern support in nodered
1 parent 1a2c947 commit 9fbbb68

6 files changed

Lines changed: 209 additions & 137 deletions

File tree

OpenFlow/src/DatabaseConnection.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ export class DatabaseConnection {
201201
(item as any).passwordhash = await Crypt.hash((item as any).newpassword);
202202
delete (item as any).newpassword;
203203
}
204-
var options = { writeConcern: { w: w, j: j } };
204+
var options = { writeConcern: { w: parseInt((w as any)), j: j } };
205205
var result: InsertOneWriteOpResult = await this.db.collection(collectionname).insertOne(item, options);
206206
item = result.ops[0];
207207

@@ -267,7 +267,7 @@ export class DatabaseConnection {
267267
delete (item as any).newpassword;
268268
}
269269
this._logger.debug("updating " + (item.name || item._name) + " in database");
270-
var options = { writeConcern: { w: w, j: j } };
270+
var options = { writeConcern: { w: parseInt((w as any)), j: j } };
271271
await this.db.collection(collectionname).replaceOne({ _id: item._id }, item, options);
272272
this.traversejsondecode(item);
273273
return item;

OpenFlowNodeRED/src/Message.ts

Lines changed: 106 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -2,33 +2,33 @@ import { WebSocketClient, QueuedMessage } from "./WebSocketClient";
22

33
function isNumber(value: string | number): boolean {
44
return ((value != null) && !isNaN(Number(value.toString())));
5-
}
5+
}
66
export class SocketMessage {
77
public id: string;
88
public replyto: string;
99
public command: string;
1010
public data: string;
1111
public count: number;
1212
public index: number;
13-
public static fromjson(json: string):SocketMessage {
14-
let result:SocketMessage = new SocketMessage();
15-
let obj:any = JSON.parse(json);
13+
public static fromjson(json: string): SocketMessage {
14+
let result: SocketMessage = new SocketMessage();
15+
let obj: any = JSON.parse(json);
1616
result.command = obj.command;
1717
result.id = obj.id;
1818
result.replyto = obj.replyto;
1919
result.count = 1;
2020
result.index = 0;
2121
result.data = obj.data;
22-
if(isNumber(obj.count) ) { result.count = obj.count; }
23-
if(isNumber(obj.index) ) { result.index = obj.index; }
24-
if(result.id === null || result.id === undefined || result.id === "") {
22+
if (isNumber(obj.count)) { result.count = obj.count; }
23+
if (isNumber(obj.index)) { result.index = obj.index; }
24+
if (result.id === null || result.id === undefined || result.id === "") {
2525
// result.id = crypto.randomBytes(16).toString("hex");
2626
result.id = Math.random().toString(36).substr(2, 9);
2727
}
2828
return result;
2929
}
30-
public static frommessage(msg: Message, data: string, count: number, index: number):SocketMessage {
31-
var result:SocketMessage = new SocketMessage();
30+
public static frommessage(msg: Message, data: string, count: number, index: number): SocketMessage {
31+
var result: SocketMessage = new SocketMessage();
3232
result.id = msg.id;
3333
result.replyto = msg.replyto;
3434
result.command = msg.command;
@@ -37,8 +37,8 @@ export class SocketMessage {
3737
result.data = data;
3838
return result;
3939
}
40-
public static fromcommand(command: string):SocketMessage {
41-
var result:SocketMessage = new SocketMessage();
40+
public static fromcommand(command: string): SocketMessage {
41+
var result: SocketMessage = new SocketMessage();
4242
result.command = command;
4343
result.count = 1;
4444
result.index = 0;
@@ -50,31 +50,31 @@ export class SocketMessage {
5050
export class SigninMessage {
5151
public error: string;
5252

53-
public validate_only:boolean = false;
54-
public username:string;
55-
public password:string;
56-
public user:TokenUser;
57-
public jwt:string;
58-
static assign(o:any):SigninMessage {
53+
public validate_only: boolean = false;
54+
public username: string;
55+
public password: string;
56+
public user: TokenUser;
57+
public jwt: string;
58+
static assign(o: any): SigninMessage {
5959
if (typeof o === "string" || o instanceof String) {
6060
return Object.assign(new SigninMessage(), JSON.parse(o.toString()));
6161
}
6262
return Object.assign(new SigninMessage(), o);
6363
}
6464
}
6565
export class TokenUser {
66-
_type:string;
67-
_id:string;
68-
name:string;
69-
username:string;
70-
roles:Rolemember[] = [];
71-
static assign<T>(o:T): T {
72-
var newo:TokenUser = new TokenUser();
66+
_type: string;
67+
_id: string;
68+
name: string;
69+
username: string;
70+
roles: Rolemember[] = [];
71+
static assign<T>(o: T): T {
72+
var newo: TokenUser = new TokenUser();
7373
return Object.assign(newo, o);
7474
}
7575
}
7676
export class Rolemember {
77-
constructor(name:string, _id:string) {
77+
constructor(name: string, _id: string) {
7878
this.name = name;
7979
this._id = _id;
8080
}
@@ -85,14 +85,14 @@ export class QueryMessage {
8585
public error: string;
8686
public jwt: string;
8787

88-
public query:any;
89-
public projection:Object;
90-
public top:number;
91-
public skip:number;
92-
public orderby:Object | string;
93-
public collectionname:string;
94-
public result:any[];
95-
static assign(o:any):QueryMessage {
88+
public query: any;
89+
public projection: Object;
90+
public top: number;
91+
public skip: number;
92+
public orderby: Object | string;
93+
public collectionname: string;
94+
public result: any[];
95+
static assign(o: any): QueryMessage {
9696
if (typeof o === "string" || o instanceof String) {
9797
return Object.assign(new QueryMessage(), JSON.parse(o.toString()));
9898
}
@@ -102,11 +102,11 @@ export class QueryMessage {
102102
export class AggregateMessage {
103103
public error: string;
104104
public jwt: string;
105-
106-
public aggregates:object[];
107-
public collectionname:string;
108-
public result:any[];
109-
static assign(o:any):AggregateMessage {
105+
106+
public aggregates: object[];
107+
public collectionname: string;
108+
public result: any[];
109+
static assign(o: any): AggregateMessage {
110110
if (typeof o === "string" || o instanceof String) {
111111
return Object.assign(new AggregateMessage(), JSON.parse(o.toString()));
112112
}
@@ -117,10 +117,17 @@ export class InsertOneMessage {
117117
public error: string;
118118
public jwt: string;
119119

120-
public item:any;
121-
public collectionname:string;
122-
public result:any;
123-
static assign(o:any):InsertOneMessage {
120+
// w: 1 - Requests acknowledgment that the write operation has propagated
121+
// w: 0 - Requests no acknowledgment of the write operation
122+
// w: 2 would require acknowledgment from the primary and one of the secondaries
123+
// w: 3 would require acknowledgment from the primary and both secondaries
124+
public w: number;
125+
// true, requests acknowledgment that the mongod instances have written to the on-disk journal
126+
public j: boolean;
127+
public item: any;
128+
public collectionname: string;
129+
public result: any;
130+
static assign(o: any): InsertOneMessage {
124131
if (typeof o === "string" || o instanceof String) {
125132
return Object.assign(new InsertOneMessage(), JSON.parse(o.toString()));
126133
}
@@ -131,10 +138,17 @@ export class UpdateOneMessage {
131138
public error: string;
132139
public jwt: string;
133140

134-
public item:object;
135-
public collectionname:string;
136-
public result:any;
137-
static assign(o:any):UpdateOneMessage {
141+
// w: 1 - Requests acknowledgment that the write operation has propagated
142+
// w: 0 - Requests no acknowledgment of the write operation
143+
// w: 2 would require acknowledgment from the primary and one of the secondaries
144+
// w: 3 would require acknowledgment from the primary and both secondaries
145+
public w: number;
146+
// true, requests acknowledgment that the mongod instances have written to the on-disk journal
147+
public j: boolean;
148+
public item: object;
149+
public collectionname: string;
150+
public result: any;
151+
static assign(o: any): UpdateOneMessage {
138152
if (typeof o === "string" || o instanceof String) {
139153
return Object.assign(new UpdateOneMessage(), JSON.parse(o.toString()));
140154
}
@@ -145,11 +159,18 @@ export class InsertOrUpdateOneMessage {
145159
public error: string;
146160
public jwt: string;
147161

148-
public item:object;
149-
public collectionname:string;
150-
public uniqeness:string
151-
public result:any;
152-
static assign(o:any):InsertOrUpdateOneMessage {
162+
// w: 1 - Requests acknowledgment that the write operation has propagated
163+
// w: 0 - Requests no acknowledgment of the write operation
164+
// w: 2 would require acknowledgment from the primary and one of the secondaries
165+
// w: 3 would require acknowledgment from the primary and both secondaries
166+
public w: number;
167+
// true, requests acknowledgment that the mongod instances have written to the on-disk journal
168+
public j: boolean;
169+
public item: object;
170+
public collectionname: string;
171+
public uniqeness: string
172+
public result: any;
173+
static assign(o: any): InsertOrUpdateOneMessage {
153174
if (typeof o === "string" || o instanceof String) {
154175
return Object.assign(new InsertOrUpdateOneMessage(), JSON.parse(o.toString()));
155176
}
@@ -161,9 +182,9 @@ export class DeleteOneMessage {
161182
public error: string;
162183
public jwt: string;
163184

164-
public _id:string;
165-
public collectionname:string;
166-
static assign(o:any):DeleteOneMessage {
185+
public _id: string;
186+
public collectionname: string;
187+
static assign(o: any): DeleteOneMessage {
167188
if (typeof o === "string" || o instanceof String) {
168189
return Object.assign(new DeleteOneMessage(), JSON.parse(o.toString()));
169190
}
@@ -180,13 +201,13 @@ export class MapReduceMessage<T> {
180201
public error: string;
181202
public jwt: string;
182203

183-
public collectionname:string;
184-
public result:T[];
185-
public scope:any;
204+
public collectionname: string;
205+
public result: T[];
206+
public scope: any;
186207

187-
constructor(public map: mapFunc, public reduce: reduceFunc, public finalize: finalizeFunc, public query: any, public out:string | any) {
208+
constructor(public map: mapFunc, public reduce: reduceFunc, public finalize: finalizeFunc, public query: any, public out: string | any) {
188209
}
189-
static assign<T>(o:any):MapReduceMessage<T> {
210+
static assign<T>(o: any): MapReduceMessage<T> {
190211
if (typeof o === "string" || o instanceof String) {
191212
return Object.assign(new MapReduceMessage(null, null, null, null, null), JSON.parse(o.toString()));
192213
}
@@ -195,25 +216,25 @@ export class MapReduceMessage<T> {
195216
}
196217
export class JSONfn {
197218
public static stringify(obj) {
198-
return JSON.stringify(obj,function(key, value){
199-
return (typeof value === 'function' ) ? value.toString() : value;
200-
});
219+
return JSON.stringify(obj, function (key, value) {
220+
return (typeof value === 'function') ? value.toString() : value;
221+
});
201222
}
202223
public static parse(str) {
203-
return JSON.parse(str,function(key, value){
204-
if(typeof value != 'string') return value;
205-
return ( value.substring(0,8) == 'function') ? eval('('+value+')') : value;
224+
return JSON.parse(str, function (key, value) {
225+
if (typeof value != 'string') return value;
226+
return (value.substring(0, 8) == 'function') ? eval('(' + value + ')') : value;
206227
});
207-
}
228+
}
208229
}
209230

210231
export class Message {
211232
public id: string;
212233
public replyto: string;
213234
public command: string;
214235
public data: string;
215-
public static frommessage(msg: SocketMessage, data: string):Message {
216-
var result:Message = new Message();
236+
public static frommessage(msg: SocketMessage, data: string): Message {
237+
var result: Message = new Message();
217238
result.id = msg.id;
218239
result.replyto = msg.replyto;
219240
result.command = msg.command;
@@ -224,13 +245,13 @@ export class Message {
224245
public Process(cli: WebSocketClient): void {
225246
try {
226247
var command: string = "";
227-
if(this.command!==null && this.command!==undefined) { command = this.command.toLowerCase(); }
228-
if(this.command !== "ping" && this.command !== "pong") {
229-
if(this.replyto!==null && this.replyto!==undefined) {
230-
var qmsg:QueuedMessage = cli.messageQueue[this.replyto];
231-
if(qmsg!==undefined && qmsg !== null) {
248+
if (this.command !== null && this.command !== undefined) { command = this.command.toLowerCase(); }
249+
if (this.command !== "ping" && this.command !== "pong") {
250+
if (this.replyto !== null && this.replyto !== undefined) {
251+
var qmsg: QueuedMessage = cli.messageQueue[this.replyto];
252+
if (qmsg !== undefined && qmsg !== null) {
232253
qmsg.message = Object.assign(qmsg.message, JSON.parse(this.data));
233-
if(qmsg.cb!==undefined && qmsg.cb !== null) { qmsg.cb(qmsg.message); }
254+
if (qmsg.cb !== undefined && qmsg.cb !== null) { qmsg.cb(qmsg.message); }
234255
delete cli.messageQueue[this.id];
235256
}
236257
return;
@@ -242,7 +263,7 @@ export class Message {
242263
break;
243264
case "pong":
244265
break;
245-
case "signin":
266+
case "signin":
246267
this.Signin(cli);
247268
break;
248269
case "refreshtoken":
@@ -269,31 +290,31 @@ export class Message {
269290
this.Reply("pong");
270291
await this.Send(cli);
271292
}
272-
public Reply(command:string): void {
293+
public Reply(command: string): void {
273294
this.command = command;
274295
this.replyto = this.id;
275296
this.id = Math.random().toString(36).substr(2, 9);
276297
}
277298
private Signin(cli: WebSocketClient): void {
278-
var msg:SigninMessage = SigninMessage.assign(this.data);
299+
var msg: SigninMessage = SigninMessage.assign(this.data);
279300
cli.jwt = msg.jwt;
280301
cli.user = msg.user;
281-
var qmsg:QueuedMessage = cli.messageQueue[this.replyto];
282-
if(qmsg!==undefined && qmsg !== null) {
283-
if(qmsg.cb!==undefined && qmsg.cb !== null) { qmsg.cb(msg); }
302+
var qmsg: QueuedMessage = cli.messageQueue[this.replyto];
303+
if (qmsg !== undefined && qmsg !== null) {
304+
if (qmsg.cb !== undefined && qmsg.cb !== null) { qmsg.cb(msg); }
284305
delete cli.messageQueue[this.id];
285306
}
286307
}
287308
private RefreshToken(cli: WebSocketClient): void {
288-
var msg:SigninMessage = SigninMessage.assign(this.data);
309+
var msg: SigninMessage = SigninMessage.assign(this.data);
289310
cli.jwt = msg.jwt;
290311
cli.user = msg.user;
291312
}
292313
private Query(cli: WebSocketClient): void {
293-
var msg:QueryMessage = QueryMessage.assign(this.data);
294-
var qmsg:QueuedMessage = cli.messageQueue[this.replyto];
295-
if(qmsg!==undefined && qmsg !== null) {
296-
if(qmsg.cb!==undefined && qmsg.cb !== null) { qmsg.cb(msg); }
314+
var msg: QueryMessage = QueryMessage.assign(this.data);
315+
var qmsg: QueuedMessage = cli.messageQueue[this.replyto];
316+
if (qmsg !== undefined && qmsg !== null) {
317+
if (qmsg.cb !== undefined && qmsg.cb !== null) { qmsg.cb(msg); }
297318
delete cli.messageQueue[this.id];
298319
}
299320

0 commit comments

Comments
 (0)