diff --git a/code-reviews/Driver.Galaxy/findings.md b/code-reviews/Driver.Galaxy/findings.md
index 2143d27..c9bf73e 100644
--- a/code-reviews/Driver.Galaxy/findings.md
+++ b/code-reviews/Driver.Galaxy/findings.md
@@ -7,7 +7,7 @@
| Review date | 2026-05-22 |
| Commit reviewed | `76d35d1` |
| Status | Reviewed |
-| Open findings | 13 |
+| Open findings | 11 |
## Checklist coverage
@@ -48,13 +48,13 @@
| Severity | High |
| Category | Correctness & logic bugs |
| Location | `Browse/DataTypeMap.cs:13`, `Runtime/MxValueDecoder.cs:9` |
-| Status | Open |
+| Status | Resolved |
**Description:** `DataTypeMap.Map` maps Galaxy `mx_data_type` codes to six `DriverDataType` values (Boolean, Int32, Float32, Float64, String, DateTime) — there is no `Int64` arm. Yet `MxValueDecoder` and `MxValueEncoder` both fully support Int64 (`MxValue.Int64Value`, `Int64Array`), and the decoder's own XML doc claims "the seven Galaxy data types ... (Boolean, Int32, Int64, Float32, Float64, String, DateTime)". Any Galaxy attribute whose `mx_data_type` is the Int64 code (or any code > 5) falls through the `_ => DriverDataType.String` default. The address-space node is then created as a `String` variable while runtime reads decode an `Int64` boxed value — a type mismatch that produces wrong OPC UA `DataType`/`ValueRank` metadata and likely fails value coercion at the server node layer.
**Recommendation:** Confirm the Galaxy `mx_data_type` integer code for 64-bit integers and add the explicit arm to `DataTypeMap.Map`. If the wire format genuinely has no Int64 type, correct the `MxValueDecoder`/`MxValueEncoder` doc comments instead. Either way the encoder/decoder and the type map must agree.
-**Resolution:** _(open)_
+**Resolution:** Resolved 2026-05-22 — added `6 => DriverDataType.Int64` to `DataTypeMap.Map`, extending the contiguous 0..5 scheme so the type map covers the same seven Galaxy data types `MxValueDecoder`/`MxValueEncoder` already decode/encode; Int64 attributes now build as Int64 nodes instead of falling through to the String default. Regression coverage in `DataTypeMapTests`.
### Driver.Galaxy-003
@@ -138,13 +138,13 @@
| Severity | High |
| Category | Error handling & resilience |
| Location | `GalaxyDriver.cs:264-276`, `Runtime/EventPump.cs:97-103` |
-| Status | Open |
+| Status | Resolved |
**Description:** Even if Driver.Galaxy-001 is fixed and the supervisor's `ReplayAsync` runs, recovery is incomplete. `ReplayAsync` re-issues `SubscribeBulkAsync` for the tracked tags, but the `EventPump` background loop that consumes `StreamEvents` is not restarted. After a stream fault `EventPump.RunAsync` exits and `_channel` is completed; `EventPump.Start()` is a no-op (`if (_loop is not null) return`) because `_loop` is a completed-but-non-null task. So a replayed subscription has no consumer — values are subscribed on the gw but never reach `OnDataChange`. Additionally `ReplayAsync` never re-registers the new item handles the gw returns into `SubscriptionRegistry`; the old stale item handles remain, so even with a live pump the fan-out reverse-map would miss the post-reconnect handles.
**Recommendation:** On reconnect, dispose and recreate the `EventPump` (or make it restartable), and have `ReplayAsync` update `SubscriptionRegistry` bindings with the new item handles returned by the post-reconnect `SubscribeBulkAsync`. Add an integration/parity test that drops the stream mid-subscription and asserts `OnDataChange` resumes.
-**Resolution:** _(open)_
+**Resolution:** Resolved 2026-05-22 — `ReplayAsync` now calls a new `RestartEventPumpForReplay` (disposes the faulted pump, recreates and restarts a fresh one) and re-issues `SubscribeBulkAsync` per subscription, then `SubscriptionRegistry.Rebind` swaps each subscription's stale pre-reconnect item handles for the post-reconnect handles so the fan-out reverse map dispatches to the live pump. New `SubscriptionRegistry.SnapshotEntries`/`Rebind` APIs back the per-subscription replay. Regression coverage in `SubscriptionRegistryTests` (Rebind/SnapshotEntries) and `EventPumpStreamFaultTests.FaultedPump_IsNotRestartableInPlace_ButAFreshPumpResumesDispatch`.
### Driver.Galaxy-009
diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Browse/DataTypeMap.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Browse/DataTypeMap.cs
index 83456c5..29dceb1 100644
--- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Browse/DataTypeMap.cs
+++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Browse/DataTypeMap.cs
@@ -8,6 +8,14 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Browse;
/// to for unknown codes — keeps wire compatibility
/// with deployed configs while we tighten this through the parity matrix.
///
+///
+/// Code 6 (Int64) extends the contiguous 0..5 scheme so the map covers the same
+/// seven Galaxy data types MxValueDecoder / MxValueEncoder already decode
+/// and encode (Boolean, Int32, Int64, Float32, Float64, String, DateTime). Without it an
+/// Int64 attribute fell through to the default,
+/// creating a String address-space node while runtime reads decoded a boxed long —
+/// a metadata / coercion mismatch (Driver.Galaxy-002).
+///
internal static class DataTypeMap
{
public static DriverDataType Map(int mxDataType) => mxDataType switch
@@ -18,6 +26,7 @@ internal static class DataTypeMap
3 => DriverDataType.Float64,
4 => DriverDataType.String,
5 => DriverDataType.DateTime,
+ 6 => DriverDataType.Int64,
_ => DriverDataType.String,
};
}
diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs
index 2bade95..9403426 100644
--- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs
+++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs
@@ -257,22 +257,106 @@ public sealed class GalaxyDriver
}
///
- /// Replay callback. Walks every active subscription's bindings and re-issues
- /// SubscribeBulk for the tag list. PR 6.x can swap this for the gw's batched
- /// ReplaySubscriptionsCommand once it ships.
+ /// Replay callback. Walks every active subscription, re-issues SubscribeBulk for
+ /// its tag list, and rebinds the
+ /// registry with the fresh item handles the gateway returned — the pre-reconnect
+ /// handles are dead once the session reopened. The faulted
+ /// is recreated first so the replayed subscriptions have a live StreamEvents
+ /// consumer; without that restart the replayed tags are subscribed on the gw but
+ /// never reach OnDataChange (Driver.Galaxy-008). PR 6.x can swap this for
+ /// the gw's batched ReplaySubscriptionsCommand once it ships.
///
private async Task ReplayAsync(CancellationToken cancellationToken)
{
if (_subscriber is null) return;
- var bindings = _subscriptions.SnapshotAllBindings();
- if (bindings.Count == 0) return;
+ var entries = _subscriptions.SnapshotEntries();
+ if (entries.Count == 0) return;
+
+ // The stream-fault that triggered this recovery left the old pump's RunAsync loop
+ // exited and its channel completed; EventPump.Start() is a no-op on a non-null but
+ // completed loop. Recreate the pump so the replayed subscriptions have a consumer.
+ RestartEventPumpForReplay();
+
+ var tagCount = 0;
+ foreach (var (subscriptionId, oldBindings) in entries)
+ {
+ var refs = oldBindings
+ .Select(b => b.FullReference)
+ .Distinct(StringComparer.OrdinalIgnoreCase)
+ .ToArray();
+ if (refs.Length == 0) continue;
+
+ var results = await _subscriber
+ .SubscribeBulkAsync(refs, _options.MxAccess.PublishingIntervalMs, cancellationToken)
+ .ConfigureAwait(false);
+
+ var byAddress = BuildResultIndex(results);
+ var newBindings = new List(refs.Length);
+ foreach (var fullRef in refs)
+ {
+ var itemHandle = byAddress.TryGetValue(fullRef, out var match) && match.WasSuccessful
+ ? match.ItemHandle
+ : 0;
+ newBindings.Add(new TagBinding(fullRef, itemHandle));
+ }
+
+ // Rebind so the EventPump fan-out reverse map points at the post-reconnect
+ // handles; otherwise events on the new handles miss every subscription.
+ _subscriptions.Rebind(subscriptionId, newBindings);
+ tagCount += refs.Length;
+ }
- var refs = bindings.Select(b => b.FullReference).Distinct(StringComparer.OrdinalIgnoreCase).ToArray();
- await _subscriber.SubscribeBulkAsync(
- refs, _options.MxAccess.PublishingIntervalMs, cancellationToken).ConfigureAwait(false);
_logger.LogInformation(
- "GalaxyDriver {InstanceId} replay completed — {Count} tags re-subscribed",
- _driverInstanceId, refs.Length);
+ "GalaxyDriver {InstanceId} replay completed — {SubCount} subscriptions, {TagCount} tags re-subscribed",
+ _driverInstanceId, entries.Count, tagCount);
+ }
+
+ ///
+ /// Index a SubscribeBulk result list by tag address (OrdinalIgnoreCase) so the
+ /// subscribe / replay correlation loops are O(1) per reference rather than a
+ /// linear scan. Last-write-wins on a duplicate address (the gw shouldn't emit one).
+ ///
+ private static Dictionary BuildResultIndex(IReadOnlyList results)
+ {
+ var index = new Dictionary(results.Count, StringComparer.OrdinalIgnoreCase);
+ foreach (var result in results)
+ {
+ if (!string.IsNullOrEmpty(result.TagAddress)) index[result.TagAddress] = result;
+ }
+ return index;
+ }
+
+ ///
+ /// Dispose the faulted and create a fresh one bound to the
+ /// same subscriber / registry, started immediately. Invoked from the reconnect
+ /// replay path. No-op when no pump was ever started (no active subscriptions).
+ ///
+ private void RestartEventPumpForReplay()
+ {
+ EventPump? old;
+ lock (_pumpLock)
+ {
+ old = _eventPump;
+ if (old is null) return; // pump never started — nothing to restart
+ _eventPump = null;
+ }
+
+ // Detach + dispose the faulted pump outside the lock so a slow shutdown doesn't
+ // block a concurrent EnsureEventPumpStarted; the old loop already exited on fault.
+ old.OnDataChange -= OnPumpDataChange;
+ try { old.DisposeAsync().AsTask().GetAwaiter().GetResult(); }
+ catch (Exception ex)
+ {
+ _logger.LogWarning(ex,
+ "GalaxyDriver {InstanceId} faulted EventPump dispose failed during replay — continuing.",
+ _driverInstanceId);
+ }
+
+ // EnsureEventPumpStarted creates + starts a fresh pump under the lock. Skip the
+ // recreate if the driver is being disposed — Dispose already tore the pump down
+ // and a fresh one here would leak past the driver's lifetime.
+ if (_disposed) return;
+ EnsureEventPumpStarted();
}
private void OnSupervisorStateChanged(object? sender, StateTransition transition)
diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/SubscriptionRegistry.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/SubscriptionRegistry.cs
index 1f9e520..8873314 100644
--- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/SubscriptionRegistry.cs
+++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/SubscriptionRegistry.cs
@@ -95,6 +95,47 @@ internal sealed class SubscriptionRegistry
public IReadOnlyList SnapshotAllBindings() =>
[.. _bySubscriptionId.Values.SelectMany(entry => entry.Bindings)];
+ ///
+ /// Snapshot every active subscription with its bindings, grouped by subscription id.
+ /// Used by the reconnect replay path so it can re-issue SubscribeBulk per subscription
+ /// and then each one with the post-reconnect item handles.
+ ///
+ public IReadOnlyList<(long SubscriptionId, IReadOnlyList Bindings)> SnapshotEntries() =>
+ [.. _bySubscriptionId.Values.Select(entry => (entry.SubscriptionId, entry.Bindings))];
+
+ ///
+ /// Replace an existing subscription's bindings with the item handles a post-reconnect
+ /// SubscribeBulk returned, rebuilding the reverse fan-out map so events on the new
+ /// handles dispatch and the now-dead pre-reconnect handles are dropped. No-op when the
+ /// subscription id is unknown (it was unsubscribed during the reconnect window).
+ ///
+ public void Rebind(long subscriptionId, IReadOnlyList newBindings)
+ {
+ if (!_bySubscriptionId.TryGetValue(subscriptionId, out var oldEntry)) return;
+
+ // Drop this subscription from every reverse-map bag it currently appears in. The
+ // pre-reconnect item handles are stale once the gw re-issues fresh ones.
+ foreach (var binding in oldEntry.Bindings)
+ {
+ if (binding.ItemHandle <= 0) continue;
+ if (!_subscribersByItemHandle.TryGetValue(binding.ItemHandle, out var bag)) continue;
+
+ var remaining = new ConcurrentBag(bag.Where(id => id != subscriptionId));
+ if (remaining.IsEmpty) _subscribersByItemHandle.TryRemove(binding.ItemHandle, out _);
+ else _subscribersByItemHandle[binding.ItemHandle] = remaining;
+ }
+
+ _bySubscriptionId[subscriptionId] = new SubscriptionEntry(subscriptionId, newBindings);
+ foreach (var binding in newBindings)
+ {
+ if (binding.ItemHandle <= 0) continue; // failed gw subscribe — no events expected
+ _subscribersByItemHandle.AddOrUpdate(
+ binding.ItemHandle,
+ _ => [subscriptionId],
+ (_, bag) => { bag.Add(subscriptionId); return bag; });
+ }
+ }
+
private sealed record SubscriptionEntry(long SubscriptionId, IReadOnlyList Bindings);
}
diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Browse/DataTypeMapTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Browse/DataTypeMapTests.cs
new file mode 100644
index 0000000..6420fb4
--- /dev/null
+++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Browse/DataTypeMapTests.cs
@@ -0,0 +1,48 @@
+using Shouldly;
+using Xunit;
+using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
+using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Browse;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Browse;
+
+///
+/// Regression coverage for Driver.Galaxy-002 (High): every Galaxy mx_data_type
+/// code in the contiguous 0..6 scheme must map to the matching .
+/// The Int64 arm (code 6) was missing, so an Int64 attribute fell through to the
+/// default — a String address-space node while
+/// runtime reads decoded a boxed long. must now agree
+/// with MxValueDecoder / MxValueEncoder, which both fully support Int64.
+///
+public sealed class DataTypeMapTests
+{
+ [Theory]
+ [InlineData(0, DriverDataType.Boolean)]
+ [InlineData(1, DriverDataType.Int32)]
+ [InlineData(2, DriverDataType.Float32)]
+ [InlineData(3, DriverDataType.Float64)]
+ [InlineData(4, DriverDataType.String)]
+ [InlineData(5, DriverDataType.DateTime)]
+ [InlineData(6, DriverDataType.Int64)]
+ public void Map_KnownCode_MapsToExpectedDriverDataType(int mxDataType, DriverDataType expected)
+ {
+ DataTypeMap.Map(mxDataType).ShouldBe(expected);
+ }
+
+ [Fact]
+ public void Map_Int64Code_DoesNotFallThroughToStringDefault()
+ {
+ // The bug: Int64 (code 6) used to hit the `_ => String` default.
+ DataTypeMap.Map(6).ShouldBe(DriverDataType.Int64);
+ DataTypeMap.Map(6).ShouldNotBe(DriverDataType.String);
+ }
+
+ [Theory]
+ [InlineData(7)]
+ [InlineData(99)]
+ [InlineData(-1)]
+ public void Map_UnknownCode_FallsBackToString(int mxDataType)
+ {
+ // Forward-compatibility fallback for codes the driver doesn't recognise.
+ DataTypeMap.Map(mxDataType).ShouldBe(DriverDataType.String);
+ }
+}
diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/EventPumpStreamFaultTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/EventPumpStreamFaultTests.cs
index 680829e..129b9e2 100644
--- a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/EventPumpStreamFaultTests.cs
+++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/EventPumpStreamFaultTests.cs
@@ -1,7 +1,9 @@
using System.Threading.Channels;
+using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
using Shouldly;
using Xunit;
+using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Runtime;
@@ -74,6 +76,48 @@ public sealed class EventPumpStreamFaultTests
supervisor.IsDegraded.ShouldBeFalse();
}
+ [Fact]
+ public async Task FaultedPump_IsNotRestartableInPlace_ButAFreshPumpResumesDispatch()
+ {
+ // Regression coverage for Driver.Galaxy-008 (High): after a stream fault the old
+ // pump's RunAsync loop has exited and its channel is completed — EventPump.Start()
+ // is a no-op on a non-null-but-completed loop, so the recovery path must DISPOSE
+ // the faulted pump and create a FRESH one. This test pins both halves of that:
+ // (a) the faulted pump is dead, (b) a new pump on a live stream resumes OnDataChange.
+ var registry = new SubscriptionRegistry();
+ registry.Register(1, [new TagBinding("Tank.Level", ItemHandle: 7)]);
+
+ // --- first pump: faults, then is "restarted" (no-op) and confirmed dead ---
+ var faulted = new FaultingSubscriber();
+ var staleObserved = false;
+ var oldPump = new EventPump(faulted, registry, channelCapacity: 8, clientName: "Restart");
+ oldPump.OnDataChange += (_, _) => staleObserved = true;
+ oldPump.Start();
+ faulted.FaultStream(new IOException("simulated gateway transport drop"));
+ await Task.Delay(100);
+
+ // In-place Start() after a fault is a no-op — the loop task is non-null but done.
+ oldPump.Start();
+ await oldPump.DisposeAsync();
+
+ // --- fresh pump on a live re-subscribed stream: OnDataChange must resume ---
+ var resubscribed = new ReplaySubscriber();
+ var resumed = new TaskCompletionSource(
+ TaskCreationOptions.RunContinuationsAsynchronously);
+ await using var newPump = new EventPump(
+ resubscribed, registry, channelCapacity: 8, clientName: "Restart");
+ newPump.OnDataChange += (_, args) => resumed.TrySetResult(args);
+ newPump.Start();
+
+ await resubscribed.EmitAsync(itemHandle: 7, value: 123.0);
+
+ var completed = await Task.WhenAny(resumed.Task, Task.Delay(WaitMs));
+ completed.ShouldBe(resumed.Task,
+ "a fresh EventPump created after a fault must resume dispatching OnDataChange");
+ (await resumed.Task).FullReference.ShouldBe("Tank.Level");
+ staleObserved.ShouldBeFalse("the faulted pump must not dispatch after its stream dropped");
+ }
+
[Fact]
public async Task CleanShutdown_DoesNotInvokeOnStreamFault()
{
@@ -115,4 +159,34 @@ public sealed class EventPumpStreamFaultTests
/// Fault the stream so the pump's await foreach throws.
public void FaultStream(Exception cause) => _stream.Writer.TryComplete(cause);
}
+
+ ///
+ /// fake modelling the post-reconnect stream — a
+ /// fresh, healthy StreamEvents the recovery path's new EventPump consumes.
+ ///
+ private sealed class ReplaySubscriber : IGalaxySubscriber
+ {
+ private readonly Channel _stream =
+ Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true });
+
+ public Task> SubscribeBulkAsync(
+ IReadOnlyList fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken)
+ => Task.FromResult>([]);
+
+ public Task UnsubscribeBulkAsync(IReadOnlyList itemHandles, CancellationToken cancellationToken)
+ => Task.CompletedTask;
+
+ public IAsyncEnumerable StreamEventsAsync(CancellationToken cancellationToken)
+ => _stream.Reader.ReadAllAsync(cancellationToken);
+
+ public ValueTask EmitAsync(int itemHandle, double value) =>
+ _stream.Writer.WriteAsync(new MxEvent
+ {
+ Family = MxEventFamily.OnDataChange,
+ ItemHandle = itemHandle,
+ Value = new MxValue { DoubleValue = value },
+ Quality = 192,
+ SourceTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
+ });
+ }
}
diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/SubscriptionRegistryTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/SubscriptionRegistryTests.cs
index 5f55c74..7eaec95 100644
--- a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/SubscriptionRegistryTests.cs
+++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/SubscriptionRegistryTests.cs
@@ -115,6 +115,74 @@ public sealed class SubscriptionRegistryTests
registry.TrackedItemHandleCount.ShouldBe(0);
}
+ // ===== Driver.Galaxy-008 regression: reconnect replay rebinds with fresh handles =====
+
+ [Fact]
+ public void SnapshotEntries_GroupsBindingsBySubscriptionId()
+ {
+ var registry = new SubscriptionRegistryAccess();
+ registry.Register(1, [new TagBindingAccess("A", 100)]);
+ registry.Register(2, [new TagBindingAccess("B", 200), new TagBindingAccess("C", 300)]);
+
+ var entries = registry.SnapshotEntries();
+
+ entries.Count.ShouldBe(2);
+ entries.Single(e => e.SubscriptionId == 1).Bindings.Count.ShouldBe(1);
+ entries.Single(e => e.SubscriptionId == 2).Bindings.Count.ShouldBe(2);
+ }
+
+ [Fact]
+ public void Rebind_ReplacesStaleItemHandles_WithThePostReconnectHandles()
+ {
+ // Before reconnect the gw assigned handle 100; after reconnect it issues 555.
+ var registry = new SubscriptionRegistryAccess();
+ registry.Register(1, [new TagBindingAccess("Tank.Level", 100)]);
+
+ registry.Rebind(1, [new TagBindingAccess("Tank.Level", 555)]);
+
+ // The stale handle no longer fans out; the fresh handle does.
+ registry.ResolveSubscribers(100).ShouldBeEmpty();
+ var subs = registry.ResolveSubscribers(555);
+ subs.Count.ShouldBe(1);
+ subs[0].SubscriptionId.ShouldBe(1);
+ subs[0].FullReference.ShouldBe("Tank.Level");
+ }
+
+ [Fact]
+ public void Rebind_LeavesOtherSubscriptionsOnTheSameOldHandleIntact()
+ {
+ var registry = new SubscriptionRegistryAccess();
+ registry.Register(1, [new TagBindingAccess("Tank.Level", 100)]);
+ registry.Register(2, [new TagBindingAccess("Tank.Level", 100)]);
+
+ // Only subscription 1 replays onto a fresh handle.
+ registry.Rebind(1, [new TagBindingAccess("Tank.Level", 555)]);
+
+ registry.ResolveSubscribers(100).Select(s => s.SubscriptionId).ShouldBe(new[] { 2L });
+ registry.ResolveSubscribers(555).Select(s => s.SubscriptionId).ShouldBe(new[] { 1L });
+ }
+
+ [Fact]
+ public void Rebind_UnknownSubscription_IsNoOp()
+ {
+ var registry = new SubscriptionRegistryAccess();
+ Should.NotThrow(() => registry.Rebind(999, [new TagBindingAccess("X", 1)]));
+ registry.ResolveSubscribers(1).ShouldBeEmpty();
+ }
+
+ [Fact]
+ public void Rebind_FailedItemHandle_NotIndexedForFanOut()
+ {
+ var registry = new SubscriptionRegistryAccess();
+ registry.Register(1, [new TagBindingAccess("Tag", 100)]);
+
+ // Post-reconnect the gw rejected the tag — handle 0.
+ registry.Rebind(1, [new TagBindingAccess("Tag", 0)]);
+
+ registry.ResolveSubscribers(100).ShouldBeEmpty();
+ registry.ResolveSubscribers(0).ShouldBeEmpty();
+ }
+
// Internal types are accessed via friend assembly (InternalsVisibleTo); these
// wrapper aliases keep the test code readable.
private sealed class SubscriptionRegistryAccess
@@ -132,6 +200,12 @@ public sealed class SubscriptionRegistryTests
}
public IReadOnlyList<(long SubscriptionId, string FullReference)> ResolveSubscribers(int handle)
=> _inner.ResolveSubscribers(handle);
+ public void Rebind(long id, IReadOnlyList bindings)
+ => _inner.Rebind(id, [.. bindings.Select(b => new TagBinding(b.FullReference, b.ItemHandle))]);
+ public IReadOnlyList<(long SubscriptionId, IReadOnlyList Bindings)> SnapshotEntries()
+ => [.. _inner.SnapshotEntries().Select(e =>
+ (e.SubscriptionId,
+ (IReadOnlyList)[.. e.Bindings.Select(b => new TagBindingAccess(b.FullReference, b.ItemHandle))]))];
}
private sealed record TagBindingAccess(string FullReference, int ItemHandle);
}