@@ -209,7 +209,6 @@ export class amqpwrapper {
209209 async AddQueueConsumer ( cli : WebSocketClient , queue : string , QueueOptions : any , jwt : string , callback : QueueOnMessage ) : Promise < string > {
210210 if ( this . channel == null || this . conn == null ) throw new Error ( "Cannot Add new Queue Consumer, not connected to rabbitmq" ) ;
211211 var q : amqpqueue = null ;
212- q . cli = cli ;
213212 if ( Config . amqp_force_queue_prefix && ! Util . IsNullEmpty ( jwt ) ) {
214213 var tuser = Crypt . verityToken ( jwt ) ;
215214 var name = tuser . username . split ( "@" ) . join ( "" ) . split ( "." ) . join ( "" ) ;
@@ -220,9 +219,15 @@ export class amqpwrapper {
220219
221220 if ( this . exchanges [ queue ] != null ) {
222221 q = this . queues [ queue ] ;
222+ try {
223+ if ( this . channel != null && ! Util . IsNullEmpty ( q . consumerTag ) ) await this . channel . cancel ( q . consumerTag ) ;
224+ } catch ( error ) {
225+ console . error ( error ) ;
226+ }
223227 } else {
224228 q = new amqpqueue ( ) ;
225229 }
230+ q . cli = cli ;
226231 if ( ! Util . IsNullEmpty ( q . queue ) ) {
227232 if ( q . queue . startsWith ( "amq." ) ) {
228233 delete this . queues [ q . queue ] ;
@@ -247,7 +252,6 @@ export class amqpwrapper {
247252 async AddExchangeConsumer ( cli : WebSocketClient , exchange : string , algorithm : string , routingkey : string , ExchangeOptions : any , jwt : string , callback : QueueOnMessage ) : Promise < void > {
248253 if ( this . channel == null || this . conn == null ) throw new Error ( "Cannot Add new Exchange Consumer, not connected to rabbitmq" ) ;
249254 var q : amqpexchange = null ;
250- q . cli = cli ;
251255 if ( Config . amqp_force_exchange_prefix && ! Util . IsNullEmpty ( jwt ) ) {
252256 var tuser = Crypt . verityToken ( jwt ) ;
253257 var name = tuser . username . split ( "@" ) . join ( "" ) . split ( "." ) . join ( "" ) ;
@@ -260,6 +264,7 @@ export class amqpwrapper {
260264 } else {
261265 q = new amqpexchange ( ) ;
262266 }
267+ q . cli = cli ;
263268 if ( ! Util . IsNullEmpty ( q . queue ) ) {
264269 delete this . queues [ q . queue ] ;
265270 }
0 commit comments