@@ -4,6 +4,7 @@ import { Util } from "./Util";
44import { Config } from "./Config" ;
55import { cli } from "winston/lib/winston/config" ;
66import { Crypt } from "./Crypt" ;
7+ import { WebSocketClient } from "./WebSocketClient" ;
78
89type QueueOnMessage = ( msg : string , options : QueueMessageOptions , ack : any , done : any ) => void ;
910interface IHashTable < T > {
@@ -39,6 +40,7 @@ export class amqpqueue {
3940 public ok : AssertQueue ;
4041 public QueueOptions : any ;
4142 public consumerTag : string ;
43+ public cli : WebSocketClient ;
4244}
4345export class amqpexchange {
4446 public exchange : string ;
@@ -48,6 +50,7 @@ export class amqpexchange {
4850 public callback : QueueOnMessage ;
4951 public ok : amqplib . Replies . AssertExchange ;
5052 public ExchangeOptions : any ;
53+ public cli : WebSocketClient ;
5154}
5255
5356// tslint:disable-next-line: class-name
@@ -144,7 +147,7 @@ export class amqpwrapper {
144147 if ( ! Util . IsNullEmpty ( this . replyqueue ) ) {
145148 delete this . queues [ this . replyqueue ] ;
146149 }
147- this . replyqueue = await this . AddQueueConsumer ( "" , null , null , ( msg : any , options : QueueMessageOptions , ack : any , done : any ) => {
150+ this . replyqueue = await this . AddQueueConsumer ( null , "" , null , null , ( msg : any , options : QueueMessageOptions , ack : any , done : any ) => {
148151 if ( ! Util . IsNullUndefinded ( this . activecalls [ options . correlationId ] ) ) {
149152 this . activecalls [ options . correlationId ] . resolve ( msg ) ;
150153 this . activecalls [ options . correlationId ] = null ;
@@ -203,9 +206,10 @@ export class amqpwrapper {
203206 if ( this . channel != null ) await this . channel . cancel ( q . consumerTag ) ;
204207 delete this . queues [ q . queue ] ;
205208 }
206- async AddQueueConsumer ( queue : string , QueueOptions : any , jwt : string , callback : QueueOnMessage ) : Promise < string > {
209+ async AddQueueConsumer ( cli : WebSocketClient , queue : string , QueueOptions : any , jwt : string , callback : QueueOnMessage ) : Promise < string > {
207210 if ( this . channel == null || this . conn == null ) throw new Error ( "Cannot Add new Queue Consumer, not connected to rabbitmq" ) ;
208211 var q : amqpqueue = null ;
212+ q . cli = cli ;
209213 if ( Config . amqp_force_queue_prefix && ! Util . IsNullEmpty ( jwt ) ) {
210214 var tuser = Crypt . verityToken ( jwt ) ;
211215 var name = tuser . username . split ( "@" ) . join ( "" ) . split ( "." ) . join ( "" ) ;
@@ -234,15 +238,16 @@ export class amqpwrapper {
234238 q . queue = q . ok . queue ;
235239 this . _logger . info ( "[AMQP] Added queue consumer " + q . queue ) ;
236240 var consumeresult = await this . channel . consume ( q . ok . queue , ( msg ) => {
237- this . OnMessage ( this , msg , q . callback ) ;
241+ this . OnMessage ( q , msg , q . callback ) ;
238242 } , { noAck : false } ) ;
239243 q . consumerTag = consumeresult . consumerTag ;
240244 this . queues [ q . queue ] = q ;
241245 return q . queue ;
242246 }
243- async AddExchangeConsumer ( exchange : string , algorithm : string , routingkey : string , ExchangeOptions : any , jwt : string , callback : QueueOnMessage ) : Promise < void > {
247+ async AddExchangeConsumer ( cli : WebSocketClient , exchange : string , algorithm : string , routingkey : string , ExchangeOptions : any , jwt : string , callback : QueueOnMessage ) : Promise < void > {
244248 if ( this . channel == null || this . conn == null ) throw new Error ( "Cannot Add new Exchange Consumer, not connected to rabbitmq" ) ;
245249 var q : amqpexchange = null ;
250+ q . cli = cli ;
246251 if ( Config . amqp_force_exchange_prefix && ! Util . IsNullEmpty ( jwt ) ) {
247252 var tuser = Crypt . verityToken ( jwt ) ;
248253 var name = tuser . username . split ( "@" ) . join ( "" ) . split ( "." ) . join ( "" ) ;
@@ -266,14 +271,27 @@ export class amqpwrapper {
266271 AssertQueueOptions = Object . create ( this . AssertQueueOptions ) ;
267272 delete AssertQueueOptions . arguments ;
268273 }
269- q . queue = await this . AddQueueConsumer ( "" , AssertQueueOptions , jwt , q . callback ) ;
274+ q . queue = await this . AddQueueConsumer ( cli , "" , AssertQueueOptions , jwt , q . callback ) ;
270275 this . channel . bindQueue ( q . queue , q . exchange , q . routingkey ) ;
271276 this . _logger . info ( "[AMQP] Added exchange consumer " + q . exchange ) ;
272277 this . exchanges [ exchange ] = q ;
273278 }
274- OnMessage ( sender : amqpwrapper , msg : amqplib . ConsumeMessage , callback : QueueOnMessage ) : void {
279+ OnMessage ( sender : amqpqueue , msg : amqplib . ConsumeMessage , callback : QueueOnMessage ) : void {
275280 // sender._logger.info("OnMessage " + msg.content.toString());
276281 try {
282+ var now = new Date ( ) ;
283+ var seconds = ( now . getTime ( ) - sender . cli . lastheartbeat . getTime ( ) ) / 1000 ;
284+ if ( seconds >= 20 ) {
285+ try {
286+ sender . cli . _logger . info ( "amqpwrapper.OnMessage: receive message for inactive client, nack message and try and close" ) ;
287+ this . channel . nack ( msg ) ;
288+ sender . cli . Close ( ) ;
289+ } catch ( error ) {
290+ console . error ( error ) ;
291+ }
292+ return ;
293+ }
294+
277295 var correlationId : string = msg . properties . correlationId ;
278296 var replyTo : string = msg . properties . replyTo ;
279297 var consumerTag : string = msg . fields . consumerTag ;
0 commit comments