Skip to content

Commit dec884c

Browse files
committed
Add exchange, remove api req for rabbitmq
1 parent 5a763ef commit dec884c

15 files changed

Lines changed: 577 additions & 132 deletions

File tree

OpenFlow/src/Config.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ export class Config {
216216
public static amqp_check_for_consumer_count: boolean = Config.parseBoolean(Config.getEnv("amqp_check_for_consumer_count", "false"));
217217
public static amqp_default_expiration: number = parseInt(Config.getEnv("amqp_default_expiration", (60 * 1000).toString())); // 1 min
218218
public static amqp_requeue_time: number = parseInt(Config.getEnv("amqp_requeue_time", "1000")); // 1 seconds
219+
// public static amqp_dlx: string = Config.getEnv("amqp_dlx", "openflow-dlx"); // Dead letter exchange, used to pickup dead or timeout messages
219220
public static amqp_dlx: string = Config.getEnv("amqp_dlx", "openflow-dlx"); // Dead letter exchange, used to pickup dead or timeout messages
220221

221222
public static mongodb_url: string = Config.getEnv("mongodb_url", "mongodb://localhost:27017");

OpenFlow/src/Messages/Message.ts

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import { Readable, Stream } from "stream";
1616
import { GridFSBucket, ObjectID, Db, Cursor, MongoNetworkError } from "mongodb";
1717
import * as path from "path";
1818
import { DatabaseConnection } from "../DatabaseConnection";
19-
import { StripeMessage, EnsureStripeCustomerMessage, NoderedUtil, QueuedMessage, RegisterQueueMessage, QueueMessage, CloseQueueMessage, ListCollectionsMessage, DropCollectionMessage, QueryMessage, AggregateMessage, InsertOneMessage, UpdateOneMessage, Base, UpdateManyMessage, InsertOrUpdateOneMessage, DeleteOneMessage, MapReduceMessage, SigninMessage, TokenUser, User, Rights, EnsureNoderedInstanceMessage, DeleteNoderedInstanceMessage, DeleteNoderedPodMessage, RestartNoderedInstanceMessage, GetNoderedInstanceMessage, GetNoderedInstanceLogMessage, SaveFileMessage, WellknownIds, GetFileMessage, UpdateFileMessage, CreateWorkflowInstanceMessage, RegisterUserMessage, NoderedUser, WatchMessage, GetDocumentVersionMessage, DeleteManyMessage, InsertManyMessage, GetKubeNodeLabels, PushMetricsMessage } from "@openiap/openflow-api";
19+
import { StripeMessage, EnsureStripeCustomerMessage, NoderedUtil, QueuedMessage, RegisterQueueMessage, QueueMessage, CloseQueueMessage, ListCollectionsMessage, DropCollectionMessage, QueryMessage, AggregateMessage, InsertOneMessage, UpdateOneMessage, Base, UpdateManyMessage, InsertOrUpdateOneMessage, DeleteOneMessage, MapReduceMessage, SigninMessage, TokenUser, User, Rights, EnsureNoderedInstanceMessage, DeleteNoderedInstanceMessage, DeleteNoderedPodMessage, RestartNoderedInstanceMessage, GetNoderedInstanceMessage, GetNoderedInstanceLogMessage, SaveFileMessage, WellknownIds, GetFileMessage, UpdateFileMessage, CreateWorkflowInstanceMessage, RegisterUserMessage, NoderedUser, WatchMessage, GetDocumentVersionMessage, DeleteManyMessage, InsertManyMessage, GetKubeNodeLabels, PushMetricsMessage, RegisterExchangeMessage } from "@openiap/openflow-api";
2020
import { Billing, stripe_customer, stripe_base, stripe_list, StripeAddPlanMessage, StripeCancelPlanMessage, stripe_subscription, stripe_subscription_item, stripe_plan, stripe_coupon } from "@openiap/openflow-api";
2121
import { V1ResourceRequirements, V1Deployment } from "@kubernetes/client-node";
2222
import { amqpwrapper } from "../amqpwrapper";
@@ -215,6 +215,9 @@ export class Message {
215215
case "registerqueue":
216216
await this.RegisterQueue(cli, span);
217217
break;
218+
case "registerexchange":
219+
await this.RegisterExchange(cli, span);
220+
break;
218221
case "queuemessage":
219222
await this.QueueMessage(cli, span);
220223
break;
@@ -300,6 +303,27 @@ export class Message {
300303
Logger.otel.endSpan(span);
301304
}
302305
}
306+
async RegisterExchange(cli: WebSocketServerClient, parent: Span) {
307+
this.Reply();
308+
let msg: RegisterExchangeMessage;
309+
try {
310+
msg = RegisterExchangeMessage.assign(this.data);
311+
var res = await cli.RegisterExchange(msg.exchangename, msg.algorithm, msg.routingkey, parent);
312+
msg.queuename = res.queuename;
313+
msg.exchangename = res.exchangename;
314+
} catch (error) {
315+
await handleError(cli, error);
316+
if (NoderedUtil.IsNullUndefinded(msg)) { (msg as any) = {}; }
317+
if (msg !== null && msg !== undefined) msg.error = error.message ? error.message : error;
318+
}
319+
try {
320+
this.data = JSON.stringify(msg);
321+
} catch (error) {
322+
this.data = "";
323+
await handleError(cli, error);
324+
}
325+
this.Send(cli);
326+
}
303327
async RegisterQueue(cli: WebSocketServerClient, parent: Span) {
304328
this.Reply();
305329
let msg: RegisterQueueMessage;
@@ -396,13 +420,13 @@ export class Message {
396420
}
397421
if (NoderedUtil.IsNullEmpty(msg.replyto)) {
398422
const sendthis = msg.data;
399-
await amqpwrapper.Instance().send("", msg.queuename, sendthis, expiration, msg.correlationId);
423+
await amqpwrapper.Instance().send(msg.exchange, msg.queuename, sendthis, expiration, msg.correlationId, msg.routingkey);
400424
} else {
401425
if (msg.queuename === msg.replyto) {
402426
throw new Error("Cannot send reply to self queuename:" + msg.queuename + " correlationId:" + msg.correlationId);
403427
}
404428
const sendthis = msg.data;
405-
const result = await amqpwrapper.Instance().sendWithReplyTo("", msg.queuename, msg.replyto, sendthis, expiration, msg.correlationId);
429+
const result = await amqpwrapper.Instance().sendWithReplyTo(msg.exchange, msg.queuename, msg.replyto, sendthis, expiration, msg.correlationId, msg.routingkey);
406430
}
407431
} catch (error) {
408432
await handleError(cli, error);
@@ -2748,7 +2772,7 @@ export class Message {
27482772

27492773
if (msg.initialrun) {
27502774
const message = { _id: res2._id, __jwt: msg.jwt, __user: tuser };
2751-
amqpwrapper.Instance().sendWithReplyTo("", msg.queue, msg.resultqueue, message, Config.amqp_default_expiration, msg.correlationId);
2775+
amqpwrapper.Instance().sendWithReplyTo("", msg.queue, msg.resultqueue, message, Config.amqp_default_expiration, msg.correlationId, "");
27522776
}
27532777
} catch (error) {
27542778
span.recordException(error);

OpenFlow/src/WebSocketServerClient.ts

Lines changed: 108 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,15 @@ import * as WebSocket from "ws";
22
import { SocketMessage } from "./SocketMessage";
33
import { Message, JSONfn } from "./Messages/Message";
44
import { Config } from "./Config";
5-
import { amqpwrapper, QueueMessageOptions, amqpqueue } from "./amqpwrapper";
6-
import { NoderedUtil, Base, InsertOneMessage, QueueMessage, MapReduceMessage, QueryMessage, UpdateOneMessage, UpdateManyMessage, DeleteOneMessage, User, mapFunc, reduceFunc, finalizeFunc, QueuedMessage, QueuedMessageCallback, WatchEventMessage } from "@openiap/openflow-api";
5+
import { amqpwrapper, QueueMessageOptions, amqpqueue, amqpexchange } from "./amqpwrapper";
6+
import { NoderedUtil, Base, InsertOneMessage, QueueMessage, MapReduceMessage, QueryMessage, UpdateOneMessage, UpdateManyMessage, DeleteOneMessage, User, mapFunc, reduceFunc, finalizeFunc, QueuedMessage, QueuedMessageCallback, WatchEventMessage, QueueClosedMessage, ExchangeClosedMessage } from "@openiap/openflow-api";
77
import { ChangeStream } from "mongodb";
88
import { WebSocketServer } from "./WebSocketServer";
99
import { Span } from "@opentelemetry/api";
1010
import { Logger } from "./Logger";
1111
import { WebServer } from "./WebServer";
1212
import { DatabaseConnection } from "./DatabaseConnection";
13+
import { cli } from "winston/lib/winston/config";
1314
interface IHashTable<T> {
1415
[key: string]: T;
1516
}
@@ -31,12 +32,15 @@ const Semaphore = (n) => ({
3132
});
3233
const semaphore = Semaphore(1);
3334

35+
export declare class RegisterExchangeResponse {
36+
exchangename: string;
37+
queuename: string;
38+
}
3439
export class clsstream {
3540
public stream: ChangeStream;
3641
public id: string;
3742
public callback: any;
3843
}
39-
4044
export class WebSocketServerClient {
4145
public jwt: string;
4246
private _socketObject: WebSocket;
@@ -51,6 +55,7 @@ export class WebSocketServerClient {
5155
public id: string = "";
5256
user: User;
5357
public _queues: amqpqueue[] = [];
58+
public _exchanges: amqpexchange[] = [];
5459
public devnull: boolean = false;
5560
public commandcounter: object = {};
5661
public inccommandcounter(command: string): number {
@@ -78,6 +83,28 @@ export class WebSocketServerClient {
7883
socketObject.on("message", (e: string): void => (this.message(e) as any)); // e: MessageEvent
7984
socketObject.on("error", (e: Event): void => this.error(e));
8085
socketObject.on("close", (e: CloseEvent): void => this.close(e));
86+
87+
amqpwrapper.Instance().on("disconnected", (e: Event): void => this.amqpdisconnected(e));
88+
}
89+
private amqpdisconnected(e: Event): void {
90+
for (var i = 0; i < this._queues.length; i++) {
91+
let msg: SocketMessage = SocketMessage.fromcommand("queueclosed");
92+
let q: QueueClosedMessage = new QueueClosedMessage();
93+
q.queuename = this._queues[i].queue;
94+
msg.data = JSON.stringify(q);
95+
this._socketObject.send(msg.tojson());
96+
Logger.instanse.info("Send queue closed message to " + this.id + " for queue " + q.queuename);
97+
}
98+
for (var i = 0; i < this._exchanges.length; i++) {
99+
let msg: SocketMessage = SocketMessage.fromcommand("exchangeclosed");
100+
let q: ExchangeClosedMessage = new ExchangeClosedMessage();
101+
q.queuename = this._exchanges[i].queue.queue; q.exchangename = this._exchanges[i].exchange;
102+
msg.data = JSON.stringify(q);
103+
this._socketObject.send(msg.tojson());
104+
Logger.instanse.info("Send queue closed message to " + this.id + " for exchange " + q.exchangename);
105+
}
106+
this._exchanges = [];
107+
this.CloseConsumers(null);
81108
}
82109
private open(e: Event): void {
83110
Logger.instanse.info("WebSocket connection opened " + e + " " + this.id);
@@ -221,32 +248,94 @@ export class WebSocketServerClient {
221248
Logger.otel.endSpan(span);
222249
}
223250
}
251+
public async RegisterExchange(exchangename: string, algorithm: "direct" | "fanout" | "topic" | "header", routingkey: string = "", parent: Span): Promise<RegisterExchangeResponse> {
252+
const span: Span = Logger.otel.startSubSpan("WebSocketServerClient.CreateConsumer", parent);
253+
try {
254+
let exclusive: boolean = false; // Should we keep the queue around ? for robots and roles
255+
let exchange = exchangename;
256+
if (NoderedUtil.IsNullEmpty(exchange)) {
257+
if (this.clientagent == "nodered") {
258+
exchange = "nodered." + NoderedUtil.GetUniqueIdentifier(); exclusive = true;
259+
} else if (this.clientagent == "webapp") {
260+
exchange = "webapp." + NoderedUtil.GetUniqueIdentifier(); exclusive = true;
261+
} else if (this.clientagent == "web") {
262+
exchange = "web." + NoderedUtil.GetUniqueIdentifier(); exclusive = true;
263+
} else if (this.clientagent == "openrpa") {
264+
exchange = "openrpa." + NoderedUtil.GetUniqueIdentifier(); exclusive = true;
265+
} else if (this.clientagent == "powershell") {
266+
exchange = "powershell." + NoderedUtil.GetUniqueIdentifier(); exclusive = true;
267+
} else {
268+
exchange = "unknown." + NoderedUtil.GetUniqueIdentifier(); exclusive = true;
269+
}
270+
if (exchange.length == 24) { exchange += "1"; }
271+
272+
}
273+
await semaphore.down();
274+
this.CloseConsumer(exchange, span);
275+
let exchangequeue: amqpexchange = null;
276+
try {
277+
const AssertExchangeOptions: any = Object.assign({}, (amqpwrapper.Instance().AssertExchangeOptions));
278+
AssertExchangeOptions.exclusive = exclusive;
279+
exchangequeue = await amqpwrapper.Instance().AddExchangeConsumer(exchange, algorithm, routingkey, AssertExchangeOptions, this.jwt, async (msg: any, options: QueueMessageOptions, ack: any, done: any) => {
280+
const _data = msg;
281+
try {
282+
const result = await this.Queue(msg, exchange, options);
283+
ack();
284+
done(result);
285+
} catch (error) {
286+
setTimeout(() => {
287+
ack(false);
288+
// ack(); // just eat the error
289+
done(_data);
290+
console.error(exchange + " failed message queue message, nack and re queue message: ", (error.message ? error.message : error));
291+
}, Config.amqp_requeue_time);
292+
}
293+
}, span);
294+
if (exchangequeue) {
295+
exchange = exchangequeue.queue.queue;
296+
this._exchanges.push(exchangequeue);
297+
this._queues.push(exchangequeue.queue);
298+
}
299+
// if (!NoderedUtil.IsNullUndefinded(WebSocketServer.websocket_queue_count)) WebSocketServer.websocket_queue_count.bind({ ...Logger.otel.defaultlabels, clientid: this.id }).update(this._queues.length);
300+
} catch (error) {
301+
Logger.instanse.error("WebSocketclient::CreateConsumer " + error);
302+
}
303+
semaphore.up();
304+
if (exchangequeue != null) return { exchangename: exchangequeue.exchange, queuename: exchangequeue.queue.queue };
305+
return null;
306+
} catch (error) {
307+
span.recordException(error);
308+
throw error;
309+
} finally {
310+
Logger.otel.endSpan(span);
311+
}
312+
}
224313
public async CreateConsumer(queuename: string, parent: Span): Promise<string> {
225314
const span: Span = Logger.otel.startSubSpan("WebSocketServerClient.CreateConsumer", parent);
226315
try {
227-
let autoDelete: boolean = false; // Should we keep the queue around ? for robots and roles
316+
let exclusive: boolean = false; // Should we keep the queue around ? for robots and roles
228317
let qname = queuename;
229318
if (NoderedUtil.IsNullEmpty(qname)) {
230319
if (this.clientagent == "nodered") {
231-
qname = "nodered." + NoderedUtil.GetUniqueIdentifier(); autoDelete = true;
320+
qname = "nodered." + NoderedUtil.GetUniqueIdentifier(); exclusive = true;
232321
} else if (this.clientagent == "webapp") {
233-
qname = "webapp." + NoderedUtil.GetUniqueIdentifier(); autoDelete = true;
322+
qname = "webapp." + NoderedUtil.GetUniqueIdentifier(); exclusive = true;
234323
} else if (this.clientagent == "web") {
235-
qname = "web." + NoderedUtil.GetUniqueIdentifier(); autoDelete = true;
324+
qname = "web." + NoderedUtil.GetUniqueIdentifier(); exclusive = true;
236325
} else if (this.clientagent == "openrpa") {
237-
qname = "openrpa." + NoderedUtil.GetUniqueIdentifier(); autoDelete = true;
326+
qname = "openrpa." + NoderedUtil.GetUniqueIdentifier(); exclusive = true;
238327
} else if (this.clientagent == "powershell") {
239-
qname = "powershell." + NoderedUtil.GetUniqueIdentifier(); autoDelete = true;
328+
qname = "powershell." + NoderedUtil.GetUniqueIdentifier(); exclusive = true;
240329
} else {
241-
qname = "unknown." + NoderedUtil.GetUniqueIdentifier(); autoDelete = true;
330+
qname = "unknown." + NoderedUtil.GetUniqueIdentifier(); exclusive = true;
242331
}
243332
}
244333
await semaphore.down();
245334
this.CloseConsumer(qname, span);
246335
let queue: amqpqueue = null;
247336
try {
248337
const AssertQueueOptions: any = Object.assign({}, (amqpwrapper.Instance().AssertQueueOptions));
249-
AssertQueueOptions.autoDelete = autoDelete;
338+
AssertQueueOptions.exclusive = exclusive;
250339
queue = await amqpwrapper.Instance().AddQueueConsumer(qname, AssertQueueOptions, this.jwt, async (msg: any, options: QueueMessageOptions, ack: any, done: any) => {
251340
const _data = msg;
252341
try {
@@ -268,10 +357,15 @@ export class WebSocketServerClient {
268357
}
269358
if (!NoderedUtil.IsNullUndefinded(WebSocketServer.websocket_queue_count)) WebSocketServer.websocket_queue_count.bind({ ...Logger.otel.defaultlabels, clientid: this.id }).update(this._queues.length);
270359
} catch (error) {
271-
Logger.instanse.error("WebSocketclient::CreateConsumer " + error);
360+
// Logger.instanse.error("WebSocketclient::CreateConsumer " + error);
361+
throw error
362+
}
363+
finally {
364+
semaphore.up();
365+
}
366+
if (queue != null) {
367+
return queue.queue;
272368
}
273-
semaphore.up();
274-
if (queue != null) return queue.queue;
275369
return null;
276370
} catch (error) {
277371
span.recordException(error);

0 commit comments

Comments
 (0)