feat(sitecall-audit): carry + persist SourceNode end-to-end via cached telemetry

Site: site emitters of SiteCallOperational (ExternalSystemClient, the script-API
cached call path in ScriptRuntimeContext, CachedCallLifecycleBridge) inject
INodeIdentityProvider and stamp SourceNode = NodeName at construction.

OperationTrackingStore call site in CachedCallTelemetryForwarder now stamps
SourceNode too.

Central: SiteCallAuditRepository.UpsertAsync INSERT includes SourceNode in the
column list; conditional monotonic UPDATE uses
COALESCE(@SourceNode, SourceNode) so later packets cannot blank a previously-
stamped value. After this commit every SiteCalls row carries node-a/node-b in
SourceNode (subject to monotonic preservation).
This commit is contained in:
Joseph Doherty
2026-05-23 17:41:22 -04:00
parent d1fcab490c
commit 06ed0acead
10 changed files with 539 additions and 34 deletions

View File

@@ -146,7 +146,14 @@ public static class ServiceCollectionExtensions
new CachedCallTelemetryForwarder( new CachedCallTelemetryForwarder(
sp.GetRequiredService<IAuditWriter>(), sp.GetRequiredService<IAuditWriter>(),
sp.GetService<ScadaLink.Commons.Interfaces.IOperationTrackingStore>(), sp.GetService<ScadaLink.Commons.Interfaces.IOperationTrackingStore>(),
sp.GetRequiredService<ILogger<CachedCallTelemetryForwarder>>())); sp.GetRequiredService<ILogger<CachedCallTelemetryForwarder>>(),
// SourceNode-stamping (Task 14): the local node identity is
// threaded through so RecordEnqueueAsync can stamp the
// tracking row's SourceNode column. GetService — central
// composition roots may not register the provider, in which
// case the forwarder degrades to a null SourceNode rather
// than failing the DI resolution.
sp.GetService<INodeIdentityProvider>()));
// M3 Bundle F: bridge the store-and-forward retry-loop observer hook // M3 Bundle F: bridge the store-and-forward retry-loop observer hook
// to the cached-call forwarder so per-attempt + terminal telemetry // to the cached-call forwarder so per-attempt + terminal telemetry
@@ -154,7 +161,17 @@ public static class ServiceCollectionExtensions
// as the script-thread CachedSubmit row. Registered as a singleton // as the script-thread CachedSubmit row. Registered as a singleton
// and also bound to ICachedCallLifecycleObserver so AddStoreAndForward // and also bound to ICachedCallLifecycleObserver so AddStoreAndForward
// can resolve it through DI (Bundle F StoreAndForward wiring change). // can resolve it through DI (Bundle F StoreAndForward wiring change).
services.AddSingleton<CachedCallLifecycleBridge>(); // SourceNode-stamping (Task 14): factory-resolved so the
// INodeIdentityProvider singleton can be threaded through — the
// bridge stamps SiteCallOperational.SourceNode from
// INodeIdentityProvider.NodeName on every cached-call lifecycle row.
// GetService (not GetRequiredService) — central composition roots may
// not register the provider, in which case the bridge degrades to a
// null SourceNode rather than failing the DI resolution.
services.AddSingleton<CachedCallLifecycleBridge>(sp => new CachedCallLifecycleBridge(
sp.GetRequiredService<ICachedCallTelemetryForwarder>(),
sp.GetRequiredService<ILogger<CachedCallLifecycleBridge>>(),
sp.GetService<INodeIdentityProvider>()));
services.AddSingleton<ICachedCallLifecycleObserver>( services.AddSingleton<ICachedCallLifecycleObserver>(
sp => sp.GetRequiredService<CachedCallLifecycleBridge>()); sp => sp.GetRequiredService<CachedCallLifecycleBridge>());

View File

@@ -39,12 +39,23 @@ public sealed class CachedCallLifecycleBridge : ICachedCallLifecycleObserver
private readonly ICachedCallTelemetryForwarder _forwarder; private readonly ICachedCallTelemetryForwarder _forwarder;
private readonly ILogger<CachedCallLifecycleBridge> _logger; private readonly ILogger<CachedCallLifecycleBridge> _logger;
/// <summary>
/// SourceNode-stamping (Task 14): the local node identity provider used to
/// stamp <c>SiteCallOperational.SourceNode</c> on every cached-call
/// lifecycle row this bridge emits. Optional — when null (legacy hosts /
/// tests that don't register the provider) SourceNode stays null and
/// central persists the <c>SiteCalls</c> row with SourceNode NULL.
/// </summary>
private readonly INodeIdentityProvider? _nodeIdentity;
public CachedCallLifecycleBridge( public CachedCallLifecycleBridge(
ICachedCallTelemetryForwarder forwarder, ICachedCallTelemetryForwarder forwarder,
ILogger<CachedCallLifecycleBridge> logger) ILogger<CachedCallLifecycleBridge> logger,
INodeIdentityProvider? nodeIdentity = null)
{ {
_forwarder = forwarder ?? throw new ArgumentNullException(nameof(forwarder)); _forwarder = forwarder ?? throw new ArgumentNullException(nameof(forwarder));
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); _logger = logger ?? throw new ArgumentNullException(nameof(logger));
_nodeIdentity = nodeIdentity;
} }
/// <inheritdoc/> /// <inheritdoc/>
@@ -114,7 +125,7 @@ public sealed class CachedCallLifecycleBridge : ICachedCallLifecycleObserver
await _forwarder.ForwardAsync(packet, ct).ConfigureAwait(false); await _forwarder.ForwardAsync(packet, ct).ConfigureAwait(false);
} }
private static CachedCallTelemetry BuildPacket( private CachedCallTelemetry BuildPacket(
CachedCallAttemptContext context, CachedCallAttemptContext context,
AuditKind kind, AuditKind kind,
AuditStatus status, AuditStatus status,
@@ -162,9 +173,11 @@ public sealed class CachedCallLifecycleBridge : ICachedCallLifecycleObserver
Channel: context.Channel, Channel: context.Channel,
Target: context.Target, Target: context.Target,
SourceSite: context.SourceSite, SourceSite: context.SourceSite,
// SourceNode: stamped by Task 14 once the bridge gets an // SourceNode-stamping (Task 14): the local cluster node name
// INodeIdentityProvider; null until then. // (node-a/node-b on a site). Stamped from the injected
SourceNode: null, // INodeIdentityProvider; null when no provider was wired so
// central persists SiteCalls.SourceNode as NULL.
SourceNode: _nodeIdentity?.NodeName,
Status: operationalStatus, Status: operationalStatus,
RetryCount: context.RetryCount, RetryCount: context.RetryCount,
LastError: lastError, LastError: lastError,

View File

@@ -53,6 +53,14 @@ public sealed class CachedCallTelemetryForwarder : ICachedCallTelemetryForwarder
private readonly IOperationTrackingStore? _trackingStore; private readonly IOperationTrackingStore? _trackingStore;
private readonly ILogger<CachedCallTelemetryForwarder> _logger; private readonly ILogger<CachedCallTelemetryForwarder> _logger;
/// <summary>
/// SourceNode-stamping (Task 14): local node identity provider used to
/// stamp the tracking-store row's <c>SourceNode</c> column on
/// <c>RecordEnqueueAsync</c>. Optional — when null (legacy / test hosts)
/// the column stays NULL on the tracking row.
/// </summary>
private readonly INodeIdentityProvider? _nodeIdentity;
/// <summary> /// <summary>
/// Construct the forwarder. <paramref name="trackingStore"/> is optional — /// Construct the forwarder. <paramref name="trackingStore"/> is optional —
/// when null only the audit half of the packet is emitted, which matches /// when null only the audit half of the packet is emitted, which matches
@@ -65,11 +73,13 @@ public sealed class CachedCallTelemetryForwarder : ICachedCallTelemetryForwarder
public CachedCallTelemetryForwarder( public CachedCallTelemetryForwarder(
IAuditWriter auditWriter, IAuditWriter auditWriter,
IOperationTrackingStore? trackingStore, IOperationTrackingStore? trackingStore,
ILogger<CachedCallTelemetryForwarder> logger) ILogger<CachedCallTelemetryForwarder> logger,
INodeIdentityProvider? nodeIdentity = null)
{ {
_auditWriter = auditWriter ?? throw new ArgumentNullException(nameof(auditWriter)); _auditWriter = auditWriter ?? throw new ArgumentNullException(nameof(auditWriter));
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); _logger = logger ?? throw new ArgumentNullException(nameof(logger));
_trackingStore = trackingStore; _trackingStore = trackingStore;
_nodeIdentity = nodeIdentity;
} }
/// <summary> /// <summary>
@@ -128,16 +138,17 @@ public sealed class CachedCallTelemetryForwarder : ICachedCallTelemetryForwarder
// Enqueue — insert-if-not-exists with the operational // Enqueue — insert-if-not-exists with the operational
// channel as the kind discriminator. RetryCount is fixed // channel as the kind discriminator. RetryCount is fixed
// at 0 by the tracking store's INSERT contract. // at 0 by the tracking store's INSERT contract.
// sourceNode plumbed through but left null here; stamping // SourceNode-stamping (Task 14): stamp the local node
// is wired in a later task (Task 14) once the // name (node-a/node-b) from the injected
// INodeIdentityProvider is threaded into the forwarder. // INodeIdentityProvider; null when no provider was wired
// so the tracking row's SourceNode column stays NULL.
await _trackingStore.RecordEnqueueAsync( await _trackingStore.RecordEnqueueAsync(
telemetry.Operational.TrackedOperationId, telemetry.Operational.TrackedOperationId,
telemetry.Operational.Channel, telemetry.Operational.Channel,
telemetry.Operational.Target, telemetry.Operational.Target,
telemetry.Audit.SourceInstanceId, telemetry.Audit.SourceInstanceId,
telemetry.Audit.SourceScript, telemetry.Audit.SourceScript,
sourceNode: null, sourceNode: _nodeIdentity?.NodeName,
ct).ConfigureAwait(false); ct).ConfigureAwait(false);
break; break;

View File

@@ -69,15 +69,20 @@ public class SiteCallAuditRepository : ISiteCallAuditRepository
// Step 1: insert-if-not-exists. Like AuditLogRepository.InsertIfNotExistsAsync // Step 1: insert-if-not-exists. Like AuditLogRepository.InsertIfNotExistsAsync
// this is check-then-act so a duplicate-key violation may surface under // this is check-then-act so a duplicate-key violation may surface under
// concurrent inserts on the same id — caught + logged at Debug. // concurrent inserts on the same id — caught + logged at Debug.
//
// SourceNode-stamping (Task 14): the column is included in the INSERT
// column list / VALUES so a fresh row carries the originating node
// name (node-a/node-b for site rows). A null SourceNode (legacy hosts
// / unstamped reconciled rows) writes NULL straight through.
try try
{ {
await _context.Database.ExecuteSqlInterpolatedAsync( await _context.Database.ExecuteSqlInterpolatedAsync(
$@"IF NOT EXISTS (SELECT 1 FROM dbo.SiteCalls WHERE TrackedOperationId = {idText}) $@"IF NOT EXISTS (SELECT 1 FROM dbo.SiteCalls WHERE TrackedOperationId = {idText})
INSERT INTO dbo.SiteCalls INSERT INTO dbo.SiteCalls
(TrackedOperationId, Channel, Target, SourceSite, Status, RetryCount, (TrackedOperationId, Channel, Target, SourceSite, SourceNode, Status, RetryCount,
LastError, HttpStatus, CreatedAtUtc, UpdatedAtUtc, TerminalAtUtc, IngestedAtUtc) LastError, HttpStatus, CreatedAtUtc, UpdatedAtUtc, TerminalAtUtc, IngestedAtUtc)
VALUES VALUES
({idText}, {siteCall.Channel}, {siteCall.Target}, {siteCall.SourceSite}, {siteCall.Status}, {siteCall.RetryCount}, ({idText}, {siteCall.Channel}, {siteCall.Target}, {siteCall.SourceSite}, {siteCall.SourceNode}, {siteCall.Status}, {siteCall.RetryCount},
{siteCall.LastError}, {siteCall.HttpStatus}, {siteCall.CreatedAtUtc}, {siteCall.UpdatedAtUtc}, {siteCall.TerminalAtUtc}, {siteCall.IngestedAtUtc});", {siteCall.LastError}, {siteCall.HttpStatus}, {siteCall.CreatedAtUtc}, {siteCall.UpdatedAtUtc}, {siteCall.TerminalAtUtc}, {siteCall.IngestedAtUtc});",
ct); ct);
} }
@@ -96,6 +101,21 @@ VALUES
// string to the same rank table the caller uses; we only mutate if the // string to the same rank table the caller uses; we only mutate if the
// incoming rank is strictly greater. Same-rank (including // incoming rank is strictly greater. Same-rank (including
// terminal-over-terminal) is a no-op — first-write-wins at each rank. // terminal-over-terminal) is a no-op — first-write-wins at each rank.
//
// SourceNode-stamping (Task 14): SourceNode is updated via
// COALESCE(@SourceNode, SourceNode). The operator returns @SourceNode
// when it is non-null, otherwise the stored value — so the column
// behaves protectively: a later packet that carries a null
// SourceNode (e.g. a reconciliation pull from an unstamped node)
// NEVER blanks out a value the first stamping packet set. A later
// packet that DOES carry a non-null SourceNode replaces the previous
// value — combined with the monotonic-rank guard this is
// "last-non-null-wins on rank advance", which lets a missing
// SourceNode be filled in later if Submit happened to be unstamped
// and an Attempt/Resolve carries the node identity. Within one
// lifecycle every packet should carry the same SourceNode value (one
// execution, one node) so the "overwrite" path is in practice
// idempotent.
await _context.Database.ExecuteSqlInterpolatedAsync( await _context.Database.ExecuteSqlInterpolatedAsync(
$@"UPDATE dbo.SiteCalls $@"UPDATE dbo.SiteCalls
SET Status = {siteCall.Status}, SET Status = {siteCall.Status},
@@ -104,7 +124,8 @@ SET Status = {siteCall.Status},
HttpStatus = {siteCall.HttpStatus}, HttpStatus = {siteCall.HttpStatus},
UpdatedAtUtc = {siteCall.UpdatedAtUtc}, UpdatedAtUtc = {siteCall.UpdatedAtUtc},
TerminalAtUtc = {siteCall.TerminalAtUtc}, TerminalAtUtc = {siteCall.TerminalAtUtc},
IngestedAtUtc = {siteCall.IngestedAtUtc} IngestedAtUtc = {siteCall.IngestedAtUtc},
SourceNode = COALESCE({siteCall.SourceNode}, SourceNode)
WHERE TrackedOperationId = {idText} WHERE TrackedOperationId = {idText}
AND {incomingRank} > (CASE Status AND {incomingRank} > (CASE Status
WHEN 'Submitted' THEN 0 WHEN 'Submitted' THEN 0

View File

@@ -310,7 +310,11 @@ public class ScriptRuntimeContext
_cachedForwarder, _cachedForwarder,
// Audit Log #23 (ParentExecutionId): the spawning execution's id, // Audit Log #23 (ParentExecutionId): the spawning execution's id,
// threaded alongside _executionId. Null for non-routed runs. // threaded alongside _executionId. Null for non-routed runs.
_parentExecutionId); _parentExecutionId,
// SourceNode-stamping (Task 14): the local node name (node-a/node-b),
// threaded so the cached-call telemetry construction sites can stamp
// it onto SiteCallOperational.SourceNode.
_sourceNode);
/// <summary> /// <summary>
/// WP-13: Provides access to database operations. /// WP-13: Provides access to database operations.
@@ -334,7 +338,11 @@ public class ScriptRuntimeContext
_cachedForwarder, _cachedForwarder,
// Audit Log #23 (ParentExecutionId): the spawning execution's id, // Audit Log #23 (ParentExecutionId): the spawning execution's id,
// threaded alongside _executionId. Null for non-routed runs. // threaded alongside _executionId. Null for non-routed runs.
_parentExecutionId); _parentExecutionId,
// SourceNode-stamping (Task 14): the local node name (node-a/node-b),
// threaded so Database.CachedWrite's CachedSubmit telemetry can
// stamp it onto SiteCallOperational.SourceNode.
_sourceNode);
/// <summary> /// <summary>
/// Provides access to the Notification Outbox API. /// Provides access to the Notification Outbox API.
@@ -453,6 +461,16 @@ public class ScriptRuntimeContext
private readonly string? _sourceScript; private readonly string? _sourceScript;
private readonly ICachedCallTelemetryForwarder? _cachedForwarder; private readonly ICachedCallTelemetryForwarder? _cachedForwarder;
/// <summary>
/// SourceNode-stamping (Task 14): the local cluster node name on
/// which this script is executing (<c>node-a</c>/<c>node-b</c>).
/// Stamped onto <c>SiteCallOperational.SourceNode</c> on the three
/// cached-call telemetry construction sites (CachedSubmit + the two
/// immediate-completion rows) so central can persist it on the
/// <c>SiteCalls</c> row.
/// </summary>
private readonly string? _sourceNode;
// Internal constructor for tests living in ScadaLink.SiteRuntime.Tests // Internal constructor for tests living in ScadaLink.SiteRuntime.Tests
// (via InternalsVisibleTo). Production sites resolve the helper through // (via InternalsVisibleTo). Production sites resolve the helper through
// ScriptRuntimeContext.ExternalSystem. // ScriptRuntimeContext.ExternalSystem.
@@ -474,7 +492,8 @@ public class ScriptRuntimeContext
string siteId = "", string siteId = "",
string? sourceScript = null, string? sourceScript = null,
ICachedCallTelemetryForwarder? cachedForwarder = null, ICachedCallTelemetryForwarder? cachedForwarder = null,
Guid? parentExecutionId = null) Guid? parentExecutionId = null,
string? sourceNode = null)
{ {
_client = client; _client = client;
_instanceName = instanceName; _instanceName = instanceName;
@@ -485,6 +504,7 @@ public class ScriptRuntimeContext
_sourceScript = sourceScript; _sourceScript = sourceScript;
_cachedForwarder = cachedForwarder; _cachedForwarder = cachedForwarder;
_parentExecutionId = parentExecutionId; _parentExecutionId = parentExecutionId;
_sourceNode = sourceNode;
} }
public async Task<ExternalCallResult> Call( public async Task<ExternalCallResult> Call(
@@ -670,9 +690,11 @@ public class ScriptRuntimeContext
Channel: "ApiOutbound", Channel: "ApiOutbound",
Target: target, Target: target,
SourceSite: _siteId, SourceSite: _siteId,
// SourceNode: stamped by Task 14 once the script context // SourceNode-stamping (Task 14): the local node name
// gets an INodeIdentityProvider; null until then. // (node-a/node-b) — threaded through INodeIdentityProvider
SourceNode: null, // at the ScriptExecutionActor; null when no provider was
// wired so central persists SiteCalls.SourceNode as NULL.
SourceNode: _sourceNode,
Status: "Submitted", Status: "Submitted",
RetryCount: 0, RetryCount: 0,
LastError: null, LastError: null,
@@ -791,9 +813,11 @@ public class ScriptRuntimeContext
Channel: "ApiOutbound", Channel: "ApiOutbound",
Target: target, Target: target,
SourceSite: _siteId, SourceSite: _siteId,
// SourceNode: stamped by Task 14 once the script context // SourceNode-stamping (Task 14): the local node name
// gets an INodeIdentityProvider; null until then. // (node-a/node-b) — threaded through INodeIdentityProvider
SourceNode: null, // at the ScriptExecutionActor; null when no provider was
// wired so central persists SiteCalls.SourceNode as NULL.
SourceNode: _sourceNode,
Status: "Attempted", Status: "Attempted",
// RetryCount stays 0 — the operation never reached the // RetryCount stays 0 — the operation never reached the
// S&F retry sweep, so no retries were performed. // S&F retry sweep, so no retries were performed.
@@ -861,9 +885,11 @@ public class ScriptRuntimeContext
Channel: "ApiOutbound", Channel: "ApiOutbound",
Target: target, Target: target,
SourceSite: _siteId, SourceSite: _siteId,
// SourceNode: stamped by Task 14 once the script context // SourceNode-stamping (Task 14): the local node name
// gets an INodeIdentityProvider; null until then. // (node-a/node-b) — threaded through INodeIdentityProvider
SourceNode: null, // at the ScriptExecutionActor; null when no provider was
// wired so central persists SiteCalls.SourceNode as NULL.
SourceNode: _sourceNode,
Status: operationalTerminalStatus, Status: operationalTerminalStatus,
RetryCount: 0, RetryCount: 0,
LastError: result.Success ? null : result.ErrorMessage, LastError: result.Success ? null : result.ErrorMessage,
@@ -1120,6 +1146,15 @@ public class ScriptRuntimeContext
/// </summary> /// </summary>
private readonly IAuditWriter? _auditWriter; private readonly IAuditWriter? _auditWriter;
/// <summary>
/// SourceNode-stamping (Task 14): the local cluster node name on
/// which this script is executing (<c>node-a</c>/<c>node-b</c>).
/// Stamped onto <c>SiteCallOperational.SourceNode</c> at the
/// <c>Database.CachedWrite</c> CachedSubmit telemetry construction
/// site so central can persist it on the <c>SiteCalls</c> row.
/// </summary>
private readonly string? _sourceNode;
// Parameter ordering: executionId sits immediately after the // Parameter ordering: executionId sits immediately after the
// ILogger — see the note on ExternalSystemHelper's ctor for why the // ILogger — see the note on ExternalSystemHelper's ctor for why the
// post-logger slot is the one consistent position across all four // post-logger slot is the one consistent position across all four
@@ -1133,7 +1168,8 @@ public class ScriptRuntimeContext
string siteId = "", string siteId = "",
string? sourceScript = null, string? sourceScript = null,
ICachedCallTelemetryForwarder? cachedForwarder = null, ICachedCallTelemetryForwarder? cachedForwarder = null,
Guid? parentExecutionId = null) Guid? parentExecutionId = null,
string? sourceNode = null)
{ {
_gateway = gateway; _gateway = gateway;
_instanceName = instanceName; _instanceName = instanceName;
@@ -1144,6 +1180,7 @@ public class ScriptRuntimeContext
_sourceScript = sourceScript; _sourceScript = sourceScript;
_cachedForwarder = cachedForwarder; _cachedForwarder = cachedForwarder;
_parentExecutionId = parentExecutionId; _parentExecutionId = parentExecutionId;
_sourceNode = sourceNode;
} }
public async Task<System.Data.Common.DbConnection> Connection( public async Task<System.Data.Common.DbConnection> Connection(
@@ -1274,9 +1311,11 @@ public class ScriptRuntimeContext
Channel: "DbOutbound", Channel: "DbOutbound",
Target: target, Target: target,
SourceSite: _siteId, SourceSite: _siteId,
// SourceNode: stamped by Task 14 once the script context // SourceNode-stamping (Task 14): the local node name
// gets an INodeIdentityProvider; null until then. // (node-a/node-b) — threaded through INodeIdentityProvider
SourceNode: null, // at the ScriptExecutionActor; null when no provider was
// wired so central persists SiteCalls.SourceNode as NULL.
SourceNode: _sourceNode,
Status: "Submitted", Status: "Submitted",
RetryCount: 0, RetryCount: 0,
LastError: null, LastError: null,

View File

@@ -327,4 +327,68 @@ public class CachedCallLifecycleBridgeTests
Assert.NotNull(captured); Assert.NotNull(captured);
Assert.Null(captured!.Audit.ParentExecutionId); Assert.Null(captured!.Audit.ParentExecutionId);
} }
// ── SourceNode-stamping (Task 14) ──
[Fact]
public async Task RetryLoopRow_StampsSourceNode_FromNodeIdentityProvider()
{
// SourceNode-stamping (Task 14): when an INodeIdentityProvider is
// wired the bridge stamps the local node name (node-a/node-b) onto
// the SiteCallOperational.SourceNode column of every emitted packet.
var nodeIdentity = Substitute.For<INodeIdentityProvider>();
nodeIdentity.NodeName.Returns("node-a");
var captured = new List<CachedCallTelemetry>();
_forwarder.ForwardAsync(Arg.Do<CachedCallTelemetry>(t => captured.Add(t)), Arg.Any<CancellationToken>())
.Returns(Task.CompletedTask);
var sut = new CachedCallLifecycleBridge(
_forwarder, NullLogger<CachedCallLifecycleBridge>.Instance, nodeIdentity);
await sut.OnAttemptCompletedAsync(Ctx(CachedCallAttemptOutcome.Delivered));
Assert.Equal(2, captured.Count);
Assert.All(captured, p => Assert.Equal("node-a", p.Operational.SourceNode));
}
[Fact]
public async Task RetryLoopRow_NoNodeIdentityProvider_LeavesSourceNodeNull()
{
// When no INodeIdentityProvider is wired (legacy hosts / tests) the
// bridge degrades to a null SourceNode rather than throwing. The
// emitted packet's SourceNode is null so the central row persists NULL.
var captured = new List<CachedCallTelemetry>();
_forwarder.ForwardAsync(Arg.Do<CachedCallTelemetry>(t => captured.Add(t)), Arg.Any<CancellationToken>())
.Returns(Task.CompletedTask);
// Default CreateSut() does NOT pass a node-identity provider.
var sut = CreateSut();
await sut.OnAttemptCompletedAsync(Ctx(CachedCallAttemptOutcome.TransientFailure));
var packet = Assert.Single(captured);
Assert.Null(packet.Operational.SourceNode);
}
[Fact]
public async Task RetryLoopRow_NodeIdentityWithNullNodeName_LeavesSourceNodeNull()
{
// The provider exists but reports a null NodeName (unconfigured). The
// bridge must pass that null through to SourceNode rather than
// falling back to a placeholder.
var nodeIdentity = Substitute.For<INodeIdentityProvider>();
nodeIdentity.NodeName.Returns((string?)null);
var captured = new List<CachedCallTelemetry>();
_forwarder.ForwardAsync(Arg.Do<CachedCallTelemetry>(t => captured.Add(t)), Arg.Any<CancellationToken>())
.Returns(Task.CompletedTask);
var sut = new CachedCallLifecycleBridge(
_forwarder, NullLogger<CachedCallLifecycleBridge>.Instance, nodeIdentity);
await sut.OnAttemptCompletedAsync(Ctx(CachedCallAttemptOutcome.TransientFailure));
var packet = Assert.Single(captured);
Assert.Null(packet.Operational.SourceNode);
}
} }

View File

@@ -135,7 +135,11 @@ public class CachedCallTelemetryForwarderTests
Arg.Any<CancellationToken>()); Arg.Any<CancellationToken>());
// Tracking row: insert-if-not-exists with kind discriminator. // Tracking row: insert-if-not-exists with kind discriminator.
// sourceNode is null until Task 14 wires the INodeIdentityProvider through. // Default CreateSut() does NOT supply an INodeIdentityProvider, so the
// forwarder passes null sourceNode to RecordEnqueueAsync (legacy / test
// host behaviour). The Task 14 stamping path is covered by the
// ForwardAsync_Submit_StampsSourceNode_FromNodeIdentityProvider test
// below.
await _tracking.Received(1).RecordEnqueueAsync( await _tracking.Received(1).RecordEnqueueAsync(
_id, _id,
"ApiOutbound", "ApiOutbound",
@@ -249,4 +253,55 @@ public class CachedCallTelemetryForwarderTests
await Assert.ThrowsAsync<ArgumentNullException>( await Assert.ThrowsAsync<ArgumentNullException>(
() => sut.ForwardAsync(null!, CancellationToken.None)); () => sut.ForwardAsync(null!, CancellationToken.None));
} }
// ── SourceNode-stamping (Task 14) ──
[Fact]
public async Task ForwardAsync_Submit_StampsSourceNode_FromNodeIdentityProvider()
{
// SourceNode-stamping (Task 14): when an INodeIdentityProvider is
// wired the forwarder must stamp its NodeName onto the
// RecordEnqueueAsync sourceNode parameter so the tracking row
// captures the originating node (node-a/node-b).
var nodeIdentity = Substitute.For<INodeIdentityProvider>();
nodeIdentity.NodeName.Returns("node-a");
var sut = new CachedCallTelemetryForwarder(
_writer, _tracking, NullLogger<CachedCallTelemetryForwarder>.Instance, nodeIdentity);
await sut.ForwardAsync(SubmitPacket(), CancellationToken.None);
await _tracking.Received(1).RecordEnqueueAsync(
_id,
"ApiOutbound",
"ERP.GetOrder",
"inst-1",
"ScriptActor:doStuff",
"node-a",
Arg.Any<CancellationToken>());
}
[Fact]
public async Task ForwardAsync_Submit_NodeIdentityNullNodeName_PassesNullSourceNode()
{
// The provider exists but reports a null NodeName (unconfigured).
// The forwarder passes that null through to RecordEnqueueAsync rather
// than falling back to a placeholder string.
var nodeIdentity = Substitute.For<INodeIdentityProvider>();
nodeIdentity.NodeName.Returns((string?)null);
var sut = new CachedCallTelemetryForwarder(
_writer, _tracking, NullLogger<CachedCallTelemetryForwarder>.Instance, nodeIdentity);
await sut.ForwardAsync(SubmitPacket(), CancellationToken.None);
await _tracking.Received(1).RecordEnqueueAsync(
_id,
"ApiOutbound",
"ERP.GetOrder",
"inst-1",
"ScriptActor:doStuff",
null,
Arg.Any<CancellationToken>());
}
} }

View File

@@ -520,7 +520,8 @@ public class SiteCallAuditRepositoryTests : IClassFixture<MsSqlMigrationFixture>
DateTime? createdAtUtc = null, DateTime? createdAtUtc = null,
DateTime? updatedAtUtc = null, DateTime? updatedAtUtc = null,
bool terminal = false, bool terminal = false,
DateTime? terminalAtUtc = null) DateTime? terminalAtUtc = null,
string? sourceNode = null)
{ {
var created = createdAtUtc ?? DateTime.UtcNow; var created = createdAtUtc ?? DateTime.UtcNow;
var updated = updatedAtUtc ?? created; var updated = updatedAtUtc ?? created;
@@ -534,6 +535,7 @@ public class SiteCallAuditRepositoryTests : IClassFixture<MsSqlMigrationFixture>
Channel = "ApiOutbound", Channel = "ApiOutbound",
Target = "ERP.GetOrder", Target = "ERP.GetOrder",
SourceSite = sourceSite ?? NewSiteId(), SourceSite = sourceSite ?? NewSiteId(),
SourceNode = sourceNode,
Status = status, Status = status,
RetryCount = retryCount, RetryCount = retryCount,
LastError = lastError, LastError = lastError,
@@ -544,4 +546,159 @@ public class SiteCallAuditRepositoryTests : IClassFixture<MsSqlMigrationFixture>
IngestedAtUtc = DateTime.UtcNow, IngestedAtUtc = DateTime.UtcNow,
}; };
} }
// --- SourceNode-stamping (Task 14) --------------------------------------
[SkippableFact]
public async Task UpsertAsync_PersistsSourceNode_OnFreshInsert()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
// SourceNode-stamping (Task 14): a fresh INSERT must persist the
// SourceNode column verbatim — the central row carries the originating
// site node name end-to-end.
var id = TrackedOperationId.New();
await using var context = CreateContext();
var repo = new SiteCallAuditRepository(context);
await repo.UpsertAsync(NewRow(id, status: "Submitted", sourceNode: "node-a"));
var loaded = await repo.GetAsync(id);
Assert.NotNull(loaded);
Assert.Equal("node-a", loaded!.SourceNode);
}
[SkippableFact]
public async Task UpsertAsync_PreservesSourceNode_WhenLaterPacketCarriesNull()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
// SourceNode-stamping (Task 14): the UPDATE uses
// COALESCE(@SourceNode, SourceNode) so a subsequent packet that does
// NOT carry a SourceNode (legacy / reconciliation pull from an
// unstamped node) MUST NOT blank out the value the first packet set.
// Combined with the monotonic-rank guard the Status advances but the
// SourceNode survives.
//
// Each step uses a fresh DbContext — raw-SQL UPDATEs bypass the
// change tracker, so reusing a single context whose entity is already
// tracked masks the post-UPDATE state on a follow-up FindAsync.
var id = TrackedOperationId.New();
await using (var context = CreateContext())
{
var repo = new SiteCallAuditRepository(context);
// First packet: stamped Submit from node-a.
await repo.UpsertAsync(NewRow(id, status: "Submitted", sourceNode: "node-a"));
}
await using (var context = CreateContext())
{
var repo = new SiteCallAuditRepository(context);
// Later packet: rank-advancing Attempted with null SourceNode.
await repo.UpsertAsync(NewRow(id, status: "Attempted", retryCount: 1, sourceNode: null));
}
await using (var readContext = CreateContext())
{
var readRepo = new SiteCallAuditRepository(readContext);
var loaded = await readRepo.GetAsync(id);
Assert.NotNull(loaded);
// SourceNode preserved despite the null on the later packet.
Assert.Equal("node-a", loaded!.SourceNode);
// Status advanced — proves the UPDATE branch actually ran.
Assert.Equal("Attempted", loaded.Status);
Assert.Equal(1, loaded.RetryCount);
}
}
[SkippableFact]
public async Task UpsertAsync_NonNullIncomingSourceNode_OverwritesPreviousValueOnRankAdvance()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
// SourceNode-stamping (Task 14): per the COALESCE(@SourceNode,
// SourceNode) semantics the column protects against a *null*
// incoming value blanking a previously-stamped one, but a non-null
// incoming value DOES replace the existing value on a rank-advancing
// packet. This is the "last-non-null-wins on advance" behaviour the
// SQL operator literally implements — see the comment in
// SiteCallAuditRepository.UpsertAsync.
//
// In practice both stamps within a single lifecycle SHOULD carry the
// same value (same node, same execution); a divergence would imply a
// mid-lifecycle node change (e.g. failover handing off to node-b) and
// letting the latest stamp through is arguably the right call. This
// test pins the actual behaviour so we notice if the SQL gets
// inverted (to a true first-write-wins COALESCE(SourceNode,
// @SourceNode)) inadvertently.
var id = TrackedOperationId.New();
await using (var context = CreateContext())
{
var repo = new SiteCallAuditRepository(context);
await repo.UpsertAsync(NewRow(id, status: "Submitted", sourceNode: "node-a"));
}
await using (var context = CreateContext())
{
var repo = new SiteCallAuditRepository(context);
await repo.UpsertAsync(NewRow(id, status: "Attempted", retryCount: 1, sourceNode: "node-b"));
}
await using (var readContext = CreateContext())
{
var readRepo = new SiteCallAuditRepository(readContext);
var loaded = await readRepo.GetAsync(id);
Assert.NotNull(loaded);
// Incoming non-null wins — node-b replaces node-a on rank advance.
Assert.Equal("node-b", loaded!.SourceNode);
// Other monotonic fields advanced too — proves the UPDATE ran.
Assert.Equal("Attempted", loaded.Status);
}
}
[SkippableFact]
public async Task UpsertAsync_FillsSourceNode_WhenInsertWasNullAndLaterPacketCarriesValue()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
// SourceNode-stamping (Task 14): when the column was left NULL by an
// earlier unstamped packet, a later rank-advancing packet with a
// non-null SourceNode fills it — the COALESCE(@SourceNode, SourceNode)
// SQL operator returns @SourceNode when @SourceNode is non-null, so
// the incoming value wins over the existing NULL. This is the
// recovery path for an initially-unstamped lifecycle whose later
// packets carry the node identity.
//
// The intermediate verification and final read use FRESH contexts —
// FindAsync hits the change tracker first, so a cached entity from
// an earlier read in the same context can mask a raw-SQL UPDATE.
var id = TrackedOperationId.New();
await using (var context = CreateContext())
{
var repo = new SiteCallAuditRepository(context);
await repo.UpsertAsync(NewRow(id, status: "Submitted", sourceNode: null));
}
// Verify the INSERT left SourceNode NULL via a fresh context.
await using (var verifyContext = CreateContext())
{
var verifyRepo = new SiteCallAuditRepository(verifyContext);
var afterInsert = await verifyRepo.GetAsync(id);
Assert.NotNull(afterInsert);
Assert.Null(afterInsert!.SourceNode);
}
await using (var context = CreateContext())
{
var repo = new SiteCallAuditRepository(context);
await repo.UpsertAsync(NewRow(id, status: "Attempted", retryCount: 1, sourceNode: "node-a"));
}
await using (var readContext = CreateContext())
{
var readRepo = new SiteCallAuditRepository(readContext);
var loaded = await readRepo.GetAsync(id);
Assert.NotNull(loaded);
Assert.Equal("node-a", loaded!.SourceNode);
Assert.Equal("Attempted", loaded.Status);
}
}
} }

View File

@@ -49,7 +49,8 @@ public class SiteCallAuditActorTests : TestKit, IClassFixture<MsSqlMigrationFixt
string? lastError = null, string? lastError = null,
DateTime? createdAtUtc = null, DateTime? createdAtUtc = null,
DateTime? updatedAtUtc = null, DateTime? updatedAtUtc = null,
bool terminal = false) bool terminal = false,
string? sourceNode = null)
{ {
var created = createdAtUtc ?? DateTime.UtcNow; var created = createdAtUtc ?? DateTime.UtcNow;
var updated = updatedAtUtc ?? created; var updated = updatedAtUtc ?? created;
@@ -59,6 +60,7 @@ public class SiteCallAuditActorTests : TestKit, IClassFixture<MsSqlMigrationFixt
Channel = "ApiOutbound", Channel = "ApiOutbound",
Target = "ERP.GetOrder", Target = "ERP.GetOrder",
SourceSite = sourceSite, SourceSite = sourceSite,
SourceNode = sourceNode,
Status = status, Status = status,
RetryCount = retryCount, RetryCount = retryCount,
LastError = lastError, LastError = lastError,
@@ -492,6 +494,67 @@ public class SiteCallAuditActorTests : TestKit, IClassFixture<MsSqlMigrationFixt
Assert.Equal("corr-fault", response.CorrelationId); Assert.Equal("corr-fault", response.CorrelationId);
} }
// ── SourceNode-stamping (Task 14): end-to-end actor → repo persistence ──
[SkippableFact]
public async Task UpsertSiteCallCommand_PersistsSourceNode_EndToEnd()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
// SourceNode-stamping (Task 14): an UpsertSiteCallCommand carrying
// SourceNode "node-a" must land in the SiteCalls row's SourceNode
// column unchanged — verifies the actor's mapping path does not
// strip the column AND the repository INSERT writes it.
var siteId = NewSiteId();
var id = TrackedOperationId.New();
var row = NewRow(id, siteId, status: "Submitted", sourceNode: "node-a");
await using var context = CreateContext();
var repo = new SiteCallAuditRepository(context);
var actor = CreateActor(repo);
actor.Tell(new UpsertSiteCallCommand(row), TestActor);
var reply = ExpectMsg<UpsertSiteCallReply>(TimeSpan.FromSeconds(10));
Assert.True(reply.Accepted);
await using var readContext = CreateContext();
var stored = await readContext.Set<SiteCall>()
.Where(s => s.TrackedOperationId == id)
.ToListAsync();
Assert.Single(stored);
Assert.Equal("node-a", stored[0].SourceNode);
}
[SkippableFact]
public async Task UpsertSiteCallCommand_NullSourceNode_PersistsNull()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
// Mirror of the above for unstamped packets — a command with null
// SourceNode persists NULL on the row rather than falling back to a
// placeholder. The first audit packet from a legacy host (or a node
// without INodeIdentityProvider wired) must NOT inject a fabricated
// value central-side.
var siteId = NewSiteId();
var id = TrackedOperationId.New();
var row = NewRow(id, siteId, status: "Submitted", sourceNode: null);
await using var context = CreateContext();
var repo = new SiteCallAuditRepository(context);
var actor = CreateActor(repo);
actor.Tell(new UpsertSiteCallCommand(row), TestActor);
var reply = ExpectMsg<UpsertSiteCallReply>(TimeSpan.FromSeconds(10));
Assert.True(reply.Accepted);
await using var readContext = CreateContext();
var stored = await readContext.Set<SiteCall>()
.Where(s => s.TrackedOperationId == id)
.ToListAsync();
Assert.Single(stored);
Assert.Null(stored[0].SourceNode);
}
/// <summary> /// <summary>
/// Test double whose <see cref="ISiteCallAuditRepository.QueryAsync"/> always /// Test double whose <see cref="ISiteCallAuditRepository.QueryAsync"/> always
/// throws — used to verify the query handler's failure projection produces a /// throws — used to verify the query handler's failure projection produces a

View File

@@ -570,4 +570,69 @@ public class ExternalSystemCachedCallEmissionTests
var only = Assert.Single(forwarder.Telemetry); var only = Assert.Single(forwarder.Telemetry);
Assert.Equal(AuditKind.CachedSubmit, only.Audit.Kind); Assert.Equal(AuditKind.CachedSubmit, only.Audit.Kind);
} }
// ── SourceNode-stamping (Task 14) ──
[Fact]
public async Task CachedCall_StampsSourceNode_OnEverySiteCallOperationalRow()
{
// SourceNode-stamping (Task 14): when the helper is constructed with
// a non-null sourceNode, every SiteCallOperational it produces
// (CachedSubmit on enqueue + the immediate-completion Attempted/
// CachedResolve pair when WasBuffered=false) carries that node name.
var client = new Mock<IExternalSystemClient>();
client
.Setup(c => c.CachedCallAsync(
"ERP", "GetOrder",
It.IsAny<IReadOnlyDictionary<string, object?>?>(),
InstanceName,
It.IsAny<CancellationToken>(),
It.IsAny<TrackedOperationId?>(),
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
// Immediate completion — helper produces all three rows itself.
.ReturnsAsync(new ExternalCallResult(true, "{\"ok\":true}", null, WasBuffered: false));
var forwarder = new CapturingForwarder();
var helper = new ScriptRuntimeContext.ExternalSystemHelper(
client.Object,
InstanceName,
NullLogger.Instance,
TestExecutionId,
auditWriter: null,
siteId: SiteId,
sourceScript: SourceScript,
cachedForwarder: forwarder,
parentExecutionId: null,
sourceNode: "node-a");
await helper.CachedCall("ERP", "GetOrder");
Assert.Equal(3, forwarder.Telemetry.Count);
Assert.All(forwarder.Telemetry, t => Assert.Equal("node-a", t.Operational.SourceNode));
}
[Fact]
public async Task CachedCall_NoSourceNodeWired_LeavesSourceNodeNull()
{
// Default CreateHelper does NOT pass sourceNode — the legacy / test
// host path. Every operational row carries null SourceNode, leaving
// central's SiteCalls.SourceNode NULL.
var client = new Mock<IExternalSystemClient>();
client
.Setup(c => c.CachedCallAsync(
"ERP", "GetOrder",
It.IsAny<IReadOnlyDictionary<string, object?>?>(),
InstanceName,
It.IsAny<CancellationToken>(),
It.IsAny<TrackedOperationId?>(),
It.IsAny<Guid?>(), It.IsAny<string?>(), It.IsAny<Guid?>()))
.ReturnsAsync(new ExternalCallResult(true, null, null, WasBuffered: true));
var forwarder = new CapturingForwarder();
var helper = CreateHelper(client.Object, forwarder);
await helper.CachedCall("ERP", "GetOrder");
var only = Assert.Single(forwarder.Telemetry);
Assert.Null(only.Operational.SourceNode);
}
} }