Skip to content

Commit da015bd

Browse files
committed
Preserve function payloads with ampq pub
1 parent 62d5f71 commit da015bd

5 files changed

Lines changed: 12 additions & 19 deletions

File tree

OpenFlowNodeRED/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@openiap/nodered",
3-
"version": "1.2.56",
3+
"version": "1.2.57",
44
"description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)",
55
"main": "index.js",
66
"scripts": {

OpenFlowNodeRED/src/index.ts

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -108,19 +108,7 @@ function handle(signal, value) {
108108
}
109109
}
110110
Object.keys(signals).forEach((signal) => process.on(signal, handle));
111-
// process.on('SIGTERM', handle);
112-
// process.on('SIGINT', handle);
113-
// process.on('SIGUSR1', handle);
114-
// process.on('SIGPIPE', handle);
115-
// process.on('SIGHUP', handle);
116-
// process.on('SIGBREAK', handle);
117-
// process.on('SIGKILL', handle);
118-
// process.on('SIGWINCH', handle);
119-
// process.on('SIGSTOP', handle);
120-
// process.on('SIGBUS', handle);
121-
// process.on('SIGFPE', handle);
122-
// process.on('SIGSEGV', handle);
123-
// process.on('SIGILL', handle);
111+
124112

125113
let server: http.Server = null;
126114
(async function (): Promise<void> {
@@ -146,8 +134,6 @@ let server: http.Server = null;
146134
});
147135
socket.events.on("onopen", async () => {
148136
try {
149-
// q.clientagent = "nodered";
150-
// q.clientversion = Config.version;
151137
let jwt: string = "";
152138
if (Config.jwt !== "") {
153139
jwt = Config.jwt;

OpenFlowNodeRED/src/nodered/nodes/amqp_nodes.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ export class amqp_publisher_node {
178178
private connection: amqp_connection;
179179
private _onsignedin: any = null;
180180
private _onsocketclose: any = null;
181+
private payloads: any = {};
181182
constructor(public config: Iamqp_publisher_node) {
182183
RED.nodes.createNode(this, config);
183184
try {
@@ -232,7 +233,11 @@ export class amqp_publisher_node {
232233
try {
233234
const result: any = {};
234235
result.amqpacknowledgment = ack;
235-
const data = msg.data;
236+
let data = msg.data;
237+
if (!NoderedUtil.IsNullEmpty(data._msgid)) {
238+
data = Object.assign(this.payloads[data._msgid], data);
239+
delete this.payloads[data._msgid];
240+
}
236241
result.payload = data.payload;
237242
result.jwt = data.jwt;
238243
if (data.command == "timeout") {
@@ -253,11 +258,13 @@ export class amqp_publisher_node {
253258
data.payload = msg.payload;
254259
data.jwt = msg.jwt;
255260
data._id = msg._id;
261+
data._msgid = msg._msgid;
256262
const expiration: number = (typeof msg.expiration == 'number' ? msg.expiration : Config.amqp_message_ttl);
257263
const queue = this.config.queue;
258264
this.node.status({ fill: "blue", shape: "dot", text: "Sending message ..." });
259265
try {
260266
await NoderedUtil.QueueMessage(this.websocket(), queue, this.localqueue, data, null, expiration);
267+
this.payloads[msg._msgid] = msg;
261268
} catch (error) {
262269
data.error = error;
263270
this.node.send([null, data]);

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.2.56
1+
1.2.57

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@openiap/openflow",
3-
"version": "1.2.56",
3+
"version": "1.2.57",
44
"description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)",
55
"main": "index.js",
66
"scripts": {

0 commit comments

Comments
 (0)