Skip to content

Commit 24bed0a

Browse files
author
Ramón Tomás
committed
Send IntegrationEvents after committing transactions
1 parent adafb9a commit 24bed0a

20 files changed

Lines changed: 144 additions & 49 deletions

src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,14 +217,16 @@ private async Task ProcessEvent(string eventName, string message)
217217
if (subscription.IsDynamic)
218218
{
219219
var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
220+
if (handler == null) continue;
220221
dynamic eventData = JObject.Parse(message);
221222
await handler.Handle(eventData);
222223
}
223224
else
224225
{
226+
var handler = scope.ResolveOptional(subscription.HandlerType);
227+
if (handler == null) continue;
225228
var eventType = _subsManager.GetEventTypeByName(eventName);
226229
var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
227-
var handler = scope.ResolveOptional(subscription.HandlerType);
228230
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
229231
await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
230232
}

src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,14 +163,16 @@ private async Task<bool> ProcessEvent(string eventName, string message)
163163
if (subscription.IsDynamic)
164164
{
165165
var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
166+
if (handler == null) continue;
166167
dynamic eventData = JObject.Parse(message);
167168
await handler.Handle(eventData);
168169
}
169170
else
170171
{
171-
var eventType = _subsManager.GetEventTypeByName(eventName);
172-
var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
173172
var handler = scope.ResolveOptional(subscription.HandlerType);
173+
if (handler == null) continue;
174+
var eventType = _subsManager.GetEventTypeByName(eventName);
175+
var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
174176
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
175177
await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
176178
}

src/BuildingBlocks/EventBus/IntegrationEventLogEF/EventStateEnum.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF
77
public enum EventStateEnum
88
{
99
NotPublished = 0,
10-
Published = 1,
11-
PublishedFailed = 2
10+
InProgress = 1,
11+
Published = 2,
12+
PublishedFailed = 3
1213
}
1314
}

src/BuildingBlocks/EventBus/IntegrationEventLogEF/IntegrationEventLogEntry.cs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
using System.Text;
44
using Newtonsoft.Json;
55
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
6+
using System.Linq;
7+
using System.ComponentModel.DataAnnotations.Schema;
68

79
namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF
810
{
@@ -11,7 +13,7 @@ public class IntegrationEventLogEntry
1113
private IntegrationEventLogEntry() { }
1214
public IntegrationEventLogEntry(IntegrationEvent @event)
1315
{
14-
EventId = @event.Id;
16+
EventId = @event.Id;
1517
CreationTime = @event.CreationDate;
1618
EventTypeName = @event.GetType().FullName;
1719
Content = JsonConvert.SerializeObject(@event);
@@ -20,9 +22,18 @@ public IntegrationEventLogEntry(IntegrationEvent @event)
2022
}
2123
public Guid EventId { get; private set; }
2224
public string EventTypeName { get; private set; }
25+
[NotMapped]
26+
public string EventTypeShortName => EventTypeName.Split('.')?.Last();
27+
[NotMapped]
28+
public IntegrationEvent IntegrationEvent { get; private set; }
2329
public EventStateEnum State { get; set; }
2430
public int TimesSent { get; set; }
2531
public DateTime CreationTime { get; private set; }
2632
public string Content { get; private set; }
33+
34+
public void DeserializeJsonContent(Type type)
35+
{
36+
IntegrationEvent = JsonConvert.DeserializeObject(Content, type) as IntegrationEvent;
37+
}
2738
}
2839
}

src/BuildingBlocks/EventBus/IntegrationEventLogEF/Services/IIntegrationEventLogService.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Servi
99
{
1010
public interface IIntegrationEventLogService
1111
{
12+
Task<IEnumerable<IntegrationEventLogEntry>> RetrieveEventLogsPendingToPublishAsync();
1213
Task SaveEventAsync(IntegrationEvent @event, DbTransaction transaction);
13-
Task MarkEventAsPublishedAsync(IntegrationEvent @event);
14+
Task MarkEventAsPublishedAsync(Guid eventId);
15+
Task MarkEventAsInProgressAsync(Guid eventId);
16+
Task MarkEventAsFailedAsync(Guid eventId);
1417
}
1518
}

src/BuildingBlocks/EventBus/IntegrationEventLogEF/Services/IntegrationEventLogService.cs

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
using Microsoft.EntityFrameworkCore;
22
using Microsoft.EntityFrameworkCore.Diagnostics;
3+
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus;
34
using Microsoft.eShopOnContainers.BuildingBlocks.EventBus.Events;
5+
using Newtonsoft.Json;
46
using System;
7+
using System.Collections;
8+
using System.Collections.Generic;
59
using System.Data.Common;
610
using System.Linq;
711
using System.Threading.Tasks;
@@ -10,19 +14,35 @@ namespace Microsoft.eShopOnContainers.BuildingBlocks.IntegrationEventLogEF.Servi
1014
{
1115
public class IntegrationEventLogService : IIntegrationEventLogService
1216
{
17+
private readonly IEventBusSubscriptionsManager _subsManager;
1318
private readonly IntegrationEventLogContext _integrationEventLogContext;
1419
private readonly DbConnection _dbConnection;
1520

16-
public IntegrationEventLogService(DbConnection dbConnection)
21+
public IntegrationEventLogService(IEventBusSubscriptionsManager subsManager,
22+
DbConnection dbConnection)
1723
{
1824
_dbConnection = dbConnection ?? throw new ArgumentNullException(nameof(dbConnection));
25+
_subsManager = subsManager ?? throw new ArgumentNullException(nameof(subsManager));
1926
_integrationEventLogContext = new IntegrationEventLogContext(
2027
new DbContextOptionsBuilder<IntegrationEventLogContext>()
2128
.UseSqlServer(_dbConnection)
2229
.ConfigureWarnings(warnings => warnings.Throw(RelationalEventId.QueryClientEvaluationWarning))
2330
.Options);
2431
}
2532

33+
public async Task<IEnumerable<IntegrationEventLogEntry>> RetrieveEventLogsPendingToPublishAsync()
34+
{
35+
var eventLogsPendingToPublish = await _integrationEventLogContext.IntegrationEventLogs
36+
.Where(e => e.State == EventStateEnum.NotPublished)
37+
.OrderBy(o => o.CreationTime)
38+
.ToListAsync();
39+
40+
eventLogsPendingToPublish.ForEach(evtLog =>
41+
evtLog.DeserializeJsonContent(_subsManager.GetEventTypeByName(evtLog.EventTypeShortName)));
42+
43+
return eventLogsPendingToPublish;
44+
}
45+
2646
public Task SaveEventAsync(IntegrationEvent @event, DbTransaction transaction)
2747
{
2848
if (transaction == null)
@@ -38,11 +58,28 @@ public Task SaveEventAsync(IntegrationEvent @event, DbTransaction transaction)
3858
return _integrationEventLogContext.SaveChangesAsync();
3959
}
4060

41-
public Task MarkEventAsPublishedAsync(IntegrationEvent @event)
61+
public Task MarkEventAsPublishedAsync(Guid eventId)
62+
{
63+
return UpdateEventStatus(eventId, EventStateEnum.Published);
64+
}
65+
66+
public Task MarkEventAsInProgressAsync(Guid eventId)
4267
{
43-
var eventLogEntry = _integrationEventLogContext.IntegrationEventLogs.Single(ie => ie.EventId == @event.Id);
44-
eventLogEntry.TimesSent++;
45-
eventLogEntry.State = EventStateEnum.Published;
68+
return UpdateEventStatus(eventId, EventStateEnum.InProgress);
69+
}
70+
71+
public Task MarkEventAsFailedAsync(Guid eventId)
72+
{
73+
return UpdateEventStatus(eventId, EventStateEnum.PublishedFailed);
74+
}
75+
76+
private Task UpdateEventStatus(Guid eventId, EventStateEnum status)
77+
{
78+
var eventLogEntry = _integrationEventLogContext.IntegrationEventLogs.Single(ie => ie.EventId == eventId);
79+
eventLogEntry.State = status;
80+
81+
if(status == EventStateEnum.InProgress)
82+
eventLogEntry.TimesSent++;
4683

4784
_integrationEventLogContext.IntegrationEventLogs.Update(eventLogEntry);
4885

src/Services/Catalog/Catalog.API/IntegrationEvents/CatalogIntegrationEventService.cs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,16 @@ public CatalogIntegrationEventService(IEventBus eventBus, CatalogContext catalog
2929

3030
public async Task PublishThroughEventBusAsync(IntegrationEvent evt)
3131
{
32-
_eventBus.Publish(evt);
33-
34-
await _eventLogService.MarkEventAsPublishedAsync(evt);
32+
try
33+
{
34+
await _eventLogService.MarkEventAsInProgressAsync(evt.Id);
35+
_eventBus.Publish(evt);
36+
await _eventLogService.MarkEventAsPublishedAsync(evt.Id);
37+
}
38+
catch (Exception)
39+
{
40+
await _eventLogService.MarkEventAsFailedAsync(evt.Id);
41+
}
3542
}
3643

3744
public async Task SaveEventAndCatalogContextChangesAsync(IntegrationEvent evt)

src/Services/Catalog/Catalog.API/Startup.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,11 @@ public static IServiceCollection AddSwagger(this IServiceCollection services)
232232
public static IServiceCollection AddIntegrationServices(this IServiceCollection services, IConfiguration configuration)
233233
{
234234
services.AddTransient<Func<DbConnection, IIntegrationEventLogService>>(
235-
sp => (DbConnection c) => new IntegrationEventLogService(c));
235+
sp =>
236+
{
237+
var busMgr = sp.GetRequiredService<IEventBusSubscriptionsManager>();
238+
return (DbConnection c) => new IntegrationEventLogService(busMgr, c);
239+
});
236240

237241
services.AddTransient<ICatalogIntegrationEventService, CatalogIntegrationEventService>();
238242

src/Services/Ordering/Ordering.API/Application/Behaviors/TransactionBehaviour.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using Microsoft.EntityFrameworkCore;
33
using Microsoft.eShopOnContainers.Services.Ordering.Infrastructure;
44
using Microsoft.Extensions.Logging;
5+
using Ordering.API.Application.IntegrationEvents;
56
using System;
67
using System.Collections.Generic;
78
using System.Linq;
@@ -14,10 +15,14 @@ public class TransactionBehaviour<TRequest, TResponse> : IPipelineBehavior<TRequ
1415
{
1516
private readonly ILogger<TransactionBehaviour<TRequest, TResponse>> _logger;
1617
private readonly OrderingContext _dbContext;
18+
private readonly IOrderingIntegrationEventService _orderingIntegrationEventService;
1719

18-
public TransactionBehaviour(OrderingContext dbContext, ILogger<TransactionBehaviour<TRequest, TResponse>> logger)
20+
public TransactionBehaviour(OrderingContext dbContext,
21+
IOrderingIntegrationEventService orderingIntegrationEventService,
22+
ILogger<TransactionBehaviour<TRequest, TResponse>> logger)
1923
{
2024
_dbContext = dbContext ?? throw new ArgumentException(nameof(OrderingContext));
25+
_orderingIntegrationEventService = orderingIntegrationEventService ?? throw new ArgumentException(nameof(orderingIntegrationEventService));
2126
_logger = logger ?? throw new ArgumentException(nameof(ILogger));
2227
}
2328

@@ -39,6 +44,8 @@ await strategy.ExecuteAsync(async () =>
3944
await _dbContext.CommitTransactionAsync();
4045

4146
_logger.LogInformation($"Committed transaction {typeof(TRequest).Name}");
47+
48+
await _orderingIntegrationEventService.PublishEventsThroughEventBusAsync();
4249
});
4350

4451
return response;

src/Services/Ordering/Ordering.API/Application/Commands/CreateOrderCommandHandler.cs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
namespace Microsoft.eShopOnContainers.Services.Ordering.API.Application.Commands
22
{
33
using Domain.AggregatesModel.OrderAggregate;
4+
using global::Ordering.API.Application.IntegrationEvents;
5+
using global::Ordering.API.Application.IntegrationEvents.Events;
46
using MediatR;
57
using Microsoft.eShopOnContainers.Services.Ordering.API.Infrastructure.Services;
68
using Microsoft.eShopOnContainers.Services.Ordering.Infrastructure.Idempotency;
@@ -15,17 +17,26 @@ public class CreateOrderCommandHandler
1517
private readonly IOrderRepository _orderRepository;
1618
private readonly IIdentityService _identityService;
1719
private readonly IMediator _mediator;
20+
private readonly IOrderingIntegrationEventService _orderingIntegrationEventService;
1821

1922
// Using DI to inject infrastructure persistence Repositories
20-
public CreateOrderCommandHandler(IMediator mediator, IOrderRepository orderRepository, IIdentityService identityService)
23+
public CreateOrderCommandHandler(IMediator mediator,
24+
IOrderingIntegrationEventService orderingIntegrationEventService,
25+
IOrderRepository orderRepository,
26+
IIdentityService identityService)
2127
{
2228
_orderRepository = orderRepository ?? throw new ArgumentNullException(nameof(orderRepository));
2329
_identityService = identityService ?? throw new ArgumentNullException(nameof(identityService));
2430
_mediator = mediator ?? throw new ArgumentNullException(nameof(mediator));
31+
_orderingIntegrationEventService = orderingIntegrationEventService ?? throw new ArgumentNullException(nameof(orderingIntegrationEventService));
2532
}
2633

2734
public async Task<bool> Handle(CreateOrderCommand message, CancellationToken cancellationToken)
2835
{
36+
// Add Integration event to clean the basket
37+
var orderStartedIntegrationEvent = new OrderStartedIntegrationEvent(message.UserId);
38+
await _orderingIntegrationEventService.AddAndSaveEventAsync(orderStartedIntegrationEvent);
39+
2940
// Add/Update the Buyer AggregateRoot
3041
// DDD patterns comment: Add child entities and value-objects through the Order Aggregate-Root
3142
// methods and constructor so validations, invariants and business logic

0 commit comments

Comments
 (0)