Skip to content

Commit c910c8b

Browse files
committed
improve queue info/control
1 parent 7599756 commit c910c8b

9 files changed

Lines changed: 228 additions & 28 deletions

File tree

OpenFlow/src/Messages/Message.ts

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,12 @@ export class Message {
182182
case "dumprabbitmq":
183183
this.DumpRabbitmq(cli);
184184
break;
185+
case "getrabbitmqqueue":
186+
this.GetRabbitmqQueue(cli);
187+
break;
188+
case "deleterabbitmqqueue":
189+
this.DeleterabbitmqQueue(cli);
190+
break;
185191
default:
186192
this.UnknownCommand(cli);
187193
break;
@@ -2229,6 +2235,13 @@ export class Message {
22292235
consumers = queue.consumer_details.length;
22302236
}
22312237
}
2238+
// if (consumers > 0 && queue.consumer_details == null) {
2239+
// var tempconfig = await amqpwrapper.getqueue(Config.amqp_url, '/', queue.name);
2240+
// // var tempconfig = await amqpwrapper.getqueue(queue.name);
2241+
// if (tempconfig.consumer_details != null) {
2242+
// item.consumer_details = tempconfig.consumer_details
2243+
// }
2244+
// }
22322245
item.queuename = queue.name;
22332246
item.consumers = consumers;
22342247
item.name = queue.name + "(" + consumers + ")";
@@ -2265,6 +2278,45 @@ export class Message {
22652278
}
22662279
this.Send(cli);
22672280
}
2281+
async GetRabbitmqQueue(cli: WebSocketServerClient) {
2282+
this.Reply();
2283+
try {
2284+
let msg: any = JSON.parse(this.data);
2285+
const kickstartapi = amqpwrapper.getvhosts(Config.amqp_url);
2286+
try {
2287+
msg.data = await amqpwrapper.getqueue(Config.amqp_url, '/', msg.name);
2288+
this.data = JSON.stringify(msg);
2289+
} catch (error) {
2290+
cli._logger.error(error);
2291+
}
2292+
} catch (error) {
2293+
this.command = "error";
2294+
this.data = JSON.stringify(error);
2295+
cli._logger.error(error);
2296+
2297+
}
2298+
this.Send(cli);
2299+
}
2300+
async DeleterabbitmqQueue(cli: WebSocketServerClient) {
2301+
this.Reply();
2302+
try {
2303+
let msg: any = JSON.parse(this.data);
2304+
const kickstartapi = amqpwrapper.getvhosts(Config.amqp_url);
2305+
try {
2306+
msg.data = await amqpwrapper.deletequeue(Config.amqp_url, '/', msg.name);
2307+
this.data = JSON.stringify(msg);
2308+
} catch (error) {
2309+
cli._logger.error(error);
2310+
}
2311+
} catch (error) {
2312+
this.command = "error";
2313+
this.data = JSON.stringify(error);
2314+
cli._logger.error(error);
2315+
2316+
}
2317+
this.Send(cli);
2318+
}
2319+
22682320
}
22692321

22702322
export class JSONfn {

OpenFlow/src/amqpwrapper.ts

Lines changed: 39 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -402,8 +402,8 @@ export class amqpwrapper {
402402
var result: boolean = false;
403403
try {
404404
result = await retry(async bail => {
405-
// var queue = await amqpwrapper.getqueue(Config.amqp_url, '/', queuename);
406-
var queue = await amqpwrapper.getqueue(queuename);
405+
var queue = await amqpwrapper.getqueue(Config.amqp_url, '/', queuename);
406+
// var queue = await amqpwrapper.getqueue(queuename);
407407
let hasConsumers: boolean = false;
408408
if (queue.consumers > 0) {
409409
hasConsumers = true;
@@ -466,29 +466,45 @@ export class amqpwrapper {
466466
var payload = JSON.parse(response.body);
467467
return payload;
468468
}
469-
static async getqueue(queuename) {
470-
const queues = await amqpwrapper.getqueues(Config.amqp_url);
471-
for (let i = 0; i < queues.length; i++) {
472-
let queue = queues[i];
473-
if (queue.name == queuename) {
474-
return queue;
475-
}
476-
}
477-
// var q = this.parseurl(amqp_url);
478-
// var options = {
479-
// headers: {
480-
// 'Content-type': 'application/x-www-form-urlencoded'
481-
// },
482-
// username: (q as any).username,
483-
// password: (q as any).password,
484-
// timeout: 500, retry: 1
485-
// };
486-
// var _url = 'http://' + q.host + ':' + q.port + '/api/queues/' + encodeURIComponent(vhost) + '/' + queuename;
487-
// var response = await got.get(_url, options);
488-
// var payload = JSON.parse(response.body);
489-
// return payload;
469+
static async getqueue(amqp_url: string, vhost: string, queuename) {
470+
// const queues = await amqpwrapper.getqueues(Config.amqp_url);
471+
// for (let i = 0; i < queues.length; i++) {
472+
// let queue = queues[i];
473+
// if (queue.name == queuename) {
474+
// return queue;
475+
// }
476+
// }
477+
var q = this.parseurl(amqp_url);
478+
var options = {
479+
headers: {
480+
'Content-type': 'application/x-www-form-urlencoded'
481+
},
482+
username: (q as any).username,
483+
password: (q as any).password,
484+
timeout: 500, retry: 1
485+
};
486+
var _url = 'http://' + q.host + ':' + q.port + '/api/queues/' + encodeURIComponent(vhost) + '/' + encodeURIComponent(queuename);
487+
var response = await got.get(_url, options);
488+
var payload = JSON.parse(response.body);
489+
return payload;
490+
}
491+
static async deletequeue(amqp_url: string, vhost: string, queuename) {
492+
var q = this.parseurl(amqp_url);
493+
var options = {
494+
headers: {
495+
'Content-type': 'application/x-www-form-urlencoded'
496+
},
497+
username: (q as any).username,
498+
password: (q as any).password,
499+
timeout: 500, retry: 1
500+
};
501+
var _url = 'http://' + q.host + ':' + q.port + '/api/queues/' + encodeURIComponent(vhost) + '/' + encodeURIComponent(queuename);
502+
var response = await got.delete(_url, options);
503+
var payload = JSON.parse(response.body);
504+
return payload;
490505
}
491506

507+
492508
// async checkQueue(queue: string): Promise<boolean> {
493509
// if (Config.amqp_check_for_consumer) {
494510
// var q: amqpqueue = this.queues[queue];

OpenFlow/src/public/Controllers.ts

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3453,6 +3453,107 @@ export class QueuesCtrl extends entitiesCtrl<Base> {
34533453
this.loadData();
34543454
}
34553455
}
3456+
export class QueueCtrl extends entityCtrl<Base> {
3457+
public newid: string;
3458+
public memberof: Role[];
3459+
public data: any;
3460+
constructor(
3461+
public $scope: ng.IScope,
3462+
public $location: ng.ILocationService,
3463+
public $routeParams: ng.route.IRouteParamsService,
3464+
public $interval: ng.IIntervalService,
3465+
public WebSocketClientService: WebSocketClientService,
3466+
public api: api
3467+
) {
3468+
super($scope, $location, $routeParams, $interval, WebSocketClientService, api);
3469+
console.debug("QueueCtrl");
3470+
this.collection = "configclients";
3471+
this.postloadData = this.processdata;
3472+
this.memberof = [];
3473+
WebSocketClientService.onSignedin((user: TokenUser) => {
3474+
if (this.id !== null && this.id !== undefined) {
3475+
this.loadData();
3476+
} else {
3477+
this.model = new Base();
3478+
this.model._type = "queue";
3479+
this.model.name = "";
3480+
this.processdata();
3481+
}
3482+
3483+
});
3484+
}
3485+
async processdata() {
3486+
try {
3487+
if (this.model == null) {
3488+
this.errormessage = "Not found!";
3489+
this.loading = false;
3490+
if (!this.$scope.$$phase) { this.$scope.$apply(); }
3491+
return;
3492+
}
3493+
this.loading = true;
3494+
let m: Message = new Message();
3495+
m.command = "getrabbitmqqueue"; m.data = "{\"name\": \"" + (this.model as any).queuename + "\"}";
3496+
var q = await WebSocketClient.instance.Send<any>(m);
3497+
if ((q as any).command == "error") throw new Error(q.data);
3498+
this.data = q.data;
3499+
if (this.data == null) {
3500+
this.errormessage = "Queue not found!";
3501+
this.loading = false;
3502+
if (!this.$scope.$$phase) { this.$scope.$apply(); }
3503+
return;
3504+
}
3505+
if (this.data.consumer_details == null || this.data.consumer_details.length == 0) {
3506+
this.errormessage = "Queue has no consumers!";
3507+
this.loading = false;
3508+
if (!this.$scope.$$phase) { this.$scope.$apply(); }
3509+
return;
3510+
}
3511+
this.collection = "configclients";
3512+
this.basequery = { _type: "socketclient" };
3513+
var clients = await NoderedUtil.Query("configclients", { _type: "socketclient" }, null, null, 2000, 0, null, null);
3514+
for (var i = 0; i < this.data.consumer_details.length; i++) {
3515+
console.log("find " + this.data.consumer_details[i].consumer_tag);
3516+
3517+
for (var y = 0; y < clients.length; y++) {
3518+
const _client = clients[y];
3519+
if (_client.queues != null) {
3520+
const keys = Object.keys(_client.queues);
3521+
for (var z = 0; z < keys.length; z++) {
3522+
var q = _client.queues[keys[z]];
3523+
console.log(_client.name + " " + q.consumerTag);
3524+
if (q.consumerTag == this.data.consumer_details[i].consumer_tag) {
3525+
this.data.consumer_details[i].clientname = _client.name;
3526+
}
3527+
}
3528+
}
3529+
3530+
}
3531+
3532+
}
3533+
} catch (error) {
3534+
console.error(error);
3535+
}
3536+
this.loading = false;
3537+
if (!this.$scope.$$phase) { this.$scope.$apply(); }
3538+
}
3539+
async DeleteQueue(model) {
3540+
try {
3541+
this.loading = true;
3542+
let m: Message = new Message();
3543+
m.command = "deleterabbitmqqueue"; m.data = "{\"name\": \"" + (this.model as any).queuename + "\"}";
3544+
var q = await WebSocketClient.instance.Send<any>(m);
3545+
if ((q as any).command == "error") throw new Error(q.data);
3546+
this.data = q.data;
3547+
this.$location.path("/Queues");
3548+
} catch (error) {
3549+
this.errormessage = error;
3550+
console.error(error);
3551+
}
3552+
this.loading = false;
3553+
if (!this.$scope.$$phase) { this.$scope.$apply(); }
3554+
}
3555+
3556+
}
34563557
export class SocketsCtrl extends entitiesCtrl<Base> {
34573558
public message: string = "";
34583559
public charts: chartset[] = [];

OpenFlow/src/public/Queue.html

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
<a ng-disabled="ctrl.loading==true" type="button" class="btn btn-success" ng-href="#/Queues">Queues</a>
2+
<a ng-disabled="ctrl.loading==true" type="button" class="btn btn-success" ng-href="#/Sockets">Sockets</a>
3+
<div class="row">
4+
<h1 translate lib="web">{{ctrl.model.name}}</h1>
5+
</div>
6+
<div ng-show="ctrl.errormessage != ''"" class=" alert alert-danger" role="alert">{{ctrl.errormessage}}</div>
7+
8+
<table id=" table1" class="table table-striped table-hover table-sm" when-scrolled="ctrl.more()" style="width: 100%;">
9+
<thead class="thead-dark">
10+
<tr>
11+
<th scope="col"><b translate lib="web">ack</b></th>
12+
<th scope="col"><b translate lib="web">active</b></th>
13+
<th scope="col"><b translate lib="web">consumer_tag</b></th>
14+
<th scope="col"><b translate lib="web">clientname</b></th>
15+
<th scope="col"><b translate lib="web"></b></th>
16+
</tr>
17+
</thead>
18+
<tbody>
19+
<tr ng-repeat="model in ctrl.data.consumer_details">
20+
<td>{{model.ack_required}}</td>
21+
<td>{{model.active}}</td>
22+
<td>{{model.consumer_tag}}</td>
23+
<td>{{model.clientname}}</td>
24+
<td class="btn-cell">
25+
<a href ng-click="ctrl.DeleteQueue(model)" ng-disabled="ctrl.loading==true"><i class="az-trash"></i></a>
26+
</td>
27+
</tr>
28+
</tbody>
29+
</table>

OpenFlow/src/public/Queues.html

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
</thead>
1717
<tbody>
1818
<tr ng-repeat="model in ctrl.models">
19-
<td>{{model.name}}</td>
19+
<td><a ng-href="#/Queue/{{model._id}}">{{model.name}}</a></td>
2020
<td><span translate lib="web">{{model.type || model._type}}</span></td>
2121
<td>{{model.consumers}}</td>
2222
<td>{{model.queuename}}</td>

OpenFlow/src/public/app.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { WebSocketClientService } from "./WebSocketClientService";
22
import angular = require("angular");
33
import { timesince, translate, textarea, fileread, userdata, api } from "./CommonControllers";
4-
import { MenuCtrl, ProvidersCtrl, MainCtrl, LoginCtrl, ProviderCtrl, UsersCtrl, UserCtrl, RolesCtrl, RoleCtrl, RPAWorkflowsCtrl, RPAWorkflowCtrl, WorkflowsCtrl, ReportsCtrl, jslogCtrl, EditFormCtrl, FormsCtrl, FormCtrl, FilesCtrl, EntitiesCtrl, EntityCtrl, HistoryCtrl, SocketCtrl, NoderedCtrl, hdrobotsCtrl, RobotsCtrl, AuditlogsCtrl, SignupCtrl, PaymentCtrl, QueuesCtrl, SocketsCtrl } from "./Controllers";
4+
import { MenuCtrl, ProvidersCtrl, MainCtrl, LoginCtrl, ProviderCtrl, UsersCtrl, UserCtrl, RolesCtrl, RoleCtrl, RPAWorkflowsCtrl, RPAWorkflowCtrl, WorkflowsCtrl, ReportsCtrl, jslogCtrl, EditFormCtrl, FormsCtrl, FormCtrl, FilesCtrl, EntitiesCtrl, EntityCtrl, HistoryCtrl, SocketCtrl, NoderedCtrl, hdrobotsCtrl, RobotsCtrl, AuditlogsCtrl, SignupCtrl, PaymentCtrl, QueuesCtrl, SocketsCtrl, QueueCtrl } from "./Controllers";
55

66
/**
77
* @type {angular.Module}
@@ -86,6 +86,8 @@ module openflow {
8686
.when('/Payment', { templateUrl: 'Payment.html', controller: PaymentCtrl, controllerAs: 'ctrl' })
8787
.when('/Payment/:userid', { templateUrl: 'Payment.html', controller: PaymentCtrl, controllerAs: 'ctrl' })
8888

89+
.when('/Queue', { templateUrl: 'Queue.html', controller: QueueCtrl, controllerAs: 'ctrl' })
90+
.when('/Queue/:id', { templateUrl: 'Queue.html', controller: QueueCtrl, controllerAs: 'ctrl' })
8991
.when('/Queues', { templateUrl: 'Queues.html', controller: QueuesCtrl, controllerAs: 'ctrl' })
9092
.when('/Sockets', { templateUrl: 'Sockets.html', controller: SocketsCtrl, controllerAs: 'ctrl' })
9193

OpenFlowNodeRED/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "openflow-nodered",
3-
"version": "1.0.71",
3+
"version": "1.0.72",
44
"description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)",
55
"main": "index.js",
66
"scripts": {

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.0.71
1+
1.0.72

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "openiap",
3-
"version": "1.0.71",
3+
"version": "1.0.72",
44
"description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)",
55
"main": "index.js",
66
"scripts": {

0 commit comments

Comments
 (0)