Files
lmxopcua/tests/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/SqliteStoreAndForwardSinkTests.cs
Joseph Doherty 25ad4b1929 Phase 7 Stream D — Historian alarm sink (SQLite store-and-forward + Galaxy.Host IPC contracts)
Phase 7 plan decisions #16, #17, #19, #21 implementation. Durable local SQLite queue
absorbs every qualifying alarm event; drain worker forwards batches to Galaxy.Host
(reusing the already-loaded 32-bit aahClientManaged DLLs) on an exponential-backoff
cadence; operator acks never block on the historian being reachable.

## New project Core.AlarmHistorian (net10)

- AlarmHistorianEvent — source-agnostic event shape (scripted alarms + Galaxy-native +
  AB CIP ALMD + any future IAlarmSource)
- IAlarmHistorianSink / NullAlarmHistorianSink — interface + disabled default
- IAlarmHistorianWriter — per-event outcome (Ack / RetryPlease / PermanentFail); Stream G
  wires the Galaxy.Host IPC client implementation
- SqliteStoreAndForwardSink — full implementation:
  - Queue table with AttemptCount / LastError / DeadLettered columns
  - DrainOnceAsync serialised via SemaphoreSlim
  - BackoffLadder 1s → 2s → 5s → 15s → 60s (cap)
  - DefaultCapacity 1,000,000 rows — overflow evicts oldest non-dead-lettered
  - DefaultDeadLetterRetention 30 days — sweeper purges on every drain tick
  - RetryDeadLettered operator action reattaches dead-letters to the regular queue
  - Writer-side exceptions treated as whole-batch RetryPlease (no data loss)

## New IPC contracts in Driver.Galaxy.Shared

- HistorianAlarmEventRequest — batched up to 100 events/request per plan Stream D.5
- HistorianAlarmEventResponse — per-event outcome (1:1 with request order)
- HistorianAlarmEventOutcomeDto enum (byte on the wire — Ack/RetryPlease/PermanentFail)
- HistorianAlarmEventDto — mirrors Core.AlarmHistorian.AlarmHistorianEvent
- HistorianConnectivityStatusNotification — Host pushes proactively when the SDK
  session drops so /alarms/historian flips red without waiting for the next drain
- MessageKind additions: 0x80 HistorianAlarmEventRequest / 0x81 HistorianAlarmEventResponse
  / 0x82 HistorianConnectivityStatus

## Tests — 14/14

SqliteStoreAndForwardSinkTests covers: enqueue→drain→Ack round-trip, empty-queue no-op,
RetryPlease bumps backoff + keeps row, Ack after Retry resets backoff, PermanentFail
dead-letters one row without blocking neighbors, writer exception treated as whole-batch
retry with error surfaced in status, capacity eviction drops oldest non-dead-lettered,
dead-letters purged past retention window, RetryDeadLettered requeues, ladder caps at
60s after 10 retries, Null sink reports Disabled status, Null sink swallows enqueue,
ctor argument validation, disposed sink rejects enqueue.

## Totals
Full Phase 7 tests: 160 green (63 Scripting + 36 VirtualTags + 47 ScriptedAlarms +
14 AlarmHistorian). Stream G wires this into the real Galaxy.Host IPC pipe.
2026-04-20 19:11:17 -04:00

287 lines
11 KiB
C#

using Serilog;
using Serilog.Core;
using Serilog.Events;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests;
/// <summary>
/// Verifies the durable SQLite store-and-forward queue behind the historian sink:
/// round-trip Ack, backoff ladder on RetryPlease, dead-lettering on PermanentFail,
/// capacity eviction, and retention-based dead-letter purge.
/// </summary>
/// <remarks>
/// Every test-class instance points at its own uniquely named database file under the
/// temp directory, and xUnit constructs a new instance per test method, so no queue
/// state is shared between tests.
/// </remarks>
[Trait("Category", "Unit")]
public sealed class SqliteStoreAndForwardSinkTests : IDisposable
{
    /// <summary>
    /// Suffixes appended to the database path during cleanup. The empty suffix is the
    /// main file; the others are sidecar files SQLite may create next to it depending
    /// on the journal mode in use.
    /// </summary>
    private static readonly string[] DatabaseFileSuffixes = ["", "-wal", "-shm", "-journal"];

    private readonly string _dbPath;
    private readonly ILogger _log;

    /// <summary>Allocates a unique temp database path and a sink-less verbose logger.</summary>
    public SqliteStoreAndForwardSinkTests()
    {
        _dbPath = Path.Combine(Path.GetTempPath(), $"otopcua-historian-{Guid.NewGuid():N}.sqlite");
        _log = new LoggerConfiguration().MinimumLevel.Verbose().CreateLogger();
    }

    /// <summary>
    /// Releases the logger and removes the temp database (plus any SQLite sidecar files).
    /// File removal is best-effort: a file that is still locked is left behind rather
    /// than failing the test run.
    /// </summary>
    public void Dispose()
    {
        // CreateLogger() returns a disposable logger; the field is typed as the
        // interface, so release it through IDisposable.
        (_log as IDisposable)?.Dispose();

        foreach (var suffix in DatabaseFileSuffixes)
        {
            TryDelete(_dbPath + suffix);
        }
    }

    /// <summary>Deletes <paramref name="path"/> if it exists, ignoring I/O and access failures.</summary>
    private static void TryDelete(string path)
    {
        try
        {
            if (File.Exists(path))
            {
                File.Delete(path);
            }
        }
        catch (Exception ex) when (ex is IOException or UnauthorizedAccessException)
        {
            // Best-effort cleanup only: the file may still be held open. Leaving a
            // stray temp file is preferable to throwing from Dispose.
        }
    }

    /// <summary>
    /// Scripted <see cref="IAlarmHistorianWriter"/>: records each batch it is handed and
    /// answers every event with the next queued outcome, falling back to
    /// <see cref="DefaultOutcome"/> once the queue is empty.
    /// </summary>
    private sealed class FakeWriter : IAlarmHistorianWriter
    {
        /// <summary>Outcomes consumed one per event, in order, before <see cref="DefaultOutcome"/> applies.</summary>
        public Queue<HistorianWriteOutcome> NextOutcomePerEvent { get; } = new();

        /// <summary>Outcome returned for an event when <see cref="NextOutcomePerEvent"/> is empty.</summary>
        public HistorianWriteOutcome DefaultOutcome { get; set; } = HistorianWriteOutcome.Ack;

        /// <summary>Every batch passed to <see cref="WriteBatchAsync"/> that did not throw.</summary>
        public List<IReadOnlyList<AlarmHistorianEvent>> Batches { get; } = [];

        /// <summary>
        /// When set, the next <see cref="WriteBatchAsync"/> call throws this exception
        /// (without recording the batch) and clears the property so later calls succeed.
        /// </summary>
        public Exception? ThrowOnce { get; set; }

        /// <summary>
        /// Records <paramref name="batch"/> and returns one outcome per event, in batch order.
        /// </summary>
        /// <param name="batch">Events handed over by the sink.</param>
        /// <param name="ct">Unused by this fake.</param>
        /// <returns>A completed task holding the per-event outcomes.</returns>
        public Task<IReadOnlyList<HistorianWriteOutcome>> WriteBatchAsync(
            IReadOnlyList<AlarmHistorianEvent> batch, CancellationToken ct)
        {
            if (ThrowOnce is not null)
            {
                var e = ThrowOnce;
                ThrowOnce = null;
                throw e;
            }

            Batches.Add(batch);

            var outcomes = new List<HistorianWriteOutcome>(batch.Count);
            for (var i = 0; i < batch.Count; i++)
            {
                outcomes.Add(NextOutcomePerEvent.Count > 0 ? NextOutcomePerEvent.Dequeue() : DefaultOutcome);
            }

            return Task.FromResult<IReadOnlyList<HistorianWriteOutcome>>(outcomes);
        }
    }

    /// <summary>
    /// Builds a sample event with fixed descriptive fields; only the alarm id and
    /// (optionally) the timestamp vary.
    /// </summary>
    /// <param name="alarmId">Identifier used by the tests to tell events apart.</param>
    /// <param name="ts">Event timestamp; defaults to the current UTC time.</param>
    private static AlarmHistorianEvent Event(string alarmId, DateTime? ts = null) => new(
        AlarmId: alarmId,
        EquipmentPath: "/Site/Line1/Cell",
        AlarmName: "HighTemp",
        AlarmTypeName: "LimitAlarm",
        Severity: AlarmSeverity.High,
        EventKind: "Activated",
        Message: "temp exceeded",
        User: "system",
        Comment: null,
        TimestampUtc: ts ?? DateTime.UtcNow);

    /// <summary>An enqueued event is forwarded to the writer and removed once it is Acked.</summary>
    [Fact]
    public async Task EnqueueThenDrain_Ack_removes_row()
    {
        var writer = new FakeWriter();
        using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
        await sink.EnqueueAsync(Event("A1"), CancellationToken.None);
        sink.GetStatus().QueueDepth.ShouldBe(1);

        await sink.DrainOnceAsync(CancellationToken.None);

        writer.Batches.Count.ShouldBe(1);
        writer.Batches[0].Count.ShouldBe(1);
        writer.Batches[0][0].AlarmId.ShouldBe("A1");
        var status = sink.GetStatus();
        status.QueueDepth.ShouldBe(0);
        status.DeadLetterDepth.ShouldBe(0);
        status.LastSuccessUtc.ShouldNotBeNull();
    }

    /// <summary>Draining an empty queue never calls the writer and leaves the sink Idle.</summary>
    [Fact]
    public async Task Drain_with_empty_queue_is_noop()
    {
        var writer = new FakeWriter();
        using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);

        await sink.DrainOnceAsync(CancellationToken.None);

        writer.Batches.ShouldBeEmpty();
        sink.GetStatus().DrainState.ShouldBe(HistorianDrainState.Idle);
    }

    /// <summary>A RetryPlease outcome raises the backoff, keeps the row, and reports BackingOff.</summary>
    [Fact]
    public async Task RetryPlease_bumps_backoff_and_keeps_row()
    {
        var writer = new FakeWriter();
        writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.RetryPlease);
        using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
        await sink.EnqueueAsync(Event("A1"), CancellationToken.None);
        var before = sink.CurrentBackoff;

        await sink.DrainOnceAsync(CancellationToken.None);

        sink.CurrentBackoff.ShouldBeGreaterThan(before);
        sink.GetStatus().QueueDepth.ShouldBe(1, "row stays in queue for retry");
        sink.GetStatus().DrainState.ShouldBe(HistorianDrainState.BackingOff);
    }

    /// <summary>After a retry has raised the backoff, a subsequent Ack returns it to 1 second.</summary>
    [Fact]
    public async Task Ack_after_Retry_resets_backoff()
    {
        var writer = new FakeWriter();
        writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.RetryPlease);
        using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
        await sink.EnqueueAsync(Event("A1"), CancellationToken.None);
        var before = sink.CurrentBackoff;

        await sink.DrainOnceAsync(CancellationToken.None);

        // The retry must actually have raised the backoff, otherwise the reset
        // assertion below would pass trivially.
        sink.CurrentBackoff.ShouldBeGreaterThan(before);

        writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.Ack);
        await sink.DrainOnceAsync(CancellationToken.None);

        sink.CurrentBackoff.ShouldBe(TimeSpan.FromSeconds(1));
        sink.GetStatus().QueueDepth.ShouldBe(0);
    }

    /// <summary>A PermanentFail dead-letters only its own row; the neighbouring row is still Acked.</summary>
    [Fact]
    public async Task PermanentFail_dead_letters_one_row_only()
    {
        var writer = new FakeWriter();
        writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.PermanentFail);
        writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.Ack);
        using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
        await sink.EnqueueAsync(Event("bad"), CancellationToken.None);
        await sink.EnqueueAsync(Event("good"), CancellationToken.None);

        await sink.DrainOnceAsync(CancellationToken.None);

        var status = sink.GetStatus();
        status.QueueDepth.ShouldBe(0, "good row acked");
        status.DeadLetterDepth.ShouldBe(1, "bad row dead-lettered");
    }

    /// <summary>
    /// A writer exception keeps the batch queued, surfaces the message in the status,
    /// and the row drains normally once the writer stops throwing.
    /// </summary>
    [Fact]
    public async Task Writer_exception_treated_as_retry_for_whole_batch()
    {
        var writer = new FakeWriter { ThrowOnce = new InvalidOperationException("pipe broken") };
        using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
        await sink.EnqueueAsync(Event("A1"), CancellationToken.None);

        await sink.DrainOnceAsync(CancellationToken.None);

        var status = sink.GetStatus();
        status.QueueDepth.ShouldBe(1);
        status.LastError.ShouldBe("pipe broken");
        status.DrainState.ShouldBe(HistorianDrainState.BackingOff);

        // Next drain after the writer recovers should Ack.
        await sink.DrainOnceAsync(CancellationToken.None);
        sink.GetStatus().QueueDepth.ShouldBe(0);
    }

    /// <summary>Enqueueing beyond capacity evicts the oldest queued row.</summary>
    [Fact]
    public async Task Capacity_eviction_drops_oldest_nondeadlettered_row()
    {
        var writer = new FakeWriter();
        using var sink = new SqliteStoreAndForwardSink(
            _dbPath, writer, _log, batchSize: 100, capacity: 3);
        await sink.EnqueueAsync(Event("A1"), CancellationToken.None);
        await sink.EnqueueAsync(Event("A2"), CancellationToken.None);
        await sink.EnqueueAsync(Event("A3"), CancellationToken.None);

        // A4 enqueue must evict the oldest (A1).
        await sink.EnqueueAsync(Event("A4"), CancellationToken.None);
        sink.GetStatus().QueueDepth.ShouldBe(3);

        await sink.DrainOnceAsync(CancellationToken.None);

        // Fail with an assertion message rather than an index error if nothing was forwarded.
        writer.Batches.ShouldNotBeEmpty();
        var drained = writer.Batches[0].Select(e => e.AlarmId).ToArray();
        drained.ShouldNotContain("A1");
        drained.ShouldContain("A2");
        drained.ShouldContain("A3");
        drained.ShouldContain("A4");
    }

    /// <summary>A dead-lettered row is removed by a drain that runs after the retention window has elapsed.</summary>
    [Fact]
    public async Task Deadlettered_rows_are_purged_past_retention()
    {
        var now = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc);
        DateTime clock = now; // captured by the clock delegate so the test can move time forward
        var writer = new FakeWriter();
        writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.PermanentFail);
        using var sink = new SqliteStoreAndForwardSink(
            _dbPath, writer, _log, deadLetterRetention: TimeSpan.FromDays(30),
            clock: () => clock);
        await sink.EnqueueAsync(Event("bad"), CancellationToken.None);
        await sink.DrainOnceAsync(CancellationToken.None);
        sink.GetStatus().DeadLetterDepth.ShouldBe(1);

        // Advance past retention + tick drain (which runs PurgeAgedDeadLetters).
        clock = now.AddDays(31);
        await sink.DrainOnceAsync(CancellationToken.None);

        sink.GetStatus().DeadLetterDepth.ShouldBe(0, "purged past retention");
    }

    /// <summary>RetryDeadLettered moves a dead-lettered row back into the regular queue and reports the count.</summary>
    [Fact]
    public async Task RetryDeadLettered_requeues_for_retry()
    {
        var writer = new FakeWriter();
        writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.PermanentFail);
        using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
        await sink.EnqueueAsync(Event("bad"), CancellationToken.None);
        await sink.DrainOnceAsync(CancellationToken.None);
        sink.GetStatus().DeadLetterDepth.ShouldBe(1);

        var revived = sink.RetryDeadLettered();

        revived.ShouldBe(1);
        var status = sink.GetStatus();
        status.QueueDepth.ShouldBe(1);
        status.DeadLetterDepth.ShouldBe(0);
    }

    /// <summary>Repeated RetryPlease outcomes leave the backoff at 60 seconds.</summary>
    [Fact]
    public async Task Backoff_ladder_caps_at_60s()
    {
        var writer = new FakeWriter { DefaultOutcome = HistorianWriteOutcome.RetryPlease };
        using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
        await sink.EnqueueAsync(Event("A1"), CancellationToken.None);

        // 10 retry rounds — ladder should cap at 60s.
        for (var i = 0; i < 10; i++)
        {
            await sink.DrainOnceAsync(CancellationToken.None);
        }

        sink.CurrentBackoff.ShouldBe(TimeSpan.FromSeconds(60));
    }

    /// <summary>The null sink reports a Disabled drain state and an empty queue.</summary>
    [Fact]
    public void NullAlarmHistorianSink_reports_disabled_status()
    {
        var s = NullAlarmHistorianSink.Instance.GetStatus();

        s.DrainState.ShouldBe(HistorianDrainState.Disabled);
        s.QueueDepth.ShouldBe(0);
    }

    /// <summary>Enqueueing on the null sink completes without throwing.</summary>
    [Fact]
    public async Task NullAlarmHistorianSink_swallows_enqueue()
    {
        // Should not throw or persist anything.
        await NullAlarmHistorianSink.Instance.EnqueueAsync(Event("A1"), CancellationToken.None);
    }

    /// <summary>The constructor rejects an empty path, null collaborators, and non-positive sizes.</summary>
    [Fact]
    public void Ctor_rejects_bad_args()
    {
        var w = new FakeWriter();

        Should.Throw<ArgumentException>(() => new SqliteStoreAndForwardSink("", w, _log));
        Should.Throw<ArgumentNullException>(() => new SqliteStoreAndForwardSink(_dbPath, null!, _log));
        Should.Throw<ArgumentNullException>(() => new SqliteStoreAndForwardSink(_dbPath, w, null!));
        Should.Throw<ArgumentOutOfRangeException>(() => new SqliteStoreAndForwardSink(_dbPath, w, _log, batchSize: 0));
        Should.Throw<ArgumentOutOfRangeException>(() => new SqliteStoreAndForwardSink(_dbPath, w, _log, capacity: 0));
    }

    /// <summary>Enqueueing on a disposed sink throws <see cref="ObjectDisposedException"/>.</summary>
    [Fact]
    public async Task Disposed_sink_rejects_enqueue()
    {
        var writer = new FakeWriter();
        var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
        sink.Dispose();

        await Should.ThrowAsync<ObjectDisposedException>(
            () => sink.EnqueueAsync(Event("A1"), CancellationToken.None));
    }
}