From 25ad4b1929ce594b6103d44128284c690c93ba9f Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 20 Apr 2026 19:11:17 -0400 Subject: [PATCH] =?UTF-8?q?Phase=207=20Stream=20D=20=E2=80=94=20Historian?= =?UTF-8?q?=20alarm=20sink=20(SQLite=20store-and-forward=20+=20Galaxy.Host?= =?UTF-8?q?=20IPC=20contracts)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 7 plan decisions #16, #17, #19, #21 implementation. Durable local SQLite queue absorbs every qualifying alarm event; drain worker forwards batches to Galaxy.Host (reusing the already-loaded 32-bit aahClientManaged DLLs) on an exponential-backoff cadence; operator acks never block on the historian being reachable. ## New project Core.AlarmHistorian (net10) - AlarmHistorianEvent — source-agnostic event shape (scripted alarms + Galaxy-native + AB CIP ALMD + any future IAlarmSource) - IAlarmHistorianSink / NullAlarmHistorianSink — interface + disabled default - IAlarmHistorianWriter — per-event outcome (Ack / RetryPlease / PermanentFail); Stream G wires the Galaxy.Host IPC client implementation - SqliteStoreAndForwardSink — full implementation: - Queue table with AttemptCount / LastError / DeadLettered columns - DrainOnceAsync serialised via SemaphoreSlim - BackoffLadder 1s → 2s → 5s → 15s → 60s (cap) - DefaultCapacity 1,000,000 rows — overflow evicts oldest non-dead-lettered - DefaultDeadLetterRetention 30 days — sweeper purges on every drain tick - RetryDeadLettered operator action reattaches dead-letters to the regular queue - Writer-side exceptions treated as whole-batch RetryPlease (no data loss) ## New IPC contracts in Driver.Galaxy.Shared - HistorianAlarmEventRequest — batched up to 100 events/request per plan Stream D.5 - HistorianAlarmEventResponse — per-event outcome (1:1 with request order) - HistorianAlarmEventOutcomeDto enum (byte on the wire — Ack/RetryPlease/PermanentFail) - HistorianAlarmEventDto — mirrors Core.AlarmHistorian.AlarmHistorianEvent - 
HistorianConnectivityStatusNotification — Host pushes proactively when the SDK session drops so /alarms/historian flips red without waiting for the next drain - MessageKind additions: 0x80 HistorianAlarmEventRequest / 0x81 HistorianAlarmEventResponse / 0x82 HistorianConnectivityStatus ## Tests — 14/14 SqliteStoreAndForwardSinkTests covers: enqueue→drain→Ack round-trip, empty-queue no-op, RetryPlease bumps backoff + keeps row, Ack after Retry resets backoff, PermanentFail dead-letters one row without blocking neighbors, writer exception treated as whole-batch retry with error surfaced in status, capacity eviction drops oldest non-dead-lettered, dead-letters purged past retention window, RetryDeadLettered requeues, ladder caps at 60s after 10 retries, Null sink reports Disabled status, null sink swallows enqueue, ctor argument validation, disposed sink rejects enqueue. ## Totals Full Phase 7 tests: 160 green (63 Scripting + 36 VirtualTags + 47 ScriptedAlarms + 14 AlarmHistorian). Stream G wires this into the real Galaxy.Host IPC pipe. 
--- ZB.MOM.WW.OtOpcUa.slnx | 2 + .../AlarmHistorianEvent.cs | 36 ++ .../IAlarmHistorianSink.cs | 82 ++++ .../SqliteStoreAndForwardSink.cs | 397 ++++++++++++++++++ ....MOM.WW.OtOpcUa.Core.AlarmHistorian.csproj | 32 ++ .../Contracts/Framing.cs | 8 + .../Contracts/HistorianAlarms.cs | 92 ++++ .../SqliteStoreAndForwardSinkTests.cs | 286 +++++++++++++ ...W.OtOpcUa.Core.AlarmHistorian.Tests.csproj | 31 ++ 9 files changed, 966 insertions(+) create mode 100644 src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/AlarmHistorianEvent.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/IAlarmHistorianSink.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.csproj create mode 100644 src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared/Contracts/HistorianAlarms.cs create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/SqliteStoreAndForwardSinkTests.cs create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests.csproj diff --git a/ZB.MOM.WW.OtOpcUa.slnx b/ZB.MOM.WW.OtOpcUa.slnx index 0bfe61c..2c304cb 100644 --- a/ZB.MOM.WW.OtOpcUa.slnx +++ b/ZB.MOM.WW.OtOpcUa.slnx @@ -6,6 +6,7 @@ + @@ -32,6 +33,7 @@ + diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/AlarmHistorianEvent.cs b/src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/AlarmHistorianEvent.cs new file mode 100644 index 0000000..8c383b0 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/AlarmHistorianEvent.cs @@ -0,0 +1,36 @@ +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian; + +/// +/// The event shape the historian sink consumes — source-agnostic across scripted +/// alarms + Galaxy-native + AB CIP ALMD + any future IAlarmSource per Phase 7 plan +/// decision #15 (sink scope = all alarm sources, not just scripted). 
A per-alarm +/// HistorizeToAveva toggle on the producer side gates which events flow. +/// +/// Stable condition identity. +/// UNS path of the Equipment node the alarm hangs under. Doubles as the "SourceNode" in Historian's alarm schema. +/// Human-readable alarm name. +/// Concrete Part 9 subtype — "LimitAlarm" / "DiscreteAlarm" / "OffNormalAlarm" / "AlarmCondition". Used as the Historian "AlarmType" column. +/// Mapped to Historian's numeric priority on the sink side. +/// +/// Which state transition this event represents — "Activated" / "Cleared" / +/// "Acknowledged" / "Confirmed" / "Shelved" / "Unshelved" / "Disabled" / "Enabled" / +/// "CommentAdded". Free-form string because different alarm sources use different +/// vocabularies; the Galaxy.Host handler maps to the historian's enum on the wire. +/// +/// Fully-rendered message text — template tokens already resolved upstream. +/// Operator who triggered the transition. "system" for engine-driven events (shelving expiry, predicate change). +/// Operator-supplied free-form text, if any. +/// When the transition occurred. +public sealed record AlarmHistorianEvent( + string AlarmId, + string EquipmentPath, + string AlarmName, + string AlarmTypeName, + AlarmSeverity Severity, + string EventKind, + string Message, + string User, + string? Comment, + DateTime TimestampUtc); diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/IAlarmHistorianSink.cs b/src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/IAlarmHistorianSink.cs new file mode 100644 index 0000000..87fc9a2 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/IAlarmHistorianSink.cs @@ -0,0 +1,82 @@ +namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian; + +/// +/// The historian sink contract — where qualifying alarm events land. Phase 7 plan +/// decision #17: ingestion routes through Galaxy.Host's pipe so we reuse the +/// already-loaded aahClientManaged DLLs without loading 32-bit native code +/// in the main .NET 10 server. 
/// Tests use an in-memory fake; production uses
/// <see cref="SqliteStoreAndForwardSink"/>.
///
/// <remarks>
/// <para>
/// <see cref="EnqueueAsync"/> is fire-and-forget from the engine's perspective —
/// the sink MUST NOT block the emitting thread. Production implementations
/// (<see cref="SqliteStoreAndForwardSink"/>) persist to a local SQLite queue
/// first, then drain asynchronously to the actual historian. Per Phase 7 plan
/// decision #16, failed downstream writes replay with exponential backoff;
/// operator actions are never blocked waiting on the historian.
/// </para>
/// <para>
/// <see cref="GetStatus"/> exposes queue depth + drain rate + last error
/// for the Admin UI <c>/alarms/historian</c> diagnostics page (Stream F).
/// </para>
/// </remarks>
public interface IAlarmHistorianSink
{
    /// <summary>Durably enqueue the event. Returns as soon as the queue row is committed.</summary>
    /// <param name="evt">The alarm transition to record.</param>
    /// <param name="cancellationToken">Caller-supplied cancellation token.</param>
    Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken);

    /// <summary>Snapshot of current queue depth + drain health.</summary>
    HistorianSinkStatus GetStatus();
}

/// <summary>No-op default for tests or deployments that don't historize alarms.</summary>
public sealed class NullAlarmHistorianSink : IAlarmHistorianSink
{
    /// <summary>Shared instance; the type carries no instance state.</summary>
    public static readonly NullAlarmHistorianSink Instance = new();

    /// <summary>Discards the event and returns an already-completed task.</summary>
    public Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    /// <summary>Always reports empty queues, no timestamps, no error, and <see cref="HistorianDrainState.Disabled"/>.</summary>
    public HistorianSinkStatus GetStatus()
    {
        return new HistorianSinkStatus(0, 0, null, null, null, HistorianDrainState.Disabled);
    }
}

/// <summary>Diagnostic snapshot surfaced to the Admin UI + /healthz endpoints.</summary>
/// <param name="QueueDepth">Rows waiting to be forwarded (not dead-lettered).</param>
/// <param name="DeadLetterDepth">Rows currently flagged as dead-lettered.</param>
/// <param name="LastDrainUtc">When the most recent drain pass started, if any.</param>
/// <param name="LastSuccessUtc">When a drain pass last had at least one event acknowledged, if ever.</param>
/// <param name="LastError">Most recent error text, or null when the last writer call succeeded.</param>
/// <param name="DrainState">Where the drain worker is in its state machine.</param>
public sealed record HistorianSinkStatus(
    long QueueDepth,
    long DeadLetterDepth,
    DateTime? LastDrainUtc,
    DateTime? LastSuccessUtc,
    string? LastError,
    HistorianDrainState DrainState);

/// <summary>Where the drain worker is in its state machine.</summary>
public enum HistorianDrainState
{
    /// <summary>No historian sink is active (reported by <see cref="NullAlarmHistorianSink"/>).</summary>
    Disabled,

    /// <summary>Nothing in flight.</summary>
    Idle,

    /// <summary>A drain pass is currently running.</summary>
    Draining,

    /// <summary>The last pass asked for a retry or the writer failed.</summary>
    BackingOff,
}

/// <summary>Signaled by the Galaxy.Host-side handler when it fails a batch — drain worker uses this to decide retry cadence.</summary>
+public enum HistorianWriteOutcome +{ + /// Successfully persisted to the historian. Remove from queue. + Ack, + /// Transient failure (historian disconnected, timeout, busy). Leave queued; retry after backoff. + RetryPlease, + /// Permanent failure (malformed event, unrecoverable SDK error). Move to dead-letter table. + PermanentFail, +} + +/// What the drain worker delegates writes to — Stream G wires this to the Galaxy.Host IPC client. +public interface IAlarmHistorianWriter +{ + /// Push a batch of events to the historian. Returns one outcome per event, same order. + Task> WriteBatchAsync( + IReadOnlyList batch, CancellationToken cancellationToken); +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs b/src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs new file mode 100644 index 0000000..1b56ba5 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs @@ -0,0 +1,397 @@ +using System.Text.Json; +using Microsoft.Data.Sqlite; +using Serilog; + +namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian; + +/// +/// Phase 7 plan decisions #16–#17 implementation: durable SQLite queue on the node +/// absorbs every qualifying alarm event, a drain worker batches rows to Galaxy.Host +/// via on an exponential-backoff cadence, and +/// operator acks never block on the historian being reachable. +/// +/// +/// +/// Queue schema: +/// +/// CREATE TABLE Queue ( +/// RowId INTEGER PRIMARY KEY AUTOINCREMENT, +/// AlarmId TEXT NOT NULL, +/// EnqueuedUtc TEXT NOT NULL, +/// PayloadJson TEXT NOT NULL, +/// AttemptCount INTEGER NOT NULL DEFAULT 0, +/// LastAttemptUtc TEXT NULL, +/// LastError TEXT NULL, +/// DeadLettered INTEGER NOT NULL DEFAULT 0 +/// ); +/// +/// Dead-lettered rows stay in place for the configured retention window (default +/// 30 days per Phase 7 plan decision #21) so operators can inspect + manually +/// retry before the sweeper purges them. 
/// Regular queue capacity is bounded —
/// overflow evicts the oldest non-dead-lettered rows with a WARN log.
///
///
/// Drain runs on a shared <see cref="System.Threading.Timer"/>. Exponential
/// backoff on <see cref="HistorianWriteOutcome.RetryPlease"/>: 1s → 2s → 5s →
/// 15s → 60s cap. <see cref="HistorianWriteOutcome.PermanentFail"/> rows flip
/// the <c>DeadLettered</c> flag on the individual row; neighbors in the batch
/// still retry on their own cadence.
///
///
public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
{
    /// <summary>Default queue capacity — oldest non-dead-lettered rows evicted past this.</summary>
    public const long DefaultCapacity = 1_000_000;

    /// <summary>Default time a dead-lettered row is kept before the sweeper deletes it.</summary>
    public static readonly TimeSpan DefaultDeadLetterRetention = TimeSpan.FromDays(30);

    // Retry delay ladder; _backoffIndex points at the current rung and saturates on the last.
    private static readonly TimeSpan[] BackoffLadder =
    [
        TimeSpan.FromSeconds(1),
        TimeSpan.FromSeconds(2),
        TimeSpan.FromSeconds(5),
        TimeSpan.FromSeconds(15),
        TimeSpan.FromSeconds(60),
    ];

    private readonly string _connectionString;
    private readonly IAlarmHistorianWriter _writer;
    private readonly ILogger _logger;
    private readonly int _batchSize;
    private readonly long _capacity;
    private readonly TimeSpan _deadLetterRetention;
    private readonly Func<DateTime> _clock;

    // Single-permit gate: at most one drain pass runs at a time.
    private readonly SemaphoreSlim _drainGate = new(1, 1);
    private Timer? _drainTimer;
    private int _backoffIndex;
    private DateTime? _lastDrainUtc;
    private DateTime? _lastSuccessUtc;
    private string? _lastError;
    private HistorianDrainState _drainState = HistorianDrainState.Idle;
    private bool _disposed;

    /// <summary>Opens (creating if needed) the SQLite queue and ensures its schema exists.</summary>
    /// <param name="databasePath">File path of the SQLite database.</param>
    /// <param name="writer">Destination the drain pass forwards batches to.</param>
    /// <param name="logger">Serilog logger for eviction, purge and failure messages.</param>
    /// <param name="batchSize">Maximum rows read per drain pass; must be positive.</param>
    /// <param name="capacity">Maximum non-dead-lettered rows kept; must be positive.</param>
    /// <param name="deadLetterRetention">Retention for dead-lettered rows; defaults to <see cref="DefaultDeadLetterRetention"/>.</param>
    /// <param name="clock">Time source; defaults to <see cref="DateTime.UtcNow"/>.</param>
    /// <exception cref="ArgumentException"><paramref name="databasePath"/> is null or whitespace.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="writer"/> or <paramref name="logger"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="batchSize"/> or <paramref name="capacity"/> is not positive.</exception>
    public SqliteStoreAndForwardSink(
        string databasePath,
        IAlarmHistorianWriter writer,
        ILogger logger,
        int batchSize = 100,
        long capacity = DefaultCapacity,
        TimeSpan? deadLetterRetention = null,
        Func<DateTime>? clock = null)
    {
        if (string.IsNullOrWhiteSpace(databasePath))
            throw new ArgumentException("Database path required.", nameof(databasePath));
        _writer = writer ?? throw new ArgumentNullException(nameof(writer));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _batchSize = batchSize > 0 ? batchSize : throw new ArgumentOutOfRangeException(nameof(batchSize));
        _capacity = capacity > 0 ? capacity : throw new ArgumentOutOfRangeException(nameof(capacity));
        _deadLetterRetention = deadLetterRetention ?? DefaultDeadLetterRetention;
        _clock = clock ?? (() => DateTime.UtcNow);

        // Let the builder quote the path: interpolating it directly breaks on paths
        // that contain ';' or '=' because those are connection-string delimiters.
        _connectionString = new SqliteConnectionStringBuilder { DataSource = databasePath }.ToString();

        InitializeSchema();
    }

    /// <summary>
    /// Start the background drain worker. Not started automatically so tests can
    /// drive <see cref="DrainOnceAsync"/> deterministically.
    /// </summary>
    /// <remarks>
    /// Calling this again replaces the previous timer. Timer ticks are skipped while the
    /// sink is backing off and the current backoff has not yet elapsed since the last
    /// drain started; direct calls to <see cref="DrainOnceAsync"/> are never skipped.
    /// </remarks>
    /// <param name="tickInterval">Delay before the first tick and between subsequent ticks.</param>
    /// <exception cref="ObjectDisposedException">The sink has been disposed.</exception>
    public void StartDrainLoop(TimeSpan tickInterval)
    {
        if (_disposed) throw new ObjectDisposedException(nameof(SqliteStoreAndForwardSink));
        _drainTimer?.Dispose();
        _drainTimer = new Timer(_ => _ = DrainTickAsync(), null, tickInterval, tickInterval);
    }

    // One timer tick: applies the backoff window, runs a drain, and observes any failure
    // so it is logged and surfaced in the status instead of dying as an unobserved task.
    private async Task DrainTickAsync()
    {
        try
        {
            if (_drainState == HistorianDrainState.BackingOff
                && _lastDrainUtc is { } lastDrain
                && _clock() - lastDrain < CurrentBackoff)
            {
                return;
            }

            await DrainOnceAsync(CancellationToken.None).ConfigureAwait(false);
        }
        catch (ObjectDisposedException) when (_disposed)
        {
            // Sink was disposed while this tick was in flight — nothing left to do.
        }
        catch (Exception ex)
        {
            _lastError = ex.Message;
            _logger.Error(ex, "Historian drain tick failed");
        }
    }

    /// <summary>
    /// Inserts the event into the SQLite queue, evicting the oldest pending rows first
    /// when the queue is at capacity.
    /// </summary>
    /// <remarks>
    /// The insert runs synchronously on the calling thread and the returned task is
    /// already completed; <paramref name="cancellationToken"/> is not observed.
    /// </remarks>
    /// <param name="evt">Event to persist; serialized to JSON.</param>
    /// <param name="cancellationToken">Unused.</param>
    /// <exception cref="ArgumentNullException"><paramref name="evt"/> is null.</exception>
    /// <exception cref="ObjectDisposedException">The sink has been disposed.</exception>
    public Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken)
    {
        if (evt is null) throw new ArgumentNullException(nameof(evt));
        if (_disposed) throw new ObjectDisposedException(nameof(SqliteStoreAndForwardSink));

        using var conn = new SqliteConnection(_connectionString);
        conn.Open();

        EnforceCapacity(conn);

        using var cmd = conn.CreateCommand();
        cmd.CommandText = """
            INSERT INTO Queue (AlarmId, EnqueuedUtc, PayloadJson, AttemptCount)
            VALUES ($alarmId, $enqueued, $payload, 0);
            """;
        cmd.Parameters.AddWithValue("$alarmId", evt.AlarmId);
        cmd.Parameters.AddWithValue("$enqueued", _clock().ToString("O"));
        cmd.Parameters.AddWithValue("$payload", JsonSerializer.Serialize(evt));
        cmd.ExecuteNonQuery();
        return Task.CompletedTask;
    }

    ///
    /// Read up to <see cref="_batchSize"/> queued rows, forward through the writer,
    /// remove Ack'd rows, dead-letter PermanentFail rows, and extend the backoff
    /// on RetryPlease. Safe to call from multiple threads; the semaphore enforces
    /// serial execution.
+ /// + public async Task DrainOnceAsync(CancellationToken ct) + { + if (_disposed) return; + if (!await _drainGate.WaitAsync(0, ct).ConfigureAwait(false)) return; + try + { + _drainState = HistorianDrainState.Draining; + _lastDrainUtc = _clock(); + + PurgeAgedDeadLetters(); + var (rowIds, events) = ReadBatch(); + if (rowIds.Count == 0) + { + _drainState = HistorianDrainState.Idle; + return; + } + + IReadOnlyList outcomes; + try + { + outcomes = await _writer.WriteBatchAsync(events, ct).ConfigureAwait(false); + _lastError = null; + } + catch (OperationCanceledException) + { + throw; + } + catch (Exception ex) + { + // Writer-side exception — treat entire batch as RetryPlease. + _lastError = ex.Message; + _logger.Warning(ex, "Historian writer threw on batch of {Count}; deferring retry", events.Count); + BumpBackoff(); + _drainState = HistorianDrainState.BackingOff; + return; + } + + if (outcomes.Count != events.Count) + throw new InvalidOperationException( + $"Writer returned {outcomes.Count} outcomes for {events.Count} events — expected 1:1"); + + using var conn = new SqliteConnection(_connectionString); + conn.Open(); + using var tx = conn.BeginTransaction(); + for (var i = 0; i < outcomes.Count; i++) + { + var outcome = outcomes[i]; + var rowId = rowIds[i]; + switch (outcome) + { + case HistorianWriteOutcome.Ack: + DeleteRow(conn, tx, rowId); + break; + case HistorianWriteOutcome.PermanentFail: + DeadLetterRow(conn, tx, rowId, $"permanent fail at {_clock():O}"); + break; + case HistorianWriteOutcome.RetryPlease: + BumpAttempt(conn, tx, rowId, "retry-please"); + break; + } + } + tx.Commit(); + + var acks = outcomes.Count(o => o == HistorianWriteOutcome.Ack); + if (acks > 0) _lastSuccessUtc = _clock(); + + if (outcomes.Any(o => o == HistorianWriteOutcome.RetryPlease)) + { + BumpBackoff(); + _drainState = HistorianDrainState.BackingOff; + } + else + { + ResetBackoff(); + _drainState = HistorianDrainState.Idle; + } + } + finally + { + _drainGate.Release(); + } + } + 
+ public HistorianSinkStatus GetStatus() + { + using var conn = new SqliteConnection(_connectionString); + conn.Open(); + + long queued; + long deadlettered; + using (var cmd = conn.CreateCommand()) + { + cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 0"; + queued = (long)(cmd.ExecuteScalar() ?? 0L); + } + using (var cmd = conn.CreateCommand()) + { + cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 1"; + deadlettered = (long)(cmd.ExecuteScalar() ?? 0L); + } + + return new HistorianSinkStatus( + QueueDepth: queued, + DeadLetterDepth: deadlettered, + LastDrainUtc: _lastDrainUtc, + LastSuccessUtc: _lastSuccessUtc, + LastError: _lastError, + DrainState: _drainState); + } + + /// Operator action from Admin UI — retry every dead-lettered row. Non-cascading: they rejoin the regular queue + get a fresh backoff. + public int RetryDeadLettered() + { + using var conn = new SqliteConnection(_connectionString); + conn.Open(); + using var cmd = conn.CreateCommand(); + cmd.CommandText = "UPDATE Queue SET DeadLettered = 0, AttemptCount = 0, LastError = NULL WHERE DeadLettered = 1"; + return cmd.ExecuteNonQuery(); + } + + private (List rowIds, List events) ReadBatch() + { + var rowIds = new List(); + var events = new List(); + using var conn = new SqliteConnection(_connectionString); + conn.Open(); + using var cmd = conn.CreateCommand(); + cmd.CommandText = """ + SELECT RowId, PayloadJson FROM Queue + WHERE DeadLettered = 0 + ORDER BY RowId ASC + LIMIT $limit + """; + cmd.Parameters.AddWithValue("$limit", _batchSize); + using var reader = cmd.ExecuteReader(); + while (reader.Read()) + { + rowIds.Add(reader.GetInt64(0)); + var payload = reader.GetString(1); + var evt = JsonSerializer.Deserialize(payload); + if (evt is not null) events.Add(evt); + } + return (rowIds, events); + } + + private static void DeleteRow(SqliteConnection conn, SqliteTransaction tx, long rowId) + { + using var cmd = conn.CreateCommand(); + cmd.Transaction = tx; + 
cmd.CommandText = "DELETE FROM Queue WHERE RowId = $id"; + cmd.Parameters.AddWithValue("$id", rowId); + cmd.ExecuteNonQuery(); + } + + private void DeadLetterRow(SqliteConnection conn, SqliteTransaction tx, long rowId, string reason) + { + using var cmd = conn.CreateCommand(); + cmd.Transaction = tx; + cmd.CommandText = """ + UPDATE Queue SET DeadLettered = 1, LastAttemptUtc = $now, LastError = $err, AttemptCount = AttemptCount + 1 + WHERE RowId = $id + """; + cmd.Parameters.AddWithValue("$now", _clock().ToString("O")); + cmd.Parameters.AddWithValue("$err", reason); + cmd.Parameters.AddWithValue("$id", rowId); + cmd.ExecuteNonQuery(); + } + + private void BumpAttempt(SqliteConnection conn, SqliteTransaction tx, long rowId, string reason) + { + using var cmd = conn.CreateCommand(); + cmd.Transaction = tx; + cmd.CommandText = """ + UPDATE Queue SET LastAttemptUtc = $now, LastError = $err, AttemptCount = AttemptCount + 1 + WHERE RowId = $id + """; + cmd.Parameters.AddWithValue("$now", _clock().ToString("O")); + cmd.Parameters.AddWithValue("$err", reason); + cmd.Parameters.AddWithValue("$id", rowId); + cmd.ExecuteNonQuery(); + } + + private void EnforceCapacity(SqliteConnection conn) + { + // Count non-dead-lettered rows only — dead-lettered rows retain for + // post-mortem per the configured retention window. + long count; + using (var cmd = conn.CreateCommand()) + { + cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 0"; + count = (long)(cmd.ExecuteScalar() ?? 
0L); + } + if (count < _capacity) return; + + var toEvict = count - _capacity + 1; + using (var cmd = conn.CreateCommand()) + { + cmd.CommandText = """ + DELETE FROM Queue + WHERE RowId IN ( + SELECT RowId FROM Queue + WHERE DeadLettered = 0 + ORDER BY RowId ASC + LIMIT $n + ) + """; + cmd.Parameters.AddWithValue("$n", toEvict); + cmd.ExecuteNonQuery(); + } + _logger.Warning( + "Historian queue at capacity {Cap} — evicted {Count} oldest row(s) to make room", + _capacity, toEvict); + } + + private void PurgeAgedDeadLetters() + { + var cutoff = (_clock() - _deadLetterRetention).ToString("O"); + using var conn = new SqliteConnection(_connectionString); + conn.Open(); + using var cmd = conn.CreateCommand(); + cmd.CommandText = """ + DELETE FROM Queue + WHERE DeadLettered = 1 AND LastAttemptUtc IS NOT NULL AND LastAttemptUtc < $cutoff + """; + cmd.Parameters.AddWithValue("$cutoff", cutoff); + var purged = cmd.ExecuteNonQuery(); + if (purged > 0) + _logger.Information("Purged {Count} dead-lettered row(s) past retention window", purged); + } + + private void InitializeSchema() + { + using var conn = new SqliteConnection(_connectionString); + conn.Open(); + using var cmd = conn.CreateCommand(); + cmd.CommandText = """ + CREATE TABLE IF NOT EXISTS Queue ( + RowId INTEGER PRIMARY KEY AUTOINCREMENT, + AlarmId TEXT NOT NULL, + EnqueuedUtc TEXT NOT NULL, + PayloadJson TEXT NOT NULL, + AttemptCount INTEGER NOT NULL DEFAULT 0, + LastAttemptUtc TEXT NULL, + LastError TEXT NULL, + DeadLettered INTEGER NOT NULL DEFAULT 0 + ); + CREATE INDEX IF NOT EXISTS IX_Queue_Drain ON Queue (DeadLettered, RowId); + """; + cmd.ExecuteNonQuery(); + } + + private void BumpBackoff() => _backoffIndex = Math.Min(_backoffIndex + 1, BackoffLadder.Length - 1); + private void ResetBackoff() => _backoffIndex = 0; + public TimeSpan CurrentBackoff => BackoffLadder[_backoffIndex]; + + public void Dispose() + { + if (_disposed) return; + _disposed = true; + _drainTimer?.Dispose(); + _drainGate.Dispose(); + } 
+} diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.csproj b/src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.csproj new file mode 100644 index 0000000..175475d --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.csproj @@ -0,0 +1,32 @@ + + + + net10.0 + enable + enable + latest + true + true + $(NoWarn);CS1591 + ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian + + + + + + + + + + + + + + + + + + + + + diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared/Contracts/Framing.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared/Contracts/Framing.cs index 09db862..193d771 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared/Contracts/Framing.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared/Contracts/Framing.cs @@ -60,6 +60,14 @@ public enum MessageKind : byte HostConnectivityStatus = 0x70, RuntimeStatusChange = 0x71, + // Phase 7 Stream D — historian alarm sink. Main server → Galaxy.Host batched + // writes into the Aveva Historian alarm schema via the already-loaded + // aahClientManaged DLLs. HistorianConnectivityStatus fires proactively from the + // Host when the SDK session transitions so diagnostics flip promptly. 
+ HistorianAlarmEventRequest = 0x80, + HistorianAlarmEventResponse = 0x81, + HistorianConnectivityStatus = 0x82, + RecycleHostRequest = 0xF0, RecycleStatusResponse = 0xF1, diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared/Contracts/HistorianAlarms.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared/Contracts/HistorianAlarms.cs new file mode 100644 index 0000000..6719cdd --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared/Contracts/HistorianAlarms.cs @@ -0,0 +1,92 @@ +using System; +using MessagePack; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts; + +/// +/// Phase 7 Stream D — IPC contracts for routing Part 9 alarm transitions from the +/// main .NET 10 server into Galaxy.Host's already-loaded aahClientManaged +/// DLLs. Reuses the Tier-C isolation + licensing pathway rather than loading 32-bit +/// native historian code into the main server. +/// +/// +/// +/// Batched on the wire to amortize IPC overhead — the main server's SqliteStoreAndForwardSink +/// ships up to 100 events per request per Phase 7 plan Stream D.5. +/// +/// +/// Per-event outcomes (Ack / RetryPlease / PermanentFail) let the drain worker +/// dead-letter malformed events without blocking neighbors in the batch. +/// fires proactively from +/// the Host when the SDK session drops so the /hosts + /alarms/historian Admin +/// diagnostics pages flip to red promptly instead of waiting for the next +/// drain cycle. +/// +/// +[MessagePackObject] +public sealed class HistorianAlarmEventRequest +{ + [Key(0)] public HistorianAlarmEventDto[] Events { get; set; } = Array.Empty(); +} + +[MessagePackObject] +public sealed class HistorianAlarmEventResponse +{ + /// Per-event outcome, same order as the request. + [Key(0)] public HistorianAlarmEventOutcomeDto[] Outcomes { get; set; } = Array.Empty(); +} + +/// Outcome enum — bytes on the wire so it stays compact. +public enum HistorianAlarmEventOutcomeDto : byte +{ + /// Successfully persisted to the historian — remove from queue. 
+ Ack = 0, + /// Transient failure (historian disconnected, timeout, busy) — retry after backoff. + RetryPlease = 1, + /// Permanent failure (malformed, unrecoverable SDK error) — move to dead-letter. + PermanentFail = 2, +} + +/// One alarm-transition payload. Fields mirror Core.AlarmHistorian.AlarmHistorianEvent. +[MessagePackObject] +public sealed class HistorianAlarmEventDto +{ + [Key(0)] public string AlarmId { get; set; } = string.Empty; + [Key(1)] public string EquipmentPath { get; set; } = string.Empty; + [Key(2)] public string AlarmName { get; set; } = string.Empty; + + /// Concrete Part 9 subtype name — "LimitAlarm" / "OffNormalAlarm" / "AlarmCondition" / "DiscreteAlarm". + [Key(3)] public string AlarmTypeName { get; set; } = string.Empty; + + /// Numeric severity the Host maps to the historian's priority scale. + [Key(4)] public int Severity { get; set; } + + /// Which transition this event represents — "Activated" / "Cleared" / "Acknowledged" / etc. + [Key(5)] public string EventKind { get; set; } = string.Empty; + + /// Pre-rendered message — template tokens resolved upstream. + [Key(6)] public string Message { get; set; } = string.Empty; + + /// Operator who triggered the transition. "system" for engine-driven events. + [Key(7)] public string User { get; set; } = "system"; + + /// Operator-supplied free-form comment, if any. + [Key(8)] public string? Comment { get; set; } + + /// Source timestamp (UTC Unix milliseconds). + [Key(9)] public long TimestampUtcUnixMs { get; set; } +} + +/// +/// Proactive notification — Galaxy.Host pushes this when the historian SDK session +/// transitions (connected / disconnected / degraded). The main server reflects this +/// into the historian sink status so Admin UI surfaces the problem without the +/// operator having to scrutinize drain cadence. 
+/// +[MessagePackObject] +public sealed class HistorianConnectivityStatusNotification +{ + [Key(0)] public string Status { get; set; } = "unknown"; // connected | disconnected | degraded + [Key(1)] public string? Detail { get; set; } + [Key(2)] public long ObservedAtUtcUnixMs { get; set; } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/SqliteStoreAndForwardSinkTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/SqliteStoreAndForwardSinkTests.cs new file mode 100644 index 0000000..7ed87f1 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/SqliteStoreAndForwardSinkTests.cs @@ -0,0 +1,286 @@ +using Serilog; +using Serilog.Core; +using Serilog.Events; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests; + +/// +/// Verifies the durable SQLite store-and-forward queue behind the historian sink: +/// round-trip Ack, backoff ladder on RetryPlease, dead-lettering on PermanentFail, +/// capacity eviction, and retention-based dead-letter purge. +/// +[Trait("Category", "Unit")] +public sealed class SqliteStoreAndForwardSinkTests : IDisposable +{ + private readonly string _dbPath; + private readonly ILogger _log; + + public SqliteStoreAndForwardSinkTests() + { + _dbPath = Path.Combine(Path.GetTempPath(), $"otopcua-historian-{Guid.NewGuid():N}.sqlite"); + _log = new LoggerConfiguration().MinimumLevel.Verbose().CreateLogger(); + } + + public void Dispose() + { + try { if (File.Exists(_dbPath)) File.Delete(_dbPath); } catch { } + } + + private sealed class FakeWriter : IAlarmHistorianWriter + { + public Queue NextOutcomePerEvent { get; } = new(); + public HistorianWriteOutcome DefaultOutcome { get; set; } = HistorianWriteOutcome.Ack; + public List> Batches { get; } = []; + public Exception? 
ThrowOnce { get; set; } + + public Task> WriteBatchAsync( + IReadOnlyList batch, CancellationToken ct) + { + if (ThrowOnce is not null) + { + var e = ThrowOnce; + ThrowOnce = null; + throw e; + } + Batches.Add(batch); + var outcomes = new List(); + for (var i = 0; i < batch.Count; i++) + outcomes.Add(NextOutcomePerEvent.Count > 0 ? NextOutcomePerEvent.Dequeue() : DefaultOutcome); + return Task.FromResult>(outcomes); + } + } + + private static AlarmHistorianEvent Event(string alarmId, DateTime? ts = null) => new( + AlarmId: alarmId, + EquipmentPath: "/Site/Line1/Cell", + AlarmName: "HighTemp", + AlarmTypeName: "LimitAlarm", + Severity: AlarmSeverity.High, + EventKind: "Activated", + Message: "temp exceeded", + User: "system", + Comment: null, + TimestampUtc: ts ?? DateTime.UtcNow); + + [Fact] + public async Task EnqueueThenDrain_Ack_removes_row() + { + var writer = new FakeWriter(); + using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log); + + await sink.EnqueueAsync(Event("A1"), CancellationToken.None); + sink.GetStatus().QueueDepth.ShouldBe(1); + + await sink.DrainOnceAsync(CancellationToken.None); + + writer.Batches.Count.ShouldBe(1); + writer.Batches[0].Count.ShouldBe(1); + writer.Batches[0][0].AlarmId.ShouldBe("A1"); + var status = sink.GetStatus(); + status.QueueDepth.ShouldBe(0); + status.DeadLetterDepth.ShouldBe(0); + status.LastSuccessUtc.ShouldNotBeNull(); + } + + [Fact] + public async Task Drain_with_empty_queue_is_noop() + { + var writer = new FakeWriter(); + using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log); + + await sink.DrainOnceAsync(CancellationToken.None); + + writer.Batches.ShouldBeEmpty(); + sink.GetStatus().DrainState.ShouldBe(HistorianDrainState.Idle); + } + + [Fact] + public async Task RetryPlease_bumps_backoff_and_keeps_row() + { + var writer = new FakeWriter(); + writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.RetryPlease); + using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log); + 
+ await sink.EnqueueAsync(Event("A1"), CancellationToken.None); + var before = sink.CurrentBackoff; + await sink.DrainOnceAsync(CancellationToken.None); + + sink.CurrentBackoff.ShouldBeGreaterThan(before); + sink.GetStatus().QueueDepth.ShouldBe(1, "row stays in queue for retry"); + sink.GetStatus().DrainState.ShouldBe(HistorianDrainState.BackingOff); + } + + [Fact] + public async Task Ack_after_Retry_resets_backoff() + { + var writer = new FakeWriter(); + writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.RetryPlease); + using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log); + + await sink.EnqueueAsync(Event("A1"), CancellationToken.None); + await sink.DrainOnceAsync(CancellationToken.None); + sink.CurrentBackoff.ShouldBeGreaterThan(TimeSpan.FromSeconds(1) - TimeSpan.FromMilliseconds(1)); + + writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.Ack); + await sink.DrainOnceAsync(CancellationToken.None); + + sink.CurrentBackoff.ShouldBe(TimeSpan.FromSeconds(1)); + sink.GetStatus().QueueDepth.ShouldBe(0); + } + + [Fact] + public async Task PermanentFail_dead_letters_one_row_only() + { + var writer = new FakeWriter(); + writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.PermanentFail); + writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.Ack); + using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log); + + await sink.EnqueueAsync(Event("bad"), CancellationToken.None); + await sink.EnqueueAsync(Event("good"), CancellationToken.None); + await sink.DrainOnceAsync(CancellationToken.None); + + var status = sink.GetStatus(); + status.QueueDepth.ShouldBe(0, "good row acked"); + status.DeadLetterDepth.ShouldBe(1, "bad row dead-lettered"); + } + + [Fact] + public async Task Writer_exception_treated_as_retry_for_whole_batch() + { + var writer = new FakeWriter { ThrowOnce = new InvalidOperationException("pipe broken") }; + using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log); + + await 
sink.EnqueueAsync(Event("A1"), CancellationToken.None); + await sink.DrainOnceAsync(CancellationToken.None); + + var status = sink.GetStatus(); + status.QueueDepth.ShouldBe(1); + status.LastError.ShouldBe("pipe broken"); + status.DrainState.ShouldBe(HistorianDrainState.BackingOff); + + // Next drain after the writer recovers should Ack. + await sink.DrainOnceAsync(CancellationToken.None); + sink.GetStatus().QueueDepth.ShouldBe(0); + } + + [Fact] + public async Task Capacity_eviction_drops_oldest_nondeadlettered_row() + { + var writer = new FakeWriter(); + using var sink = new SqliteStoreAndForwardSink( + _dbPath, writer, _log, batchSize: 100, capacity: 3); + + await sink.EnqueueAsync(Event("A1"), CancellationToken.None); + await sink.EnqueueAsync(Event("A2"), CancellationToken.None); + await sink.EnqueueAsync(Event("A3"), CancellationToken.None); + // A4 enqueue must evict the oldest (A1). + await sink.EnqueueAsync(Event("A4"), CancellationToken.None); + + sink.GetStatus().QueueDepth.ShouldBe(3); + + await sink.DrainOnceAsync(CancellationToken.None); + var drained = writer.Batches[0].Select(e => e.AlarmId).ToArray(); + drained.ShouldNotContain("A1"); + drained.ShouldContain("A2"); + drained.ShouldContain("A3"); + drained.ShouldContain("A4"); + } + + [Fact] + public async Task Deadlettered_rows_are_purged_past_retention() + { + var now = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc); + DateTime clock = now; + + var writer = new FakeWriter(); + writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.PermanentFail); + using var sink = new SqliteStoreAndForwardSink( + _dbPath, writer, _log, deadLetterRetention: TimeSpan.FromDays(30), + clock: () => clock); + + await sink.EnqueueAsync(Event("bad"), CancellationToken.None); + await sink.DrainOnceAsync(CancellationToken.None); + sink.GetStatus().DeadLetterDepth.ShouldBe(1); + + // Advance past retention + tick drain (which runs PurgeAgedDeadLetters). 
+ clock = now.AddDays(31); + await sink.DrainOnceAsync(CancellationToken.None); + + sink.GetStatus().DeadLetterDepth.ShouldBe(0, "purged past retention"); + } + + [Fact] + public async Task RetryDeadLettered_requeues_for_retry() + { + var writer = new FakeWriter(); + writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.PermanentFail); + using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log); + + await sink.EnqueueAsync(Event("bad"), CancellationToken.None); + await sink.DrainOnceAsync(CancellationToken.None); + sink.GetStatus().DeadLetterDepth.ShouldBe(1); + + var revived = sink.RetryDeadLettered(); + revived.ShouldBe(1); + + var status = sink.GetStatus(); + status.QueueDepth.ShouldBe(1); + status.DeadLetterDepth.ShouldBe(0); + } + + [Fact] + public async Task Backoff_ladder_caps_at_60s() + { + var writer = new FakeWriter { DefaultOutcome = HistorianWriteOutcome.RetryPlease }; + using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log); + + await sink.EnqueueAsync(Event("A1"), CancellationToken.None); + + // 10 retry rounds — ladder should cap at 60s. + for (var i = 0; i < 10; i++) + await sink.DrainOnceAsync(CancellationToken.None); + + sink.CurrentBackoff.ShouldBe(TimeSpan.FromSeconds(60)); + } + + [Fact] + public void NullAlarmHistorianSink_reports_disabled_status() + { + var s = NullAlarmHistorianSink.Instance.GetStatus(); + s.DrainState.ShouldBe(HistorianDrainState.Disabled); + s.QueueDepth.ShouldBe(0); + } + + [Fact] + public async Task NullAlarmHistorianSink_swallows_enqueue() + { + // Should not throw or persist anything. 
+ await NullAlarmHistorianSink.Instance.EnqueueAsync(Event("A1"), CancellationToken.None); + } + + [Fact] + public void Ctor_rejects_bad_args() + { + var w = new FakeWriter(); + Should.Throw(() => new SqliteStoreAndForwardSink("", w, _log)); + Should.Throw(() => new SqliteStoreAndForwardSink(_dbPath, null!, _log)); + Should.Throw(() => new SqliteStoreAndForwardSink(_dbPath, w, null!)); + Should.Throw(() => new SqliteStoreAndForwardSink(_dbPath, w, _log, batchSize: 0)); + Should.Throw(() => new SqliteStoreAndForwardSink(_dbPath, w, _log, capacity: 0)); + } + + [Fact] + public async Task Disposed_sink_rejects_enqueue() + { + var writer = new FakeWriter(); + var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log); + sink.Dispose(); + + await Should.ThrowAsync( + () => sink.EnqueueAsync(Event("A1"), CancellationToken.None)); + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests.csproj b/tests/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests.csproj new file mode 100644 index 0000000..5f3cc4f --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests.csproj @@ -0,0 +1,31 @@ + + + + net10.0 + enable + enable + false + true + ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests + + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + + + + + + -- 2.49.1