diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs index 0e3dee2..9baef88 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs @@ -27,8 +27,20 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient; /// /// public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string driverInstanceId) - : IDriver, ITagDiscovery, IReadable, IWritable, IDisposable, IAsyncDisposable + : IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IHostConnectivityProbe, IDisposable, IAsyncDisposable { + // ---- ISubscribable + IHostConnectivityProbe state ---- + + private readonly System.Collections.Concurrent.ConcurrentDictionary _subscriptions = new(); + private long _nextSubscriptionId; + private readonly object _probeLock = new(); + private HostState _hostState = HostState.Unknown; + private DateTime _hostStateChangedUtc = DateTime.UtcNow; + private KeepAliveEventHandler? _keepAliveHandler; + + public event EventHandler? OnDataChange; + public event EventHandler? OnHostStatusChanged; + // OPC UA StatusCode constants the driver surfaces for local-side faults. Upstream-server // StatusCodes are passed through verbatim per driver-specs.md §8 "cascading quality" — // downstream clients need to distinguish 'remote source down' from 'local driver failure'. @@ -110,8 +122,21 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d session.KeepAliveInterval = (int)_options.KeepAliveInterval.TotalMilliseconds; + // Wire the session's keep-alive channel into HostState. OPC UA keep-alives are + // authoritative for session liveness: the SDK pings on KeepAliveInterval and sets + // KeepAliveStopped when N intervals elapse without a response. That's strictly + // better than a driver-side polling probe — no extra round-trip, no duplicate + // semantic. + _keepAliveHandler = (_, e) => + { + var healthy = !ServiceResult.IsBad(e.Status); + TransitionTo(healthy ? HostState.Running : HostState.Stopped); + }; + session.KeepAlive += _keepAliveHandler; + Session = session; _health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null); + TransitionTo(HostState.Running); } catch (Exception ex) { @@ -214,10 +239,29 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d public async Task ShutdownAsync(CancellationToken cancellationToken) { + // Tear down remote subscriptions first — otherwise Session.Close will try and may fail + // with BadSubscriptionIdInvalid noise in the upstream log. _subscriptions is cleared + // whether or not the wire-side delete succeeds since the local handles are useless + // after close anyway. + foreach (var rs in _subscriptions.Values) + { + try { await rs.Subscription.DeleteAsync(silent: true, cancellationToken).ConfigureAwait(false); } + catch { /* best-effort */ } + } + _subscriptions.Clear(); + + if (_keepAliveHandler is not null && Session is not null) + { + try { Session.KeepAlive -= _keepAliveHandler; } catch { } + } + _keepAliveHandler = null; + try { if (Session is Session s) await s.CloseAsync(cancellationToken).ConfigureAwait(false); } catch { /* best-effort */ } try { Session?.Dispose(); } catch { } Session = null; + + TransitionTo(HostState.Unknown); _health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null); } @@ -484,6 +528,131 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d } } + // ---- ISubscribable ---- + + public async Task SubscribeAsync( + IReadOnlyList fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) + { + var session = RequireSession(); + var id = Interlocked.Increment(ref _nextSubscriptionId); + var handle = new OpcUaSubscriptionHandle(id); + + // Floor the publishing interval at 50ms — OPC UA servers routinely negotiate + // minimum-supported intervals up anyway, but sending sub-50ms wastes negotiation + // bandwidth on every subscription create. + var intervalMs = publishingInterval < TimeSpan.FromMilliseconds(50) + ? 50 + : (int)publishingInterval.TotalMilliseconds; + + var subscription = new Subscription(telemetry: null!, new SubscriptionOptions + { + DisplayName = $"opcua-sub-{id}", + PublishingInterval = intervalMs, + KeepAliveCount = 10, + LifetimeCount = 1000, + MaxNotificationsPerPublish = 0, + PublishingEnabled = true, + Priority = 0, + TimestampsToReturn = TimestampsToReturn.Both, + }); + + await _gate.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + session.AddSubscription(subscription); + await subscription.CreateAsync(cancellationToken).ConfigureAwait(false); + + foreach (var fullRef in fullReferences) + { + if (!TryParseNodeId(session, fullRef, out var nodeId)) continue; + // The tag string is routed through MonitoredItem.Handle so the Notification + // handler can identify which tag changed without an extra lookup. + var item = new MonitoredItem(telemetry: null!, new MonitoredItemOptions + { + DisplayName = fullRef, + StartNodeId = nodeId, + AttributeId = Attributes.Value, + MonitoringMode = MonitoringMode.Reporting, + SamplingInterval = intervalMs, + QueueSize = 1, + DiscardOldest = true, + }) + { + Handle = fullRef, + }; + item.Notification += (mi, args) => OnMonitoredItemNotification(handle, mi, args); + subscription.AddItem(item); + } + + await subscription.CreateItemsAsync(cancellationToken).ConfigureAwait(false); + _subscriptions[id] = new RemoteSubscription(subscription, handle); + } + finally { _gate.Release(); } + + return handle; + } + + public async Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken) + { + if (handle is not OpcUaSubscriptionHandle h) return; + if (!_subscriptions.TryRemove(h.Id, out var rs)) return; + + await _gate.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + try { await rs.Subscription.DeleteAsync(silent: true, cancellationToken).ConfigureAwait(false); } + catch { /* best-effort — the subscription may already be gone on reconnect */ } + } + finally { _gate.Release(); } + } + + private void OnMonitoredItemNotification(OpcUaSubscriptionHandle handle, MonitoredItem item, MonitoredItemNotificationEventArgs args) + { + // args.NotificationValue arrives as a MonitoredItemNotification for value-change + // subscriptions; extract its DataValue. The Handle property carries our tag string. + if (args.NotificationValue is not MonitoredItemNotification mn) return; + var dv = mn.Value; + if (dv is null) return; + var fullRef = (item.Handle as string) ?? item.DisplayName ?? string.Empty; + var snapshot = new DataValueSnapshot( + Value: dv.Value, + StatusCode: dv.StatusCode.Code, + SourceTimestampUtc: dv.SourceTimestamp == DateTime.MinValue ? null : dv.SourceTimestamp, + ServerTimestampUtc: dv.ServerTimestamp == DateTime.MinValue ? DateTime.UtcNow : dv.ServerTimestamp); + OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, fullRef, snapshot)); + } + + private sealed record RemoteSubscription(Subscription Subscription, OpcUaSubscriptionHandle Handle); + + private sealed record OpcUaSubscriptionHandle(long Id) : ISubscriptionHandle + { + public string DiagnosticId => $"opcua-sub-{Id}"; + } + + // ---- IHostConnectivityProbe ---- + + /// Endpoint-URL-keyed host identity for the Admin /hosts dashboard. + public string HostName => _options.EndpointUrl; + + public IReadOnlyList GetHostStatuses() + { + lock (_probeLock) + return [new HostConnectivityStatus(HostName, _hostState, _hostStateChangedUtc)]; + } + + private void TransitionTo(HostState newState) + { + HostState old; + lock (_probeLock) + { + old = _hostState; + if (old == newState) return; + _hostState = newState; + _hostStateChangedUtc = DateTime.UtcNow; + } + OnHostStatusChanged?.Invoke(this, new HostStatusChangedEventArgs(HostName, old, newState)); + } + public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult(); public async ValueTask DisposeAsync() diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientSubscribeAndProbeTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientSubscribeAndProbeTests.cs new file mode 100644 index 0000000..befbd22 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientSubscribeAndProbeTests.cs @@ -0,0 +1,50 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests; + +/// +/// Scaffold tests for ISubscribable + IHostConnectivityProbe that don't +/// need a live remote server. Live-session tests (subscribe/unsubscribe round-trip, +/// keep-alive transitions) land in a follow-up PR once the in-process OPC UA server +/// fixture is scaffolded. +/// +[Trait("Category", "Unit")] +public sealed class OpcUaClientSubscribeAndProbeTests +{ + [Fact] + public async Task SubscribeAsync_without_initialize_throws_InvalidOperationException() + { + using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-sub-uninit"); + await Should.ThrowAsync(async () => + await drv.SubscribeAsync(["ns=2;s=Demo"], TimeSpan.FromMilliseconds(100), TestContext.Current.CancellationToken)); + } + + [Fact] + public async Task UnsubscribeAsync_with_unknown_handle_is_noop() + { + using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-sub-unknown"); + // UnsubscribeAsync returns cleanly for handles it doesn't recognise — protects against + // the caller's race with server-side cleanup after a session drop. + await drv.UnsubscribeAsync(new FakeHandle(), TestContext.Current.CancellationToken); + } + + [Fact] + public void GetHostStatuses_returns_endpoint_url_row_pre_init() + { + using var drv = new OpcUaClientDriver( + new OpcUaClientDriverOptions { EndpointUrl = "opc.tcp://plc.example:4840" }, + "opcua-hosts"); + var rows = drv.GetHostStatuses(); + rows.Count.ShouldBe(1); + rows[0].HostName.ShouldBe("opc.tcp://plc.example:4840", + "host identity mirrors the endpoint URL so the Admin /hosts dashboard can link back to the remote server"); + rows[0].State.ShouldBe(HostState.Unknown); + } + + private sealed class FakeHandle : ISubscriptionHandle + { + public string DiagnosticId => "fake"; + } +}