Skip to content

Commit 2efc870

Browse files
committed
remove/move rabbitmq stuff
1 parent 2d128e7 commit 2efc870

3 files changed

Lines changed: 162 additions & 269 deletions

File tree

OpenFlow/src/Messages/Message.ts

Lines changed: 9 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -457,16 +457,12 @@ export class Message {
457457
await this.DumpClients(cli, span);
458458
break;
459459
case "dumprabbitmq":
460-
await this.DumpRabbitmq(cli, span);
461460
break;
462461
case "getrabbitmqqueue":
463-
await this.GetRabbitmqQueue(cli);
464462
break;
465463
case "deleterabbitmqqueue":
466-
await this.DeleterabbitmqQueue(cli);
467464
break;
468465
case "pushmetrics":
469-
await this.PushMetrics(cli);
470466
break;
471467
default:
472468
span.recordException("Unknown command " + command);
@@ -507,6 +503,9 @@ export class Message {
507503
this.Reply();
508504
let msg: RegisterQueueMessage;
509505
try {
506+
if (!NoderedUtil.IsNullEmpty(msg.queuename) && msg.queuename.toLowerCase() == "openflow") {
507+
throw new Error("Access denied");
508+
}
510509
msg = RegisterQueueMessage.assign(this.data);
511510
msg.queuename = await cli.CreateConsumer(msg.queuename, parent);
512511
} catch (error) {
@@ -549,6 +548,12 @@ export class Message {
549548
} catch (error) {
550549
}
551550
}
551+
if (!NoderedUtil.IsNullEmpty(msg.queuename) && msg.queuename.toLowerCase() == "openflow") {
552+
throw new Error("Access denied");
553+
} else if (NoderedUtil.IsNullEmpty(msg.queuename) && NoderedUtil.IsNullEmpty(msg.exchange)) {
554+
throw new Error("queuename or exchange must be given");
555+
}
556+
552557

553558
if (msg.queuename.length == 24 && Config.amqp_force_sender_has_read) {
554559
const tuser = Crypt.verityToken(msg.jwt);
@@ -3583,124 +3588,6 @@ export class Message {
35833588
Logger.otel.endSpan(span);
35843589
this.Send(cli);
35853590
}
3586-
async DumpRabbitmq(cli: WebSocketServerClient, parent: Span) {
3587-
this.Reply();
3588-
const span: Span = Logger.otel.startSubSpan("message.DumpRabbitmq", parent);
3589-
try {
3590-
const kickstartapi = amqpwrapper.getvhosts(Config.amqp_url);
3591-
const jwt = Crypt.rootToken();
3592-
const known = await Config.db.query({ _type: "queue" }, null, 5000, 0, null, "configclients", jwt, undefined, undefined, span);
3593-
const queues = await amqpwrapper.getqueues(Config.amqp_url);
3594-
for (let i = 0; i < queues.length; i++) {
3595-
let queue = queues[i];
3596-
let exists = known.filter((x: any) => (x && x.queuename == queue.name));
3597-
let item: any = {
3598-
name: queue.id, consumers: queue.consumers, consumer_details: queue.consumer_details, _type: "queue"
3599-
};
3600-
let consumers: number = 0;
3601-
if (queue.consumers > 0) { consumers = queue.consumers; }
3602-
if (consumers == 0) {
3603-
if (queue.consumer_details != null && queue.consumer_details.length > 0) {
3604-
consumers = queue.consumer_details.length;
3605-
}
3606-
}
3607-
item.queuename = queue.name;
3608-
item.consumers = consumers;
3609-
item.name = queue.name + "(" + consumers + ")";
3610-
if (exists.length == 0) {
3611-
try {
3612-
await Config.db.InsertOne(item, "configclients", 1, false, jwt, span);
3613-
} catch (error) {
3614-
await handleError(cli, error);
3615-
}
3616-
} else {
3617-
item._id = exists[0]._id;
3618-
try {
3619-
await Config.db._UpdateOne(null, item, "configclients", 1, false, jwt, span);
3620-
} catch (error) {
3621-
await handleError(cli, error);
3622-
}
3623-
}
3624-
}
3625-
for (let i = 0; i < known.length; i++) {
3626-
let queue: any = known[i];
3627-
let id = queue.id;
3628-
let exists = queues.filter((x: any) => x.name == queue.queuename);
3629-
if (exists.length == 0) {
3630-
try {
3631-
await Config.db.DeleteOne(queue._id, "configclients", jwt, span);
3632-
} catch (error) {
3633-
await handleError(cli, error);
3634-
}
3635-
}
3636-
}
3637-
} catch (error) {
3638-
span.recordException(error);
3639-
this.data = "";
3640-
await handleError(cli, error);
3641-
}
3642-
Logger.otel.endSpan(span);
3643-
this.Send(cli);
3644-
}
3645-
async GetRabbitmqQueue(cli: WebSocketServerClient) {
3646-
this.Reply();
3647-
try {
3648-
let msg: any = JSON.parse(this.data);
3649-
const kickstartapi = amqpwrapper.getvhosts(Config.amqp_url);
3650-
try {
3651-
msg.data = await amqpwrapper.getqueue(Config.amqp_url, '/', msg.name);
3652-
this.data = JSON.stringify(msg);
3653-
} catch (error) {
3654-
await handleError(cli, error);
3655-
}
3656-
} catch (error) {
3657-
this.data = JSON.stringify(error);
3658-
await handleError(cli, error);
3659-
3660-
}
3661-
this.Send(cli);
3662-
}
3663-
async DeleterabbitmqQueue(cli: WebSocketServerClient) {
3664-
this.Reply();
3665-
try {
3666-
let msg: any = JSON.parse(this.data);
3667-
const kickstartapi = amqpwrapper.getvhosts(Config.amqp_url);
3668-
try {
3669-
msg.data = await amqpwrapper.deletequeue(Config.amqp_url, '/', msg.name);
3670-
this.data = JSON.stringify(msg);
3671-
} catch (error) {
3672-
await handleError(cli, error);
3673-
}
3674-
} catch (error) {
3675-
this.data = JSON.stringify(error);
3676-
await handleError(cli, error);
3677-
3678-
}
3679-
this.Send(cli);
3680-
}
3681-
async PushMetrics(cli: WebSocketServerClient) {
3682-
this.Reply();
3683-
// let msg: PushMetricsMessage;
3684-
// try {
3685-
// msg = PushMetricsMessage.assign(this.data);
3686-
// cli.metrics = msg.metrics;
3687-
// if (NoderedUtil.IsNullUndefinded(msg.jwt)) msg.jwt = cli.jwt;
3688-
// } catch (error) {
3689-
// if (error == null) new Error("Unknown error");
3690-
// await handleError(cli, error);
3691-
// if (NoderedUtil.IsNullUndefinded(msg)) { (msg as any) = {}; }
3692-
// if (msg !== null && msg !== undefined) {
3693-
// msg.error = (error.message ? error.message : error);
3694-
// }
3695-
// }
3696-
// try {
3697-
// this.data = JSON.stringify(msg);
3698-
// } catch (error) {
3699-
// this.data = "";
3700-
// await handleError(cli, error);
3701-
// }
3702-
this.Send(cli);
3703-
}
37043591
}
37053592

37063593
export class JSONfn {

OpenFlow/src/amqpwrapper.ts

Lines changed: 1 addition & 147 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,11 @@
1-
import * as retry from "async-retry";
21
import * as amqplib from "amqplib";
32
import { Config } from "./Config";
43
import { Crypt } from "./Crypt";
5-
import * as url from "url";
64
import { NoderedUtil } from "@openiap/openflow-api";
75
import { WebSocketServer } from "./WebSocketServer";
86
import { Span } from "@opentelemetry/api";
97
import { Logger } from "./Logger";
108
import events = require("events");
11-
const got = require("got");
129
type QueueOnMessage = (msg: string, options: QueueMessageOptions, ack: any, done: any) => void;
1310
interface IHashTable<T> {
1411
[key: string]: T;
@@ -21,7 +18,7 @@ export type QueueMessageOptions = {
2118
exchange: string,
2219
priority: number
2320
}
24-
type AssertQueue = {
21+
export type AssertQueue = {
2522
consumerCount: number;
2623
messageCount: number;
2724
queue: string;
@@ -99,7 +96,6 @@ export class amqpwrapper extends events.EventEmitter {
9996
this.queuemessagecounter[queuename] = result;
10097
return result;
10198
}
102-
10399
async connect(): Promise<void> {
104100
try {
105101
if (this.timeout != null) {
@@ -519,146 +515,4 @@ export class amqpwrapper extends events.EventEmitter {
519515
this.channel.publish(exchange, routingkey, Buffer.from(data), options);
520516
}
521517
}
522-
static parseurl(amqp_url): url.UrlWithParsedQuery {
523-
const q = url.parse(amqp_url, true);
524-
if (q.port == null || q.port == "") { q.port = "15672"; }
525-
if (q.auth != null && q.auth != "") {
526-
const arr = q.auth.split(':');
527-
(q as any).username = arr[0];
528-
(q as any).password = arr[1];
529-
} else {
530-
(q as any).username = Config.amqp_username;
531-
(q as any).password = Config.amqp_password;
532-
}
533-
q.protocol = 'http://';
534-
return q;
535-
}
536-
537-
// This will crash the channel, that does not seem scalable
538-
async checkQueue(queuename: string): Promise<boolean> {
539-
if (Config.amqp_check_for_consumer) {
540-
let test: AssertQueue = null;
541-
try {
542-
if (Config.amqp_check_for_consumer_count) {
543-
return this.checkQueueConsumerCount(queuename);
544-
}
545-
test = await amqpwrapper.getqueue(Config.amqp_url, '/', queuename);
546-
if (test == null) {
547-
return false;
548-
}
549-
} catch (error) {
550-
test = null;
551-
}
552-
if (test == null || test.consumerCount == 0) {
553-
return false;
554-
}
555-
}
556-
return true;
557-
}
558-
async checkQueueConsumerCount(queuename: string): Promise<boolean> {
559-
let result: boolean = false;
560-
try {
561-
result = await retry(async bail => {
562-
const queue = await amqpwrapper.getqueue(Config.amqp_url, '/', queuename);
563-
// const queue = await amqpwrapper.getqueue(queuename);
564-
let hasConsumers: boolean = false;
565-
if (queue.consumers > 0) {
566-
hasConsumers = true;
567-
}
568-
if (!hasConsumers) {
569-
if (queue.consumer_details != null && queue.consumer_details.length > 0) {
570-
hasConsumers = true;
571-
} else {
572-
hasConsumers = false;
573-
}
574-
}
575-
if (hasConsumers == false) {
576-
hasConsumers = false;
577-
throw new Error("No consumer listening at " + queuename);
578-
// return bail();
579-
}
580-
return hasConsumers;
581-
}, {
582-
retries: 10,
583-
minTimeout: 500,
584-
maxTimeout: 500,
585-
onRetry: function (error: Error, count: number): void {
586-
result = false;
587-
console.warn("retry " + count + " error " + error.message + " getting " + url);
588-
}
589-
});
590-
} catch (error) {
591-
Logger.instanse.debug(error.message ? error.message : error);
592-
}
593-
if (result == true) {
594-
return result;
595-
}
596-
return false;
597-
}
598-
static async getvhosts(amqp_url) {
599-
const q = this.parseurl(amqp_url);
600-
const options = {
601-
headers: {
602-
'Content-type': 'application/x-www-form-urlencoded'
603-
},
604-
username: (q as any).username,
605-
password: (q as any).password
606-
};
607-
const _url = 'http://' + q.host + ':' + q.port + '/api/vhosts';
608-
const response = await got.get(_url, options);
609-
const payload = JSON.parse(response.body);
610-
return payload;
611-
}
612-
static async getqueues(amqp_url: string, vhost: string = null) {
613-
const q = this.parseurl(amqp_url);
614-
const options = {
615-
headers: {
616-
'Content-type': 'application/x-www-form-urlencoded'
617-
},
618-
username: (q as any).username,
619-
password: (q as any).password
620-
};
621-
let _url = 'http://' + q.host + ':' + q.port + '/api/queues';
622-
if (!NoderedUtil.IsNullEmpty(vhost)) _url += '/' + encodeURIComponent(vhost);
623-
const response = await got.get(_url, options);
624-
const payload = JSON.parse(response.body);
625-
return payload;
626-
}
627-
static async getqueue(amqp_url: string, vhost: string, queuename) {
628-
// const queues = await amqpwrapper.getqueues(Config.amqp_url);
629-
// for (let i = 0; i < queues.length; i++) {
630-
// let queue = queues[i];
631-
// if (queue.name == queuename) {
632-
// return queue;
633-
// }
634-
// }
635-
const q = this.parseurl(amqp_url);
636-
const options = {
637-
headers: {
638-
'Content-type': 'application/x-www-form-urlencoded'
639-
},
640-
username: (q as any).username,
641-
password: (q as any).password,
642-
timeout: 500, retry: 1
643-
};
644-
const _url = 'http://' + q.host + ':' + q.port + '/api/queues/' + encodeURIComponent(vhost) + '/' + encodeURIComponent(queuename);
645-
const response = await got.get(_url, options);
646-
const payload = JSON.parse(response.body);
647-
return payload;
648-
}
649-
static async deletequeue(amqp_url: string, vhost: string, queuename) {
650-
const q = this.parseurl(amqp_url);
651-
const options = {
652-
headers: {
653-
'Content-type': 'application/x-www-form-urlencoded'
654-
},
655-
username: (q as any).username,
656-
password: (q as any).password,
657-
timeout: 500, retry: 1
658-
};
659-
const _url = 'http://' + q.host + ':' + q.port + '/api/queues/' + encodeURIComponent(vhost) + '/' + encodeURIComponent(queuename);
660-
const response = await got.delete(_url, options);
661-
const payload = JSON.parse(response.body);
662-
return payload;
663-
}
664518
}

0 commit comments

Comments
 (0)