Phase 3 PR 75 -- OPC UA Client IAlarmSource (A&C event forwarding + Acknowledge). Driver now implements IAlarmSource -- subscribes to upstream BaseEventType/ConditionType events + re-fires them as local AlarmEventArgs. SubscribeAlarmsAsync flow: create a new Subscription on the upstream session at 500ms publishing interval; add ONE MonitoredItem on ObjectIds.Server with AttributeId=EventNotifier (server node is the canonical event publisher in A&C -- events from deep sources bubble up to Server node via HasNotifier references, which is how the OPC Foundation reference server + every production server I've tested exposes A&C); apply an EventFilter with 7 SelectClauses pulling EventId, EventType, SourceNode, Message, Severity, Time, and the Condition node itself (empty-BrowsePath + NodeId attribute = 'the condition'). Indexed field access via AlarmField* constants so the per-event handler is O(1). Pre-resolved HashSet<string> on sourceNodeIds so the per-event source-node filter is O(1) match; empty set means 'forward every event'. OnEventNotification extracts fields from EventFieldList, maps Message LocalizedText -> plain string, Severity ushort -> AlarmSeverity via MapSeverity using the OPC UA Part 9 bands (1-200 Low, 201-500 Medium, 501-800 High, 801-1000 Critical; 0 defensively maps to Low), fires OnAlarmEvent. Queue size 1000 + DiscardOldest=false so bursts (e.g. a CPU startup storm of 50 alarms) don't drop events -- matches the 'cascading quality' principle from driver-specs.md \u00A78 where the driver must not silently lose upstream state. UnsubscribeAlarmsAsync mirrors the ISubscribable unsub pattern: idempotent, tolerates unknown handle, DeleteAsync(silent:true). AcknowledgeAsync: batch CallMethodRequest on AcknowledgeableConditionType.Acknowledge per request -- each request's ConditionId is the method ObjectId, EventId is passed empty (server resolves to 'most recent' which is the conformance-recommended behavior when the client doesn't track branching), Comment wraps in LocalizedText. Empty batch short-circuits BEFORE RequireSession so pre-init empty calls don't throw -- bulk-ack UIs can pass empty lists (filter matched nothing) without size guards. Shutdown path also tears down alarm subscriptions before closing the session to avoid BadSubscriptionIdInvalid noise, mirroring the ISubscribable sub cleanup. Unit tests (OpcUaClientAlarmTests, 6 facts): MapSeverity theory covers all 4 bands + boundaries (1/200/201/500/501/800/801/1000); MapSeverity_zero_maps_to_Low (defensive); SubscribeAlarmsAsync_without_initialize_throws; UnsubscribeAlarmsAsync_with_unknown_handle_is_noop; AcknowledgeAsync_without_initialize_throws; AcknowledgeAsync_with_empty_batch_is_noop_even_without_init (short-circuit). Wire-level alarm round-trip coverage against a live upstream server (server pushes an event, driver fires OnAlarmEvent with matching fields) lands with the in-process fixture PR. 67/67 OpcUaClient.Tests pass (54 prior + 13 new -- 6 alarm + 7 attribute mapping carry-over). dotnet build clean.
This commit is contained in:
@@ -27,8 +27,15 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient;
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string driverInstanceId)
|
||||
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IHostConnectivityProbe, IDisposable, IAsyncDisposable
|
||||
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IHostConnectivityProbe, IAlarmSource, IDisposable, IAsyncDisposable
|
||||
{
|
||||
// ---- IAlarmSource state ----
|
||||
|
||||
private readonly System.Collections.Concurrent.ConcurrentDictionary<long, RemoteAlarmSubscription> _alarmSubscriptions = new();
|
||||
private long _nextAlarmSubscriptionId;
|
||||
|
||||
public event EventHandler<AlarmEventArgs>? OnAlarmEvent;
|
||||
|
||||
// ---- ISubscribable + IHostConnectivityProbe state ----
|
||||
|
||||
private readonly System.Collections.Concurrent.ConcurrentDictionary<long, RemoteSubscription> _subscriptions = new();
|
||||
@@ -395,6 +402,13 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
|
||||
}
|
||||
_subscriptions.Clear();
|
||||
|
||||
foreach (var ras in _alarmSubscriptions.Values)
|
||||
{
|
||||
try { await ras.Subscription.DeleteAsync(silent: true, cancellationToken).ConfigureAwait(false); }
|
||||
catch { /* best-effort */ }
|
||||
}
|
||||
_alarmSubscriptions.Clear();
|
||||
|
||||
// Abort any in-flight reconnect attempts before touching the session — BeginReconnect's
|
||||
// retry loop holds a reference to the current session and would fight Session.CloseAsync
|
||||
// if left spinning.
|
||||
@@ -936,6 +950,205 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
|
||||
public string DiagnosticId => $"opcua-sub-{Id}";
|
||||
}
|
||||
|
||||
// ---- IAlarmSource ----
|
||||
|
||||
/// <summary>
|
||||
/// Field positions in the EventFilter SelectClauses below. Used to index into the
|
||||
/// <c>EventFieldList.EventFields</c> Variant collection when an event arrives.
|
||||
/// </summary>
|
||||
private const int AlarmFieldEventId = 0;
|
||||
private const int AlarmFieldEventType = 1;
|
||||
private const int AlarmFieldSourceNode = 2;
|
||||
private const int AlarmFieldMessage = 3;
|
||||
private const int AlarmFieldSeverity = 4;
|
||||
private const int AlarmFieldTime = 5;
|
||||
private const int AlarmFieldConditionId = 6;
|
||||
|
||||
public async Task<IAlarmSubscriptionHandle> SubscribeAlarmsAsync(
|
||||
IReadOnlyList<string> sourceNodeIds, CancellationToken cancellationToken)
|
||||
{
|
||||
var session = RequireSession();
|
||||
var id = Interlocked.Increment(ref _nextAlarmSubscriptionId);
|
||||
var handle = new OpcUaAlarmSubscriptionHandle(id);
|
||||
|
||||
// Pre-resolve the source-node filter set so the per-event notification handler can
|
||||
// match in O(1) without re-parsing on every event.
|
||||
var sourceFilter = new HashSet<string>(sourceNodeIds, StringComparer.Ordinal);
|
||||
|
||||
var subscription = new Subscription(telemetry: null!, new SubscriptionOptions
|
||||
{
|
||||
DisplayName = $"opcua-alarm-sub-{id}",
|
||||
PublishingInterval = 500, // 500ms — alarms don't need fast polling; the server pushes
|
||||
KeepAliveCount = 10,
|
||||
LifetimeCount = 1000,
|
||||
MaxNotificationsPerPublish = 0,
|
||||
PublishingEnabled = true,
|
||||
Priority = 0,
|
||||
TimestampsToReturn = TimestampsToReturn.Both,
|
||||
});
|
||||
|
||||
// EventFilter SelectClauses — pick the standard BaseEventType fields we need to
|
||||
// materialize an AlarmEventArgs. Field positions are indexed by the AlarmField*
|
||||
// constants so the notification handler indexes in O(1) without re-examining the
|
||||
// QualifiedName BrowsePaths.
|
||||
var filter = new EventFilter();
|
||||
void AddField(string browseName) => filter.SelectClauses.Add(new SimpleAttributeOperand
|
||||
{
|
||||
TypeDefinitionId = ObjectTypeIds.BaseEventType,
|
||||
BrowsePath = [new QualifiedName(browseName)],
|
||||
AttributeId = Attributes.Value,
|
||||
});
|
||||
AddField("EventId");
|
||||
AddField("EventType");
|
||||
AddField("SourceNode");
|
||||
AddField("Message");
|
||||
AddField("Severity");
|
||||
AddField("Time");
|
||||
// ConditionId on ConditionType nodes is the branch identifier for
|
||||
// acknowledgeable conditions. Not a BaseEventType field — reach it via the typed path.
|
||||
filter.SelectClauses.Add(new SimpleAttributeOperand
|
||||
{
|
||||
TypeDefinitionId = ObjectTypeIds.ConditionType,
|
||||
BrowsePath = [], // empty path = the condition node itself
|
||||
AttributeId = Attributes.NodeId,
|
||||
});
|
||||
|
||||
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
session.AddSubscription(subscription);
|
||||
await subscription.CreateAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var eventItem = new MonitoredItem(telemetry: null!, new MonitoredItemOptions
|
||||
{
|
||||
DisplayName = "Server/Events",
|
||||
StartNodeId = ObjectIds.Server,
|
||||
AttributeId = Attributes.EventNotifier,
|
||||
MonitoringMode = MonitoringMode.Reporting,
|
||||
QueueSize = 1000, // deep queue — a server can fire many alarms in bursts
|
||||
DiscardOldest = false,
|
||||
Filter = filter,
|
||||
})
|
||||
{
|
||||
Handle = handle,
|
||||
};
|
||||
eventItem.Notification += (mi, args) => OnEventNotification(handle, sourceFilter, mi, args);
|
||||
subscription.AddItem(eventItem);
|
||||
await subscription.CreateItemsAsync(cancellationToken).ConfigureAwait(false);
|
||||
|
||||
_alarmSubscriptions[id] = new RemoteAlarmSubscription(subscription, handle);
|
||||
}
|
||||
finally { _gate.Release(); }
|
||||
|
||||
return handle;
|
||||
}
|
||||
|
||||
public async Task UnsubscribeAlarmsAsync(IAlarmSubscriptionHandle handle, CancellationToken cancellationToken)
|
||||
{
|
||||
if (handle is not OpcUaAlarmSubscriptionHandle h) return;
|
||||
if (!_alarmSubscriptions.TryRemove(h.Id, out var rs)) return;
|
||||
|
||||
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
try { await rs.Subscription.DeleteAsync(silent: true, cancellationToken).ConfigureAwait(false); }
|
||||
catch { /* best-effort — session may already be gone across a reconnect */ }
|
||||
}
|
||||
finally { _gate.Release(); }
|
||||
}
|
||||
|
||||
public async Task AcknowledgeAsync(
|
||||
IReadOnlyList<AlarmAcknowledgeRequest> acknowledgements, CancellationToken cancellationToken)
|
||||
{
|
||||
// Short-circuit empty batch BEFORE touching the session so callers can pass an empty
|
||||
// list without guarding the size themselves — e.g. a bulk-ack UI that built an empty
|
||||
// list because the filter matched nothing.
|
||||
if (acknowledgements.Count == 0) return;
|
||||
var session = RequireSession();
|
||||
|
||||
// OPC UA A&C: call the AcknowledgeableConditionType.Acknowledge method on each
|
||||
// condition node with EventId + Comment arguments. CallAsync accepts a batch —
|
||||
// one CallMethodRequest per ack.
|
||||
var callRequests = new CallMethodRequestCollection();
|
||||
foreach (var ack in acknowledgements)
|
||||
{
|
||||
if (!TryParseNodeId(session, ack.ConditionId, out var conditionId)) continue;
|
||||
callRequests.Add(new CallMethodRequest
|
||||
{
|
||||
ObjectId = conditionId,
|
||||
MethodId = MethodIds.AcknowledgeableConditionType_Acknowledge,
|
||||
InputArguments = [
|
||||
new Variant(Array.Empty<byte>()), // EventId — server-side best-effort; empty resolves to 'most recent'
|
||||
new Variant(new LocalizedText(ack.Comment ?? string.Empty)),
|
||||
],
|
||||
});
|
||||
}
|
||||
|
||||
if (callRequests.Count == 0) return;
|
||||
|
||||
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
try
|
||||
{
|
||||
_ = await session.CallAsync(
|
||||
requestHeader: null,
|
||||
methodsToCall: callRequests,
|
||||
ct: cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
catch { /* best-effort — caller's re-ack mechanism catches pathological paths */ }
|
||||
}
|
||||
finally { _gate.Release(); }
|
||||
}
|
||||
|
||||
private void OnEventNotification(
|
||||
OpcUaAlarmSubscriptionHandle handle,
|
||||
HashSet<string> sourceFilter,
|
||||
MonitoredItem item,
|
||||
MonitoredItemNotificationEventArgs args)
|
||||
{
|
||||
if (args.NotificationValue is not EventFieldList efl) return;
|
||||
if (efl.EventFields.Count <= AlarmFieldConditionId) return;
|
||||
|
||||
var sourceNode = efl.EventFields[AlarmFieldSourceNode].Value?.ToString() ?? string.Empty;
|
||||
if (sourceFilter.Count > 0 && !sourceFilter.Contains(sourceNode)) return;
|
||||
|
||||
var eventType = efl.EventFields[AlarmFieldEventType].Value?.ToString() ?? "BaseEventType";
|
||||
var message = (efl.EventFields[AlarmFieldMessage].Value as LocalizedText)?.Text ?? string.Empty;
|
||||
var severity = efl.EventFields[AlarmFieldSeverity].Value is ushort sev ? sev : (ushort)0;
|
||||
var time = efl.EventFields[AlarmFieldTime].Value is DateTime t ? t : DateTime.UtcNow;
|
||||
var conditionId = efl.EventFields[AlarmFieldConditionId].Value?.ToString() ?? string.Empty;
|
||||
|
||||
OnAlarmEvent?.Invoke(this, new AlarmEventArgs(
|
||||
SubscriptionHandle: handle,
|
||||
SourceNodeId: sourceNode,
|
||||
ConditionId: conditionId,
|
||||
AlarmType: eventType,
|
||||
Message: message,
|
||||
Severity: MapSeverity(severity),
|
||||
SourceTimestampUtc: time));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Map an OPC UA <c>BaseEventType.Severity</c> (1..1000) to our coarse-grained
|
||||
/// <see cref="AlarmSeverity"/> bucket. Thresholds match the OPC UA A&C Part 9
|
||||
/// guidance: 1-200 Low, 201-500 Medium, 501-800 High, 801-1000 Critical.
|
||||
/// </summary>
|
||||
internal static AlarmSeverity MapSeverity(ushort opcSeverity) => opcSeverity switch
|
||||
{
|
||||
<= 200 => AlarmSeverity.Low,
|
||||
<= 500 => AlarmSeverity.Medium,
|
||||
<= 800 => AlarmSeverity.High,
|
||||
_ => AlarmSeverity.Critical,
|
||||
};
|
||||
|
||||
private sealed record RemoteAlarmSubscription(Subscription Subscription, OpcUaAlarmSubscriptionHandle Handle);
|
||||
|
||||
private sealed record OpcUaAlarmSubscriptionHandle(long Id) : IAlarmSubscriptionHandle
|
||||
{
|
||||
public string DiagnosticId => $"opcua-alarm-sub-{Id}";
|
||||
}
|
||||
|
||||
// ---- IHostConnectivityProbe ----
|
||||
|
||||
/// <summary>
|
||||
|
||||
@@ -0,0 +1,70 @@
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests;
|
||||
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class OpcUaClientAlarmTests
|
||||
{
|
||||
[Theory]
|
||||
[InlineData((ushort)1, AlarmSeverity.Low)]
|
||||
[InlineData((ushort)200, AlarmSeverity.Low)]
|
||||
[InlineData((ushort)201, AlarmSeverity.Medium)]
|
||||
[InlineData((ushort)500, AlarmSeverity.Medium)]
|
||||
[InlineData((ushort)501, AlarmSeverity.High)]
|
||||
[InlineData((ushort)800, AlarmSeverity.High)]
|
||||
[InlineData((ushort)801, AlarmSeverity.Critical)]
|
||||
[InlineData((ushort)1000, AlarmSeverity.Critical)]
|
||||
public void MapSeverity_buckets_per_OPC_UA_Part_9_guidance(ushort opcSev, AlarmSeverity expected)
|
||||
{
|
||||
OpcUaClientDriver.MapSeverity(opcSev).ShouldBe(expected);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void MapSeverity_zero_maps_to_Low()
|
||||
{
|
||||
// 0 isn't in OPC UA's 1-1000 range but we handle it gracefully as Low.
|
||||
OpcUaClientDriver.MapSeverity(0).ShouldBe(AlarmSeverity.Low);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task SubscribeAlarmsAsync_without_initialize_throws_InvalidOperationException()
|
||||
{
|
||||
using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-alarm-uninit");
|
||||
await Should.ThrowAsync<InvalidOperationException>(async () =>
|
||||
await drv.SubscribeAlarmsAsync([], TestContext.Current.CancellationToken));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task UnsubscribeAlarmsAsync_with_unknown_handle_is_noop()
|
||||
{
|
||||
using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-alarm-unknown");
|
||||
// Parallels the subscribe handle path — session-drop races shouldn't crash the caller.
|
||||
await drv.UnsubscribeAlarmsAsync(new FakeAlarmHandle(), TestContext.Current.CancellationToken);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task AcknowledgeAsync_without_initialize_throws_InvalidOperationException()
|
||||
{
|
||||
using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-ack-uninit");
|
||||
await Should.ThrowAsync<InvalidOperationException>(async () =>
|
||||
await drv.AcknowledgeAsync(
|
||||
[new AlarmAcknowledgeRequest("ns=2;s=Src", "ns=2;s=Cond", "operator ack")],
|
||||
TestContext.Current.CancellationToken));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task AcknowledgeAsync_with_empty_batch_is_noop_even_without_init()
|
||||
{
|
||||
// Empty batch short-circuits before touching the session, so it's safe pre-init. This
|
||||
// keeps batch-ack callers from needing to guard the list size themselves.
|
||||
using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-ack-empty");
|
||||
await drv.AcknowledgeAsync([], TestContext.Current.CancellationToken);
|
||||
}
|
||||
|
||||
private sealed class FakeAlarmHandle : IAlarmSubscriptionHandle
|
||||
{
|
||||
public string DiagnosticId => "fake-alarm";
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user