@@ -5,7 +5,7 @@ import { Message, JSONfn } from "./Messages/Message";
55import { Config } from "./Config" ;
66import { amqpwrapper , QueueMessageOptions , amqpqueue } from "./amqpwrapper" ;
77import { NoderedUtil , Base , InsertOneMessage , QueueMessage , MapReduceMessage , QueryMessage , UpdateOneMessage , UpdateManyMessage , DeleteOneMessage , User , mapFunc , reduceFunc , finalizeFunc , QueuedMessage , QueuedMessageCallback } from "openflow-api" ;
8- import { Mutex } from "./Mutex" ;
8+ // import { Mutex } from "./Mutex";
99
1010interface IHashTable < T > {
1111 [ key : string ] : T ;
@@ -21,6 +21,25 @@ interface IHashTable<T> {
2121// public id: string;
2222// public message: any;
2323// }
24+ const Semaphore = ( n ) => ( {
25+ n,
26+ async down ( ) {
27+ while ( this . n <= 0 ) await this . wait ( ) ;
28+ this . n -- ;
29+ } ,
30+ up ( ) {
31+ this . n ++ ;
32+ } ,
33+ async wait ( ) {
34+ if ( this . n <= 0 ) return await new Promise ( ( res , req ) => {
35+ setImmediate ( async ( ) => res ( await this . wait ( ) ) )
36+ } ) ;
37+ return ;
38+ } ,
39+ } ) ;
40+ const semaphore = Semaphore ( 1 ) ;
41+
42+
2443export class WebSocketServerClient {
2544 public jwt : string ;
2645 public _logger : winston . Logger ;
@@ -38,7 +57,12 @@ export class WebSocketServerClient {
3857 // public consumers: amqp_consumer[] = [];
3958 // public queues: IHashTable<amqpqueue> = {};
4059 public _queues : amqpqueue [ ] = [ ] ;
41- public queuesMutex = new Mutex ( ) ;
60+ // public queuesMutex = new Mutex();
61+
62+
63+
64+
65+
4266
4367 constructor ( logger : winston . Logger , socketObject : WebSocket ) {
4468 this . _logger = logger ;
@@ -121,17 +145,19 @@ export class WebSocketServerClient {
121145 }
122146 }
123147 public async CloseConsumers ( ) : Promise < void > {
124- return await this . queuesMutex . dispatch ( async ( ) => {
125- for ( let i = this . _queues . length - 1 ; i >= 0 ; i -- ) {
126- try {
127- // await this.CloseConsumer(this._queues[i]);
128- await amqpwrapper . Instance ( ) . RemoveQueueConsumer ( this . _queues [ i ] ) ;
129- this . _queues . splice ( i , 1 ) ;
130- } catch ( error ) {
131- this . _logger . error ( "WebSocketclient::closeconsumers " + error ) ;
132- }
148+ await semaphore . down ( ) ;
149+ for ( let i = this . _queues . length - 1 ; i >= 0 ; i -- ) {
150+ try {
151+ // await this.CloseConsumer(this._queues[i]);
152+ await amqpwrapper . Instance ( ) . RemoveQueueConsumer ( this . _queues [ i ] ) ;
153+ this . _queues . splice ( i , 1 ) ;
154+ } catch ( error ) {
155+ this . _logger . error ( "WebSocketclient::closeconsumers " + error ) ;
133156 }
134- } ) ;
157+ }
158+ semaphore . up ( ) ;
159+ // return await this.queuesMutex.dispatch(async () => {
160+ // });
135161 }
136162 public async Close ( ) : Promise < void > {
137163 await this . CloseConsumers ( ) ;
@@ -152,19 +178,17 @@ export class WebSocketServerClient {
152178 }
153179 }
154180 public async CloseConsumer ( queuename : string ) : Promise < void > {
155- return await this . queuesMutex . dispatch ( async ( ) => {
156- for ( var i = this . _queues . length - 1 ; i >= 0 ; i -- ) {
157- var q = this . _queues [ i ] ;
158- if ( q . queue == queuename || q . queuename == queuename ) {
159- try {
160- await amqpwrapper . Instance ( ) . RemoveQueueConsumer ( this . _queues [ i ] ) ;
161- this . _queues . splice ( i , 1 ) ;
162- } catch ( error ) {
163- this . _logger . error ( "WebSocketclient::CloseConsumer " + error ) ;
164- }
181+ for ( var i = this . _queues . length - 1 ; i >= 0 ; i -- ) {
182+ var q = this . _queues [ i ] ;
183+ if ( q . queue == queuename || q . queuename == queuename ) {
184+ try {
185+ await amqpwrapper . Instance ( ) . RemoveQueueConsumer ( this . _queues [ i ] ) ;
186+ this . _queues . splice ( i , 1 ) ;
187+ } catch ( error ) {
188+ this . _logger . error ( "WebSocketclient::CloseConsumer " + error ) ;
165189 }
166190 }
167- } ) ;
191+ }
168192 // if (this.queues[queuename] != null) {
169193 // try {
170194 // await amqpwrapper.Instance().RemoveQueueConsumer(this.queues[queuename]);
@@ -190,42 +214,46 @@ export class WebSocketServerClient {
190214 qname = "unknown." + Math . random ( ) . toString ( 36 ) . substr ( 2 , 9 ) ; autoDelete = true ;
191215 }
192216 }
217+ await semaphore . down ( ) ;
193218 this . CloseConsumer ( qname ) ;
194- // if (this.queues[qname] != null) {
195- // await amqpwrapper.Instance().RemoveQueueConsumer(this.queues[qname]);
196- // delete this.queues[qname];
197- // }
198- // var AssertQueueOptions: any = new Object(amqpwrapper.Instance().AssertQueueOptions);
199- var AssertQueueOptions : any = Object . assign ( { } , ( amqpwrapper . Instance ( ) . AssertQueueOptions ) ) ;
200- AssertQueueOptions . autoDelete = autoDelete ;
201- var queue = await amqpwrapper . Instance ( ) . AddQueueConsumer ( qname , AssertQueueOptions , this . jwt , async ( msg : any , options : QueueMessageOptions , ack : any , done : any ) => {
202- var _data = msg ;
203- try {
204- _data = await this . Queue ( msg , qname , options ) ;
205- ack ( ) ;
206- done ( _data ) ;
207- } catch ( error ) {
208- setTimeout ( ( ) => {
209- ack ( false ) ;
210- // ack(); // just eat the error
219+ try {
220+ // if (this.queues[qname] != null) {
221+ // await amqpwrapper.Instance().RemoveQueueConsumer(this.queues[qname]);
222+ // delete this.queues[qname];
223+ // }
224+ // var AssertQueueOptions: any = new Object(amqpwrapper.Instance().AssertQueueOptions);
225+ var AssertQueueOptions : any = Object . assign ( { } , ( amqpwrapper . Instance ( ) . AssertQueueOptions ) ) ;
226+ AssertQueueOptions . autoDelete = autoDelete ;
227+ var queue = await amqpwrapper . Instance ( ) . AddQueueConsumer ( qname , AssertQueueOptions , this . jwt , async ( msg : any , options : QueueMessageOptions , ack : any , done : any ) => {
228+ var _data = msg ;
229+ try {
230+ _data = await this . Queue ( msg , qname , options ) ;
231+ ack ( ) ;
211232 done ( _data ) ;
212- if ( error . message != null && error . message != "" ) {
213- console . log ( qname + " failed message queue message, nack and re queue message: " , error . message ) ;
214- } else {
215- console . log ( qname + " failed message queue message, nack and re queue message: " , error ) ;
216- }
217- } , Config . amqp_requeue_time ) ;
218- }
219- } ) ;
220- qname = queue . queue ;
221- // if (this.queues[qname] != null) {
222- // await amqpwrapper.Instance().RemoveQueueConsumer(this.queues[qname]);
223- // }
224- //this.queues[qname] = queue;
225- await this . queuesMutex . dispatch ( ( ) => {
233+ } catch ( error ) {
234+ setTimeout ( ( ) => {
235+ ack ( false ) ;
236+ // ack(); // just eat the error
237+ done ( _data ) ;
238+ if ( error . message != null && error . message != "" ) {
239+ console . log ( qname + " failed message queue message, nack and re queue message: " , error . message ) ;
240+ } else {
241+ console . log ( qname + " failed message queue message, nack and re queue message: " , error ) ;
242+ }
243+ } , Config . amqp_requeue_time ) ;
244+ }
245+ } ) ;
246+ qname = queue . queue ;
247+ // if (this.queues[qname] != null) {
248+ // await amqpwrapper.Instance().RemoveQueueConsumer(this.queues[qname]);
249+ // }
250+ //this.queues[qname] = queue;
226251 this . _queues . push ( queue ) ;
227- } ) ;
228- console . log ( '_queues.length: ' + this . _queues . length ) ;
252+ console . log ( '_queues.length: ' + this . _queues . length ) ;
253+ } catch ( error ) {
254+ this . _logger . error ( "WebSocketclient::CreateConsumer " + error ) ;
255+ }
256+ semaphore . up ( ) ;
229257 return queue . queue ;
230258 }
231259 sleep ( ms ) {
0 commit comments