Skip to content

Commit aa7556a

Browse files
committed
Create EventBusServiceBus project and add EventBusServiceBus/ServiceBusPersisterConnection to project
1 parent 721a4fd commit aa7556a

5 files changed

Lines changed: 266 additions & 1 deletion

File tree

eShopOnContainers-ServicesAndWebApps.sln

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11

22
Microsoft Visual Studio Solution File, Format Version 12.00
33
# Visual Studio 15
4-
VisualStudioVersion = 15.0.26403.3
4+
VisualStudioVersion = 15.0.26430.6
55
MinimumVisualStudioVersion = 10.0.40219.1
66
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{932D8224-11F6-4D07-B109-DA28AD288A63}"
77
EndProject
@@ -76,6 +76,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Extensions.Health
7676
EndProject
7777
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventBus.Tests", "src\BuildingBlocks\EventBus\EventBus.Tests\EventBus.Tests.csproj", "{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}"
7878
EndProject
79+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventBusServiceBus", "src\BuildingBlocks\EventBus\EventBusServiceBus\EventBusServiceBus.csproj", "{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}"
80+
EndProject
7981
Global
8082
GlobalSection(SolutionConfigurationPlatforms) = preSolution
8183
Ad-Hoc|Any CPU = Ad-Hoc|Any CPU
@@ -1002,6 +1004,54 @@ Global
10021004
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|x64.Build.0 = Release|Any CPU
10031005
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|x86.ActiveCfg = Release|Any CPU
10041006
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D}.Release|x86.Build.0 = Release|Any CPU
1007+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Ad-Hoc|Any CPU.ActiveCfg = Debug|Any CPU
1008+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Ad-Hoc|Any CPU.Build.0 = Debug|Any CPU
1009+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Ad-Hoc|ARM.ActiveCfg = Debug|Any CPU
1010+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Ad-Hoc|ARM.Build.0 = Debug|Any CPU
1011+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Ad-Hoc|iPhone.ActiveCfg = Debug|Any CPU
1012+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Ad-Hoc|iPhone.Build.0 = Debug|Any CPU
1013+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Ad-Hoc|iPhoneSimulator.ActiveCfg = Debug|Any CPU
1014+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Ad-Hoc|iPhoneSimulator.Build.0 = Debug|Any CPU
1015+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Ad-Hoc|x64.ActiveCfg = Debug|Any CPU
1016+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Ad-Hoc|x64.Build.0 = Debug|Any CPU
1017+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Ad-Hoc|x86.ActiveCfg = Debug|Any CPU
1018+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Ad-Hoc|x86.Build.0 = Debug|Any CPU
1019+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.AppStore|Any CPU.ActiveCfg = Debug|Any CPU
1020+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.AppStore|Any CPU.Build.0 = Debug|Any CPU
1021+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.AppStore|ARM.ActiveCfg = Debug|Any CPU
1022+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.AppStore|ARM.Build.0 = Debug|Any CPU
1023+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.AppStore|iPhone.ActiveCfg = Debug|Any CPU
1024+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.AppStore|iPhone.Build.0 = Debug|Any CPU
1025+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.AppStore|iPhoneSimulator.ActiveCfg = Debug|Any CPU
1026+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.AppStore|iPhoneSimulator.Build.0 = Debug|Any CPU
1027+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.AppStore|x64.ActiveCfg = Debug|Any CPU
1028+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.AppStore|x64.Build.0 = Debug|Any CPU
1029+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.AppStore|x86.ActiveCfg = Debug|Any CPU
1030+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.AppStore|x86.Build.0 = Debug|Any CPU
1031+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
1032+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Debug|Any CPU.Build.0 = Debug|Any CPU
1033+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Debug|ARM.ActiveCfg = Debug|Any CPU
1034+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Debug|ARM.Build.0 = Debug|Any CPU
1035+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Debug|iPhone.ActiveCfg = Debug|Any CPU
1036+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Debug|iPhone.Build.0 = Debug|Any CPU
1037+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Debug|iPhoneSimulator.ActiveCfg = Debug|Any CPU
1038+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Debug|iPhoneSimulator.Build.0 = Debug|Any CPU
1039+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Debug|x64.ActiveCfg = Debug|Any CPU
1040+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Debug|x64.Build.0 = Debug|Any CPU
1041+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Debug|x86.ActiveCfg = Debug|Any CPU
1042+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Debug|x86.Build.0 = Debug|Any CPU
1043+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Release|Any CPU.ActiveCfg = Release|Any CPU
1044+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Release|Any CPU.Build.0 = Release|Any CPU
1045+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Release|ARM.ActiveCfg = Release|Any CPU
1046+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Release|ARM.Build.0 = Release|Any CPU
1047+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Release|iPhone.ActiveCfg = Release|Any CPU
1048+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Release|iPhone.Build.0 = Release|Any CPU
1049+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Release|iPhoneSimulator.ActiveCfg = Release|Any CPU
1050+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Release|iPhoneSimulator.Build.0 = Release|Any CPU
1051+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Release|x64.ActiveCfg = Release|Any CPU
1052+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Release|x64.Build.0 = Release|Any CPU
1053+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Release|x86.ActiveCfg = Release|Any CPU
1054+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8}.Release|x86.Build.0 = Release|Any CPU
10051055
EndGlobalSection
10061056
GlobalSection(SolutionProperties) = preSolution
10071057
HideSolutionNode = FALSE
@@ -1038,5 +1088,6 @@ Global
10381088
{22A0F9C1-2D4A-4107-95B7-8459E6688BC5} = {A81ECBC2-6B00-4DCD-8388-469174033379}
10391089
{4BD76717-3102-4969-8C2C-BAAA3F0263B6} = {A81ECBC2-6B00-4DCD-8388-469174033379}
10401090
{89D80DF1-32E1-4AAF-970F-DA0AA6881F9D} = {807BB76E-B2BB-47A2-A57B-3D1B20FF5E7F}
1091+
{69AF10D3-AA76-4FF7-B187-EC7E8CC5F5B8} = {807BB76E-B2BB-47A2-A57B-3D1B20FF5E7F}
10411092
EndGlobalSection
10421093
EndGlobal
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
using Microsoft.Azure.ServiceBus;
2+
using Microsoft.Extensions.Logging;
3+
using System;
4+
using System.IO;
5+
6+
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
7+
{
8+
public class DefaultServiceBusPersisterConnection : ServiceBusConnection, IServiceBusPersisterConnection
9+
{
10+
private readonly ILogger<ServiceBusConnection> _logger;
11+
private readonly ServiceBusConnectionStringBuilder _serviceBusConnectionStringBuilder;
12+
private ITopicClient _topicClient;
13+
14+
bool _disposed;
15+
object sync_root = new object();
16+
17+
public DefaultServiceBusPersisterConnection(ServiceBusConnectionStringBuilder serviceBusConnectionStringBuilder,
18+
TimeSpan operationTimeout, RetryPolicy retryPolicy, ILogger<ServiceBusConnection> logger)
19+
: base(operationTimeout, retryPolicy)
20+
{
21+
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
22+
23+
InitializeConnection(serviceBusConnectionStringBuilder);
24+
_serviceBusConnectionStringBuilder = serviceBusConnectionStringBuilder ??
25+
throw new ArgumentNullException(nameof(serviceBusConnectionStringBuilder));
26+
}
27+
28+
public bool IsConnected => _topicClient.IsClosedOrClosing;
29+
30+
public ServiceBusConnectionStringBuilder ServiceBusConnectionStringBuilder => _serviceBusConnectionStringBuilder;
31+
32+
public ITopicClient CreateModel()
33+
{
34+
if(_topicClient.IsClosedOrClosing)
35+
{
36+
_topicClient = new TopicClient(_serviceBusConnectionStringBuilder, RetryPolicy);
37+
}
38+
39+
return _topicClient;
40+
}
41+
42+
public void Dispose()
43+
{
44+
if (_disposed) return;
45+
46+
_disposed = true;
47+
}
48+
}
49+
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
2+
{
3+
using System;
4+
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Abstractions;
5+
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
6+
using Microsoft.Extensions.Logging;
7+
using Microsoft.Azure.ServiceBus;
8+
using Newtonsoft.Json;
9+
using System.Text;
10+
using System.Threading.Tasks;
11+
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
12+
using System.Reflection;
13+
using Microsoft.Azure.ServiceBus.Filters;
14+
15+
public class EventBusServiceBus : IEventBus
16+
{
17+
private readonly IServiceBusPersisterConnection _serviceBusPersisterConnection;
18+
private ServiceBusConnectionStringBuilder _serviceBusConnectionStringBuilder;
19+
private readonly ILogger<EventBusServiceBus> _logger;
20+
private readonly IEventBusSubscriptionsManager _subsManager;
21+
private readonly SubscriptionClient _subscriptionClient;
22+
23+
public EventBusServiceBus(IServiceBusPersisterConnection serviceBusPersisterConnection,
24+
ILogger<EventBusServiceBus> logger, IEventBusSubscriptionsManager subsManager, string subscriptionClientName)
25+
{
26+
_serviceBusPersisterConnection = serviceBusPersisterConnection;
27+
_logger = logger;
28+
_subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();
29+
30+
_subscriptionClient = new SubscriptionClient(serviceBusPersisterConnection.ServiceBusConnectionStringBuilder,
31+
subscriptionClientName);
32+
}
33+
34+
public void Publish(IntegrationEvent @event)
35+
{
36+
var eventName = @event.GetType().Name;
37+
var jsonMessage = JsonConvert.SerializeObject(@event);
38+
var body = Encoding.UTF8.GetBytes(jsonMessage);
39+
40+
var message = new Message
41+
{
42+
MessageId = new Guid().ToString(),
43+
Body = Encoding.UTF8.GetBytes(jsonMessage),
44+
Label = eventName,
45+
};
46+
47+
var topicClient = _serviceBusPersisterConnection.CreateModel();
48+
49+
topicClient.SendAsync(message)
50+
.GetAwaiter()
51+
.GetResult();
52+
}
53+
54+
public void Subscribe<T, TH>(Func<TH> handler)
55+
where T : IntegrationEvent
56+
where TH : IIntegrationEventHandler<T>
57+
{
58+
var eventName = typeof(T).Name;
59+
var containsKey = _subsManager.HasSubscriptionsForEvent<T>();
60+
if (!containsKey)
61+
{
62+
try
63+
{
64+
_subscriptionClient.AddRuleAsync(new RuleDescription
65+
{
66+
Filter = new CorrelationFilter { Label = eventName },
67+
Name = eventName
68+
}).GetAwaiter().GetResult();
69+
}
70+
catch(ServiceBusException)
71+
{
72+
_logger.LogWarning($"The messaging entity {eventName} already exists.");
73+
}
74+
}
75+
76+
_subsManager.AddSubscription<T, TH>(handler);
77+
}
78+
79+
public void Unsubscribe<T, TH>()
80+
where T : IntegrationEvent
81+
where TH : IIntegrationEventHandler<T>
82+
{
83+
var eventName = typeof(T).Name;
84+
85+
try
86+
{
87+
_subscriptionClient
88+
.RemoveRuleAsync(eventName)
89+
.GetAwaiter()
90+
.GetResult();
91+
}
92+
catch (MessagingEntityNotFoundException)
93+
{
94+
_logger.LogWarning($"The messaging entity {eventName} Could not be found.");
95+
}
96+
97+
_subsManager.RemoveSubscription<T, TH>();
98+
}
99+
100+
public void Dispose()
101+
{
102+
_subsManager.Clear();
103+
}
104+
105+
//private async Task CreateConsumerChannel()
106+
//{
107+
// _subscriptionClient.RegisterMessageHandler(
108+
// async (message, token) =>
109+
// {
110+
// var eventName = message.Label;
111+
// var messageData = Encoding.UTF8.GetString(message.Body);
112+
// await ProcessEvent(eventName, messageData);
113+
// },
114+
// new MessageHandlerOptions() { MaxConcurrentCalls = 10, AutoComplete = true });
115+
//}
116+
117+
private async Task ProcessEvent(string eventName, string message)
118+
{
119+
if (_subsManager.HasSubscriptionsForEvent(eventName))
120+
{
121+
var eventType = _subsManager.GetEventTypeByName(eventName);
122+
var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
123+
var handlers = _subsManager.GetHandlersForEvent(eventName);
124+
125+
foreach (var handlerfactory in handlers)
126+
{
127+
var handler = handlerfactory.DynamicInvoke();
128+
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
129+
await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
130+
}
131+
}
132+
}
133+
}
134+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>netcoreapp1.1</TargetFramework>
5+
<RootNamespace>Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus</RootNamespace>
6+
</PropertyGroup>
7+
8+
<ItemGroup>
9+
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="0.0.5-preview" />
10+
<PackageReference Include="Microsoft.Extensions.Logging" Version="1.1.1" />
11+
</ItemGroup>
12+
13+
<ItemGroup>
14+
<ProjectReference Include="..\EventBus\EventBus.csproj" />
15+
</ItemGroup>
16+
17+
</Project>
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusServiceBus
2+
{
3+
using System;
4+
using Microsoft.Azure.ServiceBus;
5+
6+
public interface IServiceBusPersisterConnection : IDisposable
7+
{
8+
ServiceBusConnectionStringBuilder ServiceBusConnectionStringBuilder { get; }
9+
10+
bool IsConnected { get; }
11+
12+
ITopicClient CreateModel();
13+
}
14+
}

0 commit comments

Comments
 (0)