using Microsoft.Extensions.Logging.Abstractions;
using Moq;
using ZB.MOM.WW.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Integration;
using ZB.MOM.WW.ScadaBridge.Commons.Types;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Scripts;

namespace ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests.Scripts;

/// <summary>
/// Audit Log #23 — M3 Bundle E (Task E6): every script-initiated
/// <c>Database.CachedWrite</c> emits exactly one <c>CachedSubmit</c>
/// combined-telemetry packet at enqueue time on the <c>DbOutbound</c>
/// channel, returns a fresh <see cref="TrackedOperationId"/>, and threads
/// the id into the database gateway so the store-and-forward retry loop can
/// emit per-attempt + terminal telemetry under the same id.
/// </summary>
public class DatabaseCachedWriteEmissionTests
{
    /// <summary>
    /// Test double for the telemetry forwarder: records every forwarded
    /// packet in <see cref="Telemetry"/>, or — when
    /// <see cref="ThrowOnForward"/> is set — returns a faulted task without
    /// recording anything.
    /// </summary>
    private sealed class CapturingForwarder : ICachedCallTelemetryForwarder
    {
        public List<CachedCallTelemetry> Telemetry { get; } = new();

        public Exception? ThrowOnForward { get; set; }

        public Task ForwardAsync(CachedCallTelemetry telemetry, CancellationToken ct = default)
        {
            if (ThrowOnForward is not null)
            {
                return Task.FromException(ThrowOnForward);
            }

            Telemetry.Add(telemetry);
            return Task.CompletedTask;
        }
    }

    private const string SiteId = "site-77";
    private const string InstanceName = "Plant.Pump42";
    private const string SourceScript = "ScriptActor:WriteAudit";

    // Connection name and statement used by every CachedWrite call in this class.
    private const string ConnectionName = "myDb";
    private const string Sql = "INSERT INTO t VALUES (1)";

    /// <summary>
    /// Audit Log #23: a fixed per-execution id so the cached-row tests can
    /// assert against a known value.
    /// </summary>
    private static readonly Guid TestExecutionId = Guid.NewGuid();

    /// <summary>
    /// Builds the helper under test with the class-wide site / instance /
    /// script provenance. <paramref name="parentExecutionId"/> stays
    /// <c>null</c> for a non-routed run.
    /// </summary>
    private static ScriptRuntimeContext.DatabaseHelper CreateHelper(
        IDatabaseGateway gateway,
        ICachedCallTelemetryForwarder? forwarder,
        Guid? parentExecutionId = null)
    {
        return new ScriptRuntimeContext.DatabaseHelper(
            gateway,
            InstanceName,
            NullLogger.Instance,
            // Audit Log #23: the per-execution id stamped into ExecutionId on
            // every script-side row. Cached rows keep CorrelationId =
            // TrackedOperationId (the per-operation lifecycle id).
            TestExecutionId,
            siteId: SiteId,
            sourceScript: SourceScript,
            cachedForwarder: forwarder,
            parentExecutionId: parentExecutionId);
    }

    /// <summary>
    /// M2.3 (#7): CachedWriteAsync returns an ExternalCallResult. The
    /// buffered result (WasBuffered=true) models the transient-failure path
    /// the enqueue-telemetry tests exercise — only the CachedSubmit packet is
    /// emitted; the S&amp;F retry loop (not the helper) owns the terminal
    /// rows, so Assert.Single(forwarder.Telemetry) holds.
    /// </summary>
    private static ExternalCallResult BufferedResult()
    {
        return new ExternalCallResult(Success: true, ResponseJson: null, ErrorMessage: null, WasBuffered: true);
    }

    /// <summary>
    /// Creates a gateway mock whose <c>CachedWriteAsync</c> answers
    /// <paramref name="result"/> for any argument combination. Tests that
    /// care about specific arguments assert them with <c>Verify</c>.
    /// </summary>
    private static Mock<IDatabaseGateway> GatewayReturning(ExternalCallResult result)
    {
        var gateway = new Mock<IDatabaseGateway>();
        gateway
            .Setup(g => g.CachedWriteAsync(
                It.IsAny<string>(),
                It.IsAny<string>(),
                It.IsAny<IReadOnlyDictionary<string, object?>?>(),
                It.IsAny<string>(),
                It.IsAny<CancellationToken>(),
                It.IsAny<TrackedOperationId>(),
                It.IsAny<Guid>(),
                It.IsAny<string>(),
                It.IsAny<Guid?>()))
            .ReturnsAsync(result);
        return gateway;
    }

    // ── Enqueue telemetry (buffered path) ──

    [Fact]
    public async Task CachedWrite_EmitsSubmitTelemetry_OnEnqueue_KindCachedSubmit_ChannelDbOutbound()
    {
        var gateway = GatewayReturning(BufferedResult());
        var forwarder = new CapturingForwarder();
        var helper = CreateHelper(gateway.Object, forwarder);

        var trackedId = await helper.CachedWrite(ConnectionName, Sql);

        Assert.NotEqual(default, trackedId);
        var packet = Assert.Single(forwarder.Telemetry);

        var row = packet.Audit.AsRow();
        Assert.Equal(AuditChannel.DbOutbound, row.Channel);
        Assert.Equal(AuditKind.CachedSubmit, row.Kind);
        Assert.Equal(AuditStatus.Submitted, row.Status);
        Assert.Equal(ConnectionName, row.Target);
        // CorrelationId is the per-operation lifecycle id (TrackedOperationId);
        // ExecutionId is the per-execution id from the runtime context.
        Assert.Equal(trackedId.Value, row.CorrelationId);
        Assert.Equal(TestExecutionId, row.ExecutionId);
        // Audit Log #23 (ParentExecutionId): null for a non-routed run.
        Assert.Null(row.ParentExecutionId);

        var operational = packet.Operational;
        Assert.Equal(trackedId, operational.TrackedOperationId);
        Assert.Equal("DbOutbound", operational.Channel);
        Assert.Equal(ConnectionName, operational.Target);
        Assert.Equal(SiteId, operational.SourceSite);
        Assert.Equal("Submitted", operational.Status);
        Assert.Equal(0, operational.RetryCount);
        Assert.Null(operational.TerminalAtUtc);
    }

    [Fact]
    public async Task CachedWrite_ProvenancePopulated()
    {
        var gateway = GatewayReturning(BufferedResult());
        var forwarder = new CapturingForwarder();
        var helper = CreateHelper(gateway.Object, forwarder);

        await helper.CachedWrite(ConnectionName, Sql);

        var packet = Assert.Single(forwarder.Telemetry);
        var row = packet.Audit.AsRow();
        Assert.Equal(SiteId, row.SourceSiteId);
        Assert.Equal(InstanceName, row.SourceInstanceId);
        Assert.Equal(SourceScript, row.SourceScript);
        Assert.Equal(SiteId, packet.Operational.SourceSite);
    }

    [Fact]
    public async Task CachedWrite_RoutedRun_StampsParentExecutionId_OnSubmitTelemetry()
    {
        // Audit Log #23 (ParentExecutionId, Task 5): an inbound-API-routed run
        // carries the spawning execution's id; the CachedSubmit telemetry row
        // must stamp it in ParentExecutionId.
        var parentExecutionId = Guid.NewGuid();
        var gateway = GatewayReturning(BufferedResult());
        var forwarder = new CapturingForwarder();
        var helper = CreateHelper(gateway.Object, forwarder, parentExecutionId);

        await helper.CachedWrite(ConnectionName, Sql);

        var packet = Assert.Single(forwarder.Telemetry);
        Assert.Equal(parentExecutionId, packet.Audit.AsRow().ParentExecutionId);
    }

    // ── Helper → gateway threading ──

    [Fact]
    public async Task CachedWrite_ReturnsTrackedOperationId_ThreadsIdToGateway()
    {
        var gateway = GatewayReturning(BufferedResult());
        var forwarder = new CapturingForwarder();
        var helper = CreateHelper(gateway.Object, forwarder);

        var trackedId = await helper.CachedWrite(ConnectionName, Sql);

        Assert.NotEqual(default, trackedId);
        gateway.Verify(
            g => g.CachedWriteAsync(
                ConnectionName,
                Sql,
                It.IsAny<IReadOnlyDictionary<string, object?>?>(),
                InstanceName,
                It.IsAny<CancellationToken>(),
                trackedId,
                It.IsAny<Guid>(),
                It.IsAny<string>(),
                It.IsAny<Guid?>()),
            Times.Once);
    }

    /// <summary>
    /// Audit Log #23 (ExecutionId Task 4): the helper → gateway hop of the
    /// threading chain. The cached-write helper must forward the runtime
    /// context's ExecutionId and SourceScript verbatim into
    /// <see cref="IDatabaseGateway.CachedWriteAsync"/> — so the buffered retry
    /// loop later stamps the right provenance onto its audit rows. This
    /// asserts the exact id/script (not It.IsAny), so a regression that
    /// dropped the threading would fail here.
    /// </summary>
    [Fact]
    public async Task CachedWrite_ThreadsExecutionIdAndSourceScript_IntoGateway()
    {
        var gateway = GatewayReturning(BufferedResult());
        var forwarder = new CapturingForwarder();
        var helper = CreateHelper(gateway.Object, forwarder);

        await helper.CachedWrite(ConnectionName, Sql);

        // The known TestExecutionId and SourceScript must reach the gateway
        // unchanged — these are what the S&F retry loop persists and replays.
        gateway.Verify(
            g => g.CachedWriteAsync(
                ConnectionName,
                Sql,
                It.IsAny<IReadOnlyDictionary<string, object?>?>(),
                InstanceName,
                It.IsAny<CancellationToken>(),
                It.IsAny<TrackedOperationId>(),
                It.Is<Guid>(id => id == TestExecutionId),
                It.Is<string>(s => s == SourceScript),
                It.IsAny<Guid?>()),
            Times.Once);
    }

    /// <summary>
    /// Audit Log #23 (ParentExecutionId Task 6): the helper → gateway hop for
    /// ParentExecutionId. A cached write enqueued from an inbound-API-
    /// routed script run must forward the runtime context's
    /// ParentExecutionId verbatim into
    /// <see cref="IDatabaseGateway.CachedWriteAsync"/> so the buffered retry
    /// loop later stamps it onto its audit rows.
    /// </summary>
    [Fact]
    public async Task CachedWrite_ThreadsParentExecutionId_IntoGateway()
    {
        var parentExecutionId = Guid.NewGuid();
        var gateway = GatewayReturning(BufferedResult());
        var forwarder = new CapturingForwarder();
        var helper = CreateHelper(gateway.Object, forwarder, parentExecutionId);

        await helper.CachedWrite(ConnectionName, Sql);

        gateway.Verify(
            g => g.CachedWriteAsync(
                ConnectionName,
                Sql,
                It.IsAny<IReadOnlyDictionary<string, object?>?>(),
                InstanceName,
                It.IsAny<CancellationToken>(),
                It.IsAny<TrackedOperationId>(),
                It.IsAny<Guid>(),
                It.IsAny<string>(),
                It.Is<Guid?>(id => id == parentExecutionId)),
            Times.Once);
    }

    /// <summary>
    /// Audit Log #23 (ParentExecutionId Task 6): a non-routed run threads a
    /// null ParentExecutionId into the gateway — the additive default.
    /// </summary>
    [Fact]
    public async Task CachedWrite_NonRoutedRun_ThreadsNullParentExecutionId_IntoGateway()
    {
        var gateway = GatewayReturning(BufferedResult());
        var forwarder = new CapturingForwarder();
        var helper = CreateHelper(gateway.Object, forwarder);

        await helper.CachedWrite(ConnectionName, Sql);

        gateway.Verify(
            g => g.CachedWriteAsync(
                ConnectionName,
                Sql,
                It.IsAny<IReadOnlyDictionary<string, object?>?>(),
                InstanceName,
                It.IsAny<CancellationToken>(),
                It.IsAny<TrackedOperationId>(),
                It.IsAny<Guid>(),
                It.IsAny<string>(),
                // `== null` rather than `is null`: patterns are not allowed
                // inside an expression tree.
                It.Is<Guid?>(id => id == null)),
            Times.Once);
    }

    [Fact]
    public async Task CachedWrite_ForwarderThrows_StillReturnsTrackedOperationId()
    {
        var gateway = GatewayReturning(BufferedResult());
        var forwarder = new CapturingForwarder
        {
            ThrowOnForward = new InvalidOperationException("simulated forwarder outage"),
        };
        var helper = CreateHelper(gateway.Object, forwarder);

        var trackedId = await helper.CachedWrite(ConnectionName, Sql);

        // A forwarder fault must neither surface to the script nor stop the
        // write from reaching the gateway under the returned id.
        Assert.NotEqual(default, trackedId);
        gateway.Verify(
            g => g.CachedWriteAsync(
                ConnectionName,
                Sql,
                It.IsAny<IReadOnlyDictionary<string, object?>?>(),
                InstanceName,
                It.IsAny<CancellationToken>(),
                trackedId,
                It.IsAny<Guid>(),
                It.IsAny<string>(),
                It.IsAny<Guid?>()),
            Times.Once);
    }

    // ── SourceNode-stamping (Task 14) ──

    [Fact]
    public async Task CachedWrite_StampsSourceNode_OnSubmitTelemetryRow()
    {
        // Symmetric to ExternalSystemCachedCallEmissionTests's
        // CachedCall_StampsSourceNode_OnEverySiteCallOperationalRow — locks
        // the DbOutbound emitter against a future refactor that drops
        // _sourceNode from the Database.CachedWrite CachedSubmit row.
        var gateway = GatewayReturning(BufferedResult());
        var forwarder = new CapturingForwarder();
        // Constructed directly (not via CreateHelper) because this is the
        // only test that wires a sourceNode.
        var helper = new ScriptRuntimeContext.DatabaseHelper(
            gateway.Object,
            InstanceName,
            NullLogger.Instance,
            TestExecutionId,
            auditWriter: null,
            siteId: SiteId,
            sourceScript: SourceScript,
            cachedForwarder: forwarder,
            parentExecutionId: null,
            sourceNode: "node-a");

        await helper.CachedWrite(ConnectionName, Sql);

        var packet = Assert.Single(forwarder.Telemetry);
        Assert.Equal("node-a", packet.Operational.SourceNode);
    }

    [Fact]
    public async Task CachedWrite_NoSourceNodeWired_LeavesSourceNodeNull()
    {
        // Default CreateHelper does NOT pass sourceNode — the legacy / test
        // host path. The operational row carries null SourceNode, leaving
        // central's SiteCalls.SourceNode NULL.
        var gateway = GatewayReturning(BufferedResult());
        var forwarder = new CapturingForwarder();
        var helper = CreateHelper(gateway.Object, forwarder);

        await helper.CachedWrite(ConnectionName, Sql);

        var packet = Assert.Single(forwarder.Telemetry);
        Assert.Null(packet.Operational.SourceNode);
    }

    // ── M2.3 (#7): immediate-completion lifecycle (WasBuffered=false) ──

    [Fact]
    public async Task CachedWrite_ImmediateSuccess_EmitsSubmitAttemptedThenDeliveredResolve()
    {
        // An immediate success (WasBuffered=false) bypasses the S&F retry loop,
        // so the helper itself must emit the Attempted + terminal CachedResolve
        // rows — mirroring ExternalSystem.CachedCall's immediate-success path.
        var gateway = GatewayReturning(
            new ExternalCallResult(Success: true, ResponseJson: null, ErrorMessage: null, WasBuffered: false));
        var forwarder = new CapturingForwarder();
        var helper = CreateHelper(gateway.Object, forwarder);

        var trackedId = await helper.CachedWrite(ConnectionName, Sql);

        Assert.Equal(3, forwarder.Telemetry.Count);

        var submit = forwarder.Telemetry[0].Audit.AsRow();
        Assert.Equal(AuditKind.CachedSubmit, submit.Kind);
        Assert.Equal(AuditStatus.Submitted, submit.Status);

        var attempted = forwarder.Telemetry[1].Audit.AsRow();
        Assert.Equal(AuditChannel.DbOutbound, attempted.Channel);
        Assert.Equal(AuditKind.DbWriteCached, attempted.Kind);
        Assert.Equal(AuditStatus.Attempted, attempted.Status);

        var resolve = forwarder.Telemetry[2];
        var resolveRow = resolve.Audit.AsRow();
        Assert.Equal(AuditChannel.DbOutbound, resolveRow.Channel);
        Assert.Equal(AuditKind.CachedResolve, resolveRow.Kind);
        Assert.Equal(AuditStatus.Delivered, resolveRow.Status);
        Assert.Equal(trackedId.Value, resolveRow.CorrelationId);
        Assert.Equal("Delivered", resolve.Operational.Status);
        Assert.NotNull(resolve.Operational.TerminalAtUtc);
    }

    [Fact]
    public async Task CachedWrite_ImmediatePermanentFailure_EmitsSubmitAttemptedThenFailedResolve()
    {
        // A synchronous permanent SQL failure (Success=false, WasBuffered=false)
        // also bypasses S&F; the terminal CachedResolve must be Failed and the
        // error message must propagate onto the row.
        const string error = "Permanent database error: Permanent SQL error 2627 on myDb: ...";
        var gateway = GatewayReturning(
            new ExternalCallResult(Success: false, ResponseJson: null, ErrorMessage: error, WasBuffered: false));
        var forwarder = new CapturingForwarder();
        var helper = CreateHelper(gateway.Object, forwarder);

        var trackedId = await helper.CachedWrite(ConnectionName, Sql);

        Assert.Equal(3, forwarder.Telemetry.Count);

        var resolve = forwarder.Telemetry[2];
        var resolveRow = resolve.Audit.AsRow();
        Assert.Equal(AuditKind.CachedResolve, resolveRow.Kind);
        Assert.Equal(AuditStatus.Failed, resolveRow.Status);
        Assert.Equal(error, resolveRow.ErrorMessage);
        Assert.Equal("Failed", resolve.Operational.Status);
        Assert.Equal(error, resolve.Operational.LastError);
        Assert.NotNull(resolve.Operational.TerminalAtUtc);
        Assert.NotEqual(default, trackedId);
    }

    [Fact]
    public async Task CachedWrite_BufferedTransient_EmitsOnlySubmit_NoTerminalRows()
    {
        // The WasBuffered=true path must NOT emit Attempted / CachedResolve — the
        // S&F retry loop owns those. Only the CachedSubmit row is emitted by the
        // helper.
        var gateway = GatewayReturning(BufferedResult());
        var forwarder = new CapturingForwarder();
        var helper = CreateHelper(gateway.Object, forwarder);

        await helper.CachedWrite(ConnectionName, Sql);

        var packet = Assert.Single(forwarder.Telemetry);
        Assert.Equal(AuditKind.CachedSubmit, packet.Audit.AsRow().Kind);
    }
}