Files
scadalink-design/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/CachedCallTelemetryForwarderTests.cs
Joseph Doherty 06ed0acead feat(sitecall-audit): carry + persist SourceNode end-to-end via cached telemetry
Site: site emitters of SiteCallOperational (ExternalSystemClient, the script-API
cached call path in ScriptRuntimeContext, CachedCallLifecycleBridge) inject
INodeIdentityProvider and stamp SourceNode = NodeName at construction.

OperationTrackingStore call site in CachedCallTelemetryForwarder now stamps
SourceNode too.

Central: SiteCallAuditRepository.UpsertAsync INSERT includes SourceNode in the
column list; conditional monotonic UPDATE uses
COALESCE(@SourceNode, SourceNode) so later packets cannot blank a previously-
stamped value. After this commit every SiteCalls row carries node-a/node-b in
SourceNode (subject to monotonic preservation).
2026-05-23 17:41:22 -04:00

308 lines
12 KiB
C#

using Microsoft.Extensions.Logging.Abstractions;
using NSubstitute;
using NSubstitute.ExceptionExtensions;
using ScadaLink.AuditLog.Site.Telemetry;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces;
using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Commons.Messages.Integration;
using ScadaLink.Commons.Types;
using ScadaLink.Commons.Types.Enums;
namespace ScadaLink.AuditLog.Tests.Site.Telemetry;
/// <summary>
/// Bundle E E2 tests for <see cref="CachedCallTelemetryForwarder"/>. The
/// forwarder is the site-side dual emitter: every cached-call lifecycle event
/// writes one <see cref="AuditEvent"/> to <see cref="IAuditWriter"/> and one
/// operational tracking-row mutation to <see cref="IOperationTrackingStore"/>.
/// Audit-emission contract: best-effort — a thrown writer or tracking store
/// must be logged and swallowed; the forwarder must never propagate the
/// exception to the calling script.
/// </summary>
public class CachedCallTelemetryForwarderTests
{
private readonly IAuditWriter _writer = Substitute.For<IAuditWriter>();
private readonly IOperationTrackingStore _tracking = Substitute.For<IOperationTrackingStore>();
private readonly TrackedOperationId _id = TrackedOperationId.New();
private readonly DateTime _now = new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc);
private CachedCallTelemetryForwarder CreateSut() => new(
_writer, _tracking, NullLogger<CachedCallTelemetryForwarder>.Instance);
private CachedCallTelemetry SubmitPacket() =>
new(
Audit: new AuditEvent
{
EventId = Guid.NewGuid(),
OccurredAtUtc = _now,
Channel = AuditChannel.ApiOutbound,
Kind = AuditKind.CachedSubmit,
CorrelationId = _id.Value,
SourceSiteId = "site-1",
SourceInstanceId = "inst-1",
SourceScript = "ScriptActor:doStuff",
Target = "ERP.GetOrder",
Status = AuditStatus.Submitted,
ForwardState = AuditForwardState.Pending,
},
Operational: new SiteCallOperational(
TrackedOperationId: _id,
Channel: "ApiOutbound",
Target: "ERP.GetOrder",
SourceSite: "site-1",
SourceNode: null,
Status: "Submitted",
RetryCount: 0,
LastError: null,
HttpStatus: null,
CreatedAtUtc: _now,
UpdatedAtUtc: _now,
TerminalAtUtc: null));
private CachedCallTelemetry AttemptedPacket(int retryCount = 1, string? lastError = "HTTP 500", int? httpStatus = 500) =>
new(
Audit: new AuditEvent
{
EventId = Guid.NewGuid(),
OccurredAtUtc = _now,
Channel = AuditChannel.ApiOutbound,
Kind = AuditKind.ApiCallCached,
CorrelationId = _id.Value,
SourceSiteId = "site-1",
Target = "ERP.GetOrder",
Status = AuditStatus.Attempted,
HttpStatus = httpStatus,
ErrorMessage = lastError,
ForwardState = AuditForwardState.Pending,
},
Operational: new SiteCallOperational(
TrackedOperationId: _id,
Channel: "ApiOutbound",
Target: "ERP.GetOrder",
SourceSite: "site-1",
SourceNode: null,
Status: "Attempted",
RetryCount: retryCount,
LastError: lastError,
HttpStatus: httpStatus,
CreatedAtUtc: _now,
UpdatedAtUtc: _now,
TerminalAtUtc: null));
private CachedCallTelemetry ResolvePacket(string status = "Delivered") =>
new(
Audit: new AuditEvent
{
EventId = Guid.NewGuid(),
OccurredAtUtc = _now,
Channel = AuditChannel.ApiOutbound,
Kind = AuditKind.CachedResolve,
CorrelationId = _id.Value,
SourceSiteId = "site-1",
Target = "ERP.GetOrder",
Status = Enum.Parse<AuditStatus>(status),
ForwardState = AuditForwardState.Pending,
},
Operational: new SiteCallOperational(
TrackedOperationId: _id,
Channel: "ApiOutbound",
Target: "ERP.GetOrder",
SourceSite: "site-1",
SourceNode: null,
Status: status,
RetryCount: 2,
LastError: null,
HttpStatus: null,
CreatedAtUtc: _now,
UpdatedAtUtc: _now,
TerminalAtUtc: _now));
[Fact]
public async Task ForwardAsync_Submit_WritesAuditEvent_AndRecordsEnqueue()
{
var sut = CreateSut();
var packet = SubmitPacket();
await sut.ForwardAsync(packet, CancellationToken.None);
// Audit row: one WriteAsync of the submit event.
await _writer.Received(1).WriteAsync(
Arg.Is<AuditEvent>(e =>
e.EventId == packet.Audit.EventId
&& e.Kind == AuditKind.CachedSubmit
&& e.Status == AuditStatus.Submitted),
Arg.Any<CancellationToken>());
// Tracking row: insert-if-not-exists with kind discriminator.
// Default CreateSut() does NOT supply an INodeIdentityProvider, so the
// forwarder passes null sourceNode to RecordEnqueueAsync (legacy / test
// host behaviour). The Task 14 stamping path is covered by the
// ForwardAsync_Submit_StampsSourceNode_FromNodeIdentityProvider test
// below.
await _tracking.Received(1).RecordEnqueueAsync(
_id,
"ApiOutbound",
"ERP.GetOrder",
"inst-1",
"ScriptActor:doStuff",
null,
Arg.Any<CancellationToken>());
await _tracking.DidNotReceiveWithAnyArgs().RecordAttemptAsync(
default, default!, default, default, default, default);
await _tracking.DidNotReceiveWithAnyArgs().RecordTerminalAsync(
default, default!, default, default, default);
}
[Fact]
public async Task ForwardAsync_Attempted_WritesAuditEvent_AndRecordsAttempt()
{
var sut = CreateSut();
var packet = AttemptedPacket(retryCount: 2, lastError: "HTTP 503", httpStatus: 503);
await sut.ForwardAsync(packet, CancellationToken.None);
await _writer.Received(1).WriteAsync(
Arg.Is<AuditEvent>(e =>
e.EventId == packet.Audit.EventId
&& e.Kind == AuditKind.ApiCallCached
&& e.Status == AuditStatus.Attempted),
Arg.Any<CancellationToken>());
await _tracking.Received(1).RecordAttemptAsync(
_id, "Attempted", 2, "HTTP 503", 503, Arg.Any<CancellationToken>());
await _tracking.DidNotReceiveWithAnyArgs().RecordEnqueueAsync(
default, default!, default, default, default, default, default);
await _tracking.DidNotReceiveWithAnyArgs().RecordTerminalAsync(
default, default!, default, default, default);
}
[Fact]
public async Task ForwardAsync_Resolve_WritesAuditEvent_AndRecordsTerminal()
{
var sut = CreateSut();
var packet = ResolvePacket("Delivered");
await sut.ForwardAsync(packet, CancellationToken.None);
await _writer.Received(1).WriteAsync(
Arg.Is<AuditEvent>(e =>
e.EventId == packet.Audit.EventId
&& e.Kind == AuditKind.CachedResolve
&& e.Status == AuditStatus.Delivered),
Arg.Any<CancellationToken>());
await _tracking.Received(1).RecordTerminalAsync(
_id, "Delivered", null, null, Arg.Any<CancellationToken>());
await _tracking.DidNotReceiveWithAnyArgs().RecordEnqueueAsync(
default, default!, default, default, default, default, default);
await _tracking.DidNotReceiveWithAnyArgs().RecordAttemptAsync(
default, default!, default, default, default, default);
}
[Fact]
public async Task ForwardAsync_WriterThrows_Logs_DoesNotPropagate()
{
_writer.WriteAsync(Arg.Any<AuditEvent>(), Arg.Any<CancellationToken>())
.Throws(new InvalidOperationException("primary down"));
var sut = CreateSut();
// Must not throw.
await sut.ForwardAsync(SubmitPacket(), CancellationToken.None);
// Tracking still attempted — emission of the two halves is independent
// so a writer outage cannot starve the operational row (and vice-versa).
await _tracking.Received(1).RecordEnqueueAsync(
Arg.Any<TrackedOperationId>(),
Arg.Any<string>(),
Arg.Any<string?>(),
Arg.Any<string?>(),
Arg.Any<string?>(),
Arg.Any<string?>(),
Arg.Any<CancellationToken>());
}
[Fact]
public async Task ForwardAsync_TrackingStoreThrows_Logs_DoesNotPropagate()
{
_tracking.RecordEnqueueAsync(
Arg.Any<TrackedOperationId>(),
Arg.Any<string>(),
Arg.Any<string?>(),
Arg.Any<string?>(),
Arg.Any<string?>(),
Arg.Any<string?>(),
Arg.Any<CancellationToken>())
.Throws(new InvalidOperationException("sqlite locked"));
var sut = CreateSut();
await sut.ForwardAsync(SubmitPacket(), CancellationToken.None);
// Writer still attempted — emission halves are independent.
await _writer.Received(1).WriteAsync(
Arg.Any<AuditEvent>(), Arg.Any<CancellationToken>());
}
[Fact]
public async Task ForwardAsync_NullPacket_Throws()
{
var sut = CreateSut();
await Assert.ThrowsAsync<ArgumentNullException>(
() => sut.ForwardAsync(null!, CancellationToken.None));
}
// ── SourceNode-stamping (Task 14) ──
[Fact]
public async Task ForwardAsync_Submit_StampsSourceNode_FromNodeIdentityProvider()
{
// SourceNode-stamping (Task 14): when an INodeIdentityProvider is
// wired the forwarder must stamp its NodeName onto the
// RecordEnqueueAsync sourceNode parameter so the tracking row
// captures the originating node (node-a/node-b).
var nodeIdentity = Substitute.For<INodeIdentityProvider>();
nodeIdentity.NodeName.Returns("node-a");
var sut = new CachedCallTelemetryForwarder(
_writer, _tracking, NullLogger<CachedCallTelemetryForwarder>.Instance, nodeIdentity);
await sut.ForwardAsync(SubmitPacket(), CancellationToken.None);
await _tracking.Received(1).RecordEnqueueAsync(
_id,
"ApiOutbound",
"ERP.GetOrder",
"inst-1",
"ScriptActor:doStuff",
"node-a",
Arg.Any<CancellationToken>());
}
[Fact]
public async Task ForwardAsync_Submit_NodeIdentityNullNodeName_PassesNullSourceNode()
{
// The provider exists but reports a null NodeName (unconfigured).
// The forwarder passes that null through to RecordEnqueueAsync rather
// than falling back to a placeholder string.
var nodeIdentity = Substitute.For<INodeIdentityProvider>();
nodeIdentity.NodeName.Returns((string?)null);
var sut = new CachedCallTelemetryForwarder(
_writer, _tracking, NullLogger<CachedCallTelemetryForwarder>.Instance, nodeIdentity);
await sut.ForwardAsync(SubmitPacket(), CancellationToken.None);
await _tracking.Received(1).RecordEnqueueAsync(
_id,
"ApiOutbound",
"ERP.GetOrder",
"inst-1",
"ScriptActor:doStuff",
null,
Arg.Any<CancellationToken>());
}
}