using Serilog;
using Serilog.Core;
using Serilog.Events;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;

namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests;

/// <summary>
/// Verifies the durable SQLite store-and-forward queue behind the historian sink:
/// round-trip Ack, backoff ladder on RetryPlease, dead-lettering on PermanentFail,
/// capacity eviction, and retention-based dead-letter purge.
/// </summary>
/// <remarks>
/// Every test gets its own uniquely named database file under the temp directory
/// (created in the constructor, removed in <see cref="Dispose"/>), so tests do not
/// share queue state.
/// </remarks>
[Trait("Category", "Unit")]
public sealed class SqliteStoreAndForwardSinkTests : IDisposable
{
    private readonly string _dbPath;
    private readonly ILogger _log;

    /// <summary>Picks a fresh temp-file path for the database and builds a verbose logger.</summary>
    public SqliteStoreAndForwardSinkTests()
    {
        _dbPath = Path.Combine(Path.GetTempPath(), $"otopcua-historian-{Guid.NewGuid():N}.sqlite");
        _log = new LoggerConfiguration().MinimumLevel.Verbose().CreateLogger();
    }

    /// <summary>Releases the logger and removes the per-test database file (best effort).</summary>
    public void Dispose()
    {
        // The field is typed as ILogger; dispose the concrete logger when it supports it.
        (_log as IDisposable)?.Dispose();

        try
        {
            if (File.Exists(_dbPath))
            {
                File.Delete(_dbPath);
            }
        }
        catch (Exception ex) when (ex is IOException or UnauthorizedAccessException)
        {
            // Best-effort cleanup: a leftover temp file must never fail a test.
            // Only the expected file-system failures are swallowed; anything else surfaces.
        }
    }

    /// <summary>
    /// Scriptable <see cref="IAlarmHistorianWriter"/> test double. Outcomes are served
    /// per event from <see cref="NextOutcomePerEvent"/> and fall back to
    /// <see cref="DefaultOutcome"/> once that queue is empty.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the generic type arguments in this class were reconstructed from how
    /// the members are used in the tests below — confirm against IAlarmHistorianWriter.
    /// </remarks>
    private sealed class FakeWriter : IAlarmHistorianWriter
    {
        /// <summary>Outcomes handed out one per event, in FIFO order.</summary>
        public Queue<HistorianWriteOutcome> NextOutcomePerEvent { get; } = new();

        /// <summary>Outcome used for an event when no scripted outcome is left.</summary>
        public HistorianWriteOutcome DefaultOutcome { get; set; } = HistorianWriteOutcome.Ack;

        /// <summary>Every batch received by a call that did not throw, in arrival order.</summary>
        public List<IReadOnlyList<AlarmHistorianEvent>> Batches { get; } = [];

        /// <summary>When set, the next write throws this exception once and then clears it.</summary>
        public Exception? ThrowOnce { get; set; }

        /// <summary>Records the batch and returns one outcome per event in it.</summary>
        /// <param name="batch">Events handed over by the sink.</param>
        /// <param name="ct">Not used by this fake.</param>
        /// <returns>A completed task holding one outcome per event, in batch order.</returns>
        public Task<IReadOnlyList<HistorianWriteOutcome>> WriteBatchAsync(
            IReadOnlyList<AlarmHistorianEvent> batch, CancellationToken ct)
        {
            if (ThrowOnce is not null)
            {
                // One-shot failure: clear it first so the following call succeeds.
                var pending = ThrowOnce;
                ThrowOnce = null;
                throw pending;
            }

            Batches.Add(batch);

            var outcomes = new List<HistorianWriteOutcome>(batch.Count);
            for (var i = 0; i < batch.Count; i++)
            {
                outcomes.Add(NextOutcomePerEvent.Count > 0
                    ? NextOutcomePerEvent.Dequeue()
                    : DefaultOutcome);
            }

            return Task.FromResult<IReadOnlyList<HistorianWriteOutcome>>(outcomes);
        }
    }

    /// <summary>Builds a sample "HighTemp" alarm event with the given id.</summary>
    /// <param name="alarmId">Alarm id; the tests use it to tell events apart.</param>
    /// <param name="ts">Event timestamp; defaults to <see cref="DateTime.UtcNow"/>.</param>
    private static AlarmHistorianEvent Event(string alarmId, DateTime? ts = null) => new(
        AlarmId: alarmId,
        EquipmentPath: "/Site/Line1/Cell",
        AlarmName: "HighTemp",
        AlarmTypeName: "LimitAlarm",
        Severity: AlarmSeverity.High,
        EventKind: "Activated",
        Message: "temp exceeded",
        User: "system",
        Comment: null,
        TimestampUtc: ts ?? DateTime.UtcNow);

    /// <summary>An enqueued event is delivered on drain and, once Acked, leaves the queue.</summary>
    [Fact]
    public async Task EnqueueThenDrain_Ack_removes_row()
    {
        var writer = new FakeWriter();
        using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);

        await sink.EnqueueAsync(Event("A1"), CancellationToken.None);
        sink.GetStatus().QueueDepth.ShouldBe(1);

        await sink.DrainOnceAsync(CancellationToken.None);

        writer.Batches.Count.ShouldBe(1);
        writer.Batches[0].Count.ShouldBe(1);
        writer.Batches[0][0].AlarmId.ShouldBe("A1");

        var status = sink.GetStatus();
        status.QueueDepth.ShouldBe(0);
        status.DeadLetterDepth.ShouldBe(0);
        status.LastSuccessUtc.ShouldNotBeNull();
    }

    /// <summary>Draining an empty queue never calls the writer and leaves the sink Idle.</summary>
    [Fact]
    public async Task Drain_with_empty_queue_is_noop()
    {
        var writer = new FakeWriter();
        using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);

        await sink.DrainOnceAsync(CancellationToken.None);

        writer.Batches.ShouldBeEmpty();
        sink.GetStatus().DrainState.ShouldBe(HistorianDrainState.Idle);
    }

    /// <summary>A RetryPlease outcome grows the backoff and keeps the row queued.</summary>
    [Fact]
    public async Task RetryPlease_bumps_backoff_and_keeps_row()
    {
        var writer = new FakeWriter();
        writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.RetryPlease);
        using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);

        await sink.EnqueueAsync(Event("A1"), CancellationToken.None);
        var before = sink.CurrentBackoff;

        await sink.DrainOnceAsync(CancellationToken.None);

        sink.CurrentBackoff.ShouldBeGreaterThan(before);
        sink.GetStatus().QueueDepth.ShouldBe(1, "row stays in queue for retry");
        sink.GetStatus().DrainState.ShouldBe(HistorianDrainState.BackingOff);
    }

    /// <summary>An Ack following a retry puts the backoff back at one second.</summary>
    [Fact]
    public async Task Ack_after_Retry_resets_backoff()
    {
        var writer = new FakeWriter();
        writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.RetryPlease);
        using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);

        await sink.EnqueueAsync(Event("A1"), CancellationToken.None);
        await sink.DrainOnceAsync(CancellationToken.None);

        // Strictly greater than (1s - 1ms), i.e. at least one second after the retry.
        sink.CurrentBackoff.ShouldBeGreaterThan(TimeSpan.FromSeconds(1) - TimeSpan.FromMilliseconds(1));

        writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.Ack);
        await sink.DrainOnceAsync(CancellationToken.None);

        sink.CurrentBackoff.ShouldBe(TimeSpan.FromSeconds(1));
        sink.GetStatus().QueueDepth.ShouldBe(0);
    }

    /// <summary>PermanentFail dead-letters only the failing row; the Acked row is removed.</summary>
    [Fact]
    public async Task PermanentFail_dead_letters_one_row_only()
    {
        var writer = new FakeWriter();
        writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.PermanentFail);
        writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.Ack);
        using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);

        await sink.EnqueueAsync(Event("bad"), CancellationToken.None);
        await sink.EnqueueAsync(Event("good"), CancellationToken.None);

        await sink.DrainOnceAsync(CancellationToken.None);

        var status = sink.GetStatus();
        status.QueueDepth.ShouldBe(0, "good row acked");
        status.DeadLetterDepth.ShouldBe(1, "bad row dead-lettered");
    }

    /// <summary>A throwing writer leaves the row queued, records the error, and backs off.</summary>
    [Fact]
    public async Task Writer_exception_treated_as_retry_for_whole_batch()
    {
        var writer = new FakeWriter { ThrowOnce = new InvalidOperationException("pipe broken") };
        using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);

        await sink.EnqueueAsync(Event("A1"), CancellationToken.None);
        await sink.DrainOnceAsync(CancellationToken.None);

        var status = sink.GetStatus();
        status.QueueDepth.ShouldBe(1);
        status.LastError.ShouldBe("pipe broken");
        status.DrainState.ShouldBe(HistorianDrainState.BackingOff);

        // Next drain after the writer recovers should Ack.
        await sink.DrainOnceAsync(CancellationToken.None);
        sink.GetStatus().QueueDepth.ShouldBe(0);
    }

    /// <summary>Enqueueing past capacity evicts the oldest queued row.</summary>
    [Fact]
    public async Task Capacity_eviction_drops_oldest_nondeadlettered_row()
    {
        var writer = new FakeWriter();
        using var sink = new SqliteStoreAndForwardSink(
            _dbPath, writer, _log, batchSize: 100, capacity: 3);

        await sink.EnqueueAsync(Event("A1"), CancellationToken.None);
        await sink.EnqueueAsync(Event("A2"), CancellationToken.None);
        await sink.EnqueueAsync(Event("A3"), CancellationToken.None);

        // A4 enqueue must evict the oldest (A1).
        await sink.EnqueueAsync(Event("A4"), CancellationToken.None);
        sink.GetStatus().QueueDepth.ShouldBe(3);

        await sink.DrainOnceAsync(CancellationToken.None);

        var drained = writer.Batches[0].Select(e => e.AlarmId).ToArray();
        drained.ShouldNotContain("A1");
        drained.ShouldContain("A2");
        drained.ShouldContain("A3");
        drained.ShouldContain("A4");
    }

    /// <summary>Dead-lettered rows disappear once the injected clock passes the retention window.</summary>
    [Fact]
    public async Task Deadlettered_rows_are_purged_past_retention()
    {
        var now = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc);

        // Captured by the clock delegate below, so reassigning it moves the sink's time.
        DateTime clock = now;

        var writer = new FakeWriter();
        writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.PermanentFail);
        using var sink = new SqliteStoreAndForwardSink(
            _dbPath, writer, _log,
            deadLetterRetention: TimeSpan.FromDays(30),
            clock: () => clock);

        await sink.EnqueueAsync(Event("bad"), CancellationToken.None);
        await sink.DrainOnceAsync(CancellationToken.None);
        sink.GetStatus().DeadLetterDepth.ShouldBe(1);

        // Advance past retention + tick drain (which runs PurgeAgedDeadLetters).
        clock = now.AddDays(31);
        await sink.DrainOnceAsync(CancellationToken.None);

        sink.GetStatus().DeadLetterDepth.ShouldBe(0, "purged past retention");
    }

    /// <summary>RetryDeadLettered moves dead-lettered rows back into the live queue.</summary>
    [Fact]
    public async Task RetryDeadLettered_requeues_for_retry()
    {
        var writer = new FakeWriter();
        writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.PermanentFail);
        using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);

        await sink.EnqueueAsync(Event("bad"), CancellationToken.None);
        await sink.DrainOnceAsync(CancellationToken.None);
        sink.GetStatus().DeadLetterDepth.ShouldBe(1);

        var revived = sink.RetryDeadLettered();

        revived.ShouldBe(1);
        var status = sink.GetStatus();
        status.QueueDepth.ShouldBe(1);
        status.DeadLetterDepth.ShouldBe(0);
    }

    /// <summary>Repeated retries stop increasing the backoff at 60 seconds.</summary>
    [Fact]
    public async Task Backoff_ladder_caps_at_60s()
    {
        var writer = new FakeWriter { DefaultOutcome = HistorianWriteOutcome.RetryPlease };
        using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
        await sink.EnqueueAsync(Event("A1"), CancellationToken.None);

        // 10 retry rounds — ladder should cap at 60s.
        for (var i = 0; i < 10; i++)
        {
            await sink.DrainOnceAsync(CancellationToken.None);
        }

        sink.CurrentBackoff.ShouldBe(TimeSpan.FromSeconds(60));
    }

    /// <summary>The null sink reports a Disabled drain state and an empty queue.</summary>
    [Fact]
    public void NullAlarmHistorianSink_reports_disabled_status()
    {
        var s = NullAlarmHistorianSink.Instance.GetStatus();

        s.DrainState.ShouldBe(HistorianDrainState.Disabled);
        s.QueueDepth.ShouldBe(0);
    }

    /// <summary>Enqueueing on the null sink completes without throwing.</summary>
    [Fact]
    public async Task NullAlarmHistorianSink_swallows_enqueue()
    {
        // Should not throw or persist anything.
        await NullAlarmHistorianSink.Instance.EnqueueAsync(Event("A1"), CancellationToken.None);
    }

    /// <summary>The constructor rejects an empty path, null collaborators, and zero sizes.</summary>
    [Fact]
    public void Ctor_rejects_bad_args()
    {
        var w = new FakeWriter();

        // NOTE(review): the expected exception types were not recoverable from the source.
        // ArgumentException is asserted because Shouldly also accepts derived types
        // (ArgumentNullException, ArgumentOutOfRangeException) — tighten once the
        // constructor's exact exceptions are confirmed.
        Should.Throw<ArgumentException>(() => new SqliteStoreAndForwardSink("", w, _log));
        Should.Throw<ArgumentException>(() => new SqliteStoreAndForwardSink(_dbPath, null!, _log));
        Should.Throw<ArgumentException>(() => new SqliteStoreAndForwardSink(_dbPath, w, null!));
        Should.Throw<ArgumentException>(() => new SqliteStoreAndForwardSink(_dbPath, w, _log, batchSize: 0));
        Should.Throw<ArgumentException>(() => new SqliteStoreAndForwardSink(_dbPath, w, _log, capacity: 0));
    }

    /// <summary>Enqueueing on a disposed sink fails instead of silently dropping the event.</summary>
    [Fact]
    public async Task Disposed_sink_rejects_enqueue()
    {
        var writer = new FakeWriter();
        var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
        sink.Dispose();

        // NOTE(review): ObjectDisposedException is assumed here — confirm against the sink.
        await Should.ThrowAsync<ObjectDisposedException>(
            () => sink.EnqueueAsync(Event("A1"), CancellationToken.None));
    }
}