feat(siteruntime): Database.CachedWrite emits combined telemetry + S&F audit bridge (#23 M3)
Wire the M3 cached-call audit pipeline end-to-end for the database
channel and close the loop between the S&F lifecycle observer and the
site-side dual emitter.
* DatabaseCachedWriteEmissionTests covers Database.CachedWrite (set up
in Bundle E3): mints a TrackedOperationId, emits one CachedSubmit
packet on DbOutbound, threads the id into IDatabaseGateway, and is
best-effort on a thrown forwarder. Mirrors ExternalSystem.CachedCall
coverage from E3.
* CachedCallLifecycleBridge (new) implements ICachedCallLifecycleObserver
and lives alongside CachedCallTelemetryForwarder. The bridge ingests
per-attempt notifications from the S&F retry loop and fans them out
to the forwarder:
- TransientFailure -> 1 Attempted row
- Delivered -> Attempted + CachedResolve(Delivered)
- PermanentFailure -> Attempted + CachedResolve(Parked)
- ParkedMaxRetries -> Attempted + CachedResolve(Parked)
Channel string -> AuditKind mapping (ApiOutbound->ApiCallCached,
DbOutbound->DbWriteCached). Best-effort top-level catch swallows any
unexpected throw so the S&F retry bookkeeping is never disturbed.
* Bridge tests (7) cover all four outcomes, channel mapping, provenance
propagation, and the no-throw-on-forwarder-failure contract.
Bundle F (Host registration) will instantiate the bridge and inject it
into StoreAndForwardService.cachedCallObserver, closing the wiring path
end-to-end.
Bundle E task E6.
This commit is contained in:
@@ -0,0 +1,170 @@
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Moq;
|
||||
using ScadaLink.Commons.Entities.Audit;
|
||||
using ScadaLink.Commons.Interfaces.Services;
|
||||
using ScadaLink.Commons.Messages.Integration;
|
||||
using ScadaLink.Commons.Types;
|
||||
using ScadaLink.Commons.Types.Enums;
|
||||
using ScadaLink.SiteRuntime.Scripts;
|
||||
|
||||
namespace ScadaLink.SiteRuntime.Tests.Scripts;
|
||||
|
||||
/// <summary>
|
||||
/// Audit Log #23 — M3 Bundle E (Task E6): every script-initiated
|
||||
/// <c>Database.CachedWrite</c> emits exactly one <c>CachedSubmit</c>
|
||||
/// combined-telemetry packet at enqueue time on the <c>DbOutbound</c>
|
||||
/// channel, returns a fresh <see cref="TrackedOperationId"/>, and threads
|
||||
/// the id into the database gateway so the store-and-forward retry loop can
|
||||
/// emit per-attempt + terminal telemetry under the same id.
|
||||
/// </summary>
|
||||
public class DatabaseCachedWriteEmissionTests
|
||||
{
|
||||
private sealed class CapturingForwarder : ICachedCallTelemetryForwarder
|
||||
{
|
||||
public List<CachedCallTelemetry> Telemetry { get; } = new();
|
||||
public Exception? ThrowOnForward { get; set; }
|
||||
|
||||
public Task ForwardAsync(CachedCallTelemetry telemetry, CancellationToken ct = default)
|
||||
{
|
||||
if (ThrowOnForward != null)
|
||||
{
|
||||
return Task.FromException(ThrowOnForward);
|
||||
}
|
||||
Telemetry.Add(telemetry);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
|
||||
private const string SiteId = "site-77";
|
||||
private const string InstanceName = "Plant.Pump42";
|
||||
private const string SourceScript = "ScriptActor:WriteAudit";
|
||||
|
||||
private static ScriptRuntimeContext.DatabaseHelper CreateHelper(
|
||||
IDatabaseGateway gateway,
|
||||
ICachedCallTelemetryForwarder? forwarder)
|
||||
{
|
||||
return new ScriptRuntimeContext.DatabaseHelper(
|
||||
gateway,
|
||||
InstanceName,
|
||||
NullLogger.Instance,
|
||||
siteId: SiteId,
|
||||
sourceScript: SourceScript,
|
||||
cachedForwarder: forwarder);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CachedWrite_EmitsSubmitTelemetry_OnEnqueue_KindCachedSubmit_ChannelDbOutbound()
|
||||
{
|
||||
var gateway = new Mock<IDatabaseGateway>();
|
||||
gateway
|
||||
.Setup(g => g.CachedWriteAsync(
|
||||
"myDb", "INSERT INTO t VALUES (1)",
|
||||
It.IsAny<IReadOnlyDictionary<string, object?>?>(),
|
||||
InstanceName,
|
||||
It.IsAny<CancellationToken>(),
|
||||
It.IsAny<TrackedOperationId?>()))
|
||||
.Returns(Task.CompletedTask);
|
||||
var forwarder = new CapturingForwarder();
|
||||
|
||||
var helper = CreateHelper(gateway.Object, forwarder);
|
||||
var trackedId = await helper.CachedWrite("myDb", "INSERT INTO t VALUES (1)");
|
||||
|
||||
Assert.NotEqual(default, trackedId);
|
||||
var packet = Assert.Single(forwarder.Telemetry);
|
||||
|
||||
Assert.Equal(AuditChannel.DbOutbound, packet.Audit.Channel);
|
||||
Assert.Equal(AuditKind.CachedSubmit, packet.Audit.Kind);
|
||||
Assert.Equal(AuditStatus.Submitted, packet.Audit.Status);
|
||||
Assert.Equal("myDb", packet.Audit.Target);
|
||||
Assert.Equal(trackedId.Value, packet.Audit.CorrelationId);
|
||||
|
||||
Assert.Equal(trackedId, packet.Operational.TrackedOperationId);
|
||||
Assert.Equal("DbOutbound", packet.Operational.Channel);
|
||||
Assert.Equal("myDb", packet.Operational.Target);
|
||||
Assert.Equal(SiteId, packet.Operational.SourceSite);
|
||||
Assert.Equal("Submitted", packet.Operational.Status);
|
||||
Assert.Equal(0, packet.Operational.RetryCount);
|
||||
Assert.Null(packet.Operational.TerminalAtUtc);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CachedWrite_ProvenancePopulated()
|
||||
{
|
||||
var gateway = new Mock<IDatabaseGateway>();
|
||||
gateway
|
||||
.Setup(g => g.CachedWriteAsync(
|
||||
It.IsAny<string>(), It.IsAny<string>(),
|
||||
It.IsAny<IReadOnlyDictionary<string, object?>?>(),
|
||||
It.IsAny<string?>(),
|
||||
It.IsAny<CancellationToken>(),
|
||||
It.IsAny<TrackedOperationId?>()))
|
||||
.Returns(Task.CompletedTask);
|
||||
var forwarder = new CapturingForwarder();
|
||||
|
||||
var helper = CreateHelper(gateway.Object, forwarder);
|
||||
await helper.CachedWrite("myDb", "INSERT INTO t VALUES (1)");
|
||||
|
||||
var packet = Assert.Single(forwarder.Telemetry);
|
||||
Assert.Equal(SiteId, packet.Audit.SourceSiteId);
|
||||
Assert.Equal(InstanceName, packet.Audit.SourceInstanceId);
|
||||
Assert.Equal(SourceScript, packet.Audit.SourceScript);
|
||||
Assert.Equal(SiteId, packet.Operational.SourceSite);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CachedWrite_ReturnsTrackedOperationId_ThreadsIdToGateway()
|
||||
{
|
||||
var gateway = new Mock<IDatabaseGateway>();
|
||||
gateway
|
||||
.Setup(g => g.CachedWriteAsync(
|
||||
It.IsAny<string>(), It.IsAny<string>(),
|
||||
It.IsAny<IReadOnlyDictionary<string, object?>?>(),
|
||||
It.IsAny<string?>(),
|
||||
It.IsAny<CancellationToken>(),
|
||||
It.IsAny<TrackedOperationId?>()))
|
||||
.Returns(Task.CompletedTask);
|
||||
var forwarder = new CapturingForwarder();
|
||||
|
||||
var helper = CreateHelper(gateway.Object, forwarder);
|
||||
var trackedId = await helper.CachedWrite("myDb", "INSERT INTO t VALUES (1)");
|
||||
|
||||
Assert.NotEqual(default, trackedId);
|
||||
gateway.Verify(g => g.CachedWriteAsync(
|
||||
"myDb", "INSERT INTO t VALUES (1)",
|
||||
It.IsAny<IReadOnlyDictionary<string, object?>?>(),
|
||||
InstanceName,
|
||||
It.IsAny<CancellationToken>(),
|
||||
trackedId),
|
||||
Times.Once);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CachedWrite_ForwarderThrows_StillReturnsTrackedOperationId()
|
||||
{
|
||||
var gateway = new Mock<IDatabaseGateway>();
|
||||
gateway
|
||||
.Setup(g => g.CachedWriteAsync(
|
||||
It.IsAny<string>(), It.IsAny<string>(),
|
||||
It.IsAny<IReadOnlyDictionary<string, object?>?>(),
|
||||
It.IsAny<string?>(),
|
||||
It.IsAny<CancellationToken>(),
|
||||
It.IsAny<TrackedOperationId?>()))
|
||||
.Returns(Task.CompletedTask);
|
||||
var forwarder = new CapturingForwarder
|
||||
{
|
||||
ThrowOnForward = new InvalidOperationException("simulated forwarder outage"),
|
||||
};
|
||||
|
||||
var helper = CreateHelper(gateway.Object, forwarder);
|
||||
var trackedId = await helper.CachedWrite("myDb", "INSERT INTO t VALUES (1)");
|
||||
|
||||
Assert.NotEqual(default, trackedId);
|
||||
gateway.Verify(g => g.CachedWriteAsync(
|
||||
"myDb", "INSERT INTO t VALUES (1)",
|
||||
It.IsAny<IReadOnlyDictionary<string, object?>?>(),
|
||||
InstanceName,
|
||||
It.IsAny<CancellationToken>(),
|
||||
trackedId),
|
||||
Times.Once);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user