From 42430dd10a71bf633a00ac7d42487d3ef0d446d8 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 20 May 2026 14:48:05 -0400 Subject: [PATCH] feat(siteruntime): ExternalSystem.CachedCall emits CachedSubmit telemetry (#23 M3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rework ScriptRuntimeContext.ExternalSystem.CachedCall to fit the M3 combined-telemetry model: * Mints a fresh TrackedOperationId and emits one CachedSubmit packet via ICachedCallTelemetryForwarder BEFORE handing the call off — the SiteCalls row is materialised before the first delivery attempt so Tracking.Status(id) can observe a Submitted row even if immediate delivery resolves before the helper returns. * Threads the TrackedOperationId into IExternalSystemClient.CachedCallAsync as a new optional parameter (and into IDatabaseGateway.CachedWriteAsync for the Database mirror set up here for E6). The gateway uses the id as the StoreAndForward messageId so the retry loop (Tasks E4/E5) can recover it from StoreAndForwardMessage.Id. * Returns the TrackedOperationId rather than ExternalCallResult — the script's contract is now "get a tracking handle, observe outcome via Tracking.Status". Best-effort emission: a thrown forwarder is logged + swallowed; the original call still runs and the id is still returned. DatabaseHelper gets the matching siteId / sourceScript / forwarder fields and a parallel CachedSubmit emitter (Channel=DbOutbound) so Task E6's Database.CachedWrite mirror plugs in without further runtime wiring. New ICachedCallTelemetryForwarder seam in Commons.Interfaces.Services so SiteRuntime depends on Commons (existing arrow) rather than ScadaLink.AuditLog (would have introduced a new dependency). Bundle E task E3 (and helper-shape work for E6). --- .../Telemetry/CachedCallTelemetryForwarder.cs | 2 +- .../Services/ICachedCallTelemetryForwarder.cs | 34 +++ .../Interfaces/Services/IDatabaseGateway.cs | 12 +- .../Services/IExternalSystemClient.cs | 12 +- .../DatabaseGateway.cs | 18 +- .../ExternalSystemClient.cs | 19 +- .../Scripts/ScriptRuntimeContext.cs | 285 +++++++++++++++++- .../ExternalSystemCachedCallEmissionTests.cs | 221 ++++++++++++++ 8 files changed, 587 insertions(+), 16 deletions(-) create mode 100644 src/ScadaLink.Commons/Interfaces/Services/ICachedCallTelemetryForwarder.cs create mode 100644 tests/ScadaLink.SiteRuntime.Tests/Scripts/ExternalSystemCachedCallEmissionTests.cs diff --git a/src/ScadaLink.AuditLog/Site/Telemetry/CachedCallTelemetryForwarder.cs b/src/ScadaLink.AuditLog/Site/Telemetry/CachedCallTelemetryForwarder.cs index 27192d2..5a73c34 100644 --- a/src/ScadaLink.AuditLog/Site/Telemetry/CachedCallTelemetryForwarder.cs +++ b/src/ScadaLink.AuditLog/Site/Telemetry/CachedCallTelemetryForwarder.cs @@ -45,7 +45,7 @@ namespace ScadaLink.AuditLog.Site.Telemetry; /// both M2 and M3 emissions. /// /// -public sealed class CachedCallTelemetryForwarder +public sealed class CachedCallTelemetryForwarder : ICachedCallTelemetryForwarder { private readonly IAuditWriter _auditWriter; private readonly IOperationTrackingStore _trackingStore; diff --git a/src/ScadaLink.Commons/Interfaces/Services/ICachedCallTelemetryForwarder.cs b/src/ScadaLink.Commons/Interfaces/Services/ICachedCallTelemetryForwarder.cs new file mode 100644 index 0000000..60ff6bb --- /dev/null +++ b/src/ScadaLink.Commons/Interfaces/Services/ICachedCallTelemetryForwarder.cs @@ -0,0 +1,34 @@ +using ScadaLink.Commons.Messages.Integration; + +namespace ScadaLink.Commons.Interfaces.Services; + +/// +/// Site-side fan-out abstraction for cached-call lifecycle telemetry +/// (Audit Log #23 / M3). One packet carries +/// both an audit row and an operational SiteCalls upsert; the +/// implementation routes the audit half through +/// and the operational half through the site-local tracking SQLite store. +/// +/// +/// +/// Defined in Commons so the script runtime (and the StoreAndForward retry +/// loop, Bundle E4) can take a dependency on the abstraction rather than on +/// the concrete forwarder living inside ScadaLink.AuditLog — the +/// existing dependency arrow runs from SiteRuntime to Commons, not to +/// AuditLog. +/// +/// +/// Best-effort contract (alog.md §7): implementations MUST swallow +/// internal failures rather than propagating to the calling script. +/// +/// +public interface ICachedCallTelemetryForwarder +{ + /// + /// Fan one combined-telemetry packet out to the audit writer and the + /// tracking store. Best-effort — failures on either half are logged and + /// swallowed; the returned Task completes when both halves have been + /// attempted. + /// + Task ForwardAsync(CachedCallTelemetry telemetry, CancellationToken ct = default); +} diff --git a/src/ScadaLink.Commons/Interfaces/Services/IDatabaseGateway.cs b/src/ScadaLink.Commons/Interfaces/Services/IDatabaseGateway.cs index f21c8f6..60defe9 100644 --- a/src/ScadaLink.Commons/Interfaces/Services/IDatabaseGateway.cs +++ b/src/ScadaLink.Commons/Interfaces/Services/IDatabaseGateway.cs @@ -1,4 +1,5 @@ using System.Data.Common; +using ScadaLink.Commons.Types; namespace ScadaLink.Commons.Interfaces.Services; @@ -20,10 +21,19 @@ public interface IDatabaseGateway /// /// Submits a SQL write to the store-and-forward engine for reliable delivery. /// + /// + /// Audit Log #23 (M3): caller-supplied tracking id used as the + /// store-and-forward message id so the S&F retry loop can read it + /// back via StoreAndForwardMessage.Id and emit per-attempt / + /// terminal cached-write telemetry under the same id. Defaults to + /// null — when omitted the S&F engine mints a fresh GUID and no + /// M3 telemetry is correlated (pre-M3 caller behaviour). + /// Task CachedWriteAsync( string connectionName, string sql, IReadOnlyDictionary? parameters = null, string? originInstanceName = null, - CancellationToken cancellationToken = default); + CancellationToken cancellationToken = default, + TrackedOperationId? trackedOperationId = null); } diff --git a/src/ScadaLink.Commons/Interfaces/Services/IExternalSystemClient.cs b/src/ScadaLink.Commons/Interfaces/Services/IExternalSystemClient.cs index c875ebf..627a81d 100644 --- a/src/ScadaLink.Commons/Interfaces/Services/IExternalSystemClient.cs +++ b/src/ScadaLink.Commons/Interfaces/Services/IExternalSystemClient.cs @@ -21,12 +21,22 @@ public interface IExternalSystemClient /// Attempt immediate delivery; on transient failure, hand to S&F engine. /// Permanent failures returned to caller. /// + /// + /// Audit Log #23 (M3): caller-supplied tracking id used as the + /// store-and-forward message id so the S&F retry loop can read it + /// back via StoreAndForwardMessage.Id and emit per-attempt / + /// terminal cached-call telemetry under the same id. Defaults to + /// null — when omitted the S&F engine mints a fresh GUID and no + /// M3 telemetry is correlated (the legacy behaviour pre-M3 callers rely + /// on). + /// Task CachedCallAsync( string systemName, string methodName, IReadOnlyDictionary? parameters = null, string? originInstanceName = null, - CancellationToken cancellationToken = default); + CancellationToken cancellationToken = default, + TrackedOperationId? trackedOperationId = null); } /// diff --git a/src/ScadaLink.ExternalSystemGateway/DatabaseGateway.cs b/src/ScadaLink.ExternalSystemGateway/DatabaseGateway.cs index 94d3495..7e48288 100644 --- a/src/ScadaLink.ExternalSystemGateway/DatabaseGateway.cs +++ b/src/ScadaLink.ExternalSystemGateway/DatabaseGateway.cs @@ -5,6 +5,7 @@ using Microsoft.Extensions.Logging; using ScadaLink.Commons.Entities.ExternalSystems; using ScadaLink.Commons.Interfaces.Repositories; using ScadaLink.Commons.Interfaces.Services; +using ScadaLink.Commons.Types; using ScadaLink.Commons.Types.Enums; using ScadaLink.StoreAndForward; @@ -71,12 +72,19 @@ public class DatabaseGateway : IDatabaseGateway /// /// Submits a SQL write to the store-and-forward engine for reliable delivery. /// + /// + /// Audit Log #23 (M3): used as the S&F message id so the retry loop can + /// recover it via StoreAndForwardMessage.Id and emit per-attempt / + /// terminal cached-write telemetry (Tasks E4/E5). Null preserves the + /// pre-M3 behaviour (S&F mints a random GUID). + /// public async Task CachedWriteAsync( string connectionName, string sql, IReadOnlyDictionary? parameters = null, string? originInstanceName = null, - CancellationToken cancellationToken = default) + CancellationToken cancellationToken = default, + TrackedOperationId? trackedOperationId = null) { var definition = await ResolveConnectionAsync(connectionName, cancellationToken); if (definition == null) @@ -110,7 +118,13 @@ public class DatabaseGateway : IDatabaseGateway payload, originInstanceName, definition.MaxRetries > 0 ? definition.MaxRetries : null, - definition.RetryDelay > TimeSpan.Zero ? definition.RetryDelay : null); + definition.RetryDelay > TimeSpan.Zero ? definition.RetryDelay : null, + // Audit Log #23 (M3): pin the S&F message id to the + // TrackedOperationId so the retry loop (Bundle E Tasks E4/E5) can + // read it back via StoreAndForwardMessage.Id and emit per-attempt + + // terminal cached-write telemetry. Null -> S&F mints its own GUID + // (legacy pre-M3 behaviour). + messageId: trackedOperationId?.ToString()); } /// diff --git a/src/ScadaLink.ExternalSystemGateway/ExternalSystemClient.cs b/src/ScadaLink.ExternalSystemGateway/ExternalSystemClient.cs index c5f80b3..0b88de6 100644 --- a/src/ScadaLink.ExternalSystemGateway/ExternalSystemClient.cs +++ b/src/ScadaLink.ExternalSystemGateway/ExternalSystemClient.cs @@ -7,6 +7,7 @@ using Microsoft.Extensions.Options; using ScadaLink.Commons.Entities.ExternalSystems; using ScadaLink.Commons.Interfaces.Repositories; using ScadaLink.Commons.Interfaces.Services; +using ScadaLink.Commons.Types; using ScadaLink.Commons.Types.Enums; using ScadaLink.StoreAndForward; @@ -72,12 +73,20 @@ public class ExternalSystemClient : IExternalSystemClient /// /// WP-7: CachedCall — attempt immediate, transient failure goes to S&F, permanent returned to script. /// + /// + /// Audit Log #23 (M3): used as the S&F message id so the retry loop can + /// recover it from StoreAndForwardMessage.Id and emit per-attempt / + /// terminal cached-call telemetry (Tasks E4/E5). When null the S&F engine + /// mints its own GUID — preserving the pre-M3 behaviour for callers that + /// don't participate in the M3 audit pipeline. + /// public async Task CachedCallAsync( string systemName, string methodName, IReadOnlyDictionary? parameters = null, string? originInstanceName = null, - CancellationToken cancellationToken = default) + CancellationToken cancellationToken = default, + TrackedOperationId? trackedOperationId = null) { var (system, method) = await ResolveSystemAndMethodAsync(systemName, methodName, cancellationToken); if (system == null || method == null) @@ -129,7 +138,13 @@ public class ExternalSystemClient : IExternalSystemClient originInstanceName, system.MaxRetries > 0 ? system.MaxRetries : null, system.RetryDelay > TimeSpan.Zero ? system.RetryDelay : null, - attemptImmediateDelivery: false); + attemptImmediateDelivery: false, + // Audit Log #23 (M3): pin the S&F message id to the + // TrackedOperationId so the retry loop can read it back via + // StoreAndForwardMessage.Id and emit per-attempt + terminal + // cached-call telemetry (Bundle E Tasks E4/E5). Null -> S&F + // mints its own GUID (legacy pre-M3 behaviour). + messageId: trackedOperationId?.ToString()); return new ExternalCallResult(true, null, null, WasBuffered: true); } diff --git a/src/ScadaLink.SiteRuntime/Scripts/ScriptRuntimeContext.cs b/src/ScadaLink.SiteRuntime/Scripts/ScriptRuntimeContext.cs index 5526554..f8c6b3b 100644 --- a/src/ScadaLink.SiteRuntime/Scripts/ScriptRuntimeContext.cs +++ b/src/ScadaLink.SiteRuntime/Scripts/ScriptRuntimeContext.cs @@ -7,6 +7,7 @@ using ScadaLink.Commons.Entities.Audit; using ScadaLink.Commons.Interfaces; using ScadaLink.Commons.Interfaces.Services; using ScadaLink.Commons.Messages.Instance; +using ScadaLink.Commons.Messages.Integration; using ScadaLink.Commons.Messages.Notification; using ScadaLink.Commons.Messages.ScriptExecution; using ScadaLink.Commons.Types; @@ -94,6 +95,16 @@ public class ScriptRuntimeContext /// private readonly IOperationTrackingStore? _operationTrackingStore; + /// + /// Audit Log #23 (M3 Bundle E — Task E3): site-side dual emitter for + /// cached-call lifecycle telemetry. Optional — when null + /// ExternalSystem.CachedCall / Database.CachedWrite still + /// return a and invoke the underlying + /// store-and-forward path, but no audit / SiteCalls telemetry is emitted + /// (tests / minimal hosts that don't wire the audit pipeline). + /// + private readonly ICachedCallTelemetryForwarder? _cachedForwarder; + public ScriptRuntimeContext( IActorRef instanceActor, IActorRef self, @@ -110,7 +121,8 @@ public class ScriptRuntimeContext string siteId = "", string? sourceScript = null, IAuditWriter? auditWriter = null, - IOperationTrackingStore? operationTrackingStore = null) + IOperationTrackingStore? operationTrackingStore = null, + ICachedCallTelemetryForwarder? cachedForwarder = null) { _instanceActor = instanceActor; _self = self; @@ -128,6 +140,7 @@ public class ScriptRuntimeContext _sourceScript = sourceScript; _auditWriter = auditWriter; _operationTrackingStore = operationTrackingStore; + _cachedForwarder = cachedForwarder; } /// @@ -228,14 +241,21 @@ public class ScriptRuntimeContext /// ExternalSystem.CachedCall("systemName", "methodName", params) /// public ExternalSystemHelper ExternalSystem => new( - _externalSystemClient, _instanceName, _logger, _auditWriter, _siteId, _sourceScript); + _externalSystemClient, _instanceName, _logger, _auditWriter, _siteId, _sourceScript, + // Audit Log #23 (M3 Bundle E — Task E3): emit CachedSubmit telemetry + // on every ExternalSystem.CachedCall enqueue. + _cachedForwarder); /// /// WP-13: Provides access to database operations. /// Database.Connection("name") /// Database.CachedWrite("name", "sql", params) /// - public DatabaseHelper Database => new(_databaseGateway, _instanceName, _logger); + public DatabaseHelper Database => new( + _databaseGateway, _instanceName, _logger, _siteId, _sourceScript, + // Audit Log #23 (M3 Bundle E — Task E6): emit CachedSubmit telemetry on + // every Database.CachedWrite enqueue. + _cachedForwarder); /// /// Provides access to the Notification Outbox API. @@ -328,14 +348,19 @@ public class ScriptRuntimeContext private readonly IAuditWriter? _auditWriter; private readonly string _siteId; private readonly string? _sourceScript; + private readonly ICachedCallTelemetryForwarder? _cachedForwarder; + // Internal constructor for tests living in ScadaLink.SiteRuntime.Tests + // (via InternalsVisibleTo). Production sites resolve the helper through + // ScriptRuntimeContext.ExternalSystem. internal ExternalSystemHelper( IExternalSystemClient? client, string instanceName, ILogger logger, IAuditWriter? auditWriter = null, string siteId = "", - string? sourceScript = null) + string? sourceScript = null, + ICachedCallTelemetryForwarder? cachedForwarder = null) { _client = client; _instanceName = instanceName; @@ -343,6 +368,7 @@ public class ScriptRuntimeContext _auditWriter = auditWriter; _siteId = siteId; _sourceScript = sourceScript; + _cachedForwarder = cachedForwarder; } public async Task Call( @@ -381,7 +407,22 @@ public class ScriptRuntimeContext } } - public async Task CachedCall( + /// + /// Submit a cached outbound API call (Audit Log #23 / M3). Mints a + /// fresh , emits the lifecycle's first + /// CachedSubmit telemetry packet, hands the call to the + /// store-and-forward retry loop (which emits per-attempt and terminal + /// telemetry under the same id — Bundle E Tasks E4/E5), and returns + /// the id immediately so the script can later query + /// Tracking.Status(id). + /// + /// + /// Best-effort emission (alog.md §7): if the forwarder throws, + /// the failure is logged and swallowed; the underlying cached-call + /// path still runs and the id is still returned. The script must never + /// be aborted by an audit-pipeline failure. + /// + public async Task CachedCall( string systemName, string methodName, IReadOnlyDictionary? parameters = null, @@ -390,7 +431,119 @@ public class ScriptRuntimeContext if (_client == null) throw new InvalidOperationException("External system client not available"); - return await _client.CachedCallAsync(systemName, methodName, parameters, _instanceName, cancellationToken); + var trackedId = TrackedOperationId.New(); + var occurredAtUtc = DateTime.UtcNow; + var target = $"{systemName}.{methodName}"; + + // Emit CachedSubmit telemetry BEFORE handing off to the S&F + // engine — that way the SiteCalls row is materialised before the + // first delivery attempt and Tracking.Status(id) can observe a + // Submitted row even if the immediate-delivery attempt happens to + // resolve before this method returns. + await EmitCachedSubmitTelemetryAsync( + systemName, methodName, target, trackedId, occurredAtUtc, cancellationToken) + .ConfigureAwait(false); + + // Hand off to the existing cached-call path. The TrackedOperationId + // becomes the S&F message id so the retry loop (Bundle E Tasks + // E4/E5) can read it back via StoreAndForwardMessage.Id. The + // underlying ExternalCallResult is intentionally discarded — the + // script's contract is now "return the tracking id, observe outcome + // via Tracking.Status". + try + { + await _client.CachedCallAsync( + systemName, + methodName, + parameters, + _instanceName, + cancellationToken, + trackedId).ConfigureAwait(false); + } + catch (Exception ex) + { + // The cached-call surface returns ExternalCallResult on permanent + // failure rather than throwing; a throw here is exceptional + // (e.g. cancellation, resolver outage). Log it and rethrow — the + // script does need to learn about catastrophic failures. The + // tracked id was still returned via the telemetry submit above. + _logger.LogWarning(ex, + "ExternalSystem.CachedCall threw for {System}.{Method} (TrackedOperationId {Id})", + systemName, methodName, trackedId); + throw; + } + + return trackedId; + } + + /// + /// Best-effort emission of the CachedSubmit lifecycle event. Any + /// exception thrown by the forwarder is logged and swallowed so the + /// calling script's enqueue is not disturbed. + /// + private async Task EmitCachedSubmitTelemetryAsync( + string systemName, + string methodName, + string target, + TrackedOperationId trackedId, + DateTime occurredAtUtc, + CancellationToken cancellationToken) + { + if (_cachedForwarder == null) + { + return; + } + + CachedCallTelemetry telemetry; + try + { + telemetry = new CachedCallTelemetry( + Audit: new AuditEvent + { + EventId = Guid.NewGuid(), + OccurredAtUtc = DateTime.SpecifyKind(occurredAtUtc, DateTimeKind.Utc), + Channel = AuditChannel.ApiOutbound, + Kind = AuditKind.CachedSubmit, + CorrelationId = trackedId.Value, + SourceSiteId = string.IsNullOrEmpty(_siteId) ? null : _siteId, + SourceInstanceId = _instanceName, + SourceScript = _sourceScript, + Target = target, + Status = AuditStatus.Submitted, + ForwardState = AuditForwardState.Pending, + }, + Operational: new SiteCallOperational( + TrackedOperationId: trackedId, + Channel: "ApiOutbound", + Target: target, + SourceSite: _siteId, + Status: "Submitted", + RetryCount: 0, + LastError: null, + HttpStatus: null, + CreatedAtUtc: occurredAtUtc, + UpdatedAtUtc: occurredAtUtc, + TerminalAtUtc: null)); + } + catch (Exception buildEx) + { + _logger.LogWarning(buildEx, + "Failed to build CachedSubmit telemetry for {System}.{Method} (TrackedOperationId {Id}) — skipping emission", + systemName, methodName, trackedId); + return; + } + + try + { + await _cachedForwarder.ForwardAsync(telemetry, cancellationToken) + .ConfigureAwait(false); + } + catch (Exception ex) + { + _logger.LogWarning(ex, + "CachedSubmit telemetry forward failed for {System}.{Method} (TrackedOperationId {Id})", + systemName, methodName, trackedId); + } } /// @@ -536,17 +689,37 @@ public class ScriptRuntimeContext /// /// WP-13: Helper for Database.Connection/CachedWrite syntax. /// + /// + /// Audit Log #23 (M3 Bundle E — Task E6): mirrors + /// — mints a + /// , emits the lifecycle's first + /// CachedSubmit packet (Channel DbOutbound), hands off to the S&F + /// retry loop, and returns the id. Per-attempt + terminal telemetry is + /// emitted by the retry loop (Tasks E4/E5). + /// public class DatabaseHelper { private readonly IDatabaseGateway? _gateway; private readonly string _instanceName; private readonly ILogger _logger; + private readonly string _siteId; + private readonly string? _sourceScript; + private readonly ICachedCallTelemetryForwarder? _cachedForwarder; - internal DatabaseHelper(IDatabaseGateway? gateway, string instanceName, ILogger logger) + internal DatabaseHelper( + IDatabaseGateway? gateway, + string instanceName, + ILogger logger, + string siteId = "", + string? sourceScript = null, + ICachedCallTelemetryForwarder? cachedForwarder = null) { _gateway = gateway; _instanceName = instanceName; _logger = logger; + _siteId = siteId; + _sourceScript = sourceScript; + _cachedForwarder = cachedForwarder; } public async Task Connection( @@ -559,7 +732,13 @@ public class ScriptRuntimeContext return await _gateway.GetConnectionAsync(name, cancellationToken); } - public async Task CachedWrite( + /// + /// Submit a cached outbound database write. Mints a fresh + /// , emits CachedSubmit telemetry on + /// DbOutbound, hands off to the cached-write S&F path, and + /// returns the id. Best-effort emission per alog.md §7. + /// + public async Task CachedWrite( string name, string sql, IReadOnlyDictionary? parameters = null, @@ -568,7 +747,95 @@ public class ScriptRuntimeContext if (_gateway == null) throw new InvalidOperationException("Database gateway not available"); - await _gateway.CachedWriteAsync(name, sql, parameters, _instanceName, cancellationToken); + var trackedId = TrackedOperationId.New(); + var occurredAtUtc = DateTime.UtcNow; + // The DB cached-write target uses the connection name (the only + // human-readable handle the gateway carries on the buffered row). + var target = name; + + await EmitCachedDbSubmitTelemetryAsync( + name, trackedId, target, occurredAtUtc, cancellationToken) + .ConfigureAwait(false); + + try + { + await _gateway.CachedWriteAsync( + name, sql, parameters, _instanceName, cancellationToken, trackedId) + .ConfigureAwait(false); + } + catch (Exception ex) + { + _logger.LogWarning(ex, + "Database.CachedWrite threw for {Connection} (TrackedOperationId {Id})", + name, trackedId); + throw; + } + + return trackedId; + } + + private async Task EmitCachedDbSubmitTelemetryAsync( + string connectionName, + TrackedOperationId trackedId, + string target, + DateTime occurredAtUtc, + CancellationToken cancellationToken) + { + if (_cachedForwarder == null) + { + return; + } + + CachedCallTelemetry telemetry; + try + { + telemetry = new CachedCallTelemetry( + Audit: new AuditEvent + { + EventId = Guid.NewGuid(), + OccurredAtUtc = DateTime.SpecifyKind(occurredAtUtc, DateTimeKind.Utc), + Channel = AuditChannel.DbOutbound, + Kind = AuditKind.CachedSubmit, + CorrelationId = trackedId.Value, + SourceSiteId = string.IsNullOrEmpty(_siteId) ? null : _siteId, + SourceInstanceId = _instanceName, + SourceScript = _sourceScript, + Target = target, + Status = AuditStatus.Submitted, + ForwardState = AuditForwardState.Pending, + }, + Operational: new SiteCallOperational( + TrackedOperationId: trackedId, + Channel: "DbOutbound", + Target: target, + SourceSite: _siteId, + Status: "Submitted", + RetryCount: 0, + LastError: null, + HttpStatus: null, + CreatedAtUtc: occurredAtUtc, + UpdatedAtUtc: occurredAtUtc, + TerminalAtUtc: null)); + } + catch (Exception buildEx) + { + _logger.LogWarning(buildEx, + "Failed to build CachedSubmit telemetry for Database.CachedWrite {Connection} (TrackedOperationId {Id}) — skipping emission", + connectionName, trackedId); + return; + } + + try + { + await _cachedForwarder.ForwardAsync(telemetry, cancellationToken) + .ConfigureAwait(false); + } + catch (Exception ex) + { + _logger.LogWarning(ex, + "CachedSubmit telemetry forward failed for Database.CachedWrite {Connection} (TrackedOperationId {Id})", + connectionName, trackedId); + } } } diff --git a/tests/ScadaLink.SiteRuntime.Tests/Scripts/ExternalSystemCachedCallEmissionTests.cs b/tests/ScadaLink.SiteRuntime.Tests/Scripts/ExternalSystemCachedCallEmissionTests.cs new file mode 100644 index 0000000..294f2a8 --- /dev/null +++ b/tests/ScadaLink.SiteRuntime.Tests/Scripts/ExternalSystemCachedCallEmissionTests.cs @@ -0,0 +1,221 @@ +using Microsoft.Extensions.Logging.Abstractions; +using Moq; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Interfaces.Services; +using ScadaLink.Commons.Messages.Integration; +using ScadaLink.Commons.Types; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.SiteRuntime.Scripts; + +namespace ScadaLink.SiteRuntime.Tests.Scripts; + +/// +/// Audit Log #23 — M3 Bundle E (Task E3): every script-initiated +/// ExternalSystem.CachedCall emits exactly one CachedSubmit +/// combined-telemetry packet at enqueue time, returns a fresh +/// , and threads that id down to the +/// store-and-forward layer so the retry-loop emissions (Tasks E4/E5) can join +/// them by id. The audit emission is best-effort: a thrown forwarder must +/// never abort the script's call, and the original +/// must surface to the caller unchanged. +/// +public class ExternalSystemCachedCallEmissionTests +{ + private sealed class CapturingForwarder : ICachedCallTelemetryForwarder + { + public List Telemetry { get; } = new(); + public Exception? ThrowOnForward { get; set; } + + public Task ForwardAsync(CachedCallTelemetry telemetry, CancellationToken ct = default) + { + if (ThrowOnForward != null) + { + return Task.FromException(ThrowOnForward); + } + Telemetry.Add(telemetry); + return Task.CompletedTask; + } + } + + private const string SiteId = "site-77"; + private const string InstanceName = "Plant.Pump42"; + private const string SourceScript = "ScriptActor:CheckPressure"; + + private static ScriptRuntimeContext.ExternalSystemHelper CreateHelper( + IExternalSystemClient client, + ICachedCallTelemetryForwarder? forwarder) + { + return new ScriptRuntimeContext.ExternalSystemHelper( + client, + InstanceName, + NullLogger.Instance, + auditWriter: null, + siteId: SiteId, + sourceScript: SourceScript, + cachedForwarder: forwarder); + } + + [Fact] + public async Task CachedCall_EmitsSubmitTelemetry_OnEnqueue() + { + var client = new Mock(); + client + .Setup(c => c.CachedCallAsync( + "ERP", "GetOrder", + It.IsAny?>(), + InstanceName, + It.IsAny(), + It.IsAny())) + .ReturnsAsync(new ExternalCallResult(true, null, null, WasBuffered: true)); + var forwarder = new CapturingForwarder(); + + var helper = CreateHelper(client.Object, forwarder); + var trackedId = await helper.CachedCall("ERP", "GetOrder"); + + Assert.NotEqual(default, trackedId); + Assert.Single(forwarder.Telemetry); + var packet = forwarder.Telemetry[0]; + + Assert.Equal(AuditChannel.ApiOutbound, packet.Audit.Channel); + Assert.Equal(AuditKind.CachedSubmit, packet.Audit.Kind); + Assert.Equal(AuditStatus.Submitted, packet.Audit.Status); + Assert.Equal("ERP.GetOrder", packet.Audit.Target); + Assert.Equal(trackedId.Value, packet.Audit.CorrelationId); + Assert.Equal(AuditForwardState.Pending, packet.Audit.ForwardState); + + // Operational mirror — same id, Submitted, RetryCount 0, not terminal. + Assert.Equal(trackedId, packet.Operational.TrackedOperationId); + Assert.Equal("ApiOutbound", packet.Operational.Channel); + Assert.Equal("ERP.GetOrder", packet.Operational.Target); + Assert.Equal(SiteId, packet.Operational.SourceSite); + Assert.Equal("Submitted", packet.Operational.Status); + Assert.Equal(0, packet.Operational.RetryCount); + Assert.Null(packet.Operational.LastError); + Assert.Null(packet.Operational.TerminalAtUtc); + } + + [Fact] + public async Task CachedCall_ReturnsTrackedOperationId() + { + var client = new Mock(); + client + .Setup(c => c.CachedCallAsync( + It.IsAny(), It.IsAny(), + It.IsAny?>(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .ReturnsAsync(new ExternalCallResult(true, null, null, WasBuffered: true)); + var forwarder = new CapturingForwarder(); + + var helper = CreateHelper(client.Object, forwarder); + + var id1 = await helper.CachedCall("ERP", "GetOrder"); + var id2 = await helper.CachedCall("ERP", "GetOrder"); + + Assert.NotEqual(default, id1); + Assert.NotEqual(default, id2); + Assert.NotEqual(id1, id2); + + // Both ids were threaded into the client invocations. + client.Verify(c => c.CachedCallAsync( + "ERP", "GetOrder", + It.IsAny?>(), + InstanceName, + It.IsAny(), + id1), + Times.Once); + client.Verify(c => c.CachedCallAsync( + "ERP", "GetOrder", + It.IsAny?>(), + InstanceName, + It.IsAny(), + id2), + Times.Once); + } + + [Fact] + public async Task CachedCall_ForwarderThrows_StillReturnsTrackedOperationId_OriginalCallProceeds() + { + var client = new Mock(); + client + .Setup(c => c.CachedCallAsync( + It.IsAny(), It.IsAny(), + It.IsAny?>(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .ReturnsAsync(new ExternalCallResult(true, null, null, WasBuffered: true)); + var forwarder = new CapturingForwarder + { + ThrowOnForward = new InvalidOperationException("simulated forwarder outage"), + }; + + var helper = CreateHelper(client.Object, forwarder); + + // Must not throw — best-effort emission contract. + var trackedId = await helper.CachedCall("ERP", "GetOrder"); + + Assert.NotEqual(default, trackedId); + // The underlying call still ran exactly once. + client.Verify(c => c.CachedCallAsync( + "ERP", "GetOrder", + It.IsAny?>(), + InstanceName, + It.IsAny(), + trackedId), + Times.Once); + } + + [Fact] + public async Task CachedCall_Provenance_Populated_FromContext() + { + var client = new Mock(); + client + .Setup(c => c.CachedCallAsync( + It.IsAny(), It.IsAny(), + It.IsAny?>(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .ReturnsAsync(new ExternalCallResult(true, null, null, WasBuffered: true)); + var forwarder = new CapturingForwarder(); + + var helper = CreateHelper(client.Object, forwarder); + await helper.CachedCall("ERP", "GetOrder"); + + var packet = Assert.Single(forwarder.Telemetry); + Assert.Equal(SiteId, packet.Audit.SourceSiteId); + Assert.Equal(InstanceName, packet.Audit.SourceInstanceId); + Assert.Equal(SourceScript, packet.Audit.SourceScript); + Assert.Equal(SiteId, packet.Operational.SourceSite); + } + + [Fact] + public async Task CachedCall_NoForwarder_StillReturnsTrackedOperationId() + { + // Forwarder not wired (tests / minimal hosts) — must still return a + // fresh id and invoke the underlying call. + var client = new Mock(); + client + .Setup(c => c.CachedCallAsync( + It.IsAny(), It.IsAny(), + It.IsAny?>(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .ReturnsAsync(new ExternalCallResult(true, null, null, WasBuffered: true)); + + var helper = CreateHelper(client.Object, forwarder: null); + var trackedId = await helper.CachedCall("ERP", "GetOrder"); + + Assert.NotEqual(default, trackedId); + client.Verify(c => c.CachedCallAsync( + "ERP", "GetOrder", + It.IsAny?>(), + InstanceName, + It.IsAny(), + trackedId), + Times.Once); + } +}