Skip to content

Commit af0ede2

Browse files
committed
readd connect to queues
1 parent ce7fae8 commit af0ede2

2 files changed

Lines changed: 14 additions & 60 deletions

File tree

OpenFlow/src/amqpwrapper.ts

Lines changed: 13 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import { Crypt } from "./Crypt";
66
import * as url from "url";
77
import { NoderedUtil } from "openflow-api";
88
var got = require("got");
9-
109
type QueueOnMessage = (msg: string, options: QueueMessageOptions, ack: any, done: any) => void;
1110
interface IHashTable<T> {
1211
[key: string]: T;
@@ -41,7 +40,6 @@ export class amqpqueue {
4140
public ok: AssertQueue;
4241
public QueueOptions: any;
4342
public consumerTag: string;
44-
// public cli: WebSocketClient;
4543
}
4644
export class amqpexchange {
4745
public exchange: string;
@@ -51,9 +49,7 @@ export class amqpexchange {
5149
public callback: QueueOnMessage;
5250
public ok: amqplib.Replies.AssertExchange;
5351
public ExchangeOptions: any;
54-
// public cli: WebSocketClient;
5552
}
56-
5753
// tslint:disable-next-line: class-name
5854
export class amqpwrapper {
5955
private conn: amqplib.Connection;
@@ -68,52 +64,19 @@ export class amqpwrapper {
6864
public queues: amqpqueue[] = [];
6965
private exchanges: amqpexchange[] = [];
7066
private replyqueue: amqpqueue;
71-
7267
private static _instance: amqpwrapper = null;
7368
public static Instance(): amqpwrapper {
7469
return this._instance;
7570
}
7671
public static SetInstance(instance: amqpwrapper): void {
7772
this._instance = instance;
7873
}
79-
80-
// private callback: QueueOnMessage;
81-
// private algorithm: string;
82-
// private routingkey: string;
83-
// public exchange: string;
84-
// public queue: string;
85-
// private _ok: amqplib.Replies.AssertExchange;
86-
private _ok: AssertQueue;
87-
8874
constructor(logger: winston.Logger, connectionstring: string) {
8975
this._logger = logger;
9076
this.connectionstring = connectionstring;
91-
92-
//if (!NoderedUtil.IsNullEmpty(Config.deadLetterExchange) && exchange != Config.deadLetterExchange) {
93-
// this.AssertExchangeOptions.arguments = {};
94-
// this.AssertExchangeOptions.arguments['x-message-ttl'] = Config.dlxmessagettl;
95-
// this.AssertQueueOptions.arguments = {};
96-
// this.AssertQueueOptions.arguments['x-dead-letter-exchange'] = Config.deadLetterExchange;
97-
// this.AssertQueueOptions.arguments['x-message-ttl'] = Config.dlxmessagettl;
98-
99-
// this.AssertExchangeOptions.arguments['x-dead-letter-exchange'] = Config.deadLetterExchange;
100-
// if (!NoderedUtil.IsNullEmpty(routingkey)) this.AssertExchangeOptions.arguments['x-dead-letter-routing-key'] = routingkey;
101-
// this.AssertExchangeOptions.arguments['x-expires'] = Config.dlxmessageexpires;
102-
// if (!NoderedUtil.IsNullEmpty(routingkey)) this.AssertQueueOptions.arguments['x-dead-letter-routing-key'] = routingkey;
103-
// this.AssertQueueOptions.arguments['x-expires'] = Config.dlxmessageexpires;
104-
105-
// Bad idear ...
106-
// this.AssertExchangeOptions.arguments['alternate-exchange'] = Config.deadLetterExchange;
107-
//}
108-
10977
if (!NoderedUtil.IsNullEmpty(Config.amqp_dlx)) {
11078
this.AssertQueueOptions.arguments = {};
11179
this.AssertQueueOptions.arguments['x-dead-letter-exchange'] = Config.amqp_dlx;
112-
// // arguments: {
113-
// // 'x-dead-letter-exchange': Config.amqp_dlx_prefix + queue,
114-
// // 'x-dead-letter-routing-key': Config.amqp_dlrk_prefix + queue,
115-
// // 'x-message-ttl': Config.amqp_message_ttl
116-
11780
}
11881
}
11982
private timeout: NodeJS.Timeout = null;
@@ -152,11 +115,6 @@ export class amqpwrapper {
152115
ack();
153116
done();
154117
});
155-
156-
// this.channel.on('ack', (e) => {
157-
// });
158-
// this.channel.on('cancel', (e) => {
159-
// });
160118
this.channel.on('close', (e) => {
161119
try {
162120
if (this.conn != null) this.conn.close();
@@ -168,23 +126,19 @@ export class amqpwrapper {
168126
this.timeout = setTimeout(this.connect.bind(this), 1000);
169127
}
170128
});
171-
//this.channel.on('delivery', (e) => {
172-
//});
173-
// this.channel.on('nack', (e) => {
174-
// });
175129
// ROLLBACK
176-
// var keys = Object.keys(this.exchanges);
177-
// for (var i = 0; i < keys.length; i++) {
178-
// var q1: amqpexchange = this.exchanges[keys[i]];
179-
// this.AddExchangeConsumer(q1.exchange, q1.algorithm, q1.routingkey, q1.ExchangeOptions, null, q1.callback);
180-
// }
181-
// var keys = Object.keys(this.queues);
182-
// for (var i = 0; i < keys.length; i++) {
183-
// if (keys[i] != this.replyqueue) {
184-
// var q2: amqpqueue = this.queues[keys[i]];
185-
// this.AddQueueConsumer(q2.queue, q2.QueueOptions, null, q2.callback);
186-
// }
187-
// }
130+
var keys = Object.keys(this.exchanges);
131+
for (var i = 0; i < keys.length; i++) {
132+
var q1: amqpexchange = this.exchanges[keys[i]];
133+
this.AddExchangeConsumer(q1.exchange, q1.algorithm, q1.routingkey, q1.ExchangeOptions, null, q1.callback);
134+
}
135+
var keys = Object.keys(this.queues);
136+
for (var i = 0; i < keys.length; i++) {
137+
if (keys[i] != this.replyqueue.queue) {
138+
var q2: amqpqueue = this.queues[keys[i]];
139+
this.AddQueueConsumer(q2.queue, q2.QueueOptions, null, q2.callback);
140+
}
141+
}
188142
} catch (error) {
189143
console.error(error);
190144
this.timeout = setTimeout(this.connect.bind(this), 1000);
@@ -295,7 +249,7 @@ export class amqpwrapper {
295249
// q.ExchangeOptions = new Object((ExchangeOptions != null ? ExchangeOptions : this.AssertExchangeOptions));
296250
q.ExchangeOptions = Object.assign({}, (ExchangeOptions != null ? ExchangeOptions : this.AssertExchangeOptions));
297251
q.exchange = exchange; q.algorithm = algorithm; q.routingkey = routingkey; q.callback = callback;
298-
this._ok = await this.channel.assertExchange(q.exchange, q.algorithm, q.ExchangeOptions);
252+
var _ok = await this.channel.assertExchange(q.exchange, q.algorithm, q.ExchangeOptions);
299253
var AssertQueueOptions = null;
300254
if (!NoderedUtil.IsNullEmpty(Config.amqp_dlx) && exchange == Config.amqp_dlx) {
301255
AssertQueueOptions = Object.create(this.AssertQueueOptions);

OpenFlowNodeRED/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,4 @@
4747
"unhandled-rejection": "^1.0.0",
4848
"winston": "^3.3.3"
4949
}
50-
}
50+
}

0 commit comments

Comments
 (0)