From 06ed0aceadac6ddc56027159b1b699afa3bb9b9d Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 23 May 2026 17:41:22 -0400 Subject: [PATCH] feat(sitecall-audit): carry + persist SourceNode end-to-end via cached telemetry Site: site emitters of SiteCallOperational (ExternalSystemClient, the script-API cached call path in ScriptRuntimeContext, CachedCallLifecycleBridge) inject INodeIdentityProvider and stamp SourceNode = NodeName at construction. OperationTrackingStore call site in CachedCallTelemetryForwarder now stamps SourceNode too. Central: SiteCallAuditRepository.UpsertAsync INSERT includes SourceNode in the column list; conditional monotonic UPDATE uses COALESCE(@SourceNode, SourceNode) so later packets cannot blank a previously- stamped value. After this commit every SiteCalls row carries node-a/node-b in SourceNode (subject to monotonic preservation). --- .../ServiceCollectionExtensions.cs | 21 ++- .../Telemetry/CachedCallLifecycleBridge.cs | 23 ++- .../Telemetry/CachedCallTelemetryForwarder.cs | 21 ++- .../Repositories/SiteCallAuditRepository.cs | 27 ++- .../Scripts/ScriptRuntimeContext.cs | 71 ++++++-- .../CachedCallLifecycleBridgeTests.cs | 64 +++++++ .../CachedCallTelemetryForwarderTests.cs | 57 ++++++- .../SiteCallAuditRepositoryTests.cs | 159 +++++++++++++++++- .../SiteCallAuditActorTests.cs | 65 ++++++- .../ExternalSystemCachedCallEmissionTests.cs | 65 +++++++ 10 files changed, 539 insertions(+), 34 deletions(-) diff --git a/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs b/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs index 7a8a775..a62e02f 100644 --- a/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs +++ b/src/ScadaLink.AuditLog/ServiceCollectionExtensions.cs @@ -146,7 +146,14 @@ public static class ServiceCollectionExtensions new CachedCallTelemetryForwarder( sp.GetRequiredService(), sp.GetService(), - sp.GetRequiredService>())); + sp.GetRequiredService>(), + // SourceNode-stamping (Task 14): the local node identity is + // threaded through so RecordEnqueueAsync can stamp the + // tracking row's SourceNode column. GetService — central + // composition roots may not register the provider, in which + // case the forwarder degrades to a null SourceNode rather + // than failing the DI resolution. + sp.GetService())); // M3 Bundle F: bridge the store-and-forward retry-loop observer hook // to the cached-call forwarder so per-attempt + terminal telemetry @@ -154,7 +161,17 @@ public static class ServiceCollectionExtensions // as the script-thread CachedSubmit row. Registered as a singleton // and also bound to ICachedCallLifecycleObserver so AddStoreAndForward // can resolve it through DI (Bundle F StoreAndForward wiring change). - services.AddSingleton(); + // SourceNode-stamping (Task 14): factory-resolved so the + // INodeIdentityProvider singleton can be threaded through — the + // bridge stamps SiteCallOperational.SourceNode from + // INodeIdentityProvider.NodeName on every cached-call lifecycle row. + // GetService (not GetRequiredService) — central composition roots may + // not register the provider, in which case the bridge degrades to a + // null SourceNode rather than failing the DI resolution. + services.AddSingleton(sp => new CachedCallLifecycleBridge( + sp.GetRequiredService(), + sp.GetRequiredService>(), + sp.GetService())); services.AddSingleton( sp => sp.GetRequiredService()); diff --git a/src/ScadaLink.AuditLog/Site/Telemetry/CachedCallLifecycleBridge.cs b/src/ScadaLink.AuditLog/Site/Telemetry/CachedCallLifecycleBridge.cs index 8ca8ba4..299f7ce 100644 --- a/src/ScadaLink.AuditLog/Site/Telemetry/CachedCallLifecycleBridge.cs +++ b/src/ScadaLink.AuditLog/Site/Telemetry/CachedCallLifecycleBridge.cs @@ -39,12 +39,23 @@ public sealed class CachedCallLifecycleBridge : ICachedCallLifecycleObserver private readonly ICachedCallTelemetryForwarder _forwarder; private readonly ILogger _logger; + /// + /// SourceNode-stamping (Task 14): the local node identity provider used to + /// stamp SiteCallOperational.SourceNode on every cached-call + /// lifecycle row this bridge emits. Optional — when null (legacy hosts / + /// tests that don't register the provider) SourceNode stays null and + /// central persists the SiteCalls row with SourceNode NULL. + /// + private readonly INodeIdentityProvider? _nodeIdentity; + public CachedCallLifecycleBridge( ICachedCallTelemetryForwarder forwarder, - ILogger logger) + ILogger logger, + INodeIdentityProvider? nodeIdentity = null) { _forwarder = forwarder ?? throw new ArgumentNullException(nameof(forwarder)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _nodeIdentity = nodeIdentity; } /// @@ -114,7 +125,7 @@ public sealed class CachedCallLifecycleBridge : ICachedCallLifecycleObserver await _forwarder.ForwardAsync(packet, ct).ConfigureAwait(false); } - private static CachedCallTelemetry BuildPacket( + private CachedCallTelemetry BuildPacket( CachedCallAttemptContext context, AuditKind kind, AuditStatus status, @@ -162,9 +173,11 @@ public sealed class CachedCallLifecycleBridge : ICachedCallLifecycleObserver Channel: context.Channel, Target: context.Target, SourceSite: context.SourceSite, - // SourceNode: stamped by Task 14 once the bridge gets an - // INodeIdentityProvider; null until then. - SourceNode: null, + // SourceNode-stamping (Task 14): the local cluster node name + // (node-a/node-b on a site). Stamped from the injected + // INodeIdentityProvider; null when no provider was wired so + // central persists SiteCalls.SourceNode as NULL. + SourceNode: _nodeIdentity?.NodeName, Status: operationalStatus, RetryCount: context.RetryCount, LastError: lastError, diff --git a/src/ScadaLink.AuditLog/Site/Telemetry/CachedCallTelemetryForwarder.cs b/src/ScadaLink.AuditLog/Site/Telemetry/CachedCallTelemetryForwarder.cs index dd3958e..897f047 100644 --- a/src/ScadaLink.AuditLog/Site/Telemetry/CachedCallTelemetryForwarder.cs +++ b/src/ScadaLink.AuditLog/Site/Telemetry/CachedCallTelemetryForwarder.cs @@ -53,6 +53,14 @@ public sealed class CachedCallTelemetryForwarder : ICachedCallTelemetryForwarder private readonly IOperationTrackingStore? _trackingStore; private readonly ILogger _logger; + /// + /// SourceNode-stamping (Task 14): local node identity provider used to + /// stamp the tracking-store row's SourceNode column on + /// RecordEnqueueAsync. Optional — when null (legacy / test hosts) + /// the column stays NULL on the tracking row. + /// + private readonly INodeIdentityProvider? _nodeIdentity; + /// /// Construct the forwarder. is optional — /// when null only the audit half of the packet is emitted, which matches @@ -65,11 +73,13 @@ public sealed class CachedCallTelemetryForwarder : ICachedCallTelemetryForwarder public CachedCallTelemetryForwarder( IAuditWriter auditWriter, IOperationTrackingStore? trackingStore, - ILogger logger) + ILogger logger, + INodeIdentityProvider? nodeIdentity = null) { _auditWriter = auditWriter ?? throw new ArgumentNullException(nameof(auditWriter)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _trackingStore = trackingStore; + _nodeIdentity = nodeIdentity; } /// @@ -128,16 +138,17 @@ public sealed class CachedCallTelemetryForwarder : ICachedCallTelemetryForwarder // Enqueue — insert-if-not-exists with the operational // channel as the kind discriminator. RetryCount is fixed // at 0 by the tracking store's INSERT contract. - // sourceNode plumbed through but left null here; stamping - // is wired in a later task (Task 14) once the - // INodeIdentityProvider is threaded into the forwarder. + // SourceNode-stamping (Task 14): stamp the local node + // name (node-a/node-b) from the injected + // INodeIdentityProvider; null when no provider was wired + // so the tracking row's SourceNode column stays NULL. await _trackingStore.RecordEnqueueAsync( telemetry.Operational.TrackedOperationId, telemetry.Operational.Channel, telemetry.Operational.Target, telemetry.Audit.SourceInstanceId, telemetry.Audit.SourceScript, - sourceNode: null, + sourceNode: _nodeIdentity?.NodeName, ct).ConfigureAwait(false); break; diff --git a/src/ScadaLink.ConfigurationDatabase/Repositories/SiteCallAuditRepository.cs b/src/ScadaLink.ConfigurationDatabase/Repositories/SiteCallAuditRepository.cs index e14d46a..a06282f 100644 --- a/src/ScadaLink.ConfigurationDatabase/Repositories/SiteCallAuditRepository.cs +++ b/src/ScadaLink.ConfigurationDatabase/Repositories/SiteCallAuditRepository.cs @@ -69,15 +69,20 @@ public class SiteCallAuditRepository : ISiteCallAuditRepository // Step 1: insert-if-not-exists. Like AuditLogRepository.InsertIfNotExistsAsync // this is check-then-act so a duplicate-key violation may surface under // concurrent inserts on the same id — caught + logged at Debug. + // + // SourceNode-stamping (Task 14): the column is included in the INSERT + // column list / VALUES so a fresh row carries the originating node + // name (node-a/node-b for site rows). A null SourceNode (legacy hosts + // / unstamped reconciled rows) writes NULL straight through. try { await _context.Database.ExecuteSqlInterpolatedAsync( $@"IF NOT EXISTS (SELECT 1 FROM dbo.SiteCalls WHERE TrackedOperationId = {idText}) INSERT INTO dbo.SiteCalls - (TrackedOperationId, Channel, Target, SourceSite, Status, RetryCount, + (TrackedOperationId, Channel, Target, SourceSite, SourceNode, Status, RetryCount, LastError, HttpStatus, CreatedAtUtc, UpdatedAtUtc, TerminalAtUtc, IngestedAtUtc) VALUES - ({idText}, {siteCall.Channel}, {siteCall.Target}, {siteCall.SourceSite}, {siteCall.Status}, {siteCall.RetryCount}, + ({idText}, {siteCall.Channel}, {siteCall.Target}, {siteCall.SourceSite}, {siteCall.SourceNode}, {siteCall.Status}, {siteCall.RetryCount}, {siteCall.LastError}, {siteCall.HttpStatus}, {siteCall.CreatedAtUtc}, {siteCall.UpdatedAtUtc}, {siteCall.TerminalAtUtc}, {siteCall.IngestedAtUtc});", ct); } @@ -96,6 +101,21 @@ VALUES // string to the same rank table the caller uses; we only mutate if the // incoming rank is strictly greater. Same-rank (including // terminal-over-terminal) is a no-op — first-write-wins at each rank. + // + // SourceNode-stamping (Task 14): SourceNode is updated via + // COALESCE(@SourceNode, SourceNode). The operator returns @SourceNode + // when it is non-null, otherwise the stored value — so the column + // behaves protectively: a later packet that carries a null + // SourceNode (e.g. a reconciliation pull from an unstamped node) + // NEVER blanks out a value the first stamping packet set. A later + // packet that DOES carry a non-null SourceNode replaces the previous + // value — combined with the monotonic-rank guard this is + // "last-non-null-wins on rank advance", which lets a missing + // SourceNode be filled in later if Submit happened to be unstamped + // and an Attempt/Resolve carries the node identity. Within one + // lifecycle every packet should carry the same SourceNode value (one + // execution, one node) so the "overwrite" path is in practice + // idempotent. await _context.Database.ExecuteSqlInterpolatedAsync( $@"UPDATE dbo.SiteCalls SET Status = {siteCall.Status}, @@ -104,7 +124,8 @@ SET Status = {siteCall.Status}, HttpStatus = {siteCall.HttpStatus}, UpdatedAtUtc = {siteCall.UpdatedAtUtc}, TerminalAtUtc = {siteCall.TerminalAtUtc}, - IngestedAtUtc = {siteCall.IngestedAtUtc} + IngestedAtUtc = {siteCall.IngestedAtUtc}, + SourceNode = COALESCE({siteCall.SourceNode}, SourceNode) WHERE TrackedOperationId = {idText} AND {incomingRank} > (CASE Status WHEN 'Submitted' THEN 0 diff --git a/src/ScadaLink.SiteRuntime/Scripts/ScriptRuntimeContext.cs b/src/ScadaLink.SiteRuntime/Scripts/ScriptRuntimeContext.cs index 554acb1..55e781f 100644 --- a/src/ScadaLink.SiteRuntime/Scripts/ScriptRuntimeContext.cs +++ b/src/ScadaLink.SiteRuntime/Scripts/ScriptRuntimeContext.cs @@ -310,7 +310,11 @@ public class ScriptRuntimeContext _cachedForwarder, // Audit Log #23 (ParentExecutionId): the spawning execution's id, // threaded alongside _executionId. Null for non-routed runs. - _parentExecutionId); + _parentExecutionId, + // SourceNode-stamping (Task 14): the local node name (node-a/node-b), + // threaded so the cached-call telemetry construction sites can stamp + // it onto SiteCallOperational.SourceNode. + _sourceNode); /// /// WP-13: Provides access to database operations. @@ -334,7 +338,11 @@ public class ScriptRuntimeContext _cachedForwarder, // Audit Log #23 (ParentExecutionId): the spawning execution's id, // threaded alongside _executionId. Null for non-routed runs. - _parentExecutionId); + _parentExecutionId, + // SourceNode-stamping (Task 14): the local node name (node-a/node-b), + // threaded so Database.CachedWrite's CachedSubmit telemetry can + // stamp it onto SiteCallOperational.SourceNode. + _sourceNode); /// /// Provides access to the Notification Outbox API. @@ -453,6 +461,16 @@ public class ScriptRuntimeContext private readonly string? _sourceScript; private readonly ICachedCallTelemetryForwarder? _cachedForwarder; + /// + /// SourceNode-stamping (Task 14): the local cluster node name on + /// which this script is executing (node-a/node-b). + /// Stamped onto SiteCallOperational.SourceNode on the three + /// cached-call telemetry construction sites (CachedSubmit + the two + /// immediate-completion rows) so central can persist it on the + /// SiteCalls row. + /// + private readonly string? _sourceNode; + // Internal constructor for tests living in ScadaLink.SiteRuntime.Tests // (via InternalsVisibleTo). Production sites resolve the helper through // ScriptRuntimeContext.ExternalSystem. @@ -474,7 +492,8 @@ public class ScriptRuntimeContext string siteId = "", string? sourceScript = null, ICachedCallTelemetryForwarder? cachedForwarder = null, - Guid? parentExecutionId = null) + Guid? parentExecutionId = null, + string? sourceNode = null) { _client = client; _instanceName = instanceName; @@ -485,6 +504,7 @@ public class ScriptRuntimeContext _sourceScript = sourceScript; _cachedForwarder = cachedForwarder; _parentExecutionId = parentExecutionId; + _sourceNode = sourceNode; } public async Task Call( @@ -670,9 +690,11 @@ public class ScriptRuntimeContext Channel: "ApiOutbound", Target: target, SourceSite: _siteId, - // SourceNode: stamped by Task 14 once the script context - // gets an INodeIdentityProvider; null until then. - SourceNode: null, + // SourceNode-stamping (Task 14): the local node name + // (node-a/node-b) — threaded through INodeIdentityProvider + // at the ScriptExecutionActor; null when no provider was + // wired so central persists SiteCalls.SourceNode as NULL. + SourceNode: _sourceNode, Status: "Submitted", RetryCount: 0, LastError: null, @@ -791,9 +813,11 @@ public class ScriptRuntimeContext Channel: "ApiOutbound", Target: target, SourceSite: _siteId, - // SourceNode: stamped by Task 14 once the script context - // gets an INodeIdentityProvider; null until then. - SourceNode: null, + // SourceNode-stamping (Task 14): the local node name + // (node-a/node-b) — threaded through INodeIdentityProvider + // at the ScriptExecutionActor; null when no provider was + // wired so central persists SiteCalls.SourceNode as NULL. + SourceNode: _sourceNode, Status: "Attempted", // RetryCount stays 0 — the operation never reached the // S&F retry sweep, so no retries were performed. @@ -861,9 +885,11 @@ public class ScriptRuntimeContext Channel: "ApiOutbound", Target: target, SourceSite: _siteId, - // SourceNode: stamped by Task 14 once the script context - // gets an INodeIdentityProvider; null until then. - SourceNode: null, + // SourceNode-stamping (Task 14): the local node name + // (node-a/node-b) — threaded through INodeIdentityProvider + // at the ScriptExecutionActor; null when no provider was + // wired so central persists SiteCalls.SourceNode as NULL. + SourceNode: _sourceNode, Status: operationalTerminalStatus, RetryCount: 0, LastError: result.Success ? null : result.ErrorMessage, @@ -1120,6 +1146,15 @@ public class ScriptRuntimeContext /// private readonly IAuditWriter? _auditWriter; + /// + /// SourceNode-stamping (Task 14): the local cluster node name on + /// which this script is executing (node-a/node-b). + /// Stamped onto SiteCallOperational.SourceNode at the + /// Database.CachedWrite CachedSubmit telemetry construction + /// site so central can persist it on the SiteCalls row. + /// + private readonly string? _sourceNode; + // Parameter ordering: executionId sits immediately after the // ILogger — see the note on ExternalSystemHelper's ctor for why the // post-logger slot is the one consistent position across all four @@ -1133,7 +1168,8 @@ public class ScriptRuntimeContext string siteId = "", string? sourceScript = null, ICachedCallTelemetryForwarder? cachedForwarder = null, - Guid? parentExecutionId = null) + Guid? parentExecutionId = null, + string? sourceNode = null) { _gateway = gateway; _instanceName = instanceName; @@ -1144,6 +1180,7 @@ public class ScriptRuntimeContext _sourceScript = sourceScript; _cachedForwarder = cachedForwarder; _parentExecutionId = parentExecutionId; + _sourceNode = sourceNode; } public async Task Connection( @@ -1274,9 +1311,11 @@ public class ScriptRuntimeContext Channel: "DbOutbound", Target: target, SourceSite: _siteId, - // SourceNode: stamped by Task 14 once the script context - // gets an INodeIdentityProvider; null until then. - SourceNode: null, + // SourceNode-stamping (Task 14): the local node name + // (node-a/node-b) — threaded through INodeIdentityProvider + // at the ScriptExecutionActor; null when no provider was + // wired so central persists SiteCalls.SourceNode as NULL. + SourceNode: _sourceNode, Status: "Submitted", RetryCount: 0, LastError: null, diff --git a/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/CachedCallLifecycleBridgeTests.cs b/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/CachedCallLifecycleBridgeTests.cs index 4185ab1..bfde0eb 100644 --- a/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/CachedCallLifecycleBridgeTests.cs +++ b/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/CachedCallLifecycleBridgeTests.cs @@ -327,4 +327,68 @@ public class CachedCallLifecycleBridgeTests Assert.NotNull(captured); Assert.Null(captured!.Audit.ParentExecutionId); } + + // ── SourceNode-stamping (Task 14) ── + + [Fact] + public async Task RetryLoopRow_StampsSourceNode_FromNodeIdentityProvider() + { + // SourceNode-stamping (Task 14): when an INodeIdentityProvider is + // wired the bridge stamps the local node name (node-a/node-b) onto + // the SiteCallOperational.SourceNode column of every emitted packet. + var nodeIdentity = Substitute.For(); + nodeIdentity.NodeName.Returns("node-a"); + + var captured = new List(); + _forwarder.ForwardAsync(Arg.Do(t => captured.Add(t)), Arg.Any()) + .Returns(Task.CompletedTask); + + var sut = new CachedCallLifecycleBridge( + _forwarder, NullLogger.Instance, nodeIdentity); + + await sut.OnAttemptCompletedAsync(Ctx(CachedCallAttemptOutcome.Delivered)); + + Assert.Equal(2, captured.Count); + Assert.All(captured, p => Assert.Equal("node-a", p.Operational.SourceNode)); + } + + [Fact] + public async Task RetryLoopRow_NoNodeIdentityProvider_LeavesSourceNodeNull() + { + // When no INodeIdentityProvider is wired (legacy hosts / tests) the + // bridge degrades to a null SourceNode rather than throwing. The + // emitted packet's SourceNode is null so the central row persists NULL. + var captured = new List(); + _forwarder.ForwardAsync(Arg.Do(t => captured.Add(t)), Arg.Any()) + .Returns(Task.CompletedTask); + + // Default CreateSut() does NOT pass a node-identity provider. + var sut = CreateSut(); + await sut.OnAttemptCompletedAsync(Ctx(CachedCallAttemptOutcome.TransientFailure)); + + var packet = Assert.Single(captured); + Assert.Null(packet.Operational.SourceNode); + } + + [Fact] + public async Task RetryLoopRow_NodeIdentityWithNullNodeName_LeavesSourceNodeNull() + { + // The provider exists but reports a null NodeName (unconfigured). The + // bridge must pass that null through to SourceNode rather than + // falling back to a placeholder. + var nodeIdentity = Substitute.For(); + nodeIdentity.NodeName.Returns((string?)null); + + var captured = new List(); + _forwarder.ForwardAsync(Arg.Do(t => captured.Add(t)), Arg.Any()) + .Returns(Task.CompletedTask); + + var sut = new CachedCallLifecycleBridge( + _forwarder, NullLogger.Instance, nodeIdentity); + + await sut.OnAttemptCompletedAsync(Ctx(CachedCallAttemptOutcome.TransientFailure)); + + var packet = Assert.Single(captured); + Assert.Null(packet.Operational.SourceNode); + } } diff --git a/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/CachedCallTelemetryForwarderTests.cs b/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/CachedCallTelemetryForwarderTests.cs index 7be51a6..e5de6d5 100644 --- a/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/CachedCallTelemetryForwarderTests.cs +++ b/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/CachedCallTelemetryForwarderTests.cs @@ -135,7 +135,11 @@ public class CachedCallTelemetryForwarderTests Arg.Any()); // Tracking row: insert-if-not-exists with kind discriminator. - // sourceNode is null until Task 14 wires the INodeIdentityProvider through. + // Default CreateSut() does NOT supply an INodeIdentityProvider, so the + // forwarder passes null sourceNode to RecordEnqueueAsync (legacy / test + // host behaviour). The Task 14 stamping path is covered by the + // ForwardAsync_Submit_StampsSourceNode_FromNodeIdentityProvider test + // below. await _tracking.Received(1).RecordEnqueueAsync( _id, "ApiOutbound", @@ -249,4 +253,55 @@ public class CachedCallTelemetryForwarderTests await Assert.ThrowsAsync( () => sut.ForwardAsync(null!, CancellationToken.None)); } + + // ── SourceNode-stamping (Task 14) ── + + [Fact] + public async Task ForwardAsync_Submit_StampsSourceNode_FromNodeIdentityProvider() + { + // SourceNode-stamping (Task 14): when an INodeIdentityProvider is + // wired the forwarder must stamp its NodeName onto the + // RecordEnqueueAsync sourceNode parameter so the tracking row + // captures the originating node (node-a/node-b). + var nodeIdentity = Substitute.For(); + nodeIdentity.NodeName.Returns("node-a"); + + var sut = new CachedCallTelemetryForwarder( + _writer, _tracking, NullLogger.Instance, nodeIdentity); + + await sut.ForwardAsync(SubmitPacket(), CancellationToken.None); + + await _tracking.Received(1).RecordEnqueueAsync( + _id, + "ApiOutbound", + "ERP.GetOrder", + "inst-1", + "ScriptActor:doStuff", + "node-a", + Arg.Any()); + } + + [Fact] + public async Task ForwardAsync_Submit_NodeIdentityNullNodeName_PassesNullSourceNode() + { + // The provider exists but reports a null NodeName (unconfigured). + // The forwarder passes that null through to RecordEnqueueAsync rather + // than falling back to a placeholder string. + var nodeIdentity = Substitute.For(); + nodeIdentity.NodeName.Returns((string?)null); + + var sut = new CachedCallTelemetryForwarder( + _writer, _tracking, NullLogger.Instance, nodeIdentity); + + await sut.ForwardAsync(SubmitPacket(), CancellationToken.None); + + await _tracking.Received(1).RecordEnqueueAsync( + _id, + "ApiOutbound", + "ERP.GetOrder", + "inst-1", + "ScriptActor:doStuff", + null, + Arg.Any()); + } } diff --git a/tests/ScadaLink.ConfigurationDatabase.Tests/Repositories/SiteCallAuditRepositoryTests.cs b/tests/ScadaLink.ConfigurationDatabase.Tests/Repositories/SiteCallAuditRepositoryTests.cs index 67d93f2..4e77af7 100644 --- a/tests/ScadaLink.ConfigurationDatabase.Tests/Repositories/SiteCallAuditRepositoryTests.cs +++ b/tests/ScadaLink.ConfigurationDatabase.Tests/Repositories/SiteCallAuditRepositoryTests.cs @@ -520,7 +520,8 @@ public class SiteCallAuditRepositoryTests : IClassFixture DateTime? createdAtUtc = null, DateTime? updatedAtUtc = null, bool terminal = false, - DateTime? terminalAtUtc = null) + DateTime? terminalAtUtc = null, + string? sourceNode = null) { var created = createdAtUtc ?? DateTime.UtcNow; var updated = updatedAtUtc ?? created; @@ -534,6 +535,7 @@ public class SiteCallAuditRepositoryTests : IClassFixture Channel = "ApiOutbound", Target = "ERP.GetOrder", SourceSite = sourceSite ?? NewSiteId(), + SourceNode = sourceNode, Status = status, RetryCount = retryCount, LastError = lastError, @@ -544,4 +546,159 @@ public class SiteCallAuditRepositoryTests : IClassFixture IngestedAtUtc = DateTime.UtcNow, }; } + + // --- SourceNode-stamping (Task 14) -------------------------------------- + + [SkippableFact] + public async Task UpsertAsync_PersistsSourceNode_OnFreshInsert() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + // SourceNode-stamping (Task 14): a fresh INSERT must persist the + // SourceNode column verbatim — the central row carries the originating + // site node name end-to-end. + var id = TrackedOperationId.New(); + await using var context = CreateContext(); + var repo = new SiteCallAuditRepository(context); + + await repo.UpsertAsync(NewRow(id, status: "Submitted", sourceNode: "node-a")); + + var loaded = await repo.GetAsync(id); + Assert.NotNull(loaded); + Assert.Equal("node-a", loaded!.SourceNode); + } + + [SkippableFact] + public async Task UpsertAsync_PreservesSourceNode_WhenLaterPacketCarriesNull() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + // SourceNode-stamping (Task 14): the UPDATE uses + // COALESCE(@SourceNode, SourceNode) so a subsequent packet that does + // NOT carry a SourceNode (legacy / reconciliation pull from an + // unstamped node) MUST NOT blank out the value the first packet set. + // Combined with the monotonic-rank guard the Status advances but the + // SourceNode survives. + // + // Each step uses a fresh DbContext — raw-SQL UPDATEs bypass the + // change tracker, so reusing a single context whose entity is already + // tracked masks the post-UPDATE state on a follow-up FindAsync. + var id = TrackedOperationId.New(); + await using (var context = CreateContext()) + { + var repo = new SiteCallAuditRepository(context); + // First packet: stamped Submit from node-a. + await repo.UpsertAsync(NewRow(id, status: "Submitted", sourceNode: "node-a")); + } + await using (var context = CreateContext()) + { + var repo = new SiteCallAuditRepository(context); + // Later packet: rank-advancing Attempted with null SourceNode. + await repo.UpsertAsync(NewRow(id, status: "Attempted", retryCount: 1, sourceNode: null)); + } + + await using (var readContext = CreateContext()) + { + var readRepo = new SiteCallAuditRepository(readContext); + var loaded = await readRepo.GetAsync(id); + Assert.NotNull(loaded); + // SourceNode preserved despite the null on the later packet. + Assert.Equal("node-a", loaded!.SourceNode); + // Status advanced — proves the UPDATE branch actually ran. + Assert.Equal("Attempted", loaded.Status); + Assert.Equal(1, loaded.RetryCount); + } + } + + [SkippableFact] + public async Task UpsertAsync_NonNullIncomingSourceNode_OverwritesPreviousValueOnRankAdvance() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + // SourceNode-stamping (Task 14): per the COALESCE(@SourceNode, + // SourceNode) semantics the column protects against a *null* + // incoming value blanking a previously-stamped one, but a non-null + // incoming value DOES replace the existing value on a rank-advancing + // packet. This is the "last-non-null-wins on advance" behaviour the + // SQL operator literally implements — see the comment in + // SiteCallAuditRepository.UpsertAsync. + // + // In practice both stamps within a single lifecycle SHOULD carry the + // same value (same node, same execution); a divergence would imply a + // mid-lifecycle node change (e.g. failover handing off to node-b) and + // letting the latest stamp through is arguably the right call. This + // test pins the actual behaviour so we notice if the SQL gets + // inverted (to a true first-write-wins COALESCE(SourceNode, + // @SourceNode)) inadvertently. + var id = TrackedOperationId.New(); + await using (var context = CreateContext()) + { + var repo = new SiteCallAuditRepository(context); + await repo.UpsertAsync(NewRow(id, status: "Submitted", sourceNode: "node-a")); + } + await using (var context = CreateContext()) + { + var repo = new SiteCallAuditRepository(context); + await repo.UpsertAsync(NewRow(id, status: "Attempted", retryCount: 1, sourceNode: "node-b")); + } + + await using (var readContext = CreateContext()) + { + var readRepo = new SiteCallAuditRepository(readContext); + var loaded = await readRepo.GetAsync(id); + Assert.NotNull(loaded); + // Incoming non-null wins — node-b replaces node-a on rank advance. + Assert.Equal("node-b", loaded!.SourceNode); + // Other monotonic fields advanced too — proves the UPDATE ran. + Assert.Equal("Attempted", loaded.Status); + } + } + + [SkippableFact] + public async Task UpsertAsync_FillsSourceNode_WhenInsertWasNullAndLaterPacketCarriesValue() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + // SourceNode-stamping (Task 14): when the column was left NULL by an + // earlier unstamped packet, a later rank-advancing packet with a + // non-null SourceNode fills it — the COALESCE(@SourceNode, SourceNode) + // SQL operator returns @SourceNode when @SourceNode is non-null, so + // the incoming value wins over the existing NULL. This is the + // recovery path for an initially-unstamped lifecycle whose later + // packets carry the node identity. + // + // The intermediate verification and final read use FRESH contexts — + // FindAsync hits the change tracker first, so a cached entity from + // an earlier read in the same context can mask a raw-SQL UPDATE. + var id = TrackedOperationId.New(); + await using (var context = CreateContext()) + { + var repo = new SiteCallAuditRepository(context); + await repo.UpsertAsync(NewRow(id, status: "Submitted", sourceNode: null)); + } + + // Verify the INSERT left SourceNode NULL via a fresh context. + await using (var verifyContext = CreateContext()) + { + var verifyRepo = new SiteCallAuditRepository(verifyContext); + var afterInsert = await verifyRepo.GetAsync(id); + Assert.NotNull(afterInsert); + Assert.Null(afterInsert!.SourceNode); + } + + await using (var context = CreateContext()) + { + var repo = new SiteCallAuditRepository(context); + await repo.UpsertAsync(NewRow(id, status: "Attempted", retryCount: 1, sourceNode: "node-a")); + } + + await using (var readContext = CreateContext()) + { + var readRepo = new SiteCallAuditRepository(readContext); + var loaded = await readRepo.GetAsync(id); + Assert.NotNull(loaded); + Assert.Equal("node-a", loaded!.SourceNode); + Assert.Equal("Attempted", loaded.Status); + } + } } diff --git a/tests/ScadaLink.SiteCallAudit.Tests/SiteCallAuditActorTests.cs b/tests/ScadaLink.SiteCallAudit.Tests/SiteCallAuditActorTests.cs index 3c39257..42093c5 100644 --- a/tests/ScadaLink.SiteCallAudit.Tests/SiteCallAuditActorTests.cs +++ b/tests/ScadaLink.SiteCallAudit.Tests/SiteCallAuditActorTests.cs @@ -49,7 +49,8 @@ public class SiteCallAuditActorTests : TestKit, IClassFixture(TimeSpan.FromSeconds(10)); + Assert.True(reply.Accepted); + + await using var readContext = CreateContext(); + var stored = await readContext.Set() + .Where(s => s.TrackedOperationId == id) + .ToListAsync(); + Assert.Single(stored); + Assert.Equal("node-a", stored[0].SourceNode); + } + + [SkippableFact] + public async Task UpsertSiteCallCommand_NullSourceNode_PersistsNull() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + // Mirror of the above for unstamped packets — a command with null + // SourceNode persists NULL on the row rather than falling back to a + // placeholder. The first audit packet from a legacy host (or a node + // without INodeIdentityProvider wired) must NOT inject a fabricated + // value central-side. + var siteId = NewSiteId(); + var id = TrackedOperationId.New(); + var row = NewRow(id, siteId, status: "Submitted", sourceNode: null); + + await using var context = CreateContext(); + var repo = new SiteCallAuditRepository(context); + var actor = CreateActor(repo); + + actor.Tell(new UpsertSiteCallCommand(row), TestActor); + var reply = ExpectMsg(TimeSpan.FromSeconds(10)); + Assert.True(reply.Accepted); + + await using var readContext = CreateContext(); + var stored = await readContext.Set() + .Where(s => s.TrackedOperationId == id) + .ToListAsync(); + Assert.Single(stored); + Assert.Null(stored[0].SourceNode); + } + /// /// Test double whose always /// throws — used to verify the query handler's failure projection produces a diff --git a/tests/ScadaLink.SiteRuntime.Tests/Scripts/ExternalSystemCachedCallEmissionTests.cs b/tests/ScadaLink.SiteRuntime.Tests/Scripts/ExternalSystemCachedCallEmissionTests.cs index 7ed9725..ac90f4f 100644 --- a/tests/ScadaLink.SiteRuntime.Tests/Scripts/ExternalSystemCachedCallEmissionTests.cs +++ b/tests/ScadaLink.SiteRuntime.Tests/Scripts/ExternalSystemCachedCallEmissionTests.cs @@ -570,4 +570,69 @@ public class ExternalSystemCachedCallEmissionTests var only = Assert.Single(forwarder.Telemetry); Assert.Equal(AuditKind.CachedSubmit, only.Audit.Kind); } + + // ── SourceNode-stamping (Task 14) ── + + [Fact] + public async Task CachedCall_StampsSourceNode_OnEverySiteCallOperationalRow() + { + // SourceNode-stamping (Task 14): when the helper is constructed with + // a non-null sourceNode, every SiteCallOperational it produces + // (CachedSubmit on enqueue + the immediate-completion Attempted/ + // CachedResolve pair when WasBuffered=false) carries that node name. + var client = new Mock(); + client + .Setup(c => c.CachedCallAsync( + "ERP", "GetOrder", + It.IsAny?>(), + InstanceName, + It.IsAny(), + It.IsAny(), + It.IsAny(), It.IsAny(), It.IsAny())) + // Immediate completion — helper produces all three rows itself. + .ReturnsAsync(new ExternalCallResult(true, "{\"ok\":true}", null, WasBuffered: false)); + var forwarder = new CapturingForwarder(); + + var helper = new ScriptRuntimeContext.ExternalSystemHelper( + client.Object, + InstanceName, + NullLogger.Instance, + TestExecutionId, + auditWriter: null, + siteId: SiteId, + sourceScript: SourceScript, + cachedForwarder: forwarder, + parentExecutionId: null, + sourceNode: "node-a"); + + await helper.CachedCall("ERP", "GetOrder"); + + Assert.Equal(3, forwarder.Telemetry.Count); + Assert.All(forwarder.Telemetry, t => Assert.Equal("node-a", t.Operational.SourceNode)); + } + + [Fact] + public async Task CachedCall_NoSourceNodeWired_LeavesSourceNodeNull() + { + // Default CreateHelper does NOT pass sourceNode — the legacy / test + // host path. Every operational row carries null SourceNode, leaving + // central's SiteCalls.SourceNode NULL. + var client = new Mock(); + client + .Setup(c => c.CachedCallAsync( + "ERP", "GetOrder", + It.IsAny?>(), + InstanceName, + It.IsAny(), + It.IsAny(), + It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync(new ExternalCallResult(true, null, null, WasBuffered: true)); + var forwarder = new CapturingForwarder(); + + var helper = CreateHelper(client.Object, forwarder); + await helper.CachedCall("ERP", "GetOrder"); + + var only = Assert.Single(forwarder.Telemetry); + Assert.Null(only.Operational.SourceNode); + } }