feat(sessions): add bounded replay ring buffer to SessionEventDistributor
This commit is contained in:
@@ -41,7 +41,9 @@ paths, timeouts, queue sizes, enum values, or protocol values are invalid.
|
|||||||
},
|
},
|
||||||
"Events": {
|
"Events": {
|
||||||
"QueueCapacity": 10000,
|
"QueueCapacity": 10000,
|
||||||
"BackpressurePolicy": "FailFast"
|
"BackpressurePolicy": "FailFast",
|
||||||
|
"ReplayBufferCapacity": 1024,
|
||||||
|
"ReplayRetentionSeconds": 300
|
||||||
},
|
},
|
||||||
"Dashboard": {
|
"Dashboard": {
|
||||||
"Enabled": true,
|
"Enabled": true,
|
||||||
@@ -135,12 +137,19 @@ ordering and avoids competing consumers.
|
|||||||
|--------|---------|-------------|
|
|--------|---------|-------------|
|
||||||
| `MxGateway:Events:QueueCapacity` | `10000` | Capacity for bounded per-session event queues used by the gateway worker event channel and the public gRPC event stream queue. |
|
| `MxGateway:Events:QueueCapacity` | `10000` | Capacity for bounded per-session event queues used by the gateway worker event channel and the public gRPC event stream queue. |
|
||||||
| `MxGateway:Events:BackpressurePolicy` | `FailFast` | Event backpressure behavior. `FailFast` faults the session on public stream queue overflow. `DisconnectSubscriber` disconnects only the slow stream. |
|
| `MxGateway:Events:BackpressurePolicy` | `FailFast` | Event backpressure behavior. `FailFast` faults the session on public stream queue overflow. `DisconnectSubscriber` disconnects only the slow stream. |
|
||||||
|
| `MxGateway:Events:ReplayBufferCapacity` | `1024` | Maximum number of events retained per session in the replay ring buffer, used to re-deliver events a returning subscriber missed (reconnect/reattach). The oldest retained event is evicted once this count is exceeded. `0` disables replay retention. |
|
||||||
|
| `MxGateway:Events:ReplayRetentionSeconds` | `300` | Maximum age, in seconds, of an event retained in the replay ring buffer. Entries older than this are evicted regardless of capacity. `0` disables age-based eviction. |
|
||||||
|
|
||||||
`QueueCapacity` must be greater than zero. With `FailFast`, queue overflow
|
`QueueCapacity` must be greater than zero. With `FailFast`, queue overflow
|
||||||
faults the affected worker or session instead of silently dropping MXAccess
|
faults the affected worker or session instead of silently dropping MXAccess
|
||||||
events. With `DisconnectSubscriber`, public gRPC stream overflow terminates only
|
events. With `DisconnectSubscriber`, public gRPC stream overflow terminates only
|
||||||
the affected stream while the MXAccess session remains active.
|
the affected stream while the MXAccess session remains active.
|
||||||
|
|
||||||
|
`ReplayBufferCapacity` and `ReplayRetentionSeconds` must each be greater than or
|
||||||
|
equal to zero (either dimension can be disabled with `0`). A returning subscriber
|
||||||
|
that asks for events older than the oldest still-retained event is told it missed
|
||||||
|
events (a "gap") and must re-snapshot; whatever is still retained is replayed.
|
||||||
|
|
||||||
## Dashboard Options
|
## Dashboard Options
|
||||||
|
|
||||||
| Option | Default | Description |
|
| Option | Default | Description |
|
||||||
|
|||||||
@@ -11,4 +11,20 @@ public sealed class EventOptions
|
|||||||
/// Gets the backpressure policy for event queue overflow.
|
/// Gets the backpressure policy for event queue overflow.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public EventBackpressurePolicy BackpressurePolicy { get; init; } = EventBackpressurePolicy.FailFast;
|
public EventBackpressurePolicy BackpressurePolicy { get; init; } = EventBackpressurePolicy.FailFast;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Gets the maximum number of events retained in the per-session replay ring buffer
|
||||||
|
/// used to re-deliver events a returning subscriber missed (reconnect/reattach).
|
||||||
|
/// When the buffer exceeds this count the oldest retained events are evicted first.
|
||||||
|
/// A value of <c>0</c> disables replay retention entirely.
|
||||||
|
/// </summary>
|
||||||
|
public int ReplayBufferCapacity { get; init; } = 1024;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Gets the maximum age, in seconds, of an event retained in the per-session replay
|
||||||
|
/// ring buffer. Entries older than this are evicted regardless of capacity. A value
|
||||||
|
/// of <c>0</c> disables age-based eviction (only <see cref="ReplayBufferCapacity"/>
|
||||||
|
/// bounds the buffer).
|
||||||
|
/// </summary>
|
||||||
|
public double ReplayRetentionSeconds { get; init; } = 300;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -193,6 +193,16 @@ public sealed class GatewayOptionsValidator : OptionsValidatorBase<GatewayOption
|
|||||||
{
|
{
|
||||||
builder.Add("MxGateway:Events:BackpressurePolicy must be a supported backpressure policy.");
|
builder.Add("MxGateway:Events:BackpressurePolicy must be a supported backpressure policy.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReplayBufferCapacity and ReplayRetentionSeconds are bounds on the replay ring
|
||||||
|
// buffer; 0 is a valid value (disables that dimension), so only negatives fail.
|
||||||
|
AddIfNegative(
|
||||||
|
options.ReplayBufferCapacity,
|
||||||
|
"MxGateway:Events:ReplayBufferCapacity must be greater than or equal to zero.",
|
||||||
|
builder);
|
||||||
|
builder.RequireThat(
|
||||||
|
options.ReplayRetentionSeconds >= 0,
|
||||||
|
"MxGateway:Events:ReplayRetentionSeconds must be greater than or equal to zero.");
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void ValidateDashboard(DashboardOptions options, ValidationBuilder builder)
|
private static void ValidateDashboard(DashboardOptions options, ValidationBuilder builder)
|
||||||
|
|||||||
@@ -55,10 +55,24 @@ public sealed class SessionEventDistributor : IAsyncDisposable
|
|||||||
private readonly int _subscriberQueueCapacity;
|
private readonly int _subscriberQueueCapacity;
|
||||||
private readonly TimeSpan _shutdownTimeout;
|
private readonly TimeSpan _shutdownTimeout;
|
||||||
private readonly ILogger<SessionEventDistributor> _logger;
|
private readonly ILogger<SessionEventDistributor> _logger;
|
||||||
|
private readonly TimeProvider _timeProvider;
|
||||||
private readonly ConcurrentDictionary<long, Subscriber> _subscribers = new();
|
private readonly ConcurrentDictionary<long, Subscriber> _subscribers = new();
|
||||||
private readonly CancellationTokenSource _shutdownCts = new();
|
private readonly CancellationTokenSource _shutdownCts = new();
|
||||||
private readonly object _lifecycleLock = new();
|
private readonly object _lifecycleLock = new();
|
||||||
|
|
||||||
|
// Replay ring buffer. Appended on the pump thread and queried from arbitrary
|
||||||
|
// threads via TryGetReplayFrom, so every access is under _replayLock. The deque
|
||||||
|
// keeps events in ascending WorkerSequence order (the pump fans in source order),
|
||||||
|
// so the oldest retained event is always at the front. Capacity == 0 disables
|
||||||
|
// retention; RetentionSeconds <= 0 disables age-based eviction.
|
||||||
|
private readonly int _replayBufferCapacity;
|
||||||
|
private readonly TimeSpan _replayRetention;
|
||||||
|
private readonly bool _ageEvictionEnabled;
|
||||||
|
private readonly LinkedList<ReplayEntry> _replayBuffer = new();
|
||||||
|
private readonly object _replayLock = new();
|
||||||
|
private bool _anyEventSeen;
|
||||||
|
private ulong _highestSequenceSeen;
|
||||||
|
|
||||||
private long _nextSubscriberId;
|
private long _nextSubscriberId;
|
||||||
private Task? _pumpTask;
|
private Task? _pumpTask;
|
||||||
private bool _started;
|
private bool _started;
|
||||||
@@ -78,22 +92,80 @@ public sealed class SessionEventDistributor : IAsyncDisposable
|
|||||||
/// queue capacity shape used today.
|
/// queue capacity shape used today.
|
||||||
/// </param>
|
/// </param>
|
||||||
/// <param name="logger">Logger for pump lifecycle diagnostics.</param>
|
/// <param name="logger">Logger for pump lifecycle diagnostics.</param>
|
||||||
|
/// <remarks>
|
||||||
|
/// This overload disables the replay ring buffer (capacity 0). Use the overload
|
||||||
|
/// taking replay parameters to retain events for reconnect/reattach replay.
|
||||||
|
/// </remarks>
|
||||||
public SessionEventDistributor(
|
public SessionEventDistributor(
|
||||||
string sessionId,
|
string sessionId,
|
||||||
Func<CancellationToken, IAsyncEnumerable<MxEvent>> eventSourceFactory,
|
Func<CancellationToken, IAsyncEnumerable<MxEvent>> eventSourceFactory,
|
||||||
int subscriberQueueCapacity,
|
int subscriberQueueCapacity,
|
||||||
ILogger<SessionEventDistributor> logger)
|
ILogger<SessionEventDistributor> logger)
|
||||||
|
: this(
|
||||||
|
sessionId,
|
||||||
|
eventSourceFactory,
|
||||||
|
subscriberQueueCapacity,
|
||||||
|
replayBufferCapacity: 0,
|
||||||
|
replayRetentionSeconds: 0,
|
||||||
|
logger,
|
||||||
|
TimeProvider.System)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Initializes a per-session event distributor with a bounded replay ring buffer.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="sessionId">Owning session id, used only for logging context.</param>
|
||||||
|
/// <param name="eventSourceFactory">
|
||||||
|
/// Factory producing the session's event stream given a cancellation token.
|
||||||
|
/// The pump consumes this exactly once. See the type remarks for the seam Task 4
|
||||||
|
/// plugs into.
|
||||||
|
/// </param>
|
||||||
|
/// <param name="subscriberQueueCapacity">
|
||||||
|
/// Bounded capacity of each per-subscriber channel. Mirrors the gRPC event-stream
|
||||||
|
/// queue capacity shape used today.
|
||||||
|
/// </param>
|
||||||
|
/// <param name="replayBufferCapacity">
|
||||||
|
/// Maximum number of events retained for replay. The oldest retained event is
|
||||||
|
/// evicted once this count is exceeded. <c>0</c> disables retention entirely.
|
||||||
|
/// </param>
|
||||||
|
/// <param name="replayRetentionSeconds">
|
||||||
|
/// Maximum age, in seconds, of a retained event. Entries older than this are
|
||||||
|
/// evicted regardless of capacity. <c>0</c> (or less) disables age-based eviction.
|
||||||
|
/// </param>
|
||||||
|
/// <param name="logger">Logger for pump lifecycle diagnostics.</param>
|
||||||
|
/// <param name="timeProvider">
|
||||||
|
/// Clock used to timestamp and age-evict replay entries. Inject a fake to make
|
||||||
|
/// age-eviction deterministic in tests.
|
||||||
|
/// </param>
|
||||||
|
public SessionEventDistributor(
|
||||||
|
string sessionId,
|
||||||
|
Func<CancellationToken, IAsyncEnumerable<MxEvent>> eventSourceFactory,
|
||||||
|
int subscriberQueueCapacity,
|
||||||
|
int replayBufferCapacity,
|
||||||
|
double replayRetentionSeconds,
|
||||||
|
ILogger<SessionEventDistributor> logger,
|
||||||
|
TimeProvider timeProvider)
|
||||||
{
|
{
|
||||||
ArgumentException.ThrowIfNullOrWhiteSpace(sessionId);
|
ArgumentException.ThrowIfNullOrWhiteSpace(sessionId);
|
||||||
ArgumentNullException.ThrowIfNull(eventSourceFactory);
|
ArgumentNullException.ThrowIfNull(eventSourceFactory);
|
||||||
ArgumentOutOfRangeException.ThrowIfLessThan(subscriberQueueCapacity, 1);
|
ArgumentOutOfRangeException.ThrowIfLessThan(subscriberQueueCapacity, 1);
|
||||||
|
ArgumentOutOfRangeException.ThrowIfNegative(replayBufferCapacity);
|
||||||
|
ArgumentOutOfRangeException.ThrowIfNegative(replayRetentionSeconds);
|
||||||
ArgumentNullException.ThrowIfNull(logger);
|
ArgumentNullException.ThrowIfNull(logger);
|
||||||
|
ArgumentNullException.ThrowIfNull(timeProvider);
|
||||||
|
|
||||||
_sessionId = sessionId;
|
_sessionId = sessionId;
|
||||||
_eventSourceFactory = eventSourceFactory;
|
_eventSourceFactory = eventSourceFactory;
|
||||||
_subscriberQueueCapacity = subscriberQueueCapacity;
|
_subscriberQueueCapacity = subscriberQueueCapacity;
|
||||||
_shutdownTimeout = DefaultShutdownTimeout;
|
_shutdownTimeout = DefaultShutdownTimeout;
|
||||||
|
_replayBufferCapacity = replayBufferCapacity;
|
||||||
|
_ageEvictionEnabled = replayRetentionSeconds > 0;
|
||||||
|
_replayRetention = _ageEvictionEnabled
|
||||||
|
? TimeSpan.FromSeconds(replayRetentionSeconds)
|
||||||
|
: TimeSpan.Zero;
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
|
_timeProvider = timeProvider;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -232,6 +304,12 @@ public sealed class SessionEventDistributor : IAsyncDisposable
|
|||||||
.WithCancellation(cancellationToken)
|
.WithCancellation(cancellationToken)
|
||||||
.ConfigureAwait(false))
|
.ConfigureAwait(false))
|
||||||
{
|
{
|
||||||
|
// Retain for replay BEFORE fan-out so a reconnecting subscriber that
|
||||||
|
// queries between fan-out and its own read still sees this event. Order
|
||||||
|
// is preserved: the pump is the single appender and events arrive in
|
||||||
|
// source order.
|
||||||
|
AppendToReplayBuffer(mxEvent);
|
||||||
|
|
||||||
// Enumerating a ConcurrentDictionary's Values never throws on concurrent
|
// Enumerating a ConcurrentDictionary's Values never throws on concurrent
|
||||||
// add/remove; a subscriber registered mid-iteration may miss this event,
|
// add/remove; a subscriber registered mid-iteration may miss this event,
|
||||||
// which matches "late subscribers see events after they register".
|
// which matches "late subscribers see events after they register".
|
||||||
@@ -289,6 +367,132 @@ public sealed class SessionEventDistributor : IAsyncDisposable
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Returns the retained events with <see cref="MxEvent.WorkerSequence"/> strictly
|
||||||
|
/// greater than <paramref name="afterSequence"/>, in ascending sequence order, so a
|
||||||
|
/// reconnecting or reattaching subscriber can replay what it missed.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="afterSequence">
|
||||||
|
/// The last worker sequence the caller already observed. Only events newer than this
|
||||||
|
/// are returned.
|
||||||
|
/// </param>
|
||||||
|
/// <param name="events">
|
||||||
|
/// The retained events newer than <paramref name="afterSequence"/>, in order. Never
|
||||||
|
/// null; empty when nothing newer is retained.
|
||||||
|
/// </param>
|
||||||
|
/// <param name="gap">
|
||||||
|
/// <see langword="true"/> when events between <paramref name="afterSequence"/> and the
|
||||||
|
/// oldest retained event were already evicted (by capacity or age), meaning the caller
|
||||||
|
/// missed events that can no longer be replayed and must re-snapshot. When
|
||||||
|
/// <see langword="true"/>, whatever IS still retained is still returned via
|
||||||
|
/// <paramref name="events"/>.
|
||||||
|
/// </param>
|
||||||
|
/// <returns>
|
||||||
|
/// Always <see langword="true"/> — the out parameters fully describe the result. The
|
||||||
|
/// return value exists for a fluent call shape and future extension.
|
||||||
|
/// </returns>
|
||||||
|
/// <remarks>
|
||||||
|
/// <para>Gap semantics, by buffer state:</para>
|
||||||
|
/// <list type="bullet">
|
||||||
|
/// <item>
|
||||||
|
/// Buffer non-empty: <paramref name="gap"/> is <see langword="true"/> iff
|
||||||
|
/// <paramref name="afterSequence"/> is below the oldest retained sequence minus
|
||||||
|
/// one (i.e. at least one event newer than <paramref name="afterSequence"/> but
|
||||||
|
/// older than the oldest retained was evicted). When
|
||||||
|
/// <paramref name="afterSequence"/> equals or exceeds the newest retained
|
||||||
|
/// sequence the caller is fully caught up: empty list, no gap.
|
||||||
|
/// </item>
|
||||||
|
/// <item>
|
||||||
|
/// Buffer empty (retention disabled, nothing seen yet, or everything evicted):
|
||||||
|
/// empty list, and <paramref name="gap"/> is <see langword="true"/> iff
|
||||||
|
/// <paramref name="afterSequence"/> is below the highest sequence ever seen —
|
||||||
|
/// i.e. the caller is behind but nothing is retained to replay. If no event has
|
||||||
|
/// ever been seen, or the caller is already at/ahead of the highest seen, there
|
||||||
|
/// is nothing to miss: no gap.
|
||||||
|
/// </item>
|
||||||
|
/// </list>
|
||||||
|
/// </remarks>
|
||||||
|
public bool TryGetReplayFrom(ulong afterSequence, out IReadOnlyList<MxEvent> events, out bool gap)
|
||||||
|
{
|
||||||
|
lock (_replayLock)
|
||||||
|
{
|
||||||
|
EvictAged();
|
||||||
|
|
||||||
|
if (_replayBuffer.Count == 0)
|
||||||
|
{
|
||||||
|
events = [];
|
||||||
|
// Nothing retained. The caller missed events only if it is behind the
|
||||||
|
// highest sequence ever seen (and we have seen at least one event).
|
||||||
|
gap = _anyEventSeen && afterSequence < _highestSequenceSeen;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
ulong oldestRetained = _replayBuffer.First!.Value.Event.WorkerSequence;
|
||||||
|
|
||||||
|
// A gap exists when at least one event newer than afterSequence was evicted,
|
||||||
|
// i.e. afterSequence sits below the oldest-retained-minus-one boundary.
|
||||||
|
gap = afterSequence + 1 < oldestRetained;
|
||||||
|
|
||||||
|
List<MxEvent> newer = [];
|
||||||
|
foreach (ReplayEntry entry in _replayBuffer)
|
||||||
|
{
|
||||||
|
if (entry.Event.WorkerSequence > afterSequence)
|
||||||
|
{
|
||||||
|
newer.Add(entry.Event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
events = newer;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void AppendToReplayBuffer(MxEvent mxEvent)
|
||||||
|
{
|
||||||
|
lock (_replayLock)
|
||||||
|
{
|
||||||
|
_anyEventSeen = true;
|
||||||
|
if (mxEvent.WorkerSequence > _highestSequenceSeen)
|
||||||
|
{
|
||||||
|
_highestSequenceSeen = mxEvent.WorkerSequence;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Capacity 0 disables retention: track the highest-seen sequence (so replay
|
||||||
|
// can still report a gap) but keep no events.
|
||||||
|
if (_replayBufferCapacity == 0)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
_replayBuffer.AddLast(new ReplayEntry(mxEvent, _timeProvider.GetUtcNow()));
|
||||||
|
|
||||||
|
// Capacity eviction: drop oldest until within bound.
|
||||||
|
while (_replayBuffer.Count > _replayBufferCapacity)
|
||||||
|
{
|
||||||
|
_replayBuffer.RemoveFirst();
|
||||||
|
}
|
||||||
|
|
||||||
|
EvictAged();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Must be called under _replayLock. Drops entries older than the retention window.
|
||||||
|
private void EvictAged()
|
||||||
|
{
|
||||||
|
if (!_ageEvictionEnabled || _replayBuffer.Count == 0)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
DateTimeOffset cutoff = _timeProvider.GetUtcNow() - _replayRetention;
|
||||||
|
while (_replayBuffer.First is { } first && first.Value.RetainedAt < cutoff)
|
||||||
|
{
|
||||||
|
_replayBuffer.RemoveFirst();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private readonly record struct ReplayEntry(MxEvent Event, DateTimeOffset RetainedAt);
|
||||||
|
|
||||||
private sealed class Subscriber(long id, Channel<MxEvent> channel)
|
private sealed class Subscriber(long id, Channel<MxEvent> channel)
|
||||||
{
|
{
|
||||||
public long Id { get; } = id;
|
public long Id { get; } = id;
|
||||||
|
|||||||
@@ -50,7 +50,9 @@
|
|||||||
},
|
},
|
||||||
"Events": {
|
"Events": {
|
||||||
"QueueCapacity": 10000,
|
"QueueCapacity": 10000,
|
||||||
"BackpressurePolicy": "FailFast"
|
"BackpressurePolicy": "FailFast",
|
||||||
|
"ReplayBufferCapacity": 1024,
|
||||||
|
"ReplayRetentionSeconds": 300
|
||||||
},
|
},
|
||||||
"Dashboard": {
|
"Dashboard": {
|
||||||
"Enabled": true,
|
"Enabled": true,
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
using System.Threading.Channels;
|
using System.Threading.Channels;
|
||||||
using Microsoft.Extensions.Logging.Abstractions;
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
using Microsoft.Extensions.Time.Testing;
|
||||||
using ZB.MOM.WW.MxGateway.Contracts.Proto;
|
using ZB.MOM.WW.MxGateway.Contracts.Proto;
|
||||||
using ZB.MOM.WW.MxGateway.Server.Sessions;
|
using ZB.MOM.WW.MxGateway.Server.Sessions;
|
||||||
|
|
||||||
@@ -112,12 +113,142 @@ public sealed class SessionEventDistributorTests
|
|||||||
Assert.Throws<ObjectDisposedException>(() => distributor.Register());
|
Assert.Throws<ObjectDisposedException>(() => distributor.Register());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReplayBuffer_OverCapacity_EvictsOldestFirst_AndReportsGap()
|
||||||
|
{
|
||||||
|
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
|
||||||
|
await using SessionEventDistributor distributor = CreateDistributor(
|
||||||
|
source.Reader,
|
||||||
|
replayBufferCapacity: 3,
|
||||||
|
replayRetentionSeconds: 0);
|
||||||
|
await distributor.StartAsync(CancellationToken.None);
|
||||||
|
|
||||||
|
// A live subscriber forces the pump to fan (and thereby retain) each event,
|
||||||
|
// and gives us a deterministic point to know the pump has processed event 5.
|
||||||
|
using IEventSubscriberLease lease = distributor.Register();
|
||||||
|
for (ulong sequence = 1; sequence <= 5; sequence++)
|
||||||
|
{
|
||||||
|
source.Writer.TryWrite(Event(sequence));
|
||||||
|
}
|
||||||
|
|
||||||
|
for (ulong sequence = 1; sequence <= 5; sequence++)
|
||||||
|
{
|
||||||
|
MxEvent e = await ReadOneAsync(lease.Reader);
|
||||||
|
Assert.Equal(sequence, e.WorkerSequence);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Capacity 3 retains only the newest three: sequences 3, 4, 5. Events 1 and 2
|
||||||
|
// were evicted, so a caller asking from 0 missed events => gap=true, and it
|
||||||
|
// gets only the retained tail.
|
||||||
|
bool found = distributor.TryGetReplayFrom(0, out IReadOnlyList<MxEvent> replay, out bool gap);
|
||||||
|
|
||||||
|
Assert.True(found);
|
||||||
|
Assert.True(gap);
|
||||||
|
Assert.Equal(new ulong[] { 3, 4, 5 }, replay.Select(e => e.WorkerSequence));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReplayBuffer_WithinRetainedWindow_ReturnsNewerEvents_NoGap()
|
||||||
|
{
|
||||||
|
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
|
||||||
|
await using SessionEventDistributor distributor = CreateDistributor(
|
||||||
|
source.Reader,
|
||||||
|
replayBufferCapacity: 10,
|
||||||
|
replayRetentionSeconds: 0);
|
||||||
|
await distributor.StartAsync(CancellationToken.None);
|
||||||
|
|
||||||
|
using IEventSubscriberLease lease = distributor.Register();
|
||||||
|
for (ulong sequence = 1; sequence <= 5; sequence++)
|
||||||
|
{
|
||||||
|
source.Writer.TryWrite(Event(sequence));
|
||||||
|
_ = await ReadOneAsync(lease.Reader);
|
||||||
|
}
|
||||||
|
|
||||||
|
// afterSequence 2 is still inside the retained window [1..5], so no gap and
|
||||||
|
// exactly the newer events 3, 4, 5 come back.
|
||||||
|
bool found = distributor.TryGetReplayFrom(2, out IReadOnlyList<MxEvent> replay, out bool gap);
|
||||||
|
|
||||||
|
Assert.True(found);
|
||||||
|
Assert.False(gap);
|
||||||
|
Assert.Equal(new ulong[] { 3, 4, 5 }, replay.Select(e => e.WorkerSequence));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReplayBuffer_AgedEntries_AreEvictedAfterRetentionElapses()
|
||||||
|
{
|
||||||
|
FakeTimeProvider time = new(new DateTimeOffset(2026, 1, 1, 0, 0, 0, TimeSpan.Zero));
|
||||||
|
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
|
||||||
|
await using SessionEventDistributor distributor = CreateDistributor(
|
||||||
|
source.Reader,
|
||||||
|
replayBufferCapacity: 100,
|
||||||
|
replayRetentionSeconds: 30,
|
||||||
|
timeProvider: time);
|
||||||
|
await distributor.StartAsync(CancellationToken.None);
|
||||||
|
|
||||||
|
using IEventSubscriberLease lease = distributor.Register();
|
||||||
|
|
||||||
|
// Two old events, then advance the clock well past the retention window.
|
||||||
|
source.Writer.TryWrite(Event(1));
|
||||||
|
source.Writer.TryWrite(Event(2));
|
||||||
|
_ = await ReadOneAsync(lease.Reader);
|
||||||
|
_ = await ReadOneAsync(lease.Reader);
|
||||||
|
|
||||||
|
time.Advance(TimeSpan.FromSeconds(60));
|
||||||
|
|
||||||
|
// A fresh event triggers age-eviction of the now-stale entries 1 and 2.
|
||||||
|
source.Writer.TryWrite(Event(3));
|
||||||
|
_ = await ReadOneAsync(lease.Reader);
|
||||||
|
|
||||||
|
bool found = distributor.TryGetReplayFrom(0, out IReadOnlyList<MxEvent> replay, out bool gap);
|
||||||
|
|
||||||
|
Assert.True(found);
|
||||||
|
// Events 1 and 2 aged out; only 3 remains, and 0 predates the oldest retained.
|
||||||
|
Assert.Equal(new ulong[] { 3 }, replay.Select(e => e.WorkerSequence));
|
||||||
|
Assert.True(gap);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReplayBuffer_AfterSequenceNewerThanAllRetained_ReturnsEmpty_NoGap()
|
||||||
|
{
|
||||||
|
Channel<MxEvent> source = Channel.CreateUnbounded<MxEvent>();
|
||||||
|
await using SessionEventDistributor distributor = CreateDistributor(
|
||||||
|
source.Reader,
|
||||||
|
replayBufferCapacity: 10,
|
||||||
|
replayRetentionSeconds: 0);
|
||||||
|
await distributor.StartAsync(CancellationToken.None);
|
||||||
|
|
||||||
|
using IEventSubscriberLease lease = distributor.Register();
|
||||||
|
for (ulong sequence = 1; sequence <= 3; sequence++)
|
||||||
|
{
|
||||||
|
source.Writer.TryWrite(Event(sequence));
|
||||||
|
_ = await ReadOneAsync(lease.Reader);
|
||||||
|
}
|
||||||
|
|
||||||
|
// afterSequence 3 is at/after the newest retained; nothing newer, and the
|
||||||
|
// caller is fully caught up => empty list, gap=false.
|
||||||
|
bool found = distributor.TryGetReplayFrom(3, out IReadOnlyList<MxEvent> replay, out bool gap);
|
||||||
|
|
||||||
|
Assert.True(found);
|
||||||
|
Assert.False(gap);
|
||||||
|
Assert.Empty(replay);
|
||||||
|
}
|
||||||
|
|
||||||
private static SessionEventDistributor CreateDistributor(ChannelReader<MxEvent> source)
|
private static SessionEventDistributor CreateDistributor(ChannelReader<MxEvent> source)
|
||||||
|
=> CreateDistributor(source, replayBufferCapacity: 1024, replayRetentionSeconds: 300);
|
||||||
|
|
||||||
|
private static SessionEventDistributor CreateDistributor(
|
||||||
|
ChannelReader<MxEvent> source,
|
||||||
|
int replayBufferCapacity,
|
||||||
|
double replayRetentionSeconds,
|
||||||
|
TimeProvider? timeProvider = null)
|
||||||
=> new(
|
=> new(
|
||||||
"session-test",
|
"session-test",
|
||||||
ct => source.ReadAllAsync(ct),
|
ct => source.ReadAllAsync(ct),
|
||||||
subscriberQueueCapacity: 64,
|
subscriberQueueCapacity: 64,
|
||||||
NullLogger<SessionEventDistributor>.Instance);
|
replayBufferCapacity: replayBufferCapacity,
|
||||||
|
replayRetentionSeconds: replayRetentionSeconds,
|
||||||
|
NullLogger<SessionEventDistributor>.Instance,
|
||||||
|
timeProvider ?? TimeProvider.System);
|
||||||
|
|
||||||
private static MxEvent Event(ulong sequence)
|
private static MxEvent Event(ulong sequence)
|
||||||
=> new() { SessionId = "session-test", WorkerSequence = sequence };
|
=> new() { SessionId = "session-test", WorkerSequence = sequence };
|
||||||
|
|||||||
Reference in New Issue
Block a user