feat(auditlog): CachedCallTelemetryForwarder for site-side dual emission (#23 M3)
Sister to SiteAuditTelemetryActor: takes a combined CachedCallTelemetry
packet and fans it out to the two site-local stores.
* AuditEvent half writes through IAuditWriter (the M2 FallbackAuditWriter
+ SqliteAuditWriter chain — same site SQLite hot-path as sync calls).
* SiteCallOperational half maps Audit.Kind to the matching
IOperationTrackingStore method:
- CachedSubmit -> RecordEnqueueAsync (insert-if-not-exists)
- ApiCallCached / DbWriteCached -> RecordAttemptAsync (monotonic)
- CachedResolve -> RecordTerminalAsync (first-write-wins)
Best-effort contract (alog.md §7): independent try/catch per half so a
thrown writer cannot starve the tracking row (and vice-versa); both
failures are logged at warning level and swallowed — the calling script
never sees them.
Wire push deferred to M6 — the NoOp ISiteStreamAuditClient binding stays
in effect; the forwarder writes only to the local stores in M3. The
existing SiteAuditTelemetryActor drain loop will sweep the audit rows
once a real gRPC client lands.
Bundle E task E2.
This commit is contained in:
@@ -0,0 +1,161 @@
|
||||
using Microsoft.Extensions.Logging;
|
||||
using ScadaLink.Commons.Entities.Audit;
|
||||
using ScadaLink.Commons.Interfaces;
|
||||
using ScadaLink.Commons.Interfaces.Services;
|
||||
using ScadaLink.Commons.Messages.Integration;
|
||||
using ScadaLink.Commons.Types;
|
||||
using ScadaLink.Commons.Types.Enums;
|
||||
|
||||
namespace ScadaLink.AuditLog.Site.Telemetry;
|
||||
|
||||
/// <summary>
|
||||
/// Site-side dual emitter for cached-call lifecycle telemetry (Audit Log #23 /
|
||||
/// M3). Sister to <see cref="SiteAuditTelemetryActor"/>: where the M2 actor
|
||||
/// drains audit-only events, this forwarder takes a combined
|
||||
/// <see cref="CachedCallTelemetry"/> packet and fans it out to the two
|
||||
/// site-local stores in a single call:
|
||||
/// <list type="bullet">
|
||||
/// <item><description>The <see cref="AuditEvent"/> row is written via
|
||||
/// <see cref="IAuditWriter"/> (the site <c>FallbackAuditWriter</c> +
|
||||
/// <c>SqliteAuditWriter</c> chain established in M2).</description></item>
|
||||
/// <item><description>The operational <see cref="SiteCallOperational"/> half
|
||||
/// updates the site-local <c>OperationTracking</c> SQLite store via
|
||||
/// <see cref="IOperationTrackingStore"/>, with the per-lifecycle method
|
||||
/// (<c>Enqueue</c> / <c>Attempt</c> / <c>Terminal</c>) selected from the
|
||||
/// audit row's <see cref="AuditKind"/>.</description></item>
|
||||
/// </list>
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// <b>Best-effort contract (alog.md §7):</b> a thrown writer OR a thrown
|
||||
/// tracking store must never propagate to the calling script. Both emission
|
||||
/// halves are wrapped in independent try/catch blocks so a SQLite outage on
|
||||
/// one side cannot starve the other — the failure is logged and the call
|
||||
/// returns normally.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// <b>Wire push deferred to M6.</b> M3 keeps this forwarder synchronous
|
||||
/// against the local stores: there is no site→central gRPC channel yet, so
|
||||
/// the <see cref="ISiteStreamAuditClient.IngestCachedTelemetryAsync"/> RPC
|
||||
/// is registered on the interface (Bundle E1) but the production binding
|
||||
/// remains <c>NoOpSiteStreamAuditClient</c>. Once M6 wires a real client the
|
||||
/// drain pattern from <c>SiteAuditTelemetryActor</c> can be reused — the
|
||||
/// <c>AuditEvent</c> rows already live in SQLite tagged
|
||||
/// <see cref="AuditForwardState.Pending"/>, so a single drain loop sweeps
|
||||
/// both M2 and M3 emissions.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class CachedCallTelemetryForwarder
|
||||
{
|
||||
private readonly IAuditWriter _auditWriter;
|
||||
private readonly IOperationTrackingStore _trackingStore;
|
||||
private readonly ILogger<CachedCallTelemetryForwarder> _logger;
|
||||
|
||||
public CachedCallTelemetryForwarder(
|
||||
IAuditWriter auditWriter,
|
||||
IOperationTrackingStore trackingStore,
|
||||
ILogger<CachedCallTelemetryForwarder> logger)
|
||||
{
|
||||
_auditWriter = auditWriter ?? throw new ArgumentNullException(nameof(auditWriter));
|
||||
_trackingStore = trackingStore ?? throw new ArgumentNullException(nameof(trackingStore));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Fan out one combined-telemetry packet to the audit writer and the
|
||||
/// tracking store. Returns once both halves have been attempted (success
|
||||
/// OR logged failure). NEVER throws — exceptions are caught per-half and
|
||||
/// logged at warning level so the calling script's outbound action is not
|
||||
/// disturbed.
|
||||
/// </summary>
|
||||
public async Task ForwardAsync(CachedCallTelemetry telemetry, CancellationToken ct = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(telemetry);
|
||||
|
||||
// Independent try/catch — a thrown audit writer must not prevent the
|
||||
// tracking-store update from running (and vice-versa). Both halves
|
||||
// are best-effort.
|
||||
await TryEmitAuditAsync(telemetry, ct).ConfigureAwait(false);
|
||||
await TryEmitTrackingAsync(telemetry, ct).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task TryEmitAuditAsync(CachedCallTelemetry telemetry, CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
await _auditWriter.WriteAsync(telemetry.Audit, ct).ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// alog.md §7 best-effort contract — log and swallow. The audit
|
||||
// pipeline's own retry/recovery (RingBufferFallback in the
|
||||
// FallbackAuditWriter) handles transient writer failures upstream;
|
||||
// a throw bubbling up here means the writer's own swallow contract
|
||||
// failed, which is itself best-effort-handled.
|
||||
_logger.LogWarning(ex,
|
||||
"CachedCallTelemetryForwarder: audit emission threw for EventId {EventId} (Kind {Kind}, Status {Status})",
|
||||
telemetry.Audit.EventId, telemetry.Audit.Kind, telemetry.Audit.Status);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task TryEmitTrackingAsync(CachedCallTelemetry telemetry, CancellationToken ct)
|
||||
{
|
||||
try
|
||||
{
|
||||
switch (telemetry.Audit.Kind)
|
||||
{
|
||||
case AuditKind.CachedSubmit:
|
||||
// Enqueue — insert-if-not-exists with the operational
|
||||
// channel as the kind discriminator. RetryCount is fixed
|
||||
// at 0 by the tracking store's INSERT contract.
|
||||
await _trackingStore.RecordEnqueueAsync(
|
||||
telemetry.Operational.TrackedOperationId,
|
||||
telemetry.Operational.Channel,
|
||||
telemetry.Operational.Target,
|
||||
telemetry.Audit.SourceInstanceId,
|
||||
telemetry.Audit.SourceScript,
|
||||
ct).ConfigureAwait(false);
|
||||
break;
|
||||
|
||||
case AuditKind.ApiCallCached:
|
||||
case AuditKind.DbWriteCached:
|
||||
// Attempt — advance retry counter + last-error/HTTP-status.
|
||||
// Terminal rows are guarded by the store's WHERE clause.
|
||||
await _trackingStore.RecordAttemptAsync(
|
||||
telemetry.Operational.TrackedOperationId,
|
||||
telemetry.Operational.Status,
|
||||
telemetry.Operational.RetryCount,
|
||||
telemetry.Operational.LastError,
|
||||
telemetry.Operational.HttpStatus,
|
||||
ct).ConfigureAwait(false);
|
||||
break;
|
||||
|
||||
case AuditKind.CachedResolve:
|
||||
// Terminal — first-write-wins on the resolve flip.
|
||||
await _trackingStore.RecordTerminalAsync(
|
||||
telemetry.Operational.TrackedOperationId,
|
||||
telemetry.Operational.Status,
|
||||
telemetry.Operational.LastError,
|
||||
telemetry.Operational.HttpStatus,
|
||||
ct).ConfigureAwait(false);
|
||||
break;
|
||||
|
||||
default:
|
||||
// Defensive — only the four cached-lifecycle kinds are
|
||||
// expected on this path. Anything else is logged so a
|
||||
// mis-routed packet is visible but never crashes the
|
||||
// forwarder.
|
||||
_logger.LogWarning(
|
||||
"CachedCallTelemetryForwarder: unexpected audit kind {Kind} on tracking emission for EventId {EventId}",
|
||||
telemetry.Audit.Kind, telemetry.Audit.EventId);
|
||||
break;
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex,
|
||||
"CachedCallTelemetryForwarder: tracking-store emission threw for TrackedOperationId {Id} (Status {Status})",
|
||||
telemetry.Operational.TrackedOperationId, telemetry.Operational.Status);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,245 @@
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using NSubstitute;
|
||||
using NSubstitute.ExceptionExtensions;
|
||||
using ScadaLink.AuditLog.Site.Telemetry;
|
||||
using ScadaLink.Commons.Entities.Audit;
|
||||
using ScadaLink.Commons.Interfaces;
|
||||
using ScadaLink.Commons.Interfaces.Services;
|
||||
using ScadaLink.Commons.Messages.Integration;
|
||||
using ScadaLink.Commons.Types;
|
||||
using ScadaLink.Commons.Types.Enums;
|
||||
|
||||
namespace ScadaLink.AuditLog.Tests.Site.Telemetry;
|
||||
|
||||
/// <summary>
|
||||
/// Bundle E E2 tests for <see cref="CachedCallTelemetryForwarder"/>. The
|
||||
/// forwarder is the site-side dual emitter: every cached-call lifecycle event
|
||||
/// writes one <see cref="AuditEvent"/> to <see cref="IAuditWriter"/> and one
|
||||
/// operational tracking-row mutation to <see cref="IOperationTrackingStore"/>.
|
||||
/// Audit-emission contract: best-effort — a thrown writer or tracking store
|
||||
/// must be logged and swallowed; the forwarder must never propagate the
|
||||
/// exception to the calling script.
|
||||
/// </summary>
|
||||
public class CachedCallTelemetryForwarderTests
|
||||
{
|
||||
private readonly IAuditWriter _writer = Substitute.For<IAuditWriter>();
|
||||
private readonly IOperationTrackingStore _tracking = Substitute.For<IOperationTrackingStore>();
|
||||
private readonly TrackedOperationId _id = TrackedOperationId.New();
|
||||
private readonly DateTime _now = new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc);
|
||||
|
||||
private CachedCallTelemetryForwarder CreateSut() => new(
|
||||
_writer, _tracking, NullLogger<CachedCallTelemetryForwarder>.Instance);
|
||||
|
||||
private CachedCallTelemetry SubmitPacket() =>
|
||||
new(
|
||||
Audit: new AuditEvent
|
||||
{
|
||||
EventId = Guid.NewGuid(),
|
||||
OccurredAtUtc = _now,
|
||||
Channel = AuditChannel.ApiOutbound,
|
||||
Kind = AuditKind.CachedSubmit,
|
||||
CorrelationId = _id.Value,
|
||||
SourceSiteId = "site-1",
|
||||
SourceInstanceId = "inst-1",
|
||||
SourceScript = "ScriptActor:doStuff",
|
||||
Target = "ERP.GetOrder",
|
||||
Status = AuditStatus.Submitted,
|
||||
ForwardState = AuditForwardState.Pending,
|
||||
},
|
||||
Operational: new SiteCallOperational(
|
||||
TrackedOperationId: _id,
|
||||
Channel: "ApiOutbound",
|
||||
Target: "ERP.GetOrder",
|
||||
SourceSite: "site-1",
|
||||
Status: "Submitted",
|
||||
RetryCount: 0,
|
||||
LastError: null,
|
||||
HttpStatus: null,
|
||||
CreatedAtUtc: _now,
|
||||
UpdatedAtUtc: _now,
|
||||
TerminalAtUtc: null));
|
||||
|
||||
private CachedCallTelemetry AttemptedPacket(int retryCount = 1, string? lastError = "HTTP 500", int? httpStatus = 500) =>
|
||||
new(
|
||||
Audit: new AuditEvent
|
||||
{
|
||||
EventId = Guid.NewGuid(),
|
||||
OccurredAtUtc = _now,
|
||||
Channel = AuditChannel.ApiOutbound,
|
||||
Kind = AuditKind.ApiCallCached,
|
||||
CorrelationId = _id.Value,
|
||||
SourceSiteId = "site-1",
|
||||
Target = "ERP.GetOrder",
|
||||
Status = AuditStatus.Attempted,
|
||||
HttpStatus = httpStatus,
|
||||
ErrorMessage = lastError,
|
||||
ForwardState = AuditForwardState.Pending,
|
||||
},
|
||||
Operational: new SiteCallOperational(
|
||||
TrackedOperationId: _id,
|
||||
Channel: "ApiOutbound",
|
||||
Target: "ERP.GetOrder",
|
||||
SourceSite: "site-1",
|
||||
Status: "Attempted",
|
||||
RetryCount: retryCount,
|
||||
LastError: lastError,
|
||||
HttpStatus: httpStatus,
|
||||
CreatedAtUtc: _now,
|
||||
UpdatedAtUtc: _now,
|
||||
TerminalAtUtc: null));
|
||||
|
||||
private CachedCallTelemetry ResolvePacket(string status = "Delivered") =>
|
||||
new(
|
||||
Audit: new AuditEvent
|
||||
{
|
||||
EventId = Guid.NewGuid(),
|
||||
OccurredAtUtc = _now,
|
||||
Channel = AuditChannel.ApiOutbound,
|
||||
Kind = AuditKind.CachedResolve,
|
||||
CorrelationId = _id.Value,
|
||||
SourceSiteId = "site-1",
|
||||
Target = "ERP.GetOrder",
|
||||
Status = Enum.Parse<AuditStatus>(status),
|
||||
ForwardState = AuditForwardState.Pending,
|
||||
},
|
||||
Operational: new SiteCallOperational(
|
||||
TrackedOperationId: _id,
|
||||
Channel: "ApiOutbound",
|
||||
Target: "ERP.GetOrder",
|
||||
SourceSite: "site-1",
|
||||
Status: status,
|
||||
RetryCount: 2,
|
||||
LastError: null,
|
||||
HttpStatus: null,
|
||||
CreatedAtUtc: _now,
|
||||
UpdatedAtUtc: _now,
|
||||
TerminalAtUtc: _now));
|
||||
|
||||
[Fact]
|
||||
public async Task ForwardAsync_Submit_WritesAuditEvent_AndRecordsEnqueue()
|
||||
{
|
||||
var sut = CreateSut();
|
||||
var packet = SubmitPacket();
|
||||
|
||||
await sut.ForwardAsync(packet, CancellationToken.None);
|
||||
|
||||
// Audit row: one WriteAsync of the submit event.
|
||||
await _writer.Received(1).WriteAsync(
|
||||
Arg.Is<AuditEvent>(e =>
|
||||
e.EventId == packet.Audit.EventId
|
||||
&& e.Kind == AuditKind.CachedSubmit
|
||||
&& e.Status == AuditStatus.Submitted),
|
||||
Arg.Any<CancellationToken>());
|
||||
|
||||
// Tracking row: insert-if-not-exists with kind discriminator.
|
||||
await _tracking.Received(1).RecordEnqueueAsync(
|
||||
_id,
|
||||
"ApiOutbound",
|
||||
"ERP.GetOrder",
|
||||
"inst-1",
|
||||
"ScriptActor:doStuff",
|
||||
Arg.Any<CancellationToken>());
|
||||
await _tracking.DidNotReceiveWithAnyArgs().RecordAttemptAsync(
|
||||
default, default!, default, default, default, default);
|
||||
await _tracking.DidNotReceiveWithAnyArgs().RecordTerminalAsync(
|
||||
default, default!, default, default, default);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ForwardAsync_Attempted_WritesAuditEvent_AndRecordsAttempt()
|
||||
{
|
||||
var sut = CreateSut();
|
||||
var packet = AttemptedPacket(retryCount: 2, lastError: "HTTP 503", httpStatus: 503);
|
||||
|
||||
await sut.ForwardAsync(packet, CancellationToken.None);
|
||||
|
||||
await _writer.Received(1).WriteAsync(
|
||||
Arg.Is<AuditEvent>(e =>
|
||||
e.EventId == packet.Audit.EventId
|
||||
&& e.Kind == AuditKind.ApiCallCached
|
||||
&& e.Status == AuditStatus.Attempted),
|
||||
Arg.Any<CancellationToken>());
|
||||
|
||||
await _tracking.Received(1).RecordAttemptAsync(
|
||||
_id, "Attempted", 2, "HTTP 503", 503, Arg.Any<CancellationToken>());
|
||||
await _tracking.DidNotReceiveWithAnyArgs().RecordEnqueueAsync(
|
||||
default, default!, default, default, default, default);
|
||||
await _tracking.DidNotReceiveWithAnyArgs().RecordTerminalAsync(
|
||||
default, default!, default, default, default);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ForwardAsync_Resolve_WritesAuditEvent_AndRecordsTerminal()
|
||||
{
|
||||
var sut = CreateSut();
|
||||
var packet = ResolvePacket("Delivered");
|
||||
|
||||
await sut.ForwardAsync(packet, CancellationToken.None);
|
||||
|
||||
await _writer.Received(1).WriteAsync(
|
||||
Arg.Is<AuditEvent>(e =>
|
||||
e.EventId == packet.Audit.EventId
|
||||
&& e.Kind == AuditKind.CachedResolve
|
||||
&& e.Status == AuditStatus.Delivered),
|
||||
Arg.Any<CancellationToken>());
|
||||
|
||||
await _tracking.Received(1).RecordTerminalAsync(
|
||||
_id, "Delivered", null, null, Arg.Any<CancellationToken>());
|
||||
await _tracking.DidNotReceiveWithAnyArgs().RecordEnqueueAsync(
|
||||
default, default!, default, default, default, default);
|
||||
await _tracking.DidNotReceiveWithAnyArgs().RecordAttemptAsync(
|
||||
default, default!, default, default, default, default);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ForwardAsync_WriterThrows_Logs_DoesNotPropagate()
|
||||
{
|
||||
_writer.WriteAsync(Arg.Any<AuditEvent>(), Arg.Any<CancellationToken>())
|
||||
.Throws(new InvalidOperationException("primary down"));
|
||||
|
||||
var sut = CreateSut();
|
||||
|
||||
// Must not throw.
|
||||
await sut.ForwardAsync(SubmitPacket(), CancellationToken.None);
|
||||
|
||||
// Tracking still attempted — emission of the two halves is independent
|
||||
// so a writer outage cannot starve the operational row (and vice-versa).
|
||||
await _tracking.Received(1).RecordEnqueueAsync(
|
||||
Arg.Any<TrackedOperationId>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<string?>(),
|
||||
Arg.Any<string?>(),
|
||||
Arg.Any<string?>(),
|
||||
Arg.Any<CancellationToken>());
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ForwardAsync_TrackingStoreThrows_Logs_DoesNotPropagate()
|
||||
{
|
||||
_tracking.RecordEnqueueAsync(
|
||||
Arg.Any<TrackedOperationId>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<string?>(),
|
||||
Arg.Any<string?>(),
|
||||
Arg.Any<string?>(),
|
||||
Arg.Any<CancellationToken>())
|
||||
.Throws(new InvalidOperationException("sqlite locked"));
|
||||
|
||||
var sut = CreateSut();
|
||||
|
||||
await sut.ForwardAsync(SubmitPacket(), CancellationToken.None);
|
||||
|
||||
// Writer still attempted — emission halves are independent.
|
||||
await _writer.Received(1).WriteAsync(
|
||||
Arg.Any<AuditEvent>(), Arg.Any<CancellationToken>());
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ForwardAsync_NullPacket_Throws()
|
||||
{
|
||||
var sut = CreateSut();
|
||||
|
||||
await Assert.ThrowsAsync<ArgumentNullException>(
|
||||
() => sut.ForwardAsync(null!, CancellationToken.None));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user