From e962737d2c1184ef6b24dc750bc73dc56642eaad Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 15 Jun 2026 12:42:15 -0400 Subject: [PATCH] feat(sessions): add bounded replay ring buffer to SessionEventDistributor --- docs/GatewayConfiguration.md | 11 +- .../Configuration/EventOptions.cs | 16 ++ .../Configuration/GatewayOptionsValidator.cs | 10 + .../Sessions/SessionEventDistributor.cs | 204 ++++++++++++++++++ .../appsettings.json | 4 +- .../Sessions/SessionEventDistributorTests.cs | 133 +++++++++++- 6 files changed, 375 insertions(+), 3 deletions(-) diff --git a/docs/GatewayConfiguration.md b/docs/GatewayConfiguration.md index b4f45a5..90db1b5 100644 --- a/docs/GatewayConfiguration.md +++ b/docs/GatewayConfiguration.md @@ -41,7 +41,9 @@ paths, timeouts, queue sizes, enum values, or protocol values are invalid. }, "Events": { "QueueCapacity": 10000, - "BackpressurePolicy": "FailFast" + "BackpressurePolicy": "FailFast", + "ReplayBufferCapacity": 1024, + "ReplayRetentionSeconds": 300 }, "Dashboard": { "Enabled": true, @@ -135,12 +137,19 @@ ordering and avoids competing consumers. |--------|---------|-------------| | `MxGateway:Events:QueueCapacity` | `10000` | Capacity for bounded per-session event queues used by the gateway worker event channel and the public gRPC event stream queue. | | `MxGateway:Events:BackpressurePolicy` | `FailFast` | Event backpressure behavior. `FailFast` faults the session on public stream queue overflow. `DisconnectSubscriber` disconnects only the slow stream. | +| `MxGateway:Events:ReplayBufferCapacity` | `1024` | Maximum number of events retained per session in the replay ring buffer, used to re-deliver events a returning subscriber missed (reconnect/reattach). The oldest retained event is evicted once this count is exceeded. `0` disables replay retention. | +| `MxGateway:Events:ReplayRetentionSeconds` | `300` | Maximum age, in seconds, of an event retained in the replay ring buffer. Entries older than this are evicted regardless of capacity. `0` disables age-based eviction. | `QueueCapacity` must be greater than zero. With `FailFast`, queue overflow faults the affected worker or session instead of silently dropping MXAccess events. With `DisconnectSubscriber`, public gRPC stream overflow terminates only the affected stream while the MXAccess session remains active. +`ReplayBufferCapacity` and `ReplayRetentionSeconds` must each be greater than or +equal to zero (either dimension can be disabled with `0`). A returning subscriber +that asks for events older than the oldest still-retained event is told it missed +events (a "gap") and must re-snapshot; whatever is still retained is replayed. + ## Dashboard Options | Option | Default | Description | diff --git a/src/ZB.MOM.WW.MxGateway.Server/Configuration/EventOptions.cs b/src/ZB.MOM.WW.MxGateway.Server/Configuration/EventOptions.cs index 3d3fb2e..408fb79 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Configuration/EventOptions.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Configuration/EventOptions.cs @@ -11,4 +11,20 @@ public sealed class EventOptions /// Gets the backpressure policy for event queue overflow. /// public EventBackpressurePolicy BackpressurePolicy { get; init; } = EventBackpressurePolicy.FailFast; + + /// + /// Gets the maximum number of events retained in the per-session replay ring buffer + /// used to re-deliver events a returning subscriber missed (reconnect/reattach). + /// When the buffer exceeds this count the oldest retained events are evicted first. + /// A value of 0 disables replay retention entirely. + /// + public int ReplayBufferCapacity { get; init; } = 1024; + + /// + /// Gets the maximum age, in seconds, of an event retained in the per-session replay + /// ring buffer. Entries older than this are evicted regardless of capacity. A value + /// of 0 disables age-based eviction (only + /// bounds the buffer). + /// + public double ReplayRetentionSeconds { get; init; } = 300; } diff --git a/src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayOptionsValidator.cs b/src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayOptionsValidator.cs index fb7ed3e..f93ec03 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayOptionsValidator.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Configuration/GatewayOptionsValidator.cs @@ -193,6 +193,16 @@ public sealed class GatewayOptionsValidator : OptionsValidatorBase= 0, + "MxGateway:Events:ReplayRetentionSeconds must be greater than or equal to zero."); } private static void ValidateDashboard(DashboardOptions options, ValidationBuilder builder) diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs index 038a510..8cb55ce 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs @@ -55,10 +55,24 @@ public sealed class SessionEventDistributor : IAsyncDisposable private readonly int _subscriberQueueCapacity; private readonly TimeSpan _shutdownTimeout; private readonly ILogger _logger; + private readonly TimeProvider _timeProvider; private readonly ConcurrentDictionary _subscribers = new(); private readonly CancellationTokenSource _shutdownCts = new(); private readonly object _lifecycleLock = new(); + // Replay ring buffer. Appended on the pump thread and queried from arbitrary + // threads via TryGetReplayFrom, so every access is under _replayLock. The deque + // keeps events in ascending WorkerSequence order (the pump fans in source order), + // so the oldest retained event is always at the front. Capacity == 0 disables + // retention; RetentionSeconds <= 0 disables age-based eviction. + private readonly int _replayBufferCapacity; + private readonly TimeSpan _replayRetention; + private readonly bool _ageEvictionEnabled; + private readonly LinkedList _replayBuffer = new(); + private readonly object _replayLock = new(); + private bool _anyEventSeen; + private ulong _highestSequenceSeen; + private long _nextSubscriberId; private Task? _pumpTask; private bool _started; @@ -78,22 +92,80 @@ public sealed class SessionEventDistributor : IAsyncDisposable /// queue capacity shape used today. /// /// Logger for pump lifecycle diagnostics. + /// + /// This overload disables the replay ring buffer (capacity 0). Use the overload + /// taking replay parameters to retain events for reconnect/reattach replay. + /// public SessionEventDistributor( string sessionId, Func> eventSourceFactory, int subscriberQueueCapacity, ILogger logger) + : this( + sessionId, + eventSourceFactory, + subscriberQueueCapacity, + replayBufferCapacity: 0, + replayRetentionSeconds: 0, + logger, + TimeProvider.System) + { + } + + /// + /// Initializes a per-session event distributor with a bounded replay ring buffer. + /// + /// Owning session id, used only for logging context. + /// + /// Factory producing the session's event stream given a cancellation token. + /// The pump consumes this exactly once. See the type remarks for the seam Task 4 + /// plugs into. + /// + /// + /// Bounded capacity of each per-subscriber channel. Mirrors the gRPC event-stream + /// queue capacity shape used today. + /// + /// + /// Maximum number of events retained for replay. The oldest retained event is + /// evicted once this count is exceeded. 0 disables retention entirely. + /// + /// + /// Maximum age, in seconds, of a retained event. Entries older than this are + /// evicted regardless of capacity. 0 (or less) disables age-based eviction. + /// + /// Logger for pump lifecycle diagnostics. + /// + /// Clock used to timestamp and age-evict replay entries. Inject a fake to make + /// age-eviction deterministic in tests. + /// + public SessionEventDistributor( + string sessionId, + Func> eventSourceFactory, + int subscriberQueueCapacity, + int replayBufferCapacity, + double replayRetentionSeconds, + ILogger logger, + TimeProvider timeProvider) { ArgumentException.ThrowIfNullOrWhiteSpace(sessionId); ArgumentNullException.ThrowIfNull(eventSourceFactory); ArgumentOutOfRangeException.ThrowIfLessThan(subscriberQueueCapacity, 1); + ArgumentOutOfRangeException.ThrowIfNegative(replayBufferCapacity); + ArgumentOutOfRangeException.ThrowIfNegative(replayRetentionSeconds); ArgumentNullException.ThrowIfNull(logger); + ArgumentNullException.ThrowIfNull(timeProvider); _sessionId = sessionId; _eventSourceFactory = eventSourceFactory; _subscriberQueueCapacity = subscriberQueueCapacity; _shutdownTimeout = DefaultShutdownTimeout; + _replayBufferCapacity = replayBufferCapacity; + _ageEvictionEnabled = replayRetentionSeconds > 0; + _replayRetention = _ageEvictionEnabled + ? TimeSpan.FromSeconds(replayRetentionSeconds) + : TimeSpan.Zero; _logger = logger; + _timeProvider = timeProvider; } /// @@ -232,6 +304,12 @@ public sealed class SessionEventDistributor : IAsyncDisposable .WithCancellation(cancellationToken) .ConfigureAwait(false)) { + // Retain for replay BEFORE fan-out so a reconnecting subscriber that + // queries between fan-out and its own read still sees this event. Order + // is preserved: the pump is the single appender and events arrive in + // source order. + AppendToReplayBuffer(mxEvent); + // Enumerating a ConcurrentDictionary's Values never throws on concurrent // add/remove; a subscriber registered mid-iteration may miss this event, // which matches "late subscribers see events after they register". @@ -289,6 +367,132 @@ public sealed class SessionEventDistributor : IAsyncDisposable } } + /// + /// Returns the retained events with strictly + /// greater than , in ascending sequence order, so a + /// reconnecting or reattaching subscriber can replay what it missed. + /// + /// + /// The last worker sequence the caller already observed. Only events newer than this + /// are returned. + /// + /// + /// The retained events newer than , in order. Never + /// null; empty when nothing newer is retained. + /// + /// + /// when events between and the + /// oldest retained event were already evicted (by capacity or age), meaning the caller + /// missed events that can no longer be replayed and must re-snapshot. When + /// , whatever IS still retained is still returned via + /// . + /// + /// + /// Always — the out parameters fully describe the result. The + /// return value exists for a fluent call shape and future extension. + /// + /// + /// Gap semantics, by buffer state: + /// + /// + /// Buffer non-empty: is iff + /// is below the oldest retained sequence minus + /// one (i.e. at least one event newer than but + /// older than the oldest retained was evicted). When + /// equals or exceeds the newest retained + /// sequence the caller is fully caught up: empty list, no gap. + /// + /// + /// Buffer empty (retention disabled, nothing seen yet, or everything evicted): + /// empty list, and is iff + /// is below the highest sequence ever seen — + /// i.e. the caller is behind but nothing is retained to replay. If no event has + /// ever been seen, or the caller is already at/ahead of the highest seen, there + /// is nothing to miss: no gap. + /// + /// + /// + public bool TryGetReplayFrom(ulong afterSequence, out IReadOnlyList events, out bool gap) + { + lock (_replayLock) + { + EvictAged(); + + if (_replayBuffer.Count == 0) + { + events = []; + // Nothing retained. The caller missed events only if it is behind the + // highest sequence ever seen (and we have seen at least one event). + gap = _anyEventSeen && afterSequence < _highestSequenceSeen; + return true; + } + + ulong oldestRetained = _replayBuffer.First!.Value.Event.WorkerSequence; + + // A gap exists when at least one event newer than afterSequence was evicted, + // i.e. afterSequence sits below the oldest-retained-minus-one boundary. + gap = afterSequence + 1 < oldestRetained; + + List newer = []; + foreach (ReplayEntry entry in _replayBuffer) + { + if (entry.Event.WorkerSequence > afterSequence) + { + newer.Add(entry.Event); + } + } + + events = newer; + return true; + } + } + + private void AppendToReplayBuffer(MxEvent mxEvent) + { + lock (_replayLock) + { + _anyEventSeen = true; + if (mxEvent.WorkerSequence > _highestSequenceSeen) + { + _highestSequenceSeen = mxEvent.WorkerSequence; + } + + // Capacity 0 disables retention: track the highest-seen sequence (so replay + // can still report a gap) but keep no events. + if (_replayBufferCapacity == 0) + { + return; + } + + _replayBuffer.AddLast(new ReplayEntry(mxEvent, _timeProvider.GetUtcNow())); + + // Capacity eviction: drop oldest until within bound. + while (_replayBuffer.Count > _replayBufferCapacity) + { + _replayBuffer.RemoveFirst(); + } + + EvictAged(); + } + } + + // Must be called under _replayLock. Drops entries older than the retention window. + private void EvictAged() + { + if (!_ageEvictionEnabled || _replayBuffer.Count == 0) + { + return; + } + + DateTimeOffset cutoff = _timeProvider.GetUtcNow() - _replayRetention; + while (_replayBuffer.First is { } first && first.Value.RetainedAt < cutoff) + { + _replayBuffer.RemoveFirst(); + } + } + + private readonly record struct ReplayEntry(MxEvent Event, DateTimeOffset RetainedAt); + private sealed class Subscriber(long id, Channel channel) { public long Id { get; } = id; diff --git a/src/ZB.MOM.WW.MxGateway.Server/appsettings.json b/src/ZB.MOM.WW.MxGateway.Server/appsettings.json index f18ae0d..d64ab14 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/appsettings.json +++ b/src/ZB.MOM.WW.MxGateway.Server/appsettings.json @@ -50,7 +50,9 @@ }, "Events": { "QueueCapacity": 10000, - "BackpressurePolicy": "FailFast" + "BackpressurePolicy": "FailFast", + "ReplayBufferCapacity": 1024, + "ReplayRetentionSeconds": 300 }, "Dashboard": { "Enabled": true, diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs index 8842d1b..7df4273 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs @@ -1,5 +1,6 @@ using System.Threading.Channels; using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Time.Testing; using ZB.MOM.WW.MxGateway.Contracts.Proto; using ZB.MOM.WW.MxGateway.Server.Sessions; @@ -112,12 +113,142 @@ public sealed class SessionEventDistributorTests Assert.Throws(() => distributor.Register()); } + [Fact] + public async Task ReplayBuffer_OverCapacity_EvictsOldestFirst_AndReportsGap() + { + Channel source = Channel.CreateUnbounded(); + await using SessionEventDistributor distributor = CreateDistributor( + source.Reader, + replayBufferCapacity: 3, + replayRetentionSeconds: 0); + await distributor.StartAsync(CancellationToken.None); + + // A live subscriber forces the pump to fan (and thereby retain) each event, + // and gives us a deterministic point to know the pump has processed event 5. + using IEventSubscriberLease lease = distributor.Register(); + for (ulong sequence = 1; sequence <= 5; sequence++) + { + source.Writer.TryWrite(Event(sequence)); + } + + for (ulong sequence = 1; sequence <= 5; sequence++) + { + MxEvent e = await ReadOneAsync(lease.Reader); + Assert.Equal(sequence, e.WorkerSequence); + } + + // Capacity 3 retains only the newest three: sequences 3, 4, 5. Events 1 and 2 + // were evicted, so a caller asking from 0 missed events => gap=true, and it + // gets only the retained tail. + bool found = distributor.TryGetReplayFrom(0, out IReadOnlyList replay, out bool gap); + + Assert.True(found); + Assert.True(gap); + Assert.Equal(new ulong[] { 3, 4, 5 }, replay.Select(e => e.WorkerSequence)); + } + + [Fact] + public async Task ReplayBuffer_WithinRetainedWindow_ReturnsNewerEvents_NoGap() + { + Channel source = Channel.CreateUnbounded(); + await using SessionEventDistributor distributor = CreateDistributor( + source.Reader, + replayBufferCapacity: 10, + replayRetentionSeconds: 0); + await distributor.StartAsync(CancellationToken.None); + + using IEventSubscriberLease lease = distributor.Register(); + for (ulong sequence = 1; sequence <= 5; sequence++) + { + source.Writer.TryWrite(Event(sequence)); + _ = await ReadOneAsync(lease.Reader); + } + + // afterSequence 2 is still inside the retained window [1..5], so no gap and + // exactly the newer events 3, 4, 5 come back. + bool found = distributor.TryGetReplayFrom(2, out IReadOnlyList replay, out bool gap); + + Assert.True(found); + Assert.False(gap); + Assert.Equal(new ulong[] { 3, 4, 5 }, replay.Select(e => e.WorkerSequence)); + } + + [Fact] + public async Task ReplayBuffer_AgedEntries_AreEvictedAfterRetentionElapses() + { + FakeTimeProvider time = new(new DateTimeOffset(2026, 1, 1, 0, 0, 0, TimeSpan.Zero)); + Channel source = Channel.CreateUnbounded(); + await using SessionEventDistributor distributor = CreateDistributor( + source.Reader, + replayBufferCapacity: 100, + replayRetentionSeconds: 30, + timeProvider: time); + await distributor.StartAsync(CancellationToken.None); + + using IEventSubscriberLease lease = distributor.Register(); + + // Two old events, then advance the clock well past the retention window. + source.Writer.TryWrite(Event(1)); + source.Writer.TryWrite(Event(2)); + _ = await ReadOneAsync(lease.Reader); + _ = await ReadOneAsync(lease.Reader); + + time.Advance(TimeSpan.FromSeconds(60)); + + // A fresh event triggers age-eviction of the now-stale entries 1 and 2. + source.Writer.TryWrite(Event(3)); + _ = await ReadOneAsync(lease.Reader); + + bool found = distributor.TryGetReplayFrom(0, out IReadOnlyList replay, out bool gap); + + Assert.True(found); + // Events 1 and 2 aged out; only 3 remains, and 0 predates the oldest retained. + Assert.Equal(new ulong[] { 3 }, replay.Select(e => e.WorkerSequence)); + Assert.True(gap); + } + + [Fact] + public async Task ReplayBuffer_AfterSequenceNewerThanAllRetained_ReturnsEmpty_NoGap() + { + Channel source = Channel.CreateUnbounded(); + await using SessionEventDistributor distributor = CreateDistributor( + source.Reader, + replayBufferCapacity: 10, + replayRetentionSeconds: 0); + await distributor.StartAsync(CancellationToken.None); + + using IEventSubscriberLease lease = distributor.Register(); + for (ulong sequence = 1; sequence <= 3; sequence++) + { + source.Writer.TryWrite(Event(sequence)); + _ = await ReadOneAsync(lease.Reader); + } + + // afterSequence 3 is at/after the newest retained; nothing newer, and the + // caller is fully caught up => empty list, gap=false. + bool found = distributor.TryGetReplayFrom(3, out IReadOnlyList replay, out bool gap); + + Assert.True(found); + Assert.False(gap); + Assert.Empty(replay); + } + private static SessionEventDistributor CreateDistributor(ChannelReader source) + => CreateDistributor(source, replayBufferCapacity: 1024, replayRetentionSeconds: 300); + + private static SessionEventDistributor CreateDistributor( + ChannelReader source, + int replayBufferCapacity, + double replayRetentionSeconds, + TimeProvider? timeProvider = null) => new( "session-test", ct => source.ReadAllAsync(ct), subscriberQueueCapacity: 64, - NullLogger.Instance); + replayBufferCapacity: replayBufferCapacity, + replayRetentionSeconds: replayRetentionSeconds, + NullLogger.Instance, + timeProvider ?? TimeProvider.System); private static MxEvent Event(ulong sequence) => new() { SessionId = "session-test", WorkerSequence = sequence };