@@ -21,7 +21,8 @@ public class EventBusRabbitMQ : IEventBus, IDisposable
2121 private readonly Dictionary < string , List < IIntegrationEventHandler > > _handlers ;
2222 private readonly List < Type > _eventTypes ;
2323
24- private Tuple < IModel , IConnection > _connection ;
24+ private IModel _model ;
25+ private IConnection _connection ;
2526 private string _queueName ;
2627
2728
@@ -86,15 +87,15 @@ public void Unsubscribe<T>(IIntegrationEventHandler<T> handler) where T : Integr
8687 _handlers . Remove ( eventName ) ;
8788 var eventType = _eventTypes . Single ( e => e . Name == eventName ) ;
8889 _eventTypes . Remove ( eventType ) ;
89- _connection . Item1 . QueueUnbind ( queue : _queueName ,
90+ _model . QueueUnbind ( queue : _queueName ,
9091 exchange : _brokerName ,
9192 routingKey : eventName ) ;
9293
9394 if ( _handlers . Keys . Count == 0 )
9495 {
9596 _queueName = string . Empty ;
96- _connection . Item1 . Dispose ( ) ;
97- _connection . Item2 . Dispose ( ) ;
97+ _model . Dispose ( ) ;
98+ _connection . Dispose ( ) ;
9899 }
99100
100101 }
@@ -103,48 +104,51 @@ public void Unsubscribe<T>(IIntegrationEventHandler<T> handler) where T : Integr
103104
104105 public void Dispose ( )
105106 {
106- if ( _connection != null )
107- {
108- _handlers . Clear ( ) ;
109- _connection . Item1 . Dispose ( ) ;
110- _connection . Item2 . Dispose ( ) ;
111- }
107+ _handlers . Clear ( ) ;
108+ _model ? . Dispose ( ) ;
109+ _connection ? . Dispose ( ) ;
112110 }
113111
114112 private IModel GetChannel ( )
115113 {
116- if ( _connection != null )
114+ if ( _model != null )
117115 {
118- return _connection . Item1 ;
116+ return _model ;
119117 }
120118 else
121119 {
122- var factory = new ConnectionFactory ( ) { HostName = _connectionString } ;
123- var connection = factory . CreateConnection ( ) ;
124- var channel = connection . CreateModel ( ) ;
120+ ( ( _model , _connection ) = CreateConnection ( ) ;
121+ return _model ;
122+ }
123+ }
125124
126- channel . ExchangeDeclare ( exchange : _brokerName ,
127- type : "direct" ) ;
128- if ( string . IsNullOrEmpty ( _queueName ) )
129- {
130- _queueName = channel . QueueDeclare ( ) . QueueName ;
131- }
132125
133- var consumer = new EventingBasicConsumer ( channel ) ;
134- consumer . Received += async ( model , ea ) =>
135- {
136- var eventName = ea . RoutingKey ;
137- var message = Encoding . UTF8 . GetString ( ea . Body ) ;
138-
139- await ProcessEvent ( eventName , message ) ;
140- } ;
141- channel . BasicConsume ( queue : _queueName ,
142- noAck : true ,
143- consumer : consumer ) ;
144- _connection = new Tuple < IModel , IConnection > ( channel , connection ) ;
126+ private ( IModel model , IConnection connection ) CreateConnection ( )
127+ {
128+ var factory = new ConnectionFactory ( ) { HostName = _connectionString } ;
129+ var con = factory . CreateConnection ( ) ;
130+ var channel = con . CreateModel ( ) ;
145131
146- return _connection . Item1 ;
132+ channel . ExchangeDeclare ( exchange : _brokerName ,
133+ type : "direct" ) ;
134+ if ( string . IsNullOrEmpty ( _queueName ) )
135+ {
136+ _queueName = channel . QueueDeclare ( ) . QueueName ;
147137 }
138+
139+ var consumer = new EventingBasicConsumer ( channel ) ;
140+ consumer . Received += async ( model , ea ) =>
141+ {
142+ var eventName = ea . RoutingKey ;
143+ var message = Encoding . UTF8 . GetString ( ea . Body ) ;
144+
145+ await ProcessEvent ( eventName , message ) ;
146+ } ;
147+ channel . BasicConsume ( queue : _queueName ,
148+ noAck : true ,
149+ consumer : consumer ) ;
150+
151+ return ( channel , con ) ;
148152 }
149153
150154 private async Task ProcessEvent ( string eventName , string message )
0 commit comments