Merge pull request 'AbCip IAlarmSource via ALMD projection (#177, feature-flagged)' (#159) from abcip-alarm-source into v2

This commit was merged in pull request #159.
This commit is contained in:
2026-04-20 04:26:39 -04:00
4 changed files with 481 additions and 1 deletions

View File

@@ -0,0 +1,232 @@
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Driver.AbCip;
/// <summary>
/// Task #177 — projects AB Logix ALMD alarm instructions onto the OPC UA alarm surface by
/// polling the ALMD UDT's <c>InFaulted</c> / <c>Acked</c> / <c>Severity</c> members at a
/// configurable interval + translating state transitions into <c>OnAlarmEvent</c>
/// callbacks on the owning <see cref="AbCipDriver"/>. Feature-flagged off by default via
/// <see cref="AbCipDriverOptions.EnableAlarmProjection"/>; callers that leave the flag off
/// get a no-op subscribe path so capability negotiation still works.
/// </summary>
/// <remarks>
/// <para>ALMD-only in this pass. ALMA (analog alarm) projection is a follow-up because
/// its threshold + limit semantics need more design — ALMD's "is the alarm active + has
/// the operator acked" shape maps cleanly onto the driver-agnostic
/// <see cref="IAlarmSource"/> contract without concessions.</para>
///
/// <para>Polling reuses <see cref="AbCipDriver.ReadAsync"/>, so ALMD reads get the #194
/// whole-UDT optimization for free when the ALMD is declared with its standard members.
/// One poll loop per subscription call; the loop batches every
/// member read across the full source-node set into a single ReadAsync per tick.</para>
///
/// <para>ALMD <c>Acked</c> write semantics on Logix are rising-edge sensitive at the
/// instruction level — writing <c>Acked=1</c> directly is honored by FT View + the
/// standard HMI templates, but some PLC programs read <c>AckCmd</c> + look for the edge
/// themselves. We pick the simpler <c>Acked</c> write for first pass; operators whose
/// ladder watches <c>AckCmd</c> can wire a follow-up "AckCmd 0→1→0" pulse on the client
/// side until a driver-level knob lands.</para>
/// </remarks>
internal sealed class AbCipAlarmProjection : IAsyncDisposable
{
private readonly AbCipDriver _driver;
private readonly TimeSpan _pollInterval;
private readonly Dictionary<long, Subscription> _subs = new();
private readonly Lock _subsLock = new();
private long _nextId;
public AbCipAlarmProjection(AbCipDriver driver, TimeSpan pollInterval)
{
_driver = driver;
_pollInterval = pollInterval;
}
public async Task<IAlarmSubscriptionHandle> SubscribeAsync(
IReadOnlyList<string> sourceNodeIds, CancellationToken cancellationToken)
{
var id = Interlocked.Increment(ref _nextId);
var handle = new AbCipAlarmSubscriptionHandle(id);
var cts = new CancellationTokenSource();
var sub = new Subscription(handle, [..sourceNodeIds], cts);
lock (_subsLock) _subs[id] = sub;
sub.Loop = Task.Run(() => RunPollLoopAsync(sub, cts.Token), cts.Token);
await Task.CompletedTask;
return handle;
}
public async Task UnsubscribeAsync(IAlarmSubscriptionHandle handle, CancellationToken cancellationToken)
{
if (handle is not AbCipAlarmSubscriptionHandle h) return;
Subscription? sub;
lock (_subsLock)
{
if (!_subs.Remove(h.Id, out sub)) return;
}
try { sub.Cts.Cancel(); } catch { }
try { await sub.Loop.ConfigureAwait(false); } catch { }
sub.Cts.Dispose();
}
public async Task AcknowledgeAsync(
IReadOnlyList<AlarmAcknowledgeRequest> acknowledgements, CancellationToken cancellationToken)
{
if (acknowledgements.Count == 0) return;
// Write Acked=1 per request. IWritable isn't on AbCipAlarmProjection so route through
// the driver's public interface — delegating instead of re-implementing the write path
// keeps the bit-in-DINT + idempotency + per-call-host-resolve knobs intact.
var requests = acknowledgements
.Select(a => new WriteRequest($"{a.SourceNodeId}.Acked", true))
.ToArray();
// Best-effort — the driver's WriteAsync returns per-item status; individual ack
// failures don't poison the batch. Swallow the return so a single faulted ack
// doesn't bubble out of the caller's batch expectation.
_ = await _driver.WriteAsync(requests, cancellationToken).ConfigureAwait(false);
}
public async ValueTask DisposeAsync()
{
List<Subscription> snap;
lock (_subsLock) { snap = _subs.Values.ToList(); _subs.Clear(); }
foreach (var sub in snap)
{
try { sub.Cts.Cancel(); } catch { }
try { await sub.Loop.ConfigureAwait(false); } catch { }
sub.Cts.Dispose();
}
}
/// <summary>
/// Poll-tick body — reads <c>InFaulted</c> + <c>Severity</c> for every source node id
/// in the subscription, diffs each against last-seen state, fires raise/clear events.
/// Extracted so tests can drive one tick without standing up the Task.Run loop.
/// </summary>
internal void Tick(Subscription sub, IReadOnlyList<DataValueSnapshot> results)
{
// results index layout: for each sourceNode, [InFaulted, Severity] in order.
for (var i = 0; i < sub.SourceNodeIds.Count; i++)
{
var nodeId = sub.SourceNodeIds[i];
var inFaultedDv = results[i * 2];
var severityDv = results[i * 2 + 1];
if (inFaultedDv.StatusCode != AbCipStatusMapper.Good) continue;
var nowFaulted = ToBool(inFaultedDv.Value);
var severity = ToInt(severityDv.Value);
var wasFaulted = sub.LastInFaulted.GetValueOrDefault(nodeId, false);
sub.LastInFaulted[nodeId] = nowFaulted;
if (!wasFaulted && nowFaulted)
{
_driver.InvokeAlarmEvent(new AlarmEventArgs(
sub.Handle, nodeId, ConditionId: $"{nodeId}#active",
AlarmType: "ALMD",
Message: $"ALMD {nodeId} raised",
Severity: MapSeverity(severity),
SourceTimestampUtc: DateTime.UtcNow));
}
else if (wasFaulted && !nowFaulted)
{
_driver.InvokeAlarmEvent(new AlarmEventArgs(
sub.Handle, nodeId, ConditionId: $"{nodeId}#active",
AlarmType: "ALMD",
Message: $"ALMD {nodeId} cleared",
Severity: MapSeverity(severity),
SourceTimestampUtc: DateTime.UtcNow));
}
}
}
private async Task RunPollLoopAsync(Subscription sub, CancellationToken ct)
{
var refs = new List<string>(sub.SourceNodeIds.Count * 2);
foreach (var nodeId in sub.SourceNodeIds)
{
refs.Add($"{nodeId}.InFaulted");
refs.Add($"{nodeId}.Severity");
}
while (!ct.IsCancellationRequested)
{
try
{
var results = await _driver.ReadAsync(refs, ct).ConfigureAwait(false);
Tick(sub, results);
}
catch (OperationCanceledException) when (ct.IsCancellationRequested) { break; }
catch { /* per-tick failures are non-fatal; next tick retries */ }
try { await Task.Delay(_pollInterval, ct).ConfigureAwait(false); }
catch (OperationCanceledException) { break; }
}
}
internal static AlarmSeverity MapSeverity(int raw) => raw switch
{
<= 250 => AlarmSeverity.Low,
<= 500 => AlarmSeverity.Medium,
<= 750 => AlarmSeverity.High,
_ => AlarmSeverity.Critical,
};
private static bool ToBool(object? v) => v switch
{
bool b => b,
int i => i != 0,
long l => l != 0,
_ => false,
};
private static int ToInt(object? v) => v switch
{
int i => i,
long l => (int)l,
short s => s,
byte b => b,
_ => 0,
};
internal sealed class Subscription
{
public Subscription(AbCipAlarmSubscriptionHandle handle, IReadOnlyList<string> sourceNodeIds, CancellationTokenSource cts)
{
Handle = handle; SourceNodeIds = sourceNodeIds; Cts = cts;
}
public AbCipAlarmSubscriptionHandle Handle { get; }
public IReadOnlyList<string> SourceNodeIds { get; }
public CancellationTokenSource Cts { get; }
public Task Loop { get; set; } = Task.CompletedTask;
public Dictionary<string, bool> LastInFaulted { get; } = new(StringComparer.Ordinal);
}
}
/// <summary>Handle returned by <see cref="AbCipAlarmProjection.SubscribeAsync"/>.</summary>
public sealed record AbCipAlarmSubscriptionHandle(long Id) : IAlarmSubscriptionHandle
{
public string DiagnosticId => $"abcip-alarm-sub-{Id}";
}
/// <summary>
/// Detects the ALMD / ALMA signature in an <see cref="AbCipTagDefinition"/>'s declared
/// members. Used by both discovery (to stamp <c>IsAlarm=true</c> on the emitted
/// variable) + initial driver setup (to decide which tags the alarm projection owns).
/// </summary>
public static class AbCipAlarmDetector
{
/// <summary>
/// <c>true</c> when <paramref name="tag"/> is a Structure whose declared members match
/// the ALMD signature (<c>InFaulted</c> + <c>Acked</c> present). ALMA detection
/// (analog alarms with <c>HHLimit</c>/<c>HLimit</c>/<c>LLimit</c>/<c>LLLimit</c>)
/// ships as a follow-up.
/// </summary>
public static bool IsAlmd(AbCipTagDefinition tag)
{
if (tag.DataType != AbCipDataType.Structure || tag.Members is null) return false;
var names = tag.Members.Select(m => m.Name).ToHashSet(StringComparer.OrdinalIgnoreCase);
return names.Contains("InFaulted") && names.Contains("Acked");
}
}

View File

@@ -21,7 +21,7 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.AbCip;
/// <see cref="PlcTagHandle"/> and reconnects each device.</para>
/// </remarks>
public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery, ISubscribable,
IHostConnectivityProbe, IPerCallHostResolver, IDisposable, IAsyncDisposable
IHostConnectivityProbe, IPerCallHostResolver, IAlarmSource, IDisposable, IAsyncDisposable
{
private readonly AbCipDriverOptions _options;
private readonly string _driverInstanceId;
@@ -32,10 +32,15 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery,
private readonly PollGroupEngine _poll;
private readonly Dictionary<string, DeviceState> _devices = new(StringComparer.OrdinalIgnoreCase);
private readonly Dictionary<string, AbCipTagDefinition> _tagsByName = new(StringComparer.OrdinalIgnoreCase);
private readonly AbCipAlarmProjection _alarmProjection;
private DriverHealth _health = new(DriverState.Unknown, null, null);
public event EventHandler<DataChangeEventArgs>? OnDataChange;
public event EventHandler<HostStatusChangedEventArgs>? OnHostStatusChanged;
public event EventHandler<AlarmEventArgs>? OnAlarmEvent;
/// <summary>Internal seam for the alarm projection to raise events through the driver.</summary>
internal void InvokeAlarmEvent(AlarmEventArgs args) => OnAlarmEvent?.Invoke(this, args);
public AbCipDriver(AbCipDriverOptions options, string driverInstanceId,
IAbCipTagFactory? tagFactory = null,
@@ -52,6 +57,7 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery,
reader: ReadAsync,
onChange: (handle, tagRef, snapshot) =>
OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, tagRef, snapshot)));
_alarmProjection = new AbCipAlarmProjection(this, _options.AlarmPollInterval);
}
/// <summary>
@@ -162,6 +168,7 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery,
public async Task ShutdownAsync(CancellationToken cancellationToken)
{
await _alarmProjection.DisposeAsync().ConfigureAwait(false);
await _poll.DisposeAsync().ConfigureAwait(false);
foreach (var state in _devices.Values)
{
@@ -187,6 +194,39 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery,
return Task.CompletedTask;
}
// ---- IAlarmSource (ALMD projection, #177) ----
/// <summary>
/// Subscribe to ALMD alarm transitions on <paramref name="sourceNodeIds"/>. Each id
/// names a declared ALMD UDT tag; the projection polls the tag's <c>InFaulted</c> +
/// <c>Severity</c> members at <see cref="AbCipDriverOptions.AlarmPollInterval"/> and
/// fires <see cref="OnAlarmEvent"/> on 0→1 (raise) + 1→0 (clear) transitions.
/// Feature-gated — when <see cref="AbCipDriverOptions.EnableAlarmProjection"/> is
/// <c>false</c> (the default), returns a handle wrapping a no-op subscription so
/// capability negotiation still works; <see cref="OnAlarmEvent"/> never fires.
/// </summary>
public Task<IAlarmSubscriptionHandle> SubscribeAlarmsAsync(
IReadOnlyList<string> sourceNodeIds, CancellationToken cancellationToken)
{
if (!_options.EnableAlarmProjection)
{
var disabled = new AbCipAlarmSubscriptionHandle(0);
return Task.FromResult<IAlarmSubscriptionHandle>(disabled);
}
return _alarmProjection.SubscribeAsync(sourceNodeIds, cancellationToken);
}
public Task UnsubscribeAlarmsAsync(IAlarmSubscriptionHandle handle, CancellationToken cancellationToken) =>
_options.EnableAlarmProjection
? _alarmProjection.UnsubscribeAsync(handle, cancellationToken)
: Task.CompletedTask;
public Task AcknowledgeAsync(
IReadOnlyList<AlarmAcknowledgeRequest> acknowledgements, CancellationToken cancellationToken) =>
_options.EnableAlarmProjection
? _alarmProjection.AcknowledgeAsync(acknowledgements, cancellationToken)
: Task.CompletedTask;
// ---- IHostConnectivityProbe ----
public IReadOnlyList<HostConnectivityStatus> GetHostStatuses() =>

View File

@@ -38,6 +38,24 @@ public sealed class AbCipDriverOptions
/// should appear in the address space.
/// </summary>
public bool EnableControllerBrowse { get; init; }
/// <summary>
/// Task #177 — when <c>true</c>, declared ALMD tags are surfaced as alarm conditions
/// via <see cref="Core.Abstractions.IAlarmSource"/>; the driver polls each subscribed
/// alarm's <c>InFaulted</c> + <c>Severity</c> members + fires <c>OnAlarmEvent</c> on
/// state transitions. Default <c>false</c> — operators explicitly opt in because
/// projection semantics don't exactly mirror Rockwell FT Alarm &amp; Events; shops
/// running FT Live should keep this off + take alarms through the native route.
/// </summary>
public bool EnableAlarmProjection { get; init; }
/// <summary>
/// Poll interval for the ALMD projection loop. Shorter intervals catch faster edges
/// at the cost of PLC round-trips; edges shorter than this interval are invisible to
/// the projection (a 0→1→0 transition within one tick collapses to no event). Default
/// 1 second — matches typical SCADA alarm-refresh conventions.
/// </summary>
public TimeSpan AlarmPollInterval { get; init; } = TimeSpan.FromSeconds(1);
}
/// <summary>

View File

@@ -0,0 +1,190 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests;
/// <summary>
/// Task #177 — tests covering ALMD projection detection, feature-flag gate,
/// subscribe/unsubscribe lifecycle, state-transition event emission, and acknowledge.
/// </summary>
[Trait("Category", "Unit")]
public sealed class AbCipAlarmProjectionTests
{
private const string Device = "ab://10.0.0.5/1,0";
private static AbCipTagDefinition AlmdTag(string name) => new(
name, Device, name, AbCipDataType.Structure, Members:
[
new AbCipStructureMember("InFaulted", AbCipDataType.DInt), // Logix stores ALMD bools as DINT
new AbCipStructureMember("Acked", AbCipDataType.DInt),
new AbCipStructureMember("Severity", AbCipDataType.DInt),
new AbCipStructureMember("In", AbCipDataType.DInt),
]);
[Fact]
public void AbCipAlarmDetector_Flags_AlmdSignature_As_Alarm()
{
var almd = AlmdTag("HighTemp");
AbCipAlarmDetector.IsAlmd(almd).ShouldBeTrue();
var plainUdt = new AbCipTagDefinition("Plain", Device, "Plain", AbCipDataType.Structure, Members:
[new AbCipStructureMember("X", AbCipDataType.DInt)]);
AbCipAlarmDetector.IsAlmd(plainUdt).ShouldBeFalse();
var atomic = new AbCipTagDefinition("Plain", Device, "Plain", AbCipDataType.DInt);
AbCipAlarmDetector.IsAlmd(atomic).ShouldBeFalse();
}
[Fact]
public void Severity_Mapping_Matches_OPC_UA_Convention()
{
// Logix severity 11000 — mirror the OpcUaClient ACAndC bucketing.
AbCipAlarmProjection.MapSeverity(100).ShouldBe(AlarmSeverity.Low);
AbCipAlarmProjection.MapSeverity(400).ShouldBe(AlarmSeverity.Medium);
AbCipAlarmProjection.MapSeverity(600).ShouldBe(AlarmSeverity.High);
AbCipAlarmProjection.MapSeverity(900).ShouldBe(AlarmSeverity.Critical);
}
[Fact]
public async Task FeatureFlag_Off_SubscribeAlarms_Returns_Handle_But_Never_Polls()
{
var factory = new FakeAbCipTagFactory();
var opts = new AbCipDriverOptions
{
Devices = [new AbCipDeviceOptions(Device)],
Tags = [AlmdTag("HighTemp")],
EnableAlarmProjection = false, // explicit; also the default
};
var drv = new AbCipDriver(opts, "drv-1", factory);
await drv.InitializeAsync("{}", CancellationToken.None);
var handle = await drv.SubscribeAlarmsAsync(["HighTemp"], CancellationToken.None);
handle.ShouldNotBeNull();
handle.DiagnosticId.ShouldContain("abcip-alarm-sub-");
// Wait a touch — if polling were active, a fake member-read would be triggered.
await Task.Delay(100);
factory.Tags.ShouldNotContainKey("HighTemp.InFaulted");
factory.Tags.ShouldNotContainKey("HighTemp.Severity");
await drv.UnsubscribeAlarmsAsync(handle, CancellationToken.None);
await drv.ShutdownAsync(CancellationToken.None);
}
[Fact]
public async Task FeatureFlag_On_Subscribe_Starts_Polling_And_Fires_Raise_On_0_to_1()
{
var factory = new FakeAbCipTagFactory();
var opts = new AbCipDriverOptions
{
Devices = [new AbCipDeviceOptions(Device)],
Tags = [AlmdTag("HighTemp")],
EnableAlarmProjection = true,
AlarmPollInterval = TimeSpan.FromMilliseconds(20),
};
var drv = new AbCipDriver(opts, "drv-1", factory);
await drv.InitializeAsync("{}", CancellationToken.None);
var events = new List<AlarmEventArgs>();
drv.OnAlarmEvent += (_, e) => { lock (events) events.Add(e); };
var handle = await drv.SubscribeAlarmsAsync(["HighTemp"], CancellationToken.None);
// The ALMD UDT is declared so whole-UDT grouping kicks in; the parent HighTemp runtime
// gets created + polled. Set InFaulted offset-value to 0 first (clear), wait a tick,
// then flip to 1 (fault) + wait for the raise event.
await WaitForTagCreation(factory, "HighTemp");
factory.Tags["HighTemp"].ValuesByOffset[0] = 0; // InFaulted=false at offset 0
factory.Tags["HighTemp"].ValuesByOffset[8] = 500; // Severity at offset 8 (after InFaulted+Acked)
await Task.Delay(80); // let a tick seed the "last-seen false" state
factory.Tags["HighTemp"].ValuesByOffset[0] = 1; // flip to faulted
await Task.Delay(200); // allow several polls to be safe
lock (events)
{
events.ShouldContain(e => e.SourceNodeId == "HighTemp" && e.AlarmType == "ALMD"
&& e.Message.Contains("raised"));
}
await drv.UnsubscribeAlarmsAsync(handle, CancellationToken.None);
await drv.ShutdownAsync(CancellationToken.None);
}
[Fact]
public async Task Clear_Event_Fires_On_1_to_0_Transition()
{
var factory = new FakeAbCipTagFactory();
var opts = new AbCipDriverOptions
{
Devices = [new AbCipDeviceOptions(Device)],
Tags = [AlmdTag("HighTemp")],
EnableAlarmProjection = true,
AlarmPollInterval = TimeSpan.FromMilliseconds(20),
};
var drv = new AbCipDriver(opts, "drv-1", factory);
await drv.InitializeAsync("{}", CancellationToken.None);
var events = new List<AlarmEventArgs>();
drv.OnAlarmEvent += (_, e) => { lock (events) events.Add(e); };
var handle = await drv.SubscribeAlarmsAsync(["HighTemp"], CancellationToken.None);
await WaitForTagCreation(factory, "HighTemp");
factory.Tags["HighTemp"].ValuesByOffset[0] = 1;
factory.Tags["HighTemp"].ValuesByOffset[8] = 500;
await Task.Delay(80); // observe raise
factory.Tags["HighTemp"].ValuesByOffset[0] = 0;
await Task.Delay(200);
lock (events)
{
events.ShouldContain(e => e.Message.Contains("raised"));
events.ShouldContain(e => e.Message.Contains("cleared"));
}
await drv.UnsubscribeAlarmsAsync(handle, CancellationToken.None);
await drv.ShutdownAsync(CancellationToken.None);
}
[Fact]
public async Task Unsubscribe_Stops_The_Poll_Loop()
{
var factory = new FakeAbCipTagFactory();
var opts = new AbCipDriverOptions
{
Devices = [new AbCipDeviceOptions(Device)],
Tags = [AlmdTag("HighTemp")],
EnableAlarmProjection = true,
AlarmPollInterval = TimeSpan.FromMilliseconds(20),
};
var drv = new AbCipDriver(opts, "drv-1", factory);
await drv.InitializeAsync("{}", CancellationToken.None);
var handle = await drv.SubscribeAlarmsAsync(["HighTemp"], CancellationToken.None);
await WaitForTagCreation(factory, "HighTemp");
var preUnsubReadCount = factory.Tags["HighTemp"].ReadCount;
await drv.UnsubscribeAlarmsAsync(handle, CancellationToken.None);
await Task.Delay(100); // well past several poll intervals if the loop were still alive
var postDelayReadCount = factory.Tags["HighTemp"].ReadCount;
// Allow at most one straggler read between the unsubscribe-cancel + the loop exit.
(postDelayReadCount - preUnsubReadCount).ShouldBeLessThanOrEqualTo(1);
await drv.ShutdownAsync(CancellationToken.None);
}
private static async Task WaitForTagCreation(FakeAbCipTagFactory factory, string tagName)
{
var deadline = DateTime.UtcNow.AddSeconds(2);
while (DateTime.UtcNow < deadline)
{
if (factory.Tags.ContainsKey(tagName)) return;
await Task.Delay(10);
}
throw new TimeoutException($"Tag {tagName} was never created by the fake factory.");
}
}