Files
lmxopcua/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/SubscriptionRegistry.cs
T
Joseph Doherty 1411950077 feat(galaxy): SubscriptionRegistry.TryResolveItemHandle forward lookup
Add _itemHandleByFullRef (OrdinalIgnoreCase ConcurrentDictionary) maintained
in lock-step with _subscribersByItemHandle across Register/Remove/Rebind.
TryResolveItemHandle cross-checks the authoritative reverse map so a stale
forward entry can never hand out a dead handle. Also wires the scaffolded
_addItemCallCount increment in EnsureItemHandleAsync (field was declared but
never assigned, causing a TreatWarningsAsErrors build failure on the branch).
8 new xUnit + Shouldly facts covering register/case-insensitive/remove/rebind/
failed-handle/liveness-guard paths.
2026-06-18 04:18:01 -04:00

240 lines
13 KiB
C#

using System.Collections.Concurrent;
using System.Collections.Immutable;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
/// <summary>
/// Bookkeeping for live subscriptions. Maps each driver-issued <c>SubscriptionId</c> to the
/// set of (full-reference, gw item-handle) pairs the gateway returned, and maintains the
/// reverse map (item-handle → set of driver subscriptions) so the
/// <see cref="EventPump"/> can fan out a single OnDataChange event to every driver
/// subscription that includes the changed tag.
/// </summary>
/// <remarks>
/// A tag may legitimately appear in multiple driver subscriptions (separate clients or
/// OPC UA monitored items observing the same Galaxy attribute). Using a single shared
/// gw subscription per session and fanning out on the driver side keeps the gateway's
/// work bounded; the reverse map is the fan-out index.
/// </remarks>
internal sealed class SubscriptionRegistry
{
private readonly ConcurrentDictionary<long, SubscriptionEntry> _bySubscriptionId = new();
// Driver.Galaxy-012: use ImmutableHashSet<long> for the reverse map so removal is
// O(log n) instead of "rebuild the entire ConcurrentBag from a LINQ filter on every
// unsubscribe"; reads are lock-free because the immutable snapshot is published
// atomically via ConcurrentDictionary AddOrUpdate.
private readonly ConcurrentDictionary<int, ImmutableHashSet<long>> _subscribersByItemHandle = new();
// Forward index for the Galaxy writer: fullRef (case-insensitive) → live item handle.
// Maintained in lock-step with _subscribersByItemHandle; entries are cleaned up when
// the last subscriber for a handle is removed, and TryResolveItemHandle guards against
// stale entries by cross-checking _subscribersByItemHandle at read time.
private readonly ConcurrentDictionary<string, int> _itemHandleByFullRef =
new(StringComparer.OrdinalIgnoreCase);
private long _nextSubscriptionId;
/// <summary>Gets the number of tracked subscriptions.</summary>
public int TrackedSubscriptionCount => _bySubscriptionId.Count;
/// <summary>Gets the number of tracked item handles.</summary>
public int TrackedItemHandleCount => _subscribersByItemHandle.Count;
/// <summary>Allocate a fresh subscription id. Monotonic; unique per registry lifetime.</summary>
public long NextSubscriptionId() => Interlocked.Increment(ref _nextSubscriptionId);
/// <summary>
/// Register a subscription and the per-tag item handles the gateway returned for it.
/// Failed tags (item handle = 0 or negative) are stored anyway so unsubscribe can
/// emit per-tag UnsubscribeBulk for the ones that did succeed.
/// </summary>
/// <param name="subscriptionId">The subscription identifier.</param>
/// <param name="bindings">The tag bindings for the subscription.</param>
public void Register(long subscriptionId, IReadOnlyList<TagBinding> bindings)
{
var entry = new SubscriptionEntry(subscriptionId, bindings);
_bySubscriptionId[subscriptionId] = entry;
foreach (var binding in bindings)
{
if (binding.ItemHandle <= 0) continue; // failed gw subscribe — no events expected
_subscribersByItemHandle.AddOrUpdate(
binding.ItemHandle,
_ => [subscriptionId],
(_, set) => set.Add(subscriptionId));
_itemHandleByFullRef[binding.FullReference] = binding.ItemHandle;
}
}
/// <summary>
/// Remove a subscription. Returns the bindings the caller should pass to
/// <c>UnsubscribeBulkAsync</c>; null when the id was never registered.
/// </summary>
/// <param name="subscriptionId">The subscription identifier.</param>
/// <returns>The bindings for the subscription, or null if not found.</returns>
public IReadOnlyList<TagBinding>? Remove(long subscriptionId)
{
if (!_bySubscriptionId.TryRemove(subscriptionId, out var entry)) return null;
foreach (var binding in entry.Bindings)
{
if (binding.ItemHandle <= 0) continue;
// Driver.Galaxy-012: ImmutableHashSet.Remove is O(log n) and the result is
// published atomically — no need to rebuild from a LINQ filter.
if (!_subscribersByItemHandle.TryGetValue(binding.ItemHandle, out var set)) continue;
var remaining = set.Remove(subscriptionId);
if (remaining.IsEmpty)
{
_subscribersByItemHandle.TryRemove(binding.ItemHandle, out _);
// Clean up the forward index only when this handle is still the current
// mapping — a concurrent re-add for the same ref must not be clobbered.
if (_itemHandleByFullRef.TryGetValue(binding.FullReference, out var fwd) && fwd == binding.ItemHandle)
_itemHandleByFullRef.TryRemove(binding.FullReference, out _);
}
else _subscribersByItemHandle[binding.ItemHandle] = remaining;
}
return entry.Bindings;
}
/// <summary>
/// Look up the (subscription id, full reference) pairs that should receive an
/// OnDataChange for the given gw item handle. Returns empty when nobody subscribes.
/// </summary>
/// <remarks>
/// Driver.Galaxy-012: O(1) per subscriber via the per-entry
/// <c>FullRefByItemHandle</c> index, rather than a <c>FirstOrDefault</c> linear
/// scan of the binding list. At 50k tags / 1Hz this turns each dispatch from a
/// 50k-element scan into a single dictionary lookup.
/// </remarks>
/// <param name="itemHandle">The gateway item handle.</param>
/// <returns>A list of subscription and reference pairs for the item handle.</returns>
public IReadOnlyList<(long SubscriptionId, string FullReference)> ResolveSubscribers(int itemHandle)
{
if (!_subscribersByItemHandle.TryGetValue(itemHandle, out var bag)) return [];
// Each subscription may include the tag once. Walk every active subscription that
// claims this handle and pull the full ref from its index in O(1).
var result = new List<(long, string)>();
foreach (var subId in bag.Distinct())
{
if (!_bySubscriptionId.TryGetValue(subId, out var entry)) continue;
if (entry.FullRefByItemHandle.TryGetValue(itemHandle, out var fullRef))
result.Add((subId, fullRef));
}
return result;
}
/// <summary>
/// Resolve the live MXAccess item handle a current subscription holds for <paramref name="fullRef"/>,
/// or null when no live subscription covers it. The Galaxy writer borrows this handle to skip a
/// redundant AddItem round-trip on the first write to an already-subscribed tag. Guarded by the
/// authoritative live-handle set (<c>_subscribersByItemHandle</c>) so a stale forward-map entry
/// can never hand out a dead handle.
/// </summary>
/// <param name="fullRef">The dotted tag full reference (e.g. <c>TestMachine_002.TestFloat</c>).</param>
/// <returns>The live item handle, or null when none is currently subscribed.</returns>
public int? TryResolveItemHandle(string fullRef)
{
if (fullRef is null) return null;
if (_itemHandleByFullRef.TryGetValue(fullRef, out var handle)
&& _subscribersByItemHandle.ContainsKey(handle))
return handle;
return null;
}
/// <summary>Snapshot every active binding for diagnostic output.</summary>
public IReadOnlyList<TagBinding> SnapshotAllBindings() =>
[.. _bySubscriptionId.Values.SelectMany(entry => entry.Bindings)];
/// <summary>
/// Snapshot every active subscription with its bindings, grouped by subscription id.
/// Used by the reconnect replay path so it can re-issue SubscribeBulk per subscription
/// and then <see cref="Rebind"/> each one with the post-reconnect item handles.
/// </summary>
public IReadOnlyList<(long SubscriptionId, IReadOnlyList<TagBinding> Bindings)> SnapshotEntries() =>
[.. _bySubscriptionId.Values.Select(entry => (entry.SubscriptionId, entry.Bindings))];
/// <summary>
/// Replace an existing subscription's bindings with the item handles a post-reconnect
/// SubscribeBulk returned, rebuilding the reverse fan-out map so events on the new
/// handles dispatch and the now-dead pre-reconnect handles are dropped. No-op when the
/// subscription id is unknown (it was unsubscribed during the reconnect window).
/// </summary>
/// <param name="subscriptionId">The subscription identifier.</param>
/// <param name="newBindings">The new tag bindings after reconnection.</param>
public void Rebind(long subscriptionId, IReadOnlyList<TagBinding> newBindings)
{
if (!_bySubscriptionId.TryGetValue(subscriptionId, out var oldEntry)) return;
// Drop this subscription from every reverse-map set it currently appears in. The
// pre-reconnect item handles are stale once the gw re-issues fresh ones.
// Driver.Galaxy-012: ImmutableHashSet.Remove is O(log n) — no LINQ rebuild.
foreach (var binding in oldEntry.Bindings)
{
if (binding.ItemHandle <= 0) continue;
if (!_subscribersByItemHandle.TryGetValue(binding.ItemHandle, out var set)) continue;
var remaining = set.Remove(subscriptionId);
if (remaining.IsEmpty)
{
_subscribersByItemHandle.TryRemove(binding.ItemHandle, out _);
// Clean up the forward index only when this handle is still the current
// mapping — the add loop below will overwrite it with the fresh handle.
if (_itemHandleByFullRef.TryGetValue(binding.FullReference, out var fwd) && fwd == binding.ItemHandle)
_itemHandleByFullRef.TryRemove(binding.FullReference, out _);
}
else _subscribersByItemHandle[binding.ItemHandle] = remaining;
}
_bySubscriptionId[subscriptionId] = new SubscriptionEntry(subscriptionId, newBindings);
foreach (var binding in newBindings)
{
if (binding.ItemHandle <= 0) continue; // failed gw subscribe — no events expected
_subscribersByItemHandle.AddOrUpdate(
binding.ItemHandle,
_ => [subscriptionId],
(_, set) => set.Add(subscriptionId));
_itemHandleByFullRef[binding.FullReference] = binding.ItemHandle;
}
}
/// <summary>
/// Per-subscription bookkeeping. <see cref="FullRefByItemHandle"/> is an index
/// over <see cref="Bindings"/> keyed by item handle so <c>ResolveSubscribers</c>
/// is O(1) per subscriber instead of a linear scan of every binding
/// (Driver.Galaxy-012). Failed bindings (item handle ≤ 0) are excluded from the
/// index because the EventPump only dispatches for positive handles.
/// </summary>
/// <summary>Per-subscription bookkeeping entry.</summary>
private sealed class SubscriptionEntry
{
/// <summary>Gets the subscription identifier.</summary>
public long SubscriptionId { get; }
/// <summary>Gets the tag bindings for the subscription.</summary>
public IReadOnlyList<TagBinding> Bindings { get; }
/// <summary>Gets the index of full references by item handle.</summary>
public IReadOnlyDictionary<int, string> FullRefByItemHandle { get; }
/// <summary>Initializes a new subscription entry.</summary>
/// <param name="subscriptionId">The subscription identifier.</param>
/// <param name="bindings">The tag bindings for the subscription.</param>
public SubscriptionEntry(long subscriptionId, IReadOnlyList<TagBinding> bindings)
{
SubscriptionId = subscriptionId;
Bindings = bindings;
var index = new Dictionary<int, string>(bindings.Count);
foreach (var binding in bindings)
{
if (binding.ItemHandle <= 0) continue; // failed gw subscribe — no events expected
// Last-write-wins on duplicates; the driver doesn't double-register a handle
// within a single subscription, but be defensive.
index[binding.ItemHandle] = binding.FullReference;
}
FullRefByItemHandle = index;
}
}
}
/// <summary>
/// One (full reference, gw item handle) pair returned by SubscribeBulk. Item handle is
/// zero or negative when the gateway rejected this individual tag (bad name, duplicate);
/// the registry keeps the binding so the caller can surface a per-tag failure status.
/// </summary>
internal sealed record TagBinding(string FullReference, int ItemHandle);