Skip to content

Commit 5b38a49

Browse files
committed
Add Basket handler and subscription to events. Change in EventBus to use broker and one message queue per microservice http://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html
1 parent 18a4020 commit 5b38a49

10 files changed

Lines changed: 136 additions & 65 deletions

File tree

src/Services/Basket/Basket.API/Basket.API.csproj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@
3939
<PackageReference Include="Swashbuckle" Version="6.0.0-beta902" />
4040
</ItemGroup>
4141

42+
<ItemGroup>
43+
<ProjectReference Include="..\..\Common\Infrastructure\Infrastructure.csproj" />
44+
</ItemGroup>
45+
4246
<ItemGroup>
4347
<None Update="Dockerfile">
4448
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
using Microsoft.eShopOnContainers.Services.Basket.API.Model;
2+
using Microsoft.eShopOnContainers.Services.Common.Infrastructure;
3+
using Microsoft.eShopOnContainers.Services.Common.Infrastructure.Catalog;
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Linq;
7+
using System.Threading.Tasks;
8+
9+
namespace Basket.API.Events
10+
{
11+
public class CatalogPriceChangedHandler : IIntegrationEventHandler<CatalogPriceChanged>
12+
{
13+
private readonly IBasketRepository _repository;
14+
public CatalogPriceChangedHandler()
15+
{
16+
//_repository = repository;
17+
}
18+
19+
public void Handle(CatalogPriceChanged @event)
20+
{
21+
22+
}
23+
}
24+
}
25+

src/Services/Basket/Basket.API/Startup.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
using System.Net;
1414
using Swashbuckle.Swagger.Model;
1515
using Microsoft.eShopOnContainers.Services.Basket.API.Auth.Server;
16+
using Microsoft.eShopOnContainers.Services.Common.Infrastructure;
17+
using Microsoft.eShopOnContainers.Services.Common.Infrastructure.Catalog;
18+
using Basket.API.Events;
1619

1720
namespace Microsoft.eShopOnContainers.Services.Basket.API
1821
{
@@ -76,6 +79,9 @@ public void ConfigureServices(IServiceCollection services)
7679
});
7780

7881
services.AddTransient<IBasketRepository, RedisBasketRepository>();
82+
var eventBus = new EventBus();
83+
services.AddSingleton<IEventBus>(eventBus);
84+
eventBus.Subscribe<CatalogPriceChanged>(new CatalogPriceChangedHandler());
7985
}
8086

8187
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.

src/Services/Catalog/Catalog.API/Controllers/CatalogController.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public async Task<IActionResult> Items(int? catalogTypeId, int? catalogBrandId,
107107
//hook to run integration tests until POST methods are created
108108
if (catalogTypeId.HasValue && catalogTypeId == 1)
109109
{
110-
_eventBus.Publish(new CatalogPriceChanged());
110+
_eventBus.Publish(new CatalogPriceChanged(2, 10.4M));
111111
}
112112

113113
return Ok(model);

src/Services/Common/Infrastructure/Catalog/CatalogPriceChanged.cs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,16 @@ namespace Microsoft.eShopOnContainers.Services.Common.Infrastructure.Catalog
66
{
77
public class CatalogPriceChanged : IIntegrationEvent
88
{
9-
private readonly string _eventName = "catalogpricechanged";
9+
public string Message { get { return "CatalogPriceChanged here!!"; } }
1010

11-
public string Name {
12-
get
13-
{
14-
return _eventName;
15-
}
16-
}
11+
public int ItemId { get; private set; }
12+
13+
public decimal NewPrice { get; private set; }
1714

18-
public string Message { get { return "CatalogPriceChanged!!"; } }
19-
}
15+
public CatalogPriceChanged(int itemId, decimal newPrice)
16+
{
17+
ItemId = itemId;
18+
NewPrice = newPrice;
19+
}
20+
}
2021
}

src/Services/Common/Infrastructure/Catalog/CatalogPriceChangedHandler.cs

Lines changed: 0 additions & 14 deletions
This file was deleted.

src/Services/Common/Infrastructure/EventBus.cs

Lines changed: 85 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,47 @@
11

22
using Microsoft.eShopOnContainers.Services.Common.Infrastructure.Catalog;
3+
using Newtonsoft.Json;
34
using RabbitMQ.Client;
45
using RabbitMQ.Client.Events;
56
using System;
7+
using System.Collections;
68
using System.Collections.Generic;
9+
using System.Linq;
10+
using System.Reflection;
711
using System.Text;
812

913
namespace Microsoft.eShopOnContainers.Services.Common.Infrastructure
1014
{
1115
public class EventBus : IEventBus
1216
{
13-
private readonly Dictionary<string, List<IIntegrationEventHandler>> _handlers;
14-
private readonly Dictionary<string, Tuple<IModel, IConnection>> _listeners;
17+
private readonly string _brokerName = "event_bus";
18+
private readonly Dictionary<string, List<IIntegrationEventHandler>> _handlers;
19+
private readonly List<Type> _eventTypes;
20+
21+
private Tuple<IModel, IConnection> _connection;
22+
private string _queueName;
23+
1524

1625
public EventBus()
1726
{
1827
_handlers = new Dictionary<string, List<IIntegrationEventHandler>>();
19-
_listeners = new Dictionary<string, Tuple<IModel, IConnection>>();
28+
_eventTypes = new List<Type>();
2029
}
2130
public void Publish(IIntegrationEvent @event)
2231
{
32+
var eventName = @event.GetType().Name;
2333
var factory = new ConnectionFactory() { HostName = "172.20.0.1" };
2434
using (var connection = factory.CreateConnection())
2535
using (var channel = connection.CreateModel())
26-
{
27-
channel.QueueDeclare(queue: @event.Name,
28-
durable: false,
29-
exclusive: false,
30-
autoDelete: false,
31-
arguments: null);
36+
{
37+
channel.ExchangeDeclare(exchange: _brokerName,
38+
type: "direct");
3239

33-
string message = ((CatalogPriceChanged)@event).Message;
40+
string message = JsonConvert.SerializeObject(@event);
3441
var body = Encoding.UTF8.GetBytes(message);
3542

36-
channel.BasicPublish(exchange: "",
37-
routingKey: @event.Name,
43+
channel.BasicPublish(exchange: _brokerName,
44+
routingKey: eventName,
3845
basicProperties: null,
3946
body: body);
4047
}
@@ -50,30 +57,14 @@ public void Subscribe<T>(IIntegrationEventHandler<T> handler) where T : IIntegra
5057
}
5158
else
5259
{
53-
var factory = new ConnectionFactory() { HostName = "172.18.0.1" };
54-
var connection = factory.CreateConnection();
55-
var channel = connection.CreateModel();
56-
57-
channel.QueueDeclare(queue: eventName,
58-
durable: false,
59-
exclusive: false,
60-
autoDelete: false,
61-
arguments: null);
62-
63-
var consumer = new EventingBasicConsumer(channel);
64-
consumer.Received += (model, ea) =>
65-
{
66-
var body = ea.Body;
67-
var message = Encoding.UTF8.GetString(body);
68-
};
69-
channel.BasicConsume(queue: "hello",
70-
noAck: true,
71-
consumer: consumer);
72-
;
73-
74-
_listeners.Add(eventName, new Tuple<IModel, IConnection>(channel, connection));
60+
var channel = GetChannel();
61+
channel.QueueBind(queue: _queueName,
62+
exchange: _brokerName,
63+
routingKey: eventName);
64+
7565
_handlers.Add(eventName, new List<IIntegrationEventHandler>());
7666
_handlers[eventName].Add(handler);
67+
_eventTypes.Add(typeof(T));
7768
}
7869

7970
}
@@ -88,13 +79,69 @@ public void Unsubscribe<T>(IIntegrationEventHandler<T> handler) where T : IInteg
8879
if (_handlers[eventName].Count == 0)
8980
{
9081
_handlers.Remove(eventName);
82+
var eventType = _eventTypes.Single(e => e.Name == eventName);
83+
_eventTypes.Remove(eventType);
84+
_connection.Item1.QueueUnbind(queue: _queueName,
85+
exchange: _brokerName,
86+
routingKey: eventName);
87+
88+
if (_handlers.Keys.Count == 0)
89+
{
90+
_queueName = string.Empty;
91+
_connection.Item1.Close();
92+
_connection.Item2.Close();
93+
}
94+
95+
}
96+
}
97+
}
9198

92-
var connectionItems =_listeners[eventName];
93-
_listeners.Remove(eventName);
99+
private IModel GetChannel()
100+
{
101+
if (_connection != null)
102+
{
103+
return _connection.Item1;
104+
}
105+
else
106+
{
107+
var factory = new ConnectionFactory() { HostName = "172.20.0.1" };
108+
var connection = factory.CreateConnection();
109+
var channel = connection.CreateModel();
94110

95-
connectionItems.Item1.Close();
96-
connectionItems.Item2.Close();
111+
channel.ExchangeDeclare(exchange: _brokerName,
112+
type: "direct");
113+
if (string.IsNullOrEmpty(_queueName))
114+
{
115+
_queueName = channel.QueueDeclare().QueueName;
97116
}
117+
118+
var consumer = new EventingBasicConsumer(channel);
119+
consumer.Received += (model, ea) =>
120+
{
121+
var eventName = ea.RoutingKey;
122+
if (_handlers.ContainsKey(eventName))
123+
{
124+
var message = Encoding.UTF8.GetString(ea.Body);
125+
Type eventType = _eventTypes.Single(t => t.Name == eventName);
126+
127+
var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
128+
var handlers = _handlers[eventName];
129+
130+
131+
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
132+
133+
foreach (var handler in handlers)
134+
{
135+
concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
136+
}
137+
}
138+
};
139+
channel.BasicConsume(queue: _queueName,
140+
noAck: true,
141+
consumer: consumer);
142+
_connection = new Tuple<IModel, IConnection>(channel, connection);
143+
144+
return _connection.Item1;
98145
}
99146
}
100147
}

src/Services/Common/Infrastructure/IIntegrationEvent.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
namespace Microsoft.eShopOnContainers.Services.Common.Infrastructure
66
{
77
public interface IIntegrationEvent
8-
{
9-
string Name { get; }
8+
{
109
}
1110
}

src/Services/Common/Infrastructure/IIntegrationEventHandler.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,7 @@ public interface IIntegrationEventHandler<in TIntegrationEvent> : IIntegrationEv
1010
void Handle(TIntegrationEvent @event);
1111
}
1212

13-
public interface IIntegrationEventHandler { }
13+
public interface IIntegrationEventHandler
14+
{
15+
}
1416
}

src/Services/Common/Infrastructure/Infrastructure.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
<RootNamespace>Microsoft.eShopOnContainers.Services.Common.Infrastructure</RootNamespace>
66
</PropertyGroup>
77
<ItemGroup>
8+
<PackageReference Include="Newtonsoft.Json" Version="9.0.1" />
89
<PackageReference Include="RabbitMQ.Client" Version="4.1.1" />
910
</ItemGroup>
1011
</Project>

0 commit comments

Comments
 (0)