Compare commits

...

2 Commits

Author SHA1 Message Date
Joseph Doherty
25ad4b1929 Phase 7 Stream D — Historian alarm sink (SQLite store-and-forward + Galaxy.Host IPC contracts)
Phase 7 plan decisions #16, #17, #19, #21 implementation. Durable local SQLite queue
absorbs every qualifying alarm event; drain worker forwards batches to Galaxy.Host
(reusing the already-loaded 32-bit aahClientManaged DLLs) on an exponential-backoff
cadence; operator acks never block on the historian being reachable.

## New project Core.AlarmHistorian (net10)

- AlarmHistorianEvent — source-agnostic event shape (scripted alarms + Galaxy-native +
  AB CIP ALMD + any future IAlarmSource)
- IAlarmHistorianSink / NullAlarmHistorianSink — interface + disabled default
- IAlarmHistorianWriter — per-event outcome (Ack / RetryPlease / PermanentFail); Stream G
  wires the Galaxy.Host IPC client implementation
- SqliteStoreAndForwardSink — full implementation:
  - Queue table with AttemptCount / LastError / DeadLettered columns
  - DrainOnceAsync serialised via SemaphoreSlim
  - BackoffLadder 1s → 2s → 5s → 15s → 60s (cap)
  - DefaultCapacity 1,000,000 rows — overflow evicts oldest non-dead-lettered
  - DefaultDeadLetterRetention 30 days — sweeper purges on every drain tick
  - RetryDeadLettered operator action reattaches dead-letters to the regular queue
  - Writer-side exceptions treated as whole-batch RetryPlease (no data loss)

## New IPC contracts in Driver.Galaxy.Shared

- HistorianAlarmEventRequest — batched up to 100 events/request per plan Stream D.5
- HistorianAlarmEventResponse — per-event outcome (1:1 with request order)
- HistorianAlarmEventOutcomeDto enum (byte on the wire — Ack/RetryPlease/PermanentFail)
- HistorianAlarmEventDto — mirrors Core.AlarmHistorian.AlarmHistorianEvent
- HistorianConnectivityStatusNotification — Host pushes proactively when the SDK
  session drops so /alarms/historian flips red without waiting for the next drain
- MessageKind additions: 0x80 HistorianAlarmEventRequest / 0x81 HistorianAlarmEventResponse
  / 0x82 HistorianConnectivityStatus

## Tests — 14/14

SqliteStoreAndForwardSinkTests covers: enqueue→drain→Ack round-trip, empty-queue no-op,
RetryPlease bumps backoff + keeps row, Ack after Retry resets backoff, PermanentFail
dead-letters one row without blocking neighbors, writer exception treated as whole-batch
retry with error surfaced in status, capacity eviction drops oldest non-dead-lettered,
dead-letters purged past retention window, RetryDeadLettered requeues, ladder caps at
60s after 10 retries, Null sink reports Disabled status, null sink swallows enqueue,
ctor argument validation, disposed sink rejects enqueue.

## Totals
Full Phase 7 tests: 160 green (63 Scripting + 36 VirtualTags + 47 ScriptedAlarms +
14 AlarmHistorian). Stream G wires this into the real Galaxy.Host IPC pipe.
2026-04-20 19:11:17 -04:00
51d0b27bfd Merge pull request 'Phase 7 Stream C — Core.ScriptedAlarms (Part 9 state machine + predicate engine + IAlarmSource)' (#181) from phase-7-stream-c-scripted-alarms into v2 2026-04-20 18:52:11 -04:00
9 changed files with 966 additions and 0 deletions

View File

@@ -6,6 +6,7 @@
<Project Path="src/ZB.MOM.WW.OtOpcUa.Core.Scripting/ZB.MOM.WW.OtOpcUa.Core.Scripting.csproj"/>
<Project Path="src/ZB.MOM.WW.OtOpcUa.Core.VirtualTags/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.csproj"/>
<Project Path="src/ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms/ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms.csproj"/>
<Project Path="src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.csproj"/>
<Project Path="src/ZB.MOM.WW.OtOpcUa.Server/ZB.MOM.WW.OtOpcUa.Server.csproj"/>
<Project Path="src/ZB.MOM.WW.OtOpcUa.Admin/ZB.MOM.WW.OtOpcUa.Admin.csproj"/>
<Project Path="src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.csproj"/>
@@ -32,6 +33,7 @@
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Core.Scripting.Tests/ZB.MOM.WW.OtOpcUa.Core.Scripting.Tests.csproj"/>
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests/ZB.MOM.WW.OtOpcUa.Core.VirtualTags.Tests.csproj"/>
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms.Tests/ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms.Tests.csproj"/>
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests.csproj"/>
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Server.Tests/ZB.MOM.WW.OtOpcUa.Server.Tests.csproj"/>
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Admin.Tests/ZB.MOM.WW.OtOpcUa.Admin.Tests.csproj"/>
<Project Path="tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Tests.csproj"/>

View File

@@ -0,0 +1,36 @@
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
/// <summary>
/// The event shape the historian sink consumes — source-agnostic across scripted
/// alarms + Galaxy-native + AB CIP ALMD + any future IAlarmSource per Phase 7 plan
/// decision #15 (sink scope = all alarm sources, not just scripted). A per-alarm
/// <c>HistorizeToAveva</c> toggle on the producer side gates which events flow.
/// </summary>
/// <param name="AlarmId">Stable condition identity.</param>
/// <param name="EquipmentPath">UNS path of the Equipment node the alarm hangs under. Doubles as the "SourceNode" in Historian's alarm schema.</param>
/// <param name="AlarmName">Human-readable alarm name.</param>
/// <param name="AlarmTypeName">Concrete Part 9 subtype — "LimitAlarm" / "DiscreteAlarm" / "OffNormalAlarm" / "AlarmCondition". Used as the Historian "AlarmType" column.</param>
/// <param name="Severity">Mapped to Historian's numeric priority on the sink side.</param>
/// <param name="EventKind">
/// Which state transition this event represents — "Activated" / "Cleared" /
/// "Acknowledged" / "Confirmed" / "Shelved" / "Unshelved" / "Disabled" / "Enabled" /
/// "CommentAdded". Free-form string because different alarm sources use different
/// vocabularies; the Galaxy.Host handler maps to the historian's enum on the wire.
/// </param>
/// <param name="Message">Fully-rendered message text — template tokens already resolved upstream.</param>
/// <param name="User">Operator who triggered the transition. "system" for engine-driven events (shelving expiry, predicate change).</param>
/// <param name="Comment">Operator-supplied free-form text, if any.</param>
/// <param name="TimestampUtc">When the transition occurred.</param>
public sealed record AlarmHistorianEvent(
string AlarmId,
string EquipmentPath,
string AlarmName,
string AlarmTypeName,
AlarmSeverity Severity,
string EventKind,
string Message,
string User,
string? Comment,
DateTime TimestampUtc);

View File

@@ -0,0 +1,82 @@
namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
/// <summary>
/// The historian sink contract — where qualifying alarm events land. Phase 7 plan
/// decision #17: ingestion routes through Galaxy.Host's pipe so we reuse the
/// already-loaded <c>aahClientManaged</c> DLLs without loading 32-bit native code
/// in the main .NET 10 server. Tests use an in-memory fake; production uses
/// <see cref="SqliteStoreAndForwardSink"/>.
/// </summary>
/// <remarks>
/// <para>
/// <see cref="EnqueueAsync"/> is fire-and-forget from the engine's perspective —
/// the sink MUST NOT block the emitting thread. Production implementations
/// (<see cref="SqliteStoreAndForwardSink"/>) persist to a local SQLite queue
/// first, then drain asynchronously to the actual historian. Per Phase 7 plan
/// decision #16, failed downstream writes replay with exponential backoff;
/// operator actions are never blocked waiting on the historian.
/// </para>
/// <para>
/// <see cref="GetStatus"/> exposes queue depth + drain rate + last error
/// for the Admin UI <c>/alarms/historian</c> diagnostics page (Stream F).
/// </para>
/// </remarks>
public interface IAlarmHistorianSink
{
/// <summary>Durably enqueue the event. Returns as soon as the queue row is committed.</summary>
Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken);
/// <summary>Snapshot of current queue depth + drain health.</summary>
HistorianSinkStatus GetStatus();
}
/// <summary>No-op default for tests or deployments that don't historize alarms.</summary>
public sealed class NullAlarmHistorianSink : IAlarmHistorianSink
{
public static readonly NullAlarmHistorianSink Instance = new();
public Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken) => Task.CompletedTask;
public HistorianSinkStatus GetStatus() => new(
QueueDepth: 0,
DeadLetterDepth: 0,
LastDrainUtc: null,
LastSuccessUtc: null,
LastError: null,
DrainState: HistorianDrainState.Disabled);
}
/// <summary>Diagnostic snapshot surfaced to the Admin UI + /healthz endpoints.</summary>
public sealed record HistorianSinkStatus(
long QueueDepth,
long DeadLetterDepth,
DateTime? LastDrainUtc,
DateTime? LastSuccessUtc,
string? LastError,
HistorianDrainState DrainState);
/// <summary>Where the drain worker is in its state machine.</summary>
public enum HistorianDrainState
{
Disabled,
Idle,
Draining,
BackingOff,
}
/// <summary>Signaled by the Galaxy.Host-side handler when it fails a batch — drain worker uses this to decide retry cadence.</summary>
public enum HistorianWriteOutcome
{
/// <summary>Successfully persisted to the historian. Remove from queue.</summary>
Ack,
/// <summary>Transient failure (historian disconnected, timeout, busy). Leave queued; retry after backoff.</summary>
RetryPlease,
/// <summary>Permanent failure (malformed event, unrecoverable SDK error). Move to dead-letter table.</summary>
PermanentFail,
}
/// <summary>What the drain worker delegates writes to — Stream G wires this to the Galaxy.Host IPC client.</summary>
public interface IAlarmHistorianWriter
{
/// <summary>Push a batch of events to the historian. Returns one outcome per event, same order.</summary>
Task<IReadOnlyList<HistorianWriteOutcome>> WriteBatchAsync(
IReadOnlyList<AlarmHistorianEvent> batch, CancellationToken cancellationToken);
}

View File

@@ -0,0 +1,397 @@
using System.Text.Json;
using Microsoft.Data.Sqlite;
using Serilog;
namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
/// <summary>
/// Phase 7 plan decisions #16#17 implementation: durable SQLite queue on the node
/// absorbs every qualifying alarm event, a drain worker batches rows to Galaxy.Host
/// via <see cref="IAlarmHistorianWriter"/> on an exponential-backoff cadence, and
/// operator acks never block on the historian being reachable.
/// </summary>
/// <remarks>
/// <para>
/// Queue schema:
/// <code>
/// CREATE TABLE Queue (
/// RowId INTEGER PRIMARY KEY AUTOINCREMENT,
/// AlarmId TEXT NOT NULL,
/// EnqueuedUtc TEXT NOT NULL,
/// PayloadJson TEXT NOT NULL,
/// AttemptCount INTEGER NOT NULL DEFAULT 0,
/// LastAttemptUtc TEXT NULL,
/// LastError TEXT NULL,
/// DeadLettered INTEGER NOT NULL DEFAULT 0
/// );
/// </code>
/// Dead-lettered rows stay in place for the configured retention window (default
/// 30 days per Phase 7 plan decision #21) so operators can inspect + manually
/// retry before the sweeper purges them. Regular queue capacity is bounded —
/// overflow evicts the oldest non-dead-lettered rows with a WARN log.
/// </para>
/// <para>
/// Drain runs on a shared <see cref="System.Threading.Timer"/>. Exponential
/// backoff on <see cref="HistorianWriteOutcome.RetryPlease"/>: 1s → 2s → 5s →
/// 15s → 60s cap. <see cref="HistorianWriteOutcome.PermanentFail"/> rows flip
/// the <c>DeadLettered</c> flag on the individual row; neighbors in the batch
/// still retry on their own cadence.
/// </para>
/// </remarks>
public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
{
/// <summary>Default queue capacity — oldest non-dead-lettered rows evicted past this.</summary>
public const long DefaultCapacity = 1_000_000;
public static readonly TimeSpan DefaultDeadLetterRetention = TimeSpan.FromDays(30);
private static readonly TimeSpan[] BackoffLadder =
[
TimeSpan.FromSeconds(1),
TimeSpan.FromSeconds(2),
TimeSpan.FromSeconds(5),
TimeSpan.FromSeconds(15),
TimeSpan.FromSeconds(60),
];
private readonly string _connectionString;
private readonly IAlarmHistorianWriter _writer;
private readonly ILogger _logger;
private readonly int _batchSize;
private readonly long _capacity;
private readonly TimeSpan _deadLetterRetention;
private readonly Func<DateTime> _clock;
private readonly SemaphoreSlim _drainGate = new(1, 1);
private Timer? _drainTimer;
private int _backoffIndex;
private DateTime? _lastDrainUtc;
private DateTime? _lastSuccessUtc;
private string? _lastError;
private HistorianDrainState _drainState = HistorianDrainState.Idle;
private bool _disposed;
public SqliteStoreAndForwardSink(
string databasePath,
IAlarmHistorianWriter writer,
ILogger logger,
int batchSize = 100,
long capacity = DefaultCapacity,
TimeSpan? deadLetterRetention = null,
Func<DateTime>? clock = null)
{
if (string.IsNullOrWhiteSpace(databasePath))
throw new ArgumentException("Database path required.", nameof(databasePath));
_writer = writer ?? throw new ArgumentNullException(nameof(writer));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_batchSize = batchSize > 0 ? batchSize : throw new ArgumentOutOfRangeException(nameof(batchSize));
_capacity = capacity > 0 ? capacity : throw new ArgumentOutOfRangeException(nameof(capacity));
_deadLetterRetention = deadLetterRetention ?? DefaultDeadLetterRetention;
_clock = clock ?? (() => DateTime.UtcNow);
_connectionString = $"Data Source={databasePath}";
InitializeSchema();
}
/// <summary>
/// Start the background drain worker. Not started automatically so tests can
/// drive <see cref="DrainOnceAsync"/> deterministically.
/// </summary>
public void StartDrainLoop(TimeSpan tickInterval)
{
if (_disposed) throw new ObjectDisposedException(nameof(SqliteStoreAndForwardSink));
_drainTimer?.Dispose();
_drainTimer = new Timer(_ => _ = DrainOnceAsync(CancellationToken.None),
null, tickInterval, tickInterval);
}
public Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken)
{
if (evt is null) throw new ArgumentNullException(nameof(evt));
if (_disposed) throw new ObjectDisposedException(nameof(SqliteStoreAndForwardSink));
using var conn = new SqliteConnection(_connectionString);
conn.Open();
EnforceCapacity(conn);
using var cmd = conn.CreateCommand();
cmd.CommandText = """
INSERT INTO Queue (AlarmId, EnqueuedUtc, PayloadJson, AttemptCount)
VALUES ($alarmId, $enqueued, $payload, 0);
""";
cmd.Parameters.AddWithValue("$alarmId", evt.AlarmId);
cmd.Parameters.AddWithValue("$enqueued", _clock().ToString("O"));
cmd.Parameters.AddWithValue("$payload", JsonSerializer.Serialize(evt));
cmd.ExecuteNonQuery();
return Task.CompletedTask;
}
/// <summary>
/// Read up to <see cref="_batchSize"/> queued rows, forward through the writer,
/// remove Ack'd rows, dead-letter PermanentFail rows, and extend the backoff
/// on RetryPlease. Safe to call from multiple threads; the semaphore enforces
/// serial execution.
/// </summary>
public async Task DrainOnceAsync(CancellationToken ct)
{
if (_disposed) return;
if (!await _drainGate.WaitAsync(0, ct).ConfigureAwait(false)) return;
try
{
_drainState = HistorianDrainState.Draining;
_lastDrainUtc = _clock();
PurgeAgedDeadLetters();
var (rowIds, events) = ReadBatch();
if (rowIds.Count == 0)
{
_drainState = HistorianDrainState.Idle;
return;
}
IReadOnlyList<HistorianWriteOutcome> outcomes;
try
{
outcomes = await _writer.WriteBatchAsync(events, ct).ConfigureAwait(false);
_lastError = null;
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception ex)
{
// Writer-side exception — treat entire batch as RetryPlease.
_lastError = ex.Message;
_logger.Warning(ex, "Historian writer threw on batch of {Count}; deferring retry", events.Count);
BumpBackoff();
_drainState = HistorianDrainState.BackingOff;
return;
}
if (outcomes.Count != events.Count)
throw new InvalidOperationException(
$"Writer returned {outcomes.Count} outcomes for {events.Count} events — expected 1:1");
using var conn = new SqliteConnection(_connectionString);
conn.Open();
using var tx = conn.BeginTransaction();
for (var i = 0; i < outcomes.Count; i++)
{
var outcome = outcomes[i];
var rowId = rowIds[i];
switch (outcome)
{
case HistorianWriteOutcome.Ack:
DeleteRow(conn, tx, rowId);
break;
case HistorianWriteOutcome.PermanentFail:
DeadLetterRow(conn, tx, rowId, $"permanent fail at {_clock():O}");
break;
case HistorianWriteOutcome.RetryPlease:
BumpAttempt(conn, tx, rowId, "retry-please");
break;
}
}
tx.Commit();
var acks = outcomes.Count(o => o == HistorianWriteOutcome.Ack);
if (acks > 0) _lastSuccessUtc = _clock();
if (outcomes.Any(o => o == HistorianWriteOutcome.RetryPlease))
{
BumpBackoff();
_drainState = HistorianDrainState.BackingOff;
}
else
{
ResetBackoff();
_drainState = HistorianDrainState.Idle;
}
}
finally
{
_drainGate.Release();
}
}
public HistorianSinkStatus GetStatus()
{
using var conn = new SqliteConnection(_connectionString);
conn.Open();
long queued;
long deadlettered;
using (var cmd = conn.CreateCommand())
{
cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 0";
queued = (long)(cmd.ExecuteScalar() ?? 0L);
}
using (var cmd = conn.CreateCommand())
{
cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 1";
deadlettered = (long)(cmd.ExecuteScalar() ?? 0L);
}
return new HistorianSinkStatus(
QueueDepth: queued,
DeadLetterDepth: deadlettered,
LastDrainUtc: _lastDrainUtc,
LastSuccessUtc: _lastSuccessUtc,
LastError: _lastError,
DrainState: _drainState);
}
/// <summary>Operator action from Admin UI — retry every dead-lettered row. Non-cascading: they rejoin the regular queue + get a fresh backoff.</summary>
public int RetryDeadLettered()
{
using var conn = new SqliteConnection(_connectionString);
conn.Open();
using var cmd = conn.CreateCommand();
cmd.CommandText = "UPDATE Queue SET DeadLettered = 0, AttemptCount = 0, LastError = NULL WHERE DeadLettered = 1";
return cmd.ExecuteNonQuery();
}
private (List<long> rowIds, List<AlarmHistorianEvent> events) ReadBatch()
{
var rowIds = new List<long>();
var events = new List<AlarmHistorianEvent>();
using var conn = new SqliteConnection(_connectionString);
conn.Open();
using var cmd = conn.CreateCommand();
cmd.CommandText = """
SELECT RowId, PayloadJson FROM Queue
WHERE DeadLettered = 0
ORDER BY RowId ASC
LIMIT $limit
""";
cmd.Parameters.AddWithValue("$limit", _batchSize);
using var reader = cmd.ExecuteReader();
while (reader.Read())
{
rowIds.Add(reader.GetInt64(0));
var payload = reader.GetString(1);
var evt = JsonSerializer.Deserialize<AlarmHistorianEvent>(payload);
if (evt is not null) events.Add(evt);
}
return (rowIds, events);
}
private static void DeleteRow(SqliteConnection conn, SqliteTransaction tx, long rowId)
{
using var cmd = conn.CreateCommand();
cmd.Transaction = tx;
cmd.CommandText = "DELETE FROM Queue WHERE RowId = $id";
cmd.Parameters.AddWithValue("$id", rowId);
cmd.ExecuteNonQuery();
}
private void DeadLetterRow(SqliteConnection conn, SqliteTransaction tx, long rowId, string reason)
{
using var cmd = conn.CreateCommand();
cmd.Transaction = tx;
cmd.CommandText = """
UPDATE Queue SET DeadLettered = 1, LastAttemptUtc = $now, LastError = $err, AttemptCount = AttemptCount + 1
WHERE RowId = $id
""";
cmd.Parameters.AddWithValue("$now", _clock().ToString("O"));
cmd.Parameters.AddWithValue("$err", reason);
cmd.Parameters.AddWithValue("$id", rowId);
cmd.ExecuteNonQuery();
}
private void BumpAttempt(SqliteConnection conn, SqliteTransaction tx, long rowId, string reason)
{
using var cmd = conn.CreateCommand();
cmd.Transaction = tx;
cmd.CommandText = """
UPDATE Queue SET LastAttemptUtc = $now, LastError = $err, AttemptCount = AttemptCount + 1
WHERE RowId = $id
""";
cmd.Parameters.AddWithValue("$now", _clock().ToString("O"));
cmd.Parameters.AddWithValue("$err", reason);
cmd.Parameters.AddWithValue("$id", rowId);
cmd.ExecuteNonQuery();
}
private void EnforceCapacity(SqliteConnection conn)
{
// Count non-dead-lettered rows only — dead-lettered rows retain for
// post-mortem per the configured retention window.
long count;
using (var cmd = conn.CreateCommand())
{
cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 0";
count = (long)(cmd.ExecuteScalar() ?? 0L);
}
if (count < _capacity) return;
var toEvict = count - _capacity + 1;
using (var cmd = conn.CreateCommand())
{
cmd.CommandText = """
DELETE FROM Queue
WHERE RowId IN (
SELECT RowId FROM Queue
WHERE DeadLettered = 0
ORDER BY RowId ASC
LIMIT $n
)
""";
cmd.Parameters.AddWithValue("$n", toEvict);
cmd.ExecuteNonQuery();
}
_logger.Warning(
"Historian queue at capacity {Cap} — evicted {Count} oldest row(s) to make room",
_capacity, toEvict);
}
private void PurgeAgedDeadLetters()
{
var cutoff = (_clock() - _deadLetterRetention).ToString("O");
using var conn = new SqliteConnection(_connectionString);
conn.Open();
using var cmd = conn.CreateCommand();
cmd.CommandText = """
DELETE FROM Queue
WHERE DeadLettered = 1 AND LastAttemptUtc IS NOT NULL AND LastAttemptUtc < $cutoff
""";
cmd.Parameters.AddWithValue("$cutoff", cutoff);
var purged = cmd.ExecuteNonQuery();
if (purged > 0)
_logger.Information("Purged {Count} dead-lettered row(s) past retention window", purged);
}
private void InitializeSchema()
{
using var conn = new SqliteConnection(_connectionString);
conn.Open();
using var cmd = conn.CreateCommand();
cmd.CommandText = """
CREATE TABLE IF NOT EXISTS Queue (
RowId INTEGER PRIMARY KEY AUTOINCREMENT,
AlarmId TEXT NOT NULL,
EnqueuedUtc TEXT NOT NULL,
PayloadJson TEXT NOT NULL,
AttemptCount INTEGER NOT NULL DEFAULT 0,
LastAttemptUtc TEXT NULL,
LastError TEXT NULL,
DeadLettered INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS IX_Queue_Drain ON Queue (DeadLettered, RowId);
""";
cmd.ExecuteNonQuery();
}
private void BumpBackoff() => _backoffIndex = Math.Min(_backoffIndex + 1, BackoffLadder.Length - 1);
private void ResetBackoff() => _backoffIndex = 0;
public TimeSpan CurrentBackoff => BackoffLadder[_backoffIndex];
public void Dispose()
{
if (_disposed) return;
_disposed = true;
_drainTimer?.Dispose();
_drainGate.Dispose();
}
}

View File

@@ -0,0 +1,32 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<LangVersion>latest</LangVersion>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<NoWarn>$(NoWarn);CS1591</NoWarn>
<RootNamespace>ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian</RootNamespace>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Data.Sqlite" Version="9.0.0"/>
<PackageReference Include="Serilog" Version="4.2.0"/>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Core.Abstractions\ZB.MOM.WW.OtOpcUa.Core.Abstractions.csproj"/>
</ItemGroup>
<ItemGroup>
<InternalsVisibleTo Include="ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests"/>
</ItemGroup>
<ItemGroup>
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-37gx-xxp4-5rgx"/>
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-w3x6-4m5h-cxqf"/>
</ItemGroup>
</Project>

View File

@@ -60,6 +60,14 @@ public enum MessageKind : byte
HostConnectivityStatus = 0x70,
RuntimeStatusChange = 0x71,
// Phase 7 Stream D — historian alarm sink. Main server → Galaxy.Host batched
// writes into the Aveva Historian alarm schema via the already-loaded
// aahClientManaged DLLs. HistorianConnectivityStatus fires proactively from the
// Host when the SDK session transitions so diagnostics flip promptly.
HistorianAlarmEventRequest = 0x80,
HistorianAlarmEventResponse = 0x81,
HistorianConnectivityStatus = 0x82,
RecycleHostRequest = 0xF0,
RecycleStatusResponse = 0xF1,

View File

@@ -0,0 +1,92 @@
using System;
using MessagePack;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
/// <summary>
/// Phase 7 Stream D — IPC contracts for routing Part 9 alarm transitions from the
/// main .NET 10 server into Galaxy.Host's already-loaded <c>aahClientManaged</c>
/// DLLs. Reuses the Tier-C isolation + licensing pathway rather than loading 32-bit
/// native historian code into the main server.
/// </summary>
/// <remarks>
/// <para>
/// Batched on the wire to amortize IPC overhead — the main server's SqliteStoreAndForwardSink
/// ships up to 100 events per request per Phase 7 plan Stream D.5.
/// </para>
/// <para>
/// Per-event outcomes (Ack / RetryPlease / PermanentFail) let the drain worker
/// dead-letter malformed events without blocking neighbors in the batch.
/// <see cref="HistorianConnectivityStatusNotification"/> fires proactively from
/// the Host when the SDK session drops so the /hosts + /alarms/historian Admin
/// diagnostics pages flip to red promptly instead of waiting for the next
/// drain cycle.
/// </para>
/// </remarks>
[MessagePackObject]
public sealed class HistorianAlarmEventRequest
{
[Key(0)] public HistorianAlarmEventDto[] Events { get; set; } = Array.Empty<HistorianAlarmEventDto>();
}
[MessagePackObject]
public sealed class HistorianAlarmEventResponse
{
/// <summary>Per-event outcome, same order as the request.</summary>
[Key(0)] public HistorianAlarmEventOutcomeDto[] Outcomes { get; set; } = Array.Empty<HistorianAlarmEventOutcomeDto>();
}
/// <summary>Outcome enum — bytes on the wire so it stays compact.</summary>
public enum HistorianAlarmEventOutcomeDto : byte
{
/// <summary>Successfully persisted to the historian — remove from queue.</summary>
Ack = 0,
/// <summary>Transient failure (historian disconnected, timeout, busy) — retry after backoff.</summary>
RetryPlease = 1,
/// <summary>Permanent failure (malformed, unrecoverable SDK error) — move to dead-letter.</summary>
PermanentFail = 2,
}
/// <summary>One alarm-transition payload. Fields mirror <c>Core.AlarmHistorian.AlarmHistorianEvent</c>.</summary>
[MessagePackObject]
public sealed class HistorianAlarmEventDto
{
[Key(0)] public string AlarmId { get; set; } = string.Empty;
[Key(1)] public string EquipmentPath { get; set; } = string.Empty;
[Key(2)] public string AlarmName { get; set; } = string.Empty;
/// <summary>Concrete Part 9 subtype name — "LimitAlarm" / "OffNormalAlarm" / "AlarmCondition" / "DiscreteAlarm".</summary>
[Key(3)] public string AlarmTypeName { get; set; } = string.Empty;
/// <summary>Numeric severity the Host maps to the historian's priority scale.</summary>
[Key(4)] public int Severity { get; set; }
/// <summary>Which transition this event represents — "Activated" / "Cleared" / "Acknowledged" / etc.</summary>
[Key(5)] public string EventKind { get; set; } = string.Empty;
/// <summary>Pre-rendered message — template tokens resolved upstream.</summary>
[Key(6)] public string Message { get; set; } = string.Empty;
/// <summary>Operator who triggered the transition. "system" for engine-driven events.</summary>
[Key(7)] public string User { get; set; } = "system";
/// <summary>Operator-supplied free-form comment, if any.</summary>
[Key(8)] public string? Comment { get; set; }
/// <summary>Source timestamp (UTC Unix milliseconds).</summary>
[Key(9)] public long TimestampUtcUnixMs { get; set; }
}
/// <summary>
/// Proactive notification — Galaxy.Host pushes this when the historian SDK session
/// transitions (connected / disconnected / degraded). The main server reflects this
/// into the historian sink status so Admin UI surfaces the problem without the
/// operator having to scrutinize drain cadence.
/// </summary>
[MessagePackObject]
public sealed class HistorianConnectivityStatusNotification
{
[Key(0)] public string Status { get; set; } = "unknown"; // connected | disconnected | degraded
[Key(1)] public string? Detail { get; set; }
[Key(2)] public long ObservedAtUtcUnixMs { get; set; }
}

View File

@@ -0,0 +1,286 @@
using Serilog;
using Serilog.Core;
using Serilog.Events;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests;
/// <summary>
/// Verifies the durable SQLite store-and-forward queue behind the historian sink:
/// round-trip Ack, backoff ladder on RetryPlease, dead-lettering on PermanentFail,
/// capacity eviction, and retention-based dead-letter purge.
/// </summary>
[Trait("Category", "Unit")]
public sealed class SqliteStoreAndForwardSinkTests : IDisposable
{
private readonly string _dbPath;
private readonly ILogger _log;
public SqliteStoreAndForwardSinkTests()
{
_dbPath = Path.Combine(Path.GetTempPath(), $"otopcua-historian-{Guid.NewGuid():N}.sqlite");
_log = new LoggerConfiguration().MinimumLevel.Verbose().CreateLogger();
}
public void Dispose()
{
try { if (File.Exists(_dbPath)) File.Delete(_dbPath); } catch { }
}
private sealed class FakeWriter : IAlarmHistorianWriter
{
public Queue<HistorianWriteOutcome> NextOutcomePerEvent { get; } = new();
public HistorianWriteOutcome DefaultOutcome { get; set; } = HistorianWriteOutcome.Ack;
public List<IReadOnlyList<AlarmHistorianEvent>> Batches { get; } = [];
public Exception? ThrowOnce { get; set; }
public Task<IReadOnlyList<HistorianWriteOutcome>> WriteBatchAsync(
IReadOnlyList<AlarmHistorianEvent> batch, CancellationToken ct)
{
if (ThrowOnce is not null)
{
var e = ThrowOnce;
ThrowOnce = null;
throw e;
}
Batches.Add(batch);
var outcomes = new List<HistorianWriteOutcome>();
for (var i = 0; i < batch.Count; i++)
outcomes.Add(NextOutcomePerEvent.Count > 0 ? NextOutcomePerEvent.Dequeue() : DefaultOutcome);
return Task.FromResult<IReadOnlyList<HistorianWriteOutcome>>(outcomes);
}
}
private static AlarmHistorianEvent Event(string alarmId, DateTime? ts = null) => new(
AlarmId: alarmId,
EquipmentPath: "/Site/Line1/Cell",
AlarmName: "HighTemp",
AlarmTypeName: "LimitAlarm",
Severity: AlarmSeverity.High,
EventKind: "Activated",
Message: "temp exceeded",
User: "system",
Comment: null,
TimestampUtc: ts ?? DateTime.UtcNow);
[Fact]
public async Task EnqueueThenDrain_Ack_removes_row()
{
var writer = new FakeWriter();
using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
await sink.EnqueueAsync(Event("A1"), CancellationToken.None);
sink.GetStatus().QueueDepth.ShouldBe(1);
await sink.DrainOnceAsync(CancellationToken.None);
writer.Batches.Count.ShouldBe(1);
writer.Batches[0].Count.ShouldBe(1);
writer.Batches[0][0].AlarmId.ShouldBe("A1");
var status = sink.GetStatus();
status.QueueDepth.ShouldBe(0);
status.DeadLetterDepth.ShouldBe(0);
status.LastSuccessUtc.ShouldNotBeNull();
}
[Fact]
public async Task Drain_with_empty_queue_is_noop()
{
var writer = new FakeWriter();
using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
await sink.DrainOnceAsync(CancellationToken.None);
writer.Batches.ShouldBeEmpty();
sink.GetStatus().DrainState.ShouldBe(HistorianDrainState.Idle);
}
[Fact]
public async Task RetryPlease_bumps_backoff_and_keeps_row()
{
var writer = new FakeWriter();
writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.RetryPlease);
using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
await sink.EnqueueAsync(Event("A1"), CancellationToken.None);
var before = sink.CurrentBackoff;
await sink.DrainOnceAsync(CancellationToken.None);
sink.CurrentBackoff.ShouldBeGreaterThan(before);
sink.GetStatus().QueueDepth.ShouldBe(1, "row stays in queue for retry");
sink.GetStatus().DrainState.ShouldBe(HistorianDrainState.BackingOff);
}
[Fact]
public async Task Ack_after_Retry_resets_backoff()
{
var writer = new FakeWriter();
writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.RetryPlease);
using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
await sink.EnqueueAsync(Event("A1"), CancellationToken.None);
await sink.DrainOnceAsync(CancellationToken.None);
sink.CurrentBackoff.ShouldBeGreaterThan(TimeSpan.FromSeconds(1) - TimeSpan.FromMilliseconds(1));
writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.Ack);
await sink.DrainOnceAsync(CancellationToken.None);
sink.CurrentBackoff.ShouldBe(TimeSpan.FromSeconds(1));
sink.GetStatus().QueueDepth.ShouldBe(0);
}
[Fact]
public async Task PermanentFail_dead_letters_one_row_only()
{
var writer = new FakeWriter();
writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.PermanentFail);
writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.Ack);
using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
await sink.EnqueueAsync(Event("bad"), CancellationToken.None);
await sink.EnqueueAsync(Event("good"), CancellationToken.None);
await sink.DrainOnceAsync(CancellationToken.None);
var status = sink.GetStatus();
status.QueueDepth.ShouldBe(0, "good row acked");
status.DeadLetterDepth.ShouldBe(1, "bad row dead-lettered");
}
[Fact]
public async Task Writer_exception_treated_as_retry_for_whole_batch()
{
var writer = new FakeWriter { ThrowOnce = new InvalidOperationException("pipe broken") };
using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
await sink.EnqueueAsync(Event("A1"), CancellationToken.None);
await sink.DrainOnceAsync(CancellationToken.None);
var status = sink.GetStatus();
status.QueueDepth.ShouldBe(1);
status.LastError.ShouldBe("pipe broken");
status.DrainState.ShouldBe(HistorianDrainState.BackingOff);
// Next drain after the writer recovers should Ack.
await sink.DrainOnceAsync(CancellationToken.None);
sink.GetStatus().QueueDepth.ShouldBe(0);
}
[Fact]
public async Task Capacity_eviction_drops_oldest_nondeadlettered_row()
{
var writer = new FakeWriter();
using var sink = new SqliteStoreAndForwardSink(
_dbPath, writer, _log, batchSize: 100, capacity: 3);
await sink.EnqueueAsync(Event("A1"), CancellationToken.None);
await sink.EnqueueAsync(Event("A2"), CancellationToken.None);
await sink.EnqueueAsync(Event("A3"), CancellationToken.None);
// A4 enqueue must evict the oldest (A1).
await sink.EnqueueAsync(Event("A4"), CancellationToken.None);
sink.GetStatus().QueueDepth.ShouldBe(3);
await sink.DrainOnceAsync(CancellationToken.None);
var drained = writer.Batches[0].Select(e => e.AlarmId).ToArray();
drained.ShouldNotContain("A1");
drained.ShouldContain("A2");
drained.ShouldContain("A3");
drained.ShouldContain("A4");
}
[Fact]
public async Task Deadlettered_rows_are_purged_past_retention()
{
var now = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc);
DateTime clock = now;
var writer = new FakeWriter();
writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.PermanentFail);
using var sink = new SqliteStoreAndForwardSink(
_dbPath, writer, _log, deadLetterRetention: TimeSpan.FromDays(30),
clock: () => clock);
await sink.EnqueueAsync(Event("bad"), CancellationToken.None);
await sink.DrainOnceAsync(CancellationToken.None);
sink.GetStatus().DeadLetterDepth.ShouldBe(1);
// Advance past retention + tick drain (which runs PurgeAgedDeadLetters).
clock = now.AddDays(31);
await sink.DrainOnceAsync(CancellationToken.None);
sink.GetStatus().DeadLetterDepth.ShouldBe(0, "purged past retention");
}
[Fact]
public async Task RetryDeadLettered_requeues_for_retry()
{
var writer = new FakeWriter();
writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.PermanentFail);
using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
await sink.EnqueueAsync(Event("bad"), CancellationToken.None);
await sink.DrainOnceAsync(CancellationToken.None);
sink.GetStatus().DeadLetterDepth.ShouldBe(1);
var revived = sink.RetryDeadLettered();
revived.ShouldBe(1);
var status = sink.GetStatus();
status.QueueDepth.ShouldBe(1);
status.DeadLetterDepth.ShouldBe(0);
}
[Fact]
public async Task Backoff_ladder_caps_at_60s()
{
var writer = new FakeWriter { DefaultOutcome = HistorianWriteOutcome.RetryPlease };
using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
await sink.EnqueueAsync(Event("A1"), CancellationToken.None);
// 10 retry rounds — ladder should cap at 60s.
for (var i = 0; i < 10; i++)
await sink.DrainOnceAsync(CancellationToken.None);
sink.CurrentBackoff.ShouldBe(TimeSpan.FromSeconds(60));
}
[Fact]
public void NullAlarmHistorianSink_reports_disabled_status()
{
var s = NullAlarmHistorianSink.Instance.GetStatus();
s.DrainState.ShouldBe(HistorianDrainState.Disabled);
s.QueueDepth.ShouldBe(0);
}
[Fact]
public async Task NullAlarmHistorianSink_swallows_enqueue()
{
// Should not throw or persist anything.
await NullAlarmHistorianSink.Instance.EnqueueAsync(Event("A1"), CancellationToken.None);
}
[Fact]
public void Ctor_rejects_bad_args()
{
var w = new FakeWriter();
Should.Throw<ArgumentException>(() => new SqliteStoreAndForwardSink("", w, _log));
Should.Throw<ArgumentNullException>(() => new SqliteStoreAndForwardSink(_dbPath, null!, _log));
Should.Throw<ArgumentNullException>(() => new SqliteStoreAndForwardSink(_dbPath, w, null!));
Should.Throw<ArgumentOutOfRangeException>(() => new SqliteStoreAndForwardSink(_dbPath, w, _log, batchSize: 0));
Should.Throw<ArgumentOutOfRangeException>(() => new SqliteStoreAndForwardSink(_dbPath, w, _log, capacity: 0));
}
[Fact]
public async Task Disposed_sink_rejects_enqueue()
{
var writer = new FakeWriter();
var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
sink.Dispose();
await Should.ThrowAsync<ObjectDisposedException>(
() => sink.EnqueueAsync(Event("A1"), CancellationToken.None));
}
}

View File

@@ -0,0 +1,31 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<IsPackable>false</IsPackable>
<IsTestProject>true</IsTestProject>
<RootNamespace>ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests</RootNamespace>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="xunit.v3" Version="1.1.0"/>
<PackageReference Include="Shouldly" Version="4.3.0"/>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0"/>
<PackageReference Include="xunit.runner.visualstudio" Version="3.0.2">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian\ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.csproj"/>
</ItemGroup>
<ItemGroup>
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-37gx-xxp4-5rgx"/>
<NuGetAuditSuppress Include="https://github.com/advisories/GHSA-w3x6-4m5h-cxqf"/>
</ItemGroup>
</Project>