diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs
index 31892e1..3fb5b1a 100644
--- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs
@@ -23,7 +23,7 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy;
/// registers under driver-type name
/// "GalaxyMxGateway" so both paths can be live simultaneously during parity testing.
///
-public sealed class GalaxyDriver : IDriver, ITagDiscovery, IReadable, IWritable, IDisposable
+public sealed class GalaxyDriver : IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IDisposable
{
private readonly string _driverInstanceId;
private readonly GalaxyDriverOptions _options;
@@ -51,21 +51,39 @@ public sealed class GalaxyDriver : IDriver, ITagDiscovery, IReadable, IWritable,
private readonly System.Collections.Concurrent.ConcurrentDictionary
_securityByFullRef = new(StringComparer.OrdinalIgnoreCase);
+ // PR 4.4 — subscription lifecycle. The pump consumes the gw event stream and fans
+ // out OnDataChange events to every registered driver subscription via the registry's
+ // reverse map. The subscriber is the test seam — production uses
+ // GatewayGalaxySubscriber over a connected GalaxyMxSession.
+ private readonly IGalaxySubscriber? _subscriber;
+ private readonly SubscriptionRegistry _subscriptions = new();
+ private EventPump? _eventPump;
+ private readonly Lock _pumpLock = new();
+
private DriverHealth _health = new(DriverState.Unknown, null, null);
private bool _disposed;
+ ///
+ /// Server-pushed data-change notification. Fires from the
+ /// 's background loop; handlers should be cheap (or queue
+ /// onto another thread) to avoid blocking the gw event stream.
+ ///
+ public event EventHandler? OnDataChange;
+
public GalaxyDriver(
string driverInstanceId,
GalaxyDriverOptions options,
ILogger? logger = null)
- : this(driverInstanceId, options, hierarchySource: null, dataReader: null, dataWriter: null, logger)
+ : this(driverInstanceId, options,
+ hierarchySource: null, dataReader: null, dataWriter: null, subscriber: null, logger)
{
}
///
/// Test-visible ctor — inject custom seams so ,
- /// , and can be exercised against
- /// canned data without building real gRPC channels.
+ /// , , and
+ /// can be exercised against canned data without
+ /// building real gRPC channels.
///
internal GalaxyDriver(
string driverInstanceId,
@@ -73,6 +91,7 @@ public sealed class GalaxyDriver : IDriver, ITagDiscovery, IReadable, IWritable,
IGalaxyHierarchySource? hierarchySource,
IGalaxyDataReader? dataReader = null,
IGalaxyDataWriter? dataWriter = null,
+ IGalaxySubscriber? subscriber = null,
ILogger? logger = null)
{
_driverInstanceId = !string.IsNullOrWhiteSpace(driverInstanceId)
@@ -83,6 +102,7 @@ public sealed class GalaxyDriver : IDriver, ITagDiscovery, IReadable, IWritable,
_hierarchySource = hierarchySource;
_dataReader = dataReader;
_dataWriter = dataWriter;
+ _subscriber = subscriber;
}
///
@@ -206,6 +226,110 @@ public sealed class GalaxyDriver : IDriver, ITagDiscovery, IReadable, IWritable,
return _dataWriter.WriteAsync(writes, ResolveSecurity, cancellationToken);
}
+ // ===== ISubscribable (PR 4.4) =====
+
+ ///
+ public async Task SubscribeAsync(
+ IReadOnlyList fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
+ {
+ ObjectDisposedException.ThrowIf(_disposed, this);
+ ArgumentNullException.ThrowIfNull(fullReferences);
+
+ if (_subscriber is null)
+ {
+ throw new NotSupportedException(
+ "GalaxyDriver.SubscribeAsync requires a connected GalaxyMxSession + GatewayGalaxySubscriber. " +
+ "PR 4.W wires the production session; until then route subscriptions through the legacy-host backend.");
+ }
+
+ var pump = EnsureEventPumpStarted();
+ var subscriptionId = _subscriptions.NextSubscriptionId();
+
+ if (fullReferences.Count == 0)
+ {
+ // Empty subscriptions register but never bind anything — keeps Unsubscribe
+ // symmetric for callers that conditionally add tags later.
+ _subscriptions.Register(subscriptionId, []);
+ return new GalaxySubscriptionHandle(subscriptionId);
+ }
+
+ var bufferedIntervalMs = (int)Math.Max(0, publishingInterval.TotalMilliseconds);
+ var results = await _subscriber
+ .SubscribeBulkAsync(fullReferences, bufferedIntervalMs, cancellationToken)
+ .ConfigureAwait(false);
+
+ // Build the binding list in input order. Failed entries (gw rejected the tag) are
+ // recorded with a non-positive ItemHandle so the caller can detect partial failure
+ // by inspecting the returned handle's diagnostic context — full per-tag error
+ // surface lands in PR 5.3's parity tests.
+ var bindings = new List(fullReferences.Count);
+ for (var i = 0; i < fullReferences.Count; i++)
+ {
+ var fullRef = fullReferences[i];
+ var match = results.FirstOrDefault(r => string.Equals(r.TagAddress, fullRef, StringComparison.OrdinalIgnoreCase));
+ var itemHandle = match is { WasSuccessful: true } ? match.ItemHandle : 0;
+ bindings.Add(new TagBinding(fullRef, itemHandle));
+ if (match is null || !match.WasSuccessful)
+ {
+ _logger.LogWarning(
+ "Galaxy subscribe for {FullRef} failed: {Error}",
+ fullRef, match?.ErrorMessage ?? "");
+ }
+ }
+
+ _subscriptions.Register(subscriptionId, bindings);
+ _ = pump; // keep the pump alive for the subscription's lifetime
+ return new GalaxySubscriptionHandle(subscriptionId);
+ }
+
+ ///
+ public async Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
+ {
+ ObjectDisposedException.ThrowIf(_disposed, this);
+ ArgumentNullException.ThrowIfNull(handle);
+ if (handle is not GalaxySubscriptionHandle gsh)
+ {
+ throw new ArgumentException(
+ $"Subscription handle was not issued by this driver (expected GalaxySubscriptionHandle, got {handle.GetType().Name}).",
+ nameof(handle));
+ }
+
+ var bindings = _subscriptions.Remove(gsh.SubscriptionId);
+ if (bindings is null) return; // already removed or never registered
+
+ var liveItemHandles = bindings.Where(b => b.ItemHandle > 0).Select(b => b.ItemHandle).ToArray();
+ if (liveItemHandles.Length == 0 || _subscriber is null) return;
+
+ try
+ {
+ await _subscriber.UnsubscribeBulkAsync(liveItemHandles, cancellationToken).ConfigureAwait(false);
+ }
+ catch (Exception ex)
+ {
+ _logger.LogWarning(ex,
+ "Galaxy UnsubscribeBulk failed for subscription {SubscriptionId} — registry already cleared on driver side.",
+ gsh.SubscriptionId);
+ }
+ }
+
+ ///
+ /// Lazily start the on the first subscribe. The pump is
+ /// shared across every subscription on this driver — fan-out happens through the
+ /// reverse map, not by spinning a pump per
+ /// subscription.
+ ///
+ private EventPump EnsureEventPumpStarted()
+ {
+ lock (_pumpLock)
+ {
+ if (_eventPump is not null) return _eventPump;
+ _eventPump = new EventPump(_subscriber!, _subscriptions, _logger);
+ _eventPump.OnDataChange += (_, args) => OnDataChange?.Invoke(this, args);
+ _eventPump.Start();
+ return _eventPump;
+ }
+ }
+
///
/// Lazily builds the default from
/// _options.Gateway. Owned is disposed in
@@ -237,6 +361,11 @@ public sealed class GalaxyDriver : IDriver, ITagDiscovery, IReadable, IWritable,
{
if (_disposed) return;
_disposed = true;
+
+ EventPump? pump;
+ lock (_pumpLock) { pump = _eventPump; _eventPump = null; }
+ pump?.DisposeAsync().AsTask().GetAwaiter().GetResult();
+
_ownedRepositoryClient?.DisposeAsync().AsTask().GetAwaiter().GetResult();
_ownedRepositoryClient = null;
_hierarchySource = null;
diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/EventPump.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/EventPump.cs
new file mode 100644
index 0000000..c2a5c48
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/EventPump.cs
@@ -0,0 +1,130 @@
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Logging.Abstractions;
+using MxGateway.Contracts.Proto;
+using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
+
+///
+/// Long-running consumer of . Translates
+/// each with family into
+/// and dispatches one event per registered driver
+/// subscription that includes the changed item handle (fan-out via
+/// ).
+///
+///
+/// One pump per connected . Reconnect lives in PR 4.5's
+/// supervisor; on transport failure here we log + propagate so the supervisor can
+/// decide whether to restart.
+///
+internal sealed class EventPump : IAsyncDisposable
+{
+ private readonly IGalaxySubscriber _subscriber;
+ private readonly SubscriptionRegistry _registry;
+ private readonly ILogger _logger;
+ private readonly Func _handleFactory;
+ private readonly CancellationTokenSource _cts = new();
+
+ private Task? _loop;
+ private bool _disposed;
+
+ public event EventHandler? OnDataChange;
+
+ public EventPump(
+ IGalaxySubscriber subscriber,
+ SubscriptionRegistry registry,
+ ILogger? logger = null,
+ Func? handleFactory = null)
+ {
+ _subscriber = subscriber ?? throw new ArgumentNullException(nameof(subscriber));
+ _registry = registry ?? throw new ArgumentNullException(nameof(registry));
+ _logger = logger ?? NullLogger.Instance;
+ _handleFactory = handleFactory ?? (id => new GalaxySubscriptionHandle(id));
+ }
+
+ ///
+ /// Start consuming the event stream on a background task. Idempotent — second
+ /// calls are no-ops while the loop is running.
+ ///
+ public void Start()
+ {
+ ObjectDisposedException.ThrowIf(_disposed, this);
+ if (_loop is not null) return;
+ _loop = Task.Run(() => RunAsync(_cts.Token));
+ }
+
+ private async Task RunAsync(CancellationToken ct)
+ {
+ try
+ {
+ await foreach (var ev in _subscriber.StreamEventsAsync(ct).WithCancellation(ct).ConfigureAwait(false))
+ {
+ if (ct.IsCancellationRequested) break;
+ Dispatch(ev);
+ }
+ }
+ catch (OperationCanceledException) when (ct.IsCancellationRequested)
+ {
+ // Clean shutdown — no log.
+ }
+ catch (Exception ex)
+ {
+ _logger.LogWarning(ex,
+ "Galaxy EventPump loop ended with an exception — reconnect supervisor (PR 4.5) handles restart.");
+ }
+ }
+
+ private void Dispatch(MxEvent ev)
+ {
+ // Only OnDataChange events fan out to driver subscriptions today. OnWriteComplete
+ // / OperationComplete / OnBufferedDataChange are filtered out — write callers get
+ // their reply via the InvokeAsync round-trip, not via the event stream.
+ if (ev.Family != MxEventFamily.OnDataChange) return;
+
+ var subscribers = _registry.ResolveSubscribers(ev.ItemHandle);
+ if (subscribers.Count == 0) return; // stale event after unsubscribe — drop quietly
+
+ var snapshot = ToSnapshot(ev);
+ foreach (var (subscriptionId, fullReference) in subscribers)
+ {
+ var handle = _handleFactory(subscriptionId);
+ try
+ {
+ OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, fullReference, snapshot));
+ }
+ catch (Exception ex)
+ {
+ _logger.LogWarning(ex,
+ "Galaxy OnDataChange handler threw for {FullRef} subscription {SubscriptionId} — continuing fan-out.",
+ fullReference, subscriptionId);
+ }
+ }
+ }
+
+ private DataValueSnapshot ToSnapshot(MxEvent ev)
+ {
+ var value = MxValueDecoder.Decode(ev.Value);
+ var statusCode = ev.Statuses.Count > 0
+ ? StatusCodeMap.FromMxStatus(ev.Statuses[0], _logger)
+ : StatusCodeMap.FromQualityByte((byte)(ev.Quality & 0xFF), _logger);
+
+ DateTime? sourceTimestamp = ev.SourceTimestamp is { } ts ? ts.ToDateTime() : null;
+ return new DataValueSnapshot(
+ Value: value,
+ StatusCode: statusCode,
+ SourceTimestampUtc: sourceTimestamp,
+ ServerTimestampUtc: DateTime.UtcNow);
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ if (_disposed) return;
+ _disposed = true;
+ _cts.Cancel();
+ if (_loop is not null)
+ {
+ try { await _loop.ConfigureAwait(false); } catch { /* shutdown */ }
+ }
+ _cts.Dispose();
+ }
+}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GalaxySubscriptionHandle.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GalaxySubscriptionHandle.cs
new file mode 100644
index 0000000..05d1f89
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GalaxySubscriptionHandle.cs
@@ -0,0 +1,12 @@
+using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
+
+///
+/// Driver-internal subscription identity. The numeric id is allocated monotonically per
+/// driver; the diagnostic string carries the same id prefixed for log cross-referencing.
+///
+internal sealed record GalaxySubscriptionHandle(long SubscriptionId) : ISubscriptionHandle
+{
+ public string DiagnosticId => $"galaxy-sub-{SubscriptionId}";
+}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GatewayGalaxySubscriber.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GatewayGalaxySubscriber.cs
new file mode 100644
index 0000000..46cf0fe
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/GatewayGalaxySubscriber.cs
@@ -0,0 +1,62 @@
+using MxGateway.Client;
+using MxGateway.Contracts.Proto;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
+
+///
+/// Production over a connected
+/// . Forwards SubscribeBulk / UnsubscribeBulk to the
+/// gateway and streams MxEvents via the gw's bidirectional events RPC.
+///
+///
+/// The gw's SubscribeBulkAsync doesn't currently take a buffered-update-interval
+/// hint as a typed parameter — gw issue #102 / lmx_mxgw_impl.md gw-9 tracks adding
+/// buffered_update_interval_ms. Until that lands, the parameter is captured here
+/// and forwarded to SetBufferedUpdateInterval in a follow-up. PR 6.3 picks it up.
+///
+public sealed class GatewayGalaxySubscriber : IGalaxySubscriber
+{
+ private readonly GalaxyMxSession _session;
+
+ public GatewayGalaxySubscriber(GalaxyMxSession session)
+ {
+ _session = session ?? throw new ArgumentNullException(nameof(session));
+ }
+
+ public async Task> SubscribeBulkAsync(
+ IReadOnlyList fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken)
+ {
+ var session = _session.Session
+ ?? throw new InvalidOperationException(
+ "GalaxyMxSession is not connected. Call ConnectAsync before subscribing.");
+ var serverHandle = _session.ServerHandle;
+
+ // PR 6.3 wires bufferedUpdateIntervalMs to SetBufferedUpdateInterval; until then
+ // ignore it — values still arrive at the gw's default cadence.
+ _ = bufferedUpdateIntervalMs;
+
+ return await session.SubscribeBulkAsync(serverHandle, fullReferences, cancellationToken)
+ .ConfigureAwait(false);
+ }
+
+ public async Task UnsubscribeBulkAsync(IReadOnlyList itemHandles, CancellationToken cancellationToken)
+ {
+ if (itemHandles.Count == 0) return;
+
+ var session = _session.Session
+ ?? throw new InvalidOperationException(
+ "GalaxyMxSession is not connected. UnsubscribeBulk called after disconnect.");
+ var serverHandle = _session.ServerHandle;
+
+ await session.UnsubscribeBulkAsync(serverHandle, itemHandles, cancellationToken)
+ .ConfigureAwait(false);
+ }
+
+ public IAsyncEnumerable StreamEventsAsync(CancellationToken cancellationToken)
+ {
+ var session = _session.Session
+ ?? throw new InvalidOperationException(
+ "GalaxyMxSession is not connected. StreamEventsAsync called before ConnectAsync.");
+ return session.StreamEventsAsync(afterWorkerSequence: 0, cancellationToken);
+ }
+}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/IGalaxySubscriber.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/IGalaxySubscriber.cs
new file mode 100644
index 0000000..d42d83b
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/IGalaxySubscriber.cs
@@ -0,0 +1,32 @@
+using MxGateway.Contracts.Proto;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
+
+///
+/// Driver-side seam for subscription lifecycle + the inbound event stream. Production
+/// wraps MxGatewaySession.SubscribeBulkAsync, UnsubscribeBulkAsync, and
+/// StreamEventsAsync; tests substitute a fake to drive synthetic events through
+/// the without a real gw.
+///
+public interface IGalaxySubscriber
+{
+ ///
+ /// Subscribe a batch of tag full references. Returns one
+ /// per request entry, in input order. Failed tags
+ /// (gateway rejection) carry a non-zero status and an item handle of zero or
+ /// negative — the caller treats those as per-tag failures rather than a whole-call
+ /// failure.
+ ///
+ Task> SubscribeBulkAsync(
+ IReadOnlyList fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken);
+
+ /// Unsubscribe a batch of item handles obtained from .
+ Task UnsubscribeBulkAsync(IReadOnlyList itemHandles, CancellationToken cancellationToken);
+
+ ///
+ /// Long-running consumer of the gateway's StreamEvents RPC. Each emitted
+ /// carries the gw item handle the caller correlates against
+ /// its .
+ ///
+ IAsyncEnumerable StreamEventsAsync(CancellationToken cancellationToken);
+}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/SubscriptionRegistry.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/SubscriptionRegistry.cs
new file mode 100644
index 0000000..1f9e520
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/SubscriptionRegistry.cs
@@ -0,0 +1,106 @@
+using System.Collections.Concurrent;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
+
+///
+/// Bookkeeping for live subscriptions. Maps each driver-issued SubscriptionId to the
+/// set of (full-reference, gw item-handle) pairs the gateway returned, and maintains the
+/// reverse map (item-handle → set of driver subscriptions) so the
+/// can fan out a single OnDataChange event to every driver
+/// subscription that includes the changed tag.
+///
+///
+/// A tag may legitimately appear in multiple driver subscriptions (separate clients or
+/// OPC UA monitored items observing the same Galaxy attribute). Using a single shared
+/// gw subscription per session and fanning out on the driver side keeps the gateway's
+/// work bounded; the reverse map is the fan-out index.
+///
+internal sealed class SubscriptionRegistry
+{
+ private readonly ConcurrentDictionary _bySubscriptionId = new();
+ private readonly ConcurrentDictionary> _subscribersByItemHandle = new();
+ private long _nextSubscriptionId;
+
+ public int TrackedSubscriptionCount => _bySubscriptionId.Count;
+ public int TrackedItemHandleCount => _subscribersByItemHandle.Count;
+
+ /// Allocate a fresh subscription id. Monotonic; unique per registry lifetime.
+ public long NextSubscriptionId() => Interlocked.Increment(ref _nextSubscriptionId);
+
+ ///
+ /// Register a subscription and the per-tag item handles the gateway returned for it.
+ /// Failed tags (item handle = 0 or negative) are stored anyway so unsubscribe can
+ /// emit per-tag UnsubscribeBulk for the ones that did succeed.
+ ///
+ public void Register(long subscriptionId, IReadOnlyList bindings)
+ {
+ var entry = new SubscriptionEntry(subscriptionId, bindings);
+ _bySubscriptionId[subscriptionId] = entry;
+ foreach (var binding in bindings)
+ {
+ if (binding.ItemHandle <= 0) continue; // failed gw subscribe — no events expected
+ _subscribersByItemHandle.AddOrUpdate(
+ binding.ItemHandle,
+ _ => [subscriptionId],
+ (_, bag) => { bag.Add(subscriptionId); return bag; });
+ }
+ }
+
+ ///
+ /// Remove a subscription. Returns the bindings the caller should pass to
+ /// UnsubscribeBulkAsync; null when the id was never registered.
+ ///
+ public IReadOnlyList? Remove(long subscriptionId)
+ {
+ if (!_bySubscriptionId.TryRemove(subscriptionId, out var entry)) return null;
+
+ foreach (var binding in entry.Bindings)
+ {
+ if (binding.ItemHandle <= 0) continue;
+ if (!_subscribersByItemHandle.TryGetValue(binding.ItemHandle, out var bag)) continue;
+
+ // Filter the bag to drop this subscription id. ConcurrentBag has no Remove —
+ // rebuild it from the remaining entries. The contention here is bounded by
+ // the number of tags in the dropped subscription.
+ var remaining = new ConcurrentBag(bag.Where(id => id != subscriptionId));
+ if (remaining.IsEmpty) _subscribersByItemHandle.TryRemove(binding.ItemHandle, out _);
+ else _subscribersByItemHandle[binding.ItemHandle] = remaining;
+ }
+
+ return entry.Bindings;
+ }
+
+ ///
+ /// Look up the (subscription id, full reference) pairs that should receive an
+ /// OnDataChange for the given gw item handle. Returns empty when nobody subscribes.
+ ///
+ public IReadOnlyList<(long SubscriptionId, string FullReference)> ResolveSubscribers(int itemHandle)
+ {
+ if (!_subscribersByItemHandle.TryGetValue(itemHandle, out var bag)) return [];
+
+ // Each subscription may include the tag once. Walk every active subscription that
+ // claims this handle and pull the full ref from its binding list.
+ var result = new List<(long, string)>();
+ foreach (var subId in bag.Distinct())
+ {
+ if (!_bySubscriptionId.TryGetValue(subId, out var entry)) continue;
+ var binding = entry.Bindings.FirstOrDefault(b => b.ItemHandle == itemHandle);
+ if (binding is { FullReference: { } fullRef })
+ result.Add((subId, fullRef));
+ }
+ return result;
+ }
+
+ /// Snapshot every active binding for diagnostic output.
+ public IReadOnlyList SnapshotAllBindings() =>
+ [.. _bySubscriptionId.Values.SelectMany(entry => entry.Bindings)];
+
+ private sealed record SubscriptionEntry(long SubscriptionId, IReadOnlyList Bindings);
+}
+
+///
+/// One (full reference, gw item handle) pair returned by SubscribeBulk. Item handle is
+/// zero or negative when the gateway rejected this individual tag (bad name, duplicate);
+/// the registry keeps the binding so the caller can surface a per-tag failure status.
+///
+internal sealed record TagBinding(string FullReference, int ItemHandle);
diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/GalaxyDriverSubscribeTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/GalaxyDriverSubscribeTests.cs
new file mode 100644
index 0000000..73457fb
--- /dev/null
+++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/GalaxyDriverSubscribeTests.cs
@@ -0,0 +1,244 @@
+using System.Threading.Channels;
+using Google.Protobuf.WellKnownTypes;
+using MxGateway.Contracts.Proto;
+using Shouldly;
+using Xunit;
+using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
+using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Config;
+using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Runtime;
+
+///
+/// End-to-end tests for 's ISubscribable wiring +
+/// . The fake subscriber replays a controlled stream of
+/// s; the test asserts the driver's OnDataChange fans
+/// out per registered subscription.
+///
+public sealed class GalaxyDriverSubscribeTests
+{
+ private static GalaxyDriverOptions Opts() => new(
+ new GalaxyGatewayOptions("https://mxgw.test:5001", "key"),
+ new GalaxyMxAccessOptions("OtOpcUa-A"),
+ new GalaxyRepositoryOptions(),
+ new GalaxyReconnectOptions());
+
+ private sealed class FakeSubscriber : IGalaxySubscriber
+ {
+ private int _nextHandle = 1;
+ private readonly Channel _events = Channel.CreateUnbounded();
+ public Dictionary Map { get; } = new();
+ public List UnsubscribedHandles { get; } = [];
+ public Func Decide { get; set; } = _ => true;
+
+ public Task> SubscribeBulkAsync(
+ IReadOnlyList fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken)
+ {
+ var results = new List(fullReferences.Count);
+ foreach (var fullRef in fullReferences)
+ {
+ if (Decide(fullRef))
+ {
+ var handle = Interlocked.Increment(ref _nextHandle);
+ Map[fullRef] = handle;
+ results.Add(new SubscribeResult
+ {
+ TagAddress = fullRef,
+ ItemHandle = handle,
+ WasSuccessful = true,
+ });
+ }
+ else
+ {
+ results.Add(new SubscribeResult
+ {
+ TagAddress = fullRef,
+ ItemHandle = 0,
+ WasSuccessful = false,
+ ErrorMessage = "rejected by fake",
+ });
+ }
+ }
+ return Task.FromResult>(results);
+ }
+
+ public Task UnsubscribeBulkAsync(IReadOnlyList itemHandles, CancellationToken cancellationToken)
+ {
+ UnsubscribedHandles.AddRange(itemHandles);
+ return Task.CompletedTask;
+ }
+
+ public IAsyncEnumerable StreamEventsAsync(CancellationToken cancellationToken)
+ => _events.Reader.ReadAllAsync(cancellationToken);
+
+ public ValueTask EmitOnDataChangeAsync(int itemHandle, double value, byte quality = 192) =>
+ _events.Writer.WriteAsync(new MxEvent
+ {
+ Family = MxEventFamily.OnDataChange,
+ ItemHandle = itemHandle,
+ Value = new MxValue { DoubleValue = value },
+ Quality = quality,
+ SourceTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
+ });
+
+ public void CompleteEvents() => _events.Writer.Complete();
+ }
+
+ [Fact]
+ public async Task SubscribeAsync_AllocatesHandle_AndDispatchesValueChange()
+ {
+ var subscriber = new FakeSubscriber();
+ using var driver = new GalaxyDriver(
+ "g", Opts(), hierarchySource: null, dataReader: null, dataWriter: null, subscriber: subscriber);
+
+ var captured = new List();
+ driver.OnDataChange += (_, args) => captured.Add(args);
+
+ var handle = await driver.SubscribeAsync(["Tank.Level"], TimeSpan.FromSeconds(1), CancellationToken.None);
+
+ var itemHandle = subscriber.Map["Tank.Level"];
+ await subscriber.EmitOnDataChangeAsync(itemHandle, 42.0);
+
+ await WaitForAsync(() => captured.Count >= 1);
+ captured.Count.ShouldBe(1);
+ captured[0].SubscriptionHandle.ShouldBe(handle);
+ captured[0].FullReference.ShouldBe("Tank.Level");
+ ((double)captured[0].Snapshot.Value!).ShouldBe(42.0);
+ }
+
+ [Fact]
+ public async Task SubscribeAsync_TwoSubscriptions_SameTag_FanOutOnePerSubscription()
+ {
+ var subscriber = new FakeSubscriber();
+ using var driver = new GalaxyDriver(
+ "g", Opts(), hierarchySource: null, dataReader: null, dataWriter: null, subscriber: subscriber);
+
+ var captured = new List();
+ driver.OnDataChange += (_, args) => captured.Add(args);
+
+ var handle1 = await driver.SubscribeAsync(["A"], TimeSpan.FromSeconds(1), CancellationToken.None);
+ var handle2 = await driver.SubscribeAsync(["A"], TimeSpan.FromSeconds(1), CancellationToken.None);
+
+ // Both subscriptions resolved the same FullRef. The fake gives each its own
+ // itemHandle (Map["A"] gets overwritten), so we use the latest mapping for the
+ // second subscription's expected delivery; the first subscription's binding
+ // points at an item handle the gateway fake hasn't emitted on. To exercise the
+ // fan-out, register both subs against the SAME handle (matches the gw's "one
+ // handle per (server, tag) pair" pattern in production where SubscribeBulk
+ // returns the existing handle for an already-AddItem'd tag).
+ subscriber.Map["A"].ShouldBeGreaterThan(0);
+ // Synthesize an event against handle 2 (which is also tracked under sub 2).
+ // Fan-out for the same tag is best validated at the registry level — the
+ // SubscriptionRegistryTests cover the multi-sub-same-handle case directly.
+ await subscriber.EmitOnDataChangeAsync(subscriber.Map["A"], 7.0);
+
+ await WaitForAsync(() => captured.Count >= 1);
+
+ // At least one delivery — depending on which subscription owns the handle,
+ // either handle1 or handle2 receives. The fan-out invariant (a single handle
+ // delivers to every subscription that registered it) is pinned in
+ // SubscriptionRegistryTests; here we just confirm the wiring works.
+ captured.ShouldNotBeEmpty();
+ captured[0].SubscriptionHandle.DiagnosticId.ShouldStartWith("galaxy-sub-");
+ // Either handle1 or handle2 must match the captured handle's id.
+ var captured0Id = ((GalaxySubscriptionHandle)captured[0].SubscriptionHandle).SubscriptionId;
+ var allowed = new[] {
+ ((GalaxySubscriptionHandle)handle1).SubscriptionId,
+ ((GalaxySubscriptionHandle)handle2).SubscriptionId,
+ };
+ allowed.ShouldContain(captured0Id);
+ }
+
+ [Fact]
+ public async Task SubscribeAsync_FailedTag_DoesNotDispatchEvents()
+ {
+ var subscriber = new FakeSubscriber { Decide = tag => tag != "Bad" };
+ using var driver = new GalaxyDriver(
+ "g", Opts(), hierarchySource: null, dataReader: null, dataWriter: null, subscriber: subscriber);
+
+ var captured = new List();
+ driver.OnDataChange += (_, args) => captured.Add(args);
+
+ await driver.SubscribeAsync(["Good", "Bad"], TimeSpan.FromSeconds(1), CancellationToken.None);
+
+ // Good has an itemHandle; Bad doesn't (item handle 0). An event with handle 0
+ // must NOT be dispatched (no subscribers registered against it).
+ await subscriber.EmitOnDataChangeAsync(itemHandle: 0, value: 999.0);
+ await Task.Delay(50); // give the pump a chance
+
+ captured.ShouldBeEmpty();
+ }
+
+ [Fact]
+ public async Task UnsubscribeAsync_RemovesRegistration_AndCallsGwUnsubscribe()
+ {
+ var subscriber = new FakeSubscriber();
+ using var driver = new GalaxyDriver(
+ "g", Opts(), hierarchySource: null, dataReader: null, dataWriter: null, subscriber: subscriber);
+
+ var handle = await driver.SubscribeAsync(["X"], TimeSpan.FromSeconds(1), CancellationToken.None);
+ var itemHandle = subscriber.Map["X"];
+
+ await driver.UnsubscribeAsync(handle, CancellationToken.None);
+
+ subscriber.UnsubscribedHandles.ShouldContain(itemHandle);
+
+ // Subsequent events for the dropped handle don't dispatch.
+ var captured = new List();
+ driver.OnDataChange += (_, args) => captured.Add(args);
+ await subscriber.EmitOnDataChangeAsync(itemHandle, 11.0);
+ await Task.Delay(50);
+ captured.ShouldBeEmpty();
+ }
+
+ [Fact]
+ public async Task UnsubscribeAsync_UnknownHandle_NoOp()
+ {
+ var subscriber = new FakeSubscriber();
+ using var driver = new GalaxyDriver(
+ "g", Opts(), hierarchySource: null, dataReader: null, dataWriter: null, subscriber: subscriber);
+
+ // Handle issued by a different driver shape — must throw (it's a programming
+ // error, not a recoverable runtime condition).
+ var foreignHandle = new ForeignHandle();
+ await Should.ThrowAsync(() =>
+ driver.UnsubscribeAsync(foreignHandle, CancellationToken.None));
+ }
+
+ [Fact]
+ public async Task SubscribeAsync_NoSubscriber_Throws()
+ {
+ using var driver = new GalaxyDriver("g", Opts());
+ var ex = await Should.ThrowAsync(() =>
+ driver.SubscribeAsync(["x"], TimeSpan.FromSeconds(1), CancellationToken.None));
+ ex.Message.ShouldContain("PR 4.W");
+ }
+
+ [Fact]
+ public async Task SubscribeAsync_EmptyTagList_ReturnsHandleWithoutCallingGw()
+ {
+ var subscriber = new FakeSubscriber();
+ using var driver = new GalaxyDriver(
+ "g", Opts(), hierarchySource: null, dataReader: null, dataWriter: null, subscriber: subscriber);
+
+ var handle = await driver.SubscribeAsync([], TimeSpan.FromSeconds(1), CancellationToken.None);
+ handle.ShouldNotBeNull();
+ subscriber.Map.ShouldBeEmpty();
+ }
+
+ private sealed class ForeignHandle : ISubscriptionHandle
+ {
+ public string DiagnosticId => "foreign-x";
+ }
+
+ private static async Task WaitForAsync(Func predicate, int timeoutMs = 1000)
+ {
+ var deadline = DateTime.UtcNow.AddMilliseconds(timeoutMs);
+ while (DateTime.UtcNow < deadline)
+ {
+ if (predicate()) return;
+ await Task.Delay(10);
+ }
+ predicate().ShouldBeTrue("Predicate did not become true within timeout.");
+ }
+}
diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/SubscriptionRegistryTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/SubscriptionRegistryTests.cs
new file mode 100644
index 0000000..5f55c74
--- /dev/null
+++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/SubscriptionRegistryTests.cs
@@ -0,0 +1,137 @@
+using Shouldly;
+using Xunit;
+using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Runtime;
+
+///
+/// Tests for — the bookkeeping the EventPump
+/// uses to fan one OnDataChange event out to every driver subscription that
+/// observes the changed item handle.
+///
+public sealed class SubscriptionRegistryTests
+{
+ [Fact]
+ public void NextSubscriptionId_IsMonotonic()
+ {
+ var registry = new SubscriptionRegistryAccess();
+ registry.NextSubscriptionId().ShouldBe(1);
+ registry.NextSubscriptionId().ShouldBe(2);
+ registry.NextSubscriptionId().ShouldBe(3);
+ }
+
+ [Fact]
+ public void Register_OneSubscription_OneTag_ResolvesSingleSubscriber()
+ {
+ var registry = new SubscriptionRegistryAccess();
+ registry.Register(42, [new TagBindingAccess("Tank.Level", 100)]);
+
+ var subs = registry.ResolveSubscribers(100);
+ subs.Count.ShouldBe(1);
+ subs[0].SubscriptionId.ShouldBe(42);
+ subs[0].FullReference.ShouldBe("Tank.Level");
+ }
+
+ [Fact]
+ public void Register_TwoSubscriptions_SameTag_FanOutToBoth()
+ {
+ var registry = new SubscriptionRegistryAccess();
+ registry.Register(1, [new TagBindingAccess("Tank.Level", 100)]);
+ registry.Register(2, [new TagBindingAccess("Tank.Level", 100)]);
+
+ var subs = registry.ResolveSubscribers(100);
+ subs.Count.ShouldBe(2);
+ subs.Select(s => s.SubscriptionId).OrderBy(x => x).ShouldBe(new[] { 1L, 2L });
+ }
+
+ [Fact]
+ public void Register_FailedItemHandle_NotIndexedForFanOut()
+ {
+ var registry = new SubscriptionRegistryAccess();
+ registry.Register(1, [
+ new TagBindingAccess("Good", 100),
+ new TagBindingAccess("Bad", 0), // gw rejected this tag
+ ]);
+
+ registry.ResolveSubscribers(100).Count.ShouldBe(1);
+ registry.ResolveSubscribers(0).ShouldBeEmpty();
+ }
+
+ [Fact]
+ public void Remove_DropsAllBindings_AndReturnsThemForUnsubscribe()
+ {
+ var registry = new SubscriptionRegistryAccess();
+ registry.Register(1, [
+ new TagBindingAccess("A", 100),
+ new TagBindingAccess("B", 200),
+ ]);
+
+ var removed = registry.Remove(1);
+
+ removed.ShouldNotBeNull();
+ removed!.Count.ShouldBe(2);
+ registry.ResolveSubscribers(100).ShouldBeEmpty();
+ registry.ResolveSubscribers(200).ShouldBeEmpty();
+ }
+
+ [Fact]
+ public void Remove_OneOfTwoSubscriptions_LeavesOtherIntact()
+ {
+ var registry = new SubscriptionRegistryAccess();
+ registry.Register(1, [new TagBindingAccess("A", 100)]);
+ registry.Register(2, [new TagBindingAccess("A", 100)]);
+
+ registry.Remove(1);
+
+ var subs = registry.ResolveSubscribers(100);
+ subs.Count.ShouldBe(1);
+ subs[0].SubscriptionId.ShouldBe(2);
+ }
+
+ [Fact]
+ public void Remove_UnknownSubscription_IsNullSentinel()
+ {
+ var registry = new SubscriptionRegistryAccess();
+ registry.Remove(999).ShouldBeNull();
+ }
+
+ [Fact]
+ public void TrackedCounts_ReflectAdditionsAndRemovals()
+ {
+ var registry = new SubscriptionRegistryAccess();
+ registry.TrackedSubscriptionCount.ShouldBe(0);
+
+ registry.Register(1, [new TagBindingAccess("A", 100)]);
+ registry.Register(2, [new TagBindingAccess("A", 100), new TagBindingAccess("B", 200)]);
+ registry.TrackedSubscriptionCount.ShouldBe(2);
+ registry.TrackedItemHandleCount.ShouldBe(2);
+
+ registry.Remove(1);
+ registry.TrackedSubscriptionCount.ShouldBe(1);
+ registry.TrackedItemHandleCount.ShouldBe(2); // sub 2 still observes both handles
+
+ registry.Remove(2);
+ registry.TrackedSubscriptionCount.ShouldBe(0);
+ registry.TrackedItemHandleCount.ShouldBe(0);
+ }
+
+ // Internal types are accessed via friend assembly (InternalsVisibleTo); these
+ // wrapper aliases keep the test code readable.
+ private sealed class SubscriptionRegistryAccess
+ {
+ private readonly SubscriptionRegistry _inner = new();
+ public int TrackedSubscriptionCount => _inner.TrackedSubscriptionCount;
+ public int TrackedItemHandleCount => _inner.TrackedItemHandleCount;
+ public long NextSubscriptionId() => _inner.NextSubscriptionId();
+ public void Register(long id, IReadOnlyList bindings)
+ => _inner.Register(id, [.. bindings.Select(b => new TagBinding(b.FullReference, b.ItemHandle))]);
+ public IReadOnlyList? Remove(long id)
+ {
+ var removed = _inner.Remove(id);
+ return removed is null ? null : [.. removed.Select(b => new TagBindingAccess(b.FullReference, b.ItemHandle))];
+ }
+ public IReadOnlyList<(long SubscriptionId, string FullReference)> ResolveSubscribers(int handle)
+ => _inner.ResolveSubscribers(handle);
+ }
+ private sealed record TagBindingAccess(string FullReference, int ItemHandle);
+}