Skip to content

Commit f071ee7

Browse files
Unai Zorrilla CastroUnai Zorrilla Castro
authored andcommitted
Fix bug with queue names
1 parent 44e39a6 commit f071ee7

1 file changed

Lines changed: 18 additions & 10 deletions

File tree

src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ private readonly List<Type> _eventTypes
3131
= new List<Type>();
3232

3333
private IModel _consumerChannel;
34+
private string _queueName;
3435

3536
public EventBusRabbitMQ(IRabbitMQPersisterConnection persisterConnection, ILogger<EventBusRabbitMQ> logger)
3637
{
@@ -93,7 +94,7 @@ public void Subscribe<T>(IIntegrationEventHandler<T> handler) where T : Integrat
9394

9495
using (var channel = _persisterConnection.CreateModel())
9596
{
96-
channel.QueueBind(queue: channel.QueueDeclare().QueueName,
97+
channel.QueueBind(queue: _queueName,
9798
exchange: BROKER_NAME,
9899
routingKey: eventName);
99100

@@ -131,9 +132,16 @@ public void Unsubscribe<T>(IIntegrationEventHandler<T> handler) where T : Integr
131132

132133
using (var channel = _persisterConnection.CreateModel())
133134
{
134-
channel.QueueUnbind(queue: channel.QueueDeclare().QueueName,
135+
channel.QueueUnbind(queue: _queueName,
135136
exchange: BROKER_NAME,
136137
routingKey: eventName);
138+
139+
if (_handlers.Keys.Count == 0)
140+
{
141+
_queueName = string.Empty;
142+
143+
_consumerChannel.Close();
144+
}
137145
}
138146
}
139147
}
@@ -142,7 +150,11 @@ public void Unsubscribe<T>(IIntegrationEventHandler<T> handler) where T : Integr
142150

143151
public void Dispose()
144152
{
145-
_consumerChannel.Dispose();
153+
if (_consumerChannel != null)
154+
{
155+
_consumerChannel.Dispose();
156+
}
157+
146158
_handlers.Clear();
147159
}
148160

@@ -158,6 +170,8 @@ private IModel CreateConsumerChannel()
158170
channel.ExchangeDeclare(exchange: BROKER_NAME,
159171
type: "direct");
160172

173+
_queueName = channel.QueueDeclare().QueueName;
174+
161175
var consumer = new EventingBasicConsumer(channel);
162176
consumer.Received += async (model, ea) =>
163177
{
@@ -167,16 +181,10 @@ private IModel CreateConsumerChannel()
167181
await ProcessEvent(eventName, message);
168182
};
169183

170-
channel.BasicConsume(queue: channel.QueueDeclare().QueueName,
184+
channel.BasicConsume(queue: _queueName,
171185
noAck: true,
172186
consumer: consumer);
173187

174-
channel.ModelShutdown += (sender, ea) =>
175-
{
176-
_consumerChannel.Dispose();
177-
_consumerChannel = CreateConsumerChannel();
178-
};
179-
180188
channel.CallbackException += (sender, ea) =>
181189
{
182190
_consumerChannel.Dispose();

0 commit comments

Comments
 (0)