test(auditlog): cached call combined telemetry end-to-end (#23 M3)

This commit is contained in:
Joseph Doherty
2026-05-20 15:25:10 -04:00
parent a3b0fb7f08
commit f4a7be4929
4 changed files with 577 additions and 0 deletions

View File

@@ -0,0 +1,125 @@
using ScadaLink.AuditLog.Site.Telemetry;
using ScadaLink.AuditLog.Telemetry;
using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Commons.Messages.Integration;
using ScadaLink.Commons.Types;
using ScadaLink.Communication.Grpc;
using Google.Protobuf.WellKnownTypes;
using Timestamp = Google.Protobuf.WellKnownTypes.Timestamp;
namespace ScadaLink.AuditLog.Tests.Integration.Infrastructure;
/// <summary>
/// Test-side combined-telemetry dispatcher: wraps a production
/// <see cref="ICachedCallTelemetryForwarder"/> so the local audit + tracking
/// stores still get written, then projects the same packet onto the wire as a
/// <see cref="CachedTelemetryBatch"/> and pushes it through the supplied
/// <see cref="ISiteStreamAuditClient"/>. The bridge can be composed into the
/// existing <see cref="CachedCallLifecycleBridge"/> chain as the
/// <see cref="ICachedCallTelemetryForwarder"/> implementation so a single
/// observer callback fans out to both halves.
/// </summary>
/// <remarks>
/// <para>
/// Production wiring keeps the wire push deferred to M6 — the site SQLite hot
/// path is the source of truth and a future M6 drain will push the rows
/// through the gRPC client. For end-to-end testing today we need a way to
/// exercise the central dual-write transaction immediately, so this
/// dispatcher synthesises the wire packet inline and round-trips it through
/// the stub client. The shape mirrors what the M6 drain will eventually emit.
/// </para>
/// <para>
/// <b>Best-effort:</b> both the inner forwarder call and the wire push are
/// wrapped in independent try/catch blocks. A thrown wire client doesn't
/// abort the local writes (the audit row is already in SQLite); a thrown
/// local forwarder doesn't abort the wire push (central still gets the
/// dual-write attempt).
/// </para>
/// </remarks>
public sealed class CombinedTelemetryDispatcher : ICachedCallTelemetryForwarder
{
private readonly ICachedCallTelemetryForwarder _inner;
private readonly ISiteStreamAuditClient _wireClient;
public CombinedTelemetryDispatcher(
ICachedCallTelemetryForwarder inner,
ISiteStreamAuditClient wireClient)
{
_inner = inner ?? throw new ArgumentNullException(nameof(inner));
_wireClient = wireClient ?? throw new ArgumentNullException(nameof(wireClient));
}
/// <inheritdoc/>
public async Task ForwardAsync(CachedCallTelemetry telemetry, CancellationToken ct = default)
{
ArgumentNullException.ThrowIfNull(telemetry);
// Inner forwarder writes the audit row to SQLite + updates the
// tracking store. Best-effort — exceptions are already swallowed
// inside the production forwarder, but wrap defensively here too in
// case a test substitutes a stricter inner.
try
{
await _inner.ForwardAsync(telemetry, ct).ConfigureAwait(false);
}
catch
{
// Swallow — alog.md §7 best-effort contract.
}
// Project the same packet onto the wire and push it through the stub
// client. This is the bit a future M6 drain will subsume — until
// then the test wraps the two halves into one observer-driven step.
try
{
var batch = new CachedTelemetryBatch();
batch.Packets.Add(BuildPacket(telemetry));
await _wireClient.IngestCachedTelemetryAsync(batch, ct).ConfigureAwait(false);
}
catch
{
// Swallow — the audit row is still in SQLite for a future drain;
// the central row will materialise the next time the wire path
// is exercised (or via the M6 reconciliation pull).
}
}
private static CachedTelemetryPacket BuildPacket(CachedCallTelemetry telemetry)
{
return new CachedTelemetryPacket
{
AuditEvent = AuditEventMapper.ToDto(telemetry.Audit),
Operational = ToOperationalDto(telemetry.Operational),
};
}
private static SiteCallOperationalDto ToOperationalDto(SiteCallOperational op)
{
var dto = new SiteCallOperationalDto
{
TrackedOperationId = op.TrackedOperationId.Value.ToString("D"),
Channel = op.Channel,
Target = op.Target,
SourceSite = op.SourceSite,
Status = op.Status,
RetryCount = op.RetryCount,
LastError = op.LastError ?? string.Empty,
CreatedAtUtc = Timestamp.FromDateTime(EnsureUtc(op.CreatedAtUtc)),
UpdatedAtUtc = Timestamp.FromDateTime(EnsureUtc(op.UpdatedAtUtc)),
};
if (op.HttpStatus.HasValue)
{
dto.HttpStatus = op.HttpStatus.Value;
}
if (op.TerminalAtUtc.HasValue)
{
dto.TerminalAtUtc = Timestamp.FromDateTime(EnsureUtc(op.TerminalAtUtc.Value));
}
return dto;
}
private static DateTime EnsureUtc(DateTime value) =>
value.Kind == DateTimeKind.Utc
? value
: DateTime.SpecifyKind(value.ToUniversalTime(), DateTimeKind.Utc);
}

View File

@@ -0,0 +1,175 @@
using Akka.Actor;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using ScadaLink.AuditLog.Central;
using ScadaLink.AuditLog.Site;
using ScadaLink.AuditLog.Site.Telemetry;
using ScadaLink.Commons.Interfaces;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Commons.Messages.Integration;
using ScadaLink.ConfigurationDatabase;
using ScadaLink.ConfigurationDatabase.Repositories;
using ScadaLink.ConfigurationDatabase.Tests.Migrations;
using ScadaLink.SiteRuntime.Tracking;
namespace ScadaLink.AuditLog.Tests.Integration.Infrastructure;
/// <summary>
/// Shared end-to-end harness for the M3 cached-call combined telemetry tests
/// (G2/G3/G4). Composes the full pipeline:
/// <list type="bullet">
/// <item><description>Site-local SQLite <see cref="SqliteAuditWriter"/> (in-memory) +
/// <see cref="RingBufferFallback"/> + <see cref="FallbackAuditWriter"/>.</description></item>
/// <item><description>Site-local SQLite <see cref="OperationTrackingStore"/> (in-memory).</description></item>
/// <item><description>Production <see cref="CachedCallTelemetryForwarder"/> wrapped by a
/// test-side <see cref="CombinedTelemetryDispatcher"/> that also pushes each
/// packet through the stub gRPC client.</description></item>
/// <item><description><see cref="CachedCallLifecycleBridge"/> wired to the
/// dispatcher so a single observer call fans out audit + tracking + wire.</description></item>
/// <item><description><see cref="DirectActorSiteStreamAuditClient"/> connected
/// to an <see cref="AuditLogIngestActor"/> backed by the real
/// <see cref="AuditLogRepository"/> + <see cref="SiteCallAuditRepository"/>
/// against the per-test <see cref="MsSqlMigrationFixture"/> database.</description></item>
/// </list>
/// </summary>
/// <remarks>
/// <para>
/// Disposal cleans up the in-memory SQLite stores. The Akka actor system is
/// owned by the calling <see cref="Akka.TestKit.Xunit2.TestKit"/>; the harness
/// only owns the ingest actor IActorRef and the underlying repositories'
/// DbContext lifecycle.
/// </para>
/// </remarks>
public sealed class CombinedTelemetryHarness : IAsyncDisposable
{
public SqliteAuditWriter SqliteWriter { get; }
public RingBufferFallback Ring { get; }
public FallbackAuditWriter FallbackWriter { get; }
public OperationTrackingStore TrackingStore { get; }
public CachedCallTelemetryForwarder InnerForwarder { get; }
public CombinedTelemetryDispatcher Dispatcher { get; }
public CachedCallLifecycleBridge Bridge { get; }
public DirectActorSiteStreamAuditClient StubClient { get; }
public IActorRef IngestActor { get; }
public IServiceProvider ServiceProvider { get; }
private readonly MsSqlMigrationFixture _fixture;
private bool _disposed;
public CombinedTelemetryHarness(
MsSqlMigrationFixture fixture,
Akka.TestKit.Xunit2.TestKit testKit,
Func<ScadaLinkDbContext, ISiteCallAuditRepository>? siteCallRepoOverride = null)
{
_fixture = fixture ?? throw new ArgumentNullException(nameof(fixture));
ArgumentNullException.ThrowIfNull(testKit);
// Site SQLite — unique in-memory database per harness so tests don't share
// an audit queue. Mode=Memory + Cache=Shared keeps the file alive for the
// lifetime of the writer connection.
SqliteWriter = new SqliteAuditWriter(
Options.Create(new SqliteAuditWriterOptions
{
DatabasePath = "ignored",
BatchSize = 64,
ChannelCapacity = 1024,
}),
NullLogger<SqliteAuditWriter>.Instance,
connectionStringOverride:
$"Data Source=file:cachedcall-g-{Guid.NewGuid():N}?mode=memory&cache=shared");
Ring = new RingBufferFallback();
FallbackWriter = new FallbackAuditWriter(
SqliteWriter, Ring, new NoOpAuditWriteFailureCounter(),
NullLogger<FallbackAuditWriter>.Instance);
TrackingStore = new OperationTrackingStore(
Options.Create(new OperationTrackingOptions
{
// Same shared-in-memory pattern as the audit writer.
ConnectionString =
$"Data Source=file:tracking-g-{Guid.NewGuid():N}?mode=memory&cache=shared",
}),
NullLogger<OperationTrackingStore>.Instance);
// Central wiring: real repositories backed by the MSSQL fixture's DB.
ServiceProvider = BuildCentralServiceProvider(siteCallRepoOverride);
IngestActor = testKit.Sys.ActorOf(Props.Create(() => new AuditLogIngestActor(
ServiceProvider,
NullLogger<AuditLogIngestActor>.Instance)));
StubClient = new DirectActorSiteStreamAuditClient(IngestActor);
// Production forwarder writes the local stores; the dispatcher wraps
// it to ALSO push the same packet to central via the stub client.
InnerForwarder = new CachedCallTelemetryForwarder(
FallbackWriter, TrackingStore, NullLogger<CachedCallTelemetryForwarder>.Instance);
Dispatcher = new CombinedTelemetryDispatcher(InnerForwarder, StubClient);
Bridge = new CachedCallLifecycleBridge(Dispatcher, NullLogger<CachedCallLifecycleBridge>.Instance);
}
/// <summary>
/// Convenience: emit the initial submit packet directly through the
/// dispatcher (the bridge's hooks fire only for S&amp;F retry-loop
/// attempts; submit-row emission happens at the script call site).
/// </summary>
public Task EmitSubmitAsync(CachedCallTelemetry submit, CancellationToken ct = default) =>
Dispatcher.ForwardAsync(submit, ct);
/// <summary>
/// Convenience: route a per-attempt or terminal outcome through the bridge.
/// </summary>
public Task EmitAttemptAsync(CachedCallAttemptContext context, CancellationToken ct = default) =>
Bridge.OnAttemptCompletedAsync(context, ct);
public ScadaLinkDbContext CreateReadContext()
{
var options = new DbContextOptionsBuilder<ScadaLinkDbContext>()
.UseSqlServer(_fixture.ConnectionString)
.Options;
return new ScadaLinkDbContext(options);
}
private IServiceProvider BuildCentralServiceProvider(
Func<ScadaLinkDbContext, ISiteCallAuditRepository>? siteCallRepoOverride)
{
var services = new ServiceCollection();
services.AddDbContext<ScadaLinkDbContext>(opts =>
opts.UseSqlServer(_fixture.ConnectionString)
.ConfigureWarnings(w => w.Ignore(
Microsoft.EntityFrameworkCore.Diagnostics.RelationalEventId.PendingModelChangesWarning)));
services.AddScoped<IAuditLogRepository>(sp =>
new AuditLogRepository(sp.GetRequiredService<ScadaLinkDbContext>()));
if (siteCallRepoOverride is null)
{
services.AddScoped<ISiteCallAuditRepository>(sp =>
new SiteCallAuditRepository(sp.GetRequiredService<ScadaLinkDbContext>()));
}
else
{
services.AddScoped(sp =>
siteCallRepoOverride(sp.GetRequiredService<ScadaLinkDbContext>()));
}
return services.BuildServiceProvider();
}
public async ValueTask DisposeAsync()
{
if (_disposed) return;
_disposed = true;
await SqliteWriter.DisposeAsync().ConfigureAwait(false);
await TrackingStore.DisposeAsync().ConfigureAwait(false);
if (ServiceProvider is IAsyncDisposable asyncSp)
{
await asyncSp.DisposeAsync().ConfigureAwait(false);
}
else if (ServiceProvider is IDisposable sp)
{
sp.Dispose();
}
}
}