From 36ab8d15f1de9aad3cd09f67f160d8d086515886 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 16 Jun 2026 07:22:19 -0400 Subject: [PATCH] feat(sessions): replay-on-reconnect with ReplayGap sentinel --- docs/Sessions.md | 20 +- .../Grpc/EventStreamService.cs | 89 ++++++- .../EventSubscriberReplayAttachment.cs | 43 ++++ .../Sessions/GatewaySession.cs | 95 +++++++ .../Sessions/SessionEventDistributor.cs | 175 +++++++++++-- .../Gateway/Grpc/EventStreamServiceTests.cs | 239 +++++++++++++++++- .../Sessions/SessionEventDistributorTests.cs | 104 ++++++++ 7 files changed, 736 insertions(+), 29 deletions(-) create mode 100644 src/ZB.MOM.WW.MxGateway.Server/Sessions/EventSubscriberReplayAttachment.cs diff --git a/docs/Sessions.md b/docs/Sessions.md index 884242a..f9d0fac 100644 --- a/docs/Sessions.md +++ b/docs/Sessions.md @@ -205,7 +205,25 @@ Sessions open with `MxGateway:Sessions:DefaultLeaseSeconds` (default 1800) added Mechanically: when the last external subscriber detaches and `DetachGraceSeconds > 0`, `DetachEventSubscriber` stamps `DetachedAtUtc` from the session's `TimeProvider` under `_syncRoot` (the detach→grace-start transition). `AttachEventSubscriber` clears `DetachedAtUtc` under the same lock when a subscriber re-attaches (the reattach→grace-cancel transition), so the two races and the sweeper's read all serialize on `_syncRoot`. `SessionManager.CloseExpiredLeasesAsync` checks `IsDetachGraceExpired(now)` alongside `IsLeaseExpired(now)`: a session detached for at least `DetachGraceSeconds` with no active external subscriber is closed by the same lease sweep, with the distinct `DetachGraceExpiredReason` (`"detach-grace-expired"`) so operators can tell a short reconnect-window expiry from a long idle-lease expiry. Setting `DetachGraceSeconds` to `0` disables retention and reverts to the original behavior: a detached session is retained only until its normal lease expires. 
-The reconnect/replay path that re-attaches a dropped client to a retained session is implemented separately (Task 12); `DetachGraceSeconds` controls retention and expiry only. +`DetachGraceSeconds` controls retention and expiry only; the reconnect/replay path that re-attaches a dropped client to a retained session is described in [Reconnect and replay](#reconnect-and-replay). + +#### Reconnect and replay + +A client that drops mid-stream reconnects by re-issuing `StreamEvents` with `StreamEventsRequest.after_worker_sequence` set to the last `worker_sequence` it observed. A non-zero `after_worker_sequence` means *resume*; `0` means *fresh stream* and behaves exactly as a first-time subscribe — no replay, no sentinel. + +On a resume, `EventStreamService.StreamEventsAsync` attaches through `GatewaySession.AttachEventSubscriberWithReplay`, which calls `SessionEventDistributor.RegisterWithReplay`. That method snapshots the session's replay ring for events newer than `after_worker_sequence` **and** registers the live subscriber inside a single `_replayLock` critical section. This atomicity is what makes the replay→live handoff free of gaps and duplicates: the pump appends each event to the replay ring (under `_replayLock`) before fanning it to subscriber channels, so relative to that one critical section every event is either in the replay snapshot or fanned into the freshly-registered live channel — never both observably, never neither. + +The handoff is sealed by a watermark. `RegisterWithReplay` returns `LiveResumeSequence` (the highest replayed sequence, or `after_worker_sequence` when nothing was replayed); `EventStreamService` then filters the live channel to events strictly greater than that watermark. 
An event that was both included in the replay snapshot and — racing the registration — also written to the live channel has `worker_sequence <= LiveResumeSequence`, so the live filter drops it exactly once (no duplicate), while every newer event is delivered (no gap). The same per-item filter governs replayed and live events identically, so a constrained or resuming caller never sees a replayed event it could not have seen live. + +Emit order on a resumed stream: + +1. **ReplayGap sentinel (only when events were evicted).** If the requested `after_worker_sequence` predates the oldest event still retained — i.e. events in the open interval between the requested watermark and the oldest retained sequence were dropped by capacity or age eviction and are unrecoverable — the gateway first yields a single sentinel `MxEvent` with `replay_gap` populated (`requested_after_sequence` = the requested watermark, `oldest_available_sequence` = the oldest still-retained sequence, or `0` when nothing is retained). The sentinel carries the session id; its `family` is `UNSPECIFIED`, its `body` oneof is unset, and no per-item fields are populated. It is an explicit, documented control signal — *not* a synthesized MXAccess event — telling the client to discard local state and re-snapshot. A client that wants to resume without another gap should set `after_worker_sequence = oldest_available_sequence - 1` on its next request (this applies only when `oldest_available_sequence` is non-zero). +2. **Retained replay batch.** The still-retained events newer than the requested watermark, in ascending `worker_sequence` order. +3. **Live events**, resuming strictly after `LiveResumeSequence`. + +When `after_worker_sequence` is inside the retained window (nothing was evicted), step 1 is skipped: the stream replays the retained tail then resumes live with no sentinel. + +The ReplayGap sentinel is emitted **only** on the `StreamEvents` server stream and only to the resuming subscriber — it is never fanned to other subscribers and never appears in `DrainEventsReply` (the diagnostic drain path is untouched). 
Replay retention itself is bounded by `MxGateway:Events:ReplayBufferCapacity` (count) and `ReplayRetentionSeconds` (age); see [Configuration](GatewayConfiguration.md). ### Close diff --git a/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs b/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs index 6653aff..8b7ee51 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Grpc/EventStreamService.cs @@ -71,17 +71,80 @@ public sealed class EventStreamService( // The subscriber mode (single vs. multi) is derived inside AttachEventSubscriber from // the session's own SessionEventStreaming.AllowMultipleEventSubscribers field — the // same source the distributor uses — so the two cannot diverge. - IEventSubscriberLease subscriber = session.AttachEventSubscriber( - options.Value.Sessions.MaxEventSubscribersPerSession); + // + // Reconnect/resume (Task 12): when AfterWorkerSequence > 0 the client is resuming, so + // attach via the replay variant that atomically snapshots the replay ring AND registers + // the live subscriber under one lock. That single critical section is the crux of the + // no-gap/no-duplicate handoff: every replayed event has sequence <= LiveResumeSequence + // and every live event delivered below is filtered to sequence > LiveResumeSequence, so + // an event that was both replayed and (racing the registration) fanned into the live + // channel is dropped exactly once, while no newer event is skipped. See + // SessionEventDistributor.RegisterWithReplay for the full argument. + // + // AfterWorkerSequence == 0 (fresh stream, not a resume) keeps the pre-Task-12 behavior: + // a plain attach, no replay, no sentinel, and the live filter watermark stays 0. 
+ ulong afterWorkerSequence = request.AfterWorkerSequence; + IEventSubscriberLease subscriber; + IReadOnlyList replayedEvents = []; + bool replayGap = false; + ulong oldestAvailableSequence = 0; + + if (afterWorkerSequence > 0) + { + EventSubscriberReplayAttachment attachment = session.AttachEventSubscriberWithReplay( + options.Value.Sessions.MaxEventSubscribersPerSession, + afterWorkerSequence); + subscriber = attachment.Lease; + replayedEvents = attachment.ReplayedEvents; + replayGap = attachment.Gap; + oldestAvailableSequence = attachment.OldestAvailableSequence; + + // The live filter resumes strictly after the last replayed sequence (or, when + // nothing was replayed, after the requested watermark). This is what makes the + // handoff free of duplicates: anything <= this watermark was already replayed. + afterWorkerSequence = attachment.LiveResumeSequence; + } + else + { + subscriber = session.AttachEventSubscriber( + options.Value.Sessions.MaxEventSubscribersPerSession); + } int streamQueueDepth = 0; - ulong afterWorkerSequence = request.AfterWorkerSequence; IAsyncEnumerator reader = subscriber.Reader .ReadAllAsync(cancellationToken) .GetAsyncEnumerator(cancellationToken); try { + // Emit order for a resume: the ReplayGap sentinel FIRST (only when events were + // evicted), then the still-retained replay batch, then live. The sentinel is an + // explicit documented control signal (not a synthesized MXAccess event) and is + // delivered ONLY to this resuming subscriber — it is never fanned to other + // subscribers and never appears in DrainEventsReply (that path is untouched). + if (replayGap) + { + yield return CreateReplayGapSentinel( + request.SessionId, + request.AfterWorkerSequence, + oldestAvailableSequence); + } + + foreach (MxEvent replayedEvent in replayedEvents) + { + // Replayed events pass through the SAME per-item filter the live loop applies, + // so a constrained/resuming caller never sees a replayed event it could not + // have seen live. 
The watermark dropped events at/below the requested + // AfterWorkerSequence; the snapshot already excluded those, but this keeps the + // filter identical for replay and live. + if (replayedEvent.WorkerSequence <= request.AfterWorkerSequence) + { + continue; + } + + yield return replayedEvent; + } + while (true) { MxEvent mxEvent; @@ -144,4 +207,24 @@ public sealed class EventStreamService( metrics.StreamDisconnected("Detached"); } } + + // Builds the single ReplayGap control sentinel emitted at the head of a resumed + // StreamEvents stream when the requested AfterWorkerSequence predates the oldest event + // still retained (events were evicted). Per the proto contract (MxEvent.replay_gap), + // the sentinel carries the session id and the populated ReplayGap, with family + // UNSPECIFIED, no body, and no per-item fields. It is a documented control signal — NOT a + // synthesized MXAccess event — so emitting it does not violate the no-synthesis rule. + private static MxEvent CreateReplayGapSentinel( + string sessionId, + ulong requestedAfterSequence, + ulong oldestAvailableSequence) + => new() + { + SessionId = sessionId, + ReplayGap = new ReplayGap + { + RequestedAfterSequence = requestedAfterSequence, + OldestAvailableSequence = oldestAvailableSequence, + }, + }; } diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/EventSubscriberReplayAttachment.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/EventSubscriberReplayAttachment.cs new file mode 100644 index 0000000..36a5c4a --- /dev/null +++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/EventSubscriberReplayAttachment.cs @@ -0,0 +1,43 @@ +using ZB.MOM.WW.MxGateway.Contracts.Proto; + +namespace ZB.MOM.WW.MxGateway.Server.Sessions; + +/// +/// The result of a reconnect/resume attach +/// (, Task 12): the live +/// subscriber lease plus the replay batch and resume watermarks snapshotted atomically +/// with the registration, so the replay→live handoff has no gap and no duplicate. 
+/// +/// +/// The live event subscriber lease. Disposing it unregisters the distributor subscriber +/// and decrements the session's active-subscriber count, exactly as a fresh attach. +/// +/// +/// Retained events with worker sequence strictly greater than the requested +/// afterSequence, in ascending order. These must be yielded (after the optional +/// gap sentinel) before live events. Never null; empty when nothing newer is retained. +/// +/// +/// when events between the requested afterSequence and the +/// oldest retained event were already evicted, so the client missed unrecoverable events. +/// When the caller emits a ReplayGap sentinel before the +/// replay batch. +/// +/// +/// The oldest worker sequence still retained and replayable; 0 when nothing is +/// retained. Populates the ReplayGap.oldest_available_sequence field. Meaningful +/// only when is . +/// +/// +/// The worker sequence the live channel must resume strictly after: the highest replayed +/// sequence, or the requested afterSequence when nothing was replayed. The caller +/// applies this as the per-subscriber live filter so any event both replayed and fanned +/// into the live channel is dropped exactly once (no duplicate) while every newer event +/// is delivered (no gap). +/// +public readonly record struct EventSubscriberReplayAttachment( + IEventSubscriberLease Lease, + IReadOnlyList ReplayedEvents, + bool Gap, + ulong OldestAvailableSequence, + ulong LiveResumeSequence); diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs index 2253897..be4d36f 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/GatewaySession.cs @@ -433,6 +433,32 @@ public sealed class GatewaySession return lease; } + // Reconnect/resume variant of StartDistributorAndRegister (Task 12). 
Snapshots the replay + // ring for events newer than afterSequence AND registers the live subscriber atomically + // under the distributor's replay lock, so the replay→live handoff has no gap and no + // duplicate (see SessionEventDistributor.RegisterWithReplay). The pump is started after + // registration, exactly as the fresh-attach path, so the very first subscriber on a + // freshly-Ready session still sees the stream from its beginning. + private IEventSubscriberLease StartDistributorAndRegisterWithReplay( + ulong afterSequence, + out IReadOnlyList replayedEvents, + out bool gap, + out ulong oldestAvailableSequence, + out ulong liveResumeSequence) + { + SessionEventDistributor distributor = EnsureDistributorCreated(out bool startNow); + + IEventSubscriberLease lease = distributor.RegisterWithReplay( + afterSequence, + out replayedEvents, + out gap, + out oldestAvailableSequence, + out liveResumeSequence); + StartPumpIfRequested(distributor, startNow); + + return lease; + } + // Constructs the distributor exactly once and reports whether THIS caller is the one // that should start the pump (i.e. it observed the unstarted state and claimed the // start). Both the construction and the started-flag flip happen under _syncRoot so two @@ -811,6 +837,75 @@ public sealed class GatewaySession } } + /// + /// Reconnect/resume variant of (Task 12). Attaches + /// an event subscriber AND atomically snapshots the session replay ring for events newer + /// than , so a resuming client can replay what it missed + /// before live delivery resumes — with no gap and no duplicate across the handoff. + /// + /// See . + /// + /// The last worker sequence the resuming client already observed. Replay returns events + /// strictly newer than this; the caller must filter the live channel to events strictly + /// newer than . + /// + /// + /// The lease plus the replay batch, gap flag, and resume watermarks. See + /// for the no-gap/no-duplicate + /// guarantee. 
+ /// + public EventSubscriberReplayAttachment AttachEventSubscriberWithReplay(int maxSubscribers, ulong afterSequence) + { + bool allowMultipleSubscribers = _eventStreaming.AllowMultipleEventSubscribers; + int effectiveCap = allowMultipleSubscribers ? Math.Max(1, maxSubscribers) : 1; + + lock (_syncRoot) + { + if (_state != SessionState.Ready || _workerClient?.State != WorkerClientState.Ready) + { + throw new SessionManagerException( + SessionManagerErrorCode.SessionNotReady, + $"Session {SessionId} is not ready for event streaming. Current state is {_state}."); + } + + if (_activeEventSubscriberCount >= effectiveCap) + { + throw allowMultipleSubscribers + ? new SessionManagerException( + SessionManagerErrorCode.EventSubscriberLimitReached, + $"Session {SessionId} has reached its maximum of {effectiveCap} concurrent event stream subscribers.") + : new SessionManagerException( + SessionManagerErrorCode.EventSubscriberAlreadyActive, + $"Session {SessionId} already has an active event stream subscriber."); + } + + _activeEventSubscriberCount++; + _detachedAtUtc = null; + } + + try + { + IEventSubscriberLease distributorLease = StartDistributorAndRegisterWithReplay( + afterSequence, + out IReadOnlyList replayedEvents, + out bool gap, + out ulong oldestAvailableSequence, + out ulong liveResumeSequence); + + return new EventSubscriberReplayAttachment( + new EventSubscriberLease(this, distributorLease), + replayedEvents, + gap, + oldestAvailableSequence, + liveResumeSequence); + } + catch + { + DetachEventSubscriber(); + throw; + } + } + /// /// Invokes a worker command synchronously and returns the reply. 
/// diff --git a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs index 89cc1ce..d2f3e77 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Sessions/SessionEventDistributor.cs @@ -287,30 +287,14 @@ public sealed class SessionEventDistributor : IAsyncDisposable /// public IEventSubscriberLease Register(bool isInternal = false) { - // The pump is the single writer for this channel; readers are single-consumer - // (one gRPC stream / dashboard subscriber). Synchronous continuations are - // disabled so a slow reader can never stall the pump on its completion. - // - // The pump MUST stay non-blocking: it writes with the non-blocking TryWrite so one - // slow reader can never stall the single pump that feeds every subscriber. FullMode - // is deliberately Wait — NOT because the pump ever blocks (it never calls the blocking - // WriteAsync overload), but because Wait is the only BoundedChannelFullMode under - // which TryWrite returns false when the channel is full. That false return IS the - // overflow signal the pump needs to apply the per-subscriber backpressure policy. The - // Drop* modes would make TryWrite silently succeed-and-drop, hiding overflow and - // re-introducing the silent data loss this task removes. So: Wait mode + TryWrite = - // a non-blocking pump that still detects a full subscriber channel. 
- Channel channel = Channel.CreateBounded( - new BoundedChannelOptions(_subscriberQueueCapacity) - { - SingleReader = true, - SingleWriter = true, - FullMode = BoundedChannelFullMode.Wait, - AllowSynchronousContinuations = false, - }); - + Channel channel = CreateSubscriberChannel(); long id = Interlocked.Increment(ref _nextSubscriberId); Subscriber subscriber = new(id, channel, isInternal); + return RegisterSubscriber(subscriber); + } + + private IEventSubscriberLease RegisterSubscriber(Subscriber subscriber) + { // The disposed check AND the map add happen under the same lock with no await // in between. DisposeAsync sets _disposed=true under this same lock before it @@ -320,7 +304,152 @@ public sealed class SessionEventDistributor : IAsyncDisposable lock (_lifecycleLock) { ObjectDisposedException.ThrowIf(_disposed, this); - _subscribers[id] = subscriber; + _subscribers[subscriber.Id] = subscriber; + } + + return new SubscriberLease(this, subscriber); + } + + // Creates a per-subscriber bounded channel. The pump is the single writer; readers are + // single-consumer (one gRPC stream / dashboard subscriber). Synchronous continuations are + // disabled so a slow reader can never stall the pump on its completion. + // + // The pump MUST stay non-blocking: it writes with the non-blocking TryWrite so one slow + // reader can never stall the single pump that feeds every subscriber. FullMode is + // deliberately Wait — NOT because the pump ever blocks (it never calls the blocking + // WriteAsync overload), but because Wait is the only BoundedChannelFullMode under which + // TryWrite returns false when the channel is full. That false return IS the overflow signal + // the pump needs to apply the per-subscriber backpressure policy. The Drop* modes would + // make TryWrite silently succeed-and-drop, hiding overflow and re-introducing silent data + // loss. So: Wait mode + TryWrite = a non-blocking pump that still detects a full channel. 
+ private Channel CreateSubscriberChannel() + => Channel.CreateBounded( + new BoundedChannelOptions(_subscriberQueueCapacity) + { + SingleReader = true, + SingleWriter = true, + FullMode = BoundedChannelFullMode.Wait, + AllowSynchronousContinuations = false, + }); + + /// + /// Atomically snapshots the replay ring for events newer than + /// AND registers a live subscriber, so the + /// replay→live handoff has no gap and no duplicate (Task 12 reconnect/resume). + /// + /// + /// The last worker sequence the reconnecting client already observed. Replay returns + /// events strictly newer than this; the live channel is filtered (by the caller) to + /// events strictly newer than the last replayed sequence. + /// + /// + /// The retained events newer than , in ascending + /// sequence order. Never null; empty when nothing newer is retained. + /// + /// + /// when events between and the + /// oldest retained event were already evicted (capacity/age), so the client missed + /// events that can no longer be replayed and must re-snapshot. Mirrors + /// gap semantics. + /// + /// + /// The oldest worker sequence still retained and replayable. 0 when nothing is + /// retained. Meaningful to the caller only when is + /// (it populates the ReplayGap sentinel's + /// oldest_available_sequence). + /// + /// + /// The worker sequence the live channel must resume strictly after: the highest + /// replayed sequence, or when nothing was replayed. + /// The caller MUST apply this as the per-subscriber live filter so any event that was + /// both replayed here and subsequently fanned into this subscriber's live channel is + /// dropped exactly once (no duplicate), while every newer event is delivered (no gap). + /// + /// + /// for a gateway-owned internal subscriber. See + /// . + /// + /// + /// + /// Why this is atomic and the handoff is correct. The replay snapshot and the + /// subscriber registration both run inside the SAME _replayLock critical + /// section. 
The pump appends each event to the replay buffer under _replayLock + /// before fanning it to subscribers (outside the lock). Therefore, relative + /// to this method's critical section, for every event E: + /// + /// + /// + /// If the pump appended E before this critical section, E is in + /// (when newer than + /// ). The pump's fan-out of E may race the + /// registration: if it writes E to this new channel too, E's sequence is + /// <= liveResumeSequence, so the caller's live filter DROPS it — no + /// duplicate. + /// + /// + /// If the pump appends E after this critical section, E is NOT in the snapshot, + /// but this subscriber is already registered, so the pump fans E into the live + /// channel with sequence > liveResumeSequence — delivered as live, no + /// gap. + /// + /// + /// + /// Lock ordering: this is the only path that holds both _replayLock and + /// _lifecycleLock; it always takes _replayLock first then + /// _lifecycleLock. No other path acquires both, so there is no inversion. + /// + /// + public IEventSubscriberLease RegisterWithReplay( + ulong afterSequence, + out IReadOnlyList replayedEvents, + out bool gap, + out ulong oldestAvailableSequence, + out ulong liveResumeSequence, + bool isInternal = false) + { + Channel channel = CreateSubscriberChannel(); + long id = Interlocked.Increment(ref _nextSubscriberId); + Subscriber subscriber = new(id, channel, isInternal); + + // Snapshot replay AND register under a single _replayLock section so the live channel + // begins exactly where the replay snapshot ends — see the remarks for the no-gap / + // no-duplicate argument. _lifecycleLock is nested inside (consistent ordering) only to + // honor the disposed check and the same add semantics as Register. 
+ lock (_replayLock) + { + EvictAged(); + + List newer = []; + ulong highestReplayed = afterSequence; + + if (_replayBuffer.Count == 0) + { + oldestAvailableSequence = 0; + gap = _anyEventSeen && afterSequence < _highestSequenceSeen; + } + else + { + oldestAvailableSequence = _replayBuffer.First!.Value.Event.WorkerSequence; + gap = oldestAvailableSequence > 0 && afterSequence < oldestAvailableSequence - 1; + + foreach (ReplayEntry entry in _replayBuffer) + { + if (entry.Event.WorkerSequence > afterSequence) + { + newer.Add(entry.Event); + highestReplayed = entry.Event.WorkerSequence; + } + } + } + + replayedEvents = newer; + liveResumeSequence = highestReplayed; + + lock (_lifecycleLock) + { + ObjectDisposedException.ThrowIf(_disposed, this); + _subscribers[id] = subscriber; + } } return new SubscriberLease(this, subscriber); diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs index b09d3f9..a44f36e 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs @@ -300,6 +300,218 @@ public sealed class EventStreamServiceTests Assert.Equal(1, metrics.GetSnapshot().Faults); } + /// + /// Task 12: resuming with AfterWorkerSequence inside the retained window replays exactly + /// the newer retained events (in order, no dup) then live, with NO ReplayGap sentinel. 
+ /// + [Fact] + public async Task StreamEventsAsync_ResumeWithinRetainedWindow_ReplaysNewerThenLive_NoSentinel() + { + System.Threading.Channels.Channel live = + System.Threading.Channels.Channel.CreateUnbounded(); + FakeWorkerClient workerClient = new() { LiveEvents = live }; + for (ulong sequence = 1; sequence <= 5; sequence++) + { + workerClient.Events.Add(CreateWorkerEvent(sequence, MxEventFamily.OnDataChange)); + } + + GatewaySession session = CreateReadySession(workerClient); + EventStreamService service = CreateService(new FakeSessionManager(session)); + + // Prime: drain the static 1..5 through a first subscriber so the replay ring retains them. + await PrimeReplayAsync(service, session.SessionId, expectedCount: 5); + + // Resume after sequence 2: retained window [1..5] covers it — replay 3,4,5 then live. + await using IAsyncEnumerator resume = service + .StreamEventsAsync(CreateRequest(session.SessionId, afterWorkerSequence: 2), CancellationToken.None) + .GetAsyncEnumerator(); + + MxEvent r3 = await ReadNextAsync(resume); + MxEvent r4 = await ReadNextAsync(resume); + MxEvent r5 = await ReadNextAsync(resume); + Assert.Equal(new ulong[] { 3, 4, 5 }, new[] { r3.WorkerSequence, r4.WorkerSequence, r5.WorkerSequence }); + Assert.Null(r3.ReplayGap); + + // No sentinel anywhere; next is a LIVE event. + live.Writer.TryWrite(CreateWorkerEvent(6, MxEventFamily.OnDataChange)); + MxEvent liveEvent = await ReadNextAsync(resume); + Assert.Equal(6ul, liveEvent.WorkerSequence); + Assert.Null(liveEvent.ReplayGap); + } + + /// + /// Task 12: resuming with AfterWorkerSequence older than the oldest retained yields the + /// ReplayGap sentinel FIRST (correct requested/oldest), then the retained tail, then live. 
+ /// + [Fact] + public async Task StreamEventsAsync_ResumeOlderThanOldestRetained_EmitsSentinelFirst_ThenTailThenLive() + { + System.Threading.Channels.Channel live = + System.Threading.Channels.Channel.CreateUnbounded(); + FakeWorkerClient workerClient = new() { LiveEvents = live }; + for (ulong sequence = 1; sequence <= 5; sequence++) + { + workerClient.Events.Add(CreateWorkerEvent(sequence, MxEventFamily.OnDataChange)); + } + + // Replay capacity 3 retains only 3,4,5; 1,2 are evicted. + GatewaySession session = CreateReadySession(workerClient, replayBufferCapacity: 3); + EventStreamService service = CreateService(new FakeSessionManager(session)); + + await PrimeReplayAsync(service, session.SessionId, expectedCount: 5); + + // Resume after 1: events 1,2 are below the oldest retained (3) and were evicted, so + // they are unrecoverable => sentinel first, then the retained tail 3,4,5, then live. + await using IAsyncEnumerator realResume = service + .StreamEventsAsync(CreateRequest(session.SessionId, afterWorkerSequence: 1), CancellationToken.None) + .GetAsyncEnumerator(); + + MxEvent sentinel = await ReadNextAsync(realResume); + Assert.NotNull(sentinel.ReplayGap); + Assert.Equal(1ul, sentinel.ReplayGap.RequestedAfterSequence); + Assert.Equal(3ul, sentinel.ReplayGap.OldestAvailableSequence); + Assert.Equal(MxEventFamily.Unspecified, sentinel.Family); + Assert.Equal(session.SessionId, sentinel.SessionId); + + MxEvent r3 = await ReadNextAsync(realResume); + MxEvent r4 = await ReadNextAsync(realResume); + MxEvent r5 = await ReadNextAsync(realResume); + Assert.Equal(new ulong[] { 3, 4, 5 }, new[] { r3.WorkerSequence, r4.WorkerSequence, r5.WorkerSequence }); + Assert.Null(r3.ReplayGap); + + live.Writer.TryWrite(CreateWorkerEvent(6, MxEventFamily.OnDataChange)); + MxEvent liveEvent = await ReadNextAsync(realResume); + Assert.Equal(6ul, liveEvent.WorkerSequence); + } + + /// + /// Task 12: the replay→live boundary is contiguous — no duplicate and no skip — even + /// when 
events span the handoff. + /// + [Fact] + public async Task StreamEventsAsync_ResumeHandoff_IsContiguous_NoDuplicateNoSkip() + { + System.Threading.Channels.Channel live = + System.Threading.Channels.Channel.CreateUnbounded(); + FakeWorkerClient workerClient = new() { LiveEvents = live }; + for (ulong sequence = 1; sequence <= 4; sequence++) + { + workerClient.Events.Add(CreateWorkerEvent(sequence, MxEventFamily.OnDataChange)); + } + + GatewaySession session = CreateReadySession(workerClient); + EventStreamService service = CreateService(new FakeSessionManager(session)); + + await PrimeReplayAsync(service, session.SessionId, expectedCount: 4); + + // Resume after 2: replay 3,4 then live 5,6,7. Collect across the boundary and assert + // the full sequence is contiguous with no duplicate and no skip. + await using IAsyncEnumerator resume = service + .StreamEventsAsync(CreateRequest(session.SessionId, afterWorkerSequence: 2), CancellationToken.None) + .GetAsyncEnumerator(); + + List collected = []; + collected.Add((await ReadNextAsync(resume)).WorkerSequence); // 3 + collected.Add((await ReadNextAsync(resume)).WorkerSequence); // 4 + + for (ulong sequence = 5; sequence <= 7; sequence++) + { + live.Writer.TryWrite(CreateWorkerEvent(sequence, MxEventFamily.OnDataChange)); + collected.Add((await ReadNextAsync(resume)).WorkerSequence); + } + + Assert.Equal(new ulong[] { 3, 4, 5, 6, 7 }, collected); + } + + /// + /// Task 12: the per-item filter applies to REPLAYED events identically to live — a + /// replayed event at/below the requested watermark is never delivered. 
+ /// + [Fact] + public async Task StreamEventsAsync_ResumeReplay_AppliesPerItemFilter_DropsAtOrBelowWatermark() + { + System.Threading.Channels.Channel live = + System.Threading.Channels.Channel.CreateUnbounded(); + FakeWorkerClient workerClient = new() { LiveEvents = live }; + for (ulong sequence = 1; sequence <= 5; sequence++) + { + workerClient.Events.Add(CreateWorkerEvent(sequence, MxEventFamily.OnDataChange)); + } + + GatewaySession session = CreateReadySession(workerClient); + EventStreamService service = CreateService(new FakeSessionManager(session)); + + await PrimeReplayAsync(service, session.SessionId, expectedCount: 5); + + // Resume after 3: only 4,5 may be delivered. Events 1,2,3 — present in the ring but at + // or below the watermark — must be filtered out of the replay, never seen. The first two + // reads must be exactly 4 then 5 (no sentinel, no <=3 event); a live tag confirms the + // stream resumed live strictly after 5. + await using IAsyncEnumerator resume = service + .StreamEventsAsync(CreateRequest(session.SessionId, afterWorkerSequence: 3), CancellationToken.None) + .GetAsyncEnumerator(); + + MxEvent first = await ReadNextAsync(resume); + MxEvent second = await ReadNextAsync(resume); + Assert.Equal(4ul, first.WorkerSequence); + Assert.Equal(5ul, second.WorkerSequence); + Assert.Null(first.ReplayGap); + Assert.Null(second.ReplayGap); + + // The very next delivered event is the live 6 — proving nothing <=3 slipped in and the + // handoff resumed strictly after the replay tail. + live.Writer.TryWrite(CreateWorkerEvent(6, MxEventFamily.OnDataChange)); + MxEvent liveEvent = await ReadNextAsync(resume); + Assert.Equal(6ul, liveEvent.WorkerSequence); + } + + /// + /// Task 12: AfterWorkerSequence == 0 is a fresh stream (not a resume) — no replay, no + /// sentinel, just live events as before. 
+ /// + [Fact] + public async Task StreamEventsAsync_FreshStreamAfterSequenceZero_NoReplayNoSentinel() + { + FakeWorkerClient workerClient = new(); + for (ulong sequence = 1; sequence <= 3; sequence++) + { + workerClient.Events.Add(CreateWorkerEvent(sequence, MxEventFamily.OnDataChange)); + } + + workerClient.CompleteAfterConfiguredEvents = true; + GatewaySession session = CreateReadySession(workerClient); + EventStreamService service = CreateService(new FakeSessionManager(session)); + + List events = await CollectEventsAsync(service, session.SessionId); + + Assert.Equal(new ulong[] { 1, 2, 3 }, events.Select(e => e.WorkerSequence)); + Assert.DoesNotContain(events, e => e.ReplayGap is not null); + } + + // Drains the first `expectedCount` events through a throwaway subscriber so the session's + // replay ring retains them, then disposes the subscriber. The pump (started on first + // attach) keeps running for the session, so subsequent resume attaches see the retained + // events. + private static async Task PrimeReplayAsync( + EventStreamService service, + string sessionId, + int expectedCount) + { + await using IAsyncEnumerator primer = service + .StreamEventsAsync(CreateRequest(sessionId), CancellationToken.None) + .GetAsyncEnumerator(); + for (int i = 0; i < expectedCount; i++) + { + await ReadNextAsync(primer); + } + } + + private static async Task ReadNextAsync(IAsyncEnumerator enumerator) + { + Assert.True(await enumerator.MoveNextAsync().AsTask().WaitAsync(TestTimeout)); + return enumerator.Current; + } + private static EventStreamService CreateService( FakeSessionManager sessionManager, GatewayMetrics? 
metrics = null, @@ -334,11 +546,12 @@ public sealed class EventStreamServiceTests return events; } - private static StreamEventsRequest CreateRequest(string sessionId) + private static StreamEventsRequest CreateRequest(string sessionId, ulong afterWorkerSequence = 0) { return new StreamEventsRequest { SessionId = sessionId, + AfterWorkerSequence = afterWorkerSequence, }; } @@ -347,7 +560,8 @@ public sealed class EventStreamServiceTests string sessionId = "session-events", int queueCapacity = 8, GatewayMetrics? metrics = null, - EventBackpressurePolicy backpressurePolicy = EventBackpressurePolicy.FailFast) + EventBackpressurePolicy backpressurePolicy = EventBackpressurePolicy.FailFast, + int replayBufferCapacity = 1024) { // The per-subscriber overflow policy now lives in the session's // SessionEventDistributor, so the session must share the same metrics sink and @@ -373,6 +587,8 @@ public sealed class EventStreamServiceTests { QueueCapacity = queueCapacity, BackpressurePolicy = backpressurePolicy, + ReplayBufferCapacity = replayBufferCapacity, + ReplayRetentionSeconds = 0, }, NullLogger.Instance, TimeProvider.System, @@ -513,6 +729,13 @@ public sealed class EventStreamServiceTests /// Gets or sets whether to complete the event stream after configured events are yielded. public bool CompleteAfterConfiguredEvents { get; set; } + /// + /// Optional live channel source. When set, the worker drains the static + /// first, then streams from this channel until it completes, + /// letting a test feed events on demand (e.g. to exercise replay→live handoff). + /// + public System.Threading.Channels.Channel? LiveEvents { get; init; } + /// Gets or sets an optional exception to throw as a terminal event stream fault. public Exception? 
TerminalException { get; init; } @@ -558,6 +781,18 @@ public sealed class EventStreamServiceTests throw TerminalException; } + if (LiveEvents is not null) + { + await foreach (WorkerEvent liveEvent in LiveEvents.Reader + .ReadAllAsync(cancellationToken) + .ConfigureAwait(false)) + { + yield return liveEvent; + } + + yield break; + } + if (CompleteAfterConfiguredEvents) { yield break; diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs index 0d5bc8d..1f3dc65 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Sessions/SessionEventDistributorTests.cs @@ -572,6 +572,110 @@ public sealed class SessionEventDistributorTests "isOnlySubscriber must be true for a lone external subscriber in single-subscriber mode."); } + [Fact] + public async Task RegisterWithReplay_WithinRetainedWindow_ReturnsNewerEvents_NoGap_ThenLive() + { + Channel source = Channel.CreateUnbounded(); + await using SessionEventDistributor distributor = CreateDistributor( + source.Reader, + replayBufferCapacity: 10, + replayRetentionSeconds: 0); + await distributor.StartAsync(CancellationToken.None); + + // A primer subscriber forces the pump to retain events 1..5 deterministically. + using IEventSubscriberLease primer = distributor.Register(); + for (ulong sequence = 1; sequence <= 5; sequence++) + { + source.Writer.TryWrite(Event(sequence)); + _ = await ReadOneAsync(primer.Reader); + } + + // Resume after sequence 2: retained window [1..5] still covers it — no gap, replay 3..5. 
+ using IEventSubscriberLease resume = distributor.RegisterWithReplay( + 2, + out IReadOnlyList replay, + out bool gap, + out ulong oldestAvailable, + out ulong liveResume); + + Assert.False(gap); + Assert.Equal(new ulong[] { 3, 4, 5 }, replay.Select(e => e.WorkerSequence)); + Assert.Equal(5ul, liveResume); + Assert.Equal(1ul, oldestAvailable); + + // A subsequent live event flows to the resumed subscriber's channel. + source.Writer.TryWrite(Event(6)); + MxEvent live = await ReadOneAsync(resume.Reader); + Assert.Equal(6ul, live.WorkerSequence); + } + + [Fact] + public async Task RegisterWithReplay_BelowOldestRetained_ReportsGap_AndOldestAvailable() + { + Channel source = Channel.CreateUnbounded(); + await using SessionEventDistributor distributor = CreateDistributor( + source.Reader, + replayBufferCapacity: 3, + replayRetentionSeconds: 0); + await distributor.StartAsync(CancellationToken.None); + + using IEventSubscriberLease primer = distributor.Register(); + for (ulong sequence = 1; sequence <= 5; sequence++) + { + source.Writer.TryWrite(Event(sequence)); + _ = await ReadOneAsync(primer.Reader); + } + + // Capacity 3 retains 3,4,5; events 1,2 were evicted. Resume after 0 => gap, oldest=3. 
+ using IEventSubscriberLease resume = distributor.RegisterWithReplay( + 0, + out IReadOnlyList replay, + out bool gap, + out ulong oldestAvailable, + out ulong liveResume); + + Assert.True(gap); + Assert.Equal(3ul, oldestAvailable); + Assert.Equal(new ulong[] { 3, 4, 5 }, replay.Select(e => e.WorkerSequence)); + Assert.Equal(5ul, liveResume); + } + + [Fact] + public async Task RegisterWithReplay_NothingRetainedNewer_LiveResumeEqualsAfterSequence_NoGap() + { + Channel source = Channel.CreateUnbounded(); + await using SessionEventDistributor distributor = CreateDistributor( + source.Reader, + replayBufferCapacity: 10, + replayRetentionSeconds: 0); + await distributor.StartAsync(CancellationToken.None); + + using IEventSubscriberLease primer = distributor.Register(); + for (ulong sequence = 1; sequence <= 3; sequence++) + { + source.Writer.TryWrite(Event(sequence)); + _ = await ReadOneAsync(primer.Reader); + } + + // Resume after 3 (newest retained): nothing newer, fully caught up — no gap, empty + // replay, and the live filter resumes after the requested watermark unchanged. + using IEventSubscriberLease resume = distributor.RegisterWithReplay( + 3, + out IReadOnlyList replay, + out bool gap, + out ulong oldestAvailable, + out ulong liveResume); + + Assert.False(gap); + Assert.Empty(replay); + Assert.Equal(3ul, liveResume); + Assert.Equal(1ul, oldestAvailable); + + source.Writer.TryWrite(Event(4)); + MxEvent live = await ReadOneAsync(resume.Reader); + Assert.Equal(4ul, live.WorkerSequence); + } + private static async Task DrainUntilFaultAsync(ChannelReader reader) { // Drains any buffered events, then surfaces the channel's completion fault (if any)