diff --git a/src/ScadaLink.Commons/Interfaces/Services/ICachedCallLifecycleObserver.cs b/src/ScadaLink.Commons/Interfaces/Services/ICachedCallLifecycleObserver.cs new file mode 100644 index 0000000..8e34cb4 --- /dev/null +++ b/src/ScadaLink.Commons/Interfaces/Services/ICachedCallLifecycleObserver.cs @@ -0,0 +1,93 @@ +using ScadaLink.Commons.Types; + +namespace ScadaLink.Commons.Interfaces.Services; + +/// +/// Audit Log #23 (M3 Bundle E — Tasks E4/E5): site-side hook the +/// store-and-forward retry loop invokes after every cached-call attempt and +/// at terminal-state transitions, so the audit pipeline can emit +/// ApiCallCached/DbWriteCached per-attempt rows and the +/// CachedResolve terminal row under the original +/// . +/// +/// +/// +/// The interface deliberately uses +/// rather than so the +/// S&F project does not need to depend on the audit vocabulary — the +/// bridge living in ScadaLink.AuditLog maps the outcome to the right +/// audit kind + status when materialising the CachedCallTelemetry +/// packet. +/// +/// +/// Best-effort contract (alog.md §7): implementations MUST swallow +/// internal failures rather than propagating to the S&F service — a +/// thrown observer must not be misclassified as a transient delivery +/// failure and must not corrupt the retry-count bookkeeping. +/// +/// +public interface ICachedCallLifecycleObserver +{ + /// + /// Called by the store-and-forward retry loop after every cached-call + /// delivery attempt. Receives the message's TrackedOperationId-bearing id, + /// the per-category channel discriminator, retry-count + last-error + /// context, and whether the outcome reached a terminal state. + /// + Task OnAttemptCompletedAsync(CachedCallAttemptContext context, CancellationToken ct = default); +} + +/// +/// Per-attempt context handed to . +/// +/// +/// Tracking id parsed from the underlying StoreAndForwardMessage.Id. +/// +/// +/// Trust-boundary channel string — "ApiOutbound" for ExternalSystem +/// cached calls, "DbOutbound" for cached DB writes. +/// +/// Human-readable target (system name / DB connection). +/// Site id that submitted the cached call. +/// Per-attempt outcome. +/// Number of retries performed so far (S&F bookkeeping). +/// Most recent error message (null on success). +/// Most recent HTTP status (null when not applicable). +/// When the underlying S&F message was first enqueued. +/// When this attempt completed. +/// Duration of the attempt in milliseconds (null when not measured). +/// Originating instance, when known. +public sealed record CachedCallAttemptContext( + TrackedOperationId TrackedOperationId, + string Channel, + string Target, + string SourceSite, + CachedCallAttemptOutcome Outcome, + int RetryCount, + string? LastError, + int? HttpStatus, + DateTime CreatedAtUtc, + DateTime OccurredAtUtc, + int? DurationMs, + string? SourceInstanceId); + +/// +/// Coarse outcome of one cached-call delivery attempt, observed from inside +/// the store-and-forward retry loop. The audit bridge maps this to the +/// ApiCallCached/DbWriteCached Attempted row and, when terminal, +/// the corresponding CachedResolve row. +/// +public enum CachedCallAttemptOutcome +{ + /// Attempt delivered successfully — terminal Delivered state. + Delivered, + + /// Attempt failed transiently; another retry will follow. + TransientFailure, + + /// Attempt returned permanent failure — terminal Parked state (S&F semantics). + PermanentFailure, + + /// Retry budget exhausted — terminal Parked state. + ParkedMaxRetries, +} diff --git a/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs b/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs index 4a38216..70f6c23 100644 --- a/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs +++ b/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs @@ -1,4 +1,6 @@ using Microsoft.Extensions.Logging; +using ScadaLink.Commons.Interfaces.Services; +using ScadaLink.Commons.Types; using ScadaLink.Commons.Types.Enums; namespace ScadaLink.StoreAndForward; @@ -33,6 +35,19 @@ public class StoreAndForwardService private readonly StoreAndForwardOptions _options; private readonly ReplicationService? _replication; private readonly ILogger _logger; + /// + /// Audit Log #23 (M3 Bundle E — Task E4): site-side observer notified + /// after every cached-call delivery attempt. Optional — when null no + /// telemetry is emitted; the legacy pre-M3 retry loop behaviour is + /// preserved exactly. + /// + private readonly ICachedCallLifecycleObserver? _cachedCallObserver; + /// + /// Audit Log #23 (M3 Bundle E — Task E4): site id stamped onto the + /// cached-call attempt context so the audit bridge can build the + /// half of the telemetry packet. + /// + private readonly string _siteId; private Timer? _retryTimer; private int _retryInProgress; @@ -63,12 +78,16 @@ public class StoreAndForwardService StoreAndForwardStorage storage, StoreAndForwardOptions options, ILogger logger, - ReplicationService? replication = null) + ReplicationService? replication = null, + ICachedCallLifecycleObserver? cachedCallObserver = null, + string siteId = "") { _storage = storage; _options = options; _logger = logger; _replication = replication; + _cachedCallObserver = cachedCallObserver; + _siteId = siteId; } /// @@ -280,15 +299,33 @@ public class StoreAndForwardService return; } + // Audit Log #23 (M3 Bundle E — Tasks E4/E5): measure per-attempt + // duration so the audit row carries a meaningful DurationMs. Captured + // around the handler invocation only — storage / replication overhead + // is excluded. + var attemptStartUtc = DateTime.UtcNow; + var attemptStopwatch = System.Diagnostics.Stopwatch.StartNew(); + try { var success = await handler(message); + attemptStopwatch.Stop(); if (success) { await _storage.RemoveMessageAsync(message.Id); _replication?.ReplicateRemove(message.Id); RaiseActivity("Delivered", message.Category, $"Delivered to {message.Target} after {message.RetryCount} retries"); + + // M3: terminal Delivered observer notification — the audit + // bridge maps this to Attempted + CachedResolve(Delivered). + await NotifyCachedCallObserverAsync( + message, + CachedCallAttemptOutcome.Delivered, + lastError: null, + httpStatus: null, + occurredAtUtc: attemptStartUtc, + durationMs: (int)attemptStopwatch.ElapsedMilliseconds); return; } @@ -311,9 +348,20 @@ public class StoreAndForwardService _replication?.ReplicatePark(message); RaiseActivity("Parked", message.Category, $"Permanent failure for {message.Target}: handler returned false"); + + // M3: terminal PermanentFailure observer notification — the + // audit bridge maps this to Attempted(Failed) + CachedResolve(Parked). + await NotifyCachedCallObserverAsync( + message, + CachedCallAttemptOutcome.PermanentFailure, + lastError: message.LastError, + httpStatus: null, + occurredAtUtc: attemptStartUtc, + durationMs: (int)attemptStopwatch.ElapsedMilliseconds); } catch (Exception ex) { + attemptStopwatch.Stop(); // Transient failure — increment retry, check max message.RetryCount++; message.LastAttemptAt = DateTimeOffset.UtcNow; @@ -339,6 +387,16 @@ public class StoreAndForwardService _logger.LogWarning( "Message {MessageId} parked after {MaxRetries} retries to {Target}", message.Id, message.MaxRetries, message.Target); + + // M3: terminal ParkedMaxRetries observer notification — the + // audit bridge maps this to Attempted(Failed) + CachedResolve(Parked). + await NotifyCachedCallObserverAsync( + message, + CachedCallAttemptOutcome.ParkedMaxRetries, + lastError: ex.Message, + httpStatus: null, + occurredAtUtc: attemptStartUtc, + durationMs: (int)attemptStopwatch.ElapsedMilliseconds); } else { @@ -355,10 +413,113 @@ public class StoreAndForwardService } RaiseActivity("Retried", message.Category, $"Retry {message.RetryCount}/{message.MaxRetries} for {message.Target}: {ex.Message}"); + + // M3: per-attempt TransientFailure observer notification — + // the audit bridge maps this to Attempted(Failed). + await NotifyCachedCallObserverAsync( + message, + CachedCallAttemptOutcome.TransientFailure, + lastError: ex.Message, + httpStatus: null, + occurredAtUtc: attemptStartUtc, + durationMs: (int)attemptStopwatch.ElapsedMilliseconds); } } } + /// + /// Audit Log #23 (M3 Bundle E — Tasks E4/E5): notify the registered + /// of the just-completed + /// attempt. Only fires for cached-call categories + /// ( and + /// ); the + /// category has its + /// own central-side audit pipeline (Notification Outbox / #21) and must + /// not surface on this hook. + /// + /// + /// Best-effort: an observer that throws is logged and swallowed so a + /// failing audit pipeline cannot corrupt S&F retry bookkeeping + /// (alog.md §7 contract). Messages whose ids are not valid GUIDs (pre-M3 + /// callers that didn't thread a TrackedOperationId in) are silently + /// skipped — the observer requires a parseable id by contract. + /// + private async Task NotifyCachedCallObserverAsync( + StoreAndForwardMessage message, + CachedCallAttemptOutcome outcome, + string? lastError, + int? httpStatus, + DateTime occurredAtUtc, + int? durationMs) + { + if (_cachedCallObserver == null) + { + return; + } + + // Only cached-call categories generate audit telemetry on this hook — + // notifications have their own outbox-side audit pipeline. + var channel = message.Category switch + { + StoreAndForwardCategory.ExternalSystem => "ApiOutbound", + StoreAndForwardCategory.CachedDbWrite => "DbOutbound", + _ => null, + }; + if (channel is null) + { + return; + } + + if (!TrackedOperationId.TryParse(message.Id, out var trackedId)) + { + // Pre-M3 message (random GUID-N id from S&F itself, no + // TrackedOperationId threaded in). Skip — no audit row to bind to. + return; + } + + CachedCallAttemptContext context; + try + { + context = new CachedCallAttemptContext( + TrackedOperationId: trackedId, + Channel: channel, + Target: message.Target, + SourceSite: _siteId, + Outcome: outcome, + RetryCount: message.RetryCount, + LastError: lastError, + HttpStatus: httpStatus, + CreatedAtUtc: message.CreatedAt.UtcDateTime, + OccurredAtUtc: DateTime.SpecifyKind(occurredAtUtc, DateTimeKind.Utc), + DurationMs: durationMs, + SourceInstanceId: message.OriginInstanceName); + } + catch (Exception buildEx) + { + // Defensive — record construction shouldn't throw, but the alog.md + // §7 contract requires this path be exception-safe regardless. + _logger.LogWarning(buildEx, + "Failed to build cached-call attempt context for {MessageId}; observer skipped", + message.Id); + return; + } + + try + { + await _cachedCallObserver.OnAttemptCompletedAsync(context, CancellationToken.None) + .ConfigureAwait(false); + } + catch (Exception ex) + { + // alog.md §7 best-effort: an audit observer outage must NEVER be + // misclassified as a transient delivery failure or corrupt the + // S&F retry bookkeeping. + _logger.LogWarning(ex, + "ICachedCallLifecycleObserver threw for {MessageId} (Outcome {Outcome}); ignored", + message.Id, outcome); + } + } + /// /// WP-12: Gets parked messages for central query (Pattern 8). /// diff --git a/tests/ScadaLink.StoreAndForward.Tests/CachedCallAttemptEmissionTests.cs b/tests/ScadaLink.StoreAndForward.Tests/CachedCallAttemptEmissionTests.cs new file mode 100644 index 0000000..05a8233 --- /dev/null +++ b/tests/ScadaLink.StoreAndForward.Tests/CachedCallAttemptEmissionTests.cs @@ -0,0 +1,298 @@ +using Microsoft.Data.Sqlite; +using Microsoft.Extensions.Logging.Abstractions; +using ScadaLink.Commons.Interfaces.Services; +using ScadaLink.Commons.Types; +using ScadaLink.Commons.Types.Enums; + +namespace ScadaLink.StoreAndForward.Tests; + +/// +/// Audit Log #23 — M3 Bundle E Tasks E4 + E5: the store-and-forward retry +/// loop invokes after every +/// cached-call attempt. The observer is given a +/// derived from the underlying +/// ; the audit bridge then materialises +/// the right CachedCallTelemetry packet (Attempted on every retry, +/// CachedResolve on terminal transitions). Tests run with +/// DefaultRetryInterval=Zero so the timer-driven retry sweep is +/// short-circuited by directly invoking +/// . +/// +public class CachedCallAttemptEmissionTests : IAsyncLifetime, IDisposable +{ + private readonly SqliteConnection _keepAlive; + private readonly StoreAndForwardStorage _storage; + private readonly StoreAndForwardService _service; + private readonly StoreAndForwardOptions _options; + private readonly CapturingObserver _observer; + + public CachedCallAttemptEmissionTests() + { + var dbName = $"E4Tests_{Guid.NewGuid():N}"; + var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared"; + _keepAlive = new SqliteConnection(connStr); + _keepAlive.Open(); + + _storage = new StoreAndForwardStorage(connStr, NullLogger.Instance); + + _options = new StoreAndForwardOptions + { + DefaultRetryInterval = TimeSpan.Zero, + DefaultMaxRetries = 3, + RetryTimerInterval = TimeSpan.FromMinutes(10), + }; + + _observer = new CapturingObserver(); + + _service = new StoreAndForwardService( + _storage, + _options, + NullLogger.Instance, + replication: null, + cachedCallObserver: _observer, + siteId: "site-77"); + } + + public async Task InitializeAsync() => await _storage.InitializeAsync(); + public Task DisposeAsync() => Task.CompletedTask; + public void Dispose() => _keepAlive.Dispose(); + + /// + /// Captures every observer notification so tests can assert on the + /// emitted lifecycle sequence. + /// + private sealed class CapturingObserver : ICachedCallLifecycleObserver + { + public List Notifications { get; } = new(); + public Exception? ThrowOnNotify { get; set; } + + public Task OnAttemptCompletedAsync(CachedCallAttemptContext context, CancellationToken ct = default) + { + if (ThrowOnNotify != null) + { + return Task.FromException(ThrowOnNotify); + } + lock (Notifications) + { + Notifications.Add(context); + } + return Task.CompletedTask; + } + } + + private async Task EnqueueBufferedAsync( + StoreAndForwardCategory category, string target, int maxRetries = 3) + { + // The TrackedOperationId is the S&F message id (Bundle E3 contract). + var trackedId = TrackedOperationId.New(); + await _service.EnqueueAsync( + category, + target, + """{"payload":"x"}""", + originInstanceName: "Plant.Pump42", + maxRetries: maxRetries, + retryInterval: TimeSpan.Zero, + attemptImmediateDelivery: false, + messageId: trackedId.ToString()); + return trackedId; + } + + // ── Task E4: per-attempt observer notifications ── + + [Fact] + public async Task Attempt_FailWithHttp500_EmitsAttemptedTelemetry() + { + // ExternalSystem cached call buffered, retry sweep encounters a + // transient failure on the first attempt. + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + _ => throw new HttpRequestException("HTTP 500 from ERP")); + var trackedId = await EnqueueBufferedAsync( + StoreAndForwardCategory.ExternalSystem, "ERP", maxRetries: 5); + + await _service.RetryPendingMessagesAsync(); + + var notification = Assert.Single(_observer.Notifications); + Assert.Equal(trackedId, notification.TrackedOperationId); + Assert.Equal("ApiOutbound", notification.Channel); + Assert.Equal("ERP", notification.Target); + Assert.Equal("site-77", notification.SourceSite); + Assert.Equal(CachedCallAttemptOutcome.TransientFailure, notification.Outcome); + Assert.Equal(1, notification.RetryCount); + Assert.Contains("HTTP 500", notification.LastError); + Assert.Equal("Plant.Pump42", notification.SourceInstanceId); + } + + [Fact] + public async Task Attempt_Success_EmitsDeliveredOutcome() + { + // ExternalSystem cached call buffered, retry sweep delivers the + // message successfully on its first attempt. + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + _ => Task.FromResult(true)); + var trackedId = await EnqueueBufferedAsync( + StoreAndForwardCategory.ExternalSystem, "ERP"); + + await _service.RetryPendingMessagesAsync(); + + var notification = Assert.Single(_observer.Notifications); + Assert.Equal(trackedId, notification.TrackedOperationId); + Assert.Equal(CachedCallAttemptOutcome.Delivered, notification.Outcome); + Assert.Null(notification.LastError); + } + + [Fact] + public async Task Attempt_PermanentFailure_EmitsPermanentFailureOutcome() + { + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + _ => Task.FromResult(false)); + var trackedId = await EnqueueBufferedAsync( + StoreAndForwardCategory.ExternalSystem, "ERP"); + + await _service.RetryPendingMessagesAsync(); + + var notification = Assert.Single(_observer.Notifications); + Assert.Equal(trackedId, notification.TrackedOperationId); + Assert.Equal(CachedCallAttemptOutcome.PermanentFailure, notification.Outcome); + } + + [Fact] + public async Task Attempt_CachedDbWrite_EmitsDbOutboundChannel() + { + _service.RegisterDeliveryHandler(StoreAndForwardCategory.CachedDbWrite, + _ => Task.FromResult(true)); + var trackedId = await EnqueueBufferedAsync( + StoreAndForwardCategory.CachedDbWrite, "myDb"); + + await _service.RetryPendingMessagesAsync(); + + var notification = Assert.Single(_observer.Notifications); + Assert.Equal(trackedId, notification.TrackedOperationId); + Assert.Equal("DbOutbound", notification.Channel); + Assert.Equal("myDb", notification.Target); + } + + [Fact] + public async Task Attempt_NotificationCategory_NoObserverNotification() + { + // Notifications are NOT cached calls — they're forwarded to central via + // a separate forwarder. The observer must not fire for Notification + // category messages. + _service.RegisterDeliveryHandler(StoreAndForwardCategory.Notification, + _ => Task.FromResult(true)); + await _service.EnqueueAsync( + StoreAndForwardCategory.Notification, + "alerts", + """{"subject":"x"}""", + originInstanceName: "Plant.Pump42", + attemptImmediateDelivery: false); + + await _service.RetryPendingMessagesAsync(); + + Assert.Empty(_observer.Notifications); + } + + [Fact] + public async Task Attempt_MessageIdNotAGuid_NoObserverNotification() + { + // Pre-M3 cached calls (no TrackedOperationId threaded in) use a random + // GUID-N message id from S&F itself. We should still emit (M3 expects + // post-rollout these are tracked) — BUT pre-rollout messages can have + // a non-parseable id, in which case the observer is silently skipped + // to keep S&F bookkeeping intact. + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + _ => Task.FromResult(true)); + await _service.EnqueueAsync( + StoreAndForwardCategory.ExternalSystem, + "ERP", + """{}""", + originInstanceName: "Plant.Pump42", + attemptImmediateDelivery: false, + messageId: "not-a-valid-guid-id"); + + await _service.RetryPendingMessagesAsync(); + + Assert.Empty(_observer.Notifications); + } + + // ── Task E5: terminal-state observer notifications ── + + [Fact] + public async Task Terminal_Delivered_EmitsResolveWithDeliveredStatus() + { + // A successful retry produces a single Delivered observer notification + // — the audit bridge maps this to both an Attempted-Delivered audit row + // and the terminal CachedResolve(Delivered) row. The S&F layer fires + // ONE notification per attempt and lets the bridge fan out as needed. + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + _ => Task.FromResult(true)); + var trackedId = await EnqueueBufferedAsync( + StoreAndForwardCategory.ExternalSystem, "ERP"); + + await _service.RetryPendingMessagesAsync(); + + var notification = Assert.Single(_observer.Notifications); + Assert.Equal(trackedId, notification.TrackedOperationId); + Assert.Equal(CachedCallAttemptOutcome.Delivered, notification.Outcome); + } + + [Fact] + public async Task Terminal_Parked_OnMaxRetries_EmitsParkedMaxRetries() + { + // Configure handler to throw transient every time. + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + _ => throw new HttpRequestException("Connection refused")); + var trackedId = await EnqueueBufferedAsync( + StoreAndForwardCategory.ExternalSystem, "ERP", maxRetries: 2); + + // Two sweeps -> RetryCount climbs to 2 -> parked on the second sweep. + await _service.RetryPendingMessagesAsync(); + await _service.RetryPendingMessagesAsync(); + + Assert.Equal(2, _observer.Notifications.Count); + Assert.Equal(CachedCallAttemptOutcome.TransientFailure, _observer.Notifications[0].Outcome); + Assert.Equal(CachedCallAttemptOutcome.ParkedMaxRetries, _observer.Notifications[1].Outcome); + Assert.Equal(trackedId, _observer.Notifications[1].TrackedOperationId); + } + + [Fact] + public async Task Lifecycle_RetryFail_RetrySucceed_EmitsExpectedSequence() + { + var calls = 0; + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, _ => + { + calls++; + if (calls == 1) throw new HttpRequestException("transient"); + return Task.FromResult(true); + }); + var trackedId = await EnqueueBufferedAsync( + StoreAndForwardCategory.ExternalSystem, "ERP", maxRetries: 5); + + await _service.RetryPendingMessagesAsync(); + await _service.RetryPendingMessagesAsync(); + + Assert.Equal(2, _observer.Notifications.Count); + Assert.Equal(CachedCallAttemptOutcome.TransientFailure, _observer.Notifications[0].Outcome); + Assert.Equal(1, _observer.Notifications[0].RetryCount); + Assert.Equal(CachedCallAttemptOutcome.Delivered, _observer.Notifications[1].Outcome); + Assert.Equal(trackedId, _observer.Notifications[1].TrackedOperationId); + } + + // ── Best-effort contract: observer throws must NOT corrupt retry bookkeeping ── + + [Fact] + public async Task Observer_Throws_DoesNotCorruptRetryCount() + { + _observer.ThrowOnNotify = new InvalidOperationException("simulated audit failure"); + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + _ => Task.FromResult(true)); + var trackedId = await EnqueueBufferedAsync( + StoreAndForwardCategory.ExternalSystem, "ERP"); + + // Must not throw — observer is best-effort. + await _service.RetryPendingMessagesAsync(); + + // The message was delivered (handler returned true) so it should be gone. + var msg = await _storage.GetMessageByIdAsync(trackedId.ToString()); + Assert.Null(msg); + } +}