Phase 3 PR 69 -- OPC UA Client ISubscribable + IHostConnectivityProbe. Completes the OpcUaClientDriver capability surface — now matches the Galaxy + Modbus + S7 driver coverage. ISubscribable: SubscribeAsync creates a new upstream Subscription via the non-obsolete Subscription(ITelemetryContext, SubscriptionOptions) ctor + AddItem/CreateItemsAsync flow, wires each MonitoredItem's Notification event into OnDataChange. Tag strings round-trip through MonitoredItem.Handle so the notification handler can identify which tag changed without a second lookup. Publishing interval floored at 50ms (servers negotiate up anyway; sub-50ms wastes round-trip). SubscriptionOptions uses KeepAliveCount=10, LifetimeCount=1000, TimestampsToReturn=Both so SourceTimestamp passthrough for the cascading-quality rule works through subscription paths too. UnsubscribeAsync calls Subscription.DeleteAsync(silent:true) and tolerates unknown handles (returns cleanly) because the caller's race with server-side cleanup after a session drop shouldn't crash either side. Session shutdown explicitly deletes every remote subscription before closing — avoids BadSubscriptionIdInvalid noise in the upstream server's log on Close. IHostConnectivityProbe: HostName surfaced as the EndpointUrl (not host:port like the Modbus/S7 drivers) so the Admin /hosts dashboard can render the full opc.tcp:// URL as a clickable target back at the remote server. HostState tracked via session.KeepAlive event — OPC UA's built-in keep-alive is authoritative for session liveness (the SDK pings on KeepAliveInterval, sets KeepAliveStopped after N missed pings), strictly better than a driver-side polling probe: no extra wire round-trip, no duplicate semantic with the native protocol. Handler transitions Running on healthy keep-alives and Stopped on any Bad service-result. Initial Running raised at end of InitializeAsync once the session is up; Shutdown transitions back to Unknown + unwires the handler. Unit tests (OpcUaClientSubscribeAndProbeTests, 3 facts): SubscribeAsync_without_initialize_throws_InvalidOperationException, UnsubscribeAsync_with_unknown_handle_is_noop (session-drop-race safety), GetHostStatuses_returns_endpoint_url_row_pre_init (asserts EndpointUrl as the host identity -- the full opc.tcp://plc.example:4840 URL). Live-session subscribe/unsubscribe round-trip + keep-alive state transition coverage lands in a follow-up PR once we scaffold the in-process OPC UA server fixture. 13/13 OpcUaClient.Tests pass. dotnet build clean. All six capability interfaces (IDriver / ITagDiscovery / IReadable / IWritable / ISubscribable / IHostConnectivityProbe) implemented — OPC UA Client driver surface complete.
This commit is contained in:
@@ -27,8 +27,20 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient;
|
|||||||
/// </para>
|
/// </para>
|
||||||
/// </remarks>
|
/// </remarks>
|
||||||
public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string driverInstanceId)
|
public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string driverInstanceId)
|
||||||
: IDriver, ITagDiscovery, IReadable, IWritable, IDisposable, IAsyncDisposable
|
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IHostConnectivityProbe, IDisposable, IAsyncDisposable
|
||||||
{
|
{
|
||||||
|
// ---- ISubscribable + IHostConnectivityProbe state ----
|
||||||
|
|
||||||
|
private readonly System.Collections.Concurrent.ConcurrentDictionary<long, RemoteSubscription> _subscriptions = new();
|
||||||
|
private long _nextSubscriptionId;
|
||||||
|
private readonly object _probeLock = new();
|
||||||
|
private HostState _hostState = HostState.Unknown;
|
||||||
|
private DateTime _hostStateChangedUtc = DateTime.UtcNow;
|
||||||
|
private KeepAliveEventHandler? _keepAliveHandler;
|
||||||
|
|
||||||
|
public event EventHandler<DataChangeEventArgs>? OnDataChange;
|
||||||
|
public event EventHandler<HostStatusChangedEventArgs>? OnHostStatusChanged;
|
||||||
|
|
||||||
// OPC UA StatusCode constants the driver surfaces for local-side faults. Upstream-server
|
// OPC UA StatusCode constants the driver surfaces for local-side faults. Upstream-server
|
||||||
// StatusCodes are passed through verbatim per driver-specs.md §8 "cascading quality" —
|
// StatusCodes are passed through verbatim per driver-specs.md §8 "cascading quality" —
|
||||||
// downstream clients need to distinguish 'remote source down' from 'local driver failure'.
|
// downstream clients need to distinguish 'remote source down' from 'local driver failure'.
|
||||||
@@ -110,8 +122,21 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
|
|||||||
|
|
||||||
session.KeepAliveInterval = (int)_options.KeepAliveInterval.TotalMilliseconds;
|
session.KeepAliveInterval = (int)_options.KeepAliveInterval.TotalMilliseconds;
|
||||||
|
|
||||||
|
// Wire the session's keep-alive channel into HostState. OPC UA keep-alives are
|
||||||
|
// authoritative for session liveness: the SDK pings on KeepAliveInterval and sets
|
||||||
|
// KeepAliveStopped when N intervals elapse without a response. That's strictly
|
||||||
|
// better than a driver-side polling probe — no extra round-trip, no duplicate
|
||||||
|
// semantic.
|
||||||
|
_keepAliveHandler = (_, e) =>
|
||||||
|
{
|
||||||
|
var healthy = !ServiceResult.IsBad(e.Status);
|
||||||
|
TransitionTo(healthy ? HostState.Running : HostState.Stopped);
|
||||||
|
};
|
||||||
|
session.KeepAlive += _keepAliveHandler;
|
||||||
|
|
||||||
Session = session;
|
Session = session;
|
||||||
_health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
|
_health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
|
||||||
|
TransitionTo(HostState.Running);
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
@@ -214,10 +239,29 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
|
|||||||
|
|
||||||
public async Task ShutdownAsync(CancellationToken cancellationToken)
|
public async Task ShutdownAsync(CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
|
// Tear down remote subscriptions first — otherwise Session.Close will try and may fail
|
||||||
|
// with BadSubscriptionIdInvalid noise in the upstream log. _subscriptions is cleared
|
||||||
|
// whether or not the wire-side delete succeeds since the local handles are useless
|
||||||
|
// after close anyway.
|
||||||
|
foreach (var rs in _subscriptions.Values)
|
||||||
|
{
|
||||||
|
try { await rs.Subscription.DeleteAsync(silent: true, cancellationToken).ConfigureAwait(false); }
|
||||||
|
catch { /* best-effort */ }
|
||||||
|
}
|
||||||
|
_subscriptions.Clear();
|
||||||
|
|
||||||
|
if (_keepAliveHandler is not null && Session is not null)
|
||||||
|
{
|
||||||
|
try { Session.KeepAlive -= _keepAliveHandler; } catch { }
|
||||||
|
}
|
||||||
|
_keepAliveHandler = null;
|
||||||
|
|
||||||
try { if (Session is Session s) await s.CloseAsync(cancellationToken).ConfigureAwait(false); }
|
try { if (Session is Session s) await s.CloseAsync(cancellationToken).ConfigureAwait(false); }
|
||||||
catch { /* best-effort */ }
|
catch { /* best-effort */ }
|
||||||
try { Session?.Dispose(); } catch { }
|
try { Session?.Dispose(); } catch { }
|
||||||
Session = null;
|
Session = null;
|
||||||
|
|
||||||
|
TransitionTo(HostState.Unknown);
|
||||||
_health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null);
|
_health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -484,6 +528,131 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---- ISubscribable ----
|
||||||
|
|
||||||
|
public async Task<ISubscriptionHandle> SubscribeAsync(
|
||||||
|
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
var session = RequireSession();
|
||||||
|
var id = Interlocked.Increment(ref _nextSubscriptionId);
|
||||||
|
var handle = new OpcUaSubscriptionHandle(id);
|
||||||
|
|
||||||
|
// Floor the publishing interval at 50ms — OPC UA servers routinely negotiate
|
||||||
|
// minimum-supported intervals up anyway, but sending sub-50ms wastes negotiation
|
||||||
|
// bandwidth on every subscription create.
|
||||||
|
var intervalMs = publishingInterval < TimeSpan.FromMilliseconds(50)
|
||||||
|
? 50
|
||||||
|
: (int)publishingInterval.TotalMilliseconds;
|
||||||
|
|
||||||
|
var subscription = new Subscription(telemetry: null!, new SubscriptionOptions
|
||||||
|
{
|
||||||
|
DisplayName = $"opcua-sub-{id}",
|
||||||
|
PublishingInterval = intervalMs,
|
||||||
|
KeepAliveCount = 10,
|
||||||
|
LifetimeCount = 1000,
|
||||||
|
MaxNotificationsPerPublish = 0,
|
||||||
|
PublishingEnabled = true,
|
||||||
|
Priority = 0,
|
||||||
|
TimestampsToReturn = TimestampsToReturn.Both,
|
||||||
|
});
|
||||||
|
|
||||||
|
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
session.AddSubscription(subscription);
|
||||||
|
await subscription.CreateAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
|
foreach (var fullRef in fullReferences)
|
||||||
|
{
|
||||||
|
if (!TryParseNodeId(session, fullRef, out var nodeId)) continue;
|
||||||
|
// The tag string is routed through MonitoredItem.Handle so the Notification
|
||||||
|
// handler can identify which tag changed without an extra lookup.
|
||||||
|
var item = new MonitoredItem(telemetry: null!, new MonitoredItemOptions
|
||||||
|
{
|
||||||
|
DisplayName = fullRef,
|
||||||
|
StartNodeId = nodeId,
|
||||||
|
AttributeId = Attributes.Value,
|
||||||
|
MonitoringMode = MonitoringMode.Reporting,
|
||||||
|
SamplingInterval = intervalMs,
|
||||||
|
QueueSize = 1,
|
||||||
|
DiscardOldest = true,
|
||||||
|
})
|
||||||
|
{
|
||||||
|
Handle = fullRef,
|
||||||
|
};
|
||||||
|
item.Notification += (mi, args) => OnMonitoredItemNotification(handle, mi, args);
|
||||||
|
subscription.AddItem(item);
|
||||||
|
}
|
||||||
|
|
||||||
|
await subscription.CreateItemsAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
_subscriptions[id] = new RemoteSubscription(subscription, handle);
|
||||||
|
}
|
||||||
|
finally { _gate.Release(); }
|
||||||
|
|
||||||
|
return handle;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
if (handle is not OpcUaSubscriptionHandle h) return;
|
||||||
|
if (!_subscriptions.TryRemove(h.Id, out var rs)) return;
|
||||||
|
|
||||||
|
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
try { await rs.Subscription.DeleteAsync(silent: true, cancellationToken).ConfigureAwait(false); }
|
||||||
|
catch { /* best-effort — the subscription may already be gone on reconnect */ }
|
||||||
|
}
|
||||||
|
finally { _gate.Release(); }
|
||||||
|
}
|
||||||
|
|
||||||
|
private void OnMonitoredItemNotification(OpcUaSubscriptionHandle handle, MonitoredItem item, MonitoredItemNotificationEventArgs args)
|
||||||
|
{
|
||||||
|
// args.NotificationValue arrives as a MonitoredItemNotification for value-change
|
||||||
|
// subscriptions; extract its DataValue. The Handle property carries our tag string.
|
||||||
|
if (args.NotificationValue is not MonitoredItemNotification mn) return;
|
||||||
|
var dv = mn.Value;
|
||||||
|
if (dv is null) return;
|
||||||
|
var fullRef = (item.Handle as string) ?? item.DisplayName ?? string.Empty;
|
||||||
|
var snapshot = new DataValueSnapshot(
|
||||||
|
Value: dv.Value,
|
||||||
|
StatusCode: dv.StatusCode.Code,
|
||||||
|
SourceTimestampUtc: dv.SourceTimestamp == DateTime.MinValue ? null : dv.SourceTimestamp,
|
||||||
|
ServerTimestampUtc: dv.ServerTimestamp == DateTime.MinValue ? DateTime.UtcNow : dv.ServerTimestamp);
|
||||||
|
OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, fullRef, snapshot));
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed record RemoteSubscription(Subscription Subscription, OpcUaSubscriptionHandle Handle);
|
||||||
|
|
||||||
|
private sealed record OpcUaSubscriptionHandle(long Id) : ISubscriptionHandle
|
||||||
|
{
|
||||||
|
public string DiagnosticId => $"opcua-sub-{Id}";
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---- IHostConnectivityProbe ----
|
||||||
|
|
||||||
|
/// <summary>Endpoint-URL-keyed host identity for the Admin /hosts dashboard.</summary>
|
||||||
|
public string HostName => _options.EndpointUrl;
|
||||||
|
|
||||||
|
public IReadOnlyList<HostConnectivityStatus> GetHostStatuses()
|
||||||
|
{
|
||||||
|
lock (_probeLock)
|
||||||
|
return [new HostConnectivityStatus(HostName, _hostState, _hostStateChangedUtc)];
|
||||||
|
}
|
||||||
|
|
||||||
|
private void TransitionTo(HostState newState)
|
||||||
|
{
|
||||||
|
HostState old;
|
||||||
|
lock (_probeLock)
|
||||||
|
{
|
||||||
|
old = _hostState;
|
||||||
|
if (old == newState) return;
|
||||||
|
_hostState = newState;
|
||||||
|
_hostStateChangedUtc = DateTime.UtcNow;
|
||||||
|
}
|
||||||
|
OnHostStatusChanged?.Invoke(this, new HostStatusChangedEventArgs(HostName, old, newState));
|
||||||
|
}
|
||||||
|
|
||||||
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();
|
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();
|
||||||
|
|
||||||
public async ValueTask DisposeAsync()
|
public async ValueTask DisposeAsync()
|
||||||
|
|||||||
@@ -0,0 +1,50 @@
|
|||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Scaffold tests for <c>ISubscribable</c> + <c>IHostConnectivityProbe</c> that don't
|
||||||
|
/// need a live remote server. Live-session tests (subscribe/unsubscribe round-trip,
|
||||||
|
/// keep-alive transitions) land in a follow-up PR once the in-process OPC UA server
|
||||||
|
/// fixture is scaffolded.
|
||||||
|
/// </summary>
|
||||||
|
[Trait("Category", "Unit")]
|
||||||
|
public sealed class OpcUaClientSubscribeAndProbeTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public async Task SubscribeAsync_without_initialize_throws_InvalidOperationException()
|
||||||
|
{
|
||||||
|
using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-sub-uninit");
|
||||||
|
await Should.ThrowAsync<InvalidOperationException>(async () =>
|
||||||
|
await drv.SubscribeAsync(["ns=2;s=Demo"], TimeSpan.FromMilliseconds(100), TestContext.Current.CancellationToken));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task UnsubscribeAsync_with_unknown_handle_is_noop()
|
||||||
|
{
|
||||||
|
using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-sub-unknown");
|
||||||
|
// UnsubscribeAsync returns cleanly for handles it doesn't recognise — protects against
|
||||||
|
// the caller's race with server-side cleanup after a session drop.
|
||||||
|
await drv.UnsubscribeAsync(new FakeHandle(), TestContext.Current.CancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void GetHostStatuses_returns_endpoint_url_row_pre_init()
|
||||||
|
{
|
||||||
|
using var drv = new OpcUaClientDriver(
|
||||||
|
new OpcUaClientDriverOptions { EndpointUrl = "opc.tcp://plc.example:4840" },
|
||||||
|
"opcua-hosts");
|
||||||
|
var rows = drv.GetHostStatuses();
|
||||||
|
rows.Count.ShouldBe(1);
|
||||||
|
rows[0].HostName.ShouldBe("opc.tcp://plc.example:4840",
|
||||||
|
"host identity mirrors the endpoint URL so the Admin /hosts dashboard can link back to the remote server");
|
||||||
|
rows[0].State.ShouldBe(HostState.Unknown);
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class FakeHandle : ISubscriptionHandle
|
||||||
|
{
|
||||||
|
public string DiagnosticId => "fake";
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user