Skip to content

Commit 77dc842

Browse files
committed
1
1 parent 6377f58 commit 77dc842

2 files changed

Lines changed: 64 additions & 57 deletions

File tree

OpenFlow/src/amqpwrapper.ts

Lines changed: 63 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -119,70 +119,75 @@ export class amqpwrapper {
119119
}
120120
private timeout: NodeJS.Timeout = null;
121121
async connect(): Promise<void> {
122-
if (this.timeout != null) {
123-
this.timeout = null;
124-
}
125-
var me: amqpwrapper = this;
126-
if (this.conn == null) {
127-
this.conn = await amqplib.connect(this.connectionstring);
128-
this.conn.on('error', (error) => {
129-
if (error.code != 404) {
130-
this._logger.error(JSON.stringify(error));
131-
console.log(error);
122+
try {
123+
if (this.timeout != null) {
124+
this.timeout = null;
125+
}
126+
var me: amqpwrapper = this;
127+
if (this.conn == null) {
128+
this.conn = await amqplib.connect(this.connectionstring);
129+
this.conn.on('error', (error) => {
130+
if (error.code != 404) {
131+
this._logger.error(JSON.stringify(error));
132+
console.log(error);
133+
}
134+
});
135+
this.conn.on("close", () => {
136+
this._logger.info("[AMQP] reconnecting");
137+
this.conn = null;
138+
if (this.timeout == null) {
139+
this.timeout = setTimeout(this.connect.bind(this), 1000);
140+
}
141+
});
142+
}
143+
this.channel = await this.conn.createChannel();
144+
if (!Util.IsNullEmpty(this.replyqueue)) {
145+
delete this.queues[this.replyqueue];
146+
}
147+
this.replyqueue = await this.AddQueueConsumer("", null, null, (msg: any, options: QueueMessageOptions, ack: any, done: any) => {
148+
if (!Util.IsNullUndefinded(this.activecalls[options.correlationId])) {
149+
this.activecalls[options.correlationId].resolve(msg);
150+
this.activecalls[options.correlationId] = null;
151+
delete this.activecalls[options.correlationId];
132152
}
153+
ack();
154+
done();
133155
});
134-
this.conn.on("close", () => {
135-
this._logger.info("[AMQP] reconnecting");
156+
157+
// this.channel.on('ack', (e) => {
158+
// });
159+
// this.channel.on('cancel', (e) => {
160+
// });
161+
this.channel.on('close', (e) => {
162+
try {
163+
if (this.conn != null) this.conn.close();
164+
} catch (error) {
165+
}
136166
this.conn = null;
167+
this.channel = null;
137168
if (this.timeout == null) {
138169
this.timeout = setTimeout(this.connect.bind(this), 1000);
139170
}
140171
});
141-
}
142-
this.channel = await this.conn.createChannel();
143-
if (!Util.IsNullEmpty(this.replyqueue)) {
144-
delete this.queues[this.replyqueue];
145-
}
146-
this.replyqueue = await this.AddQueueConsumer("", null, null, (msg: any, options: QueueMessageOptions, ack: any, done: any) => {
147-
if (!Util.IsNullUndefinded(this.activecalls[options.correlationId])) {
148-
this.activecalls[options.correlationId].resolve(msg);
149-
this.activecalls[options.correlationId] = null;
150-
delete this.activecalls[options.correlationId];
151-
}
152-
ack();
153-
done();
154-
});
155-
156-
// this.channel.on('ack', (e) => {
157-
// });
158-
// this.channel.on('cancel', (e) => {
159-
// });
160-
this.channel.on('close', (e) => {
161-
try {
162-
if (this.conn != null) this.conn.close();
163-
} catch (error) {
172+
//this.channel.on('delivery', (e) => {
173+
//});
174+
// this.channel.on('nack', (e) => {
175+
// });
176+
var keys = Object.keys(this.exchanges);
177+
for (var i = 0; i < keys.length; i++) {
178+
var q1: amqpexchange = this.exchanges[keys[i]];
179+
this.AddExchangeConsumer(q1.exchange, q1.algorithm, q1.routingkey, q1.ExchangeOptions, null, q1.callback);
164180
}
165-
this.conn = null;
166-
this.channel = null;
167-
if (this.timeout == null) {
168-
this.timeout = setTimeout(this.connect.bind(this), 1000);
169-
}
170-
});
171-
//this.channel.on('delivery', (e) => {
172-
//});
173-
// this.channel.on('nack', (e) => {
174-
// });
175-
var keys = Object.keys(this.exchanges);
176-
for (var i = 0; i < keys.length; i++) {
177-
var q1: amqpexchange = this.exchanges[keys[i]];
178-
this.AddExchangeConsumer(q1.exchange, q1.algorithm, q1.routingkey, q1.ExchangeOptions, null, q1.callback);
179-
}
180-
var keys = Object.keys(this.queues);
181-
for (var i = 0; i < keys.length; i++) {
182-
if (keys[i] != this.replyqueue) {
183-
var q2: amqpqueue = this.queues[keys[i]];
184-
this.AddQueueConsumer(q2.queue, q2.QueueOptions, null, q2.callback);
181+
var keys = Object.keys(this.queues);
182+
for (var i = 0; i < keys.length; i++) {
183+
if (keys[i] != this.replyqueue) {
184+
var q2: amqpqueue = this.queues[keys[i]];
185+
this.AddQueueConsumer(q2.queue, q2.QueueOptions, null, q2.callback);
186+
}
185187
}
188+
} catch (error) {
189+
console.error(error);
190+
this.timeout = setTimeout(this.connect.bind(this), 1000);
186191
}
187192
}
188193
async RemoveQueueConsumer(queue: string): Promise<string> {
@@ -194,10 +199,11 @@ export class amqpwrapper {
194199
return;
195200
}
196201
this._logger.info("[AMQP] Remove queue consumer " + queue);
197-
await this.channel.cancel(q.consumerTag);
202+
if (this.channel != null) await this.channel.cancel(q.consumerTag);
198203
delete this.queues[q.queue];
199204
}
200205
async AddQueueConsumer(queue: string, QueueOptions: any, jwt: string, callback: QueueOnMessage): Promise<string> {
206+
if (this.channel == null || this.conn == null) throw new Error("Cannot Add new Queue Consumer, not connected to rabbitmq");
201207
var q: amqpqueue = null;
202208
if (Config.amqp_force_queue_prefix && !Util.IsNullEmpty(jwt)) {
203209
var tuser = Crypt.verityToken(jwt);
@@ -234,6 +240,7 @@ export class amqpwrapper {
234240
return q.queue;
235241
}
236242
async AddExchangeConsumer(exchange: string, algorithm: string, routingkey: string, ExchangeOptions: any, jwt: string, callback: QueueOnMessage): Promise<void> {
243+
if (this.channel == null || this.conn == null) throw new Error("Cannot Add new Exchange Consumer, not connected to rabbitmq");
237244
var q: amqpexchange = null;
238245
if (Config.amqp_force_exchange_prefix && !Util.IsNullEmpty(jwt)) {
239246
var tuser = Crypt.verityToken(jwt);

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.0.7
1+
1.0.8

0 commit comments

Comments
 (0)