@@ -2,9 +2,7 @@ import * as winston from "winston";
22import * as amqplib from "amqplib" ;
33import { Util } from "./Util" ;
44import { Config } from "./Config" ;
5- import { cli } from "winston/lib/winston/config" ;
65import { Crypt } from "./Crypt" ;
7- import { WebSocketClient } from "./WebSocketClient" ;
86
97type QueueOnMessage = ( msg : string , options : QueueMessageOptions , ack : any , done : any ) => void ;
108interface IHashTable < T > {
@@ -46,7 +44,7 @@ export class amqpexchange {
4644 public exchange : string ;
4745 public algorithm : string ;
4846 public routingkey : string ;
49- public queue : string ;
47+ public queue : amqpqueue ;
5048 public callback : QueueOnMessage ;
5149 public ok : amqplib . Replies . AssertExchange ;
5250 public ExchangeOptions : any ;
@@ -62,9 +60,11 @@ export class amqpwrapper {
6260 public AssertExchangeOptions : any = { durable : false , confirm : true } ;
6361 public AssertQueueOptions : any = { } ;
6462 private activecalls : IHashTable < Deferred < string > > = { } ;
65- public queues : IHashTable < amqpqueue > = { } ;
66- private exchanges : IHashTable < amqpexchange > = { } ;
67- private replyqueue : string ;
63+ // public queues: IHashTable<amqpqueue> = {};
64+ // private exchanges: IHashTable<amqpexchange> = {};
65+ public queues : amqpqueue [ ] = [ ] ;
66+ private exchanges : amqpexchange [ ] = [ ] ;
67+ private replyqueue : amqpqueue ;
6868
6969 private static _instance : amqpwrapper = null ;
7070 public static Instance ( ) : amqpwrapper {
@@ -145,7 +145,7 @@ export class amqpwrapper {
145145 }
146146 this . channel = await this . conn . createChannel ( ) ;
147147 if ( ! Util . IsNullEmpty ( this . replyqueue ) ) {
148- delete this . queues [ this . replyqueue ] ;
148+ this . queues = this . queues . filter ( x => x . consumerTag != this . replyqueue . consumerTag ) ;
149149 }
150150 this . replyqueue = await this . AddQueueConsumer ( "" , null , null , ( msg : any , options : QueueMessageOptions , ack : any , done : any ) => {
151151 if ( ! Util . IsNullUndefinded ( this . activecalls [ options . correlationId ] ) ) {
@@ -194,19 +194,13 @@ export class amqpwrapper {
194194 this . timeout = setTimeout ( this . connect . bind ( this ) , 1000 ) ;
195195 }
196196 }
197- async RemoveQueueConsumer ( queue : string ) : Promise < string > {
198- var q : amqpqueue = null ;
199- if ( this . queues [ queue ] != null ) {
200- q = this . queues [ queue ] ;
201- } else {
202- this . _logger . error ( "[AMQP] Request for removing unknown consumer " + queue ) ;
203- return ;
197+ async RemoveQueueConsumer ( queue : amqpqueue ) : Promise < void > {
198+ if ( queue != null ) {
199+ this . _logger . info ( "[AMQP] Remove queue consumer " + queue . queue ) ;
200+ if ( this . channel != null ) await this . channel . cancel ( queue . consumerTag ) ;
204201 }
205- this . _logger . info ( "[AMQP] Remove queue consumer " + queue ) ;
206- if ( this . channel != null ) await this . channel . cancel ( q . consumerTag ) ;
207- delete this . queues [ q . queue ] ;
208202 }
209- async AddQueueConsumer ( queue : string , QueueOptions : any , jwt : string , callback : QueueOnMessage ) : Promise < string > {
203+ async AddQueueConsumer ( queue : string , QueueOptions : any , jwt : string , callback : QueueOnMessage ) : Promise < amqpqueue > {
210204 if ( this . channel == null || this . conn == null ) throw new Error ( "Cannot Add new Queue Consumer, not connected to rabbitmq" ) ;
211205 var q : amqpqueue = null ;
212206 if ( Config . amqp_force_queue_prefix && ! Util . IsNullEmpty ( jwt ) ) {
@@ -217,16 +211,17 @@ export class amqpwrapper {
217211 if ( isrole . length == 0 && tuser . _id != queue ) queue = name + queue ;
218212 }
219213
220- if ( this . queues [ queue ] != null ) {
221- q = this . queues [ queue ] ;
222- try {
223- if ( this . channel != null && ! Util . IsNullEmpty ( q . consumerTag ) ) await this . channel . cancel ( q . consumerTag ) ;
224- } catch ( error ) {
225- console . error ( error ) ;
226- }
227- } else {
228- q = new amqpqueue ( ) ;
229- }
214+ // if (this.queues[queue] != null) {
215+ // q = this.queues[queue];
216+ // try {
217+ // if (this.channel != null && !Util.IsNullEmpty(q.consumerTag)) await this.channel.cancel(q.consumerTag);
218+ // } catch (error) {
219+ // console.error(error);
220+ // }
221+ // } else {
222+ // q = new amqpqueue();
223+ // }
224+ q = new amqpqueue ( ) ;
230225 q . callback = callback ;
231226 // q.QueueOptions = new Object((QueueOptions != null ? QueueOptions : this.AssertQueueOptions));
232227 q . QueueOptions = Object . assign ( { } , ( QueueOptions != null ? QueueOptions : this . AssertQueueOptions ) ) ;
@@ -240,11 +235,12 @@ export class amqpwrapper {
240235 this . OnMessage ( q , msg , q . callback ) ;
241236 } , { noAck : false } ) ;
242237 q . consumerTag = consumeresult . consumerTag ;
243- this . queues [ q . queue ] = q ;
238+ // this.queues[q.queue] = q;
239+ this . queues . push ( q ) ;
244240 this . checkQueue ( q . queue ) ;
245- return q . queue ;
241+ return q ;
246242 }
247- async AddExchangeConsumer ( exchange : string , algorithm : string , routingkey : string , ExchangeOptions : any , jwt : string , callback : QueueOnMessage ) : Promise < void > {
243+ async AddExchangeConsumer ( exchange : string , algorithm : string , routingkey : string , ExchangeOptions : any , jwt : string , callback : QueueOnMessage ) : Promise < amqpexchange > {
248244 if ( this . channel == null || this . conn == null ) throw new Error ( "Cannot Add new Exchange Consumer, not connected to rabbitmq" ) ;
249245 var q : amqpexchange = null ;
250246 if ( Config . amqp_force_exchange_prefix && ! Util . IsNullEmpty ( jwt ) ) {
@@ -253,14 +249,14 @@ export class amqpwrapper {
253249 name = name . toLowerCase ( ) ;
254250 exchange = name + exchange ;
255251 }
256-
257- if ( this . exchanges [ exchange ] != null ) {
258- q = this . exchanges [ exchange ] ;
259- } else {
260- q = new amqpexchange ( ) ;
261- }
252+ q = new amqpexchange ( ) ;
253+ // if (this.exchanges[exchange] != null) {
254+ // q = this.exchanges[exchange];
255+ // } else {
256+ // q = new amqpexchange();
257+ // }
262258 if ( ! Util . IsNullEmpty ( q . queue ) ) {
263- delete this . queues [ q . queue ] ;
259+ this . RemoveQueueConsumer ( q . queue ) ;
264260 }
265261 // q.ExchangeOptions = new Object((ExchangeOptions != null ? ExchangeOptions : this.AssertExchangeOptions));
266262 q . ExchangeOptions = Object . assign ( { } , ( ExchangeOptions != null ? ExchangeOptions : this . AssertExchangeOptions ) ) ;
@@ -274,7 +270,9 @@ export class amqpwrapper {
274270 q . queue = await this . AddQueueConsumer ( "" , AssertQueueOptions , jwt , q . callback ) ;
275271 this . channel . bindQueue ( q . queue , q . exchange , q . routingkey ) ;
276272 this . _logger . info ( "[AMQP] Added exchange consumer " + q . exchange ) ;
277- this . exchanges [ exchange ] = q ;
273+ // this.exchanges[exchange] = q;
274+ this . exchanges . push ( q ) ;
275+ return q ;
278276 }
279277 OnMessage ( sender : amqpqueue , msg : amqplib . ConsumeMessage , callback : QueueOnMessage ) : void {
280278 // sender._logger.info("OnMessage " + msg.content.toString());
@@ -335,7 +333,7 @@ export class amqpwrapper {
335333 async sendWithReply ( exchange : string , queue : string , data : any , expiration : number , correlationId : string ) : Promise < string > {
336334 if ( Util . IsNullEmpty ( correlationId ) ) correlationId = this . generateUuid ( ) ;
337335 this . activecalls [ correlationId ] = new Deferred ( ) ;
338- await this . sendWithReplyTo ( exchange , queue , this . replyqueue , data , expiration , correlationId ) ;
336+ await this . sendWithReplyTo ( exchange , queue , this . replyqueue . queue , data , expiration , correlationId ) ;
339337 return this . activecalls [ correlationId ] . promise ;
340338 }
341339 async sendWithReplyTo ( exchange : string , queue : string , replyTo : string , data : any , expiration : number , correlationId : string ) : Promise < void > {
0 commit comments