Site: site emitters of SiteCallOperational (ExternalSystemClient, the script-API cached call path in ScriptRuntimeContext, CachedCallLifecycleBridge) inject INodeIdentityProvider and stamp SourceNode = NodeName at construction. OperationTrackingStore call site in CachedCallTelemetryForwarder now stamps SourceNode too. Central: SiteCallAuditRepository.UpsertAsync INSERT includes SourceNode in the column list; conditional monotonic UPDATE uses COALESCE(@SourceNode, SourceNode) so later packets cannot blank a previously- stamped value. After this commit every SiteCalls row carries node-a/node-b in SourceNode (subject to monotonic preservation).
308 lines
12 KiB
C#
308 lines
12 KiB
C#
using Microsoft.Extensions.Logging.Abstractions;
|
|
using NSubstitute;
|
|
using NSubstitute.ExceptionExtensions;
|
|
using ScadaLink.AuditLog.Site.Telemetry;
|
|
using ScadaLink.Commons.Entities.Audit;
|
|
using ScadaLink.Commons.Interfaces;
|
|
using ScadaLink.Commons.Interfaces.Services;
|
|
using ScadaLink.Commons.Messages.Integration;
|
|
using ScadaLink.Commons.Types;
|
|
using ScadaLink.Commons.Types.Enums;
|
|
|
|
namespace ScadaLink.AuditLog.Tests.Site.Telemetry;
|
|
|
|
/// <summary>
|
|
/// Bundle E E2 tests for <see cref="CachedCallTelemetryForwarder"/>. The
|
|
/// forwarder is the site-side dual emitter: every cached-call lifecycle event
|
|
/// writes one <see cref="AuditEvent"/> to <see cref="IAuditWriter"/> and one
|
|
/// operational tracking-row mutation to <see cref="IOperationTrackingStore"/>.
|
|
/// Audit-emission contract: best-effort — a thrown writer or tracking store
|
|
/// must be logged and swallowed; the forwarder must never propagate the
|
|
/// exception to the calling script.
|
|
/// </summary>
|
|
public class CachedCallTelemetryForwarderTests
|
|
{
|
|
private readonly IAuditWriter _writer = Substitute.For<IAuditWriter>();
|
|
private readonly IOperationTrackingStore _tracking = Substitute.For<IOperationTrackingStore>();
|
|
private readonly TrackedOperationId _id = TrackedOperationId.New();
|
|
private readonly DateTime _now = new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc);
|
|
|
|
private CachedCallTelemetryForwarder CreateSut() => new(
|
|
_writer, _tracking, NullLogger<CachedCallTelemetryForwarder>.Instance);
|
|
|
|
private CachedCallTelemetry SubmitPacket() =>
|
|
new(
|
|
Audit: new AuditEvent
|
|
{
|
|
EventId = Guid.NewGuid(),
|
|
OccurredAtUtc = _now,
|
|
Channel = AuditChannel.ApiOutbound,
|
|
Kind = AuditKind.CachedSubmit,
|
|
CorrelationId = _id.Value,
|
|
SourceSiteId = "site-1",
|
|
SourceInstanceId = "inst-1",
|
|
SourceScript = "ScriptActor:doStuff",
|
|
Target = "ERP.GetOrder",
|
|
Status = AuditStatus.Submitted,
|
|
ForwardState = AuditForwardState.Pending,
|
|
},
|
|
Operational: new SiteCallOperational(
|
|
TrackedOperationId: _id,
|
|
Channel: "ApiOutbound",
|
|
Target: "ERP.GetOrder",
|
|
SourceSite: "site-1",
|
|
SourceNode: null,
|
|
Status: "Submitted",
|
|
RetryCount: 0,
|
|
LastError: null,
|
|
HttpStatus: null,
|
|
CreatedAtUtc: _now,
|
|
UpdatedAtUtc: _now,
|
|
TerminalAtUtc: null));
|
|
|
|
private CachedCallTelemetry AttemptedPacket(int retryCount = 1, string? lastError = "HTTP 500", int? httpStatus = 500) =>
|
|
new(
|
|
Audit: new AuditEvent
|
|
{
|
|
EventId = Guid.NewGuid(),
|
|
OccurredAtUtc = _now,
|
|
Channel = AuditChannel.ApiOutbound,
|
|
Kind = AuditKind.ApiCallCached,
|
|
CorrelationId = _id.Value,
|
|
SourceSiteId = "site-1",
|
|
Target = "ERP.GetOrder",
|
|
Status = AuditStatus.Attempted,
|
|
HttpStatus = httpStatus,
|
|
ErrorMessage = lastError,
|
|
ForwardState = AuditForwardState.Pending,
|
|
},
|
|
Operational: new SiteCallOperational(
|
|
TrackedOperationId: _id,
|
|
Channel: "ApiOutbound",
|
|
Target: "ERP.GetOrder",
|
|
SourceSite: "site-1",
|
|
SourceNode: null,
|
|
Status: "Attempted",
|
|
RetryCount: retryCount,
|
|
LastError: lastError,
|
|
HttpStatus: httpStatus,
|
|
CreatedAtUtc: _now,
|
|
UpdatedAtUtc: _now,
|
|
TerminalAtUtc: null));
|
|
|
|
private CachedCallTelemetry ResolvePacket(string status = "Delivered") =>
|
|
new(
|
|
Audit: new AuditEvent
|
|
{
|
|
EventId = Guid.NewGuid(),
|
|
OccurredAtUtc = _now,
|
|
Channel = AuditChannel.ApiOutbound,
|
|
Kind = AuditKind.CachedResolve,
|
|
CorrelationId = _id.Value,
|
|
SourceSiteId = "site-1",
|
|
Target = "ERP.GetOrder",
|
|
Status = Enum.Parse<AuditStatus>(status),
|
|
ForwardState = AuditForwardState.Pending,
|
|
},
|
|
Operational: new SiteCallOperational(
|
|
TrackedOperationId: _id,
|
|
Channel: "ApiOutbound",
|
|
Target: "ERP.GetOrder",
|
|
SourceSite: "site-1",
|
|
SourceNode: null,
|
|
Status: status,
|
|
RetryCount: 2,
|
|
LastError: null,
|
|
HttpStatus: null,
|
|
CreatedAtUtc: _now,
|
|
UpdatedAtUtc: _now,
|
|
TerminalAtUtc: _now));
|
|
|
|
[Fact]
|
|
public async Task ForwardAsync_Submit_WritesAuditEvent_AndRecordsEnqueue()
|
|
{
|
|
var sut = CreateSut();
|
|
var packet = SubmitPacket();
|
|
|
|
await sut.ForwardAsync(packet, CancellationToken.None);
|
|
|
|
// Audit row: one WriteAsync of the submit event.
|
|
await _writer.Received(1).WriteAsync(
|
|
Arg.Is<AuditEvent>(e =>
|
|
e.EventId == packet.Audit.EventId
|
|
&& e.Kind == AuditKind.CachedSubmit
|
|
&& e.Status == AuditStatus.Submitted),
|
|
Arg.Any<CancellationToken>());
|
|
|
|
// Tracking row: insert-if-not-exists with kind discriminator.
|
|
// Default CreateSut() does NOT supply an INodeIdentityProvider, so the
|
|
// forwarder passes null sourceNode to RecordEnqueueAsync (legacy / test
|
|
// host behaviour). The Task 14 stamping path is covered by the
|
|
// ForwardAsync_Submit_StampsSourceNode_FromNodeIdentityProvider test
|
|
// below.
|
|
await _tracking.Received(1).RecordEnqueueAsync(
|
|
_id,
|
|
"ApiOutbound",
|
|
"ERP.GetOrder",
|
|
"inst-1",
|
|
"ScriptActor:doStuff",
|
|
null,
|
|
Arg.Any<CancellationToken>());
|
|
await _tracking.DidNotReceiveWithAnyArgs().RecordAttemptAsync(
|
|
default, default!, default, default, default, default);
|
|
await _tracking.DidNotReceiveWithAnyArgs().RecordTerminalAsync(
|
|
default, default!, default, default, default);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task ForwardAsync_Attempted_WritesAuditEvent_AndRecordsAttempt()
|
|
{
|
|
var sut = CreateSut();
|
|
var packet = AttemptedPacket(retryCount: 2, lastError: "HTTP 503", httpStatus: 503);
|
|
|
|
await sut.ForwardAsync(packet, CancellationToken.None);
|
|
|
|
await _writer.Received(1).WriteAsync(
|
|
Arg.Is<AuditEvent>(e =>
|
|
e.EventId == packet.Audit.EventId
|
|
&& e.Kind == AuditKind.ApiCallCached
|
|
&& e.Status == AuditStatus.Attempted),
|
|
Arg.Any<CancellationToken>());
|
|
|
|
await _tracking.Received(1).RecordAttemptAsync(
|
|
_id, "Attempted", 2, "HTTP 503", 503, Arg.Any<CancellationToken>());
|
|
await _tracking.DidNotReceiveWithAnyArgs().RecordEnqueueAsync(
|
|
default, default!, default, default, default, default, default);
|
|
await _tracking.DidNotReceiveWithAnyArgs().RecordTerminalAsync(
|
|
default, default!, default, default, default);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task ForwardAsync_Resolve_WritesAuditEvent_AndRecordsTerminal()
|
|
{
|
|
var sut = CreateSut();
|
|
var packet = ResolvePacket("Delivered");
|
|
|
|
await sut.ForwardAsync(packet, CancellationToken.None);
|
|
|
|
await _writer.Received(1).WriteAsync(
|
|
Arg.Is<AuditEvent>(e =>
|
|
e.EventId == packet.Audit.EventId
|
|
&& e.Kind == AuditKind.CachedResolve
|
|
&& e.Status == AuditStatus.Delivered),
|
|
Arg.Any<CancellationToken>());
|
|
|
|
await _tracking.Received(1).RecordTerminalAsync(
|
|
_id, "Delivered", null, null, Arg.Any<CancellationToken>());
|
|
await _tracking.DidNotReceiveWithAnyArgs().RecordEnqueueAsync(
|
|
default, default!, default, default, default, default, default);
|
|
await _tracking.DidNotReceiveWithAnyArgs().RecordAttemptAsync(
|
|
default, default!, default, default, default, default);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task ForwardAsync_WriterThrows_Logs_DoesNotPropagate()
|
|
{
|
|
_writer.WriteAsync(Arg.Any<AuditEvent>(), Arg.Any<CancellationToken>())
|
|
.Throws(new InvalidOperationException("primary down"));
|
|
|
|
var sut = CreateSut();
|
|
|
|
// Must not throw.
|
|
await sut.ForwardAsync(SubmitPacket(), CancellationToken.None);
|
|
|
|
// Tracking still attempted — emission of the two halves is independent
|
|
// so a writer outage cannot starve the operational row (and vice-versa).
|
|
await _tracking.Received(1).RecordEnqueueAsync(
|
|
Arg.Any<TrackedOperationId>(),
|
|
Arg.Any<string>(),
|
|
Arg.Any<string?>(),
|
|
Arg.Any<string?>(),
|
|
Arg.Any<string?>(),
|
|
Arg.Any<string?>(),
|
|
Arg.Any<CancellationToken>());
|
|
}
|
|
|
|
[Fact]
|
|
public async Task ForwardAsync_TrackingStoreThrows_Logs_DoesNotPropagate()
|
|
{
|
|
_tracking.RecordEnqueueAsync(
|
|
Arg.Any<TrackedOperationId>(),
|
|
Arg.Any<string>(),
|
|
Arg.Any<string?>(),
|
|
Arg.Any<string?>(),
|
|
Arg.Any<string?>(),
|
|
Arg.Any<string?>(),
|
|
Arg.Any<CancellationToken>())
|
|
.Throws(new InvalidOperationException("sqlite locked"));
|
|
|
|
var sut = CreateSut();
|
|
|
|
await sut.ForwardAsync(SubmitPacket(), CancellationToken.None);
|
|
|
|
// Writer still attempted — emission halves are independent.
|
|
await _writer.Received(1).WriteAsync(
|
|
Arg.Any<AuditEvent>(), Arg.Any<CancellationToken>());
|
|
}
|
|
|
|
[Fact]
|
|
public async Task ForwardAsync_NullPacket_Throws()
|
|
{
|
|
var sut = CreateSut();
|
|
|
|
await Assert.ThrowsAsync<ArgumentNullException>(
|
|
() => sut.ForwardAsync(null!, CancellationToken.None));
|
|
}
|
|
|
|
// ── SourceNode-stamping (Task 14) ──
|
|
|
|
[Fact]
|
|
public async Task ForwardAsync_Submit_StampsSourceNode_FromNodeIdentityProvider()
|
|
{
|
|
// SourceNode-stamping (Task 14): when an INodeIdentityProvider is
|
|
// wired the forwarder must stamp its NodeName onto the
|
|
// RecordEnqueueAsync sourceNode parameter so the tracking row
|
|
// captures the originating node (node-a/node-b).
|
|
var nodeIdentity = Substitute.For<INodeIdentityProvider>();
|
|
nodeIdentity.NodeName.Returns("node-a");
|
|
|
|
var sut = new CachedCallTelemetryForwarder(
|
|
_writer, _tracking, NullLogger<CachedCallTelemetryForwarder>.Instance, nodeIdentity);
|
|
|
|
await sut.ForwardAsync(SubmitPacket(), CancellationToken.None);
|
|
|
|
await _tracking.Received(1).RecordEnqueueAsync(
|
|
_id,
|
|
"ApiOutbound",
|
|
"ERP.GetOrder",
|
|
"inst-1",
|
|
"ScriptActor:doStuff",
|
|
"node-a",
|
|
Arg.Any<CancellationToken>());
|
|
}
|
|
|
|
[Fact]
|
|
public async Task ForwardAsync_Submit_NodeIdentityNullNodeName_PassesNullSourceNode()
|
|
{
|
|
// The provider exists but reports a null NodeName (unconfigured).
|
|
// The forwarder passes that null through to RecordEnqueueAsync rather
|
|
// than falling back to a placeholder string.
|
|
var nodeIdentity = Substitute.For<INodeIdentityProvider>();
|
|
nodeIdentity.NodeName.Returns((string?)null);
|
|
|
|
var sut = new CachedCallTelemetryForwarder(
|
|
_writer, _tracking, NullLogger<CachedCallTelemetryForwarder>.Instance, nodeIdentity);
|
|
|
|
await sut.ForwardAsync(SubmitPacket(), CancellationToken.None);
|
|
|
|
await _tracking.Received(1).RecordEnqueueAsync(
|
|
_id,
|
|
"ApiOutbound",
|
|
"ERP.GetOrder",
|
|
"inst-1",
|
|
"ScriptActor:doStuff",
|
|
null,
|
|
Arg.Any<CancellationToken>());
|
|
}
|
|
}
|