From 2145b29d4d992a3ffaf3287fddcb6f8ff19fce60 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 20 May 2026 14:41:15 -0400 Subject: [PATCH] feat(auditlog): CachedCallTelemetryForwarder for site-side dual emission (#23 M3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sister to SiteAuditTelemetryActor: takes a combined CachedCallTelemetry packet and fans it out to the two site-local stores. * AuditEvent half writes through IAuditWriter (the M2 FallbackAuditWriter + SqliteAuditWriter chain — same site SQLite hot-path as sync calls). * SiteCallOperational half maps Audit.Kind to the matching IOperationTrackingStore method: - CachedSubmit -> RecordEnqueueAsync (insert-if-not-exists) - ApiCallCached / DbWriteCached -> RecordAttemptAsync (monotonic) - CachedResolve -> RecordTerminalAsync (first-write-wins) Best-effort contract (alog.md §7): independent try/catch per half so a thrown writer cannot starve the tracking row (and vice-versa); both failures are logged at warning level and swallowed — the calling script never sees them. Wire push deferred to M6 — the NoOp ISiteStreamAuditClient binding stays in effect; the forwarder writes only to the local stores in M3. The existing SiteAuditTelemetryActor drain loop will sweep the audit rows once a real gRPC client lands. Bundle E task E2. --- .../Telemetry/CachedCallTelemetryForwarder.cs | 161 ++++++++++++ .../CachedCallTelemetryForwarderTests.cs | 245 ++++++++++++++++++ 2 files changed, 406 insertions(+) create mode 100644 src/ScadaLink.AuditLog/Site/Telemetry/CachedCallTelemetryForwarder.cs create mode 100644 tests/ScadaLink.AuditLog.Tests/Site/Telemetry/CachedCallTelemetryForwarderTests.cs diff --git a/src/ScadaLink.AuditLog/Site/Telemetry/CachedCallTelemetryForwarder.cs b/src/ScadaLink.AuditLog/Site/Telemetry/CachedCallTelemetryForwarder.cs new file mode 100644 index 0000000..27192d2 --- /dev/null +++ b/src/ScadaLink.AuditLog/Site/Telemetry/CachedCallTelemetryForwarder.cs @@ -0,0 +1,161 @@ +using Microsoft.Extensions.Logging; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Interfaces; +using ScadaLink.Commons.Interfaces.Services; +using ScadaLink.Commons.Messages.Integration; +using ScadaLink.Commons.Types; +using ScadaLink.Commons.Types.Enums; + +namespace ScadaLink.AuditLog.Site.Telemetry; + +/// +/// Site-side dual emitter for cached-call lifecycle telemetry (Audit Log #23 / +/// M3). Sister to : where the M2 actor +/// drains audit-only events, this forwarder takes a combined +/// packet and fans it out to the two +/// site-local stores in a single call: +/// +/// The row is written via +/// (the site FallbackAuditWriter + +/// SqliteAuditWriter chain established in M2). +/// The operational half +/// updates the site-local OperationTracking SQLite store via +/// , with the per-lifecycle method +/// (Enqueue / Attempt / Terminal) selected from the +/// audit row's . +/// +/// +/// +/// +/// Best-effort contract (alog.md §7): a thrown writer OR a thrown +/// tracking store must never propagate to the calling script. Both emission +/// halves are wrapped in independent try/catch blocks so a SQLite outage on +/// one side cannot starve the other — the failure is logged and the call +/// returns normally. +/// +/// +/// Wire push deferred to M6. M3 keeps this forwarder synchronous +/// against the local stores: there is no site→central gRPC channel yet, so +/// the RPC +/// is registered on the interface (Bundle E1) but the production binding +/// remains NoOpSiteStreamAuditClient. Once M6 wires a real client the +/// drain pattern from SiteAuditTelemetryActor can be reused — the +/// AuditEvent rows already live in SQLite tagged +/// , so a single drain loop sweeps +/// both M2 and M3 emissions. +/// +/// +public sealed class CachedCallTelemetryForwarder +{ + private readonly IAuditWriter _auditWriter; + private readonly IOperationTrackingStore _trackingStore; + private readonly ILogger _logger; + + public CachedCallTelemetryForwarder( + IAuditWriter auditWriter, + IOperationTrackingStore trackingStore, + ILogger logger) + { + _auditWriter = auditWriter ?? throw new ArgumentNullException(nameof(auditWriter)); + _trackingStore = trackingStore ?? throw new ArgumentNullException(nameof(trackingStore)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + /// + /// Fan out one combined-telemetry packet to the audit writer and the + /// tracking store. Returns once both halves have been attempted (success + /// OR logged failure). NEVER throws — exceptions are caught per-half and + /// logged at warning level so the calling script's outbound action is not + /// disturbed. + /// + public async Task ForwardAsync(CachedCallTelemetry telemetry, CancellationToken ct = default) + { + ArgumentNullException.ThrowIfNull(telemetry); + + // Independent try/catch — a thrown audit writer must not prevent the + // tracking-store update from running (and vice-versa). Both halves + // are best-effort. + await TryEmitAuditAsync(telemetry, ct).ConfigureAwait(false); + await TryEmitTrackingAsync(telemetry, ct).ConfigureAwait(false); + } + + private async Task TryEmitAuditAsync(CachedCallTelemetry telemetry, CancellationToken ct) + { + try + { + await _auditWriter.WriteAsync(telemetry.Audit, ct).ConfigureAwait(false); + } + catch (Exception ex) + { + // alog.md §7 best-effort contract — log and swallow. The audit + // pipeline's own retry/recovery (RingBufferFallback in the + // FallbackAuditWriter) handles transient writer failures upstream; + // a throw bubbling up here means the writer's own swallow contract + // failed, which is itself best-effort-handled. + _logger.LogWarning(ex, + "CachedCallTelemetryForwarder: audit emission threw for EventId {EventId} (Kind {Kind}, Status {Status})", + telemetry.Audit.EventId, telemetry.Audit.Kind, telemetry.Audit.Status); + } + } + + private async Task TryEmitTrackingAsync(CachedCallTelemetry telemetry, CancellationToken ct) + { + try + { + switch (telemetry.Audit.Kind) + { + case AuditKind.CachedSubmit: + // Enqueue — insert-if-not-exists with the operational + // channel as the kind discriminator. RetryCount is fixed + // at 0 by the tracking store's INSERT contract. + await _trackingStore.RecordEnqueueAsync( + telemetry.Operational.TrackedOperationId, + telemetry.Operational.Channel, + telemetry.Operational.Target, + telemetry.Audit.SourceInstanceId, + telemetry.Audit.SourceScript, + ct).ConfigureAwait(false); + break; + + case AuditKind.ApiCallCached: + case AuditKind.DbWriteCached: + // Attempt — advance retry counter + last-error/HTTP-status. + // Terminal rows are guarded by the store's WHERE clause. + await _trackingStore.RecordAttemptAsync( + telemetry.Operational.TrackedOperationId, + telemetry.Operational.Status, + telemetry.Operational.RetryCount, + telemetry.Operational.LastError, + telemetry.Operational.HttpStatus, + ct).ConfigureAwait(false); + break; + + case AuditKind.CachedResolve: + // Terminal — first-write-wins on the resolve flip. + await _trackingStore.RecordTerminalAsync( + telemetry.Operational.TrackedOperationId, + telemetry.Operational.Status, + telemetry.Operational.LastError, + telemetry.Operational.HttpStatus, + ct).ConfigureAwait(false); + break; + + default: + // Defensive — only the four cached-lifecycle kinds are + // expected on this path. Anything else is logged so a + // mis-routed packet is visible but never crashes the + // forwarder. + _logger.LogWarning( + "CachedCallTelemetryForwarder: unexpected audit kind {Kind} on tracking emission for EventId {EventId}", + telemetry.Audit.Kind, telemetry.Audit.EventId); + break; + } + } + catch (Exception ex) + { + _logger.LogWarning(ex, + "CachedCallTelemetryForwarder: tracking-store emission threw for TrackedOperationId {Id} (Status {Status})", + telemetry.Operational.TrackedOperationId, telemetry.Operational.Status); + } + } +} diff --git a/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/CachedCallTelemetryForwarderTests.cs b/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/CachedCallTelemetryForwarderTests.cs new file mode 100644 index 0000000..61cc353 --- /dev/null +++ b/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/CachedCallTelemetryForwarderTests.cs @@ -0,0 +1,245 @@ +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; +using NSubstitute.ExceptionExtensions; +using ScadaLink.AuditLog.Site.Telemetry; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Interfaces; +using ScadaLink.Commons.Interfaces.Services; +using ScadaLink.Commons.Messages.Integration; +using ScadaLink.Commons.Types; +using ScadaLink.Commons.Types.Enums; + +namespace ScadaLink.AuditLog.Tests.Site.Telemetry; + +/// +/// Bundle E E2 tests for . The +/// forwarder is the site-side dual emitter: every cached-call lifecycle event +/// writes one to and one +/// operational tracking-row mutation to . +/// Audit-emission contract: best-effort — a thrown writer or tracking store +/// must be logged and swallowed; the forwarder must never propagate the +/// exception to the calling script. +/// +public class CachedCallTelemetryForwarderTests +{ + private readonly IAuditWriter _writer = Substitute.For(); + private readonly IOperationTrackingStore _tracking = Substitute.For(); + private readonly TrackedOperationId _id = TrackedOperationId.New(); + private readonly DateTime _now = new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc); + + private CachedCallTelemetryForwarder CreateSut() => new( + _writer, _tracking, NullLogger.Instance); + + private CachedCallTelemetry SubmitPacket() => + new( + Audit: new AuditEvent + { + EventId = Guid.NewGuid(), + OccurredAtUtc = _now, + Channel = AuditChannel.ApiOutbound, + Kind = AuditKind.CachedSubmit, + CorrelationId = _id.Value, + SourceSiteId = "site-1", + SourceInstanceId = "inst-1", + SourceScript = "ScriptActor:doStuff", + Target = "ERP.GetOrder", + Status = AuditStatus.Submitted, + ForwardState = AuditForwardState.Pending, + }, + Operational: new SiteCallOperational( + TrackedOperationId: _id, + Channel: "ApiOutbound", + Target: "ERP.GetOrder", + SourceSite: "site-1", + Status: "Submitted", + RetryCount: 0, + LastError: null, + HttpStatus: null, + CreatedAtUtc: _now, + UpdatedAtUtc: _now, + TerminalAtUtc: null)); + + private CachedCallTelemetry AttemptedPacket(int retryCount = 1, string? lastError = "HTTP 500", int? httpStatus = 500) => + new( + Audit: new AuditEvent + { + EventId = Guid.NewGuid(), + OccurredAtUtc = _now, + Channel = AuditChannel.ApiOutbound, + Kind = AuditKind.ApiCallCached, + CorrelationId = _id.Value, + SourceSiteId = "site-1", + Target = "ERP.GetOrder", + Status = AuditStatus.Attempted, + HttpStatus = httpStatus, + ErrorMessage = lastError, + ForwardState = AuditForwardState.Pending, + }, + Operational: new SiteCallOperational( + TrackedOperationId: _id, + Channel: "ApiOutbound", + Target: "ERP.GetOrder", + SourceSite: "site-1", + Status: "Attempted", + RetryCount: retryCount, + LastError: lastError, + HttpStatus: httpStatus, + CreatedAtUtc: _now, + UpdatedAtUtc: _now, + TerminalAtUtc: null)); + + private CachedCallTelemetry ResolvePacket(string status = "Delivered") => + new( + Audit: new AuditEvent + { + EventId = Guid.NewGuid(), + OccurredAtUtc = _now, + Channel = AuditChannel.ApiOutbound, + Kind = AuditKind.CachedResolve, + CorrelationId = _id.Value, + SourceSiteId = "site-1", + Target = "ERP.GetOrder", + Status = Enum.Parse(status), + ForwardState = AuditForwardState.Pending, + }, + Operational: new SiteCallOperational( + TrackedOperationId: _id, + Channel: "ApiOutbound", + Target: "ERP.GetOrder", + SourceSite: "site-1", + Status: status, + RetryCount: 2, + LastError: null, + HttpStatus: null, + CreatedAtUtc: _now, + UpdatedAtUtc: _now, + TerminalAtUtc: _now)); + + [Fact] + public async Task ForwardAsync_Submit_WritesAuditEvent_AndRecordsEnqueue() + { + var sut = CreateSut(); + var packet = SubmitPacket(); + + await sut.ForwardAsync(packet, CancellationToken.None); + + // Audit row: one WriteAsync of the submit event. + await _writer.Received(1).WriteAsync( + Arg.Is(e => + e.EventId == packet.Audit.EventId + && e.Kind == AuditKind.CachedSubmit + && e.Status == AuditStatus.Submitted), + Arg.Any()); + + // Tracking row: insert-if-not-exists with kind discriminator. + await _tracking.Received(1).RecordEnqueueAsync( + _id, + "ApiOutbound", + "ERP.GetOrder", + "inst-1", + "ScriptActor:doStuff", + Arg.Any()); + await _tracking.DidNotReceiveWithAnyArgs().RecordAttemptAsync( + default, default!, default, default, default, default); + await _tracking.DidNotReceiveWithAnyArgs().RecordTerminalAsync( + default, default!, default, default, default); + } + + [Fact] + public async Task ForwardAsync_Attempted_WritesAuditEvent_AndRecordsAttempt() + { + var sut = CreateSut(); + var packet = AttemptedPacket(retryCount: 2, lastError: "HTTP 503", httpStatus: 503); + + await sut.ForwardAsync(packet, CancellationToken.None); + + await _writer.Received(1).WriteAsync( + Arg.Is(e => + e.EventId == packet.Audit.EventId + && e.Kind == AuditKind.ApiCallCached + && e.Status == AuditStatus.Attempted), + Arg.Any()); + + await _tracking.Received(1).RecordAttemptAsync( + _id, "Attempted", 2, "HTTP 503", 503, Arg.Any()); + await _tracking.DidNotReceiveWithAnyArgs().RecordEnqueueAsync( + default, default!, default, default, default, default); + await _tracking.DidNotReceiveWithAnyArgs().RecordTerminalAsync( + default, default!, default, default, default); + } + + [Fact] + public async Task ForwardAsync_Resolve_WritesAuditEvent_AndRecordsTerminal() + { + var sut = CreateSut(); + var packet = ResolvePacket("Delivered"); + + await sut.ForwardAsync(packet, CancellationToken.None); + + await _writer.Received(1).WriteAsync( + Arg.Is(e => + e.EventId == packet.Audit.EventId + && e.Kind == AuditKind.CachedResolve + && e.Status == AuditStatus.Delivered), + Arg.Any()); + + await _tracking.Received(1).RecordTerminalAsync( + _id, "Delivered", null, null, Arg.Any()); + await _tracking.DidNotReceiveWithAnyArgs().RecordEnqueueAsync( + default, default!, default, default, default, default); + await _tracking.DidNotReceiveWithAnyArgs().RecordAttemptAsync( + default, default!, default, default, default, default); + } + + [Fact] + public async Task ForwardAsync_WriterThrows_Logs_DoesNotPropagate() + { + _writer.WriteAsync(Arg.Any(), Arg.Any()) + .Throws(new InvalidOperationException("primary down")); + + var sut = CreateSut(); + + // Must not throw. + await sut.ForwardAsync(SubmitPacket(), CancellationToken.None); + + // Tracking still attempted — emission of the two halves is independent + // so a writer outage cannot starve the operational row (and vice-versa). + await _tracking.Received(1).RecordEnqueueAsync( + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any()); + } + + [Fact] + public async Task ForwardAsync_TrackingStoreThrows_Logs_DoesNotPropagate() + { + _tracking.RecordEnqueueAsync( + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any()) + .Throws(new InvalidOperationException("sqlite locked")); + + var sut = CreateSut(); + + await sut.ForwardAsync(SubmitPacket(), CancellationToken.None); + + // Writer still attempted — emission halves are independent. + await _writer.Received(1).WriteAsync( + Arg.Any(), Arg.Any()); + } + + [Fact] + public async Task ForwardAsync_NullPacket_Throws() + { + var sut = CreateSut(); + + await Assert.ThrowsAsync( + () => sut.ForwardAsync(null!, CancellationToken.None)); + } +}