Site: site emitters of SiteCallOperational (ExternalSystemClient, the script-API cached call path in ScriptRuntimeContext, CachedCallLifecycleBridge) inject INodeIdentityProvider and stamp SourceNode = NodeName at construction. OperationTrackingStore call site in CachedCallTelemetryForwarder now stamps SourceNode too. Central: SiteCallAuditRepository.UpsertAsync INSERT includes SourceNode in the column list; conditional monotonic UPDATE uses COALESCE(@SourceNode, SourceNode) so later packets cannot blank a previously- stamped value. After this commit every SiteCalls row carries node-a/node-b in SourceNode (subject to monotonic preservation).
395 lines
17 KiB
C#
395 lines
17 KiB
C#
using Microsoft.Extensions.Logging.Abstractions;
|
|
using NSubstitute;
|
|
using ScadaLink.AuditLog.Site.Telemetry;
|
|
using ScadaLink.Commons.Interfaces.Services;
|
|
using ScadaLink.Commons.Messages.Integration;
|
|
using ScadaLink.Commons.Types;
|
|
using ScadaLink.Commons.Types.Enums;
|
|
|
|
namespace ScadaLink.AuditLog.Tests.Site.Telemetry;
|
|
|
|
/// <summary>
|
|
/// Bundle E Tasks E4/E5 bridge tests. The bridge ingests
|
|
/// <see cref="CachedCallAttemptContext"/> notifications from the S&F
|
|
/// retry loop and routes them through <see cref="ICachedCallTelemetryForwarder"/>
|
|
/// as one or two <see cref="CachedCallTelemetry"/> packets:
|
|
/// <list type="bullet">
|
|
/// <item><description>Per-attempt: one <c>ApiCallCached</c>/<c>DbWriteCached</c> Attempted row.</description></item>
|
|
/// <item><description>Terminal (Delivered/PermanentFailure/ParkedMaxRetries): adds a CachedResolve row carrying the terminal Status.</description></item>
|
|
/// </list>
|
|
/// </summary>
|
|
public class CachedCallLifecycleBridgeTests
|
|
{
|
|
private readonly ICachedCallTelemetryForwarder _forwarder = Substitute.For<ICachedCallTelemetryForwarder>();
|
|
private readonly TrackedOperationId _id = TrackedOperationId.New();
|
|
|
|
private CachedCallLifecycleBridge CreateSut() => new(
|
|
_forwarder, NullLogger<CachedCallLifecycleBridge>.Instance);
|
|
|
|
private CachedCallAttemptContext Ctx(
|
|
CachedCallAttemptOutcome outcome,
|
|
string channel = "ApiOutbound",
|
|
int retryCount = 1,
|
|
string? lastError = null,
|
|
int? httpStatus = null,
|
|
Guid? executionId = null,
|
|
string? sourceScript = null,
|
|
Guid? parentExecutionId = null) =>
|
|
new(
|
|
TrackedOperationId: _id,
|
|
Channel: channel,
|
|
Target: "ERP.GetOrder",
|
|
SourceSite: "site-77",
|
|
Outcome: outcome,
|
|
RetryCount: retryCount,
|
|
LastError: lastError,
|
|
HttpStatus: httpStatus,
|
|
CreatedAtUtc: new DateTime(2026, 5, 20, 9, 0, 0, DateTimeKind.Utc),
|
|
OccurredAtUtc: new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc),
|
|
DurationMs: 42,
|
|
SourceInstanceId: "Plant.Pump42",
|
|
ExecutionId: executionId,
|
|
SourceScript: sourceScript,
|
|
ParentExecutionId: parentExecutionId);
|
|
|
|
[Fact]
|
|
public async Task TransientFailure_EmitsOneAttemptedRow_NoResolve()
|
|
{
|
|
var captured = new List<CachedCallTelemetry>();
|
|
_forwarder.ForwardAsync(Arg.Do<CachedCallTelemetry>(t => captured.Add(t)), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
|
|
var sut = CreateSut();
|
|
await sut.OnAttemptCompletedAsync(Ctx(
|
|
CachedCallAttemptOutcome.TransientFailure,
|
|
retryCount: 2,
|
|
lastError: "HTTP 503",
|
|
httpStatus: 503));
|
|
|
|
var packet = Assert.Single(captured);
|
|
Assert.Equal(AuditKind.ApiCallCached, packet.Audit.Kind);
|
|
Assert.Equal(AuditStatus.Attempted, packet.Audit.Status);
|
|
Assert.Equal(503, packet.Audit.HttpStatus);
|
|
Assert.Equal("HTTP 503", packet.Audit.ErrorMessage);
|
|
Assert.Equal(_id.Value, packet.Audit.CorrelationId);
|
|
Assert.Equal("Attempted", packet.Operational.Status);
|
|
Assert.Equal(2, packet.Operational.RetryCount);
|
|
Assert.Null(packet.Operational.TerminalAtUtc);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Delivered_EmitsAttemptedRow_AndCachedResolveDelivered()
|
|
{
|
|
var captured = new List<CachedCallTelemetry>();
|
|
_forwarder.ForwardAsync(Arg.Do<CachedCallTelemetry>(t => captured.Add(t)), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
|
|
var sut = CreateSut();
|
|
await sut.OnAttemptCompletedAsync(Ctx(CachedCallAttemptOutcome.Delivered));
|
|
|
|
Assert.Equal(2, captured.Count);
|
|
|
|
var attempted = captured[0];
|
|
Assert.Equal(AuditKind.ApiCallCached, attempted.Audit.Kind);
|
|
Assert.Equal(AuditStatus.Attempted, attempted.Audit.Status);
|
|
Assert.Equal("Attempted", attempted.Operational.Status);
|
|
Assert.Null(attempted.Operational.TerminalAtUtc);
|
|
|
|
var resolve = captured[1];
|
|
Assert.Equal(AuditKind.CachedResolve, resolve.Audit.Kind);
|
|
Assert.Equal(AuditStatus.Delivered, resolve.Audit.Status);
|
|
Assert.Equal("Delivered", resolve.Operational.Status);
|
|
Assert.NotNull(resolve.Operational.TerminalAtUtc);
|
|
Assert.Equal(_id.Value, resolve.Audit.CorrelationId);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task PermanentFailure_EmitsAttempted_AndCachedResolveParked()
|
|
{
|
|
var captured = new List<CachedCallTelemetry>();
|
|
_forwarder.ForwardAsync(Arg.Do<CachedCallTelemetry>(t => captured.Add(t)), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
|
|
var sut = CreateSut();
|
|
await sut.OnAttemptCompletedAsync(Ctx(
|
|
CachedCallAttemptOutcome.PermanentFailure,
|
|
lastError: "Permanent failure (handler returned false)"));
|
|
|
|
Assert.Equal(2, captured.Count);
|
|
Assert.Equal(AuditKind.ApiCallCached, captured[0].Audit.Kind);
|
|
Assert.Equal(AuditKind.CachedResolve, captured[1].Audit.Kind);
|
|
Assert.Equal(AuditStatus.Parked, captured[1].Audit.Status);
|
|
Assert.Equal("Parked", captured[1].Operational.Status);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task ParkedMaxRetries_EmitsAttempted_AndCachedResolveParked()
|
|
{
|
|
var captured = new List<CachedCallTelemetry>();
|
|
_forwarder.ForwardAsync(Arg.Do<CachedCallTelemetry>(t => captured.Add(t)), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
|
|
var sut = CreateSut();
|
|
await sut.OnAttemptCompletedAsync(Ctx(CachedCallAttemptOutcome.ParkedMaxRetries));
|
|
|
|
Assert.Equal(2, captured.Count);
|
|
Assert.Equal(AuditKind.CachedResolve, captured[1].Audit.Kind);
|
|
Assert.Equal(AuditStatus.Parked, captured[1].Audit.Status);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task DbChannel_MapsToDbWriteCachedKind_AndDbOutboundChannel()
|
|
{
|
|
var captured = new List<CachedCallTelemetry>();
|
|
_forwarder.ForwardAsync(Arg.Do<CachedCallTelemetry>(t => captured.Add(t)), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
|
|
var sut = CreateSut();
|
|
await sut.OnAttemptCompletedAsync(Ctx(
|
|
CachedCallAttemptOutcome.Delivered, channel: "DbOutbound"));
|
|
|
|
Assert.Equal(2, captured.Count);
|
|
Assert.Equal(AuditKind.DbWriteCached, captured[0].Audit.Kind);
|
|
Assert.Equal(AuditChannel.DbOutbound, captured[0].Audit.Channel);
|
|
Assert.Equal("DbOutbound", captured[0].Operational.Channel);
|
|
Assert.Equal(AuditKind.CachedResolve, captured[1].Audit.Kind);
|
|
Assert.Equal(AuditChannel.DbOutbound, captured[1].Audit.Channel);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task BridgeDoesNotThrow_WhenForwarderThrows()
|
|
{
|
|
_forwarder
|
|
.ForwardAsync(Arg.Any<CachedCallTelemetry>(), Arg.Any<CancellationToken>())
|
|
.Returns(Task.FromException(new InvalidOperationException("forwarder down")));
|
|
|
|
var sut = CreateSut();
|
|
|
|
// Must not throw — best-effort emission.
|
|
await sut.OnAttemptCompletedAsync(Ctx(CachedCallAttemptOutcome.Delivered));
|
|
}
|
|
|
|
[Fact]
|
|
public async Task BridgePopulatesProvenance_FromAttemptContext()
|
|
{
|
|
CachedCallTelemetry? captured = null;
|
|
_forwarder.ForwardAsync(Arg.Do<CachedCallTelemetry>(t => captured = t), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
|
|
var sut = CreateSut();
|
|
await sut.OnAttemptCompletedAsync(Ctx(
|
|
CachedCallAttemptOutcome.TransientFailure,
|
|
retryCount: 3,
|
|
lastError: "transient",
|
|
httpStatus: 500));
|
|
|
|
Assert.NotNull(captured);
|
|
Assert.Equal("site-77", captured!.Audit.SourceSiteId);
|
|
Assert.Equal("Plant.Pump42", captured.Audit.SourceInstanceId);
|
|
Assert.Equal("ERP.GetOrder", captured.Audit.Target);
|
|
Assert.Equal(42, captured.Audit.DurationMs);
|
|
Assert.Equal(_id.Value, captured.Audit.CorrelationId);
|
|
}
|
|
|
|
// ── Audit Log #23 (ExecutionId Task 4): ExecutionId / SourceScript ──
|
|
|
|
[Fact]
|
|
public async Task RetryLoopAttemptedRow_CarriesExecutionIdAndSourceScript_FromContext()
|
|
{
|
|
// Task 4: the ExecutionId + SourceScript threaded through the S&F
|
|
// buffer arrive on the CachedCallAttemptContext; the bridge must stamp
|
|
// both onto the per-attempt ApiCallCached row (previously SourceScript
|
|
// was hard-coded null with a "not threaded through S&F" comment).
|
|
var executionId = Guid.NewGuid();
|
|
var captured = new List<CachedCallTelemetry>();
|
|
_forwarder.ForwardAsync(Arg.Do<CachedCallTelemetry>(t => captured.Add(t)), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
|
|
var sut = CreateSut();
|
|
await sut.OnAttemptCompletedAsync(Ctx(
|
|
CachedCallAttemptOutcome.TransientFailure,
|
|
executionId: executionId,
|
|
sourceScript: "Plant.Pump42/OnTick"));
|
|
|
|
var packet = Assert.Single(captured);
|
|
Assert.Equal(AuditKind.ApiCallCached, packet.Audit.Kind);
|
|
Assert.Equal(executionId, packet.Audit.ExecutionId);
|
|
Assert.Equal("Plant.Pump42/OnTick", packet.Audit.SourceScript);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task RetryLoopCachedResolveRow_CarriesExecutionIdAndSourceScript_FromContext()
|
|
{
|
|
// The terminal CachedResolve row must also carry the threaded
|
|
// provenance so the whole retry-loop lifecycle is correlated.
|
|
var executionId = Guid.NewGuid();
|
|
var captured = new List<CachedCallTelemetry>();
|
|
_forwarder.ForwardAsync(Arg.Do<CachedCallTelemetry>(t => captured.Add(t)), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
|
|
var sut = CreateSut();
|
|
await sut.OnAttemptCompletedAsync(Ctx(
|
|
CachedCallAttemptOutcome.Delivered,
|
|
channel: "DbOutbound",
|
|
executionId: executionId,
|
|
sourceScript: "Plant.Tank/OnAlarm"));
|
|
|
|
Assert.Equal(2, captured.Count);
|
|
var resolve = Assert.Single(captured, p => p.Audit.Kind == AuditKind.CachedResolve);
|
|
Assert.Equal(executionId, resolve.Audit.ExecutionId);
|
|
Assert.Equal("Plant.Tank/OnAlarm", resolve.Audit.SourceScript);
|
|
|
|
var attempted = Assert.Single(captured, p => p.Audit.Kind == AuditKind.DbWriteCached);
|
|
Assert.Equal(executionId, attempted.Audit.ExecutionId);
|
|
Assert.Equal("Plant.Tank/OnAlarm", attempted.Audit.SourceScript);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task RetryLoopRow_NullExecutionIdAndSourceScript_RemainNull()
|
|
{
|
|
// Back-compat: a pre-Task-4 buffered row has no ExecutionId /
|
|
// SourceScript; the bridge must leave the audit row's fields null
|
|
// rather than throwing.
|
|
CachedCallTelemetry? captured = null;
|
|
_forwarder.ForwardAsync(Arg.Do<CachedCallTelemetry>(t => captured = t), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
|
|
var sut = CreateSut();
|
|
await sut.OnAttemptCompletedAsync(Ctx(CachedCallAttemptOutcome.TransientFailure));
|
|
|
|
Assert.NotNull(captured);
|
|
Assert.Null(captured!.Audit.ExecutionId);
|
|
Assert.Null(captured.Audit.SourceScript);
|
|
}
|
|
|
|
// ── Audit Log #23 (ParentExecutionId Task 6): ParentExecutionId ──
|
|
|
|
[Fact]
|
|
public async Task RetryLoopAttemptedRow_CarriesParentExecutionId_FromContext()
|
|
{
|
|
// Task 6: the ParentExecutionId threaded through the S&F buffer (the
|
|
// inbound-API run that spawned the originating script) arrives on the
|
|
// CachedCallAttemptContext; the bridge must stamp it onto the
|
|
// per-attempt ApiCallCached row beside ExecutionId.
|
|
var parentExecutionId = Guid.NewGuid();
|
|
var captured = new List<CachedCallTelemetry>();
|
|
_forwarder.ForwardAsync(Arg.Do<CachedCallTelemetry>(t => captured.Add(t)), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
|
|
var sut = CreateSut();
|
|
await sut.OnAttemptCompletedAsync(Ctx(
|
|
CachedCallAttemptOutcome.TransientFailure,
|
|
parentExecutionId: parentExecutionId));
|
|
|
|
var packet = Assert.Single(captured);
|
|
Assert.Equal(AuditKind.ApiCallCached, packet.Audit.Kind);
|
|
Assert.Equal(parentExecutionId, packet.Audit.ParentExecutionId);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task RetryLoopCachedResolveRow_CarriesParentExecutionId_FromContext()
|
|
{
|
|
// The terminal CachedResolve row must also carry the threaded
|
|
// ParentExecutionId so the whole retry-loop lifecycle correlates back
|
|
// to the spawning inbound-API execution.
|
|
var parentExecutionId = Guid.NewGuid();
|
|
var captured = new List<CachedCallTelemetry>();
|
|
_forwarder.ForwardAsync(Arg.Do<CachedCallTelemetry>(t => captured.Add(t)), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
|
|
var sut = CreateSut();
|
|
await sut.OnAttemptCompletedAsync(Ctx(
|
|
CachedCallAttemptOutcome.Delivered,
|
|
channel: "DbOutbound",
|
|
parentExecutionId: parentExecutionId));
|
|
|
|
Assert.Equal(2, captured.Count);
|
|
var resolve = Assert.Single(captured, p => p.Audit.Kind == AuditKind.CachedResolve);
|
|
Assert.Equal(parentExecutionId, resolve.Audit.ParentExecutionId);
|
|
|
|
var attempted = Assert.Single(captured, p => p.Audit.Kind == AuditKind.DbWriteCached);
|
|
Assert.Equal(parentExecutionId, attempted.Audit.ParentExecutionId);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task RetryLoopRow_NullParentExecutionId_RemainsNull()
|
|
{
|
|
// Back-compat / non-routed run: the originating script was not spawned
|
|
// by an inbound-API request, so ParentExecutionId is null; the bridge
|
|
// must leave the audit row's ParentExecutionId null rather than throwing.
|
|
CachedCallTelemetry? captured = null;
|
|
_forwarder.ForwardAsync(Arg.Do<CachedCallTelemetry>(t => captured = t), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
|
|
var sut = CreateSut();
|
|
await sut.OnAttemptCompletedAsync(Ctx(CachedCallAttemptOutcome.TransientFailure));
|
|
|
|
Assert.NotNull(captured);
|
|
Assert.Null(captured!.Audit.ParentExecutionId);
|
|
}
|
|
|
|
// ── SourceNode-stamping (Task 14) ──
|
|
|
|
[Fact]
|
|
public async Task RetryLoopRow_StampsSourceNode_FromNodeIdentityProvider()
|
|
{
|
|
// SourceNode-stamping (Task 14): when an INodeIdentityProvider is
|
|
// wired the bridge stamps the local node name (node-a/node-b) onto
|
|
// the SiteCallOperational.SourceNode column of every emitted packet.
|
|
var nodeIdentity = Substitute.For<INodeIdentityProvider>();
|
|
nodeIdentity.NodeName.Returns("node-a");
|
|
|
|
var captured = new List<CachedCallTelemetry>();
|
|
_forwarder.ForwardAsync(Arg.Do<CachedCallTelemetry>(t => captured.Add(t)), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
|
|
var sut = new CachedCallLifecycleBridge(
|
|
_forwarder, NullLogger<CachedCallLifecycleBridge>.Instance, nodeIdentity);
|
|
|
|
await sut.OnAttemptCompletedAsync(Ctx(CachedCallAttemptOutcome.Delivered));
|
|
|
|
Assert.Equal(2, captured.Count);
|
|
Assert.All(captured, p => Assert.Equal("node-a", p.Operational.SourceNode));
|
|
}
|
|
|
|
[Fact]
|
|
public async Task RetryLoopRow_NoNodeIdentityProvider_LeavesSourceNodeNull()
|
|
{
|
|
// When no INodeIdentityProvider is wired (legacy hosts / tests) the
|
|
// bridge degrades to a null SourceNode rather than throwing. The
|
|
// emitted packet's SourceNode is null so the central row persists NULL.
|
|
var captured = new List<CachedCallTelemetry>();
|
|
_forwarder.ForwardAsync(Arg.Do<CachedCallTelemetry>(t => captured.Add(t)), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
|
|
// Default CreateSut() does NOT pass a node-identity provider.
|
|
var sut = CreateSut();
|
|
await sut.OnAttemptCompletedAsync(Ctx(CachedCallAttemptOutcome.TransientFailure));
|
|
|
|
var packet = Assert.Single(captured);
|
|
Assert.Null(packet.Operational.SourceNode);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task RetryLoopRow_NodeIdentityWithNullNodeName_LeavesSourceNodeNull()
|
|
{
|
|
// The provider exists but reports a null NodeName (unconfigured). The
|
|
// bridge must pass that null through to SourceNode rather than
|
|
// falling back to a placeholder.
|
|
var nodeIdentity = Substitute.For<INodeIdentityProvider>();
|
|
nodeIdentity.NodeName.Returns((string?)null);
|
|
|
|
var captured = new List<CachedCallTelemetry>();
|
|
_forwarder.ForwardAsync(Arg.Do<CachedCallTelemetry>(t => captured.Add(t)), Arg.Any<CancellationToken>())
|
|
.Returns(Task.CompletedTask);
|
|
|
|
var sut = new CachedCallLifecycleBridge(
|
|
_forwarder, NullLogger<CachedCallLifecycleBridge>.Instance, nodeIdentity);
|
|
|
|
await sut.OnAttemptCompletedAsync(Ctx(CachedCallAttemptOutcome.TransientFailure));
|
|
|
|
var packet = Assert.Single(captured);
|
|
Assert.Null(packet.Operational.SourceNode);
|
|
}
|
|
}
|