Skip to content

Commit 8b84696

Browse files
authored
Update EventBusServiceBus.cs
Do not complete messages until actually processed.
1 parent 1b8806e commit 8b84696

1 file changed

Lines changed: 9 additions & 3 deletions

File tree

src/BuildingBlocks/EventBus/EventBusServiceBus/EventBusServiceBus.cs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,10 +129,13 @@ private void RegisterSubscriptionClientMessageHandler()
129129
{
130130
var eventName = $"{message.Label}{INTEGRATION_EVENT_SUFIX}";
131131
var messageData = Encoding.UTF8.GetString(message.Body);
132-
await ProcessEvent(eventName, messageData);
132+
var processed = await ProcessEvent(eventName, messageData);
133133

134134
// Complete the message so that it is not received again.
135-
await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
135+
if (processed)
136+
{
137+
await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
138+
}
136139
},
137140
new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = 10, AutoComplete = false });
138141
}
@@ -148,8 +151,9 @@ private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceiv
148151
return Task.CompletedTask;
149152
}
150153

151-
private async Task ProcessEvent(string eventName, string message)
154+
private async Task<bool> ProcessEvent(string eventName, string message)
152155
{
156+
var processed = false;
153157
if (_subsManager.HasSubscriptionsForEvent(eventName))
154158
{
155159
using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
@@ -173,7 +177,9 @@ private async Task ProcessEvent(string eventName, string message)
173177
}
174178
}
175179
}
180+
processed = true;
176181
}
182+
return processed
177183
}
178184

179185
private void RemoveDefaultRule()

0 commit comments

Comments
 (0)