Skip to content

Commit e393fba

Browse files
authored
Merge pull request openiap#55 from skadefro/master
improve amqp part 1
2 parents 74e28b5 + 1178c70 commit e393fba

44 files changed

Lines changed: 1339 additions & 1283 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.vscode/launch.json

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@
3232
"program": "${workspaceRoot}/OpenFlow/src/index.ts",
3333
"request": "launch",
3434
"runtimeArgs": [
35-
"--nolazy"
35+
"--nolazy",
36+
"--trace-warnings"
3637
],
38+
"showAsyncStacks": true,
3739
"trace": true,
3840
"runtimeExecutable": null,
3941
"sourceMaps": true,
@@ -53,8 +55,10 @@
5355
"program": "${workspaceRoot}/OpenFlowNodeRED/src/index.ts",
5456
"request": "launch",
5557
"runtimeArgs": [
56-
"--nolazy"
58+
"--nolazy",
59+
"--trace-warnings"
5760
],
61+
"showAsyncStacks": true,
5862
"trace": true,
5963
"runtimeExecutable": null,
6064
"sourceMaps": true,

OpenFlow/src/Config.ts

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,18 @@ export class Config {
3939
public static domain: string = Config.getEnv("domain", "localhost"); // sent to website and used in baseurl()
4040

4141

42+
public static amqp_force_queue_prefix: boolean = Config.parseBoolean(Config.getEnv("amqp_force_queue_prefix", "true"));
43+
public static amqp_force_exchange_prefix: boolean = Config.parseBoolean(Config.getEnv("amqp_force_exchange_prefix", "true"));
4244
public static amqp_url: string = Config.getEnv("amqp_url", "amqp://localhost"); // used to register queues and by personal nodered
45+
public static amqp_check_for_consumer: boolean = Config.parseBoolean(Config.getEnv("amqp_check_for_consumer", "true"));
46+
public static amqp_default_expiration: number = parseInt(Config.getEnv("amqp_default_expiration", "10000")); // 10 seconds
47+
public static amqp_requeue_time: number = parseInt(Config.getEnv("amqp_requeue_time", "1000")); // 1 seconds
48+
public static amqp_dlx: string = Config.getEnv("amqp_dlx", "openflow-dlx"); // Dead letter exchange, used to pickup dead or timeout messages
49+
50+
// public static amqp_default_expiration: number = parseInt(Config.getEnv("amqp_default_expiration", (60 * 1000).toString())); // 1 min
51+
// public static deadLetterExchange: string = Config.getEnv("deadletterexchange", "openflow-dlx"); // queue used to handle messages, that was not picked up.
52+
// public static dlxmessagettl: number = parseInt(Config.getEnv("dlxmessagettl", "2000")); // time to live for messages in miliseconds
53+
// public static dlxmessageexpires: number = parseInt(Config.getEnv("dlxmessageexpires", "1500")); // expire messages after this amount of miliseconds
4354
public static mongodb_url: string = Config.getEnv("mongodb_url", "mongodb://localhost:27017");
4455
public static mongodb_db: string = Config.getEnv("mongodb_db", "openflow");
4556

@@ -56,7 +67,7 @@ export class Config {
5667
public static personalnoderedtoken_expires_in: string = Config.getEnv("personalnoderedtoken_expires_in", "365d");
5768

5869
// Used to configure personal nodered's
59-
public static force_queue_prefix: boolean = Config.parseBoolean(Config.getEnv("force_queue_prefix", "true"));
70+
// public static force_queue_prefix: boolean = Config.parseBoolean(Config.getEnv("force_queue_prefix", "true"));
6071
public static nodered_image: string = Config.getEnv("nodered_image", "cloudhack/openflownodered:edge");
6172
public static saml_federation_metadata: string = Config.getEnv("saml_federation_metadata", "");
6273
public static api_ws_url: string = Config.getEnv("api_ws_url", "ws://localhost:3000");
@@ -67,10 +78,10 @@ export class Config {
6778
// Environment variables to set a prefix for RabbitMQs Dead Letter Exchange, Dead Letter Routing Key,
6879
// Dead Letter Queue, and Message Time to Live - to enable timeouts for RabbitMQ messages
6980
// These values must be the same for OpenFlowNodeRED and OpenFlow, or will cause errors when asserting queues
70-
public static amqp_dlx_prefix: string = Config.getEnv("amqp_dlx_prefix", "DLX.");
71-
public static amqp_dlrk_prefix: string = Config.getEnv("amqp_dlrk_prefix", "dlx.");
72-
public static amqp_dlq_prefix: string = Config.getEnv("amqp_dlq_prefix", "dlq.");
73-
public static amqp_message_ttl: number = parseInt(Config.getEnv("amqp_message_ttl", "20000"));
81+
// public static amqp_dlx_prefix: string = Config.getEnv("amqp_dlx_prefix", "DLX.");
82+
// public static amqp_dlrk_prefix: string = Config.getEnv("amqp_dlrk_prefix", "dlx.");
83+
// public static amqp_dlq_prefix: string = Config.getEnv("amqp_dlq_prefix", "dlq.");
84+
// public static amqp_message_ttl: number = parseInt(Config.getEnv("amqp_message_ttl", "20000"));
7485

7586
public static baseurl(): string {
7687
var result: string = "";

OpenFlow/src/DatabaseConnection.ts

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -46,20 +46,6 @@ Object.defineProperty(Promise, 'retry', {
4646
}
4747
})
4848

49-
// (Promise as any).retry(100, (resolve, reject) => {
50-
// // your sendFile core logic with proper
51-
// // calls to resolve and reject goes here
52-
// const rand = Math.random()
53-
54-
// console.log(rand)
55-
56-
// if (rand < 0.1) resolve(rand)
57-
// else reject(rand)
58-
// }).then(
59-
// value => console.log(`resolved: ${value}`),
60-
// error => console.log(`rejected: ${error}`)
61-
// )
62-
6349
export class DatabaseConnection {
6450
private mongodburl: string;
6551
private cli: MongoClient;
@@ -94,14 +80,13 @@ export class DatabaseConnection {
9480
}
9581
this.cli = await (Promise as any).retry(100, (resolve, reject) => {
9682
MongoClient.connect(this.mongodburl, { autoReconnect: false, useNewUrlParser: true }).then((cli) => {
97-
console.log(`Connected to mongodb`);
9883
resolve(cli);
9984
}).catch((reason) => {
100-
console.log(reason);
85+
console.error(reason);
10186
reject(reason);
10287
});
10388
});
104-
console.log(`Really connected to mongodb`);
89+
this._logger.info(`Really connected to mongodb`);
10590
// this.cli = await MongoClient.connect(this.mongodburl, { autoReconnect: false, useNewUrlParser: true });
10691
this.cli.on("error", (error) => {
10792
this.isConnected = false;
@@ -291,9 +276,12 @@ export class DatabaseConnection {
291276
if (typeof orderby === "string" || orderby instanceof String) {
292277
var neworderby = null;
293278
try {
294-
neworderby = JSON.parse((orderby as string));
295-
mysort = neworderby;
279+
if (orderby.indexOf("{") > -1) {
280+
neworderby = JSON.parse((orderby as string));
281+
mysort = neworderby;
282+
}
296283
} catch (error) {
284+
console.log(error);
297285
}
298286
if (neworderby == null) mysort[(orderby as string)] = 1;
299287
} else {
@@ -916,10 +904,16 @@ export class DatabaseConnection {
916904
}
917905
});
918906
} else {
919-
query = { _id: q.item._id };
907+
// has no id, and no uniqeness defined, so we assume its a new item we should insert
908+
if (q.item._id != null) {
909+
query = { _id: q.item._id };
910+
}
920911
}
921912
var user: TokenUser = Crypt.verityToken(q.jwt);
922-
var exists = await this.query(query, { name: 1 }, 2, 0, null, q.collectionname, q.jwt);
913+
var exists: Base[] = [];
914+
if (query != null) {
915+
exists = await this.query(query, { name: 1 }, 2, 0, null, q.collectionname, q.jwt);
916+
}
923917
if (exists.length == 1) {
924918
q.item._id = exists[0]._id;
925919
}
@@ -1178,6 +1172,10 @@ export class DatabaseConnection {
11781172
}
11791173

11801174
if (item._acl != null && item._acl != undefined) {
1175+
if (typeof item._acl === 'string' || item._acl instanceof String) {
1176+
item._acl = JSON.parse((item._acl as any));
1177+
}
1178+
11811179
var a = item._acl.filter(x => x._id == user._id);
11821180
if (a.length > 0) {
11831181
let _ace = Ace.assign(a[0]);

OpenFlow/src/KubeUtil.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@ export class KubeUtil {
2121
kc.loadFromDefault();
2222
success = true;
2323
} catch (error) {
24-
console.log(error);
24+
console.error(error);
2525
}
2626
if (success == false) {
2727
try {
2828
kc.loadFromCluster();
2929
success = true;
3030
} catch (error) {
31-
console.log(error);
31+
console.error(error);
3232
}
3333
}
3434
this.CoreV1Api = kc.makeApiClient(k8s.CoreV1Api);

OpenFlow/src/Messages/Message.ts

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import { DatabaseConnection } from "../DatabaseConnection";
4343
import { CreateWorkflowInstanceMessage } from "./CreateWorkflowInstanceMessage";
4444
import { StripeMessage, EnsureStripeCustomerMessage, Billing, stripe_customer, stripe_base, stripe_list, StripeAddPlanMessage, StripeCancelPlanMessage, stripe_subscription, stripe_subscription_item, stripe_plan, stripe_coupon } from "./StripeMessage";
4545
import { V1ResourceRequirements } from "@kubernetes/client-node";
46+
import { amqpwrapper } from "../amqpwrapper";
4647
var request = require("request");
4748
var got = require("got");
4849

@@ -213,7 +214,7 @@ export class Message {
213214
var msg: RegisterQueueMessage<Base>
214215
try {
215216
msg = RegisterQueueMessage.assign(this.data);
216-
await cli.CreateConsumer(msg.queuename);
217+
msg.queuename = await cli.CreateConsumer(msg.queuename);
217218
} catch (error) {
218219
cli._logger.error(error);
219220
if (Util.IsNullUndefinded(msg)) { (msg as any) = {}; }
@@ -238,25 +239,35 @@ export class Message {
238239
if (typeof msg.data == 'string') {
239240
try {
240241
var obj = JSON.parse(msg.data);
241-
if (Util.IsNullUndefinded(obj.jwt)) {
242-
obj.jwt = msg.jwt;
243-
msg.data = JSON.stringify(obj);
244-
}
242+
// if (Util.IsNullUndefinded(obj.jwt)) {
243+
// obj.jwt = msg.jwt;
244+
// msg.data = JSON.stringify(obj);
245+
// }
245246
} catch (error) {
246247
}
247248
} else {
248249
msg.data.jwt = msg.jwt;
249250
}
250251
}
252+
var expiration: number = Config.amqp_default_expiration;
253+
if (typeof msg.expiration == 'number') expiration = msg.expiration;
251254
if (Util.IsNullEmpty(msg.replyto)) {
252-
await cli.sendToQueue(msg);
255+
// var sendthis = { data: msg.data, jwt: cli.jwt, user: cli.user };
256+
var sendthis = msg.data;
257+
await amqpwrapper.Instance().send("", msg.queuename, sendthis, expiration, msg.correlationId);
253258
} else {
254259
if (msg.queuename === msg.replyto) {
255-
cli._logger.warn("Ignore reply to self queuename:" + msg.queuename + " correlationId:" + msg.correlationId);
256-
return
260+
throw new Error("Cannot send reply to self queuename:" + msg.queuename + " correlationId:" + msg.correlationId);
261+
// cli._logger.warn("Ignore reply to self queuename:" + msg.queuename + " correlationId:" + msg.correlationId);
262+
// return
257263
}
258-
this.replyto = msg.correlationId;
259-
await cli.sendQueueReply(msg);
264+
//var sendthis = { data: msg.data, jwt: cli.jwt, user: cli.user };
265+
var sendthis = msg.data;
266+
var result = await amqpwrapper.Instance().sendWithReplyTo("", msg.queuename, msg.replyto, sendthis, expiration, msg.correlationId);
267+
// var result = await amqpwrapper.Instance().sendWithReply("", msg.queuename, sendthis, expiration, msg.correlationId);
268+
269+
// this.replyto = msg.correlationId;
270+
// await cli.sendQueueReply(msg, expiration);
260271
}
261272
} catch (error) {
262273
cli._logger.error(error);
@@ -802,9 +813,9 @@ export class Message {
802813
var tuser: TokenUser = new TokenUser(nodereduser);
803814
var nodered_jwt: string = Crypt.createToken(tuser, Config.personalnoderedtoken_expires_in);
804815

805-
if (Config.force_queue_prefix) {
806-
user.nodered.queue_prefix = nodereduser.username;
807-
}
816+
// if (Config.force_queue_prefix) {
817+
// user.nodered.queue_prefix = nodereduser.username;
818+
// }
808819

809820
cli._logger.debug("[" + cli.user.username + "] ensure nodered role " + name + "noderedadmins");
810821
var noderedadmins = await User.ensureRole(cli.jwt, name + "noderedadmins", null);
@@ -1144,14 +1155,13 @@ export class Message {
11441155
cli._logger.debug("[" + cli.user.username + "] Remove un billed nodered instance " + itemname + " that has been running for " + diffhours + " hours");
11451156
await this._DeleteNoderedInstance(userid, cli.user._id, cli.user.username, rootjwt);
11461157
}
1147-
// console.log(itemname + " " + diffminutes + " min / " + diffhours + " hours");
11481158
} catch (error) {
11491159
}
11501160
} else if (image.indexOf("openflownodered") > 0) {
11511161
if (billed != "true" && diffhours > 24) {
1152-
console.log("unbilled " + itemname + " with no userid, should be removed, it has been running for " + diffhours + " hours");
1162+
console.debug("unbilled " + itemname + " with no userid, should be removed, it has been running for " + diffhours + " hours");
11531163
} else {
1154-
console.log("unbilled " + itemname + " with no userid, has been running for " + diffhours + " hours");
1164+
console.debug("unbilled " + itemname + " with no userid, has been running for " + diffhours + " hours");
11551165
}
11561166
}
11571167
}
@@ -1475,10 +1485,10 @@ export class Message {
14751485
var tuser = Crypt.verityToken(msg.jwt);
14761486
msg.jwt = Crypt.createToken(tuser, Config.longtoken_expires_in);
14771487

1478-
if (cli.consumers.length == 0) {
1479-
await cli.CreateConsumer("nodered." + Math.random().toString(36).substr(2, 9));
1480-
// throw new Error("Client not connected to any message queues");
1481-
}
1488+
// if (cli.consumers.length == 0) {
1489+
// await cli.CreateConsumer("nodered." + Math.random().toString(36).substr(2, 9));
1490+
// // throw new Error("Client not connected to any message queues");
1491+
// }
14821492
if (Util.IsNullEmpty(msg.queue)) {
14831493
var workflow: any = null;
14841494
var user: any = null;
@@ -1515,7 +1525,8 @@ export class Message {
15151525

15161526
if (msg.initialrun) {
15171527
var message = { _id: res2._id };
1518-
cli.consumers[0].sendToQueueWithReply(msg.queue, msg.resultqueue, msg.correlationId, message);
1528+
amqpwrapper.Instance().sendWithReplyTo("", msg.queue, msg.resultqueue, message, Config.amqp_default_expiration, msg.correlationId);
1529+
// cli.consumers[0].sendToQueueWithReply(msg.queue, msg.resultqueue, msg.correlationId, message, (60 * (60 * 1000))); // 1 hour
15191530
}
15201531
} catch (error) {
15211532
cli._logger.error(error);

OpenFlow/src/Messages/QueueMessage.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,11 @@ export class QueueMessage implements IReplyMessage {
88
public replyto: string;
99
public queuename: string;
1010
public data: any;
11+
public expiration: number;
12+
13+
public consumerTag: string;
14+
public routingkey: string;
15+
public exchange: string;
1116
static assign(o: any): QueueMessage {
1217
if (typeof o === "string" || o instanceof String) {
1318
return Object.assign(new QueueMessage(), JSON.parse(o.toString()));

0 commit comments

Comments
 (0)