Sister to SiteAuditTelemetryActor: takes a combined CachedCallTelemetry
packet and fans it out to the two site-local stores.
* AuditEvent half writes through IAuditWriter (the M2 FallbackAuditWriter
+ SqliteAuditWriter chain — same site SQLite hot-path as sync calls).
* SiteCallOperational half maps Audit.Kind to the matching
IOperationTrackingStore method:
- CachedSubmit -> RecordEnqueueAsync (insert-if-not-exists)
- ApiCallCached / DbWriteCached -> RecordAttemptAsync (monotonic)
- CachedResolve -> RecordTerminalAsync (first-write-wins)
Best-effort contract (alog.md §7): independent try/catch per half so a
thrown writer cannot starve the tracking row (and vice-versa); both
failures are logged at warning level and swallowed — the calling script
never sees them.
Wire push deferred to M6 — the NoOp ISiteStreamAuditClient binding stays
in effect; the forwarder writes only to the local stores in M3. The
existing SiteAuditTelemetryActor drain loop will sweep the audit rows
once a real gRPC client lands.
Bundle E task E2.
246 lines
9.4 KiB
C#
246 lines
9.4 KiB
C#
using Microsoft.Extensions.Logging.Abstractions;
|
|
using NSubstitute;
|
|
using NSubstitute.ExceptionExtensions;
|
|
using ScadaLink.AuditLog.Site.Telemetry;
|
|
using ScadaLink.Commons.Entities.Audit;
|
|
using ScadaLink.Commons.Interfaces;
|
|
using ScadaLink.Commons.Interfaces.Services;
|
|
using ScadaLink.Commons.Messages.Integration;
|
|
using ScadaLink.Commons.Types;
|
|
using ScadaLink.Commons.Types.Enums;
|
|
|
|
namespace ScadaLink.AuditLog.Tests.Site.Telemetry;
|
|
|
|
/// <summary>
|
|
/// Bundle E E2 tests for <see cref="CachedCallTelemetryForwarder"/>. The
|
|
/// forwarder is the site-side dual emitter: every cached-call lifecycle event
|
|
/// writes one <see cref="AuditEvent"/> to <see cref="IAuditWriter"/> and one
|
|
/// operational tracking-row mutation to <see cref="IOperationTrackingStore"/>.
|
|
/// Audit-emission contract: best-effort — a thrown writer or tracking store
|
|
/// must be logged and swallowed; the forwarder must never propagate the
|
|
/// exception to the calling script.
|
|
/// </summary>
|
|
public class CachedCallTelemetryForwarderTests
|
|
{
|
|
private readonly IAuditWriter _writer = Substitute.For<IAuditWriter>();
|
|
private readonly IOperationTrackingStore _tracking = Substitute.For<IOperationTrackingStore>();
|
|
private readonly TrackedOperationId _id = TrackedOperationId.New();
|
|
private readonly DateTime _now = new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc);
|
|
|
|
private CachedCallTelemetryForwarder CreateSut() => new(
|
|
_writer, _tracking, NullLogger<CachedCallTelemetryForwarder>.Instance);
|
|
|
|
private CachedCallTelemetry SubmitPacket() =>
|
|
new(
|
|
Audit: new AuditEvent
|
|
{
|
|
EventId = Guid.NewGuid(),
|
|
OccurredAtUtc = _now,
|
|
Channel = AuditChannel.ApiOutbound,
|
|
Kind = AuditKind.CachedSubmit,
|
|
CorrelationId = _id.Value,
|
|
SourceSiteId = "site-1",
|
|
SourceInstanceId = "inst-1",
|
|
SourceScript = "ScriptActor:doStuff",
|
|
Target = "ERP.GetOrder",
|
|
Status = AuditStatus.Submitted,
|
|
ForwardState = AuditForwardState.Pending,
|
|
},
|
|
Operational: new SiteCallOperational(
|
|
TrackedOperationId: _id,
|
|
Channel: "ApiOutbound",
|
|
Target: "ERP.GetOrder",
|
|
SourceSite: "site-1",
|
|
Status: "Submitted",
|
|
RetryCount: 0,
|
|
LastError: null,
|
|
HttpStatus: null,
|
|
CreatedAtUtc: _now,
|
|
UpdatedAtUtc: _now,
|
|
TerminalAtUtc: null));
|
|
|
|
private CachedCallTelemetry AttemptedPacket(int retryCount = 1, string? lastError = "HTTP 500", int? httpStatus = 500) =>
|
|
new(
|
|
Audit: new AuditEvent
|
|
{
|
|
EventId = Guid.NewGuid(),
|
|
OccurredAtUtc = _now,
|
|
Channel = AuditChannel.ApiOutbound,
|
|
Kind = AuditKind.ApiCallCached,
|
|
CorrelationId = _id.Value,
|
|
SourceSiteId = "site-1",
|
|
Target = "ERP.GetOrder",
|
|
Status = AuditStatus.Attempted,
|
|
HttpStatus = httpStatus,
|
|
ErrorMessage = lastError,
|
|
ForwardState = AuditForwardState.Pending,
|
|
},
|
|
Operational: new SiteCallOperational(
|
|
TrackedOperationId: _id,
|
|
Channel: "ApiOutbound",
|
|
Target: "ERP.GetOrder",
|
|
SourceSite: "site-1",
|
|
Status: "Attempted",
|
|
RetryCount: retryCount,
|
|
LastError: lastError,
|
|
HttpStatus: httpStatus,
|
|
CreatedAtUtc: _now,
|
|
UpdatedAtUtc: _now,
|
|
TerminalAtUtc: null));
|
|
|
|
private CachedCallTelemetry ResolvePacket(string status = "Delivered") =>
|
|
new(
|
|
Audit: new AuditEvent
|
|
{
|
|
EventId = Guid.NewGuid(),
|
|
OccurredAtUtc = _now,
|
|
Channel = AuditChannel.ApiOutbound,
|
|
Kind = AuditKind.CachedResolve,
|
|
CorrelationId = _id.Value,
|
|
SourceSiteId = "site-1",
|
|
Target = "ERP.GetOrder",
|
|
Status = Enum.Parse<AuditStatus>(status),
|
|
ForwardState = AuditForwardState.Pending,
|
|
},
|
|
Operational: new SiteCallOperational(
|
|
TrackedOperationId: _id,
|
|
Channel: "ApiOutbound",
|
|
Target: "ERP.GetOrder",
|
|
SourceSite: "site-1",
|
|
Status: status,
|
|
RetryCount: 2,
|
|
LastError: null,
|
|
HttpStatus: null,
|
|
CreatedAtUtc: _now,
|
|
UpdatedAtUtc: _now,
|
|
TerminalAtUtc: _now));
|
|
|
|
[Fact]
|
|
public async Task ForwardAsync_Submit_WritesAuditEvent_AndRecordsEnqueue()
|
|
{
|
|
var sut = CreateSut();
|
|
var packet = SubmitPacket();
|
|
|
|
await sut.ForwardAsync(packet, CancellationToken.None);
|
|
|
|
// Audit row: one WriteAsync of the submit event.
|
|
await _writer.Received(1).WriteAsync(
|
|
Arg.Is<AuditEvent>(e =>
|
|
e.EventId == packet.Audit.EventId
|
|
&& e.Kind == AuditKind.CachedSubmit
|
|
&& e.Status == AuditStatus.Submitted),
|
|
Arg.Any<CancellationToken>());
|
|
|
|
// Tracking row: insert-if-not-exists with kind discriminator.
|
|
await _tracking.Received(1).RecordEnqueueAsync(
|
|
_id,
|
|
"ApiOutbound",
|
|
"ERP.GetOrder",
|
|
"inst-1",
|
|
"ScriptActor:doStuff",
|
|
Arg.Any<CancellationToken>());
|
|
await _tracking.DidNotReceiveWithAnyArgs().RecordAttemptAsync(
|
|
default, default!, default, default, default, default);
|
|
await _tracking.DidNotReceiveWithAnyArgs().RecordTerminalAsync(
|
|
default, default!, default, default, default);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task ForwardAsync_Attempted_WritesAuditEvent_AndRecordsAttempt()
|
|
{
|
|
var sut = CreateSut();
|
|
var packet = AttemptedPacket(retryCount: 2, lastError: "HTTP 503", httpStatus: 503);
|
|
|
|
await sut.ForwardAsync(packet, CancellationToken.None);
|
|
|
|
await _writer.Received(1).WriteAsync(
|
|
Arg.Is<AuditEvent>(e =>
|
|
e.EventId == packet.Audit.EventId
|
|
&& e.Kind == AuditKind.ApiCallCached
|
|
&& e.Status == AuditStatus.Attempted),
|
|
Arg.Any<CancellationToken>());
|
|
|
|
await _tracking.Received(1).RecordAttemptAsync(
|
|
_id, "Attempted", 2, "HTTP 503", 503, Arg.Any<CancellationToken>());
|
|
await _tracking.DidNotReceiveWithAnyArgs().RecordEnqueueAsync(
|
|
default, default!, default, default, default, default);
|
|
await _tracking.DidNotReceiveWithAnyArgs().RecordTerminalAsync(
|
|
default, default!, default, default, default);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task ForwardAsync_Resolve_WritesAuditEvent_AndRecordsTerminal()
|
|
{
|
|
var sut = CreateSut();
|
|
var packet = ResolvePacket("Delivered");
|
|
|
|
await sut.ForwardAsync(packet, CancellationToken.None);
|
|
|
|
await _writer.Received(1).WriteAsync(
|
|
Arg.Is<AuditEvent>(e =>
|
|
e.EventId == packet.Audit.EventId
|
|
&& e.Kind == AuditKind.CachedResolve
|
|
&& e.Status == AuditStatus.Delivered),
|
|
Arg.Any<CancellationToken>());
|
|
|
|
await _tracking.Received(1).RecordTerminalAsync(
|
|
_id, "Delivered", null, null, Arg.Any<CancellationToken>());
|
|
await _tracking.DidNotReceiveWithAnyArgs().RecordEnqueueAsync(
|
|
default, default!, default, default, default, default);
|
|
await _tracking.DidNotReceiveWithAnyArgs().RecordAttemptAsync(
|
|
default, default!, default, default, default, default);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task ForwardAsync_WriterThrows_Logs_DoesNotPropagate()
|
|
{
|
|
_writer.WriteAsync(Arg.Any<AuditEvent>(), Arg.Any<CancellationToken>())
|
|
.Throws(new InvalidOperationException("primary down"));
|
|
|
|
var sut = CreateSut();
|
|
|
|
// Must not throw.
|
|
await sut.ForwardAsync(SubmitPacket(), CancellationToken.None);
|
|
|
|
// Tracking still attempted — emission of the two halves is independent
|
|
// so a writer outage cannot starve the operational row (and vice-versa).
|
|
await _tracking.Received(1).RecordEnqueueAsync(
|
|
Arg.Any<TrackedOperationId>(),
|
|
Arg.Any<string>(),
|
|
Arg.Any<string?>(),
|
|
Arg.Any<string?>(),
|
|
Arg.Any<string?>(),
|
|
Arg.Any<CancellationToken>());
|
|
}
|
|
|
|
[Fact]
|
|
public async Task ForwardAsync_TrackingStoreThrows_Logs_DoesNotPropagate()
|
|
{
|
|
_tracking.RecordEnqueueAsync(
|
|
Arg.Any<TrackedOperationId>(),
|
|
Arg.Any<string>(),
|
|
Arg.Any<string?>(),
|
|
Arg.Any<string?>(),
|
|
Arg.Any<string?>(),
|
|
Arg.Any<CancellationToken>())
|
|
.Throws(new InvalidOperationException("sqlite locked"));
|
|
|
|
var sut = CreateSut();
|
|
|
|
await sut.ForwardAsync(SubmitPacket(), CancellationToken.None);
|
|
|
|
// Writer still attempted — emission halves are independent.
|
|
await _writer.Received(1).WriteAsync(
|
|
Arg.Any<AuditEvent>(), Arg.Any<CancellationToken>());
|
|
}
|
|
|
|
[Fact]
|
|
public async Task ForwardAsync_NullPacket_Throws()
|
|
{
|
|
var sut = CreateSut();
|
|
|
|
await Assert.ThrowsAsync<ArgumentNullException>(
|
|
() => sut.ForwardAsync(null!, CancellationToken.None));
|
|
}
|
|
}
|