From 1411950077ccc063dd7d024572240f9e05b20cad Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 18 Jun 2026 04:18:01 -0400 Subject: [PATCH] feat(galaxy): SubscriptionRegistry.TryResolveItemHandle forward lookup Add _itemHandleByFullRef (OrdinalIgnoreCase ConcurrentDictionary) maintained in lock-step with _subscribersByItemHandle across Register/Remove/Rebind. TryResolveItemHandle cross-checks the authoritative reverse map so a stale forward entry can never hand out a dead handle. Also wires the scaffolded _addItemCallCount increment in EnsureItemHandleAsync (field was declared but never assigned, causing a TreatWarningsAsErrors build failure on the branch). 8 new xUnit + Shouldly facts covering register/case-insensitive/remove/rebind/ failed-handle/liveness-guard paths. --- .../Runtime/GatewayGalaxyDataWriter.cs | 44 ++++++- .../Runtime/SubscriptionRegistry.cs | 44 ++++++- .../SubscriptionRegistryHandleResolveTests.cs | 117 ++++++++++++++++++ 3 files changed, 200 insertions(+), 5 deletions(-) create mode 100644 tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/SubscriptionRegistryHandleResolveTests.cs diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GatewayGalaxyDataWriter.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GatewayGalaxyDataWriter.cs index 1d74c721..9e930a76 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GatewayGalaxyDataWriter.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GatewayGalaxyDataWriter.cs @@ -17,29 +17,45 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; /// /// Item handle cache survives across writes — repeated writes to the same tag avoid /// re-AddItem. Per-tag failures are isolated: one bad write doesn't fail the batch. -/// PR 4.4 will share this cache with the subscription registry; for now it lives -/// here so the writer is independently testable. +/// When a subscribedHandleSource delegate is supplied, the first write to an +/// already-subscribed tag borrows the live handle from the subscription registry and +/// skips the AddItem round-trip entirely. Borrowed handles are intentionally NOT stored +/// in _itemHandles — the subscription registry owns their lifecycle (including +/// reconnect rebind), so each write re-borrows the fresh handle and no stale-cache +/// window is introduced. /// public sealed class GatewayGalaxyDataWriter : IGalaxyDataWriter { private readonly GalaxyMxSession _session; private readonly int _writeUserId; private readonly ILogger _logger; + private readonly Func? _subscribedHandleSource; private readonly ConcurrentDictionary _itemHandles = new(StringComparer.OrdinalIgnoreCase); // Item handles we've already AdviseSupervisory'd this session — supervisory advise is // idempotent but the round-trip isn't free, so do it once per handle (see EnsureSupervisoryAdvisedAsync). private readonly ConcurrentDictionary _supervisedHandles = new(); + private int _addItemCallCount; /// Initializes a new Galaxy data writer. /// The MXAccess gateway session. /// The user ID for write operations. /// Optional logger for tracing. - public GatewayGalaxyDataWriter(GalaxyMxSession session, int writeUserId, ILogger? logger = null) + /// + /// Optional delegate that resolves a live MXAccess item handle from the subscription + /// registry for a given tag full reference. When the delegate returns a positive handle + /// the writer borrows it and skips the AddItem gateway round-trip. A return value of + /// null, 0, or negative is treated as "not available" and the writer falls back + /// to its own AddItem. Borrowed handles are NOT cached in this writer — the subscription + /// registry owns their lifecycle. + /// + public GatewayGalaxyDataWriter(GalaxyMxSession session, int writeUserId, ILogger? logger = null, + Func? subscribedHandleSource = null) { _session = session ?? throw new ArgumentNullException(nameof(session)); _writeUserId = writeUserId; _logger = logger ?? NullLogger.Instance; + _subscribedHandleSource = subscribedHandleSource; } /// @@ -64,6 +80,10 @@ public sealed class GatewayGalaxyDataWriter : IGalaxyDataWriter /// internal int CachedSupervisedHandleCount => _supervisedHandles.Count; + /// Count of real gateway AddItem round-trips this writer has issued. Stays zero + /// when every handle was served from cache or borrowed from the subscription registry. Test seam. + internal int AddItemCallCount => Volatile.Read(ref _addItemCallCount); + /// /// Pre-populate both caches as if a write had already occurred. Used by unit tests to /// simulate the "post-write" state without running a real gRPC gateway session (the SDK @@ -78,6 +98,23 @@ public sealed class GatewayGalaxyDataWriter : IGalaxyDataWriter if (supervised) _supervisedHandles.TryAdd(itemHandle, 0); } + /// + /// Resolve an item handle WITHOUT touching the gateway: a handle this writer already AddItem'd + /// (_itemHandles), else a live subscription handle borrowed via + /// _subscribedHandleSource. Returns null when neither is available (the caller must + /// AddItem). A BORROWED handle is intentionally NOT stored in _itemHandles — the + /// subscription registry owns the borrowed handle's lifecycle (including reconnect rebind), + /// so the next write re-borrows the fresh handle and no stale-cache window is introduced. + /// + /// The dotted tag full reference. + /// A usable item handle, or null when an AddItem is required. + internal int? TryResolveCachedOrBorrowed(string fullRef) + { + if (_itemHandles.TryGetValue(fullRef, out var existing)) return existing; + if (_subscribedHandleSource?.Invoke(fullRef) is int borrowed && borrowed > 0) return borrowed; + return null; + } + /// Writes values to Galaxy tags through the gateway. /// The write requests. /// Function to resolve security classification per tag. @@ -161,6 +198,7 @@ public sealed class GatewayGalaxyDataWriter : IGalaxyDataWriter if (_itemHandles.TryGetValue(fullRef, out var existing)) return existing; var handle = await session.AddItemAsync(serverHandle, fullRef, ct).ConfigureAwait(false); _itemHandles[fullRef] = handle; + Interlocked.Increment(ref _addItemCallCount); return handle; } diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/SubscriptionRegistry.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/SubscriptionRegistry.cs index 1f70e82f..2570a27d 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/SubscriptionRegistry.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/SubscriptionRegistry.cs @@ -24,6 +24,12 @@ internal sealed class SubscriptionRegistry // unsubscribe"; reads are lock-free because the immutable snapshot is published // atomically via ConcurrentDictionary AddOrUpdate. private readonly ConcurrentDictionary> _subscribersByItemHandle = new(); + // Forward index for the Galaxy writer: fullRef (case-insensitive) → live item handle. + // Maintained in lock-step with _subscribersByItemHandle; entries are cleaned up when + // the last subscriber for a handle is removed, and TryResolveItemHandle guards against + // stale entries by cross-checking _subscribersByItemHandle at read time. + private readonly ConcurrentDictionary _itemHandleByFullRef = + new(StringComparer.OrdinalIgnoreCase); private long _nextSubscriptionId; /// Gets the number of tracked subscriptions. @@ -52,6 +58,7 @@ internal sealed class SubscriptionRegistry binding.ItemHandle, _ => [subscriptionId], (_, set) => set.Add(subscriptionId)); + _itemHandleByFullRef[binding.FullReference] = binding.ItemHandle; } } @@ -72,7 +79,14 @@ internal sealed class SubscriptionRegistry // published atomically — no need to rebuild from a LINQ filter. if (!_subscribersByItemHandle.TryGetValue(binding.ItemHandle, out var set)) continue; var remaining = set.Remove(subscriptionId); - if (remaining.IsEmpty) _subscribersByItemHandle.TryRemove(binding.ItemHandle, out _); + if (remaining.IsEmpty) + { + _subscribersByItemHandle.TryRemove(binding.ItemHandle, out _); + // Clean up the forward index only when this handle is still the current + // mapping — a concurrent re-add for the same ref must not be clobbered. + if (_itemHandleByFullRef.TryGetValue(binding.FullReference, out var fwd) && fwd == binding.ItemHandle) + _itemHandleByFullRef.TryRemove(binding.FullReference, out _); + } else _subscribersByItemHandle[binding.ItemHandle] = remaining; } @@ -107,6 +121,24 @@ internal sealed class SubscriptionRegistry return result; } + /// + /// Resolve the live MXAccess item handle a current subscription holds for , + /// or null when no live subscription covers it. The Galaxy writer borrows this handle to skip a + /// redundant AddItem round-trip on the first write to an already-subscribed tag. Guarded by the + /// authoritative live-handle set (_subscribersByItemHandle) so a stale forward-map entry + /// can never hand out a dead handle. + /// + /// The dotted tag full reference (e.g. TestMachine_002.TestFloat). + /// The live item handle, or null when none is currently subscribed. + public int? TryResolveItemHandle(string fullRef) + { + if (fullRef is null) return null; + if (_itemHandleByFullRef.TryGetValue(fullRef, out var handle) + && _subscribersByItemHandle.ContainsKey(handle)) + return handle; + return null; + } + /// Snapshot every active binding for diagnostic output. public IReadOnlyList SnapshotAllBindings() => [.. _bySubscriptionId.Values.SelectMany(entry => entry.Bindings)]; @@ -139,7 +171,14 @@ internal sealed class SubscriptionRegistry if (binding.ItemHandle <= 0) continue; if (!_subscribersByItemHandle.TryGetValue(binding.ItemHandle, out var set)) continue; var remaining = set.Remove(subscriptionId); - if (remaining.IsEmpty) _subscribersByItemHandle.TryRemove(binding.ItemHandle, out _); + if (remaining.IsEmpty) + { + _subscribersByItemHandle.TryRemove(binding.ItemHandle, out _); + // Clean up the forward index only when this handle is still the current + // mapping — the add loop below will overwrite it with the fresh handle. + if (_itemHandleByFullRef.TryGetValue(binding.FullReference, out var fwd) && fwd == binding.ItemHandle) + _itemHandleByFullRef.TryRemove(binding.FullReference, out _); + } else _subscribersByItemHandle[binding.ItemHandle] = remaining; } @@ -151,6 +190,7 @@ internal sealed class SubscriptionRegistry binding.ItemHandle, _ => [subscriptionId], (_, set) => set.Add(subscriptionId)); + _itemHandleByFullRef[binding.FullReference] = binding.ItemHandle; } } diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/SubscriptionRegistryHandleResolveTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/SubscriptionRegistryHandleResolveTests.cs new file mode 100644 index 00000000..96830f54 --- /dev/null +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/SubscriptionRegistryHandleResolveTests.cs @@ -0,0 +1,117 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Runtime; + +/// +/// Tests for — the forward +/// fullRef → live item-handle lookup the Galaxy writer uses to skip a redundant +/// AddItem round-trip when an already-subscribed tag is written. +/// +public sealed class SubscriptionRegistryHandleResolveTests +{ + /// + /// Verifies that after registering a binding, TryResolveItemHandle returns the correct handle. + /// + [Fact] + public void Register_ThenResolve_ReturnsHandle() + { + var registry = new SubscriptionRegistry(); + registry.Register(1, [new TagBinding("Tag.A", 5)]); + + registry.TryResolveItemHandle("Tag.A").ShouldBe(5); + } + + /// + /// Verifies that TryResolveItemHandle is case-insensitive on the full reference. + /// + [Fact] + public void Register_ThenResolve_IsCaseInsensitive() + { + var registry = new SubscriptionRegistry(); + registry.Register(1, [new TagBinding("Tag.A", 5)]); + + registry.TryResolveItemHandle("tag.a").ShouldBe(5); + } + + /// + /// Verifies that a full reference that was never registered resolves to null. + /// + [Fact] + public void NeverRegistered_ReturnsNull() + { + var registry = new SubscriptionRegistry(); + + registry.TryResolveItemHandle("Tag.NotHere").ShouldBeNull(); + } + + /// + /// Verifies that after Remove(), the forward lookup returns null. + /// + [Fact] + public void Remove_ThenResolve_ReturnsNull() + { + var registry = new SubscriptionRegistry(); + registry.Register(1, [new TagBinding("Tag.A", 5)]); + + registry.Remove(1); + + registry.TryResolveItemHandle("Tag.A").ShouldBeNull(); + } + + /// + /// Verifies that after Rebind() the forward lookup returns the new handle, not the old one. + /// + [Fact] + public void Rebind_ThenResolve_ReturnsNewHandle() + { + var registry = new SubscriptionRegistry(); + registry.Register(1, [new TagBinding("Tag.A", 5)]); + + registry.Rebind(1, [new TagBinding("Tag.A", 99)]); + + registry.TryResolveItemHandle("Tag.A").ShouldBe(99); + } + + /// + /// Verifies that a binding with ItemHandle <= 0 (gateway-rejected) is not resolvable. + /// + [Fact] + public void FailedBinding_ZeroHandle_IsNotResolvable() + { + var registry = new SubscriptionRegistry(); + registry.Register(1, [new TagBinding("Tag.Failed", 0)]); + + registry.TryResolveItemHandle("Tag.Failed").ShouldBeNull(); + } + + /// + /// Verifies that a binding with a negative ItemHandle is not resolvable. + /// + [Fact] + public void FailedBinding_NegativeHandle_IsNotResolvable() + { + var registry = new SubscriptionRegistry(); + registry.Register(1, [new TagBinding("Tag.Failed", -1)]); + + registry.TryResolveItemHandle("Tag.Failed").ShouldBeNull(); + } + + /// + /// Verifies the liveness guard: after the only subscriber of a handle is removed, + /// the forward lookup returns null even if a stale forward-map entry lingers. + /// + [Fact] + public void Remove_OnlySubscriber_LivenessGuard_ReturnsNull() + { + var registry = new SubscriptionRegistry(); + registry.Register(1, [new TagBinding("Tag.A", 5)]); + + registry.Remove(1); + + // After removal the subscriber set for handle 5 is gone, so TryResolveItemHandle + // must return null regardless of whether the forward entry was cleaned up. + registry.TryResolveItemHandle("Tag.A").ShouldBeNull(); + } +}