diff --git a/docs/GatewayConfiguration.md b/docs/GatewayConfiguration.md
index b4f45a5..90db1b5 100644
--- a/docs/GatewayConfiguration.md
+++ b/docs/GatewayConfiguration.md
@@ -41,7 +41,9 @@ paths, timeouts, queue sizes, enum values, or protocol values are invalid.
},
"Events": {
"QueueCapacity": 10000,
- "BackpressurePolicy": "FailFast"
+ "BackpressurePolicy": "FailFast",
+ "ReplayBufferCapacity": 1024,
+ "ReplayRetentionSeconds": 300
},
"Dashboard": {
"Enabled": true,
@@ -135,12 +137,19 @@ ordering and avoids competing consumers.
|--------|---------|-------------|
| `MxGateway:Events:QueueCapacity` | `10000` | Capacity for bounded per-session event queues used by the gateway worker event channel and the public gRPC event stream queue. |
| `MxGateway:Events:BackpressurePolicy` | `FailFast` | Event backpressure behavior. `FailFast` faults the session on public stream queue overflow. `DisconnectSubscriber` disconnects only the slow stream. |
+| `MxGateway:Events:ReplayBufferCapacity` | `1024` | Maximum number of events retained per session in the replay ring buffer, used to re-deliver events a returning subscriber missed (reconnect/reattach). The oldest retained event is evicted once this count is exceeded. `0` disables replay retention. |
+| `MxGateway:Events:ReplayRetentionSeconds` | `300` | Maximum age, in seconds, of an event retained in the replay ring buffer. Entries older than this are evicted regardless of capacity. `0` disables age-based eviction. |
`QueueCapacity` must be greater than zero. With `FailFast`, queue overflow
faults the affected worker or session instead of silently dropping MXAccess
events. With `DisconnectSubscriber`, public gRPC stream overflow terminates only
the affected stream while the MXAccess session remains active.
+`ReplayBufferCapacity` and `ReplayRetentionSeconds` must each be greater than or
+equal to zero (either dimension can be disabled with `0`). A returning subscriber
+that asks for events older than the oldest still-retained event is told it missed
+events (a "gap") and must re-snapshot; whatever is still retained is replayed.
+
## Dashboard Options
| Option | Default | Description |
diff --git a/src/ZB.MOM.WW.MxGateway.Server/Configuration/EventOptions.cs b/src/ZB.MOM.WW.MxGateway.Server/Configuration/EventOptions.cs
index 3d3fb2e..408fb79 100644
--- a/src/ZB.MOM.WW.MxGateway.Server/Configuration/EventOptions.cs
+++ b/src/ZB.MOM.WW.MxGateway.Server/Configuration/EventOptions.cs
@@ -11,4 +11,20 @@ public sealed class EventOptions
/// Gets the backpressure policy for event queue overflow.
///
public EventBackpressurePolicy BackpressurePolicy { get; init; } = EventBackpressurePolicy.FailFast;
+
+ ///
+ /// Gets the maximum number of events retained in the per-session replay ring buffer
+ /// used to re-deliver events a returning subscriber missed (reconnect/reattach).
+ /// When the buffer exceeds this count the oldest retained events are evicted first.
+ /// A value of 0 disables replay retention entirely.
+ ///
+ public int ReplayBufferCapacity { get; init; } = 1024;
+
+ ///
+ /// Gets the maximum age, in seconds, of an event retained in the per-session replay
+ /// ring buffer. Entries older than this are evicted regardless of capacity. A value
+ /// of 0 disables age-based eviction (only
+ /// bounds the buffer).
+ ///
+ public double ReplayRetentionSeconds { get; init; } = 300;
}
diff --git a/src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayOptionsValidator.cs b/src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayOptionsValidator.cs
index fb7ed3e..f93ec03 100644
--- a/src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayOptionsValidator.cs
+++ b/src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayOptionsValidator.cs
@@ -193,6 +193,16 @@ public sealed class GatewayOptionsValidator : OptionsValidatorBase= 0,
+ "MxGateway:Events:ReplayRetentionSeconds must be greater than or equal to zero.");
}
private static void ValidateDashboard(DashboardOptions options, ValidationBuilder builder)
diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs
index 038a510..8cb55ce 100644
--- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs
+++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs
@@ -55,10 +55,24 @@ public sealed class SessionEventDistributor : IAsyncDisposable
private readonly int _subscriberQueueCapacity;
private readonly TimeSpan _shutdownTimeout;
private readonly ILogger _logger;
+ private readonly TimeProvider _timeProvider;
private readonly ConcurrentDictionary _subscribers = new();
private readonly CancellationTokenSource _shutdownCts = new();
private readonly object _lifecycleLock = new();
+ // Replay ring buffer. Appended on the pump thread and queried from arbitrary
+ // threads via TryGetReplayFrom, so every access is under _replayLock. The deque
+ // keeps events in ascending WorkerSequence order (the pump fans in source order),
+ // so the oldest retained event is always at the front. Capacity == 0 disables
+ // retention; RetentionSeconds <= 0 disables age-based eviction.
+ private readonly int _replayBufferCapacity;
+ private readonly TimeSpan _replayRetention;
+ private readonly bool _ageEvictionEnabled;
+ private readonly LinkedList _replayBuffer = new();
+ private readonly object _replayLock = new();
+ private bool _anyEventSeen;
+ private ulong _highestSequenceSeen;
+
private long _nextSubscriberId;
private Task? _pumpTask;
private bool _started;
@@ -78,22 +92,80 @@ public sealed class SessionEventDistributor : IAsyncDisposable
/// queue capacity shape used today.
///
/// Logger for pump lifecycle diagnostics.
+ ///
+ /// This overload disables the replay ring buffer (capacity 0). Use the overload
+ /// taking replay parameters to retain events for reconnect/reattach replay.
+ ///
public SessionEventDistributor(
string sessionId,
Func> eventSourceFactory,
int subscriberQueueCapacity,
ILogger logger)
+ : this(
+ sessionId,
+ eventSourceFactory,
+ subscriberQueueCapacity,
+ replayBufferCapacity: 0,
+ replayRetentionSeconds: 0,
+ logger,
+ TimeProvider.System)
+ {
+ }
+
+ ///
+ /// Initializes a per-session event distributor with a bounded replay ring buffer.
+ ///
+ /// Owning session id, used only for logging context.
+ ///
+ /// Factory producing the session's event stream given a cancellation token.
+ /// The pump consumes this exactly once. See the type remarks for the seam Task 4
+ /// plugs into.
+ ///
+ ///
+ /// Bounded capacity of each per-subscriber channel. Mirrors the gRPC event-stream
+ /// queue capacity shape used today.
+ ///
+ ///
+ /// Maximum number of events retained for replay. The oldest retained event is
+ /// evicted once this count is exceeded. 0 disables retention entirely.
+ ///
+ ///
+ /// Maximum age, in seconds, of a retained event. Entries older than this are
+ /// evicted regardless of capacity. 0 (or less) disables age-based eviction.
+ ///
+ /// Logger for pump lifecycle diagnostics.
+ ///
+ /// Clock used to timestamp and age-evict replay entries. Inject a fake to make
+ /// age-eviction deterministic in tests.
+ ///
+ public SessionEventDistributor(
+ string sessionId,
+ Func> eventSourceFactory,
+ int subscriberQueueCapacity,
+ int replayBufferCapacity,
+ double replayRetentionSeconds,
+ ILogger logger,
+ TimeProvider timeProvider)
{
ArgumentException.ThrowIfNullOrWhiteSpace(sessionId);
ArgumentNullException.ThrowIfNull(eventSourceFactory);
ArgumentOutOfRangeException.ThrowIfLessThan(subscriberQueueCapacity, 1);
+ ArgumentOutOfRangeException.ThrowIfNegative(replayBufferCapacity);
+ ArgumentOutOfRangeException.ThrowIfNegative(replayRetentionSeconds);
ArgumentNullException.ThrowIfNull(logger);
+ ArgumentNullException.ThrowIfNull(timeProvider);
_sessionId = sessionId;
_eventSourceFactory = eventSourceFactory;
_subscriberQueueCapacity = subscriberQueueCapacity;
_shutdownTimeout = DefaultShutdownTimeout;
+ _replayBufferCapacity = replayBufferCapacity;
+ _ageEvictionEnabled = replayRetentionSeconds > 0;
+ _replayRetention = _ageEvictionEnabled
+ ? TimeSpan.FromSeconds(replayRetentionSeconds)
+ : TimeSpan.Zero;
_logger = logger;
+ _timeProvider = timeProvider;
}
///
@@ -232,6 +304,12 @@ public sealed class SessionEventDistributor : IAsyncDisposable
.WithCancellation(cancellationToken)
.ConfigureAwait(false))
{
+ // Retain for replay BEFORE fan-out so a reconnecting subscriber that
+ // queries between fan-out and its own read still sees this event. Order
+ // is preserved: the pump is the single appender and events arrive in
+ // source order.
+ AppendToReplayBuffer(mxEvent);
+
// Enumerating a ConcurrentDictionary's Values never throws on concurrent
// add/remove; a subscriber registered mid-iteration may miss this event,
// which matches "late subscribers see events after they register".
@@ -289,6 +367,132 @@ public sealed class SessionEventDistributor : IAsyncDisposable
}
}
+ ///
+ /// Returns the retained events with strictly
+ /// greater than , in ascending sequence order, so a
+ /// reconnecting or reattaching subscriber can replay what it missed.
+ ///
+ ///
+ /// The last worker sequence the caller already observed. Only events newer than this
+ /// are returned.
+ ///
+ ///
+ /// The retained events newer than , in order. Never
+ /// null; empty when nothing newer is retained.
+ ///
+ ///
+ /// when events between and the
+ /// oldest retained event were already evicted (by capacity or age), meaning the caller
+ /// missed events that can no longer be replayed and must re-snapshot. When
+ /// , whatever IS still retained is still returned via
+ /// .
+ ///
+ ///
+ /// Always — the out parameters fully describe the result. The
+ /// return value exists for a fluent call shape and future extension.
+ ///
+ ///
+ /// Gap semantics, by buffer state:
+ ///
+ /// -
+ /// Buffer non-empty: is iff
+ /// is below the oldest retained sequence minus
+ /// one (i.e. at least one event newer than but
+ /// older than the oldest retained was evicted). When
+ /// equals or exceeds the newest retained
+ /// sequence the caller is fully caught up: empty list, no gap.
+ ///
+ /// -
+ /// Buffer empty (retention disabled, nothing seen yet, or everything evicted):
+ /// empty list, and is iff
+ /// is below the highest sequence ever seen —
+ /// i.e. the caller is behind but nothing is retained to replay. If no event has
+ /// ever been seen, or the caller is already at/ahead of the highest seen, there
+ /// is nothing to miss: no gap.
+ ///
+ ///
+ ///
+ public bool TryGetReplayFrom(ulong afterSequence, out IReadOnlyList events, out bool gap)
+ {
+ lock (_replayLock)
+ {
+ EvictAged();
+
+ if (_replayBuffer.Count == 0)
+ {
+ events = [];
+ // Nothing retained. The caller missed events only if it is behind the
+ // highest sequence ever seen (and we have seen at least one event).
+ gap = _anyEventSeen && afterSequence < _highestSequenceSeen;
+ return true;
+ }
+
+ ulong oldestRetained = _replayBuffer.First!.Value.Event.WorkerSequence;
+
+ // A gap exists when at least one event newer than afterSequence was evicted,
+ // i.e. afterSequence sits below the oldest-retained-minus-one boundary.
+ gap = afterSequence + 1 < oldestRetained;
+
+ List newer = [];
+ foreach (ReplayEntry entry in _replayBuffer)
+ {
+ if (entry.Event.WorkerSequence > afterSequence)
+ {
+ newer.Add(entry.Event);
+ }
+ }
+
+ events = newer;
+ return true;
+ }
+ }
+
+ private void AppendToReplayBuffer(MxEvent mxEvent)
+ {
+ lock (_replayLock)
+ {
+ _anyEventSeen = true;
+ if (mxEvent.WorkerSequence > _highestSequenceSeen)
+ {
+ _highestSequenceSeen = mxEvent.WorkerSequence;
+ }
+
+ // Capacity 0 disables retention: track the highest-seen sequence (so replay
+ // can still report a gap) but keep no events.
+ if (_replayBufferCapacity == 0)
+ {
+ return;
+ }
+
+ _replayBuffer.AddLast(new ReplayEntry(mxEvent, _timeProvider.GetUtcNow()));
+
+ // Capacity eviction: drop oldest until within bound.
+ while (_replayBuffer.Count > _replayBufferCapacity)
+ {
+ _replayBuffer.RemoveFirst();
+ }
+
+ EvictAged();
+ }
+ }
+
+ // Must be called under _replayLock. Drops entries older than the retention window.
+ private void EvictAged()
+ {
+ if (!_ageEvictionEnabled || _replayBuffer.Count == 0)
+ {
+ return;
+ }
+
+ DateTimeOffset cutoff = _timeProvider.GetUtcNow() - _replayRetention;
+ while (_replayBuffer.First is { } first && first.Value.RetainedAt < cutoff)
+ {
+ _replayBuffer.RemoveFirst();
+ }
+ }
+
+ private readonly record struct ReplayEntry(MxEvent Event, DateTimeOffset RetainedAt);
+
private sealed class Subscriber(long id, Channel channel)
{
public long Id { get; } = id;
diff --git a/src/ZB.MOM.WW.MxGateway.Server/appsettings.json b/src/ZB.MOM.WW.MxGateway.Server/appsettings.json
index f18ae0d..d64ab14 100644
--- a/src/ZB.MOM.WW.MxGateway.Server/appsettings.json
+++ b/src/ZB.MOM.WW.MxGateway.Server/appsettings.json
@@ -50,7 +50,9 @@
},
"Events": {
"QueueCapacity": 10000,
- "BackpressurePolicy": "FailFast"
+ "BackpressurePolicy": "FailFast",
+ "ReplayBufferCapacity": 1024,
+ "ReplayRetentionSeconds": 300
},
"Dashboard": {
"Enabled": true,
diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs
index 8842d1b..7df4273 100644
--- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs
+++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs
@@ -1,5 +1,6 @@
using System.Threading.Channels;
using Microsoft.Extensions.Logging.Abstractions;
+using Microsoft.Extensions.Time.Testing;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Sessions;
@@ -112,12 +113,142 @@ public sealed class SessionEventDistributorTests
Assert.Throws(() => distributor.Register());
}
+ [Fact]
+ public async Task ReplayBuffer_OverCapacity_EvictsOldestFirst_AndReportsGap()
+ {
+ Channel source = Channel.CreateUnbounded();
+ await using SessionEventDistributor distributor = CreateDistributor(
+ source.Reader,
+ replayBufferCapacity: 3,
+ replayRetentionSeconds: 0);
+ await distributor.StartAsync(CancellationToken.None);
+
+ // A live subscriber forces the pump to fan (and thereby retain) each event,
+ // and gives us a deterministic point to know the pump has processed event 5.
+ using IEventSubscriberLease lease = distributor.Register();
+ for (ulong sequence = 1; sequence <= 5; sequence++)
+ {
+ source.Writer.TryWrite(Event(sequence));
+ }
+
+ for (ulong sequence = 1; sequence <= 5; sequence++)
+ {
+ MxEvent e = await ReadOneAsync(lease.Reader);
+ Assert.Equal(sequence, e.WorkerSequence);
+ }
+
+ // Capacity 3 retains only the newest three: sequences 3, 4, 5. Events 1 and 2
+ // were evicted, so a caller asking from 0 missed events => gap=true, and it
+ // gets only the retained tail.
+ bool found = distributor.TryGetReplayFrom(0, out IReadOnlyList replay, out bool gap);
+
+ Assert.True(found);
+ Assert.True(gap);
+ Assert.Equal(new ulong[] { 3, 4, 5 }, replay.Select(e => e.WorkerSequence));
+ }
+
+ [Fact]
+ public async Task ReplayBuffer_WithinRetainedWindow_ReturnsNewerEvents_NoGap()
+ {
+ Channel source = Channel.CreateUnbounded();
+ await using SessionEventDistributor distributor = CreateDistributor(
+ source.Reader,
+ replayBufferCapacity: 10,
+ replayRetentionSeconds: 0);
+ await distributor.StartAsync(CancellationToken.None);
+
+ using IEventSubscriberLease lease = distributor.Register();
+ for (ulong sequence = 1; sequence <= 5; sequence++)
+ {
+ source.Writer.TryWrite(Event(sequence));
+ _ = await ReadOneAsync(lease.Reader);
+ }
+
+ // afterSequence 2 is still inside the retained window [1..5], so no gap and
+ // exactly the newer events 3, 4, 5 come back.
+ bool found = distributor.TryGetReplayFrom(2, out IReadOnlyList replay, out bool gap);
+
+ Assert.True(found);
+ Assert.False(gap);
+ Assert.Equal(new ulong[] { 3, 4, 5 }, replay.Select(e => e.WorkerSequence));
+ }
+
+ [Fact]
+ public async Task ReplayBuffer_AgedEntries_AreEvictedAfterRetentionElapses()
+ {
+ FakeTimeProvider time = new(new DateTimeOffset(2026, 1, 1, 0, 0, 0, TimeSpan.Zero));
+ Channel source = Channel.CreateUnbounded();
+ await using SessionEventDistributor distributor = CreateDistributor(
+ source.Reader,
+ replayBufferCapacity: 100,
+ replayRetentionSeconds: 30,
+ timeProvider: time);
+ await distributor.StartAsync(CancellationToken.None);
+
+ using IEventSubscriberLease lease = distributor.Register();
+
+ // Two old events, then advance the clock well past the retention window.
+ source.Writer.TryWrite(Event(1));
+ source.Writer.TryWrite(Event(2));
+ _ = await ReadOneAsync(lease.Reader);
+ _ = await ReadOneAsync(lease.Reader);
+
+ time.Advance(TimeSpan.FromSeconds(60));
+
+ // A fresh event triggers age-eviction of the now-stale entries 1 and 2.
+ source.Writer.TryWrite(Event(3));
+ _ = await ReadOneAsync(lease.Reader);
+
+ bool found = distributor.TryGetReplayFrom(0, out IReadOnlyList replay, out bool gap);
+
+ Assert.True(found);
+ // Events 1 and 2 aged out; only 3 remains, and 0 predates the oldest retained.
+ Assert.Equal(new ulong[] { 3 }, replay.Select(e => e.WorkerSequence));
+ Assert.True(gap);
+ }
+
+ [Fact]
+ public async Task ReplayBuffer_AfterSequenceNewerThanAllRetained_ReturnsEmpty_NoGap()
+ {
+ Channel source = Channel.CreateUnbounded();
+ await using SessionEventDistributor distributor = CreateDistributor(
+ source.Reader,
+ replayBufferCapacity: 10,
+ replayRetentionSeconds: 0);
+ await distributor.StartAsync(CancellationToken.None);
+
+ using IEventSubscriberLease lease = distributor.Register();
+ for (ulong sequence = 1; sequence <= 3; sequence++)
+ {
+ source.Writer.TryWrite(Event(sequence));
+ _ = await ReadOneAsync(lease.Reader);
+ }
+
+ // afterSequence 3 is at/after the newest retained; nothing newer, and the
+ // caller is fully caught up => empty list, gap=false.
+ bool found = distributor.TryGetReplayFrom(3, out IReadOnlyList replay, out bool gap);
+
+ Assert.True(found);
+ Assert.False(gap);
+ Assert.Empty(replay);
+ }
+
private static SessionEventDistributor CreateDistributor(ChannelReader source)
+ => CreateDistributor(source, replayBufferCapacity: 1024, replayRetentionSeconds: 300);
+
+ private static SessionEventDistributor CreateDistributor(
+ ChannelReader source,
+ int replayBufferCapacity,
+ double replayRetentionSeconds,
+ TimeProvider? timeProvider = null)
=> new(
"session-test",
ct => source.ReadAllAsync(ct),
subscriberQueueCapacity: 64,
- NullLogger.Instance);
+ replayBufferCapacity: replayBufferCapacity,
+ replayRetentionSeconds: replayRetentionSeconds,
+ NullLogger.Instance,
+ timeProvider ?? TimeProvider.System);
private static MxEvent Event(ulong sequence)
=> new() { SessionId = "session-test", WorkerSequence = sequence };