using Microsoft.Data.Sqlite; using Microsoft.Extensions.Logging.Abstractions; using ScadaLink.Commons.Interfaces.Services; using ScadaLink.Commons.Types; using ScadaLink.Commons.Types.Enums; namespace ScadaLink.StoreAndForward.Tests; /// /// Audit Log #23 — M3 Bundle E Tasks E4 + E5: the store-and-forward retry /// loop invokes after every /// cached-call attempt. The observer is given a /// derived from the underlying /// ; the audit bridge then materialises /// the right CachedCallTelemetry packet (Attempted on every retry, /// CachedResolve on terminal transitions). Tests run with /// DefaultRetryInterval=Zero so the timer-driven retry sweep is /// short-circuited by directly invoking /// . /// public class CachedCallAttemptEmissionTests : IAsyncLifetime, IDisposable { private readonly SqliteConnection _keepAlive; private readonly StoreAndForwardStorage _storage; private readonly StoreAndForwardService _service; private readonly StoreAndForwardOptions _options; private readonly CapturingObserver _observer; public CachedCallAttemptEmissionTests() { var dbName = $"E4Tests_{Guid.NewGuid():N}"; var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared"; _keepAlive = new SqliteConnection(connStr); _keepAlive.Open(); _storage = new StoreAndForwardStorage(connStr, NullLogger.Instance); _options = new StoreAndForwardOptions { DefaultRetryInterval = TimeSpan.Zero, DefaultMaxRetries = 3, RetryTimerInterval = TimeSpan.FromMinutes(10), }; _observer = new CapturingObserver(); _service = new StoreAndForwardService( _storage, _options, NullLogger.Instance, replication: null, cachedCallObserver: _observer, siteId: "site-77"); } public async Task InitializeAsync() => await _storage.InitializeAsync(); public Task DisposeAsync() => Task.CompletedTask; public void Dispose() => _keepAlive.Dispose(); /// /// Captures every observer notification so tests can assert on the /// emitted lifecycle sequence. /// private sealed class CapturingObserver : ICachedCallLifecycleObserver { public List Notifications { get; } = new(); public Exception? ThrowOnNotify { get; set; } public Task OnAttemptCompletedAsync(CachedCallAttemptContext context, CancellationToken ct = default) { if (ThrowOnNotify != null) { return Task.FromException(ThrowOnNotify); } lock (Notifications) { Notifications.Add(context); } return Task.CompletedTask; } } private async Task EnqueueBufferedAsync( StoreAndForwardCategory category, string target, int maxRetries = 3) { // The TrackedOperationId is the S&F message id (Bundle E3 contract). var trackedId = TrackedOperationId.New(); await _service.EnqueueAsync( category, target, """{"payload":"x"}""", originInstanceName: "Plant.Pump42", maxRetries: maxRetries, retryInterval: TimeSpan.Zero, attemptImmediateDelivery: false, messageId: trackedId.ToString()); return trackedId; } // ── Task E4: per-attempt observer notifications ── [Fact] public async Task Attempt_FailWithHttp500_EmitsAttemptedTelemetry() { // ExternalSystem cached call buffered, retry sweep encounters a // transient failure on the first attempt. _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => throw new HttpRequestException("HTTP 500 from ERP")); var trackedId = await EnqueueBufferedAsync( StoreAndForwardCategory.ExternalSystem, "ERP", maxRetries: 5); await _service.RetryPendingMessagesAsync(); var notification = Assert.Single(_observer.Notifications); Assert.Equal(trackedId, notification.TrackedOperationId); Assert.Equal("ApiOutbound", notification.Channel); Assert.Equal("ERP", notification.Target); Assert.Equal("site-77", notification.SourceSite); Assert.Equal(CachedCallAttemptOutcome.TransientFailure, notification.Outcome); Assert.Equal(1, notification.RetryCount); Assert.Contains("HTTP 500", notification.LastError); Assert.Equal("Plant.Pump42", notification.SourceInstanceId); } [Fact] public async Task Attempt_Success_EmitsDeliveredOutcome() { // ExternalSystem cached call buffered, retry sweep delivers the // message successfully on its first attempt. _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => Task.FromResult(true)); var trackedId = await EnqueueBufferedAsync( StoreAndForwardCategory.ExternalSystem, "ERP"); await _service.RetryPendingMessagesAsync(); var notification = Assert.Single(_observer.Notifications); Assert.Equal(trackedId, notification.TrackedOperationId); Assert.Equal(CachedCallAttemptOutcome.Delivered, notification.Outcome); Assert.Null(notification.LastError); } [Fact] public async Task Attempt_PermanentFailure_EmitsPermanentFailureOutcome() { _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => Task.FromResult(false)); var trackedId = await EnqueueBufferedAsync( StoreAndForwardCategory.ExternalSystem, "ERP"); await _service.RetryPendingMessagesAsync(); var notification = Assert.Single(_observer.Notifications); Assert.Equal(trackedId, notification.TrackedOperationId); Assert.Equal(CachedCallAttemptOutcome.PermanentFailure, notification.Outcome); } [Fact] public async Task Attempt_CachedDbWrite_EmitsDbOutboundChannel() { _service.RegisterDeliveryHandler(StoreAndForwardCategory.CachedDbWrite, _ => Task.FromResult(true)); var trackedId = await EnqueueBufferedAsync( StoreAndForwardCategory.CachedDbWrite, "myDb"); await _service.RetryPendingMessagesAsync(); var notification = Assert.Single(_observer.Notifications); Assert.Equal(trackedId, notification.TrackedOperationId); Assert.Equal("DbOutbound", notification.Channel); Assert.Equal("myDb", notification.Target); } [Fact] public async Task Attempt_NotificationCategory_NoObserverNotification() { // Notifications are NOT cached calls — they're forwarded to central via // a separate forwarder. The observer must not fire for Notification // category messages. _service.RegisterDeliveryHandler(StoreAndForwardCategory.Notification, _ => Task.FromResult(true)); await _service.EnqueueAsync( StoreAndForwardCategory.Notification, "alerts", """{"subject":"x"}""", originInstanceName: "Plant.Pump42", attemptImmediateDelivery: false); await _service.RetryPendingMessagesAsync(); Assert.Empty(_observer.Notifications); } [Fact] public async Task Attempt_MessageIdNotAGuid_NoObserverNotification() { // Pre-M3 cached calls (no TrackedOperationId threaded in) use a random // GUID-N message id from S&F itself. We should still emit (M3 expects // post-rollout these are tracked) — BUT pre-rollout messages can have // a non-parseable id, in which case the observer is silently skipped // to keep S&F bookkeeping intact. _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => Task.FromResult(true)); await _service.EnqueueAsync( StoreAndForwardCategory.ExternalSystem, "ERP", """{}""", originInstanceName: "Plant.Pump42", attemptImmediateDelivery: false, messageId: "not-a-valid-guid-id"); await _service.RetryPendingMessagesAsync(); Assert.Empty(_observer.Notifications); } // ── Task E5: terminal-state observer notifications ── [Fact] public async Task Terminal_Delivered_EmitsResolveWithDeliveredStatus() { // A successful retry produces a single Delivered observer notification // — the audit bridge maps this to both an Attempted-Delivered audit row // and the terminal CachedResolve(Delivered) row. The S&F layer fires // ONE notification per attempt and lets the bridge fan out as needed. _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => Task.FromResult(true)); var trackedId = await EnqueueBufferedAsync( StoreAndForwardCategory.ExternalSystem, "ERP"); await _service.RetryPendingMessagesAsync(); var notification = Assert.Single(_observer.Notifications); Assert.Equal(trackedId, notification.TrackedOperationId); Assert.Equal(CachedCallAttemptOutcome.Delivered, notification.Outcome); } [Fact] public async Task Terminal_Parked_OnMaxRetries_EmitsParkedMaxRetries() { // Configure handler to throw transient every time. _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => throw new HttpRequestException("Connection refused")); var trackedId = await EnqueueBufferedAsync( StoreAndForwardCategory.ExternalSystem, "ERP", maxRetries: 2); // Two sweeps -> RetryCount climbs to 2 -> parked on the second sweep. await _service.RetryPendingMessagesAsync(); await _service.RetryPendingMessagesAsync(); Assert.Equal(2, _observer.Notifications.Count); Assert.Equal(CachedCallAttemptOutcome.TransientFailure, _observer.Notifications[0].Outcome); Assert.Equal(CachedCallAttemptOutcome.ParkedMaxRetries, _observer.Notifications[1].Outcome); Assert.Equal(trackedId, _observer.Notifications[1].TrackedOperationId); } [Fact] public async Task Lifecycle_RetryFail_RetrySucceed_EmitsExpectedSequence() { var calls = 0; _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => { calls++; if (calls == 1) throw new HttpRequestException("transient"); return Task.FromResult(true); }); var trackedId = await EnqueueBufferedAsync( StoreAndForwardCategory.ExternalSystem, "ERP", maxRetries: 5); await _service.RetryPendingMessagesAsync(); await _service.RetryPendingMessagesAsync(); Assert.Equal(2, _observer.Notifications.Count); Assert.Equal(CachedCallAttemptOutcome.TransientFailure, _observer.Notifications[0].Outcome); Assert.Equal(1, _observer.Notifications[0].RetryCount); Assert.Equal(CachedCallAttemptOutcome.Delivered, _observer.Notifications[1].Outcome); Assert.Equal(trackedId, _observer.Notifications[1].TrackedOperationId); } // ── Audit Log #23 (ExecutionId Task 4): ExecutionId / SourceScript ── [Fact] public async Task Attempt_CarriesExecutionIdAndSourceScript_FromBufferedMessage() { // A buffered cached call carries the originating script execution's // ExecutionId + SourceScript. The retry sweep must surface both on the // CachedCallAttemptContext handed to the observer so the audit bridge // can stamp them on the retry-loop cached rows. var executionId = Guid.NewGuid(); _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => throw new HttpRequestException("HTTP 503")); var trackedId = TrackedOperationId.New(); await _service.EnqueueAsync( StoreAndForwardCategory.ExternalSystem, "ERP", """{"payload":"x"}""", originInstanceName: "Plant.Pump42", maxRetries: 5, retryInterval: TimeSpan.Zero, attemptImmediateDelivery: false, messageId: trackedId.ToString(), executionId: executionId, sourceScript: "Plant.Pump42/OnTick"); await _service.RetryPendingMessagesAsync(); var notification = Assert.Single(_observer.Notifications); Assert.Equal(executionId, notification.ExecutionId); Assert.Equal("Plant.Pump42/OnTick", notification.SourceScript); } [Fact] public async Task Attempt_NullExecutionIdAndSourceScript_SurfaceAsNull() { // Back-compat: a row buffered without ExecutionId / SourceScript (legacy // enqueue path) must surface them as null on the context, not throw. _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => Task.FromResult(true)); var trackedId = await EnqueueBufferedAsync( StoreAndForwardCategory.ExternalSystem, "ERP"); await _service.RetryPendingMessagesAsync(); var notification = Assert.Single(_observer.Notifications); Assert.Null(notification.ExecutionId); Assert.Null(notification.SourceScript); } [Fact] public async Task TerminalResolve_CarriesExecutionIdAndSourceScript() { // The terminal Delivered notification must also carry the threaded // provenance so the CachedResolve audit row is correlated. var executionId = Guid.NewGuid(); _service.RegisterDeliveryHandler(StoreAndForwardCategory.CachedDbWrite, _ => Task.FromResult(true)); var trackedId = TrackedOperationId.New(); await _service.EnqueueAsync( StoreAndForwardCategory.CachedDbWrite, "myDb", """{"payload":"x"}""", originInstanceName: "Plant.Tank", maxRetries: 3, retryInterval: TimeSpan.Zero, attemptImmediateDelivery: false, messageId: trackedId.ToString(), executionId: executionId, sourceScript: "Plant.Tank/OnAlarm"); await _service.RetryPendingMessagesAsync(); var notification = Assert.Single(_observer.Notifications); Assert.Equal(CachedCallAttemptOutcome.Delivered, notification.Outcome); Assert.Equal(executionId, notification.ExecutionId); Assert.Equal("Plant.Tank/OnAlarm", notification.SourceScript); } // ── Audit Log #23 (ParentExecutionId Task 6): ParentExecutionId ── [Fact] public async Task Attempt_CarriesParentExecutionId_FromBufferedMessage() { // A cached call enqueued from an inbound-API-routed script run carries // the spawning execution's ParentExecutionId. The retry sweep must // surface it on the CachedCallAttemptContext beside ExecutionId so the // audit bridge can stamp it on the retry-loop cached rows. var parentExecutionId = Guid.NewGuid(); _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => throw new HttpRequestException("HTTP 503")); var trackedId = TrackedOperationId.New(); await _service.EnqueueAsync( StoreAndForwardCategory.ExternalSystem, "ERP", """{"payload":"x"}""", originInstanceName: "Plant.Pump42", maxRetries: 5, retryInterval: TimeSpan.Zero, attemptImmediateDelivery: false, messageId: trackedId.ToString(), parentExecutionId: parentExecutionId); await _service.RetryPendingMessagesAsync(); var notification = Assert.Single(_observer.Notifications); Assert.Equal(parentExecutionId, notification.ParentExecutionId); } [Fact] public async Task Attempt_NullParentExecutionId_SurfacesAsNull() { // Non-routed run: the originating script was not spawned by an // inbound-API request, so no ParentExecutionId is threaded. It must // surface as null on the context, not throw. _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => Task.FromResult(true)); var trackedId = await EnqueueBufferedAsync( StoreAndForwardCategory.ExternalSystem, "ERP"); await _service.RetryPendingMessagesAsync(); var notification = Assert.Single(_observer.Notifications); Assert.Null(notification.ParentExecutionId); } [Fact] public async Task TerminalResolve_CarriesParentExecutionId() { // The terminal Delivered notification must also carry the threaded // ParentExecutionId so the CachedResolve audit row correlates back to // the spawning inbound-API execution. var parentExecutionId = Guid.NewGuid(); _service.RegisterDeliveryHandler(StoreAndForwardCategory.CachedDbWrite, _ => Task.FromResult(true)); var trackedId = TrackedOperationId.New(); await _service.EnqueueAsync( StoreAndForwardCategory.CachedDbWrite, "myDb", """{"payload":"x"}""", originInstanceName: "Plant.Tank", maxRetries: 3, retryInterval: TimeSpan.Zero, attemptImmediateDelivery: false, messageId: trackedId.ToString(), parentExecutionId: parentExecutionId); await _service.RetryPendingMessagesAsync(); var notification = Assert.Single(_observer.Notifications); Assert.Equal(CachedCallAttemptOutcome.Delivered, notification.Outcome); Assert.Equal(parentExecutionId, notification.ParentExecutionId); } // ── Best-effort contract: observer throws must NOT corrupt retry bookkeeping ── [Fact] public async Task Observer_Throws_DoesNotCorruptRetryCount() { _observer.ThrowOnNotify = new InvalidOperationException("simulated audit failure"); _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => Task.FromResult(true)); var trackedId = await EnqueueBufferedAsync( StoreAndForwardCategory.ExternalSystem, "ERP"); // Must not throw — observer is best-effort. await _service.RetryPendingMessagesAsync(); // The message was delivered (handler returned true) so it should be gone. var msg = await _storage.GetMessageByIdAsync(trackedId.ToString()); Assert.Null(msg); } }