|
|
|
|
@@ -27,8 +27,20 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient;
|
|
|
|
|
/// </para>
|
|
|
|
|
/// </remarks>
|
|
|
|
|
public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string driverInstanceId)
|
|
|
|
|
: IDriver, ITagDiscovery, IReadable, IWritable, IDisposable, IAsyncDisposable
|
|
|
|
|
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IHostConnectivityProbe, IDisposable, IAsyncDisposable
|
|
|
|
|
{
|
|
|
|
|
// ---- ISubscribable + IHostConnectivityProbe state ----
|
|
|
|
|
|
|
|
|
|
private readonly System.Collections.Concurrent.ConcurrentDictionary<long, RemoteSubscription> _subscriptions = new();
|
|
|
|
|
private long _nextSubscriptionId;
|
|
|
|
|
private readonly object _probeLock = new();
|
|
|
|
|
private HostState _hostState = HostState.Unknown;
|
|
|
|
|
private DateTime _hostStateChangedUtc = DateTime.UtcNow;
|
|
|
|
|
private KeepAliveEventHandler? _keepAliveHandler;
|
|
|
|
|
|
|
|
|
|
public event EventHandler<DataChangeEventArgs>? OnDataChange;
|
|
|
|
|
public event EventHandler<HostStatusChangedEventArgs>? OnHostStatusChanged;
|
|
|
|
|
|
|
|
|
|
// OPC UA StatusCode constants the driver surfaces for local-side faults. Upstream-server
|
|
|
|
|
// StatusCodes are passed through verbatim per driver-specs.md §8 "cascading quality" —
|
|
|
|
|
// downstream clients need to distinguish 'remote source down' from 'local driver failure'.
|
|
|
|
|
@@ -110,8 +122,21 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
|
|
|
|
|
|
|
|
|
|
session.KeepAliveInterval = (int)_options.KeepAliveInterval.TotalMilliseconds;
|
|
|
|
|
|
|
|
|
|
// Wire the session's keep-alive channel into HostState. OPC UA keep-alives are
|
|
|
|
|
// authoritative for session liveness: the SDK pings on KeepAliveInterval and sets
|
|
|
|
|
// KeepAliveStopped when N intervals elapse without a response. That's strictly
|
|
|
|
|
// better than a driver-side polling probe — no extra round-trip, no duplicate
|
|
|
|
|
// semantic.
|
|
|
|
|
_keepAliveHandler = (_, e) =>
|
|
|
|
|
{
|
|
|
|
|
var healthy = !ServiceResult.IsBad(e.Status);
|
|
|
|
|
TransitionTo(healthy ? HostState.Running : HostState.Stopped);
|
|
|
|
|
};
|
|
|
|
|
session.KeepAlive += _keepAliveHandler;
|
|
|
|
|
|
|
|
|
|
Session = session;
|
|
|
|
|
_health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null);
|
|
|
|
|
TransitionTo(HostState.Running);
|
|
|
|
|
}
|
|
|
|
|
catch (Exception ex)
|
|
|
|
|
{
|
|
|
|
|
@@ -214,10 +239,29 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
|
|
|
|
|
|
|
|
|
|
public async Task ShutdownAsync(CancellationToken cancellationToken)
|
|
|
|
|
{
|
|
|
|
|
// Tear down remote subscriptions first — otherwise Session.Close will try and may fail
|
|
|
|
|
// with BadSubscriptionIdInvalid noise in the upstream log. _subscriptions is cleared
|
|
|
|
|
// whether or not the wire-side delete succeeds since the local handles are useless
|
|
|
|
|
// after close anyway.
|
|
|
|
|
foreach (var rs in _subscriptions.Values)
|
|
|
|
|
{
|
|
|
|
|
try { await rs.Subscription.DeleteAsync(silent: true, cancellationToken).ConfigureAwait(false); }
|
|
|
|
|
catch { /* best-effort */ }
|
|
|
|
|
}
|
|
|
|
|
_subscriptions.Clear();
|
|
|
|
|
|
|
|
|
|
if (_keepAliveHandler is not null && Session is not null)
|
|
|
|
|
{
|
|
|
|
|
try { Session.KeepAlive -= _keepAliveHandler; } catch { }
|
|
|
|
|
}
|
|
|
|
|
_keepAliveHandler = null;
|
|
|
|
|
|
|
|
|
|
try { if (Session is Session s) await s.CloseAsync(cancellationToken).ConfigureAwait(false); }
|
|
|
|
|
catch { /* best-effort */ }
|
|
|
|
|
try { Session?.Dispose(); } catch { }
|
|
|
|
|
Session = null;
|
|
|
|
|
|
|
|
|
|
TransitionTo(HostState.Unknown);
|
|
|
|
|
_health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -484,6 +528,131 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ---- ISubscribable ----
|
|
|
|
|
|
|
|
|
|
public async Task<ISubscriptionHandle> SubscribeAsync(
|
|
|
|
|
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
|
|
|
|
|
{
|
|
|
|
|
var session = RequireSession();
|
|
|
|
|
var id = Interlocked.Increment(ref _nextSubscriptionId);
|
|
|
|
|
var handle = new OpcUaSubscriptionHandle(id);
|
|
|
|
|
|
|
|
|
|
// Floor the publishing interval at 50ms — OPC UA servers routinely negotiate
|
|
|
|
|
// minimum-supported intervals up anyway, but sending sub-50ms wastes negotiation
|
|
|
|
|
// bandwidth on every subscription create.
|
|
|
|
|
var intervalMs = publishingInterval < TimeSpan.FromMilliseconds(50)
|
|
|
|
|
? 50
|
|
|
|
|
: (int)publishingInterval.TotalMilliseconds;
|
|
|
|
|
|
|
|
|
|
var subscription = new Subscription(telemetry: null!, new SubscriptionOptions
|
|
|
|
|
{
|
|
|
|
|
DisplayName = $"opcua-sub-{id}",
|
|
|
|
|
PublishingInterval = intervalMs,
|
|
|
|
|
KeepAliveCount = 10,
|
|
|
|
|
LifetimeCount = 1000,
|
|
|
|
|
MaxNotificationsPerPublish = 0,
|
|
|
|
|
PublishingEnabled = true,
|
|
|
|
|
Priority = 0,
|
|
|
|
|
TimestampsToReturn = TimestampsToReturn.Both,
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
session.AddSubscription(subscription);
|
|
|
|
|
await subscription.CreateAsync(cancellationToken).ConfigureAwait(false);
|
|
|
|
|
|
|
|
|
|
foreach (var fullRef in fullReferences)
|
|
|
|
|
{
|
|
|
|
|
if (!TryParseNodeId(session, fullRef, out var nodeId)) continue;
|
|
|
|
|
// The tag string is routed through MonitoredItem.Handle so the Notification
|
|
|
|
|
// handler can identify which tag changed without an extra lookup.
|
|
|
|
|
var item = new MonitoredItem(telemetry: null!, new MonitoredItemOptions
|
|
|
|
|
{
|
|
|
|
|
DisplayName = fullRef,
|
|
|
|
|
StartNodeId = nodeId,
|
|
|
|
|
AttributeId = Attributes.Value,
|
|
|
|
|
MonitoringMode = MonitoringMode.Reporting,
|
|
|
|
|
SamplingInterval = intervalMs,
|
|
|
|
|
QueueSize = 1,
|
|
|
|
|
DiscardOldest = true,
|
|
|
|
|
})
|
|
|
|
|
{
|
|
|
|
|
Handle = fullRef,
|
|
|
|
|
};
|
|
|
|
|
item.Notification += (mi, args) => OnMonitoredItemNotification(handle, mi, args);
|
|
|
|
|
subscription.AddItem(item);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
await subscription.CreateItemsAsync(cancellationToken).ConfigureAwait(false);
|
|
|
|
|
_subscriptions[id] = new RemoteSubscription(subscription, handle);
|
|
|
|
|
}
|
|
|
|
|
finally { _gate.Release(); }
|
|
|
|
|
|
|
|
|
|
return handle;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public async Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
|
|
|
|
|
{
|
|
|
|
|
if (handle is not OpcUaSubscriptionHandle h) return;
|
|
|
|
|
if (!_subscriptions.TryRemove(h.Id, out var rs)) return;
|
|
|
|
|
|
|
|
|
|
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
|
|
|
|
|
try
|
|
|
|
|
{
|
|
|
|
|
try { await rs.Subscription.DeleteAsync(silent: true, cancellationToken).ConfigureAwait(false); }
|
|
|
|
|
catch { /* best-effort — the subscription may already be gone on reconnect */ }
|
|
|
|
|
}
|
|
|
|
|
finally { _gate.Release(); }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void OnMonitoredItemNotification(OpcUaSubscriptionHandle handle, MonitoredItem item, MonitoredItemNotificationEventArgs args)
|
|
|
|
|
{
|
|
|
|
|
// args.NotificationValue arrives as a MonitoredItemNotification for value-change
|
|
|
|
|
// subscriptions; extract its DataValue. The Handle property carries our tag string.
|
|
|
|
|
if (args.NotificationValue is not MonitoredItemNotification mn) return;
|
|
|
|
|
var dv = mn.Value;
|
|
|
|
|
if (dv is null) return;
|
|
|
|
|
var fullRef = (item.Handle as string) ?? item.DisplayName ?? string.Empty;
|
|
|
|
|
var snapshot = new DataValueSnapshot(
|
|
|
|
|
Value: dv.Value,
|
|
|
|
|
StatusCode: dv.StatusCode.Code,
|
|
|
|
|
SourceTimestampUtc: dv.SourceTimestamp == DateTime.MinValue ? null : dv.SourceTimestamp,
|
|
|
|
|
ServerTimestampUtc: dv.ServerTimestamp == DateTime.MinValue ? DateTime.UtcNow : dv.ServerTimestamp);
|
|
|
|
|
OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, fullRef, snapshot));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private sealed record RemoteSubscription(Subscription Subscription, OpcUaSubscriptionHandle Handle);
|
|
|
|
|
|
|
|
|
|
private sealed record OpcUaSubscriptionHandle(long Id) : ISubscriptionHandle
|
|
|
|
|
{
|
|
|
|
|
public string DiagnosticId => $"opcua-sub-{Id}";
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ---- IHostConnectivityProbe ----
|
|
|
|
|
|
|
|
|
|
/// <summary>Endpoint-URL-keyed host identity for the Admin /hosts dashboard.</summary>
|
|
|
|
|
public string HostName => _options.EndpointUrl;
|
|
|
|
|
|
|
|
|
|
public IReadOnlyList<HostConnectivityStatus> GetHostStatuses()
|
|
|
|
|
{
|
|
|
|
|
lock (_probeLock)
|
|
|
|
|
return [new HostConnectivityStatus(HostName, _hostState, _hostStateChangedUtc)];
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void TransitionTo(HostState newState)
|
|
|
|
|
{
|
|
|
|
|
HostState old;
|
|
|
|
|
lock (_probeLock)
|
|
|
|
|
{
|
|
|
|
|
old = _hostState;
|
|
|
|
|
if (old == newState) return;
|
|
|
|
|
_hostState = newState;
|
|
|
|
|
_hostStateChangedUtc = DateTime.UtcNow;
|
|
|
|
|
}
|
|
|
|
|
OnHostStatusChanged?.Invoke(this, new HostStatusChangedEventArgs(HostName, old, newState));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();
|
|
|
|
|
|
|
|
|
|
public async ValueTask DisposeAsync()
|
|
|
|
|
|