Skip to content

Commit e2075b2

Browse files
committed
return result in callback, not orignal
1 parent 956881e commit e2075b2

2 files changed

Lines changed: 13 additions & 8 deletions

File tree

OpenFlow/src/Messages/Message.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ export class Message {
5959
} catch (error) {
6060
// TODO: should we set message to data ?
6161
}
62-
if (qmsg.cb !== undefined && qmsg.cb !== null) { qmsg.cb(qmsg.message); }
62+
//if (qmsg.cb !== undefined && qmsg.cb !== null) { qmsg.cb(qmsg.message); }
63+
if (qmsg.cb !== undefined && qmsg.cb !== null) { qmsg.cb(this); }
6364
delete cli.messageQueue[this.id];
6465
}
6566
return;
@@ -145,20 +146,17 @@ export class Message {
145146
}
146147
async QueueMessage(cli: WebSocketClient) {
147148
this.Reply();
148-
console.log("#*********************************************#");
149149
var msg: QueueMessage = QueueMessage.assign(this.data);
150150
try {
151151
//
152152
if (msg.replyto === null || msg.replyto === undefined || msg.replyto === "") {
153-
console.log("# sendToQueue");
154153
await cli.sendToQueue(msg);
155154
} else {
156155
if (msg.queuename === msg.replyto) {
157156
cli._logger.warn("Ignore reply to self queuename:" + msg.queuename + " correlationId:" + msg.correlationId);
158157
return
159158
}
160159
this.replyto = msg.correlationId;
161-
console.log("# sendQueueReply");
162160
await cli.sendQueueReply(msg);
163161
}
164162
} catch (error) {
@@ -171,9 +169,7 @@ export class Message {
171169
this.data = "";
172170
msg.error = error.toString();
173171
}
174-
console.log("# send reply");
175172
this.Send(cli);
176-
console.log("#*********************************************#");
177173
// if(this.replyto !== null && this.replyto !== undefined && this.replyto !== "") {
178174
// }
179175
}

OpenFlow/src/WebSocketClient.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,17 +121,25 @@ export class WebSocketClient {
121121
if (this.consumers.length === 0) { throw new Error("No consumers for client available to send message through") }
122122
var result = this.consumers[0].sendToQueue(msg.queuename, msg.correlationId, { payload: msg.data, jwt: this.jwt });
123123
}
124+
sleep(ms) {
125+
return new Promise(resolve => {
126+
setTimeout(resolve, ms)
127+
})
128+
}
124129
async OnMessage(sender: amqp_consumer, msg: amqplib.ConsumeMessage) {
125130
try {
126131
this._logger.debug("WebSocketclient::WebSocket Send message to socketclient, from " + msg.properties.replyTo + " correlationId: " + msg.properties.correlationId);
127132
var data = await this.Queue(msg.content.toString(), msg.properties.replyTo, msg.properties.correlationId, sender.queue);
133+
console.log("*******************************************");
134+
console.log(data);
128135
this._logger.debug("WebSocketclient::WebSocket ack message in queue " + sender.queue);
129136
sender.channel.ack(msg);
130137
} catch (error) {
131138
this._logger.error("WebSocketclient::WebSocket error in queue " + sender.queue + " / " + error);
132-
sender.channel.nack(msg);
139+
setTimeout(() => {
140+
sender.channel.nack(msg);
141+
}, 2000);
133142
}
134-
135143
}
136144
public ping(): boolean {
137145
try {
@@ -236,6 +244,7 @@ export class WebSocketClient {
236244
if (q.correlationId === undefined || q.correlationId === null || q.correlationId === "") { q.correlationId = m.id; }
237245
m.data = JSON.stringify(q);
238246
q = await this.Send<QueueMessage>(m);
247+
if ((q as any).command == "error") throw new Error(q.data);
239248
return q.data;
240249
}
241250

0 commit comments

Comments
 (0)