Skip to content

Commit d5cf461

Browse files
committed
Add better error queue message
1 parent 62995f4 commit d5cf461

7 files changed

Lines changed: 40 additions & 37 deletions

File tree

OpenFlow/src/Messages/Message.ts

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -604,11 +604,11 @@ export class Message {
604604
if (mq != null) {
605605
if (Config.amqp_force_consumer_has_update) {
606606
if (!DatabaseConnection.hasAuthorization(tuser, mq, Rights.update)) {
607-
throw new Error("Unknown queue or access denied, missing update permission on exchange object " + tuser.name);
607+
throw new Error("[" + tuser.name + "] Unknown queue or access denied, missing update permission on exchange object " + msg.exchangename);
608608
}
609609
} else if (Config.amqp_force_sender_has_invoke) {
610610
if (!DatabaseConnection.hasAuthorization(tuser, mq, Rights.invoke)) {
611-
throw new Error("Unknown queue or access denied, missing invoke permission on exchange object " + tuser.name);
611+
throw new Error("[" + tuser.name + "] Unknown queue or access denied, missing invoke permission on exchange object " + msg.exchangename);
612612
}
613613
}
614614
} else {
@@ -627,7 +627,7 @@ export class Message {
627627
var addqueue: boolean = (msg.addqueue as any);
628628
if (NoderedUtil.IsNullEmpty(addqueue)) addqueue = true;
629629
addqueue = Config.parseBoolean(addqueue);
630-
var res = await cli.RegisterExchange(msg.exchangename, msg.algorithm, msg.routingkey, addqueue, parent);
630+
var res = await cli.RegisterExchange(tuser, msg.exchangename, msg.algorithm, msg.routingkey, addqueue, parent);
631631
msg.queuename = res.queuename;
632632
msg.exchangename = res.exchangename;
633633
} catch (error) {
@@ -719,11 +719,11 @@ export class Message {
719719
if (mq != null) {
720720
if (Config.amqp_force_consumer_has_update) {
721721
if (!DatabaseConnection.hasAuthorization(tuser, mq, Rights.update)) {
722-
throw new Error("Unknown queue or access denied, missing update permission on users object " + tuser.name);
722+
throw new Error("[" + tuser.name + "] Unknown queue or access denied, missing update permission on users object " + mq.name + " " + mq._id);
723723
}
724724
} else if (Config.amqp_force_sender_has_invoke) {
725725
if (!DatabaseConnection.hasAuthorization(tuser, mq, Rights.invoke)) {
726-
throw new Error("Unknown queue or access denied, missing invoke permission on users object " + tuser.name);
726+
throw new Error("[" + tuser.name + "] Unknown queue or access denied, missing invoke permission on users object " + mq.name + " " + mq._id);
727727
}
728728
}
729729
allowed = true;
@@ -741,11 +741,11 @@ export class Message {
741741
if (mq != null) {
742742
if (Config.amqp_force_consumer_has_update) {
743743
if (!DatabaseConnection.hasAuthorization(tuser, mq, Rights.update)) {
744-
throw new Error("Unknown queue or access denied, missing update permission on queue object " + tuser.name);
744+
throw new Error("[" + tuser.name + "] Unknown queue or access denied, missing update permission on queue object " + msg.queuename);
745745
}
746746
} else if (Config.amqp_force_sender_has_invoke) {
747747
if (!DatabaseConnection.hasAuthorization(tuser, mq, Rights.invoke)) {
748-
throw new Error("Unknown queue or access denied, missing invoke permission on queue object " + tuser.name);
748+
throw new Error("[" + tuser.name + "] Unknown queue or access denied, missing invoke permission on queue object " + msg.queuename);
749749
}
750750
}
751751
allowed = true;
@@ -834,11 +834,11 @@ export class Message {
834834
if (mq != null) {
835835
if (Config.amqp_force_sender_has_invoke) {
836836
if (!DatabaseConnection.hasAuthorization(tuser, mq, Rights.invoke)) {
837-
throw new Error("Unknown queue or access denied, missing invoke permission on users object " + tuser.name);
837+
throw new Error("[" + tuser.name + "] Unknown queue or access denied, missing invoke permission on users object " + mq.name + " " + mq._id);
838838
}
839839
} else {
840840
if (!DatabaseConnection.hasAuthorization(tuser, mq, Rights.read)) {
841-
throw new Error("Unknown queue or access denied, missing read permission on users object " + tuser.name);
841+
throw new Error("[" + tuser.name + "] Unknown queue or access denied, missing read permission on users object " + mq.name + " " + mq._id);
842842
}
843843
}
844844
allowed = true;
@@ -856,11 +856,11 @@ export class Message {
856856
if (mq != null) {
857857
if (Config.amqp_force_sender_has_invoke) {
858858
if (!DatabaseConnection.hasAuthorization(tuser, mq, Rights.invoke)) {
859-
throw new Error("Unknown queue or access denied, missing invoke permission on queue object " + tuser.name);
859+
throw new Error("[" + tuser.name + "] Unknown queue or access denied, missing invoke permission on queue object " + msg.queuename);
860860
}
861861
} else {
862862
if (!DatabaseConnection.hasAuthorization(tuser, mq, Rights.read)) {
863-
throw new Error("Unknown queue or access denied, missing read permission on queue object " + tuser.name);
863+
throw new Error("[" + tuser.name + "] Unknown queue or access denied, missing read permission on queue object " + msg.queuename);
864864
}
865865

866866
}
@@ -951,7 +951,9 @@ export class Message {
951951
let msg: CloseQueueMessage
952952
try {
953953
msg = CloseQueueMessage.assign(this.data);
954-
await cli.CloseConsumer(msg.queuename, parent);
954+
const jwt: string = msg.jwt || this.jwt;
955+
const tuser = Crypt.verityToken(jwt);
956+
await cli.CloseConsumer(tuser, msg.queuename, parent);
955957
} catch (error) {
956958
await handleError(cli, error);
957959
if (NoderedUtil.IsNullUndefinded(msg)) { (msg as any) = {}; }

OpenFlow/src/QueueClient.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { NoderedUtil } from "@openiap/openflow-api";
22
import { Span } from "@opentelemetry/api";
33
import { amqpqueue, amqpwrapper, QueueMessageOptions } from "./amqpwrapper";
44
import { Config } from "./Config";
5+
import { Crypt } from "./Crypt";
56
import { Logger } from "./Logger";
67
import { Message } from "./Messages/Message";
78
export class QueueClient {
@@ -32,7 +33,7 @@ export class QueueClient {
3233
AssertQueueOptions.exclusive = false;
3334
AssertQueueOptions["x-max-priority"] = 5;
3435
AssertQueueOptions.maxPriority = 5;
35-
await amqpwrapper.Instance().AddQueueConsumer(this.queuename, AssertQueueOptions, null, async (data: any, options: QueueMessageOptions, ack: any, done: any) => {
36+
await amqpwrapper.Instance().AddQueueConsumer(Crypt.rootUser(), this.queuename, AssertQueueOptions, null, async (data: any, options: QueueMessageOptions, ack: any, done: any) => {
3637
const msg: Message = Message.fromjson(data);
3738
let span: Span = null;
3839
try {
@@ -65,7 +66,7 @@ export class QueueClient {
6566
public static async RegisterMyQueue() {
6667
const AssertQueueOptions: any = Object.assign({}, (amqpwrapper.Instance().AssertQueueOptions));
6768
AssertQueueOptions.exclusive = false;
68-
this.queue = await amqpwrapper.Instance().AddQueueConsumer("", AssertQueueOptions, null, async (data: any, options: QueueMessageOptions, ack: any, done: any) => {
69+
this.queue = await amqpwrapper.Instance().AddQueueConsumer(Crypt.rootUser(), "", AssertQueueOptions, null, async (data: any, options: QueueMessageOptions, ack: any, done: any) => {
6970
const msg: Message = Message.fromjson(data);
7071
try {
7172
if (NoderedUtil.IsNullEmpty(options.replyTo)) {

OpenFlow/src/WebSocketServerClient.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { SocketMessage } from "./SocketMessage";
33
import { Message, JSONfn } from "./Messages/Message";
44
import { Config } from "./Config";
55
import { amqpwrapper, QueueMessageOptions, amqpqueue, amqpexchange, exchangealgorithm } from "./amqpwrapper";
6-
import { NoderedUtil, Base, InsertOneMessage, QueueMessage, MapReduceMessage, QueryMessage, UpdateOneMessage, UpdateManyMessage, DeleteOneMessage, User, mapFunc, reduceFunc, finalizeFunc, QueuedMessage, QueuedMessageCallback, WatchEventMessage, QueueClosedMessage, ExchangeClosedMessage } from "@openiap/openflow-api";
6+
import { NoderedUtil, Base, InsertOneMessage, QueueMessage, MapReduceMessage, QueryMessage, UpdateOneMessage, UpdateManyMessage, DeleteOneMessage, User, mapFunc, reduceFunc, finalizeFunc, QueuedMessage, QueuedMessageCallback, WatchEventMessage, QueueClosedMessage, ExchangeClosedMessage, TokenUser } from "@openiap/openflow-api";
77
import { ChangeStream } from "mongodb";
88
import { WebSocketServer } from "./WebSocketServer";
99
import { Span } from "@opentelemetry/api";
@@ -215,7 +215,7 @@ export class WebSocketServerClient {
215215
for (let i = this._queues.length - 1; i >= 0; i--) {
216216
try {
217217
// await this.CloseConsumer(this._queues[i]);
218-
await amqpwrapper.Instance().RemoveQueueConsumer(this._queues[i], undefined);
218+
await amqpwrapper.Instance().RemoveQueueConsumer(this.user, this._queues[i], undefined);
219219
this._queues.splice(i, 1);
220220
this._queuescurrent--;
221221
this._queuescurrentstr = this._queuescurrent.toString();
@@ -260,7 +260,7 @@ export class WebSocketServerClient {
260260
Logger.otel.endSpan(span);
261261
}
262262
}
263-
public async CloseConsumer(queuename: string, parent: Span): Promise<void> {
263+
public async CloseConsumer(user: TokenUser | User, queuename: string, parent: Span): Promise<void> {
264264
const span: Span = Logger.otel.startSubSpan("WebSocketServerClient.CloseConsumer", parent);
265265
await semaphore.down();
266266
try {
@@ -269,7 +269,7 @@ export class WebSocketServerClient {
269269
const q = this._queues[i];
270270
if (q && (q.queue == queuename || q.queuename == queuename)) {
271271
try {
272-
amqpwrapper.Instance().RemoveQueueConsumer(this._queues[i], span).catch((err) => {
272+
amqpwrapper.Instance().RemoveQueueConsumer(user, this._queues[i], span).catch((err) => {
273273
Logger.instanse.error("WebSocketclient::CloseConsumer::RemoveQueueConsumer " + err);
274274
});
275275
this._queues.splice(i, 1);
@@ -289,7 +289,7 @@ export class WebSocketServerClient {
289289
Logger.otel.endSpan(span);
290290
}
291291
}
292-
public async RegisterExchange(exchangename: string, algorithm: exchangealgorithm, routingkey: string = "", addqueue: boolean, parent: Span): Promise<RegisterExchangeResponse> {
292+
public async RegisterExchange(user: TokenUser | User, exchangename: string, algorithm: exchangealgorithm, routingkey: string = "", addqueue: boolean, parent: Span): Promise<RegisterExchangeResponse> {
293293
const span: Span = Logger.otel.startSubSpan("WebSocketServerClient.CreateConsumer", parent);
294294
try {
295295
let exclusive: boolean = false; // Should we keep the queue around ? for robots and roles
@@ -311,7 +311,7 @@ export class WebSocketServerClient {
311311
try {
312312
const AssertExchangeOptions: any = Object.assign({}, (amqpwrapper.Instance().AssertExchangeOptions));
313313
AssertExchangeOptions.exclusive = exclusive;
314-
exchangequeue = await amqpwrapper.Instance().AddExchangeConsumer(exchange, algorithm, routingkey, AssertExchangeOptions, this.jwt, addqueue, async (msg: any, options: QueueMessageOptions, ack: any, done: any) => {
314+
exchangequeue = await amqpwrapper.Instance().AddExchangeConsumer(user, exchange, algorithm, routingkey, AssertExchangeOptions, this.jwt, addqueue, async (msg: any, options: QueueMessageOptions, ack: any, done: any) => {
315315
const _data = msg;
316316
try {
317317
const result = await this.Queue(msg, exchange, options);
@@ -368,7 +368,7 @@ export class WebSocketServerClient {
368368
qname = "unknown." + NoderedUtil.GetUniqueIdentifier(); exclusive = true;
369369
}
370370
}
371-
await this.CloseConsumer(qname, span);
371+
await this.CloseConsumer(this.user, qname, span);
372372
let queue: amqpqueue = null;
373373
try {
374374
const AssertQueueOptions: any = Object.assign({}, (amqpwrapper.Instance().AssertQueueOptions));
@@ -380,10 +380,10 @@ export class WebSocketServerClient {
380380
if (exists.length > 0) {
381381
Logger.instanse.warn("CreateConsumer: " + qname + " already exists, removing before re-creating");
382382
for (let i = 0; i < exists.length; i++) {
383-
await amqpwrapper.Instance().RemoveQueueConsumer(exists[i], span);
383+
await amqpwrapper.Instance().RemoveQueueConsumer(this.user, exists[i], span);
384384
}
385385
}
386-
queue = await amqpwrapper.Instance().AddQueueConsumer(qname, AssertQueueOptions, this.jwt, async (msg: any, options: QueueMessageOptions, ack: any, done: any) => {
386+
queue = await amqpwrapper.Instance().AddQueueConsumer(this.user, qname, AssertQueueOptions, this.jwt, async (msg: any, options: QueueMessageOptions, ack: any, done: any) => {
387387
const _data = msg;
388388
try {
389389
Logger.instanse.verbose("[preack] queuename: " + queuename + " qname: " + qname + " replyto: " + options.replyTo + " correlationId: " + options.correlationId)

0 commit comments

Comments
 (0)