Skip to content

Commit 8fbf163

Browse files
committed
Create timeouts for RabbitMQ messages to OpenRPA robots
1 parent 1182413 commit 8fbf163

5 files changed

Lines changed: 70 additions & 6 deletions

File tree

OpenFlow/src/Config.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,14 @@ export class Config {
6464
public static nodered_domain_schema: string = Config.getEnv("nodered_domain_schema", ""); // also sent to website
6565
public static nodered_initial_liveness_delay: number = parseInt(Config.getEnv("nodered_initial_liveness_delay", "60"));
6666

67+
// Environment variables to set a prefix for RabbitMQs Dead Letter Exchange, Dead Letter Routing Key,
68+
// Dead Letter Queue, and Message Time to Live - to enable timeouts for RabbitMQ messages
69+
// These values must be the same for OpenFlowNodeRED and OpenFlow, or will cause errors when asserting queues
70+
public static amqp_dlx_prefix: string = Config.getEnv("amqp_dlx_prefix", "DLX.");
71+
public static amqp_dlrk_prefix: string = Config.getEnv("amqp_dlrk_prefix", "dlx.");
72+
public static amqp_dlq_prefix: string = Config.getEnv("amqp_dlq_prefix", "dlq.");
73+
public static amqp_message_ttl: number = parseInt(Config.getEnv("amqp_message_ttl", "20000"));
74+
6775
public static baseurl(): string {
6876
var result: string = "";
6977
if (Config.tls_crt != '' && Config.tls_key != '') {

OpenFlow/src/amqp_consumer.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import * as winston from "winston";
22
import * as amqplib from "amqplib";
33
import * as url from "url";
4+
import { Config } from "./Config";
45

56

67
// tslint:disable-next-line: class-name
@@ -22,7 +23,17 @@ export class amqp_consumer {
2223
this.conn = await amqplib.connect(this.connectionstring);
2324
this.conn.on("error", () => null);
2425
this.channel = await this.conn.createChannel();
25-
this._ok = await this.channel.assertQueue(this.queue, { durable: false, autoDelete: autoDelete });
26+
// Assert queue with additional arguments for dead letter exchange (for timed out messages)
27+
// These arguments have to match the arguments set in amqp_publisher.ts in the OpenFlowNodeRED src folder
28+
this._ok = await this.channel.assertQueue(this.queue, {
29+
durable: false,
30+
autoDelete: autoDelete,
31+
arguments: {
32+
'x-dead-letter-exchange': Config.amqp_dlx_prefix + this.queue,
33+
'x-dead-letter-routing-key': Config.amqp_dlrk_prefix + this.queue,
34+
'x-message-ttl': Config.amqp_message_ttl
35+
}
36+
});
2637
await this.channel.consume(this.queue, (msg) => { this.OnMessage(me, msg); }, { noAck: autoack });
2738
this._logger.info("Connected to " + new URL(this.connectionstring).hostname);
2839
}

OpenFlowNodeRED/src/Config.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,14 @@ export class Config {
3939
public static tls_ca: string = Config.getEnv("tls_ca", "");
4040
public static tls_passphrase: string = Config.getEnv("tls_passphrase", "");
4141

42+
// Environment variables to set a prefix for RabbitMQs Dead Letter Exchange, Dead Letter Routing Key,
43+
// Dead Letter Queue, and Message Time to Live - to enable timeouts for RabbitMQ messages
44+
// These values must be the same for OpenFlowNodeRED and OpenFlow, or will cause errors when asserting queues
45+
public static amqp_dlx_prefix: string = Config.getEnv("amqp_dlx_prefix", "DLX.");
46+
public static amqp_dlrk_prefix: string = Config.getEnv("amqp_dlrk_prefix", "dlx.");
47+
public static amqp_dlq_prefix: string = Config.getEnv("amqp_dlq_prefix", "dlq.");
48+
public static amqp_message_ttl: number = parseInt(Config.getEnv("amqp_message_ttl", "20000"));
49+
4250

4351
public static baseurl(): string {
4452
if (NoderedUtil.IsNullEmpty(Config.domain)) {

OpenFlowNodeRED/src/amqp_publisher.ts

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import * as winston from "winston";
22
import * as amqplib from "amqplib";
33
import { NoderedUtil } from "./nodered/nodes/NoderedUtil";
4+
import { Config } from './Config';
45

56

67
interface IHashTable<T> {
@@ -65,10 +66,42 @@ export class amqp_publisher {
6566
if (this.channel != null && this.channel != undefined) { await this.channel.close(); this.channel = null; }
6667
if (this.conn != null && this.conn != undefined) { await this.conn.close(); this.conn = null; }
6768
}
68-
SendMessage(msg: string, queue: string, correlationId: string, sendreply: boolean): void {
69+
async SendMessage(msg: string, queue: string, correlationId: string, sendreply: boolean): Promise<void> {
6970
if (correlationId == null || correlationId == "") { correlationId = this.generateUuid(); }
7071
this._logger.info("SendMessage " + msg);
72+
7173
if (sendreply) {
74+
// Before sending the message, need to assert the exchange and queue to handle timed out messages
75+
// This is done via a dead letter exchange, and dead letter queue
76+
const dlx = await this.channel.assertExchange(Config.amqp_dlx_prefix + queue, 'topic', { durable: false });
77+
const dlq = await this.channel.assertQueue(Config.amqp_dlq_prefix + queue, { durable: false });
78+
// Bind the dead letter queue to the dead letter exchange, routing with the dead letter routing key
79+
await this.channel.bindQueue(dlq.queue, dlx.exchange, Config.amqp_dlrk_prefix + queue);
80+
81+
// Must also consume messages in the dead letter queue, to catch messages that have timed out
82+
await this.channel.consume(dlq.queue, msg => {
83+
// This is the function to run when the dead letter (timed out) message is picked up
84+
var data = JSON.parse(msg.content.toString());
85+
// Change the command and return back to the correct queue (replyTo) to be handled
86+
// Clear x-first-death-reason header
87+
msg.properties.headers["x-first-death-reason"] = null;
88+
// Set command to timeout to be handled when collected from the node's queue
89+
data.command = "timeout";
90+
// Resend message, this time to the reply queue for the correct node (replyTo)
91+
this.SendMessage(JSON.stringify(data), msg.properties.replyTo, msg.properties.correlationId, false);
92+
},
93+
{ noAck: true });
94+
95+
// Need to assert new queue first to ensure it has the timeout arguments added to it
96+
await this.channel.assertQueue(queue, {
97+
durable: false,
98+
arguments: {
99+
'x-dead-letter-exchange': Config.amqp_dlx_prefix + queue,
100+
'x-dead-letter-routing-key': Config.amqp_dlrk_prefix + queue,
101+
'x-message-ttl': Config.amqp_message_ttl
102+
}
103+
});
104+
72105
this.channel.sendToQueue(queue, Buffer.from(msg), { correlationId: correlationId, replyTo: this._ok.queue });
73106
} else {
74107
this.channel.sendToQueue(queue, Buffer.from(msg), { correlationId: correlationId });

OpenFlowNodeRED/src/nodered/nodes/rpa_nodes.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,10 @@ export class rpa_workflow_node {
117117
var command = data.command;
118118
result.jwt = data.jwt;
119119
var correlationId = msg.properties.correlationId;
120+
120121
if (correlationId != null && this.messages[correlationId] != null) {
121122
result = this.messages[correlationId];
122-
if (command == "invokecompleted" || command == "invokefailed" || command == "invokeaborted" || command == "error") {
123+
if (command == "invokecompleted" || command == "invokefailed" || command == "invokeaborted" || command == "error" || command == "timeout") {
123124
delete this.messages[correlationId];
124125
}
125126
}
@@ -134,7 +135,7 @@ export class rpa_workflow_node {
134135
console.log("********************");
135136
this.node.send(result);
136137
}
137-
else if (command == "invokefailed" || command == "invokeaborted" || command == "error") {
138+
else if (command == "invokefailed" || command == "invokeaborted" || command == "error" || command == "timeout") {
138139
result.payload = data;
139140
if (data.user != null) result.user = data.user;
140141
if (result.payload == null || result.payload == undefined) { result.payload = {}; }
@@ -164,18 +165,21 @@ export class rpa_workflow_node {
164165
if (msg.payload == null || typeof msg.payload == "string" || typeof msg.payload == "number") {
165166
msg.payload = { "data": msg.payload };
166167
}
167-
if(NoderedUtil.IsNullEmpty(targetid)) {
168+
if (NoderedUtil.IsNullEmpty(targetid)) {
168169
this.node.status({ fill: "red", shape: "dot", text: "robot is mandatory" });
169170
return;
170171
}
171-
if(NoderedUtil.IsNullEmpty(workflowid)) {
172+
if (NoderedUtil.IsNullEmpty(workflowid)) {
172173
this.node.status({ fill: "red", shape: "dot", text: "workflow is mandatory" });
173174
return;
174175
}
175176
var rpacommand = {
176177
command: "invoke",
177178
workflowid: workflowid,
178179
jwt: msg.jwt,
180+
// Adding expiry to the rpacommand as a timestamp for when the RPA message is expected to timeout from the message queue
181+
// Currently set to 20 seconds into the future
182+
expiry: Math.floor((new Date().getTime()) / 1000) + Config.amqp_message_ttl,
179183
data: { payload: msg.payload }
180184
}
181185
this.node.status({ fill: "blue", shape: "dot", text: "Robot running..." });

0 commit comments

Comments
 (0)