Compare commits
4 Commits
phase-7-st
...
phase-7-st
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
25ad4b1929 | ||
| 51d0b27bfd | |||
|
|
df39809526 | ||
| 2a8bcc8f60 |
@@ -5,6 +5,8 @@
|
||||
<Project Path="src/ZB.MOM.WW.OtOpcUa.Core/ZB.MOM.WW.OtOpcUa.Core.csproj"/>
|
||||
<Project Path="src/ZB.MOM.WW.OtOpcUa.Core.Scripting/ZB.MOM.WW.OtOpcUa.Core.Scripting.csproj"/>
|
||||
<Project Path="src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.csproj"/>
|
||||
<Project Path="src/ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms/ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms.csproj"/>
|
||||
<Project Path="src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.csproj"/>
|
||||
<Project Path="src/ZB.MOM.WW.OtOpcUa.Server/ZB.MOM.WW.OtOpcUa.Server.csproj"/>
|
||||
<Project Path="src/ZB.MOM.WW.OtOpcUa.Admin/ZB.MOM.WW.OtOpcUa.Admin.csproj"/>
|
||||
<Project Path="src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.csproj"/>
|
||||
@@ -30,6 +32,8 @@
|
||||
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Core.Tests/ZB.MOM.WW.OtOpcUa.Core.Tests.csproj"/>
|
||||
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Core.Scripting.Tests/ZB.MOM.WW.OtOpcUa.Core.Scripting.Tests.csproj"/>
|
||||
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests.csproj"/>
|
||||
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms.Tests/ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms.Tests.csproj"/>
|
||||
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests.csproj"/>
|
||||
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Server.Tests/ZB.MOM.WW.OtOpcUa.Server.Tests.csproj"/>
|
||||
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Admin.Tests/ZB.MOM.WW.OtOpcUa.Admin.Tests.csproj"/>
|
||||
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Tests.csproj"/>
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
|
||||
|
||||
/// <summary>
|
||||
/// The event shape the historian sink consumes — source-agnostic across scripted
|
||||
/// alarms + Galaxy-native + AB CIP ALMD + any future IAlarmSource per Phase 7 plan
|
||||
/// decision #15 (sink scope = all alarm sources, not just scripted). A per-alarm
|
||||
/// <c>HistorizeToAveva</c> toggle on the producer side gates which events flow.
|
||||
/// </summary>
|
||||
/// <param name="AlarmId">Stable condition identity.</param>
|
||||
/// <param name="EquipmentPath">UNS path of the Equipment node the alarm hangs under. Doubles as the "SourceNode" in Historian's alarm schema.</param>
|
||||
/// <param name="AlarmName">Human-readable alarm name.</param>
|
||||
/// <param name="AlarmTypeName">Concrete Part 9 subtype — "LimitAlarm" / "DiscreteAlarm" / "OffNormalAlarm" / "AlarmCondition". Used as the Historian "AlarmType" column.</param>
|
||||
/// <param name="Severity">Mapped to Historian's numeric priority on the sink side.</param>
|
||||
/// <param name="EventKind">
|
||||
/// Which state transition this event represents — "Activated" / "Cleared" /
|
||||
/// "Acknowledged" / "Confirmed" / "Shelved" / "Unshelved" / "Disabled" / "Enabled" /
|
||||
/// "CommentAdded". Free-form string because different alarm sources use different
|
||||
/// vocabularies; the Galaxy.Host handler maps to the historian's enum on the wire.
|
||||
/// </param>
|
||||
/// <param name="Message">Fully-rendered message text — template tokens already resolved upstream.</param>
|
||||
/// <param name="User">Operator who triggered the transition. "system" for engine-driven events (shelving expiry, predicate change).</param>
|
||||
/// <param name="Comment">Operator-supplied free-form text, if any.</param>
|
||||
/// <param name="TimestampUtc">When the transition occurred.</param>
|
||||
public sealed record AlarmHistorianEvent(
|
||||
string AlarmId,
|
||||
string EquipmentPath,
|
||||
string AlarmName,
|
||||
string AlarmTypeName,
|
||||
AlarmSeverity Severity,
|
||||
string EventKind,
|
||||
string Message,
|
||||
string User,
|
||||
string? Comment,
|
||||
DateTime TimestampUtc);
|
||||
@@ -0,0 +1,82 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
|
||||
|
||||
/// <summary>
|
||||
/// The historian sink contract — where qualifying alarm events land. Phase 7 plan
|
||||
/// decision #17: ingestion routes through Galaxy.Host's pipe so we reuse the
|
||||
/// already-loaded <c>aahClientManaged</c> DLLs without loading 32-bit native code
|
||||
/// in the main .NET 10 server. Tests use an in-memory fake; production uses
|
||||
/// <see cref="SqliteStoreAndForwardSink"/>.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// <see cref="EnqueueAsync"/> is fire-and-forget from the engine's perspective —
|
||||
/// the sink MUST NOT block the emitting thread. Production implementations
|
||||
/// (<see cref="SqliteStoreAndForwardSink"/>) persist to a local SQLite queue
|
||||
/// first, then drain asynchronously to the actual historian. Per Phase 7 plan
|
||||
/// decision #16, failed downstream writes replay with exponential backoff;
|
||||
/// operator actions are never blocked waiting on the historian.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// <see cref="GetStatus"/> exposes queue depth + drain rate + last error
|
||||
/// for the Admin UI <c>/alarms/historian</c> diagnostics page (Stream F).
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public interface IAlarmHistorianSink
|
||||
{
|
||||
/// <summary>Durably enqueue the event. Returns as soon as the queue row is committed.</summary>
|
||||
Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken);
|
||||
|
||||
/// <summary>Snapshot of current queue depth + drain health.</summary>
|
||||
HistorianSinkStatus GetStatus();
|
||||
}
|
||||
|
||||
/// <summary>No-op default for tests or deployments that don't historize alarms.</summary>
|
||||
public sealed class NullAlarmHistorianSink : IAlarmHistorianSink
|
||||
{
|
||||
public static readonly NullAlarmHistorianSink Instance = new();
|
||||
public Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken) => Task.CompletedTask;
|
||||
public HistorianSinkStatus GetStatus() => new(
|
||||
QueueDepth: 0,
|
||||
DeadLetterDepth: 0,
|
||||
LastDrainUtc: null,
|
||||
LastSuccessUtc: null,
|
||||
LastError: null,
|
||||
DrainState: HistorianDrainState.Disabled);
|
||||
}
|
||||
|
||||
/// <summary>Diagnostic snapshot surfaced to the Admin UI + /healthz endpoints.</summary>
|
||||
public sealed record HistorianSinkStatus(
|
||||
long QueueDepth,
|
||||
long DeadLetterDepth,
|
||||
DateTime? LastDrainUtc,
|
||||
DateTime? LastSuccessUtc,
|
||||
string? LastError,
|
||||
HistorianDrainState DrainState);
|
||||
|
||||
/// <summary>Where the drain worker is in its state machine.</summary>
|
||||
public enum HistorianDrainState
|
||||
{
|
||||
Disabled,
|
||||
Idle,
|
||||
Draining,
|
||||
BackingOff,
|
||||
}
|
||||
|
||||
/// <summary>Signaled by the Galaxy.Host-side handler when it fails a batch — drain worker uses this to decide retry cadence.</summary>
|
||||
public enum HistorianWriteOutcome
|
||||
{
|
||||
/// <summary>Successfully persisted to the historian. Remove from queue.</summary>
|
||||
Ack,
|
||||
/// <summary>Transient failure (historian disconnected, timeout, busy). Leave queued; retry after backoff.</summary>
|
||||
RetryPlease,
|
||||
/// <summary>Permanent failure (malformed event, unrecoverable SDK error). Move to dead-letter table.</summary>
|
||||
PermanentFail,
|
||||
}
|
||||
|
||||
/// <summary>What the drain worker delegates writes to — Stream G wires this to the Galaxy.Host IPC client.</summary>
|
||||
public interface IAlarmHistorianWriter
|
||||
{
|
||||
/// <summary>Push a batch of events to the historian. Returns one outcome per event, same order.</summary>
|
||||
Task<IReadOnlyList<HistorianWriteOutcome>> WriteBatchAsync(
|
||||
IReadOnlyList<AlarmHistorianEvent> batch, CancellationToken cancellationToken);
|
||||
}
|
||||
@@ -0,0 +1,397 @@
|
||||
using System.Text.Json;
|
||||
using Microsoft.Data.Sqlite;
|
||||
using Serilog;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
|
||||
|
||||
/// <summary>
|
||||
/// Phase 7 plan decisions #16–#17 implementation: durable SQLite queue on the node
|
||||
/// absorbs every qualifying alarm event, a drain worker batches rows to Galaxy.Host
|
||||
/// via <see cref="IAlarmHistorianWriter"/> on an exponential-backoff cadence, and
|
||||
/// operator acks never block on the historian being reachable.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Queue schema:
|
||||
/// <code>
|
||||
/// CREATE TABLE Queue (
|
||||
/// RowId INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
/// AlarmId TEXT NOT NULL,
|
||||
/// EnqueuedUtc TEXT NOT NULL,
|
||||
/// PayloadJson TEXT NOT NULL,
|
||||
/// AttemptCount INTEGER NOT NULL DEFAULT 0,
|
||||
/// LastAttemptUtc TEXT NULL,
|
||||
/// LastError TEXT NULL,
|
||||
/// DeadLettered INTEGER NOT NULL DEFAULT 0
|
||||
/// );
|
||||
/// </code>
|
||||
/// Dead-lettered rows stay in place for the configured retention window (default
|
||||
/// 30 days per Phase 7 plan decision #21) so operators can inspect + manually
|
||||
/// retry before the sweeper purges them. Regular queue capacity is bounded —
|
||||
/// overflow evicts the oldest non-dead-lettered rows with a WARN log.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Drain runs on a shared <see cref="System.Threading.Timer"/>. Exponential
|
||||
/// backoff on <see cref="HistorianWriteOutcome.RetryPlease"/>: 1s → 2s → 5s →
|
||||
/// 15s → 60s cap. <see cref="HistorianWriteOutcome.PermanentFail"/> rows flip
|
||||
/// the <c>DeadLettered</c> flag on the individual row; neighbors in the batch
|
||||
/// still retry on their own cadence.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
||||
{
|
||||
/// <summary>Default queue capacity — oldest non-dead-lettered rows evicted past this.</summary>
|
||||
public const long DefaultCapacity = 1_000_000;
|
||||
public static readonly TimeSpan DefaultDeadLetterRetention = TimeSpan.FromDays(30);
|
||||
|
||||
private static readonly TimeSpan[] BackoffLadder =
|
||||
[
|
||||
TimeSpan.FromSeconds(1),
|
||||
TimeSpan.FromSeconds(2),
|
||||
TimeSpan.FromSeconds(5),
|
||||
TimeSpan.FromSeconds(15),
|
||||
TimeSpan.FromSeconds(60),
|
||||
];
|
||||
|
||||
private readonly string _connectionString;
|
||||
private readonly IAlarmHistorianWriter _writer;
|
||||
private readonly ILogger _logger;
|
||||
private readonly int _batchSize;
|
||||
private readonly long _capacity;
|
||||
private readonly TimeSpan _deadLetterRetention;
|
||||
private readonly Func<DateTime> _clock;
|
||||
|
||||
private readonly SemaphoreSlim _drainGate = new(1, 1);
|
||||
private Timer? _drainTimer;
|
||||
private int _backoffIndex;
|
||||
private DateTime? _lastDrainUtc;
|
||||
private DateTime? _lastSuccessUtc;
|
||||
private string? _lastError;
|
||||
private HistorianDrainState _drainState = HistorianDrainState.Idle;
|
||||
private bool _disposed;
|
||||
|
||||
public SqliteStoreAndForwardSink(
|
||||
string databasePath,
|
||||
IAlarmHistorianWriter writer,
|
||||
ILogger logger,
|
||||
int batchSize = 100,
|
||||
long capacity = DefaultCapacity,
|
||||
TimeSpan? deadLetterRetention = null,
|
||||
Func<DateTime>? clock = null)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(databasePath))
|
||||
throw new ArgumentException("Database path required.", nameof(databasePath));
|
||||
_writer = writer ?? throw new ArgumentNullException(nameof(writer));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_batchSize = batchSize > 0 ? batchSize : throw new ArgumentOutOfRangeException(nameof(batchSize));
|
||||
_capacity = capacity > 0 ? capacity : throw new ArgumentOutOfRangeException(nameof(capacity));
|
||||
_deadLetterRetention = deadLetterRetention ?? DefaultDeadLetterRetention;
|
||||
_clock = clock ?? (() => DateTime.UtcNow);
|
||||
_connectionString = $"Data Source={databasePath}";
|
||||
|
||||
InitializeSchema();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Start the background drain worker. Not started automatically so tests can
|
||||
/// drive <see cref="DrainOnceAsync"/> deterministically.
|
||||
/// </summary>
|
||||
public void StartDrainLoop(TimeSpan tickInterval)
|
||||
{
|
||||
if (_disposed) throw new ObjectDisposedException(nameof(SqliteStoreAndForwardSink));
|
||||
_drainTimer?.Dispose();
|
||||
_drainTimer = new Timer(_ => _ = DrainOnceAsync(CancellationToken.None),
|
||||
null, tickInterval, tickInterval);
|
||||
}
|
||||
|
||||
public Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken)
|
||||
{
|
||||
if (evt is null) throw new ArgumentNullException(nameof(evt));
|
||||
if (_disposed) throw new ObjectDisposedException(nameof(SqliteStoreAndForwardSink));
|
||||
|
||||
using var conn = new SqliteConnection(_connectionString);
|
||||
conn.Open();
|
||||
|
||||
EnforceCapacity(conn);
|
||||
|
||||
using var cmd = conn.CreateCommand();
|
||||
cmd.CommandText = """
|
||||
INSERT INTO Queue (AlarmId, EnqueuedUtc, PayloadJson, AttemptCount)
|
||||
VALUES ($alarmId, $enqueued, $payload, 0);
|
||||
""";
|
||||
cmd.Parameters.AddWithValue("$alarmId", evt.AlarmId);
|
||||
cmd.Parameters.AddWithValue("$enqueued", _clock().ToString("O"));
|
||||
cmd.Parameters.AddWithValue("$payload", JsonSerializer.Serialize(evt));
|
||||
cmd.ExecuteNonQuery();
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Read up to <see cref="_batchSize"/> queued rows, forward through the writer,
|
||||
/// remove Ack'd rows, dead-letter PermanentFail rows, and extend the backoff
|
||||
/// on RetryPlease. Safe to call from multiple threads; the semaphore enforces
|
||||
/// serial execution.
|
||||
/// </summary>
|
||||
public async Task DrainOnceAsync(CancellationToken ct)
|
||||
{
|
||||
if (_disposed) return;
|
||||
if (!await _drainGate.WaitAsync(0, ct).ConfigureAwait(false)) return;
|
||||
try
|
||||
{
|
||||
_drainState = HistorianDrainState.Draining;
|
||||
_lastDrainUtc = _clock();
|
||||
|
||||
PurgeAgedDeadLetters();
|
||||
var (rowIds, events) = ReadBatch();
|
||||
if (rowIds.Count == 0)
|
||||
{
|
||||
_drainState = HistorianDrainState.Idle;
|
||||
return;
|
||||
}
|
||||
|
||||
IReadOnlyList<HistorianWriteOutcome> outcomes;
|
||||
try
|
||||
{
|
||||
outcomes = await _writer.WriteBatchAsync(events, ct).ConfigureAwait(false);
|
||||
_lastError = null;
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
throw;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Writer-side exception — treat entire batch as RetryPlease.
|
||||
_lastError = ex.Message;
|
||||
_logger.Warning(ex, "Historian writer threw on batch of {Count}; deferring retry", events.Count);
|
||||
BumpBackoff();
|
||||
_drainState = HistorianDrainState.BackingOff;
|
||||
return;
|
||||
}
|
||||
|
||||
if (outcomes.Count != events.Count)
|
||||
throw new InvalidOperationException(
|
||||
$"Writer returned {outcomes.Count} outcomes for {events.Count} events — expected 1:1");
|
||||
|
||||
using var conn = new SqliteConnection(_connectionString);
|
||||
conn.Open();
|
||||
using var tx = conn.BeginTransaction();
|
||||
for (var i = 0; i < outcomes.Count; i++)
|
||||
{
|
||||
var outcome = outcomes[i];
|
||||
var rowId = rowIds[i];
|
||||
switch (outcome)
|
||||
{
|
||||
case HistorianWriteOutcome.Ack:
|
||||
DeleteRow(conn, tx, rowId);
|
||||
break;
|
||||
case HistorianWriteOutcome.PermanentFail:
|
||||
DeadLetterRow(conn, tx, rowId, $"permanent fail at {_clock():O}");
|
||||
break;
|
||||
case HistorianWriteOutcome.RetryPlease:
|
||||
BumpAttempt(conn, tx, rowId, "retry-please");
|
||||
break;
|
||||
}
|
||||
}
|
||||
tx.Commit();
|
||||
|
||||
var acks = outcomes.Count(o => o == HistorianWriteOutcome.Ack);
|
||||
if (acks > 0) _lastSuccessUtc = _clock();
|
||||
|
||||
if (outcomes.Any(o => o == HistorianWriteOutcome.RetryPlease))
|
||||
{
|
||||
BumpBackoff();
|
||||
_drainState = HistorianDrainState.BackingOff;
|
||||
}
|
||||
else
|
||||
{
|
||||
ResetBackoff();
|
||||
_drainState = HistorianDrainState.Idle;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_drainGate.Release();
|
||||
}
|
||||
}
|
||||
|
||||
public HistorianSinkStatus GetStatus()
|
||||
{
|
||||
using var conn = new SqliteConnection(_connectionString);
|
||||
conn.Open();
|
||||
|
||||
long queued;
|
||||
long deadlettered;
|
||||
using (var cmd = conn.CreateCommand())
|
||||
{
|
||||
cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 0";
|
||||
queued = (long)(cmd.ExecuteScalar() ?? 0L);
|
||||
}
|
||||
using (var cmd = conn.CreateCommand())
|
||||
{
|
||||
cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 1";
|
||||
deadlettered = (long)(cmd.ExecuteScalar() ?? 0L);
|
||||
}
|
||||
|
||||
return new HistorianSinkStatus(
|
||||
QueueDepth: queued,
|
||||
DeadLetterDepth: deadlettered,
|
||||
LastDrainUtc: _lastDrainUtc,
|
||||
LastSuccessUtc: _lastSuccessUtc,
|
||||
LastError: _lastError,
|
||||
DrainState: _drainState);
|
||||
}
|
||||
|
||||
/// <summary>Operator action from Admin UI — retry every dead-lettered row. Non-cascading: they rejoin the regular queue + get a fresh backoff.</summary>
|
||||
public int RetryDeadLettered()
|
||||
{
|
||||
using var conn = new SqliteConnection(_connectionString);
|
||||
conn.Open();
|
||||
using var cmd = conn.CreateCommand();
|
||||
cmd.CommandText = "UPDATE Queue SET DeadLettered = 0, AttemptCount = 0, LastError = NULL WHERE DeadLettered = 1";
|
||||
return cmd.ExecuteNonQuery();
|
||||
}
|
||||
|
||||
private (List<long> rowIds, List<AlarmHistorianEvent> events) ReadBatch()
|
||||
{
|
||||
var rowIds = new List<long>();
|
||||
var events = new List<AlarmHistorianEvent>();
|
||||
using var conn = new SqliteConnection(_connectionString);
|
||||
conn.Open();
|
||||
using var cmd = conn.CreateCommand();
|
||||
cmd.CommandText = """
|
||||
SELECT RowId, PayloadJson FROM Queue
|
||||
WHERE DeadLettered = 0
|
||||
ORDER BY RowId ASC
|
||||
LIMIT $limit
|
||||
""";
|
||||
cmd.Parameters.AddWithValue("$limit", _batchSize);
|
||||
using var reader = cmd.ExecuteReader();
|
||||
while (reader.Read())
|
||||
{
|
||||
rowIds.Add(reader.GetInt64(0));
|
||||
var payload = reader.GetString(1);
|
||||
var evt = JsonSerializer.Deserialize<AlarmHistorianEvent>(payload);
|
||||
if (evt is not null) events.Add(evt);
|
||||
}
|
||||
return (rowIds, events);
|
||||
}
|
||||
|
||||
private static void DeleteRow(SqliteConnection conn, SqliteTransaction tx, long rowId)
|
||||
{
|
||||
using var cmd = conn.CreateCommand();
|
||||
cmd.Transaction = tx;
|
||||
cmd.CommandText = "DELETE FROM Queue WHERE RowId = $id";
|
||||
cmd.Parameters.AddWithValue("$id", rowId);
|
||||
cmd.ExecuteNonQuery();
|
||||
}
|
||||
|
||||
private void DeadLetterRow(SqliteConnection conn, SqliteTransaction tx, long rowId, string reason)
|
||||
{
|
||||
using var cmd = conn.CreateCommand();
|
||||
cmd.Transaction = tx;
|
||||
cmd.CommandText = """
|
||||
UPDATE Queue SET DeadLettered = 1, LastAttemptUtc = $now, LastError = $err, AttemptCount = AttemptCount + 1
|
||||
WHERE RowId = $id
|
||||
""";
|
||||
cmd.Parameters.AddWithValue("$now", _clock().ToString("O"));
|
||||
cmd.Parameters.AddWithValue("$err", reason);
|
||||
cmd.Parameters.AddWithValue("$id", rowId);
|
||||
cmd.ExecuteNonQuery();
|
||||
}
|
||||
|
||||
private void BumpAttempt(SqliteConnection conn, SqliteTransaction tx, long rowId, string reason)
|
||||
{
|
||||
using var cmd = conn.CreateCommand();
|
||||
cmd.Transaction = tx;
|
||||
cmd.CommandText = """
|
||||
UPDATE Queue SET LastAttemptUtc = $now, LastError = $err, AttemptCount = AttemptCount + 1
|
||||
WHERE RowId = $id
|
||||
""";
|
||||
cmd.Parameters.AddWithValue("$now", _clock().ToString("O"));
|
||||
cmd.Parameters.AddWithValue("$err", reason);
|
||||
cmd.Parameters.AddWithValue("$id", rowId);
|
||||
cmd.ExecuteNonQuery();
|
||||
}
|
||||
|
||||
private void EnforceCapacity(SqliteConnection conn)
|
||||
{
|
||||
// Count non-dead-lettered rows only — dead-lettered rows retain for
|
||||
// post-mortem per the configured retention window.
|
||||
long count;
|
||||
using (var cmd = conn.CreateCommand())
|
||||
{
|
||||
cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 0";
|
||||
count = (long)(cmd.ExecuteScalar() ?? 0L);
|
||||
}
|
||||
if (count < _capacity) return;
|
||||
|
||||
var toEvict = count - _capacity + 1;
|
||||
using (var cmd = conn.CreateCommand())
|
||||
{
|
||||
cmd.CommandText = """
|
||||
DELETE FROM Queue
|
||||
WHERE RowId IN (
|
||||
SELECT RowId FROM Queue
|
||||
WHERE DeadLettered = 0
|
||||
ORDER BY RowId ASC
|
||||
LIMIT $n
|
||||
)
|
||||
""";
|
||||
cmd.Parameters.AddWithValue("$n", toEvict);
|
||||
cmd.ExecuteNonQuery();
|
||||
}
|
||||
_logger.Warning(
|
||||
"Historian queue at capacity {Cap} — evicted {Count} oldest row(s) to make room",
|
||||
_capacity, toEvict);
|
||||
}
|
||||
|
||||
private void PurgeAgedDeadLetters()
|
||||
{
|
||||
var cutoff = (_clock() - _deadLetterRetention).ToString("O");
|
||||
using var conn = new SqliteConnection(_connectionString);
|
||||
conn.Open();
|
||||
using var cmd = conn.CreateCommand();
|
||||
cmd.CommandText = """
|
||||
DELETE FROM Queue
|
||||
WHERE DeadLettered = 1 AND LastAttemptUtc IS NOT NULL AND LastAttemptUtc < $cutoff
|
||||
""";
|
||||
cmd.Parameters.AddWithValue("$cutoff", cutoff);
|
||||
var purged = cmd.ExecuteNonQuery();
|
||||
if (purged > 0)
|
||||
_logger.Information("Purged {Count} dead-lettered row(s) past retention window", purged);
|
||||
}
|
||||
|
||||
private void InitializeSchema()
|
||||
{
|
||||
using var conn = new SqliteConnection(_connectionString);
|
||||
conn.Open();
|
||||
using var cmd = conn.CreateCommand();
|
||||
cmd.CommandText = """
|
||||
CREATE TABLE IF NOT EXISTS Queue (
|
||||
RowId INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
AlarmId TEXT NOT NULL,
|
||||
EnqueuedUtc TEXT NOT NULL,
|
||||
PayloadJson TEXT NOT NULL,
|
||||
AttemptCount INTEGER NOT NULL DEFAULT 0,
|
||||
LastAttemptUtc TEXT NULL,
|
||||
LastError TEXT NULL,
|
||||
DeadLettered INTEGER NOT NULL DEFAULT 0
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS IX_Queue_Drain ON Queue (DeadLettered, RowId);
|
||||
""";
|
||||
cmd.ExecuteNonQuery();
|
||||
}
|
||||
|
||||
private void BumpBackoff() => _backoffIndex = Math.Min(_backoffIndex + 1, BackoffLadder.Length - 1);
|
||||
private void ResetBackoff() => _backoffIndex = 0;
|
||||
public TimeSpan CurrentBackoff => BackoffLadder[_backoffIndex];
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (_disposed) return;
|
||||
_disposed = true;
|
||||
_drainTimer?.Dispose();
|
||||
_drainGate.Dispose();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>net10.0</TargetFramework>
|
||||
<Nullable>enable</Nullable>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<LangVersion>latest</LangVersion>
|
||||
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
|
||||
<GenerateDocumentationFile>true</GenerateDocumentationFile>
|
||||
<NoWarn>$(NoWarn);CS1591</NoWarn>
|
||||
<RootNamespace>ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian</RootNamespace>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.Data.Sqlite" Version="9.0.0"/>
|
||||
<PackageReference Include="Serilog" Version="4.2.0"/>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Core.Abstractions\ZB.MOM.WW.OtOpcUa.Core.Abstractions.csproj"/>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<InternalsVisibleTo Include="ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests"/>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-37gx-xxp4-5rgx"/>
|
||||
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-w3x6-4m5h-cxqf"/>
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
@@ -0,0 +1,84 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
|
||||
|
||||
/// <summary>
|
||||
/// Persistent per-alarm state tracked by the Part 9 state machine. Every field
|
||||
/// carried here either participates in the state machine or contributes to the
|
||||
/// audit trail required by Phase 7 plan decision #14 (GxP / 21 CFR Part 11).
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// <see cref="Active"/> is re-derived from the predicate at startup per Phase 7
|
||||
/// decision #14 — the engine runs every alarm's predicate against current tag
|
||||
/// values at <c>Load</c>, overriding whatever Active state is in the store.
|
||||
/// Every other state field persists verbatim across server restarts so
|
||||
/// operators don't re-ack active alarms after an outage + shelved alarms stay
|
||||
/// shelved + audit history survives.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// <see cref="Comments"/> is append-only; comments + ack/confirm user identities
|
||||
/// are the audit surface regulators consume. The engine never rewrites past
|
||||
/// entries.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed record AlarmConditionState(
|
||||
string AlarmId,
|
||||
AlarmEnabledState Enabled,
|
||||
AlarmActiveState Active,
|
||||
AlarmAckedState Acked,
|
||||
AlarmConfirmedState Confirmed,
|
||||
ShelvingState Shelving,
|
||||
DateTime LastTransitionUtc,
|
||||
DateTime? LastActiveUtc,
|
||||
DateTime? LastClearedUtc,
|
||||
DateTime? LastAckUtc,
|
||||
string? LastAckUser,
|
||||
string? LastAckComment,
|
||||
DateTime? LastConfirmUtc,
|
||||
string? LastConfirmUser,
|
||||
string? LastConfirmComment,
|
||||
IReadOnlyList<AlarmComment> Comments)
|
||||
{
|
||||
/// <summary>Initial-load state for a newly registered alarm — everything in the "no-event" position.</summary>
|
||||
public static AlarmConditionState Fresh(string alarmId, DateTime nowUtc) => new(
|
||||
AlarmId: alarmId,
|
||||
Enabled: AlarmEnabledState.Enabled,
|
||||
Active: AlarmActiveState.Inactive,
|
||||
Acked: AlarmAckedState.Acknowledged,
|
||||
Confirmed: AlarmConfirmedState.Confirmed,
|
||||
Shelving: ShelvingState.Unshelved,
|
||||
LastTransitionUtc: nowUtc,
|
||||
LastActiveUtc: null,
|
||||
LastClearedUtc: null,
|
||||
LastAckUtc: null,
|
||||
LastAckUser: null,
|
||||
LastAckComment: null,
|
||||
LastConfirmUtc: null,
|
||||
LastConfirmUser: null,
|
||||
LastConfirmComment: null,
|
||||
Comments: []);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Shelving state — kind plus, for <see cref="ShelvingKind.Timed"/>, the UTC
|
||||
/// timestamp at which the shelving auto-expires. The engine polls the timer on its
|
||||
/// evaluation cadence; callers should not rely on millisecond-precision expiry.
|
||||
/// </summary>
|
||||
public sealed record ShelvingState(ShelvingKind Kind, DateTime? UnshelveAtUtc)
|
||||
{
|
||||
public static readonly ShelvingState Unshelved = new(ShelvingKind.Unshelved, null);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// A single append-only audit record — acknowledgement / confirmation / explicit
|
||||
/// comment / shelving action. Every entry carries a monotonic UTC timestamp plus the
|
||||
/// user identity Phase 6.2 authenticated.
|
||||
/// </summary>
|
||||
/// <param name="TimestampUtc">When the action happened.</param>
|
||||
/// <param name="User">OS / LDAP identity of the actor. For engine-internal events (shelving expiry, startup recovery) this is <c>"system"</c>.</param>
|
||||
/// <param name="Kind">Human-readable classification — "Acknowledge", "Confirm", "ShelveOneShot", "ShelveTimed", "Unshelve", "AddComment", "Enable", "Disable", "AutoUnshelve".</param>
|
||||
/// <param name="Text">Operator-supplied comment or engine-generated message.</param>
|
||||
public sealed record AlarmComment(
|
||||
DateTime TimestampUtc,
|
||||
string User,
|
||||
string Kind,
|
||||
string Text);
|
||||
@@ -0,0 +1,55 @@
|
||||
using Serilog;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
|
||||
|
||||
/// <summary>
|
||||
/// <see cref="ScriptContext"/> subclass for alarm predicate evaluation. Reads from
|
||||
/// the engine's shared tag cache (driver + virtual tags), writes are rejected —
|
||||
/// predicates must be side-effect free so their output doesn't depend on evaluation
|
||||
/// order or drive cascade behavior.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Per Phase 7 plan Shape A decision, alarm scripts are one-script-per-alarm
|
||||
/// returning <c>bool</c>. They read any tag they want but should not write
|
||||
/// anything (the owning alarm's state is tracked by the engine, not the script).
|
||||
/// </remarks>
|
||||
public sealed class AlarmPredicateContext : ScriptContext
|
||||
{
|
||||
private readonly IReadOnlyDictionary<string, DataValueSnapshot> _readCache;
|
||||
private readonly Func<DateTime> _clock;
|
||||
|
||||
public AlarmPredicateContext(
|
||||
IReadOnlyDictionary<string, DataValueSnapshot> readCache,
|
||||
ILogger logger,
|
||||
Func<DateTime>? clock = null)
|
||||
{
|
||||
_readCache = readCache ?? throw new ArgumentNullException(nameof(readCache));
|
||||
Logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
_clock = clock ?? (() => DateTime.UtcNow);
|
||||
}
|
||||
|
||||
public override DataValueSnapshot GetTag(string path)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(path))
|
||||
return new DataValueSnapshot(null, 0x80340000u, null, _clock());
|
||||
return _readCache.TryGetValue(path, out var v)
|
||||
? v
|
||||
: new DataValueSnapshot(null, 0x80340000u, null, _clock());
|
||||
}
|
||||
|
||||
public override void SetVirtualTag(string path, object? value)
|
||||
{
|
||||
// Predicates must be pure — writing from an alarm script couples alarm state to
|
||||
// virtual-tag state in a way that's near-impossible to reason about. Rejected
|
||||
// at runtime with a clear message; operators see it in the scripts-*.log.
|
||||
throw new InvalidOperationException(
|
||||
"Alarm predicate scripts cannot write to virtual tags. Move the write logic " +
|
||||
"into a virtual tag whose value the alarm predicate then reads.");
|
||||
}
|
||||
|
||||
public override DateTime Now => _clock();
|
||||
|
||||
public override ILogger Logger { get; }
|
||||
}
|
||||
40
src/ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms/AlarmTypes.cs
Normal file
40
src/ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms/AlarmTypes.cs
Normal file
@@ -0,0 +1,40 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
|
||||
|
||||
/// <summary>
|
||||
/// The concrete OPC UA Part 9 alarm subtype a scripted alarm materializes as. The
|
||||
/// engine's internal state machine is identical regardless of kind — the
|
||||
/// <c>AlarmKind</c> only affects how the alarm node appears to OPC UA clients
|
||||
/// (which ObjectType it maps to) and what diagnostic fields are populated.
|
||||
/// </summary>
|
||||
public enum AlarmKind
|
||||
{
|
||||
/// <summary>Base AlarmConditionType — no numeric or discrete interpretation.</summary>
|
||||
AlarmCondition,
|
||||
/// <summary>LimitAlarmType — the condition reflects a numeric setpoint / threshold breach.</summary>
|
||||
LimitAlarm,
|
||||
/// <summary>DiscreteAlarmType — the condition reflects a specific discrete value match.</summary>
|
||||
DiscreteAlarm,
|
||||
/// <summary>OffNormalAlarmType — the condition reflects deviation from a configured "normal" state.</summary>
|
||||
OffNormalAlarm,
|
||||
}
|
||||
|
||||
/// <summary>OPC UA Part 9 EnabledState — operator-controlled alarm enable/disable.</summary>
|
||||
public enum AlarmEnabledState { Enabled, Disabled }
|
||||
|
||||
/// <summary>OPC UA Part 9 ActiveState — reflects the current predicate truth.</summary>
|
||||
public enum AlarmActiveState { Inactive, Active }
|
||||
|
||||
/// <summary>OPC UA Part 9 AckedState — operator has acknowledged the active transition.</summary>
|
||||
public enum AlarmAckedState { Unacknowledged, Acknowledged }
|
||||
|
||||
/// <summary>OPC UA Part 9 ConfirmedState — operator has confirmed the clear transition.</summary>
|
||||
public enum AlarmConfirmedState { Unconfirmed, Confirmed }
|
||||
|
||||
/// <summary>
|
||||
/// OPC UA Part 9 shelving mode.
|
||||
/// <see cref="OneShot"/> suppresses the next active transition; once cleared
|
||||
/// the shelving expires and the alarm returns to normal behavior.
|
||||
/// <see cref="Timed"/> suppresses until a configured expiry timestamp passes.
|
||||
/// <see cref="Unshelved"/> is the default state — no suppression.
|
||||
/// </summary>
|
||||
public enum ShelvingKind { Unshelved, OneShot, Timed }
|
||||
@@ -0,0 +1,47 @@
|
||||
using System.Collections.Concurrent;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
|
||||
|
||||
/// <summary>
|
||||
/// Persistence for <see cref="AlarmConditionState"/> across server restarts. Phase 7
|
||||
/// plan decision #14: operator-supplied state (EnabledState / AckedState /
|
||||
/// ConfirmedState / ShelvingState + audit trail) persists; ActiveState is
|
||||
/// recomputed from the live predicate on startup so operators never re-ack.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Stream E wires this to a SQL-backed store against the <c>ScriptedAlarmState</c>
|
||||
/// table with audit logging through <see cref="Core.Abstractions"/> IAuditLogger.
|
||||
/// Tests + local dev use <see cref="InMemoryAlarmStateStore"/>.
|
||||
/// </remarks>
|
||||
public interface IAlarmStateStore
|
||||
{
|
||||
Task<AlarmConditionState?> LoadAsync(string alarmId, CancellationToken ct);
|
||||
Task<IReadOnlyList<AlarmConditionState>> LoadAllAsync(CancellationToken ct);
|
||||
Task SaveAsync(AlarmConditionState state, CancellationToken ct);
|
||||
Task RemoveAsync(string alarmId, CancellationToken ct);
|
||||
}
|
||||
|
||||
/// <summary>In-memory default — used by tests + by dev deployments without a SQL backend.</summary>
|
||||
public sealed class InMemoryAlarmStateStore : IAlarmStateStore
|
||||
{
|
||||
private readonly ConcurrentDictionary<string, AlarmConditionState> _map
|
||||
= new(StringComparer.Ordinal);
|
||||
|
||||
public Task<AlarmConditionState?> LoadAsync(string alarmId, CancellationToken ct)
|
||||
=> Task.FromResult(_map.TryGetValue(alarmId, out var v) ? v : null);
|
||||
|
||||
public Task<IReadOnlyList<AlarmConditionState>> LoadAllAsync(CancellationToken ct)
|
||||
=> Task.FromResult<IReadOnlyList<AlarmConditionState>>(_map.Values.ToArray());
|
||||
|
||||
public Task SaveAsync(AlarmConditionState state, CancellationToken ct)
|
||||
{
|
||||
_map[state.AlarmId] = state;
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task RemoveAsync(string alarmId, CancellationToken ct)
|
||||
{
|
||||
_map.TryRemove(alarmId, out _);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
64
src/ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms/MessageTemplate.cs
Normal file
64
src/ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms/MessageTemplate.cs
Normal file
@@ -0,0 +1,64 @@
|
||||
using System.Text.RegularExpressions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
|
||||
|
||||
/// <summary>
|
||||
/// Per Phase 7 plan decision #13, alarm messages are static-with-substitution
|
||||
/// templates. The engine resolves <c>{TagPath}</c> tokens at event emission time
|
||||
/// against current tag values; unresolvable tokens become <c>{?}</c> so the event
|
||||
/// still fires but the operator sees where the reference broke.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Token syntax: <c>{path/with/slashes}</c>. Brace-stripped the contents must
|
||||
/// match a path the caller's resolver function can look up. No escaping
|
||||
/// currently — if you need literal braces in the message, reach for a feature
|
||||
/// request.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Pure function. Same inputs always produce the same string. Tests verify the
|
||||
/// edge cases (no tokens / one token / many / nested / unresolvable / bad
|
||||
/// quality / null value).
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public static class MessageTemplate
|
||||
{
|
||||
private static readonly Regex TokenRegex = new(@"\{([^{}]+)\}",
|
||||
RegexOptions.Compiled | RegexOptions.CultureInvariant);
|
||||
|
||||
/// <summary>
|
||||
/// Resolve every <c>{path}</c> token in <paramref name="template"/> using
|
||||
/// <paramref name="resolveTag"/>. Tokens whose returned <see cref="DataValueSnapshot"/>
|
||||
/// has a non-Good <see cref="DataValueSnapshot.StatusCode"/> or a null
|
||||
/// <see cref="DataValueSnapshot.Value"/> resolve to <c>{?}</c>.
|
||||
/// </summary>
|
||||
public static string Resolve(string template, Func<string, DataValueSnapshot?> resolveTag)
|
||||
{
|
||||
if (string.IsNullOrEmpty(template)) return template ?? string.Empty;
|
||||
if (resolveTag is null) throw new ArgumentNullException(nameof(resolveTag));
|
||||
|
||||
return TokenRegex.Replace(template, match =>
|
||||
{
|
||||
var path = match.Groups[1].Value.Trim();
|
||||
if (path.Length == 0) return "{?}";
|
||||
var snap = resolveTag(path);
|
||||
if (snap is null) return "{?}";
|
||||
if (snap.StatusCode != 0u) return "{?}";
|
||||
return snap.Value?.ToString() ?? "{?}";
|
||||
});
|
||||
}
|
||||
|
||||
/// <summary>Enumerate the token paths the template references. Used at publish time to validate references exist.</summary>
|
||||
public static IReadOnlyList<string> ExtractTokenPaths(string? template)
|
||||
{
|
||||
if (string.IsNullOrEmpty(template)) return Array.Empty<string>();
|
||||
var tokens = new List<string>();
|
||||
foreach (Match m in TokenRegex.Matches(template))
|
||||
{
|
||||
var path = m.Groups[1].Value.Trim();
|
||||
if (path.Length > 0) tokens.Add(path);
|
||||
}
|
||||
return tokens;
|
||||
}
|
||||
}
|
||||
294
src/ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms/Part9StateMachine.cs
Normal file
294
src/ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms/Part9StateMachine.cs
Normal file
@@ -0,0 +1,294 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
|
||||
|
||||
/// <summary>
|
||||
/// Pure functions for OPC UA Part 9 alarm-condition state transitions. Input = the
|
||||
/// current <see cref="AlarmConditionState"/> + the event; output = the new state +
|
||||
/// optional emission hint. The engine calls these; persistence happens around them.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// No instance state, no I/O, no mutation of the input record. Every transition
|
||||
/// returns a fresh record. Makes the state machine trivially unit-testable —
|
||||
/// tests assert on (input, event) -> (output) without standing anything else up.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Two invariants the machine enforces:
|
||||
/// (1) Disabled alarms never transition ActiveState / AckedState / ConfirmedState
|
||||
/// — all predicate evaluations while disabled produce a no-op result and a
|
||||
/// diagnostic log line. Re-enable restores normal flow with ActiveState
|
||||
/// re-derived from the next predicate evaluation.
|
||||
/// (2) Shelved alarms (OneShot / Timed) don't fire active transitions to
|
||||
/// subscribers, but the state record still advances so that when shelving
|
||||
/// expires the ActiveState reflects current reality. OneShot expires on the
|
||||
/// next clear; Timed expires at <see cref="ShelvingState.UnshelveAtUtc"/>.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public static class Part9StateMachine
|
||||
{
|
||||
/// <summary>
|
||||
/// Apply a predicate re-evaluation result. Handles activation, clearing,
|
||||
/// branch-stack increment when a new active arrives while prior active is
|
||||
/// still un-acked, and shelving suppression.
|
||||
/// </summary>
|
||||
public static TransitionResult ApplyPredicate(
|
||||
AlarmConditionState current,
|
||||
bool predicateTrue,
|
||||
DateTime nowUtc)
|
||||
{
|
||||
if (current.Enabled == AlarmEnabledState.Disabled)
|
||||
return TransitionResult.NoOp(current, "disabled — predicate result ignored");
|
||||
|
||||
// Expire timed shelving if the configured clock has passed.
|
||||
var shelving = MaybeExpireShelving(current.Shelving, nowUtc);
|
||||
var stateWithShelving = current with { Shelving = shelving };
|
||||
|
||||
// Shelved alarms still update state but skip event emission.
|
||||
var shelved = shelving.Kind != ShelvingKind.Unshelved;
|
||||
|
||||
if (predicateTrue && current.Active == AlarmActiveState.Inactive)
|
||||
{
|
||||
// Inactive -> Active transition.
|
||||
// OneShotShelving is consumed on the NEXT clear, not activation — so we
|
||||
// still suppress this transition's emission.
|
||||
var next = stateWithShelving with
|
||||
{
|
||||
Active = AlarmActiveState.Active,
|
||||
Acked = AlarmAckedState.Unacknowledged,
|
||||
Confirmed = AlarmConfirmedState.Unconfirmed,
|
||||
LastActiveUtc = nowUtc,
|
||||
LastTransitionUtc = nowUtc,
|
||||
};
|
||||
return new TransitionResult(next, shelved ? EmissionKind.Suppressed : EmissionKind.Activated);
|
||||
}
|
||||
|
||||
if (!predicateTrue && current.Active == AlarmActiveState.Active)
|
||||
{
|
||||
// Active -> Inactive transition.
|
||||
var next = stateWithShelving with
|
||||
{
|
||||
Active = AlarmActiveState.Inactive,
|
||||
LastClearedUtc = nowUtc,
|
||||
LastTransitionUtc = nowUtc,
|
||||
// OneShotShelving expires on clear — resetting here so the next
|
||||
// activation fires normally.
|
||||
Shelving = shelving.Kind == ShelvingKind.OneShot
|
||||
? ShelvingState.Unshelved
|
||||
: shelving,
|
||||
};
|
||||
return new TransitionResult(next, shelved ? EmissionKind.Suppressed : EmissionKind.Cleared);
|
||||
}
|
||||
|
||||
// Predicate matches current Active — no state change beyond possible shelving
|
||||
// expiry.
|
||||
return new TransitionResult(stateWithShelving, EmissionKind.None);
|
||||
}
|
||||
|
||||
/// <summary>Operator acknowledges the currently-active transition.</summary>
|
||||
public static TransitionResult ApplyAcknowledge(
|
||||
AlarmConditionState current,
|
||||
string user,
|
||||
string? comment,
|
||||
DateTime nowUtc)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(user))
|
||||
throw new ArgumentException("User identity required for audit.", nameof(user));
|
||||
|
||||
if (current.Acked == AlarmAckedState.Acknowledged)
|
||||
return TransitionResult.NoOp(current, "already acknowledged");
|
||||
|
||||
var audit = AppendComment(current.Comments, nowUtc, user, "Acknowledge", comment);
|
||||
var next = current with
|
||||
{
|
||||
Acked = AlarmAckedState.Acknowledged,
|
||||
LastAckUtc = nowUtc,
|
||||
LastAckUser = user,
|
||||
LastAckComment = comment,
|
||||
LastTransitionUtc = nowUtc,
|
||||
Comments = audit,
|
||||
};
|
||||
return new TransitionResult(next, EmissionKind.Acknowledged);
|
||||
}
|
||||
|
||||
/// <summary>Operator confirms the cleared transition. Part 9 requires confirm after clear for retain-flag alarms.</summary>
|
||||
public static TransitionResult ApplyConfirm(
|
||||
AlarmConditionState current,
|
||||
string user,
|
||||
string? comment,
|
||||
DateTime nowUtc)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(user))
|
||||
throw new ArgumentException("User identity required for audit.", nameof(user));
|
||||
|
||||
if (current.Confirmed == AlarmConfirmedState.Confirmed)
|
||||
return TransitionResult.NoOp(current, "already confirmed");
|
||||
|
||||
var audit = AppendComment(current.Comments, nowUtc, user, "Confirm", comment);
|
||||
var next = current with
|
||||
{
|
||||
Confirmed = AlarmConfirmedState.Confirmed,
|
||||
LastConfirmUtc = nowUtc,
|
||||
LastConfirmUser = user,
|
||||
LastConfirmComment = comment,
|
||||
LastTransitionUtc = nowUtc,
|
||||
Comments = audit,
|
||||
};
|
||||
return new TransitionResult(next, EmissionKind.Confirmed);
|
||||
}
|
||||
|
||||
public static TransitionResult ApplyOneShotShelve(
|
||||
AlarmConditionState current, string user, DateTime nowUtc)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(user)) throw new ArgumentException("User required.", nameof(user));
|
||||
if (current.Shelving.Kind == ShelvingKind.OneShot)
|
||||
return TransitionResult.NoOp(current, "already one-shot shelved");
|
||||
|
||||
var audit = AppendComment(current.Comments, nowUtc, user, "ShelveOneShot", null);
|
||||
var next = current with
|
||||
{
|
||||
Shelving = new ShelvingState(ShelvingKind.OneShot, null),
|
||||
LastTransitionUtc = nowUtc,
|
||||
Comments = audit,
|
||||
};
|
||||
return new TransitionResult(next, EmissionKind.Shelved);
|
||||
}
|
||||
|
||||
public static TransitionResult ApplyTimedShelve(
|
||||
AlarmConditionState current, string user, DateTime unshelveAtUtc, DateTime nowUtc)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(user)) throw new ArgumentException("User required.", nameof(user));
|
||||
if (unshelveAtUtc <= nowUtc)
|
||||
throw new ArgumentOutOfRangeException(nameof(unshelveAtUtc), "Unshelve time must be in the future.");
|
||||
|
||||
var audit = AppendComment(current.Comments, nowUtc, user, "ShelveTimed",
|
||||
$"UnshelveAtUtc={unshelveAtUtc:O}");
|
||||
var next = current with
|
||||
{
|
||||
Shelving = new ShelvingState(ShelvingKind.Timed, unshelveAtUtc),
|
||||
LastTransitionUtc = nowUtc,
|
||||
Comments = audit,
|
||||
};
|
||||
return new TransitionResult(next, EmissionKind.Shelved);
|
||||
}
|
||||
|
||||
public static TransitionResult ApplyUnshelve(AlarmConditionState current, string user, DateTime nowUtc)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(user)) throw new ArgumentException("User required.", nameof(user));
|
||||
if (current.Shelving.Kind == ShelvingKind.Unshelved)
|
||||
return TransitionResult.NoOp(current, "not shelved");
|
||||
|
||||
var audit = AppendComment(current.Comments, nowUtc, user, "Unshelve", null);
|
||||
var next = current with
|
||||
{
|
||||
Shelving = ShelvingState.Unshelved,
|
||||
LastTransitionUtc = nowUtc,
|
||||
Comments = audit,
|
||||
};
|
||||
return new TransitionResult(next, EmissionKind.Unshelved);
|
||||
}
|
||||
|
||||
public static TransitionResult ApplyEnable(AlarmConditionState current, string user, DateTime nowUtc)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(user)) throw new ArgumentException("User required.", nameof(user));
|
||||
if (current.Enabled == AlarmEnabledState.Enabled)
|
||||
return TransitionResult.NoOp(current, "already enabled");
|
||||
|
||||
var audit = AppendComment(current.Comments, nowUtc, user, "Enable", null);
|
||||
var next = current with
|
||||
{
|
||||
Enabled = AlarmEnabledState.Enabled,
|
||||
LastTransitionUtc = nowUtc,
|
||||
Comments = audit,
|
||||
};
|
||||
return new TransitionResult(next, EmissionKind.Enabled);
|
||||
}
|
||||
|
||||
public static TransitionResult ApplyDisable(AlarmConditionState current, string user, DateTime nowUtc)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(user)) throw new ArgumentException("User required.", nameof(user));
|
||||
if (current.Enabled == AlarmEnabledState.Disabled)
|
||||
return TransitionResult.NoOp(current, "already disabled");
|
||||
|
||||
var audit = AppendComment(current.Comments, nowUtc, user, "Disable", null);
|
||||
var next = current with
|
||||
{
|
||||
Enabled = AlarmEnabledState.Disabled,
|
||||
LastTransitionUtc = nowUtc,
|
||||
Comments = audit,
|
||||
};
|
||||
return new TransitionResult(next, EmissionKind.Disabled);
|
||||
}
|
||||
|
||||
public static TransitionResult ApplyAddComment(
|
||||
AlarmConditionState current, string user, string text, DateTime nowUtc)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(user)) throw new ArgumentException("User required.", nameof(user));
|
||||
if (string.IsNullOrWhiteSpace(text)) throw new ArgumentException("Comment text required.", nameof(text));
|
||||
|
||||
var audit = AppendComment(current.Comments, nowUtc, user, "AddComment", text);
|
||||
var next = current with { Comments = audit };
|
||||
return new TransitionResult(next, EmissionKind.CommentAdded);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Re-evaluate whether a currently timed-shelved alarm has expired. Returns
|
||||
/// the (possibly unshelved) state + emission hint so the engine knows to
|
||||
/// publish an Unshelved event at the right moment.
|
||||
/// </summary>
|
||||
public static TransitionResult ApplyShelvingCheck(AlarmConditionState current, DateTime nowUtc)
|
||||
{
|
||||
if (current.Shelving.Kind != ShelvingKind.Timed) return TransitionResult.None(current);
|
||||
if (current.Shelving.UnshelveAtUtc is DateTime t && nowUtc >= t)
|
||||
{
|
||||
var audit = AppendComment(current.Comments, nowUtc, "system", "AutoUnshelve",
|
||||
$"Timed shelving expired at {nowUtc:O}");
|
||||
var next = current with
|
||||
{
|
||||
Shelving = ShelvingState.Unshelved,
|
||||
LastTransitionUtc = nowUtc,
|
||||
Comments = audit,
|
||||
};
|
||||
return new TransitionResult(next, EmissionKind.Unshelved);
|
||||
}
|
||||
return TransitionResult.None(current);
|
||||
}
|
||||
|
||||
private static ShelvingState MaybeExpireShelving(ShelvingState s, DateTime nowUtc)
|
||||
{
|
||||
if (s.Kind != ShelvingKind.Timed) return s;
|
||||
return s.UnshelveAtUtc is DateTime t && nowUtc >= t ? ShelvingState.Unshelved : s;
|
||||
}
|
||||
|
||||
private static IReadOnlyList<AlarmComment> AppendComment(
|
||||
IReadOnlyList<AlarmComment> existing, DateTime ts, string user, string kind, string? text)
|
||||
{
|
||||
var list = new List<AlarmComment>(existing.Count + 1);
|
||||
list.AddRange(existing);
|
||||
list.Add(new AlarmComment(ts, user, kind, text ?? string.Empty));
|
||||
return list;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Result of a state-machine operation — new state + what to emit (if anything).</summary>
|
||||
public sealed record TransitionResult(AlarmConditionState State, EmissionKind Emission)
|
||||
{
|
||||
public static TransitionResult None(AlarmConditionState state) => new(state, EmissionKind.None);
|
||||
public static TransitionResult NoOp(AlarmConditionState state, string reason) => new(state, EmissionKind.None);
|
||||
}
|
||||
|
||||
/// <summary>What kind of event, if any, the engine should emit after a transition.</summary>
|
||||
public enum EmissionKind
|
||||
{
|
||||
/// <summary>State did not change meaningfully — no event to emit.</summary>
|
||||
None,
|
||||
/// <summary>Predicate transitioned to true while shelving was suppressing events.</summary>
|
||||
Suppressed,
|
||||
Activated,
|
||||
Cleared,
|
||||
Acknowledged,
|
||||
Confirmed,
|
||||
Shelved,
|
||||
Unshelved,
|
||||
Enabled,
|
||||
Disabled,
|
||||
CommentAdded,
|
||||
}
|
||||
@@ -0,0 +1,50 @@
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
|
||||
|
||||
/// <summary>
|
||||
/// Operator-authored scripted-alarm configuration. Phase 7 Stream E (config DB schema)
|
||||
/// materializes these from the <c>ScriptedAlarm</c> + <c>Script</c> tables on publish.
|
||||
/// </summary>
|
||||
/// <param name="AlarmId">
|
||||
/// Stable identity for the alarm — used as the OPC UA ConditionId + the key in the
|
||||
/// state store. Should be globally unique within the cluster; convention is
|
||||
/// <c>{EquipmentPath}::{AlarmName}</c>.
|
||||
/// </param>
|
||||
/// <param name="EquipmentPath">
|
||||
/// UNS path of the Equipment node the alarm hangs under. Alarm browse lives here;
|
||||
/// ACL binding inherits this equipment's scope per Phase 6.2.
|
||||
/// </param>
|
||||
/// <param name="AlarmName">Human-readable alarm name — used in the browse tree + Admin UI.</param>
|
||||
/// <param name="Kind">Concrete OPC UA Part 9 subtype the alarm materializes as.</param>
|
||||
/// <param name="Severity">Static severity per Phase 7 plan decision #13; not currently computed by the predicate.</param>
|
||||
/// <param name="MessageTemplate">
|
||||
/// Message text with <c>{TagPath}</c> tokens resolved at event-emission time per
|
||||
/// Phase 7 plan decision #13. Unresolvable tokens emit <c>{?}</c> + a structured
|
||||
/// error so operators can spot stale references.
|
||||
/// </param>
|
||||
/// <param name="PredicateScriptSource">
|
||||
/// Roslyn C# script returning <c>bool</c>. <c>true</c> = alarm condition currently holds (active);
|
||||
/// <c>false</c> = condition has cleared. Same sandbox rules as virtual tags per Phase 7 decision #6.
|
||||
/// </param>
|
||||
/// <param name="HistorizeToAveva">
|
||||
/// When true, every transition emission of this alarm flows to the Historian alarm
|
||||
/// sink (Stream D). Defaults to true — plant alarm history is usually the
|
||||
/// operator's primary diagnostic. Galaxy-native alarms default false since Galaxy
|
||||
/// historises them directly.
|
||||
/// </param>
|
||||
/// <param name="Retain">
|
||||
/// Part 9 retain flag — when true, the condition node remains visible after the
|
||||
/// predicate clears as long as it has un-acknowledged or un-confirmed transitions.
|
||||
/// Default true.
|
||||
/// </param>
|
||||
public sealed record ScriptedAlarmDefinition(
|
||||
string AlarmId,
|
||||
string EquipmentPath,
|
||||
string AlarmName,
|
||||
AlarmKind Kind,
|
||||
AlarmSeverity Severity,
|
||||
string MessageTemplate,
|
||||
string PredicateScriptSource,
|
||||
bool HistorizeToAveva = true,
|
||||
bool Retain = true);
|
||||
429
src/ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms/ScriptedAlarmEngine.cs
Normal file
429
src/ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms/ScriptedAlarmEngine.cs
Normal file
@@ -0,0 +1,429 @@
|
||||
using System.Collections.Concurrent;
|
||||
using Serilog;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
|
||||
|
||||
/// <summary>
|
||||
/// Phase 7 scripted-alarm orchestrator. Compiles every configured alarm's predicate
|
||||
/// against the Stream A sandbox, subscribes to the referenced upstream tags,
|
||||
/// re-evaluates the predicate on every input change + on a shelving-check timer,
|
||||
/// applies the resulting transition through <see cref="Part9StateMachine"/>,
|
||||
/// persists state via <see cref="IAlarmStateStore"/>, and emits the resulting events
|
||||
/// through <see cref="ScriptedAlarmSource"/> (which wires into the existing
|
||||
/// <c>IAlarmSource</c> fan-out).
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Scripted alarms are leaves in the evaluation DAG — no alarm's state drives
|
||||
/// another alarm's predicate. The engine maintains only an inverse index from
|
||||
/// upstream tag path → alarms referencing it; no topological sort needed
|
||||
/// (unlike the virtual-tag engine).
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Evaluation errors (script throws, timeout, coercion fail) surface as
|
||||
/// structured errors in the dedicated scripts-*.log sink plus a WARN companion
|
||||
/// in the main log. The alarm's ActiveState stays at its prior value — the
|
||||
/// engine does NOT invent a clear transition just because the predicate broke.
|
||||
/// Operators investigating a broken predicate shouldn't see a phantom
|
||||
/// clear-event preceding the failure.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class ScriptedAlarmEngine : IDisposable
|
||||
{
|
||||
private readonly ITagUpstreamSource _upstream;
|
||||
private readonly IAlarmStateStore _store;
|
||||
private readonly ScriptLoggerFactory _loggerFactory;
|
||||
private readonly ILogger _engineLogger;
|
||||
private readonly Func<DateTime> _clock;
|
||||
private readonly TimeSpan _scriptTimeout;
|
||||
|
||||
private readonly Dictionary<string, AlarmState> _alarms = new(StringComparer.Ordinal);
|
||||
private readonly ConcurrentDictionary<string, DataValueSnapshot> _valueCache
|
||||
= new(StringComparer.Ordinal);
|
||||
private readonly Dictionary<string, HashSet<string>> _alarmsReferencing
|
||||
= new(StringComparer.Ordinal); // tag path -> alarm ids
|
||||
|
||||
private readonly List<IDisposable> _upstreamSubscriptions = [];
|
||||
private readonly SemaphoreSlim _evalGate = new(1, 1);
|
||||
private Timer? _shelvingTimer;
|
||||
private bool _loaded;
|
||||
private bool _disposed;
|
||||
|
||||
public ScriptedAlarmEngine(
|
||||
ITagUpstreamSource upstream,
|
||||
IAlarmStateStore store,
|
||||
ScriptLoggerFactory loggerFactory,
|
||||
ILogger engineLogger,
|
||||
Func<DateTime>? clock = null,
|
||||
TimeSpan? scriptTimeout = null)
|
||||
{
|
||||
_upstream = upstream ?? throw new ArgumentNullException(nameof(upstream));
|
||||
_store = store ?? throw new ArgumentNullException(nameof(store));
|
||||
_loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
|
||||
_engineLogger = engineLogger ?? throw new ArgumentNullException(nameof(engineLogger));
|
||||
_clock = clock ?? (() => DateTime.UtcNow);
|
||||
_scriptTimeout = scriptTimeout ?? TimedScriptEvaluator<AlarmPredicateContext, bool>.DefaultTimeout;
|
||||
}
|
||||
|
||||
/// <summary>Raised for every emission the Part9StateMachine produces that the engine should publish.</summary>
|
||||
public event EventHandler<ScriptedAlarmEvent>? OnEvent;
|
||||
|
||||
public IReadOnlyCollection<string> LoadedAlarmIds => _alarms.Keys;
|
||||
|
||||
/// <summary>
|
||||
/// Load a batch of alarm definitions. Compiles every predicate, aggregates any
|
||||
/// compile failures into one <see cref="InvalidOperationException"/>, subscribes
|
||||
/// to upstream input tags, seeds the value cache, loads persisted state from
|
||||
/// the store (falling back to Fresh for first-load alarms), and recomputes
|
||||
/// ActiveState per Phase 7 plan decision #14 (startup recovery).
|
||||
/// </summary>
|
||||
public async Task LoadAsync(IReadOnlyList<ScriptedAlarmDefinition> definitions, CancellationToken ct)
|
||||
{
|
||||
if (_disposed) throw new ObjectDisposedException(nameof(ScriptedAlarmEngine));
|
||||
if (definitions is null) throw new ArgumentNullException(nameof(definitions));
|
||||
|
||||
await _evalGate.WaitAsync(ct).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
UnsubscribeFromUpstream();
|
||||
_alarms.Clear();
|
||||
_alarmsReferencing.Clear();
|
||||
|
||||
var compileFailures = new List<string>();
|
||||
foreach (var def in definitions)
|
||||
{
|
||||
try
|
||||
{
|
||||
var extraction = DependencyExtractor.Extract(def.PredicateScriptSource);
|
||||
if (!extraction.IsValid)
|
||||
{
|
||||
var joined = string.Join("; ", extraction.Rejections.Select(r => r.Message));
|
||||
compileFailures.Add($"{def.AlarmId}: dependency extraction rejected — {joined}");
|
||||
continue;
|
||||
}
|
||||
|
||||
var evaluator = ScriptEvaluator<AlarmPredicateContext, bool>.Compile(def.PredicateScriptSource);
|
||||
var timed = new TimedScriptEvaluator<AlarmPredicateContext, bool>(evaluator, _scriptTimeout);
|
||||
var logger = _loggerFactory.Create(def.AlarmId);
|
||||
|
||||
var templateTokens = MessageTemplate.ExtractTokenPaths(def.MessageTemplate);
|
||||
var allInputs = new HashSet<string>(extraction.Reads, StringComparer.Ordinal);
|
||||
foreach (var t in templateTokens) allInputs.Add(t);
|
||||
|
||||
_alarms[def.AlarmId] = new AlarmState(def, timed, extraction.Reads, templateTokens, logger,
|
||||
AlarmConditionState.Fresh(def.AlarmId, _clock()));
|
||||
|
||||
foreach (var path in allInputs)
|
||||
{
|
||||
if (!_alarmsReferencing.TryGetValue(path, out var set))
|
||||
_alarmsReferencing[path] = set = new HashSet<string>(StringComparer.Ordinal);
|
||||
set.Add(def.AlarmId);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
compileFailures.Add($"{def.AlarmId}: {ex.Message}");
|
||||
}
|
||||
}
|
||||
|
||||
if (compileFailures.Count > 0)
|
||||
{
|
||||
throw new InvalidOperationException(
|
||||
$"ScriptedAlarmEngine load failed. {compileFailures.Count} alarm(s) did not compile:\n "
|
||||
+ string.Join("\n ", compileFailures));
|
||||
}
|
||||
|
||||
// Seed the value cache with current upstream values + subscribe for changes.
|
||||
foreach (var path in _alarmsReferencing.Keys)
|
||||
{
|
||||
_valueCache[path] = _upstream.ReadTag(path);
|
||||
_upstreamSubscriptions.Add(_upstream.SubscribeTag(path, OnUpstreamChange));
|
||||
}
|
||||
|
||||
// Restore persisted state, falling back to Fresh where nothing was saved,
|
||||
// then re-derive ActiveState from the current predicate per decision #14.
|
||||
foreach (var (alarmId, state) in _alarms)
|
||||
{
|
||||
var persisted = await _store.LoadAsync(alarmId, ct).ConfigureAwait(false);
|
||||
var seed = persisted ?? state.Condition;
|
||||
var afterPredicate = await EvaluatePredicateToStateAsync(state, seed, nowUtc: _clock(), ct)
|
||||
.ConfigureAwait(false);
|
||||
_alarms[alarmId] = state with { Condition = afterPredicate };
|
||||
await _store.SaveAsync(afterPredicate, ct).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
_loaded = true;
|
||||
_engineLogger.Information("ScriptedAlarmEngine loaded {Count} alarm(s)", _alarms.Count);
|
||||
|
||||
// Start the shelving-check timer — ticks every 5s, expires any timed shelves
|
||||
// that have passed their UnshelveAtUtc.
|
||||
_shelvingTimer = new Timer(_ => RunShelvingCheck(),
|
||||
null, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5));
|
||||
}
|
||||
finally
|
||||
{
|
||||
_evalGate.Release();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Current persisted state for <paramref name="alarmId"/>. Returns null for
|
||||
/// unknown alarm. Mainly used for diagnostics + the Admin UI status page.
|
||||
/// </summary>
|
||||
public AlarmConditionState? GetState(string alarmId)
|
||||
=> _alarms.TryGetValue(alarmId, out var s) ? s.Condition : null;
|
||||
|
||||
public IReadOnlyCollection<AlarmConditionState> GetAllStates()
|
||||
=> _alarms.Values.Select(a => a.Condition).ToArray();
|
||||
|
||||
public Task AcknowledgeAsync(string alarmId, string user, string? comment, CancellationToken ct)
|
||||
=> ApplyAsync(alarmId, ct, cur => Part9StateMachine.ApplyAcknowledge(cur, user, comment, _clock()));
|
||||
|
||||
public Task ConfirmAsync(string alarmId, string user, string? comment, CancellationToken ct)
|
||||
=> ApplyAsync(alarmId, ct, cur => Part9StateMachine.ApplyConfirm(cur, user, comment, _clock()));
|
||||
|
||||
public Task OneShotShelveAsync(string alarmId, string user, CancellationToken ct)
|
||||
=> ApplyAsync(alarmId, ct, cur => Part9StateMachine.ApplyOneShotShelve(cur, user, _clock()));
|
||||
|
||||
public Task TimedShelveAsync(string alarmId, string user, DateTime unshelveAtUtc, CancellationToken ct)
|
||||
=> ApplyAsync(alarmId, ct, cur => Part9StateMachine.ApplyTimedShelve(cur, user, unshelveAtUtc, _clock()));
|
||||
|
||||
public Task UnshelveAsync(string alarmId, string user, CancellationToken ct)
|
||||
=> ApplyAsync(alarmId, ct, cur => Part9StateMachine.ApplyUnshelve(cur, user, _clock()));
|
||||
|
||||
public Task EnableAsync(string alarmId, string user, CancellationToken ct)
|
||||
=> ApplyAsync(alarmId, ct, cur => Part9StateMachine.ApplyEnable(cur, user, _clock()));
|
||||
|
||||
public Task DisableAsync(string alarmId, string user, CancellationToken ct)
|
||||
=> ApplyAsync(alarmId, ct, cur => Part9StateMachine.ApplyDisable(cur, user, _clock()));
|
||||
|
||||
public Task AddCommentAsync(string alarmId, string user, string text, CancellationToken ct)
|
||||
=> ApplyAsync(alarmId, ct, cur => Part9StateMachine.ApplyAddComment(cur, user, text, _clock()));
|
||||
|
||||
private async Task ApplyAsync(string alarmId, CancellationToken ct, Func<AlarmConditionState, TransitionResult> op)
|
||||
{
|
||||
EnsureLoaded();
|
||||
if (!_alarms.TryGetValue(alarmId, out var state))
|
||||
throw new ArgumentException($"Unknown alarm {alarmId}", nameof(alarmId));
|
||||
|
||||
await _evalGate.WaitAsync(ct).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
var result = op(state.Condition);
|
||||
_alarms[alarmId] = state with { Condition = result.State };
|
||||
await _store.SaveAsync(result.State, ct).ConfigureAwait(false);
|
||||
if (result.Emission != EmissionKind.None) EmitEvent(state, result.State, result.Emission);
|
||||
}
|
||||
finally { _evalGate.Release(); }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Upstream-change callback. Updates the value cache + enqueues predicate
|
||||
/// re-evaluation for every alarm referencing the changed path. Fire-and-forget
|
||||
/// so driver-side dispatch isn't blocked.
|
||||
/// </summary>
|
||||
internal void OnUpstreamChange(string path, DataValueSnapshot value)
|
||||
{
|
||||
_valueCache[path] = value;
|
||||
if (_alarmsReferencing.TryGetValue(path, out var alarmIds))
|
||||
{
|
||||
_ = ReevaluateAsync(alarmIds.ToArray(), CancellationToken.None);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ReevaluateAsync(IReadOnlyList<string> alarmIds, CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
await _evalGate.WaitAsync(ct).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
foreach (var id in alarmIds)
|
||||
{
|
||||
if (!_alarms.TryGetValue(id, out var state)) continue;
|
||||
var newState = await EvaluatePredicateToStateAsync(
|
||||
state, state.Condition, _clock(), ct).ConfigureAwait(false);
|
||||
if (!ReferenceEquals(newState, state.Condition))
|
||||
{
|
||||
_alarms[id] = state with { Condition = newState };
|
||||
await _store.SaveAsync(newState, ct).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
finally { _evalGate.Release(); }
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_engineLogger.Error(ex, "ScriptedAlarmEngine reevaluate failed");
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Evaluate the predicate + apply the resulting state-machine transition.
|
||||
/// Returns the new condition state. Emits the appropriate event if the
|
||||
/// transition produces one.
|
||||
/// </summary>
|
||||
private async Task<AlarmConditionState> EvaluatePredicateToStateAsync(
|
||||
AlarmState state, AlarmConditionState seed, DateTime nowUtc, CancellationToken ct)
|
||||
{
|
||||
var inputs = BuildReadCache(state.Inputs);
|
||||
var context = new AlarmPredicateContext(inputs, state.Logger, _clock);
|
||||
|
||||
bool predicateTrue;
|
||||
try
|
||||
{
|
||||
predicateTrue = await state.Evaluator.RunAsync(context, ct).ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
throw;
|
||||
}
|
||||
catch (ScriptTimeoutException tex)
|
||||
{
|
||||
state.Logger.Warning("Alarm predicate timed out after {Timeout} — state unchanged", tex.Timeout);
|
||||
return seed;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
state.Logger.Error(ex, "Alarm predicate threw — state unchanged");
|
||||
return seed;
|
||||
}
|
||||
|
||||
var result = Part9StateMachine.ApplyPredicate(seed, predicateTrue, nowUtc);
|
||||
if (result.Emission != EmissionKind.None)
|
||||
EmitEvent(state, result.State, result.Emission);
|
||||
return result.State;
|
||||
}
|
||||
|
||||
private IReadOnlyDictionary<string, DataValueSnapshot> BuildReadCache(IReadOnlySet<string> inputs)
|
||||
{
|
||||
var d = new Dictionary<string, DataValueSnapshot>(StringComparer.Ordinal);
|
||||
foreach (var p in inputs)
|
||||
d[p] = _valueCache.TryGetValue(p, out var v) ? v : _upstream.ReadTag(p);
|
||||
return d;
|
||||
}
|
||||
|
||||
private void EmitEvent(AlarmState state, AlarmConditionState condition, EmissionKind kind)
|
||||
{
|
||||
// Suppressed kind means shelving ate the emission — we don't fire for subscribers
|
||||
// but the state record still advanced so startup recovery reflects reality.
|
||||
if (kind == EmissionKind.Suppressed || kind == EmissionKind.None) return;
|
||||
|
||||
var message = MessageTemplate.Resolve(state.Definition.MessageTemplate, TryLookup);
|
||||
var evt = new ScriptedAlarmEvent(
|
||||
AlarmId: state.Definition.AlarmId,
|
||||
EquipmentPath: state.Definition.EquipmentPath,
|
||||
AlarmName: state.Definition.AlarmName,
|
||||
Kind: state.Definition.Kind,
|
||||
Severity: state.Definition.Severity,
|
||||
Message: message,
|
||||
Condition: condition,
|
||||
Emission: kind,
|
||||
TimestampUtc: _clock());
|
||||
try { OnEvent?.Invoke(this, evt); }
|
||||
catch (Exception ex)
|
||||
{
|
||||
_engineLogger.Warning(ex, "ScriptedAlarmEngine OnEvent subscriber threw for {AlarmId}", state.Definition.AlarmId);
|
||||
}
|
||||
}
|
||||
|
||||
private DataValueSnapshot? TryLookup(string path)
|
||||
=> _valueCache.TryGetValue(path, out var v) ? v : null;
|
||||
|
||||
private void RunShelvingCheck()
|
||||
{
|
||||
if (_disposed) return;
|
||||
var ids = _alarms.Keys.ToArray();
|
||||
_ = ShelvingCheckAsync(ids, CancellationToken.None);
|
||||
}
|
||||
|
||||
private async Task ShelvingCheckAsync(IReadOnlyList<string> alarmIds, CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
await _evalGate.WaitAsync(ct).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
var now = _clock();
|
||||
foreach (var id in alarmIds)
|
||||
{
|
||||
if (!_alarms.TryGetValue(id, out var state)) continue;
|
||||
var result = Part9StateMachine.ApplyShelvingCheck(state.Condition, now);
|
||||
if (!ReferenceEquals(result.State, state.Condition))
|
||||
{
|
||||
_alarms[id] = state with { Condition = result.State };
|
||||
await _store.SaveAsync(result.State, ct).ConfigureAwait(false);
|
||||
if (result.Emission != EmissionKind.None)
|
||||
EmitEvent(state, result.State, result.Emission);
|
||||
}
|
||||
}
|
||||
}
|
||||
finally { _evalGate.Release(); }
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_engineLogger.Warning(ex, "ScriptedAlarmEngine shelving-check failed");
|
||||
}
|
||||
}
|
||||
|
||||
private void UnsubscribeFromUpstream()
|
||||
{
|
||||
foreach (var s in _upstreamSubscriptions)
|
||||
{
|
||||
try { s.Dispose(); } catch { }
|
||||
}
|
||||
_upstreamSubscriptions.Clear();
|
||||
}
|
||||
|
||||
private void EnsureLoaded()
|
||||
{
|
||||
if (!_loaded) throw new InvalidOperationException(
|
||||
"ScriptedAlarmEngine not loaded. Call LoadAsync first.");
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (_disposed) return;
|
||||
_disposed = true;
|
||||
_shelvingTimer?.Dispose();
|
||||
UnsubscribeFromUpstream();
|
||||
_alarms.Clear();
|
||||
_alarmsReferencing.Clear();
|
||||
}
|
||||
|
||||
private sealed record AlarmState(
|
||||
ScriptedAlarmDefinition Definition,
|
||||
TimedScriptEvaluator<AlarmPredicateContext, bool> Evaluator,
|
||||
IReadOnlySet<string> Inputs,
|
||||
IReadOnlyList<string> TemplateTokens,
|
||||
ILogger Logger,
|
||||
AlarmConditionState Condition);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// One alarm emission the engine pushed to subscribers. Carries everything
|
||||
/// downstream consumers (OPC UA alarm-source adapter + historian sink) need to
|
||||
/// publish the event without re-querying the engine.
|
||||
/// </summary>
|
||||
public sealed record ScriptedAlarmEvent(
|
||||
string AlarmId,
|
||||
string EquipmentPath,
|
||||
string AlarmName,
|
||||
AlarmKind Kind,
|
||||
AlarmSeverity Severity,
|
||||
string Message,
|
||||
AlarmConditionState Condition,
|
||||
EmissionKind Emission,
|
||||
DateTime TimestampUtc);
|
||||
|
||||
/// <summary>
|
||||
/// Upstream source abstraction — intentionally identical shape to the virtual-tag
|
||||
/// engine's so Stream G can compose them behind one driver bridge.
|
||||
/// </summary>
|
||||
public interface ITagUpstreamSource
|
||||
{
|
||||
DataValueSnapshot ReadTag(string path);
|
||||
IDisposable SubscribeTag(string path, Action<string, DataValueSnapshot> observer);
|
||||
}
|
||||
122
src/ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms/ScriptedAlarmSource.cs
Normal file
122
src/ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms/ScriptedAlarmSource.cs
Normal file
@@ -0,0 +1,122 @@
|
||||
using System.Collections.Concurrent;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
|
||||
|
||||
/// <summary>
|
||||
/// Adapter that exposes <see cref="ScriptedAlarmEngine"/> through the driver-agnostic
|
||||
/// <see cref="IAlarmSource"/> surface. The existing Phase 6.1 <c>AlarmTracker</c>
|
||||
/// composition fan-out consumes this alongside Galaxy / AB CIP / FOCAS alarm
|
||||
/// sources — no per-source branching in the fan-out.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Per Phase 7 plan Stream C.6, ack / confirm / shelve / unshelve are OPC UA
|
||||
/// method calls per-condition. This adapter implements <see cref="AcknowledgeAsync"/>
|
||||
/// from the base interface; the richer Part 9 methods (Confirm / Shelve /
|
||||
/// Unshelve / AddComment) live directly on the engine, invoked from OPC UA
|
||||
/// method handlers wired up in Stream G.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// SubscribeAlarmsAsync takes a list of source-node-id filters (typically an
|
||||
/// Equipment path prefix). When the list is empty every alarm matches. The
|
||||
/// adapter doesn't maintain per-subscription state beyond the filter set — it
|
||||
/// checks each emission against every live subscription.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class ScriptedAlarmSource : IAlarmSource, IDisposable
|
||||
{
|
||||
private readonly ScriptedAlarmEngine _engine;
|
||||
private readonly ConcurrentDictionary<string, Subscription> _subscriptions
|
||||
= new(StringComparer.Ordinal);
|
||||
private bool _disposed;
|
||||
|
||||
public ScriptedAlarmSource(ScriptedAlarmEngine engine)
|
||||
{
|
||||
_engine = engine ?? throw new ArgumentNullException(nameof(engine));
|
||||
_engine.OnEvent += OnEngineEvent;
|
||||
}
|
||||
|
||||
public event EventHandler<AlarmEventArgs>? OnAlarmEvent;
|
||||
|
||||
public Task<IAlarmSubscriptionHandle> SubscribeAlarmsAsync(
|
||||
IReadOnlyList<string> sourceNodeIds, CancellationToken cancellationToken)
|
||||
{
|
||||
if (sourceNodeIds is null) throw new ArgumentNullException(nameof(sourceNodeIds));
|
||||
var handle = new SubscriptionHandle(Guid.NewGuid().ToString("N"));
|
||||
_subscriptions[handle.DiagnosticId] = new Subscription(handle,
|
||||
new HashSet<string>(sourceNodeIds, StringComparer.Ordinal));
|
||||
return Task.FromResult<IAlarmSubscriptionHandle>(handle);
|
||||
}
|
||||
|
||||
public Task UnsubscribeAlarmsAsync(IAlarmSubscriptionHandle handle, CancellationToken cancellationToken)
|
||||
{
|
||||
if (handle is null) throw new ArgumentNullException(nameof(handle));
|
||||
_subscriptions.TryRemove(handle.DiagnosticId, out _);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public async Task AcknowledgeAsync(
|
||||
IReadOnlyList<AlarmAcknowledgeRequest> acknowledgements, CancellationToken cancellationToken)
|
||||
{
|
||||
if (acknowledgements is null) throw new ArgumentNullException(nameof(acknowledgements));
|
||||
foreach (var a in acknowledgements)
|
||||
{
|
||||
// The base interface doesn't carry a user identity — Stream G provides the
|
||||
// authenticated principal at the OPC UA dispatch layer + proxies through
|
||||
// the engine's richer AcknowledgeAsync. Here we default to "opcua-client"
|
||||
// so callers using the raw IAlarmSource still produce an audit entry.
|
||||
await _engine.AcknowledgeAsync(a.ConditionId, "opcua-client", a.Comment, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
private void OnEngineEvent(object? sender, ScriptedAlarmEvent evt)
|
||||
{
|
||||
if (_disposed) return;
|
||||
|
||||
foreach (var sub in _subscriptions.Values)
|
||||
{
|
||||
if (!Matches(sub, evt)) continue;
|
||||
var payload = new AlarmEventArgs(
|
||||
SubscriptionHandle: sub.Handle,
|
||||
SourceNodeId: evt.EquipmentPath,
|
||||
ConditionId: evt.AlarmId,
|
||||
AlarmType: evt.Kind.ToString(),
|
||||
Message: evt.Message,
|
||||
Severity: evt.Severity,
|
||||
SourceTimestampUtc: evt.TimestampUtc);
|
||||
try { OnAlarmEvent?.Invoke(this, payload); }
|
||||
catch { /* subscriber exceptions don't crash the adapter */ }
|
||||
}
|
||||
}
|
||||
|
||||
private static bool Matches(Subscription sub, ScriptedAlarmEvent evt)
|
||||
{
|
||||
if (sub.Filter.Count == 0) return true;
|
||||
// A subscription matches if any filter is a prefix of the alarm's equipment
|
||||
// path — typical use is "Enterprise/Site/Area/Line" filtering a whole line.
|
||||
foreach (var f in sub.Filter)
|
||||
{
|
||||
if (evt.EquipmentPath.Equals(f, StringComparison.Ordinal)) return true;
|
||||
if (evt.EquipmentPath.StartsWith(f + "/", StringComparison.Ordinal)) return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (_disposed) return;
|
||||
_disposed = true;
|
||||
_engine.OnEvent -= OnEngineEvent;
|
||||
_subscriptions.Clear();
|
||||
}
|
||||
|
||||
private sealed class SubscriptionHandle : IAlarmSubscriptionHandle
|
||||
{
|
||||
public SubscriptionHandle(string id) { DiagnosticId = id; }
|
||||
public string DiagnosticId { get; }
|
||||
}
|
||||
|
||||
private sealed record Subscription(SubscriptionHandle Handle, IReadOnlySet<string> Filter);
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>net10.0</TargetFramework>
|
||||
<Nullable>enable</Nullable>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<LangVersion>latest</LangVersion>
|
||||
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
|
||||
<GenerateDocumentationFile>true</GenerateDocumentationFile>
|
||||
<NoWarn>$(NoWarn);CS1591</NoWarn>
|
||||
<RootNamespace>ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms</RootNamespace>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Serilog" Version="4.2.0"/>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Core.Abstractions\ZB.MOM.WW.OtOpcUa.Core.Abstractions.csproj"/>
|
||||
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Core.Scripting\ZB.MOM.WW.OtOpcUa.Core.Scripting.csproj"/>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<InternalsVisibleTo Include="ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms.Tests"/>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-37gx-xxp4-5rgx"/>
|
||||
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-w3x6-4m5h-cxqf"/>
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
@@ -60,6 +60,14 @@ public enum MessageKind : byte
|
||||
HostConnectivityStatus = 0x70,
|
||||
RuntimeStatusChange = 0x71,
|
||||
|
||||
// Phase 7 Stream D — historian alarm sink. Main server → Galaxy.Host batched
|
||||
// writes into the Aveva Historian alarm schema via the already-loaded
|
||||
// aahClientManaged DLLs. HistorianConnectivityStatus fires proactively from the
|
||||
// Host when the SDK session transitions so diagnostics flip promptly.
|
||||
HistorianAlarmEventRequest = 0x80,
|
||||
HistorianAlarmEventResponse = 0x81,
|
||||
HistorianConnectivityStatus = 0x82,
|
||||
|
||||
RecycleHostRequest = 0xF0,
|
||||
RecycleStatusResponse = 0xF1,
|
||||
|
||||
|
||||
@@ -0,0 +1,92 @@
|
||||
using System;
|
||||
using MessagePack;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
|
||||
|
||||
/// <summary>
|
||||
/// Phase 7 Stream D — IPC contracts for routing Part 9 alarm transitions from the
|
||||
/// main .NET 10 server into Galaxy.Host's already-loaded <c>aahClientManaged</c>
|
||||
/// DLLs. Reuses the Tier-C isolation + licensing pathway rather than loading 32-bit
|
||||
/// native historian code into the main server.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Batched on the wire to amortize IPC overhead — the main server's SqliteStoreAndForwardSink
|
||||
/// ships up to 100 events per request per Phase 7 plan Stream D.5.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Per-event outcomes (Ack / RetryPlease / PermanentFail) let the drain worker
|
||||
/// dead-letter malformed events without blocking neighbors in the batch.
|
||||
/// <see cref="HistorianConnectivityStatusNotification"/> fires proactively from
|
||||
/// the Host when the SDK session drops so the /hosts + /alarms/historian Admin
|
||||
/// diagnostics pages flip to red promptly instead of waiting for the next
|
||||
/// drain cycle.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
[MessagePackObject]
|
||||
public sealed class HistorianAlarmEventRequest
|
||||
{
|
||||
[Key(0)] public HistorianAlarmEventDto[] Events { get; set; } = Array.Empty<HistorianAlarmEventDto>();
|
||||
}
|
||||
|
||||
[MessagePackObject]
|
||||
public sealed class HistorianAlarmEventResponse
|
||||
{
|
||||
/// <summary>Per-event outcome, same order as the request.</summary>
|
||||
[Key(0)] public HistorianAlarmEventOutcomeDto[] Outcomes { get; set; } = Array.Empty<HistorianAlarmEventOutcomeDto>();
|
||||
}
|
||||
|
||||
/// <summary>Outcome enum — bytes on the wire so it stays compact.</summary>
|
||||
public enum HistorianAlarmEventOutcomeDto : byte
|
||||
{
|
||||
/// <summary>Successfully persisted to the historian — remove from queue.</summary>
|
||||
Ack = 0,
|
||||
/// <summary>Transient failure (historian disconnected, timeout, busy) — retry after backoff.</summary>
|
||||
RetryPlease = 1,
|
||||
/// <summary>Permanent failure (malformed, unrecoverable SDK error) — move to dead-letter.</summary>
|
||||
PermanentFail = 2,
|
||||
}
|
||||
|
||||
/// <summary>One alarm-transition payload. Fields mirror <c>Core.AlarmHistorian.AlarmHistorianEvent</c>.</summary>
|
||||
[MessagePackObject]
|
||||
public sealed class HistorianAlarmEventDto
|
||||
{
|
||||
[Key(0)] public string AlarmId { get; set; } = string.Empty;
|
||||
[Key(1)] public string EquipmentPath { get; set; } = string.Empty;
|
||||
[Key(2)] public string AlarmName { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>Concrete Part 9 subtype name — "LimitAlarm" / "OffNormalAlarm" / "AlarmCondition" / "DiscreteAlarm".</summary>
|
||||
[Key(3)] public string AlarmTypeName { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>Numeric severity the Host maps to the historian's priority scale.</summary>
|
||||
[Key(4)] public int Severity { get; set; }
|
||||
|
||||
/// <summary>Which transition this event represents — "Activated" / "Cleared" / "Acknowledged" / etc.</summary>
|
||||
[Key(5)] public string EventKind { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>Pre-rendered message — template tokens resolved upstream.</summary>
|
||||
[Key(6)] public string Message { get; set; } = string.Empty;
|
||||
|
||||
/// <summary>Operator who triggered the transition. "system" for engine-driven events.</summary>
|
||||
[Key(7)] public string User { get; set; } = "system";
|
||||
|
||||
/// <summary>Operator-supplied free-form comment, if any.</summary>
|
||||
[Key(8)] public string? Comment { get; set; }
|
||||
|
||||
/// <summary>Source timestamp (UTC Unix milliseconds).</summary>
|
||||
[Key(9)] public long TimestampUtcUnixMs { get; set; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Proactive notification — Galaxy.Host pushes this when the historian SDK session
|
||||
/// transitions (connected / disconnected / degraded). The main server reflects this
|
||||
/// into the historian sink status so Admin UI surfaces the problem without the
|
||||
/// operator having to scrutinize drain cadence.
|
||||
/// </summary>
|
||||
[MessagePackObject]
|
||||
public sealed class HistorianConnectivityStatusNotification
|
||||
{
|
||||
[Key(0)] public string Status { get; set; } = "unknown"; // connected | disconnected | degraded
|
||||
[Key(1)] public string? Detail { get; set; }
|
||||
[Key(2)] public long ObservedAtUtcUnixMs { get; set; }
|
||||
}
|
||||
@@ -0,0 +1,286 @@
|
||||
using Serilog;
|
||||
using Serilog.Core;
|
||||
using Serilog.Events;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// Verifies the durable SQLite store-and-forward queue behind the historian sink:
|
||||
/// round-trip Ack, backoff ladder on RetryPlease, dead-lettering on PermanentFail,
|
||||
/// capacity eviction, and retention-based dead-letter purge.
|
||||
/// </summary>
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class SqliteStoreAndForwardSinkTests : IDisposable
|
||||
{
|
||||
private readonly string _dbPath;
|
||||
private readonly ILogger _log;
|
||||
|
||||
public SqliteStoreAndForwardSinkTests()
|
||||
{
|
||||
_dbPath = Path.Combine(Path.GetTempPath(), $"otopcua-historian-{Guid.NewGuid():N}.sqlite");
|
||||
_log = new LoggerConfiguration().MinimumLevel.Verbose().CreateLogger();
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
try { if (File.Exists(_dbPath)) File.Delete(_dbPath); } catch { }
|
||||
}
|
||||
|
||||
private sealed class FakeWriter : IAlarmHistorianWriter
|
||||
{
|
||||
public Queue<HistorianWriteOutcome> NextOutcomePerEvent { get; } = new();
|
||||
public HistorianWriteOutcome DefaultOutcome { get; set; } = HistorianWriteOutcome.Ack;
|
||||
public List<IReadOnlyList<AlarmHistorianEvent>> Batches { get; } = [];
|
||||
public Exception? ThrowOnce { get; set; }
|
||||
|
||||
public Task<IReadOnlyList<HistorianWriteOutcome>> WriteBatchAsync(
|
||||
IReadOnlyList<AlarmHistorianEvent> batch, CancellationToken ct)
|
||||
{
|
||||
if (ThrowOnce is not null)
|
||||
{
|
||||
var e = ThrowOnce;
|
||||
ThrowOnce = null;
|
||||
throw e;
|
||||
}
|
||||
Batches.Add(batch);
|
||||
var outcomes = new List<HistorianWriteOutcome>();
|
||||
for (var i = 0; i < batch.Count; i++)
|
||||
outcomes.Add(NextOutcomePerEvent.Count > 0 ? NextOutcomePerEvent.Dequeue() : DefaultOutcome);
|
||||
return Task.FromResult<IReadOnlyList<HistorianWriteOutcome>>(outcomes);
|
||||
}
|
||||
}
|
||||
|
||||
private static AlarmHistorianEvent Event(string alarmId, DateTime? ts = null) => new(
|
||||
AlarmId: alarmId,
|
||||
EquipmentPath: "/Site/Line1/Cell",
|
||||
AlarmName: "HighTemp",
|
||||
AlarmTypeName: "LimitAlarm",
|
||||
Severity: AlarmSeverity.High,
|
||||
EventKind: "Activated",
|
||||
Message: "temp exceeded",
|
||||
User: "system",
|
||||
Comment: null,
|
||||
TimestampUtc: ts ?? DateTime.UtcNow);
|
||||
|
||||
[Fact]
|
||||
public async Task EnqueueThenDrain_Ack_removes_row()
|
||||
{
|
||||
var writer = new FakeWriter();
|
||||
using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
|
||||
|
||||
await sink.EnqueueAsync(Event("A1"), CancellationToken.None);
|
||||
sink.GetStatus().QueueDepth.ShouldBe(1);
|
||||
|
||||
await sink.DrainOnceAsync(CancellationToken.None);
|
||||
|
||||
writer.Batches.Count.ShouldBe(1);
|
||||
writer.Batches[0].Count.ShouldBe(1);
|
||||
writer.Batches[0][0].AlarmId.ShouldBe("A1");
|
||||
var status = sink.GetStatus();
|
||||
status.QueueDepth.ShouldBe(0);
|
||||
status.DeadLetterDepth.ShouldBe(0);
|
||||
status.LastSuccessUtc.ShouldNotBeNull();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Drain_with_empty_queue_is_noop()
|
||||
{
|
||||
var writer = new FakeWriter();
|
||||
using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
|
||||
|
||||
await sink.DrainOnceAsync(CancellationToken.None);
|
||||
|
||||
writer.Batches.ShouldBeEmpty();
|
||||
sink.GetStatus().DrainState.ShouldBe(HistorianDrainState.Idle);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task RetryPlease_bumps_backoff_and_keeps_row()
|
||||
{
|
||||
var writer = new FakeWriter();
|
||||
writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.RetryPlease);
|
||||
using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
|
||||
|
||||
await sink.EnqueueAsync(Event("A1"), CancellationToken.None);
|
||||
var before = sink.CurrentBackoff;
|
||||
await sink.DrainOnceAsync(CancellationToken.None);
|
||||
|
||||
sink.CurrentBackoff.ShouldBeGreaterThan(before);
|
||||
sink.GetStatus().QueueDepth.ShouldBe(1, "row stays in queue for retry");
|
||||
sink.GetStatus().DrainState.ShouldBe(HistorianDrainState.BackingOff);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Ack_after_Retry_resets_backoff()
|
||||
{
|
||||
var writer = new FakeWriter();
|
||||
writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.RetryPlease);
|
||||
using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
|
||||
|
||||
await sink.EnqueueAsync(Event("A1"), CancellationToken.None);
|
||||
await sink.DrainOnceAsync(CancellationToken.None);
|
||||
sink.CurrentBackoff.ShouldBeGreaterThan(TimeSpan.FromSeconds(1) - TimeSpan.FromMilliseconds(1));
|
||||
|
||||
writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.Ack);
|
||||
await sink.DrainOnceAsync(CancellationToken.None);
|
||||
|
||||
sink.CurrentBackoff.ShouldBe(TimeSpan.FromSeconds(1));
|
||||
sink.GetStatus().QueueDepth.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task PermanentFail_dead_letters_one_row_only()
|
||||
{
|
||||
var writer = new FakeWriter();
|
||||
writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.PermanentFail);
|
||||
writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.Ack);
|
||||
using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
|
||||
|
||||
await sink.EnqueueAsync(Event("bad"), CancellationToken.None);
|
||||
await sink.EnqueueAsync(Event("good"), CancellationToken.None);
|
||||
await sink.DrainOnceAsync(CancellationToken.None);
|
||||
|
||||
var status = sink.GetStatus();
|
||||
status.QueueDepth.ShouldBe(0, "good row acked");
|
||||
status.DeadLetterDepth.ShouldBe(1, "bad row dead-lettered");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Writer_exception_treated_as_retry_for_whole_batch()
|
||||
{
|
||||
var writer = new FakeWriter { ThrowOnce = new InvalidOperationException("pipe broken") };
|
||||
using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
|
||||
|
||||
await sink.EnqueueAsync(Event("A1"), CancellationToken.None);
|
||||
await sink.DrainOnceAsync(CancellationToken.None);
|
||||
|
||||
var status = sink.GetStatus();
|
||||
status.QueueDepth.ShouldBe(1);
|
||||
status.LastError.ShouldBe("pipe broken");
|
||||
status.DrainState.ShouldBe(HistorianDrainState.BackingOff);
|
||||
|
||||
// Next drain after the writer recovers should Ack.
|
||||
await sink.DrainOnceAsync(CancellationToken.None);
|
||||
sink.GetStatus().QueueDepth.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Capacity_eviction_drops_oldest_nondeadlettered_row()
|
||||
{
|
||||
var writer = new FakeWriter();
|
||||
using var sink = new SqliteStoreAndForwardSink(
|
||||
_dbPath, writer, _log, batchSize: 100, capacity: 3);
|
||||
|
||||
await sink.EnqueueAsync(Event("A1"), CancellationToken.None);
|
||||
await sink.EnqueueAsync(Event("A2"), CancellationToken.None);
|
||||
await sink.EnqueueAsync(Event("A3"), CancellationToken.None);
|
||||
// A4 enqueue must evict the oldest (A1).
|
||||
await sink.EnqueueAsync(Event("A4"), CancellationToken.None);
|
||||
|
||||
sink.GetStatus().QueueDepth.ShouldBe(3);
|
||||
|
||||
await sink.DrainOnceAsync(CancellationToken.None);
|
||||
var drained = writer.Batches[0].Select(e => e.AlarmId).ToArray();
|
||||
drained.ShouldNotContain("A1");
|
||||
drained.ShouldContain("A2");
|
||||
drained.ShouldContain("A3");
|
||||
drained.ShouldContain("A4");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Deadlettered_rows_are_purged_past_retention()
|
||||
{
|
||||
var now = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc);
|
||||
DateTime clock = now;
|
||||
|
||||
var writer = new FakeWriter();
|
||||
writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.PermanentFail);
|
||||
using var sink = new SqliteStoreAndForwardSink(
|
||||
_dbPath, writer, _log, deadLetterRetention: TimeSpan.FromDays(30),
|
||||
clock: () => clock);
|
||||
|
||||
await sink.EnqueueAsync(Event("bad"), CancellationToken.None);
|
||||
await sink.DrainOnceAsync(CancellationToken.None);
|
||||
sink.GetStatus().DeadLetterDepth.ShouldBe(1);
|
||||
|
||||
// Advance past retention + tick drain (which runs PurgeAgedDeadLetters).
|
||||
clock = now.AddDays(31);
|
||||
await sink.DrainOnceAsync(CancellationToken.None);
|
||||
|
||||
sink.GetStatus().DeadLetterDepth.ShouldBe(0, "purged past retention");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task RetryDeadLettered_requeues_for_retry()
|
||||
{
|
||||
var writer = new FakeWriter();
|
||||
writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.PermanentFail);
|
||||
using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
|
||||
|
||||
await sink.EnqueueAsync(Event("bad"), CancellationToken.None);
|
||||
await sink.DrainOnceAsync(CancellationToken.None);
|
||||
sink.GetStatus().DeadLetterDepth.ShouldBe(1);
|
||||
|
||||
var revived = sink.RetryDeadLettered();
|
||||
revived.ShouldBe(1);
|
||||
|
||||
var status = sink.GetStatus();
|
||||
status.QueueDepth.ShouldBe(1);
|
||||
status.DeadLetterDepth.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Backoff_ladder_caps_at_60s()
|
||||
{
|
||||
var writer = new FakeWriter { DefaultOutcome = HistorianWriteOutcome.RetryPlease };
|
||||
using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
|
||||
|
||||
await sink.EnqueueAsync(Event("A1"), CancellationToken.None);
|
||||
|
||||
// 10 retry rounds — ladder should cap at 60s.
|
||||
for (var i = 0; i < 10; i++)
|
||||
await sink.DrainOnceAsync(CancellationToken.None);
|
||||
|
||||
sink.CurrentBackoff.ShouldBe(TimeSpan.FromSeconds(60));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void NullAlarmHistorianSink_reports_disabled_status()
|
||||
{
|
||||
var s = NullAlarmHistorianSink.Instance.GetStatus();
|
||||
s.DrainState.ShouldBe(HistorianDrainState.Disabled);
|
||||
s.QueueDepth.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task NullAlarmHistorianSink_swallows_enqueue()
|
||||
{
|
||||
// Should not throw or persist anything.
|
||||
await NullAlarmHistorianSink.Instance.EnqueueAsync(Event("A1"), CancellationToken.None);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Ctor_rejects_bad_args()
|
||||
{
|
||||
var w = new FakeWriter();
|
||||
Should.Throw<ArgumentException>(() => new SqliteStoreAndForwardSink("", w, _log));
|
||||
Should.Throw<ArgumentNullException>(() => new SqliteStoreAndForwardSink(_dbPath, null!, _log));
|
||||
Should.Throw<ArgumentNullException>(() => new SqliteStoreAndForwardSink(_dbPath, w, null!));
|
||||
Should.Throw<ArgumentOutOfRangeException>(() => new SqliteStoreAndForwardSink(_dbPath, w, _log, batchSize: 0));
|
||||
Should.Throw<ArgumentOutOfRangeException>(() => new SqliteStoreAndForwardSink(_dbPath, w, _log, capacity: 0));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Disposed_sink_rejects_enqueue()
|
||||
{
|
||||
var writer = new FakeWriter();
|
||||
var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
|
||||
sink.Dispose();
|
||||
|
||||
await Should.ThrowAsync<ObjectDisposedException>(
|
||||
() => sink.EnqueueAsync(Event("A1"), CancellationToken.None));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>net10.0</TargetFramework>
|
||||
<Nullable>enable</Nullable>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<IsPackable>false</IsPackable>
|
||||
<IsTestProject>true</IsTestProject>
|
||||
<RootNamespace>ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests</RootNamespace>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="xunit.v3" Version="1.1.0"/>
|
||||
<PackageReference Include="Shouldly" Version="4.3.0"/>
|
||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0"/>
|
||||
<PackageReference Include="xunit.runner.visualstudio" Version="3.0.2">
|
||||
<PrivateAssets>all</PrivateAssets>
|
||||
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
|
||||
</PackageReference>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\..\src\ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian\ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.csproj"/>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-37gx-xxp4-5rgx"/>
|
||||
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-w3x6-4m5h-cxqf"/>
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
@@ -0,0 +1,61 @@
|
||||
using System.Collections.Concurrent;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms.Tests;
|
||||
|
||||
public sealed class FakeUpstream : ITagUpstreamSource
|
||||
{
|
||||
private readonly ConcurrentDictionary<string, DataValueSnapshot> _values = new(StringComparer.Ordinal);
|
||||
private readonly ConcurrentDictionary<string, List<Action<string, DataValueSnapshot>>> _subs
|
||||
= new(StringComparer.Ordinal);
|
||||
public int ActiveSubscriptionCount { get; private set; }
|
||||
|
||||
public void Set(string path, object? value, uint statusCode = 0u)
|
||||
{
|
||||
var now = DateTime.UtcNow;
|
||||
_values[path] = new DataValueSnapshot(value, statusCode, now, now);
|
||||
}
|
||||
|
||||
public void Push(string path, object? value, uint statusCode = 0u)
|
||||
{
|
||||
Set(path, value, statusCode);
|
||||
if (_subs.TryGetValue(path, out var list))
|
||||
{
|
||||
Action<string, DataValueSnapshot>[] snap;
|
||||
lock (list) { snap = list.ToArray(); }
|
||||
foreach (var obs in snap) obs(path, _values[path]);
|
||||
}
|
||||
}
|
||||
|
||||
public DataValueSnapshot ReadTag(string path)
|
||||
=> _values.TryGetValue(path, out var v) ? v
|
||||
: new DataValueSnapshot(null, 0x80340000u, null, DateTime.UtcNow);
|
||||
|
||||
public IDisposable SubscribeTag(string path, Action<string, DataValueSnapshot> observer)
|
||||
{
|
||||
var list = _subs.GetOrAdd(path, _ => []);
|
||||
lock (list) { list.Add(observer); }
|
||||
ActiveSubscriptionCount++;
|
||||
return new Unsub(this, path, observer);
|
||||
}
|
||||
|
||||
private sealed class Unsub : IDisposable
|
||||
{
|
||||
private readonly FakeUpstream _up;
|
||||
private readonly string _path;
|
||||
private readonly Action<string, DataValueSnapshot> _observer;
|
||||
public Unsub(FakeUpstream up, string path, Action<string, DataValueSnapshot> observer)
|
||||
{ _up = up; _path = path; _observer = observer; }
|
||||
public void Dispose()
|
||||
{
|
||||
if (_up._subs.TryGetValue(_path, out var list))
|
||||
{
|
||||
lock (list)
|
||||
{
|
||||
if (list.Remove(_observer)) _up.ActiveSubscriptionCount--;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,107 @@
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms.Tests;
|
||||
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class MessageTemplateTests
|
||||
{
|
||||
private static DataValueSnapshot Good(object? v) =>
|
||||
new(v, 0u, DateTime.UtcNow, DateTime.UtcNow);
|
||||
private static DataValueSnapshot Bad() =>
|
||||
new(null, 0x80050000u, null, DateTime.UtcNow);
|
||||
|
||||
private static DataValueSnapshot? Resolver(Dictionary<string, DataValueSnapshot> map, string path)
|
||||
=> map.TryGetValue(path, out var v) ? v : null;
|
||||
|
||||
[Fact]
|
||||
public void No_tokens_returns_template_unchanged()
|
||||
{
|
||||
MessageTemplate.Resolve("No tokens here", _ => null).ShouldBe("No tokens here");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Single_token_substituted()
|
||||
{
|
||||
var map = new Dictionary<string, DataValueSnapshot> { ["Tank/Temp"] = Good(75.5) };
|
||||
MessageTemplate.Resolve("Temp={Tank/Temp}C", p => Resolver(map, p)).ShouldBe("Temp=75.5C");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Multiple_tokens_substituted()
|
||||
{
|
||||
var map = new Dictionary<string, DataValueSnapshot>
|
||||
{
|
||||
["A"] = Good(10),
|
||||
["B"] = Good("on"),
|
||||
};
|
||||
MessageTemplate.Resolve("{A}/{B}", p => Resolver(map, p)).ShouldBe("10/on");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Bad_quality_token_becomes_question_mark()
|
||||
{
|
||||
var map = new Dictionary<string, DataValueSnapshot> { ["Bad"] = Bad() };
|
||||
MessageTemplate.Resolve("value={Bad}", p => Resolver(map, p)).ShouldBe("value={?}");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Unknown_path_becomes_question_mark()
|
||||
{
|
||||
MessageTemplate.Resolve("value={DoesNotExist}", _ => null).ShouldBe("value={?}");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Null_value_with_good_quality_becomes_question_mark()
|
||||
{
|
||||
var map = new Dictionary<string, DataValueSnapshot> { ["X"] = Good(null) };
|
||||
MessageTemplate.Resolve("{X}", p => Resolver(map, p)).ShouldBe("{?}");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Tokens_with_slashes_and_dots_resolved()
|
||||
{
|
||||
var map = new Dictionary<string, DataValueSnapshot>
|
||||
{
|
||||
["Line1/Pump.Speed"] = Good(1200),
|
||||
};
|
||||
MessageTemplate.Resolve("rpm={Line1/Pump.Speed}", p => Resolver(map, p))
|
||||
.ShouldBe("rpm=1200");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Empty_template_returns_empty()
|
||||
{
|
||||
MessageTemplate.Resolve("", _ => null).ShouldBe("");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Null_template_returns_empty_without_throwing()
|
||||
{
|
||||
MessageTemplate.Resolve(null!, _ => null).ShouldBe("");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ExtractTokenPaths_returns_every_distinct_token()
|
||||
{
|
||||
var tokens = MessageTemplate.ExtractTokenPaths("{A}/{B}/{A}/{C}");
|
||||
tokens.ShouldBe(new[] { "A", "B", "A", "C" });
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ExtractTokenPaths_empty_for_tokenless_template()
|
||||
{
|
||||
MessageTemplate.ExtractTokenPaths("No tokens").ShouldBeEmpty();
|
||||
MessageTemplate.ExtractTokenPaths("").ShouldBeEmpty();
|
||||
MessageTemplate.ExtractTokenPaths(null).ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Whitespace_inside_token_is_trimmed()
|
||||
{
|
||||
var map = new Dictionary<string, DataValueSnapshot> { ["A"] = Good(42) };
|
||||
MessageTemplate.Resolve("{ A }", p => Resolver(map, p)).ShouldBe("42");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,205 @@
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// Pure state-machine tests — no engine, no I/O, no async. Every transition rule
|
||||
/// from Phase 7 plan Stream C.2 / C.3 has at least one locking test so regressions
|
||||
/// surface as clear failures rather than subtle alarm-behavior drift.
|
||||
/// </summary>
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class Part9StateMachineTests
|
||||
{
|
||||
private static readonly DateTime T0 = new(2026, 1, 1, 12, 0, 0, DateTimeKind.Utc);
|
||||
private static AlarmConditionState Fresh() => AlarmConditionState.Fresh("alarm-1", T0);
|
||||
|
||||
[Fact]
|
||||
public void Predicate_true_on_inactive_becomes_active_and_emits_Activated()
|
||||
{
|
||||
var r = Part9StateMachine.ApplyPredicate(Fresh(), predicateTrue: true, T0.AddSeconds(1));
|
||||
r.State.Active.ShouldBe(AlarmActiveState.Active);
|
||||
r.State.Acked.ShouldBe(AlarmAckedState.Unacknowledged);
|
||||
r.State.Confirmed.ShouldBe(AlarmConfirmedState.Unconfirmed);
|
||||
r.Emission.ShouldBe(EmissionKind.Activated);
|
||||
r.State.LastActiveUtc.ShouldNotBeNull();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Predicate_false_on_active_becomes_inactive_and_emits_Cleared()
|
||||
{
|
||||
var active = Part9StateMachine.ApplyPredicate(Fresh(), true, T0.AddSeconds(1)).State;
|
||||
var r = Part9StateMachine.ApplyPredicate(active, false, T0.AddSeconds(2));
|
||||
r.State.Active.ShouldBe(AlarmActiveState.Inactive);
|
||||
r.Emission.ShouldBe(EmissionKind.Cleared);
|
||||
r.State.LastClearedUtc.ShouldNotBeNull();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Predicate_unchanged_state_emits_None()
|
||||
{
|
||||
var r = Part9StateMachine.ApplyPredicate(Fresh(), false, T0);
|
||||
r.Emission.ShouldBe(EmissionKind.None);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Disabled_alarm_ignores_predicate()
|
||||
{
|
||||
var disabled = Part9StateMachine.ApplyDisable(Fresh(), "op1", T0.AddSeconds(1)).State;
|
||||
var r = Part9StateMachine.ApplyPredicate(disabled, true, T0.AddSeconds(2));
|
||||
r.State.Active.ShouldBe(AlarmActiveState.Inactive);
|
||||
r.Emission.ShouldBe(EmissionKind.None);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Acknowledge_from_unacked_records_user_and_emits()
|
||||
{
|
||||
var active = Part9StateMachine.ApplyPredicate(Fresh(), true, T0.AddSeconds(1)).State;
|
||||
var r = Part9StateMachine.ApplyAcknowledge(active, "alice", "looking into it", T0.AddSeconds(2));
|
||||
r.State.Acked.ShouldBe(AlarmAckedState.Acknowledged);
|
||||
r.State.LastAckUser.ShouldBe("alice");
|
||||
r.State.LastAckComment.ShouldBe("looking into it");
|
||||
r.State.Comments.Count.ShouldBe(1);
|
||||
r.Emission.ShouldBe(EmissionKind.Acknowledged);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Acknowledge_when_already_acked_is_noop()
|
||||
{
|
||||
var active = Part9StateMachine.ApplyPredicate(Fresh(), true, T0.AddSeconds(1)).State;
|
||||
var acked = Part9StateMachine.ApplyAcknowledge(active, "alice", null, T0.AddSeconds(2)).State;
|
||||
var r = Part9StateMachine.ApplyAcknowledge(acked, "alice", null, T0.AddSeconds(3));
|
||||
r.Emission.ShouldBe(EmissionKind.None);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Acknowledge_without_user_throws()
|
||||
{
|
||||
Should.Throw<ArgumentException>(() =>
|
||||
Part9StateMachine.ApplyAcknowledge(Fresh(), "", null, T0));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Confirm_after_clear_records_user_and_emits()
|
||||
{
|
||||
// Walk: activate -> ack -> clear -> confirm
|
||||
var s = Fresh();
|
||||
s = Part9StateMachine.ApplyPredicate(s, true, T0.AddSeconds(1)).State;
|
||||
s = Part9StateMachine.ApplyAcknowledge(s, "alice", null, T0.AddSeconds(2)).State;
|
||||
s = Part9StateMachine.ApplyPredicate(s, false, T0.AddSeconds(3)).State;
|
||||
|
||||
var r = Part9StateMachine.ApplyConfirm(s, "bob", "resolved", T0.AddSeconds(4));
|
||||
r.State.Confirmed.ShouldBe(AlarmConfirmedState.Confirmed);
|
||||
r.State.LastConfirmUser.ShouldBe("bob");
|
||||
r.Emission.ShouldBe(EmissionKind.Confirmed);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void OneShotShelve_suppresses_next_activation_emission()
|
||||
{
|
||||
var s = Part9StateMachine.ApplyOneShotShelve(Fresh(), "alice", T0.AddSeconds(1)).State;
|
||||
var r = Part9StateMachine.ApplyPredicate(s, true, T0.AddSeconds(2));
|
||||
r.State.Active.ShouldBe(AlarmActiveState.Active, "state still advances");
|
||||
r.Emission.ShouldBe(EmissionKind.Suppressed, "but subscribers don't see it");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void OneShotShelve_expires_on_clear()
|
||||
{
|
||||
var s = Fresh();
|
||||
s = Part9StateMachine.ApplyOneShotShelve(s, "alice", T0.AddSeconds(1)).State;
|
||||
s = Part9StateMachine.ApplyPredicate(s, true, T0.AddSeconds(2)).State;
|
||||
var r = Part9StateMachine.ApplyPredicate(s, false, T0.AddSeconds(3));
|
||||
r.State.Shelving.Kind.ShouldBe(ShelvingKind.Unshelved, "OneShot expires on clear");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void TimedShelve_requires_future_unshelve_time()
|
||||
{
|
||||
Should.Throw<ArgumentOutOfRangeException>(() =>
|
||||
Part9StateMachine.ApplyTimedShelve(Fresh(), "alice", T0, T0.AddSeconds(5)));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void TimedShelve_expires_via_shelving_check()
|
||||
{
|
||||
var until = T0.AddMinutes(5);
|
||||
var shelved = Part9StateMachine.ApplyTimedShelve(Fresh(), "alice", until, T0).State;
|
||||
shelved.Shelving.Kind.ShouldBe(ShelvingKind.Timed);
|
||||
|
||||
// Before expiry — still shelved.
|
||||
var earlier = Part9StateMachine.ApplyShelvingCheck(shelved, T0.AddMinutes(3));
|
||||
earlier.State.Shelving.Kind.ShouldBe(ShelvingKind.Timed);
|
||||
earlier.Emission.ShouldBe(EmissionKind.None);
|
||||
|
||||
// After expiry — auto-unshelved + emission.
|
||||
var after = Part9StateMachine.ApplyShelvingCheck(shelved, T0.AddMinutes(6));
|
||||
after.State.Shelving.Kind.ShouldBe(ShelvingKind.Unshelved);
|
||||
after.Emission.ShouldBe(EmissionKind.Unshelved);
|
||||
after.State.Comments.Any(c => c.Kind == "AutoUnshelve").ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Unshelve_from_unshelved_is_noop()
|
||||
{
|
||||
var r = Part9StateMachine.ApplyUnshelve(Fresh(), "alice", T0);
|
||||
r.Emission.ShouldBe(EmissionKind.None);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Explicit_Unshelve_emits_event()
|
||||
{
|
||||
var s = Part9StateMachine.ApplyOneShotShelve(Fresh(), "alice", T0).State;
|
||||
var r = Part9StateMachine.ApplyUnshelve(s, "bob", T0.AddSeconds(30));
|
||||
r.State.Shelving.Kind.ShouldBe(ShelvingKind.Unshelved);
|
||||
r.Emission.ShouldBe(EmissionKind.Unshelved);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void AddComment_appends_to_audit_trail_with_event()
|
||||
{
|
||||
var r = Part9StateMachine.ApplyAddComment(Fresh(), "alice", "investigating", T0.AddSeconds(5));
|
||||
r.State.Comments.Count.ShouldBe(1);
|
||||
r.State.Comments[0].Kind.ShouldBe("AddComment");
|
||||
r.State.Comments[0].User.ShouldBe("alice");
|
||||
r.State.Comments[0].Text.ShouldBe("investigating");
|
||||
r.Emission.ShouldBe(EmissionKind.CommentAdded);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Comments_are_append_only_never_rewritten()
|
||||
{
|
||||
var s = Part9StateMachine.ApplyAddComment(Fresh(), "alice", "first", T0.AddSeconds(1)).State;
|
||||
s = Part9StateMachine.ApplyAddComment(s, "bob", "second", T0.AddSeconds(2)).State;
|
||||
s = Part9StateMachine.ApplyAddComment(s, "carol", "third", T0.AddSeconds(3)).State;
|
||||
s.Comments.Count.ShouldBe(3);
|
||||
s.Comments[0].User.ShouldBe("alice");
|
||||
s.Comments[1].User.ShouldBe("bob");
|
||||
s.Comments[2].User.ShouldBe("carol");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Full_lifecycle_walk_produces_every_expected_emission()
|
||||
{
|
||||
// Walk a condition through its whole lifecycle and make sure emissions line up.
|
||||
var emissions = new List<EmissionKind>();
|
||||
var s = Fresh();
|
||||
|
||||
s = Capture(Part9StateMachine.ApplyPredicate(s, true, T0.AddSeconds(1)));
|
||||
s = Capture(Part9StateMachine.ApplyAcknowledge(s, "alice", null, T0.AddSeconds(2)));
|
||||
s = Capture(Part9StateMachine.ApplyAddComment(s, "alice", "need to investigate", T0.AddSeconds(3)));
|
||||
s = Capture(Part9StateMachine.ApplyPredicate(s, false, T0.AddSeconds(4)));
|
||||
s = Capture(Part9StateMachine.ApplyConfirm(s, "bob", null, T0.AddSeconds(5)));
|
||||
|
||||
emissions.ShouldBe(new[] {
|
||||
EmissionKind.Activated,
|
||||
EmissionKind.Acknowledged,
|
||||
EmissionKind.CommentAdded,
|
||||
EmissionKind.Cleared,
|
||||
EmissionKind.Confirmed,
|
||||
});
|
||||
|
||||
AlarmConditionState Capture(TransitionResult r) { emissions.Add(r.Emission); return r.State; }
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,316 @@
|
||||
using Serilog;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// End-to-end engine tests: load, predicate evaluation, change-triggered
|
||||
/// re-evaluation, state persistence, startup recovery, error isolation.
|
||||
/// </summary>
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class ScriptedAlarmEngineTests
|
||||
{
|
||||
private static ScriptedAlarmEngine Build(FakeUpstream up, out IAlarmStateStore store)
|
||||
{
|
||||
store = new InMemoryAlarmStateStore();
|
||||
var logger = new LoggerConfiguration().CreateLogger();
|
||||
return new ScriptedAlarmEngine(up, store, new ScriptLoggerFactory(logger), logger);
|
||||
}
|
||||
|
||||
private static ScriptedAlarmDefinition Alarm(string id, string predicate,
|
||||
string msg = "condition", AlarmSeverity sev = AlarmSeverity.High) =>
|
||||
new(AlarmId: id,
|
||||
EquipmentPath: "Plant/Line1/Reactor",
|
||||
AlarmName: id,
|
||||
Kind: AlarmKind.AlarmCondition,
|
||||
Severity: sev,
|
||||
MessageTemplate: msg,
|
||||
PredicateScriptSource: predicate);
|
||||
|
||||
[Fact]
|
||||
public async Task Load_compiles_and_subscribes_to_referenced_upstreams()
|
||||
{
|
||||
var up = new FakeUpstream();
|
||||
up.Set("Temp", 50);
|
||||
using var eng = Build(up, out _);
|
||||
|
||||
await eng.LoadAsync([Alarm("a1", """return (int)ctx.GetTag("Temp").Value > 100;""")],
|
||||
TestContext.Current.CancellationToken);
|
||||
|
||||
eng.LoadedAlarmIds.ShouldContain("a1");
|
||||
up.ActiveSubscriptionCount.ShouldBe(1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Compile_failures_aggregated_into_one_error()
|
||||
{
|
||||
var up = new FakeUpstream();
|
||||
using var eng = Build(up, out _);
|
||||
|
||||
var ex = await Should.ThrowAsync<InvalidOperationException>(async () =>
|
||||
await eng.LoadAsync([
|
||||
Alarm("bad1", "return unknownIdentifier;"),
|
||||
Alarm("good", "return true;"),
|
||||
Alarm("bad2", "var x = alsoUnknown; return x;"),
|
||||
], TestContext.Current.CancellationToken));
|
||||
ex.Message.ShouldContain("2 alarm(s) did not compile");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Upstream_change_re_evaluates_predicate_and_emits_Activated()
|
||||
{
|
||||
var up = new FakeUpstream();
|
||||
up.Set("Temp", 50);
|
||||
using var eng = Build(up, out _);
|
||||
await eng.LoadAsync([Alarm("HighTemp", """return (int)ctx.GetTag("Temp").Value > 100;""")],
|
||||
TestContext.Current.CancellationToken);
|
||||
|
||||
var events = new List<ScriptedAlarmEvent>();
|
||||
eng.OnEvent += (_, e) => events.Add(e);
|
||||
|
||||
up.Push("Temp", 150);
|
||||
await WaitForAsync(() => events.Count > 0);
|
||||
|
||||
events[0].AlarmId.ShouldBe("HighTemp");
|
||||
events[0].Emission.ShouldBe(EmissionKind.Activated);
|
||||
eng.GetState("HighTemp")!.Active.ShouldBe(AlarmActiveState.Active);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Clearing_upstream_emits_Cleared_event()
|
||||
{
|
||||
var up = new FakeUpstream();
|
||||
up.Set("Temp", 150);
|
||||
using var eng = Build(up, out _);
|
||||
await eng.LoadAsync([Alarm("HighTemp", """return (int)ctx.GetTag("Temp").Value > 100;""")],
|
||||
TestContext.Current.CancellationToken);
|
||||
|
||||
// Startup sees 150 → active.
|
||||
eng.GetState("HighTemp")!.Active.ShouldBe(AlarmActiveState.Active);
|
||||
|
||||
var events = new List<ScriptedAlarmEvent>();
|
||||
eng.OnEvent += (_, e) => events.Add(e);
|
||||
|
||||
up.Push("Temp", 50);
|
||||
await WaitForAsync(() => events.Any(e => e.Emission == EmissionKind.Cleared));
|
||||
eng.GetState("HighTemp")!.Active.ShouldBe(AlarmActiveState.Inactive);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Message_template_resolves_tag_values_at_emission()
|
||||
{
|
||||
var up = new FakeUpstream();
|
||||
up.Set("Temp", 50);
|
||||
up.Set("Limit", 100);
|
||||
using var eng = Build(up, out _);
|
||||
await eng.LoadAsync([
|
||||
new ScriptedAlarmDefinition(
|
||||
"HighTemp", "Plant/Line1", "HighTemp",
|
||||
AlarmKind.LimitAlarm, AlarmSeverity.High,
|
||||
"Temp {Temp}C exceeded limit {Limit}C",
|
||||
"""return (int)ctx.GetTag("Temp").Value > (int)ctx.GetTag("Limit").Value;"""),
|
||||
], TestContext.Current.CancellationToken);
|
||||
|
||||
var events = new List<ScriptedAlarmEvent>();
|
||||
eng.OnEvent += (_, e) => events.Add(e);
|
||||
|
||||
up.Push("Temp", 150);
|
||||
await WaitForAsync(() => events.Any());
|
||||
|
||||
events[0].Message.ShouldBe("Temp 150C exceeded limit 100C");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Ack_records_user_and_persists_to_store()
|
||||
{
|
||||
var up = new FakeUpstream();
|
||||
up.Set("Temp", 150);
|
||||
using var eng = Build(up, out var store);
|
||||
await eng.LoadAsync([Alarm("HighTemp", """return (int)ctx.GetTag("Temp").Value > 100;""")],
|
||||
TestContext.Current.CancellationToken);
|
||||
|
||||
await eng.AcknowledgeAsync("HighTemp", "alice", "checking", TestContext.Current.CancellationToken);
|
||||
|
||||
var persisted = await store.LoadAsync("HighTemp", TestContext.Current.CancellationToken);
|
||||
persisted.ShouldNotBeNull();
|
||||
persisted!.Acked.ShouldBe(AlarmAckedState.Acknowledged);
|
||||
persisted.LastAckUser.ShouldBe("alice");
|
||||
persisted.LastAckComment.ShouldBe("checking");
|
||||
persisted.Comments.Any(c => c.Kind == "Acknowledge" && c.User == "alice").ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Startup_recovery_preserves_ack_but_rederives_active_from_predicate()
|
||||
{
|
||||
var up = new FakeUpstream();
|
||||
up.Set("Temp", 50); // predicate will go false on second load
|
||||
|
||||
// First run — alarm goes active + operator acks.
|
||||
using (var eng1 = Build(up, out var sharedStore))
|
||||
{
|
||||
up.Set("Temp", 150);
|
||||
await eng1.LoadAsync([Alarm("HighTemp", """return (int)ctx.GetTag("Temp").Value > 100;""")],
|
||||
TestContext.Current.CancellationToken);
|
||||
eng1.GetState("HighTemp")!.Active.ShouldBe(AlarmActiveState.Active);
|
||||
|
||||
await eng1.AcknowledgeAsync("HighTemp", "alice", null, TestContext.Current.CancellationToken);
|
||||
eng1.GetState("HighTemp")!.Acked.ShouldBe(AlarmAckedState.Acknowledged);
|
||||
}
|
||||
|
||||
// Simulate restart — temp is back to 50 (below threshold).
|
||||
up.Set("Temp", 50);
|
||||
var logger = new LoggerConfiguration().CreateLogger();
|
||||
var store2 = new InMemoryAlarmStateStore();
|
||||
// seed store2 with the acked state from before restart
|
||||
await store2.SaveAsync(new AlarmConditionState(
|
||||
"HighTemp",
|
||||
AlarmEnabledState.Enabled,
|
||||
AlarmActiveState.Active, // was active pre-restart
|
||||
AlarmAckedState.Acknowledged, // ack persisted
|
||||
AlarmConfirmedState.Unconfirmed,
|
||||
ShelvingState.Unshelved,
|
||||
DateTime.UtcNow,
|
||||
DateTime.UtcNow, null,
|
||||
DateTime.UtcNow, "alice", null,
|
||||
null, null, null,
|
||||
[new AlarmComment(DateTime.UtcNow, "alice", "Acknowledge", "")]),
|
||||
TestContext.Current.CancellationToken);
|
||||
|
||||
using var eng2 = new ScriptedAlarmEngine(up, store2, new ScriptLoggerFactory(logger), logger);
|
||||
await eng2.LoadAsync([Alarm("HighTemp", """return (int)ctx.GetTag("Temp").Value > 100;""")],
|
||||
TestContext.Current.CancellationToken);
|
||||
|
||||
var s = eng2.GetState("HighTemp")!;
|
||||
s.Active.ShouldBe(AlarmActiveState.Inactive, "Active recomputed from current tag value");
|
||||
s.Acked.ShouldBe(AlarmAckedState.Acknowledged, "Ack persisted across restart");
|
||||
s.LastAckUser.ShouldBe("alice");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Shelved_active_transitions_state_but_suppresses_emission()
|
||||
{
|
||||
var up = new FakeUpstream();
|
||||
up.Set("Temp", 50);
|
||||
using var eng = Build(up, out _);
|
||||
await eng.LoadAsync([Alarm("HighTemp", """return (int)ctx.GetTag("Temp").Value > 100;""")],
|
||||
TestContext.Current.CancellationToken);
|
||||
|
||||
await eng.OneShotShelveAsync("HighTemp", "alice", TestContext.Current.CancellationToken);
|
||||
|
||||
var events = new List<ScriptedAlarmEvent>();
|
||||
eng.OnEvent += (_, e) => events.Add(e);
|
||||
|
||||
up.Push("Temp", 150);
|
||||
await Task.Delay(200);
|
||||
|
||||
events.Any(e => e.Emission == EmissionKind.Activated).ShouldBeFalse(
|
||||
"OneShot shelve suppresses activation emission");
|
||||
eng.GetState("HighTemp")!.Active.ShouldBe(AlarmActiveState.Active,
|
||||
"state still advances so startup recovery is consistent");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Predicate_runtime_exception_does_not_transition_state()
|
||||
{
|
||||
var up = new FakeUpstream();
|
||||
up.Set("Temp", 150);
|
||||
using var eng = Build(up, out _);
|
||||
await eng.LoadAsync([
|
||||
Alarm("BadScript", """throw new InvalidOperationException("boom");"""),
|
||||
Alarm("GoodScript", """return (int)ctx.GetTag("Temp").Value > 100;"""),
|
||||
], TestContext.Current.CancellationToken);
|
||||
|
||||
// Bad script doesn't activate + doesn't disable other alarms.
|
||||
eng.GetState("BadScript")!.Active.ShouldBe(AlarmActiveState.Inactive);
|
||||
eng.GetState("GoodScript")!.Active.ShouldBe(AlarmActiveState.Active);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Disable_prevents_activation_until_re_enabled()
|
||||
{
|
||||
var up = new FakeUpstream();
|
||||
up.Set("Temp", 50);
|
||||
using var eng = Build(up, out _);
|
||||
await eng.LoadAsync([Alarm("HighTemp", """return (int)ctx.GetTag("Temp").Value > 100;""")],
|
||||
TestContext.Current.CancellationToken);
|
||||
|
||||
await eng.DisableAsync("HighTemp", "alice", TestContext.Current.CancellationToken);
|
||||
up.Push("Temp", 150);
|
||||
await Task.Delay(100);
|
||||
eng.GetState("HighTemp")!.Active.ShouldBe(AlarmActiveState.Inactive,
|
||||
"disabled alarm ignores predicate");
|
||||
|
||||
await eng.EnableAsync("HighTemp", "alice", TestContext.Current.CancellationToken);
|
||||
up.Push("Temp", 160);
|
||||
await WaitForAsync(() => eng.GetState("HighTemp")!.Active == AlarmActiveState.Active);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task AddComment_appends_to_audit_without_state_change()
|
||||
{
|
||||
var up = new FakeUpstream();
|
||||
up.Set("Temp", 50);
|
||||
using var eng = Build(up, out var store);
|
||||
await eng.LoadAsync([Alarm("A", """return false;""")], TestContext.Current.CancellationToken);
|
||||
|
||||
await eng.AddCommentAsync("A", "alice", "peeking at this", TestContext.Current.CancellationToken);
|
||||
|
||||
var s = await store.LoadAsync("A", TestContext.Current.CancellationToken);
|
||||
s.ShouldNotBeNull();
|
||||
s!.Comments.Count.ShouldBe(1);
|
||||
s.Comments[0].User.ShouldBe("alice");
|
||||
s.Comments[0].Kind.ShouldBe("AddComment");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Predicate_scripts_cannot_SetVirtualTag()
|
||||
{
|
||||
var up = new FakeUpstream();
|
||||
up.Set("Temp", 100);
|
||||
using var eng = Build(up, out _);
|
||||
|
||||
// The script compiles fine but throws at runtime when SetVirtualTag is called.
|
||||
// The engine swallows the exception + leaves state unchanged.
|
||||
await eng.LoadAsync([
|
||||
new ScriptedAlarmDefinition(
|
||||
"Bad", "Plant/Line1", "Bad",
|
||||
AlarmKind.AlarmCondition, AlarmSeverity.High, "bad",
|
||||
"""
|
||||
ctx.SetVirtualTag("NotAllowed", 1);
|
||||
return true;
|
||||
"""),
|
||||
], TestContext.Current.CancellationToken);
|
||||
|
||||
// Bad alarm's predicate threw — state unchanged.
|
||||
eng.GetState("Bad")!.Active.ShouldBe(AlarmActiveState.Inactive);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Dispose_releases_upstream_subscriptions()
|
||||
{
|
||||
var up = new FakeUpstream();
|
||||
up.Set("Temp", 50);
|
||||
var eng = Build(up, out _);
|
||||
await eng.LoadAsync([Alarm("A", """return (int)ctx.GetTag("Temp").Value > 100;""")],
|
||||
TestContext.Current.CancellationToken);
|
||||
up.ActiveSubscriptionCount.ShouldBe(1);
|
||||
|
||||
eng.Dispose();
|
||||
up.ActiveSubscriptionCount.ShouldBe(0);
|
||||
}
|
||||
|
||||
private static async Task WaitForAsync(Func<bool> cond, int timeoutMs = 2000)
|
||||
{
|
||||
var deadline = DateTime.UtcNow.AddMilliseconds(timeoutMs);
|
||||
while (DateTime.UtcNow < deadline)
|
||||
{
|
||||
if (cond()) return;
|
||||
await Task.Delay(25);
|
||||
}
|
||||
throw new TimeoutException("Condition did not become true in time");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,142 @@
|
||||
using Serilog;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms.Tests;
|
||||
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class ScriptedAlarmSourceTests
|
||||
{
|
||||
private static async Task<(ScriptedAlarmEngine e, ScriptedAlarmSource s, FakeUpstream u)> BuildAsync()
|
||||
{
|
||||
var up = new FakeUpstream();
|
||||
up.Set("Temp", 50);
|
||||
var logger = new LoggerConfiguration().CreateLogger();
|
||||
var engine = new ScriptedAlarmEngine(up, new InMemoryAlarmStateStore(),
|
||||
new ScriptLoggerFactory(logger), logger);
|
||||
await engine.LoadAsync([
|
||||
new ScriptedAlarmDefinition(
|
||||
"Plant/Line1::HighTemp",
|
||||
"Plant/Line1",
|
||||
"HighTemp",
|
||||
AlarmKind.LimitAlarm,
|
||||
AlarmSeverity.High,
|
||||
"Temp {Temp}C",
|
||||
"""return (int)ctx.GetTag("Temp").Value > 100;"""),
|
||||
new ScriptedAlarmDefinition(
|
||||
"Plant/Line2::OtherAlarm",
|
||||
"Plant/Line2",
|
||||
"OtherAlarm",
|
||||
AlarmKind.AlarmCondition,
|
||||
AlarmSeverity.Low,
|
||||
"other",
|
||||
"""return false;"""),
|
||||
], CancellationToken.None);
|
||||
|
||||
var source = new ScriptedAlarmSource(engine);
|
||||
return (engine, source, up);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Subscribe_with_empty_filter_receives_every_alarm_emission()
|
||||
{
|
||||
var (engine, source, up) = await BuildAsync();
|
||||
using var _e = engine;
|
||||
using var _s = source;
|
||||
|
||||
var events = new List<AlarmEventArgs>();
|
||||
source.OnAlarmEvent += (_, e) => events.Add(e);
|
||||
var handle = await source.SubscribeAlarmsAsync([], TestContext.Current.CancellationToken);
|
||||
|
||||
up.Push("Temp", 150);
|
||||
await Task.Delay(200);
|
||||
|
||||
events.Count.ShouldBe(1);
|
||||
events[0].ConditionId.ShouldBe("Plant/Line1::HighTemp");
|
||||
events[0].SourceNodeId.ShouldBe("Plant/Line1");
|
||||
events[0].Severity.ShouldBe(AlarmSeverity.High);
|
||||
events[0].AlarmType.ShouldBe("LimitAlarm");
|
||||
events[0].Message.ShouldBe("Temp 150C");
|
||||
|
||||
await source.UnsubscribeAlarmsAsync(handle, TestContext.Current.CancellationToken);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Subscribe_with_equipment_prefix_filters_by_that_prefix()
|
||||
{
|
||||
var (engine, source, up) = await BuildAsync();
|
||||
using var _e = engine;
|
||||
using var _s = source;
|
||||
|
||||
var events = new List<AlarmEventArgs>();
|
||||
source.OnAlarmEvent += (_, e) => events.Add(e);
|
||||
|
||||
// Subscribe only to Line1 alarms.
|
||||
var handle = await source.SubscribeAlarmsAsync(["Plant/Line1"], TestContext.Current.CancellationToken);
|
||||
|
||||
up.Push("Temp", 150);
|
||||
await Task.Delay(200);
|
||||
|
||||
events.Count.ShouldBe(1);
|
||||
events[0].SourceNodeId.ShouldBe("Plant/Line1");
|
||||
|
||||
await source.UnsubscribeAlarmsAsync(handle, TestContext.Current.CancellationToken);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Unsubscribe_stops_further_events()
|
||||
{
|
||||
var (engine, source, up) = await BuildAsync();
|
||||
using var _e = engine;
|
||||
using var _s = source;
|
||||
|
||||
var events = new List<AlarmEventArgs>();
|
||||
source.OnAlarmEvent += (_, e) => events.Add(e);
|
||||
var handle = await source.SubscribeAlarmsAsync([], TestContext.Current.CancellationToken);
|
||||
await source.UnsubscribeAlarmsAsync(handle, TestContext.Current.CancellationToken);
|
||||
|
||||
up.Push("Temp", 150);
|
||||
await Task.Delay(200);
|
||||
|
||||
events.Count.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task AcknowledgeAsync_routes_to_engine_with_default_user()
|
||||
{
|
||||
var (engine, source, up) = await BuildAsync();
|
||||
using var _e = engine;
|
||||
using var _s = source;
|
||||
|
||||
up.Push("Temp", 150);
|
||||
await Task.Delay(200);
|
||||
engine.GetState("Plant/Line1::HighTemp")!.Acked.ShouldBe(AlarmAckedState.Unacknowledged);
|
||||
|
||||
await source.AcknowledgeAsync([new AlarmAcknowledgeRequest(
|
||||
"Plant/Line1", "Plant/Line1::HighTemp", "ack via opcua")],
|
||||
TestContext.Current.CancellationToken);
|
||||
|
||||
var state = engine.GetState("Plant/Line1::HighTemp")!;
|
||||
state.Acked.ShouldBe(AlarmAckedState.Acknowledged);
|
||||
state.LastAckUser.ShouldBe("opcua-client");
|
||||
state.LastAckComment.ShouldBe("ack via opcua");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Null_arguments_rejected()
|
||||
{
|
||||
var (engine, source, _) = await BuildAsync();
|
||||
using var _e = engine;
|
||||
using var _s = source;
|
||||
|
||||
await Should.ThrowAsync<ArgumentNullException>(async () =>
|
||||
await source.SubscribeAlarmsAsync(null!, TestContext.Current.CancellationToken));
|
||||
await Should.ThrowAsync<ArgumentNullException>(async () =>
|
||||
await source.UnsubscribeAlarmsAsync(null!, TestContext.Current.CancellationToken));
|
||||
await Should.ThrowAsync<ArgumentNullException>(async () =>
|
||||
await source.AcknowledgeAsync(null!, TestContext.Current.CancellationToken));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>net10.0</TargetFramework>
|
||||
<Nullable>enable</Nullable>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<IsPackable>false</IsPackable>
|
||||
<IsTestProject>true</IsTestProject>
|
||||
<RootNamespace>ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms.Tests</RootNamespace>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="xunit.v3" Version="1.1.0"/>
|
||||
<PackageReference Include="Shouldly" Version="4.3.0"/>
|
||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0"/>
|
||||
<PackageReference Include="xunit.runner.visualstudio" Version="3.0.2">
|
||||
<PrivateAssets>all</PrivateAssets>
|
||||
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
|
||||
</PackageReference>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\..\src\ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms\ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms.csproj"/>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-37gx-xxp4-5rgx"/>
|
||||
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-w3x6-4m5h-cxqf"/>
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
Reference in New Issue
Block a user