Skip to content

Commit 1d33ba2

Browse files
committed
Send ack faster
1 parent 88b159f commit 1d33ba2

4 files changed

Lines changed: 13 additions & 18 deletions

File tree

OpenFlow/src/LoginProvider.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -485,7 +485,7 @@ export class LoginProvider {
485485
Logger.otel.endSpan(span);
486486
return;
487487
}
488-
Logger.instanse.error("validate_user_form " + Config.validate_user_form + " does not exists!");
488+
Logger.instanse.info("validate_user_form " + Config.validate_user_form + " does not exists!");
489489
Config.validate_user_form = "";
490490
res.end(JSON.stringify({}));
491491
res.end();

OpenFlow/src/QueueClient.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,16 @@ export class QueueClient {
4040
ack(false);
4141
return;
4242
}
43+
ack();
4344
try {
4445
msg.priority = options.priority;
4546
if (!NoderedUtil.IsNullEmpty(options.replyTo)) {
4647
span = Logger.otel.startSpan("QueueClient.QueueMessage");
4748
if (Config.log_openflow_amqp) Logger.instanse.debug("[queue] Process command: " + msg.command + " id: " + msg.id + " correlationId: " + options.correlationId);
4849
await msg.QueueProcess(options, span);
49-
// ack();
5050
await amqpwrapper.Instance().send(options.exchange, options.replyTo, msg, Config.openflow_amqp_expiration, options.correlationId, options.routingKey);
5151
} else {
5252
Logger.instanse.debug("[queue][ack] No replyto !!!!");
53-
// ack();
5453
}
5554
} catch (error) {
5655
// setTimeout(() => {
@@ -59,7 +58,6 @@ export class QueueClient {
5958
// Logger.instanse.warn("[queue][nack] Process message failed command: " + msg.command + " queuename: " + this.queuename + " replyto: " + options.replyTo + " correlationId: " + options.correlationId + " error: " + (error.message ? error.message : error))
6059
// }, Config.amqp_requeue_time);
6160
} finally {
62-
ack();
6361
Logger.otel.endSpan(span);
6462
}
6563
}, null);
@@ -68,6 +66,7 @@ export class QueueClient {
6866
const AssertQueueOptions: any = Object.assign({}, (amqpwrapper.Instance().AssertQueueOptions));
6967
AssertQueueOptions.exclusive = false;
7068
this.queue = await amqpwrapper.Instance().AddQueueConsumer(Crypt.rootUser(), "", AssertQueueOptions, null, async (data: any, options: QueueMessageOptions, ack: any, done: any) => {
69+
ack();
7170
const msg: Message = Message.fromjson(data);
7271
try {
7372
if (NoderedUtil.IsNullEmpty(options.replyTo)) {
@@ -82,15 +81,12 @@ export class QueueClient {
8281
} else {
8382
// throw new Error("Got message with no replyto");
8483
}
85-
// ack();
8684
} catch (error) {
8785
// setTimeout(() => {
8886
// ack(false);
8987
// // done(_data);
9088
// Logger.instanse.warn("[queue][nack] Received response failed for command: " + msg.command + " queuename: " + this.queuename + " replyto: " + options.replyTo + " correlationId: " + options.correlationId + " error: " + (error.message ? error.message : error))
9189
// }, Config.amqp_requeue_time);
92-
} finally {
93-
ack();
9490
}
9591
}, null);
9692
}

OpenFlow/src/WebSocketServerClient.ts

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -312,17 +312,17 @@ export class WebSocketServerClient {
312312
const AssertExchangeOptions: any = Object.assign({}, (amqpwrapper.Instance().AssertExchangeOptions));
313313
AssertExchangeOptions.exclusive = exclusive;
314314
exchangequeue = await amqpwrapper.Instance().AddExchangeConsumer(user, exchange, algorithm, routingkey, AssertExchangeOptions, this.jwt, addqueue, async (msg: any, options: QueueMessageOptions, ack: any, done: any) => {
315+
ack();
315316
const _data = msg;
316317
try {
317318
const result = await this.Queue(msg, exchange, options);
318-
ack();
319319
done(result);
320320
} catch (error) {
321-
setTimeout(() => {
322-
ack(false);
323-
done(_data);
324-
console.error(exchange + " failed message queue message, nack and re queue message: ", (error.message ? error.message : error));
325-
}, Config.amqp_requeue_time);
321+
// setTimeout(() => {
322+
// ack(false);
323+
// done(_data);
324+
// console.error(exchange + " failed message queue message, nack and re queue message: ", (error.message ? error.message : error));
325+
// }, Config.amqp_requeue_time);
326326
}
327327
}, span);
328328
if (exchangequeue) {
@@ -384,13 +384,13 @@ export class WebSocketServerClient {
384384
}
385385
}
386386
queue = await amqpwrapper.Instance().AddQueueConsumer(this.user, qname, AssertQueueOptions, this.jwt, async (msg: any, options: QueueMessageOptions, ack: any, done: any) => {
387+
ack();
387388
// const _data = msg;
388389
var _data = msg;
389390
try {
390391
Logger.instanse.verbose("[preack] queuename: " + queuename + " qname: " + qname + " replyto: " + options.replyTo + " correlationId: " + options.correlationId)
391392
_data = await this.Queue(msg, qname, options);;
392393
// const result = await this.Queue(msg, qname, options);
393-
// ack();
394394
// done(result);
395395
Logger.instanse.debug("[ack] queuename: " + queuename + " qname: " + qname + " replyto: " + options.replyTo + " correlationId: " + options.correlationId)
396396
} catch (error) {
@@ -400,7 +400,6 @@ export class WebSocketServerClient {
400400
// Logger.instanse.warn("[nack] queuename: " + queuename + " qname: " + qname + " replyto: " + options.replyTo + " correlationId: " + options.correlationId + " error: " + (error.message ? error.message : error))
401401
// }, Config.amqp_requeue_time);
402402
} finally {
403-
ack();
404403
try {
405404
done(_data);
406405
} catch (error) {

OpenFlow/src/amqpwrapper.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ export class amqpwrapper extends events.EventEmitter {
206206
this.channel = await this.conn.createConfirmChannel();
207207
this.channel.prefetch(Config.amqp_prefetch);
208208
this.replyqueue = await this.AddQueueConsumer(Crypt.rootUser(), "", null, null, (msg: any, options: QueueMessageOptions, ack: any, done: any) => {
209+
ack();
209210
try {
210211
if (this.replyqueue) {
211212
if (!NoderedUtil.IsNullUndefinded(WebSocketServer.websocket_queue_message_count)) WebSocketServer.websocket_queue_message_count.
@@ -218,7 +219,6 @@ export class amqpwrapper extends events.EventEmitter {
218219
} catch (error) {
219220
console.error(error);
220221
}
221-
ack();
222222
done();
223223
}, undefined);
224224
// We don't want to recreate this
@@ -484,6 +484,7 @@ export class amqpwrapper extends events.EventEmitter {
484484
async Adddlx(parent: Span) {
485485
if (NoderedUtil.IsNullEmpty(Config.amqp_dlx)) return;
486486
await this.AddExchangeConsumer(Crypt.rootUser(), Config.amqp_dlx, "fanout", "", null, null, true, async (msg: any, options: QueueMessageOptions, ack: any, done: any) => {
487+
ack();
487488
if (typeof msg === "string" || msg instanceof String) {
488489
try {
489490
msg = JSON.parse((msg as any));
@@ -509,7 +510,6 @@ export class amqpwrapper extends events.EventEmitter {
509510
console.error("Failed sending deadletter message to " + options.replyTo);
510511
console.error(error);
511512
}
512-
ack();
513513
done();
514514
}, parent);
515515
}
@@ -524,6 +524,7 @@ export class amqpwrapper extends events.EventEmitter {
524524
async AddOFExchange(parent: Span) {
525525
if (!Config.enable_openflow_amqp) return;
526526
await this.AddExchangeConsumer(Crypt.rootUser(), "openflow", "fanout", "", null, null, true, async (msg: any, options: QueueMessageOptions, ack: any, done: any) => {
527+
ack();
527528
if (typeof msg === "string" || msg instanceof String) {
528529
try {
529530
msg = JSON.parse((msg as any));
@@ -566,7 +567,6 @@ export class amqpwrapper extends events.EventEmitter {
566567
} else {
567568
if (Config.log_amqp) Logger.instanse.verbose("[OF] Received string message: " + JSON.stringify(msg));
568569
}
569-
ack();
570570
done();
571571
}, parent);
572572
}

0 commit comments

Comments
 (0)