From 7f2e144f8d52ce1fc300eaaa3c9ca1a68a6a95c1 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 22 May 2026 06:59:09 -0400 Subject: [PATCH] fix(driver-galaxy): resolve High code-review findings (Driver.Galaxy-002, Driver.Galaxy-008) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Driver.Galaxy-002 — DataTypeMap.Map had no Int64 arm though MxValueDecoder/MxValueEncoder both fully support Int64. Galaxy attributes with the Int64 mx_data_type code fell through to the String default, creating a String address-space node while runtime reads decoded a boxed long. Added `6 => DriverDataType.Int64`, extending the contiguous 0..5 scheme so the type map agrees with the decoder/encoder on all seven Galaxy data types. Driver.Galaxy-008 — after a stream fault the EventPump's StreamEvents consumer loop exited and its channel completed; EventPump.Start() is a no-op on a completed-but-non-null loop, so a replayed subscription had no consumer and ReplayAsync never re-registered the post-reconnect item handles. ReplayAsync now recreates the EventPump (RestartEventPumpForReplay) and rebinds the SubscriptionRegistry per subscription with the fresh item handles returned by the post-reconnect SubscribeBulkAsync, via new SubscriptionRegistry.SnapshotEntries and Rebind APIs. Regression tests: DataTypeMapTests (every code incl. Int64), SubscriptionRegistryTests (Rebind/SnapshotEntries), EventPumpStreamFaultTests (faulted pump dead, fresh pump resumes dispatch). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- code-reviews/Driver.Galaxy/findings.md | 10 +- .../Browse/DataTypeMap.cs | 9 ++ .../GalaxyDriver.cs | 104 ++++++++++++++++-- .../Runtime/SubscriptionRegistry.cs | 41 +++++++ .../Browse/DataTypeMapTests.cs | 48 ++++++++ .../Runtime/EventPumpStreamFaultTests.cs | 74 +++++++++++++ .../Runtime/SubscriptionRegistryTests.cs | 74 +++++++++++++ 7 files changed, 345 insertions(+), 15 deletions(-) create mode 100644 tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Browse/DataTypeMapTests.cs diff --git a/code-reviews/Driver.Galaxy/findings.md b/code-reviews/Driver.Galaxy/findings.md index 2143d27..c9bf73e 100644 --- a/code-reviews/Driver.Galaxy/findings.md +++ b/code-reviews/Driver.Galaxy/findings.md @@ -7,7 +7,7 @@ | Review date | 2026-05-22 | | Commit reviewed | `76d35d1` | | Status | Reviewed | -| Open findings | 13 | +| Open findings | 11 | ## Checklist coverage @@ -48,13 +48,13 @@ | Severity | High | | Category | Correctness & logic bugs | | Location | `Browse/DataTypeMap.cs:13`, `Runtime/MxValueDecoder.cs:9` | -| Status | Open | +| Status | Resolved | **Description:** `DataTypeMap.Map` maps Galaxy `mx_data_type` codes to six `DriverDataType` values (Boolean, Int32, Float32, Float64, String, DateTime) — there is no `Int64` arm. Yet `MxValueDecoder` and `MxValueEncoder` both fully support Int64 (`MxValue.Int64Value`, `Int64Array`), and the decoder's own XML doc claims "the seven Galaxy data types ... (Boolean, Int32, Int64, Float32, Float64, String, DateTime)". Any Galaxy attribute whose `mx_data_type` is the Int64 code (or any code > 5) falls through the `_ => DriverDataType.String` default. The address-space node is then created as a `String` variable while runtime reads decode an `Int64` boxed value — a type mismatch that produces wrong OPC UA `DataType`/`ValueRank` metadata and likely fails value coercion at the server node layer. 
**Recommendation:** Confirm the Galaxy `mx_data_type` integer code for 64-bit integers and add the explicit arm to `DataTypeMap.Map`. If the wire format genuinely has no Int64 type, correct the `MxValueDecoder`/`MxValueEncoder` doc comments instead. Either way the encoder/decoder and the type map must agree. -**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-22 — added `6 => DriverDataType.Int64` to `DataTypeMap.Map`, extending the contiguous 0..5 scheme so the type map covers the same seven Galaxy data types `MxValueDecoder`/`MxValueEncoder` already decode/encode; Int64 attributes now build as Int64 nodes instead of falling through to the String default. Regression coverage in `DataTypeMapTests`. ### Driver.Galaxy-003 @@ -138,13 +138,13 @@ | Severity | High | | Category | Error handling & resilience | | Location | `GalaxyDriver.cs:264-276`, `Runtime/EventPump.cs:97-103` | -| Status | Open | +| Status | Resolved | **Description:** Even if Driver.Galaxy-001 is fixed and the supervisor's `ReplayAsync` runs, recovery is incomplete. `ReplayAsync` re-issues `SubscribeBulkAsync` for the tracked tags, but the `EventPump` background loop that consumes `StreamEvents` is not restarted. After a stream fault `EventPump.RunAsync` exits and `_channel` is completed; `EventPump.Start()` is a no-op (`if (_loop is not null) return`) because `_loop` is a completed-but-non-null task. So a replayed subscription has no consumer — values are subscribed on the gw but never reach `OnDataChange`. Additionally `ReplayAsync` never re-registers the new item handles the gw returns into `SubscriptionRegistry`; the old stale item handles remain, so even with a live pump the fan-out reverse-map would miss the post-reconnect handles. **Recommendation:** On reconnect, dispose and recreate the `EventPump` (or make it restartable), and have `ReplayAsync` update `SubscriptionRegistry` bindings with the new item handles returned by the post-reconnect `SubscribeBulkAsync`. 
Add an integration/parity test that drops the stream mid-subscription and asserts `OnDataChange` resumes. -**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-22 — `ReplayAsync` now calls a new `RestartEventPumpForReplay` (disposes the faulted pump, recreates and restarts a fresh one) and re-issues `SubscribeBulkAsync` per subscription, then `SubscriptionRegistry.Rebind` swaps each subscription's stale pre-reconnect item handles for the post-reconnect handles so the fan-out reverse map dispatches to the live pump. New `SubscriptionRegistry.SnapshotEntries`/`Rebind` APIs back the per-subscription replay. Regression coverage in `SubscriptionRegistryTests` (Rebind/SnapshotEntries) and `EventPumpStreamFaultTests.FaultedPump_IsNotRestartableInPlace_ButAFreshPumpResumesDispatch`. ### Driver.Galaxy-009 diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Browse/DataTypeMap.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Browse/DataTypeMap.cs index 83456c5..29dceb1 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Browse/DataTypeMap.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Browse/DataTypeMap.cs @@ -8,6 +8,14 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Browse; /// to for unknown codes — keeps wire compatibility /// with deployed configs while we tighten this through the parity matrix. /// +/// +/// Code 6 (Int64) extends the contiguous 0..5 scheme so the map covers the same +/// seven Galaxy data types MxValueDecoder / MxValueEncoder already decode +/// and encode (Boolean, Int32, Int64, Float32, Float64, String, DateTime). Without it an +/// Int64 attribute fell through to the default, +/// creating a String address-space node while runtime reads decoded a boxed long — +/// a metadata / coercion mismatch (Driver.Galaxy-002). 
+/// internal static class DataTypeMap { public static DriverDataType Map(int mxDataType) => mxDataType switch @@ -18,6 +26,7 @@ internal static class DataTypeMap 3 => DriverDataType.Float64, 4 => DriverDataType.String, 5 => DriverDataType.DateTime, + 6 => DriverDataType.Int64, _ => DriverDataType.String, }; } diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs index 2bade95..9403426 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs @@ -257,22 +257,106 @@ public sealed class GalaxyDriver } /// - /// Replay callback. Walks every active subscription's bindings and re-issues - /// SubscribeBulk for the tag list. PR 6.x can swap this for the gw's batched - /// ReplaySubscriptionsCommand once it ships. + /// Replay callback. Walks every active subscription, re-issues SubscribeBulk for + /// its tag list, and rebinds the + /// registry with the fresh item handles the gateway returned — the pre-reconnect + /// handles are dead once the session reopened. The faulted + /// is recreated first so the replayed subscriptions have a live StreamEvents + /// consumer; without that restart the replayed tags are subscribed on the gw but + /// never reach OnDataChange (Driver.Galaxy-008). PR 6.x can swap this for + /// the gw's batched ReplaySubscriptionsCommand once it ships. /// private async Task ReplayAsync(CancellationToken cancellationToken) { if (_subscriber is null) return; - var bindings = _subscriptions.SnapshotAllBindings(); - if (bindings.Count == 0) return; + var entries = _subscriptions.SnapshotEntries(); + if (entries.Count == 0) return; + + // The stream-fault that triggered this recovery left the old pump's RunAsync loop + // exited and its channel completed; EventPump.Start() is a no-op on a non-null but + // completed loop. Recreate the pump so the replayed subscriptions have a consumer. 
+ RestartEventPumpForReplay(); + + var tagCount = 0; + foreach (var (subscriptionId, oldBindings) in entries) + { + var refs = oldBindings + .Select(b => b.FullReference) + .Distinct(StringComparer.OrdinalIgnoreCase) + .ToArray(); + if (refs.Length == 0) continue; + + var results = await _subscriber + .SubscribeBulkAsync(refs, _options.MxAccess.PublishingIntervalMs, cancellationToken) + .ConfigureAwait(false); + + var byAddress = BuildResultIndex(results); + var newBindings = new List(refs.Length); + foreach (var fullRef in refs) + { + var itemHandle = byAddress.TryGetValue(fullRef, out var match) && match.WasSuccessful + ? match.ItemHandle + : 0; + newBindings.Add(new TagBinding(fullRef, itemHandle)); + } + + // Rebind so the EventPump fan-out reverse map points at the post-reconnect + // handles; otherwise events on the new handles miss every subscription. + _subscriptions.Rebind(subscriptionId, newBindings); + tagCount += refs.Length; + } - var refs = bindings.Select(b => b.FullReference).Distinct(StringComparer.OrdinalIgnoreCase).ToArray(); - await _subscriber.SubscribeBulkAsync( - refs, _options.MxAccess.PublishingIntervalMs, cancellationToken).ConfigureAwait(false); _logger.LogInformation( - "GalaxyDriver {InstanceId} replay completed — {Count} tags re-subscribed", - _driverInstanceId, refs.Length); + "GalaxyDriver {InstanceId} replay completed — {SubCount} subscriptions, {TagCount} tags re-subscribed", + _driverInstanceId, entries.Count, tagCount); + } + + /// + /// Index a SubscribeBulk result list by tag address (OrdinalIgnoreCase) so the + /// subscribe / replay correlation loops are O(1) per reference rather than a + /// linear scan. Last-write-wins on a duplicate address (the gw shouldn't emit one). 
+ /// + private static Dictionary BuildResultIndex(IReadOnlyList results) + { + var index = new Dictionary(results.Count, StringComparer.OrdinalIgnoreCase); + foreach (var result in results) + { + if (!string.IsNullOrEmpty(result.TagAddress)) index[result.TagAddress] = result; + } + return index; + } + + /// + /// Dispose the faulted and create a fresh one bound to the + /// same subscriber / registry, started immediately. Invoked from the reconnect + /// replay path. No-op when no pump was ever started (no active subscriptions). + /// + private void RestartEventPumpForReplay() + { + EventPump? old; + lock (_pumpLock) + { + old = _eventPump; + if (old is null) return; // pump never started — nothing to restart + _eventPump = null; + } + + // Detach + dispose the faulted pump outside the lock so a slow shutdown doesn't + // block a concurrent EnsureEventPumpStarted; the old loop already exited on fault. + old.OnDataChange -= OnPumpDataChange; + try { old.DisposeAsync().AsTask().GetAwaiter().GetResult(); } + catch (Exception ex) + { + _logger.LogWarning(ex, + "GalaxyDriver {InstanceId} faulted EventPump dispose failed during replay — continuing.", + _driverInstanceId); + } + + // EnsureEventPumpStarted creates + starts a fresh pump under the lock. Skip the + // recreate if the driver is being disposed — Dispose already tore the pump down + // and a fresh one here would leak past the driver's lifetime. + if (_disposed) return; + EnsureEventPumpStarted(); } private void OnSupervisorStateChanged(object? 
sender, StateTransition transition) diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/SubscriptionRegistry.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/SubscriptionRegistry.cs index 1f9e520..8873314 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/SubscriptionRegistry.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/SubscriptionRegistry.cs @@ -95,6 +95,47 @@ internal sealed class SubscriptionRegistry public IReadOnlyList SnapshotAllBindings() => [.. _bySubscriptionId.Values.SelectMany(entry => entry.Bindings)]; + /// + /// Snapshot every active subscription with its bindings, grouped by subscription id. + /// Used by the reconnect replay path so it can re-issue SubscribeBulk per subscription + /// and then each one with the post-reconnect item handles. + /// + public IReadOnlyList<(long SubscriptionId, IReadOnlyList Bindings)> SnapshotEntries() => + [.. _bySubscriptionId.Values.Select(entry => (entry.SubscriptionId, entry.Bindings))]; + + /// + /// Replace an existing subscription's bindings with the item handles a post-reconnect + /// SubscribeBulk returned, rebuilding the reverse fan-out map so events on the new + /// handles dispatch and the now-dead pre-reconnect handles are dropped. No-op when the + /// subscription id is unknown (it was unsubscribed during the reconnect window). + /// + public void Rebind(long subscriptionId, IReadOnlyList newBindings) + { + if (!_bySubscriptionId.TryGetValue(subscriptionId, out var oldEntry)) return; + + // Drop this subscription from every reverse-map bag it currently appears in. The + // pre-reconnect item handles are stale once the gw re-issues fresh ones. 
+ foreach (var binding in oldEntry.Bindings) + { + if (binding.ItemHandle <= 0) continue; + if (!_subscribersByItemHandle.TryGetValue(binding.ItemHandle, out var bag)) continue; + + var remaining = new ConcurrentBag(bag.Where(id => id != subscriptionId)); + if (remaining.IsEmpty) _subscribersByItemHandle.TryRemove(binding.ItemHandle, out _); + else _subscribersByItemHandle[binding.ItemHandle] = remaining; + } + + _bySubscriptionId[subscriptionId] = new SubscriptionEntry(subscriptionId, newBindings); + foreach (var binding in newBindings) + { + if (binding.ItemHandle <= 0) continue; // failed gw subscribe — no events expected + _subscribersByItemHandle.AddOrUpdate( + binding.ItemHandle, + _ => [subscriptionId], + (_, bag) => { bag.Add(subscriptionId); return bag; }); + } + } + private sealed record SubscriptionEntry(long SubscriptionId, IReadOnlyList Bindings); } diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Browse/DataTypeMapTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Browse/DataTypeMapTests.cs new file mode 100644 index 0000000..6420fb4 --- /dev/null +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Browse/DataTypeMapTests.cs @@ -0,0 +1,48 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Browse; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Browse; + +/// +/// Regression coverage for Driver.Galaxy-002 (High): every Galaxy mx_data_type +/// code in the contiguous 0..6 scheme must map to the matching . +/// The Int64 arm (code 6) was missing, so an Int64 attribute fell through to the +/// default — a String address-space node while +/// runtime reads decoded a boxed long. must now agree +/// with MxValueDecoder / MxValueEncoder, which both fully support Int64. 
+/// +public sealed class DataTypeMapTests +{ + [Theory] + [InlineData(0, DriverDataType.Boolean)] + [InlineData(1, DriverDataType.Int32)] + [InlineData(2, DriverDataType.Float32)] + [InlineData(3, DriverDataType.Float64)] + [InlineData(4, DriverDataType.String)] + [InlineData(5, DriverDataType.DateTime)] + [InlineData(6, DriverDataType.Int64)] + public void Map_KnownCode_MapsToExpectedDriverDataType(int mxDataType, DriverDataType expected) + { + DataTypeMap.Map(mxDataType).ShouldBe(expected); + } + + [Fact] + public void Map_Int64Code_DoesNotFallThroughToStringDefault() + { + // The bug: Int64 (code 6) used to hit the `_ => String` default. + DataTypeMap.Map(6).ShouldBe(DriverDataType.Int64); + DataTypeMap.Map(6).ShouldNotBe(DriverDataType.String); + } + + [Theory] + [InlineData(7)] + [InlineData(99)] + [InlineData(-1)] + public void Map_UnknownCode_FallsBackToString(int mxDataType) + { + // Forward-compatibility fallback for codes the driver doesn't recognise. + DataTypeMap.Map(mxDataType).ShouldBe(DriverDataType.String); + } +} diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/EventPumpStreamFaultTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/EventPumpStreamFaultTests.cs index 680829e..129b9e2 100644 --- a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/EventPumpStreamFaultTests.cs +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/EventPumpStreamFaultTests.cs @@ -1,7 +1,9 @@ using System.Threading.Channels; +using Google.Protobuf.WellKnownTypes; using MxGateway.Contracts.Proto; using Shouldly; using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Runtime; @@ -74,6 +76,48 @@ public sealed class EventPumpStreamFaultTests supervisor.IsDegraded.ShouldBeFalse(); } + [Fact] + public async Task FaultedPump_IsNotRestartableInPlace_ButAFreshPumpResumesDispatch() + { + // Regression coverage for 
Driver.Galaxy-008 (High): after a stream fault the old + // pump's RunAsync loop has exited and its channel is completed — EventPump.Start() + // is a no-op on a non-null-but-completed loop, so the recovery path must DISPOSE + // the faulted pump and create a FRESH one. This test pins both halves of that: + // (a) the faulted pump is dead, (b) a new pump on a live stream resumes OnDataChange. + var registry = new SubscriptionRegistry(); + registry.Register(1, [new TagBinding("Tank.Level", ItemHandle: 7)]); + + // --- first pump: faults, then is "restarted" (no-op) and confirmed dead --- + var faulted = new FaultingSubscriber(); + var staleObserved = false; + var oldPump = new EventPump(faulted, registry, channelCapacity: 8, clientName: "Restart"); + oldPump.OnDataChange += (_, _) => staleObserved = true; + oldPump.Start(); + faulted.FaultStream(new IOException("simulated gateway transport drop")); + await Task.Delay(100); + + // In-place Start() after a fault is a no-op — the loop task is non-null but done. 
+ oldPump.Start(); + await oldPump.DisposeAsync(); + + // --- fresh pump on a live re-subscribed stream: OnDataChange must resume --- + var resubscribed = new ReplaySubscriber(); + var resumed = new TaskCompletionSource( + TaskCreationOptions.RunContinuationsAsynchronously); + await using var newPump = new EventPump( + resubscribed, registry, channelCapacity: 8, clientName: "Restart"); + newPump.OnDataChange += (_, args) => resumed.TrySetResult(args); + newPump.Start(); + + await resubscribed.EmitAsync(itemHandle: 7, value: 123.0); + + var completed = await Task.WhenAny(resumed.Task, Task.Delay(WaitMs)); + completed.ShouldBe(resumed.Task, + "a fresh EventPump created after a fault must resume dispatching OnDataChange"); + (await resumed.Task).FullReference.ShouldBe("Tank.Level"); + staleObserved.ShouldBeFalse("the faulted pump must not dispatch after its stream dropped"); + } + [Fact] public async Task CleanShutdown_DoesNotInvokeOnStreamFault() { @@ -115,4 +159,34 @@ public sealed class EventPumpStreamFaultTests /// Fault the stream so the pump's await foreach throws. public void FaultStream(Exception cause) => _stream.Writer.TryComplete(cause); } + + /// + /// fake modelling the post-reconnect stream — a + /// fresh, healthy StreamEvents the recovery path's new EventPump consumes. 
+ /// + private sealed class ReplaySubscriber : IGalaxySubscriber + { + private readonly Channel _stream = + Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true }); + + public Task> SubscribeBulkAsync( + IReadOnlyList fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken) + => Task.FromResult>([]); + + public Task UnsubscribeBulkAsync(IReadOnlyList itemHandles, CancellationToken cancellationToken) + => Task.CompletedTask; + + public IAsyncEnumerable StreamEventsAsync(CancellationToken cancellationToken) + => _stream.Reader.ReadAllAsync(cancellationToken); + + public ValueTask EmitAsync(int itemHandle, double value) => + _stream.Writer.WriteAsync(new MxEvent + { + Family = MxEventFamily.OnDataChange, + ItemHandle = itemHandle, + Value = new MxValue { DoubleValue = value }, + Quality = 192, + SourceTimestamp = Timestamp.FromDateTime(DateTime.UtcNow), + }); + } } diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/SubscriptionRegistryTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/SubscriptionRegistryTests.cs index 5f55c74..7eaec95 100644 --- a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/SubscriptionRegistryTests.cs +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/SubscriptionRegistryTests.cs @@ -115,6 +115,74 @@ public sealed class SubscriptionRegistryTests registry.TrackedItemHandleCount.ShouldBe(0); } + // ===== Driver.Galaxy-008 regression: reconnect replay rebinds with fresh handles ===== + + [Fact] + public void SnapshotEntries_GroupsBindingsBySubscriptionId() + { + var registry = new SubscriptionRegistryAccess(); + registry.Register(1, [new TagBindingAccess("A", 100)]); + registry.Register(2, [new TagBindingAccess("B", 200), new TagBindingAccess("C", 300)]); + + var entries = registry.SnapshotEntries(); + + entries.Count.ShouldBe(2); + entries.Single(e => e.SubscriptionId == 1).Bindings.Count.ShouldBe(1); + entries.Single(e => 
e.SubscriptionId == 2).Bindings.Count.ShouldBe(2); + } + + [Fact] + public void Rebind_ReplacesStaleItemHandles_WithThePostReconnectHandles() + { + // Before reconnect the gw assigned handle 100; after reconnect it issues 555. + var registry = new SubscriptionRegistryAccess(); + registry.Register(1, [new TagBindingAccess("Tank.Level", 100)]); + + registry.Rebind(1, [new TagBindingAccess("Tank.Level", 555)]); + + // The stale handle no longer fans out; the fresh handle does. + registry.ResolveSubscribers(100).ShouldBeEmpty(); + var subs = registry.ResolveSubscribers(555); + subs.Count.ShouldBe(1); + subs[0].SubscriptionId.ShouldBe(1); + subs[0].FullReference.ShouldBe("Tank.Level"); + } + + [Fact] + public void Rebind_LeavesOtherSubscriptionsOnTheSameOldHandleIntact() + { + var registry = new SubscriptionRegistryAccess(); + registry.Register(1, [new TagBindingAccess("Tank.Level", 100)]); + registry.Register(2, [new TagBindingAccess("Tank.Level", 100)]); + + // Only subscription 1 replays onto a fresh handle. + registry.Rebind(1, [new TagBindingAccess("Tank.Level", 555)]); + + registry.ResolveSubscribers(100).Select(s => s.SubscriptionId).ShouldBe(new[] { 2L }); + registry.ResolveSubscribers(555).Select(s => s.SubscriptionId).ShouldBe(new[] { 1L }); + } + + [Fact] + public void Rebind_UnknownSubscription_IsNoOp() + { + var registry = new SubscriptionRegistryAccess(); + Should.NotThrow(() => registry.Rebind(999, [new TagBindingAccess("X", 1)])); + registry.ResolveSubscribers(1).ShouldBeEmpty(); + } + + [Fact] + public void Rebind_FailedItemHandle_NotIndexedForFanOut() + { + var registry = new SubscriptionRegistryAccess(); + registry.Register(1, [new TagBindingAccess("Tag", 100)]); + + // Post-reconnect the gw rejected the tag — handle 0. 
+ registry.Rebind(1, [new TagBindingAccess("Tag", 0)]); + + registry.ResolveSubscribers(100).ShouldBeEmpty(); + registry.ResolveSubscribers(0).ShouldBeEmpty(); + } + // Internal types are accessed via friend assembly (InternalsVisibleTo); these // wrapper aliases keep the test code readable. private sealed class SubscriptionRegistryAccess @@ -132,6 +200,12 @@ public sealed class SubscriptionRegistryTests } public IReadOnlyList<(long SubscriptionId, string FullReference)> ResolveSubscribers(int handle) => _inner.ResolveSubscribers(handle); + public void Rebind(long id, IReadOnlyList bindings) + => _inner.Rebind(id, [.. bindings.Select(b => new TagBinding(b.FullReference, b.ItemHandle))]); + public IReadOnlyList<(long SubscriptionId, IReadOnlyList Bindings)> SnapshotEntries() + => [.. _inner.SnapshotEntries().Select(e => + (e.SubscriptionId, + (IReadOnlyList)[.. e.Bindings.Select(b => new TagBindingAccess(b.FullReference, b.ItemHandle))]))]; } private sealed record TagBindingAccess(string FullReference, int ItemHandle); }