diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GatewayGalaxyDataWriter.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GatewayGalaxyDataWriter.cs
index 1d74c721..9e930a76 100644
--- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GatewayGalaxyDataWriter.cs
+++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GatewayGalaxyDataWriter.cs
@@ -17,29 +17,45 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
///
/// Item handle cache survives across writes — repeated writes to the same tag avoid
/// re-AddItem. Per-tag failures are isolated: one bad write doesn't fail the batch.
-/// PR 4.4 will share this cache with the subscription registry; for now it lives
-/// here so the writer is independently testable.
+/// When a subscribedHandleSource delegate is supplied, the first write to an
+/// already-subscribed tag borrows the live handle from the subscription registry and
+/// skips the AddItem round-trip entirely. Borrowed handles are intentionally NOT stored
+/// in _itemHandles — the subscription registry owns their lifecycle (including
+/// reconnect rebind), so each write re-borrows the fresh handle and no stale-cache
+/// window is introduced.
///
public sealed class GatewayGalaxyDataWriter : IGalaxyDataWriter
{
private readonly GalaxyMxSession _session;
private readonly int _writeUserId;
private readonly ILogger _logger;
+ private readonly Func? _subscribedHandleSource;
private readonly ConcurrentDictionary _itemHandles =
new(StringComparer.OrdinalIgnoreCase);
// Item handles we've already AdviseSupervisory'd this session — supervisory advise is
// idempotent but the round-trip isn't free, so do it once per handle (see EnsureSupervisoryAdvisedAsync).
private readonly ConcurrentDictionary _supervisedHandles = new();
+ private int _addItemCallCount;
/// Initializes a new Galaxy data writer.
/// The MXAccess gateway session.
/// The user ID for write operations.
/// Optional logger for tracing.
- public GatewayGalaxyDataWriter(GalaxyMxSession session, int writeUserId, ILogger? logger = null)
+ ///
+ /// Optional delegate that resolves a live MXAccess item handle from the subscription
+ /// registry for a given tag full reference. When the delegate returns a positive handle
+ /// the writer borrows it and skips the AddItem gateway round-trip. A return value of
+ /// null, 0, or negative is treated as "not available" and the writer falls back
+ /// to its own AddItem. Borrowed handles are NOT cached in this writer — the subscription
+ /// registry owns their lifecycle.
+ ///
+ public GatewayGalaxyDataWriter(GalaxyMxSession session, int writeUserId, ILogger? logger = null,
+ Func? subscribedHandleSource = null)
{
_session = session ?? throw new ArgumentNullException(nameof(session));
_writeUserId = writeUserId;
_logger = logger ?? NullLogger.Instance;
+ _subscribedHandleSource = subscribedHandleSource;
}
///
@@ -64,6 +80,10 @@ public sealed class GatewayGalaxyDataWriter : IGalaxyDataWriter
///
internal int CachedSupervisedHandleCount => _supervisedHandles.Count;
+ /// Count of real gateway AddItem round-trips this writer has issued. Stays zero
+ /// when every handle was served from cache or borrowed from the subscription registry. Test seam.
+ internal int AddItemCallCount => Volatile.Read(ref _addItemCallCount);
+
///
/// Pre-populate both caches as if a write had already occurred. Used by unit tests to
/// simulate the "post-write" state without running a real gRPC gateway session (the SDK
@@ -78,6 +98,23 @@ public sealed class GatewayGalaxyDataWriter : IGalaxyDataWriter
if (supervised) _supervisedHandles.TryAdd(itemHandle, 0);
}
+ ///
+ /// Resolve an item handle WITHOUT touching the gateway: a handle this writer already AddItem'd
+ /// (_itemHandles), else a live subscription handle borrowed via
+ /// _subscribedHandleSource. Returns null when neither is available (the caller must
+ /// AddItem). A BORROWED handle is intentionally NOT stored in _itemHandles — the
+ /// subscription registry owns the borrowed handle's lifecycle (including reconnect rebind),
+ /// so the next write re-borrows the fresh handle and no stale-cache window is introduced.
+ ///
+ /// The dotted tag full reference.
+ /// A usable item handle, or null when an AddItem is required.
+ internal int? TryResolveCachedOrBorrowed(string fullRef)
+ {
+ if (_itemHandles.TryGetValue(fullRef, out var existing)) return existing;
+ if (_subscribedHandleSource?.Invoke(fullRef) is int borrowed && borrowed > 0) return borrowed;
+ return null;
+ }
+
/// Writes values to Galaxy tags through the gateway.
/// The write requests.
/// Function to resolve security classification per tag.
@@ -161,6 +198,7 @@ public sealed class GatewayGalaxyDataWriter : IGalaxyDataWriter
if (_itemHandles.TryGetValue(fullRef, out var existing)) return existing;
var handle = await session.AddItemAsync(serverHandle, fullRef, ct).ConfigureAwait(false);
_itemHandles[fullRef] = handle;
+ Interlocked.Increment(ref _addItemCallCount);
return handle;
}
diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/SubscriptionRegistry.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/SubscriptionRegistry.cs
index 1f70e82f..2570a27d 100644
--- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/SubscriptionRegistry.cs
+++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/SubscriptionRegistry.cs
@@ -24,6 +24,12 @@ internal sealed class SubscriptionRegistry
// unsubscribe"; reads are lock-free because the immutable snapshot is published
// atomically via ConcurrentDictionary AddOrUpdate.
private readonly ConcurrentDictionary> _subscribersByItemHandle = new();
+ // Forward index for the Galaxy writer: fullRef (case-insensitive) → live item handle.
+ // Maintained in lock-step with _subscribersByItemHandle; entries are cleaned up when
+ // the last subscriber for a handle is removed, and TryResolveItemHandle guards against
+ // stale entries by cross-checking _subscribersByItemHandle at read time.
+ private readonly ConcurrentDictionary _itemHandleByFullRef =
+ new(StringComparer.OrdinalIgnoreCase);
private long _nextSubscriptionId;
/// Gets the number of tracked subscriptions.
@@ -52,6 +58,7 @@ internal sealed class SubscriptionRegistry
binding.ItemHandle,
_ => [subscriptionId],
(_, set) => set.Add(subscriptionId));
+ _itemHandleByFullRef[binding.FullReference] = binding.ItemHandle;
}
}
@@ -72,7 +79,14 @@ internal sealed class SubscriptionRegistry
// published atomically — no need to rebuild from a LINQ filter.
if (!_subscribersByItemHandle.TryGetValue(binding.ItemHandle, out var set)) continue;
var remaining = set.Remove(subscriptionId);
- if (remaining.IsEmpty) _subscribersByItemHandle.TryRemove(binding.ItemHandle, out _);
+ if (remaining.IsEmpty)
+ {
+ _subscribersByItemHandle.TryRemove(binding.ItemHandle, out _);
+ // Clean up the forward index only when this handle is still the current
+ // mapping — a concurrent re-add for the same ref must not be clobbered.
+ if (_itemHandleByFullRef.TryGetValue(binding.FullReference, out var fwd) && fwd == binding.ItemHandle)
+ _itemHandleByFullRef.TryRemove(binding.FullReference, out _);
+ }
else _subscribersByItemHandle[binding.ItemHandle] = remaining;
}
@@ -107,6 +121,24 @@ internal sealed class SubscriptionRegistry
return result;
}
+ ///
+ /// Resolve the live MXAccess item handle a current subscription holds for ,
+ /// or null when no live subscription covers it. The Galaxy writer borrows this handle to skip a
+ /// redundant AddItem round-trip on the first write to an already-subscribed tag. Guarded by the
+ /// authoritative live-handle set (_subscribersByItemHandle) so a stale forward-map entry
+ /// can never hand out a dead handle.
+ ///
+ /// The dotted tag full reference (e.g. TestMachine_002.TestFloat).
+ /// The live item handle, or null when none is currently subscribed.
+ public int? TryResolveItemHandle(string fullRef)
+ {
+ if (fullRef is null) return null;
+ if (_itemHandleByFullRef.TryGetValue(fullRef, out var handle)
+ && _subscribersByItemHandle.ContainsKey(handle))
+ return handle;
+ return null;
+ }
+
/// Snapshot every active binding for diagnostic output.
public IReadOnlyList SnapshotAllBindings() =>
[.. _bySubscriptionId.Values.SelectMany(entry => entry.Bindings)];
@@ -139,7 +171,14 @@ internal sealed class SubscriptionRegistry
if (binding.ItemHandle <= 0) continue;
if (!_subscribersByItemHandle.TryGetValue(binding.ItemHandle, out var set)) continue;
var remaining = set.Remove(subscriptionId);
- if (remaining.IsEmpty) _subscribersByItemHandle.TryRemove(binding.ItemHandle, out _);
+ if (remaining.IsEmpty)
+ {
+ _subscribersByItemHandle.TryRemove(binding.ItemHandle, out _);
+ // Clean up the forward index only when this handle is still the current
+ // mapping — the add loop below will overwrite it with the fresh handle.
+ if (_itemHandleByFullRef.TryGetValue(binding.FullReference, out var fwd) && fwd == binding.ItemHandle)
+ _itemHandleByFullRef.TryRemove(binding.FullReference, out _);
+ }
else _subscribersByItemHandle[binding.ItemHandle] = remaining;
}
@@ -151,6 +190,7 @@ internal sealed class SubscriptionRegistry
binding.ItemHandle,
_ => [subscriptionId],
(_, set) => set.Add(subscriptionId));
+ _itemHandleByFullRef[binding.FullReference] = binding.ItemHandle;
}
}
diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/SubscriptionRegistryHandleResolveTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/SubscriptionRegistryHandleResolveTests.cs
new file mode 100644
index 00000000..96830f54
--- /dev/null
+++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/SubscriptionRegistryHandleResolveTests.cs
@@ -0,0 +1,117 @@
+using Shouldly;
+using Xunit;
+using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Runtime;
+
+///
+/// Tests for — the forward
+/// fullRef → live item-handle lookup the Galaxy writer uses to skip a redundant
+/// AddItem round-trip when an already-subscribed tag is written.
+///
+public sealed class SubscriptionRegistryHandleResolveTests
+{
+ ///
+ /// Verifies that after registering a binding, TryResolveItemHandle returns the correct handle.
+ ///
+ [Fact]
+ public void Register_ThenResolve_ReturnsHandle()
+ {
+ var registry = new SubscriptionRegistry();
+ registry.Register(1, [new TagBinding("Tag.A", 5)]);
+
+ registry.TryResolveItemHandle("Tag.A").ShouldBe(5);
+ }
+
+ ///
+ /// Verifies that TryResolveItemHandle is case-insensitive on the full reference.
+ ///
+ [Fact]
+ public void Register_ThenResolve_IsCaseInsensitive()
+ {
+ var registry = new SubscriptionRegistry();
+ registry.Register(1, [new TagBinding("Tag.A", 5)]);
+
+ registry.TryResolveItemHandle("tag.a").ShouldBe(5);
+ }
+
+ ///
+ /// Verifies that a full reference that was never registered resolves to null.
+ ///
+ [Fact]
+ public void NeverRegistered_ReturnsNull()
+ {
+ var registry = new SubscriptionRegistry();
+
+ registry.TryResolveItemHandle("Tag.NotHere").ShouldBeNull();
+ }
+
+ ///
+ /// Verifies that after Remove(), the forward lookup returns null.
+ ///
+ [Fact]
+ public void Remove_ThenResolve_ReturnsNull()
+ {
+ var registry = new SubscriptionRegistry();
+ registry.Register(1, [new TagBinding("Tag.A", 5)]);
+
+ registry.Remove(1);
+
+ registry.TryResolveItemHandle("Tag.A").ShouldBeNull();
+ }
+
+ ///
+ /// Verifies that after Rebind() the forward lookup returns the new handle, not the old one.
+ ///
+ [Fact]
+ public void Rebind_ThenResolve_ReturnsNewHandle()
+ {
+ var registry = new SubscriptionRegistry();
+ registry.Register(1, [new TagBinding("Tag.A", 5)]);
+
+ registry.Rebind(1, [new TagBinding("Tag.A", 99)]);
+
+ registry.TryResolveItemHandle("Tag.A").ShouldBe(99);
+ }
+
+ ///
+ /// Verifies that a binding with ItemHandle <= 0 (gateway-rejected) is not resolvable.
+ ///
+ [Fact]
+ public void FailedBinding_ZeroHandle_IsNotResolvable()
+ {
+ var registry = new SubscriptionRegistry();
+ registry.Register(1, [new TagBinding("Tag.Failed", 0)]);
+
+ registry.TryResolveItemHandle("Tag.Failed").ShouldBeNull();
+ }
+
+ ///
+ /// Verifies that a binding with a negative ItemHandle is not resolvable.
+ ///
+ [Fact]
+ public void FailedBinding_NegativeHandle_IsNotResolvable()
+ {
+ var registry = new SubscriptionRegistry();
+ registry.Register(1, [new TagBinding("Tag.Failed", -1)]);
+
+ registry.TryResolveItemHandle("Tag.Failed").ShouldBeNull();
+ }
+
+ ///
+ /// Verifies the liveness guard: after the only subscriber of a handle is removed,
+ /// the forward lookup returns null even if a stale forward-map entry lingers.
+ ///
+ [Fact]
+ public void Remove_OnlySubscriber_LivenessGuard_ReturnsNull()
+ {
+ var registry = new SubscriptionRegistry();
+ registry.Register(1, [new TagBinding("Tag.A", 5)]);
+
+ registry.Remove(1);
+
+ // After removal the subscriber set for handle 5 is gone, so TryResolveItemHandle
+ // must return null regardless of whether the forward entry was cleaned up.
+ registry.TryResolveItemHandle("Tag.A").ShouldBeNull();
+ }
+}