@@ -40,7 +40,7 @@ export class amqpqueue {
4040 public ok : AssertQueue ;
4141 public QueueOptions : any ;
4242 public consumerTag : string ;
43- public cli : WebSocketClient ;
43+ // public cli: WebSocketClient;
4444}
4545export class amqpexchange {
4646 public exchange : string ;
@@ -50,7 +50,7 @@ export class amqpexchange {
5050 public callback : QueueOnMessage ;
5151 public ok : amqplib . Replies . AssertExchange ;
5252 public ExchangeOptions : any ;
53- public cli : WebSocketClient ;
53+ // public cli: WebSocketClient;
5454}
5555
5656// tslint:disable-next-line: class-name
@@ -62,7 +62,7 @@ export class amqpwrapper {
6262 public AssertExchangeOptions : any = { durable : false , confirm : true } ;
6363 public AssertQueueOptions : any = { } ;
6464 private activecalls : IHashTable < Deferred < string > > = { } ;
65- private queues : IHashTable < amqpqueue > = { } ;
65+ public queues : IHashTable < amqpqueue > = { } ;
6666 private exchanges : IHashTable < amqpexchange > = { } ;
6767 private replyqueue : string ;
6868
@@ -147,7 +147,7 @@ export class amqpwrapper {
147147 if ( ! Util . IsNullEmpty ( this . replyqueue ) ) {
148148 delete this . queues [ this . replyqueue ] ;
149149 }
150- this . replyqueue = await this . AddQueueConsumer ( null , "" , null , null , ( msg : any , options : QueueMessageOptions , ack : any , done : any ) => {
150+ this . replyqueue = await this . AddQueueConsumer ( "" , null , null , ( msg : any , options : QueueMessageOptions , ack : any , done : any ) => {
151151 if ( ! Util . IsNullUndefinded ( this . activecalls [ options . correlationId ] ) ) {
152152 this . activecalls [ options . correlationId ] . resolve ( msg ) ;
153153 this . activecalls [ options . correlationId ] = null ;
@@ -206,7 +206,7 @@ export class amqpwrapper {
206206 if ( this . channel != null ) await this . channel . cancel ( q . consumerTag ) ;
207207 delete this . queues [ q . queue ] ;
208208 }
209- async AddQueueConsumer ( cli : WebSocketClient , queue : string , QueueOptions : any , jwt : string , callback : QueueOnMessage ) : Promise < string > {
209+ async AddQueueConsumer ( queue : string , QueueOptions : any , jwt : string , callback : QueueOnMessage ) : Promise < string > {
210210 if ( this . channel == null || this . conn == null ) throw new Error ( "Cannot Add new Queue Consumer, not connected to rabbitmq" ) ;
211211 var q : amqpqueue = null ;
212212 if ( Config . amqp_force_queue_prefix && ! Util . IsNullEmpty ( jwt ) ) {
@@ -217,7 +217,7 @@ export class amqpwrapper {
217217 if ( isrole . length == 0 && tuser . _id != queue ) queue = name + queue ;
218218 }
219219
220- if ( this . exchanges [ queue ] != null ) {
220+ if ( this . queues [ queue ] != null ) {
221221 q = this . queues [ queue ] ;
222222 try {
223223 if ( this . channel != null && ! Util . IsNullEmpty ( q . consumerTag ) ) await this . channel . cancel ( q . consumerTag ) ;
@@ -227,7 +227,6 @@ export class amqpwrapper {
227227 } else {
228228 q = new amqpqueue ( ) ;
229229 }
230- q . cli = cli ;
231230 if ( ! Util . IsNullEmpty ( q . queue ) ) {
232231 if ( q . queue . startsWith ( "amq." ) ) {
233232 delete this . queues [ q . queue ] ;
@@ -249,7 +248,7 @@ export class amqpwrapper {
249248 this . queues [ q . queue ] = q ;
250249 return q . queue ;
251250 }
252- async AddExchangeConsumer ( cli : WebSocketClient , exchange : string , algorithm : string , routingkey : string , ExchangeOptions : any , jwt : string , callback : QueueOnMessage ) : Promise < void > {
251+ async AddExchangeConsumer ( exchange : string , algorithm : string , routingkey : string , ExchangeOptions : any , jwt : string , callback : QueueOnMessage ) : Promise < void > {
253252 if ( this . channel == null || this . conn == null ) throw new Error ( "Cannot Add new Exchange Consumer, not connected to rabbitmq" ) ;
254253 var q : amqpexchange = null ;
255254 if ( Config . amqp_force_exchange_prefix && ! Util . IsNullEmpty ( jwt ) ) {
@@ -264,7 +263,6 @@ export class amqpwrapper {
264263 } else {
265264 q = new amqpexchange ( ) ;
266265 }
267- q . cli = cli ;
268266 if ( ! Util . IsNullEmpty ( q . queue ) ) {
269267 delete this . queues [ q . queue ] ;
270268 }
@@ -276,7 +274,7 @@ export class amqpwrapper {
276274 AssertQueueOptions = Object . create ( this . AssertQueueOptions ) ;
277275 delete AssertQueueOptions . arguments ;
278276 }
279- q . queue = await this . AddQueueConsumer ( cli , "" , AssertQueueOptions , jwt , q . callback ) ;
277+ q . queue = await this . AddQueueConsumer ( "" , AssertQueueOptions , jwt , q . callback ) ;
280278 this . channel . bindQueue ( q . queue , q . exchange , q . routingkey ) ;
281279 this . _logger . info ( "[AMQP] Added exchange consumer " + q . exchange ) ;
282280 this . exchanges [ exchange ] = q ;
@@ -285,17 +283,17 @@ export class amqpwrapper {
285283 // sender._logger.info("OnMessage " + msg.content.toString());
286284 try {
287285 var now = new Date ( ) ;
288- var seconds = ( now . getTime ( ) - sender . cli . lastheartbeat . getTime ( ) ) / 1000 ;
289- if ( seconds >= Config . client_heartbeat_timeout ) {
290- try {
291- sender . cli . _logger . info ( "amqpwrapper.OnMessage: receive message for inactive client, nack message and try and close" ) ;
292- this . channel . nack ( msg ) ;
293- sender . cli . Close ( ) ;
294- } catch ( error ) {
295- console . error ( error ) ;
296- }
297- return ;
298- }
286+ // var seconds = (now.getTime() - sender.cli.lastheartbeat.getTime()) / 1000;
287+ // if (seconds >= Config.client_heartbeat_timeout) {
288+ // try {
289+ // sender.cli._logger.info("amqpwrapper.OnMessage: receive message for inactive client, nack message and try and close");
290+ // this.channel.nack(msg);
291+ // sender.cli.Close();
292+ // } catch (error) {
293+ // console.error(error);
294+ // }
295+ // return;
296+ // }
299297
300298 var correlationId : string = msg . properties . correlationId ;
301299 var replyTo : string = msg . properties . replyTo ;
0 commit comments