test(auditlog): end-to-end sync-call emission via TestKit + MSSQL fixture (#23)
This commit is contained in:
@@ -0,0 +1,341 @@
|
|||||||
|
using Akka.Actor;
|
||||||
|
using Akka.TestKit.Xunit2;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
using Microsoft.Extensions.Options;
|
||||||
|
using ScadaLink.AuditLog.Central;
|
||||||
|
using ScadaLink.AuditLog.Site;
|
||||||
|
using ScadaLink.AuditLog.Site.Telemetry;
|
||||||
|
using ScadaLink.Commons.Entities.Audit;
|
||||||
|
using ScadaLink.Commons.Interfaces.Repositories;
|
||||||
|
using ScadaLink.Commons.Messages.Audit;
|
||||||
|
using ScadaLink.Commons.Types.Audit;
|
||||||
|
using ScadaLink.Commons.Types.Enums;
|
||||||
|
using ScadaLink.ConfigurationDatabase;
|
||||||
|
using ScadaLink.ConfigurationDatabase.Repositories;
|
||||||
|
using ScadaLink.ConfigurationDatabase.Tests.Migrations;
|
||||||
|
using ScadaLink.Communication.Grpc;
|
||||||
|
|
||||||
|
namespace ScadaLink.AuditLog.Tests.Integration;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Bundle H — end-to-end test wiring the full Audit Log #23 M2 sync-call pipeline:
|
||||||
|
/// <see cref="FallbackAuditWriter"/> over a <see cref="SqliteAuditWriter"/> backed by
|
||||||
|
/// an in-memory SQLite database; the <see cref="SiteAuditTelemetryActor"/> drains
|
||||||
|
/// Pending rows and pushes them through a stub <see cref="ISiteStreamAuditClient"/>
|
||||||
|
/// that forwards directly to the central <see cref="AuditLogIngestActor"/> backed
|
||||||
|
/// by a real <see cref="AuditLogRepository"/> on the <see cref="MsSqlMigrationFixture"/>.
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// <para>
|
||||||
|
/// This is a <b>component-level</b> integration test, not a full Akka-cluster
|
||||||
|
/// test (per the M2 brainstorm decision). The stub gRPC client short-circuits
|
||||||
|
/// the wire so we exercise the real telemetry actor, the real ingest actor, the
|
||||||
|
/// real SQLite writer, and the real MSSQL repository — without standing up a
|
||||||
|
/// Kestrel host or two-cluster topology.
|
||||||
|
/// </para>
|
||||||
|
/// <para>
|
||||||
|
/// The site-side telemetry actor's <c>Drain</c> message is private; rather than
|
||||||
|
/// expose it we drive the drain by setting <c>BusyIntervalSeconds = 1</c> so the
|
||||||
|
/// initial scheduled tick fires within a second of actor start. Tests then
|
||||||
|
/// <see cref="TestKitBase.AwaitAssertAsync"/> until the central repository
|
||||||
|
/// observes the expected rows.
|
||||||
|
/// </para>
|
||||||
|
/// <para>
|
||||||
|
/// Each test uses a unique <c>SourceSiteId</c> (Guid suffix) so concurrent tests
|
||||||
|
/// and the per-fixture MSSQL database lifetime don't interfere with each other.
|
||||||
|
/// </para>
|
||||||
|
/// </remarks>
|
||||||
|
public class SyncCallEmissionEndToEndTests : TestKit, IClassFixture<MsSqlMigrationFixture>
|
||||||
|
{
|
||||||
|
// Class-level MSSQL fixture handed in by xUnit; the helpers and tests below
// read its ConnectionString, Available and SkipReason members.
private readonly MsSqlMigrationFixture _fixture;

/// <summary>
/// Stores the shared <see cref="MsSqlMigrationFixture"/> for use by the tests.
/// </summary>
/// <param name="fixture">Fixture instance supplied by the test framework.</param>
public SyncCallEmissionEndToEndTests(MsSqlMigrationFixture fixture) => _fixture = fixture;
|
||||||
|
|
||||||
|
/// <summary>
/// Produces a site identifier of the form <c>test-bundle-h-xxxxxxxx</c>, where
/// the suffix is the first eight hex digits of a freshly generated GUID.
/// </summary>
/// <returns>A new, effectively unique site id for one test run.</returns>
private static string NewSiteId()
{
    var suffix = Guid.NewGuid().ToString("N")[..8];
    return $"test-bundle-h-{suffix}";
}
|
||||||
|
|
||||||
|
/// <summary>
/// Builds a new <see cref="ScadaLinkDbContext"/> configured for SQL Server
/// using the fixture's connection string. The caller owns (and must dispose)
/// the returned context.
/// </summary>
private ScadaLinkDbContext CreateContext()
{
    var builder = new DbContextOptionsBuilder<ScadaLinkDbContext>();
    builder.UseSqlServer(_fixture.ConnectionString);
    return new ScadaLinkDbContext(builder.Options);
}
|
||||||
|
|
||||||
|
/// <summary>
/// Creates a delivered outbound-API audit event for the given site.
/// </summary>
/// <param name="siteId">Value assigned to <c>SourceSiteId</c>.</param>
/// <param name="id">
/// Explicit event id; when <c>null</c> a fresh GUID is generated.
/// </param>
/// <returns>A populated <see cref="AuditEvent"/> stamped with the current UTC time.</returns>
private static AuditEvent NewEvent(string siteId, Guid? id = null)
{
    var eventId = id ?? Guid.NewGuid();

    return new AuditEvent
    {
        EventId = eventId,
        OccurredAtUtc = DateTime.UtcNow,
        Channel = AuditChannel.ApiOutbound,
        Kind = AuditKind.ApiCall,
        Status = AuditStatus.Delivered,
        SourceSiteId = siteId,
        Target = "external-system-a/method",
    };
}
|
||||||
|
|
||||||
|
/// <summary>
/// Options for the site-side <see cref="SqliteAuditWriter"/> used by these tests.
/// </summary>
private static IOptions<SqliteAuditWriterOptions> InMemorySqliteOptions()
{
    var settings = new SqliteAuditWriterOptions
    {
        // Placeholder only: CreateInMemorySqliteWriter passes an explicit
        // connectionStringOverride (a uniquely named, shared-cache in-memory
        // database), so this path is not expected to be opened.
        DatabasePath = "ignored",
        BatchSize = 64,
        ChannelCapacity = 1024,
    };

    return Options.Create(settings);
}
|
||||||
|
|
||||||
|
/// <summary>
/// Creates a <see cref="SqliteAuditWriter"/> over a uniquely named in-memory
/// SQLite database (<c>mode=memory</c>, <c>cache=shared</c>), so each writer
/// instance gets its own schema.
/// </summary>
private static SqliteAuditWriter CreateInMemorySqliteWriter()
{
    // The GUID in the file name keeps the shared-cache database private to
    // this writer; it is supplied through the connectionStringOverride argument.
    var connectionString =
        $"Data Source=file:auditlog-h-{Guid.NewGuid():N}?mode=memory&cache=shared";

    return new SqliteAuditWriter(
        InMemorySqliteOptions(),
        NullLogger<SqliteAuditWriter>.Instance,
        connectionStringOverride: connectionString);
}
|
||||||
|
|
||||||
|
/// <summary>
/// Telemetry options with both tick intervals set to one second, so the tests
/// can rely on the actor's own schedule instead of a public drain message.
/// </summary>
private static IOptions<SiteAuditTelemetryOptions> FastTelemetryOptions()
{
    var settings = new SiteAuditTelemetryOptions
    {
        BatchSize = 256,
        // Short busy AND idle intervals: the first scheduled tick and any
        // retry after a failed push are both intended to fire quickly.
        BusyIntervalSeconds = 1,
        IdleIntervalSeconds = 1,
    };

    return Options.Create(settings);
}
|
||||||
|
|
||||||
|
/// <summary>
/// Starts an <see cref="AuditLogIngestActor"/> in the TestKit actor system.
/// </summary>
/// <param name="repo">Repository the ingest actor is constructed with.</param>
/// <returns>Reference to the newly created actor.</returns>
private IActorRef CreateIngestActor(IAuditLogRepository repo)
{
    var props = Props.Create(() => new AuditLogIngestActor(
        repo,
        NullLogger<AuditLogIngestActor>.Instance));

    return Sys.ActorOf(props);
}
|
||||||
|
|
||||||
|
/// <summary>
/// Starts a <see cref="SiteAuditTelemetryActor"/> in the TestKit actor system,
/// configured with <see cref="FastTelemetryOptions"/>.
/// </summary>
/// <param name="queue">Site-side queue the actor is constructed with.</param>
/// <param name="client">Client the actor is constructed with for pushing batches.</param>
/// <returns>Reference to the newly created actor.</returns>
private IActorRef CreateTelemetryActor(
    ISiteAuditQueue queue,
    ISiteStreamAuditClient client)
{
    // FastTelemetryOptions() stays inside the expression so it is evaluated
    // when the actor instance is constructed, not when the Props are built.
    var props = Props.Create(() => new SiteAuditTelemetryActor(
        queue,
        client,
        FastTelemetryOptions(),
        NullLogger<SiteAuditTelemetryActor>.Instance));

    return Sys.ActorOf(props);
}
|
||||||
|
|
||||||
|
/// <summary>
/// Happy path: a single event written through the <see cref="FallbackAuditWriter"/>
/// must show up as exactly one row in the central audit log, with
/// <c>IngestedAtUtc</c> populated.
/// </summary>
[SkippableFact]
public async Task EndToEnd_OneWrittenEvent_Reaches_Central_AuditLog_Within_Reasonable_Time()
{
    Skip.IfNot(_fixture.Available, _fixture.SkipReason);

    var sourceSite = NewSiteId();

    // Central side: MSSQL-backed repository behind the ingest actor.
    await using var centralContext = CreateContext();
    var centralActor = CreateIngestActor(new AuditLogRepository(centralContext));

    // Site side: in-memory SQLite queue wrapped by the fallback writer, plus
    // the telemetry actor that pushes queued rows to the central actor.
    await using var siteQueue = CreateInMemorySqliteWriter();
    var ringBuffer = new RingBufferFallback();
    var hotPathWriter = new FallbackAuditWriter(
        siteQueue,
        ringBuffer,
        new NoOpAuditWriteFailureCounter(),
        NullLogger<FallbackAuditWriter>.Instance);

    var directClient = new DirectActorSiteStreamAuditClient(centralActor);
    CreateTelemetryActor(siteQueue, directClient);

    // Act: write one fresh event on the hot path.
    var written = NewEvent(sourceSite);
    await hotPathWriter.WriteAsync(written);

    // Assert: poll the central store. 15s covers the 1s telemetry tick plus
    // slack for SQLite, the actor round-trip and EF/MSSQL latency.
    var deadline = TimeSpan.FromSeconds(15);
    await AwaitAssertAsync(async () =>
    {
        await using var queryContext = CreateContext();
        var centralRows = await new AuditLogRepository(queryContext).QueryAsync(
            new AuditLogQueryFilter(SourceSiteId: sourceSite),
            new AuditLogPaging(PageSize: 10));

        var onlyRow = Assert.Single(centralRows);
        Assert.Equal(written.EventId, onlyRow.EventId);
        // IngestedAtUtc is never set by NewEvent, so a value here was
        // stamped somewhere on the central path.
        Assert.NotNull(onlyRow.IngestedAtUtc);
    }, deadline);
}
|
||||||
|
|
||||||
|
/// <summary>
/// Failure-then-recovery path: the stub client throws on its first push, the
/// event must still reach the central audit log on a later tick, and the
/// site-side row must eventually stop being reported as pending.
/// </summary>
[SkippableFact]
public async Task EndToEnd_GrpcStubError_RowStays_Pending_NextTick_Succeeds()
{
    Skip.IfNot(_fixture.Available, _fixture.SkipReason);

    var siteId = NewSiteId();

    // Central side: MSSQL-backed repository behind the ingest actor.
    await using var ingestContext = CreateContext();
    var ingestRepo = new AuditLogRepository(ingestContext);
    var ingestActor = CreateIngestActor(ingestRepo);

    // Site side: in-memory SQLite queue wrapped by the fallback writer.
    await using var sqliteWriter = CreateInMemorySqliteWriter();
    var ring = new RingBufferFallback();
    var fallback = new FallbackAuditWriter(
        sqliteWriter,
        ring,
        new NoOpAuditWriteFailureCounter(),
        NullLogger<FallbackAuditWriter>.Instance);

    // The stub throws on the first push and forwards every later one. The
    // telemetry actor is expected to leave the rows pending after a failed
    // push so that the next tick picks them up again.
    var stubClient = new DirectActorSiteStreamAuditClient(ingestActor)
    {
        FailNextCallCount = 1,
    };
    CreateTelemetryActor(sqliteWriter, stubClient);

    var evt = NewEvent(siteId);
    await fallback.WriteAsync(evt);

    // Both telemetry intervals are 1s, so one failure + one retry completes
    // quickly; 15s leaves headroom for slow CI.
    var timeout = TimeSpan.FromSeconds(15);
    await AwaitAssertAsync(async () =>
    {
        await using var readContext = CreateContext();
        var readRepo = new AuditLogRepository(readContext);
        var rows = await readRepo.QueryAsync(
            new AuditLogQueryFilter(SourceSiteId: siteId),
            new AuditLogPaging(PageSize: 10));

        Assert.Single(rows);
        Assert.Equal(evt.EventId, rows[0].EventId);
    }, timeout);

    // The stub counts a call before deciding whether to fail, so a visible
    // central row implies at least the failed push and the successful one.
    Assert.True(stubClient.CallCount >= 2,
        $"Expected at least one failed push + one successful push; saw {stubClient.CallCount} total client calls.");

    // The central row becomes visible as soon as the ingest side commits,
    // which can be BEFORE the ack has travelled back to the telemetry actor
    // and the site row has been marked as forwarded. Checking the site queue
    // exactly once here would race with that hand-off, so poll until the row
    // is no longer reported by ReadPendingAsync.
    await AwaitAssertAsync(async () =>
    {
        var stillPending = await sqliteWriter.ReadPendingAsync(64);
        Assert.DoesNotContain(stillPending, p => p.EventId == evt.EventId);
    }, timeout);
}
|
||||||
|
|
||||||
|
/// <summary>
/// Idempotency path: two events sharing one <c>EventId</c> are written on the
/// site; the central audit log must end up with exactly one row for that id.
/// </summary>
[SkippableFact]
public async Task EndToEnd_DuplicateSubmit_OnlyOneCentralRow()
{
    Skip.IfNot(_fixture.Available, _fixture.SkipReason);

    var sourceSite = NewSiteId();

    // Central side: MSSQL-backed repository behind the ingest actor.
    await using var centralContext = CreateContext();
    var centralActor = CreateIngestActor(new AuditLogRepository(centralContext));

    // Site side: in-memory SQLite queue, fallback writer and telemetry actor.
    await using var siteQueue = CreateInMemorySqliteWriter();
    var ringBuffer = new RingBufferFallback();
    var hotPathWriter = new FallbackAuditWriter(
        siteQueue,
        ringBuffer,
        new NoOpAuditWriteFailureCounter(),
        NullLogger<FallbackAuditWriter>.Instance);

    var directClient = new DirectActorSiteStreamAuditClient(centralActor);
    CreateTelemetryActor(siteQueue, directClient);

    // Two distinct event objects carrying the SAME id. De-duplication is
    // expected from the site SQLite primary key and from the central
    // repository's insert-if-not-exists behaviour (first write wins).
    var duplicatedId = Guid.NewGuid();
    var first = NewEvent(sourceSite, duplicatedId);
    var second = NewEvent(sourceSite, duplicatedId);

    await hotPathWriter.WriteAsync(first);
    await hotPathWriter.WriteAsync(second);

    var deadline = TimeSpan.FromSeconds(15);
    await AwaitAssertAsync(async () =>
    {
        await using var queryContext = CreateContext();
        var centralRows = await new AuditLogRepository(queryContext).QueryAsync(
            new AuditLogQueryFilter(SourceSiteId: sourceSite),
            new AuditLogPaging(PageSize: 10));

        var onlyRow = Assert.Single(centralRows);
        Assert.Equal(duplicatedId, onlyRow.EventId);
    }, deadline);
}
|
||||||
|
|
||||||
|
/// <summary>
/// Test double for <see cref="ISiteStreamAuditClient"/> that short-circuits
/// the gRPC wire: each batch is decoded and sent straight to a central
/// <see cref="AuditLogIngestActor"/> via Akka <see cref="Futures.Ask"/>, and the
/// actor's <see cref="IngestAuditEventsReply"/> is converted into the proto
/// <see cref="IngestAck"/> returned to the caller.
/// </summary>
private sealed class DirectActorSiteStreamAuditClient : ISiteStreamAuditClient
{
    private readonly IActorRef _ingestActor;

    // Number of upcoming calls that should still fail; never driven below
    // the stored value's floor of zero by this class.
    private int _failsRemaining;

    // Total number of IngestAuditEventsAsync invocations, failed or not.
    private int _callCount;

    /// <param name="ingestActor">Actor that receives the decoded batches.</param>
    /// <exception cref="ArgumentNullException"><paramref name="ingestActor"/> is null.</exception>
    public DirectActorSiteStreamAuditClient(IActorRef ingestActor)
    {
        _ingestActor = ingestActor ?? throw new ArgumentNullException(nameof(ingestActor));
    }

    /// <summary>
    /// When &gt; 0, the next <c>FailNextCallCount</c> invocations of
    /// <see cref="IngestAuditEventsAsync"/> throw to simulate a gRPC error;
    /// once the count is exhausted, calls succeed normally. Reading the
    /// property returns the number of failures still armed.
    /// </summary>
    public int FailNextCallCount
    {
        get => Volatile.Read(ref _failsRemaining);
        set => Volatile.Write(ref _failsRemaining, value);
    }

    /// <summary>Total number of calls received so far, including failed ones.</summary>
    public int CallCount => Volatile.Read(ref _callCount);

    /// <summary>
    /// Forwards <paramref name="batch"/> to the ingest actor and returns the
    /// accepted event ids as an <see cref="IngestAck"/>.
    /// </summary>
    /// <param name="batch">Proto batch to decode and forward.</param>
    /// <param name="ct">Cancels the pending ask to the ingest actor.</param>
    /// <returns>Ack listing the event ids the ingest actor reported as accepted.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="batch"/> is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// A simulated failure was armed through <see cref="FailNextCallCount"/>.
    /// </exception>
    public async Task<IngestAck> IngestAuditEventsAsync(AuditEventBatch batch, CancellationToken ct)
    {
        // Count the call first so failed pushes are included in CallCount.
        Interlocked.Increment(ref _callCount);

        if (batch is null)
        {
            throw new ArgumentNullException(nameof(batch));
        }

        if (TryConsumeArmedFailure())
        {
            throw new InvalidOperationException("simulated gRPC failure for test");
        }

        // Decode the proto batch back into AuditEvent records before
        // dispatching to the ingest actor.
        var events = new List<AuditEvent>(batch.Events.Count);
        foreach (var dto in batch.Events)
        {
            events.Add(ScadaLink.AuditLog.Telemetry.AuditEventMapper.FromDto(dto));
        }

        // Ask the central actor; the reply carries the accepted EventIds.
        // The caller's token is passed through so a cancelled push does not
        // sit out the full 10s ask timeout.
        var reply = await _ingestActor
            .Ask<IngestAuditEventsReply>(
                new IngestAuditEventsCommand(events),
                TimeSpan.FromSeconds(10),
                ct)
            .ConfigureAwait(false);

        var ack = new IngestAck();
        foreach (var id in reply.AcceptedEventIds)
        {
            ack.AcceptedEventIds.Add(id.ToString());
        }

        return ack;
    }

    /// <summary>
    /// Atomically consumes one armed failure, if any remain.
    /// </summary>
    /// <returns><c>true</c> when this call should fail; otherwise <c>false</c>.</returns>
    private bool TryConsumeArmedFailure()
    {
        // Compare-and-swap loop: the counter is only ever decremented from a
        // positive value, so it cannot run negative and a value armed
        // concurrently through FailNextCallCount is never overwritten.
        while (true)
        {
            var armed = Volatile.Read(ref _failsRemaining);
            if (armed <= 0)
            {
                return false;
            }

            if (Interlocked.CompareExchange(ref _failsRemaining, armed - 1, armed) == armed)
            {
                return true;
            }
        }
    }
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user