Skip to content

Commit 430a677

Browse files
committed
wrap up minor updated
1 parent 5e6546a commit 430a677

5 files changed

Lines changed: 12 additions & 14 deletions

File tree

OpenFlow/src/Messages/Message.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,7 @@ export class Message {
426426
await amqpwrapper.Instance().send(msg.exchange, msg.queuename, sendthis, expiration, msg.correlationId, msg.routingkey);
427427
} else {
428428
if (msg.queuename === msg.replyto) {
429-
throw new Error("Cannot send reply to self queuename:" + msg.queuename + " correlationId:" + msg.correlationId);
429+
throw new Error("Cannot send reply to self queuename: " + msg.queuename + " correlationId: " + msg.correlationId);
430430
}
431431
const sendthis = msg.data;
432432
const result = await amqpwrapper.Instance().sendWithReplyTo(msg.exchange, msg.queuename, msg.replyto, sendthis, expiration, msg.correlationId, msg.routingkey);
@@ -2310,7 +2310,7 @@ export class Message {
23102310
found = item;
23112311
if (item.status.phase != "Failed") {
23122312
msg.result = item;
2313-
Logger.instanse.debug("[" + cli.user.username + "] GetNoderedInstance:" + name + " found one");
2313+
Logger.instanse.debug("[" + cli.user.username + "] GetNoderedInstance: " + name + " found one");
23142314
}
23152315
var metrics: any = null;
23162316
try {
@@ -2437,7 +2437,7 @@ export class Message {
24372437
if (msg.instancename == item.metadata.name) {
24382438
if (cli.user.HasRoleName("admins") || item.metadata.labels.app === name) {
24392439

2440-
Logger.instanse.debug("[" + cli.user.username + "] GetNoderedInstanceLog:" + name + " found one as " + item.metadata.name);
2440+
Logger.instanse.debug("[" + cli.user.username + "] GetNoderedInstanceLog: " + name + " found one as " + item.metadata.name);
24412441
const obj = await await KubeUtil.instance().CoreV1Api.readNamespacedPodLog(item.metadata.name, namespace, "", false);
24422442
msg.result = obj.body;
24432443
Audit.NoderedAction(TokenUser.From(cli.user), true, name, "readpodlog", image, item.metadata.name, span);
@@ -2457,12 +2457,12 @@ export class Message {
24572457

24582458
}
24592459
if (!NoderedUtil.IsNullEmpty(msg.name) && item.metadata.name == msg.name && cli.user.HasRoleName("admins")) {
2460-
Logger.instanse.debug("[" + cli.user.username + "] GetNoderedInstanceLog:" + name + " found one as " + item.metadata.name);
2460+
Logger.instanse.debug("[" + cli.user.username + "] GetNoderedInstanceLog: " + name + " found one as " + item.metadata.name);
24612461
const obj = await await KubeUtil.instance().CoreV1Api.readNamespacedPodLog(item.metadata.name, namespace, "", false);
24622462
msg.result = obj.body;
24632463
Audit.NoderedAction(TokenUser.From(cli.user), true, name, "readpodlog", image, item.metadata.name, span);
24642464
} else if (item.metadata.labels.app === name) {
2465-
Logger.instanse.debug("[" + cli.user.username + "] GetNoderedInstanceLog:" + name + " found one as " + item.metadata.name);
2465+
Logger.instanse.debug("[" + cli.user.username + "] GetNoderedInstanceLog: " + name + " found one as " + item.metadata.name);
24662466
const obj = await await KubeUtil.instance().CoreV1Api.readNamespacedPodLog(item.metadata.name, namespace, "", false);
24672467
msg.result = obj.body;
24682468
Audit.NoderedAction(TokenUser.From(cli.user), true, name, "readpodlog", image, item.metadata.name, span);
@@ -2750,7 +2750,7 @@ export class Message {
27502750
if (NoderedUtil.IsNullEmpty(msg.name)) throw new Error("name is mandatory when workflowid not set")
27512751

27522752
if (msg.queue === msg.resultqueue) {
2753-
throw new Error("Cannot reply to self queuename:" + msg.queue + " correlationId:" + msg.resultqueue);
2753+
throw new Error("Cannot reply to self queuename: " + msg.queue + " correlationId: " + msg.resultqueue);
27542754
}
27552755

27562756
const res = await Config.db.query({ "_id": msg.targetid }, null, 1, 0, null, "users", msg.jwt, undefined, undefined, span);

OpenFlow/src/WebSocketServerClient.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -337,15 +337,16 @@ export class WebSocketServerClient {
337337
queue = await amqpwrapper.Instance().AddQueueConsumer(qname, AssertQueueOptions, this.jwt, async (msg: any, options: QueueMessageOptions, ack: any, done: any) => {
338338
const _data = msg;
339339
try {
340+
Logger.instanse.verbose("[preack] queuename: " + queuename + " qname: " + qname + " replyto: " + options.replyTo + " correlationId: " + options.correlationId)
340341
const result = await this.Queue(msg, qname, options);
341342
ack();
342343
done(result);
344+
Logger.instanse.debug("[ack] queuename: " + queuename + " qname: " + qname + " replyto: " + options.replyTo + " correlationId: " + options.correlationId)
343345
} catch (error) {
344346
setTimeout(() => {
345347
ack(false);
346-
// ack(); // just eat the error
347348
done(_data);
348-
console.error(qname + " failed message queue message, nack and re queue message: ", (error.message ? error.message : error));
349+
Logger.instanse.warn("[nack] queuename: " + queuename + " qname: " + qname + " replyto: " + options.replyTo + " correlationId: " + options.correlationId + " error: " + (error.message ? error.message : error))
349350
}, Config.amqp_requeue_time);
350351
}
351352
}, span);
@@ -486,7 +487,6 @@ export class WebSocketServerClient {
486487
q.consumerTag = options.consumerTag;
487488
q.routingkey = options.routingkey;
488489
q.exchange = options.exchange;
489-
490490
let m: Message = Message.fromcommand("queuemessage");
491491
if (NoderedUtil.IsNullEmpty(q.correlationId)) { q.correlationId = m.id; }
492492
m.data = JSON.stringify(q);

OpenFlow/src/amqpwrapper.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,6 @@ export class amqpwrapper extends events.EventEmitter {
431431
if (typeof data !== 'string' && !(data instanceof String)) {
432432
data = JSON.stringify(data);
433433
}
434-
435434
Logger.instanse.info("send to queue: " + queue + " exchange: " + exchange + " with reply to " + replyTo + " correlationId: " + correlationId);
436435
const options: any = { mandatory: true };
437436
options.replyTo = replyTo;
@@ -471,7 +470,6 @@ export class amqpwrapper extends events.EventEmitter {
471470
data = JSON.stringify(data);
472471
}
473472
if (NoderedUtil.IsNullEmpty(correlationId)) correlationId = this.generateUuid();
474-
475473
Logger.instanse.info("send to queue: " + queue + " exchange: " + exchange);
476474
const options: any = { mandatory: true };
477475
if (!NoderedUtil.IsNullEmpty(correlationId)) options.correlationId = correlationId;

OpenFlow/src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ async function adddlx() {
8484
msg.command = "timeout";
8585
// Resend message, this time to the reply queue for the correct node (replyTo)
8686
// this.SendMessage(JSON.stringify(data), msg.properties.replyTo, msg.properties.correlationId, false);
87-
Logger.instanse.info("[DLX][" + options.exchange + "] Send timeout to " + options.replyTo)
87+
Logger.instanse.info("[DLX][" + options.exchange + "] Send timeout to " + options.replyTo + " correlationId: " + options.correlationId);
8888
await amqpwrapper.Instance().sendWithReply("", options.replyTo, msg, 20000, options.correlationId, "");
8989
} catch (error) {
9090
console.error("Failed sending deadletter message to " + options.replyTo);

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
},
3131
"dependencies": {
3232
"@kubernetes/client-node": "0.14.1",
33-
"@openiap/openflow-api": "^1.0.71",
33+
"@openiap/openflow-api": "^1.0.75",
3434
"@opentelemetry/api": "^0.18.1",
3535
"@opentelemetry/core": "^0.18.2",
3636
"@opentelemetry/exporter-collector-grpc": "^0.18.2",
@@ -108,4 +108,4 @@
108108
"typescript": "^4.2.3",
109109
"watchify": "^4.0.0"
110110
}
111-
}
111+
}

0 commit comments

Comments
 (0)