Skip to content

Commit 5e6546a

Browse files
committed
Add amqp_enabled_exchange option
1 parent d1b1bcd commit 5e6546a

6 files changed

Lines changed: 18 additions & 4 deletions

File tree

OpenFlow/src/Config.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ export class Config {
8282
Config.amqp_force_queue_prefix = Config.parseBoolean(Config.getEnv("amqp_force_queue_prefix", "false"));
8383
Config.amqp_force_exchange_prefix = Config.parseBoolean(Config.getEnv("amqp_force_exchange_prefix", "false"));
8484
Config.amqp_force_sender_has_read = Config.parseBoolean(Config.getEnv("amqp_force_sender_has_read", "true"));
85+
Config.amqp_enabled_exchange = Config.parseBoolean(Config.getEnv("amqp_enabled_exchange", "false"));
8586
Config.amqp_url = Config.getEnv("amqp_url", "amqp://localhost"); // used to register queues and by personal nodered
8687
Config.amqp_check_for_consumer = Config.parseBoolean(Config.getEnv("amqp_check_for_consumer", "true"));
8788
Config.amqp_check_for_consumer_count = Config.parseBoolean(Config.getEnv("amqp_check_for_consumer_count", "false"));
@@ -211,6 +212,7 @@ export class Config {
211212
public static amqp_force_queue_prefix: boolean = Config.parseBoolean(Config.getEnv("amqp_force_queue_prefix", "false"));
212213
public static amqp_force_exchange_prefix: boolean = Config.parseBoolean(Config.getEnv("amqp_force_exchange_prefix", "false"));
213214
public static amqp_force_sender_has_read: boolean = Config.parseBoolean(Config.getEnv("amqp_force_sender_has_read", "true"));
215+
public static amqp_enabled_exchange: boolean = Config.parseBoolean(Config.getEnv("amqp_enabled_exchange", "false"));
214216
public static amqp_url: string = Config.getEnv("amqp_url", "amqp://localhost"); // used to register queues and by personal nodered
215217
public static amqp_check_for_consumer: boolean = Config.parseBoolean(Config.getEnv("amqp_check_for_consumer", "true"));
216218
public static amqp_check_for_consumer_count: boolean = Config.parseBoolean(Config.getEnv("amqp_check_for_consumer_count", "false"));

OpenFlow/src/LoginProvider.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,8 @@ export class LoginProvider {
421421
getting_started_url: Config.getting_started_url,
422422
validate_user_form: Config.validate_user_form,
423423
supports_watch: Config.supports_watch,
424-
nodered_images: Config.nodered_images
424+
nodered_images: Config.nodered_images,
425+
amqp_enabled_exchange: Config.amqp_enabled_exchange
425426
}
426427
res.end(JSON.stringify(res2));
427428
} catch (error) {

OpenFlow/src/Messages/Message.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,9 @@ export class Message {
360360
msg.data.jwt = msg.jwt;
361361
}
362362
}
363+
if (!NoderedUtil.IsNullEmpty(msg.exchange) && !Config.amqp_enabled_exchange) {
364+
throw new Error("AMQP exchange is not enabled on this OpenFlow");
365+
}
363366
const expiration: number = (typeof msg.expiration == 'number' ? msg.expiration : Config.amqp_default_expiration);
364367
if (typeof msg.data === 'string' || msg.data instanceof String) {
365368
try {
@@ -1450,6 +1453,7 @@ export class Message {
14501453
"otel_metric_url=" + Config.otel_metric_url,
14511454
"otel_trace_interval=" + Config.otel_trace_interval.toString(),
14521455
"otel_metric_interval=" + Config.otel_metric_interval.toString(),
1456+
"amqp_enabled_exchange=" + Config.amqp_enabled_exchange.toString()
14531457
]
14541458
// const image = await docker.pull(nodered_image, { serveraddress: "https://index.docker.io/v1" });
14551459
await this._pullImage(docker, nodered_image);
@@ -1702,7 +1706,7 @@ export class Message {
17021706
{ name: "otel_metric_url", value: Config.otel_metric_url },
17031707
{ name: "otel_trace_interval", value: Config.otel_trace_interval.toString() },
17041708
{ name: "otel_metric_interval", value: Config.otel_metric_interval.toString() },
1705-
1709+
{ name: "amqp_enabled_exchange", value: Config.amqp_enabled_exchange.toString() },
17061710
],
17071711
livenessProbe: livenessProbe,
17081712
}

OpenFlow/src/amqpwrapper.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,7 @@ export class amqpwrapper extends events.EventEmitter {
265265
let name = tuser.username.split("@").join("").split(".").join("");
266266
name = name.toLowerCase();
267267
queue = name + queue;
268+
if (queue.length == 24) { queue += "1"; }
268269
}
269270
} else if (queue.length == 24) {
270271
if (NoderedUtil.IsNullEmpty(jwt)) {
@@ -327,6 +328,7 @@ export class amqpwrapper extends events.EventEmitter {
327328
let name = tuser.username.split("@").join("").split(".").join("");
328329
name = name.toLowerCase();
329330
exchange = name + exchange;
331+
if (exchange.length == 24) { exchange += "1"; }
330332
}
331333
const q: amqpexchange = new amqpexchange();
332334
if (!NoderedUtil.IsNullEmpty(q.queue)) {

OpenFlowNodeRED/src/Config.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ export class Config {
4949
Config.amqp_reply_expiration = parseInt(Config.getEnv("amqp_reply_expiration", (60 * 1000).toString())); // 1 min
5050
Config.amqp_workflow_out_expiration = parseInt(Config.getEnv("amqp_workflow_out_expiration", (60 * 1000).toString())); // 1 min
5151
Config.amqp_reply_expiration = parseInt(Config.getEnv("amqp_reply_expiration", "10000")); // 10 seconds
52-
Config.amqp_workflow_out_expiration = parseInt(Config.getEnv("amqp_workflow_out_expiration", "10000")); // 10 seconds
52+
Config.amqp_enabled_exchange = Config.parseBoolean(Config.getEnv("amqp_enabled_exchange", "false"));
53+
5354

5455
Config.api_credential_cache_seconds = parseInt(Config.getEnv("api_credential_cache_seconds", "300"));
5556
Config.api_allow_anonymous = Config.parseBoolean(Config.getEnv("api_allow_anonymous", "false"));
@@ -115,6 +116,7 @@ export class Config {
115116
// public static amqp_workflow_out_expiration: number = parseInt(Config.getEnv("amqp_workflow_out_expiration", (60 * 1000).toString())); // 1 min
116117
public static amqp_reply_expiration: number = parseInt(Config.getEnv("amqp_reply_expiration", "10000")); // 10 seconds
117118
public static amqp_workflow_out_expiration: number = parseInt(Config.getEnv("amqp_workflow_out_expiration", "10000")); // 10 seconds
119+
public static amqp_enabled_exchange: boolean = Config.parseBoolean(Config.getEnv("amqp_enabled_exchange", "false"));
118120

119121
public static api_credential_cache_seconds: number = parseInt(Config.getEnv("api_credential_cache_seconds", "300"));
120122
public static api_allow_anonymous: boolean = Config.parseBoolean(Config.getEnv("api_allow_anonymous", "false"));

OpenFlowNodeRED/src/nodered/nodes/amqp.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { Red } from "node-red";
2+
import { Config } from "../../Config";
23
import * as amqp from "./amqp_nodes";
34

45

@@ -14,5 +15,7 @@ export = function (RED: Red) {
1415
RED.nodes.registerType("amqp consumer", amqp.amqp_consumer_node);
1516
RED.nodes.registerType("amqp publisher", amqp.amqp_publisher_node);
1617
RED.nodes.registerType("amqp acknowledgment", amqp.amqp_acknowledgment_node);
17-
RED.nodes.registerType("amqp exchange", amqp.amqp_exchange_node);
18+
if (Config.amqp_enabled_exchange) {
19+
RED.nodes.registerType("amqp exchange", amqp.amqp_exchange_node);
20+
}
1821
}

0 commit comments

Comments
 (0)