Files
scadalink-design/tests/ScadaLink.AuditLog.Tests/Integration/SyncCallEmissionEndToEndTests.cs

342 lines
14 KiB
C#

using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using ScadaLink.AuditLog.Central;
using ScadaLink.AuditLog.Site;
using ScadaLink.AuditLog.Site.Telemetry;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.Audit;
using ScadaLink.Commons.Types.Audit;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.ConfigurationDatabase;
using ScadaLink.ConfigurationDatabase.Repositories;
using ScadaLink.ConfigurationDatabase.Tests.Migrations;
using ScadaLink.Communication.Grpc;
namespace ScadaLink.AuditLog.Tests.Integration;
/// <summary>
/// Bundle H — end-to-end test wiring the full Audit Log #23 M2 sync-call pipeline:
/// <see cref="FallbackAuditWriter"/> over a <see cref="SqliteAuditWriter"/> backed by
/// an in-memory SQLite database; the <see cref="SiteAuditTelemetryActor"/> drains
/// Pending rows and pushes them through a stub <see cref="ISiteStreamAuditClient"/>
/// that forwards directly to the central <see cref="AuditLogIngestActor"/> backed
/// by a real <see cref="AuditLogRepository"/> on the <see cref="MsSqlMigrationFixture"/>.
/// </summary>
/// <remarks>
/// <para>
/// This is a <b>component-level</b> integration test, not a full Akka-cluster
/// test (per the M2 brainstorm decision). The stub gRPC client short-circuits
/// the wire so we exercise the real telemetry actor, the real ingest actor, the
/// real SQLite writer, and the real MSSQL repository — without standing up a
/// Kestrel host or two-cluster topology.
/// </para>
/// <para>
/// The site-side telemetry actor's <c>Drain</c> message is private; rather than
/// expose it we drive the drain by setting <c>BusyIntervalSeconds = 1</c> so the
/// initial scheduled tick fires within a second of actor start. Tests then
/// <see cref="TestKitBase.AwaitAssertAsync"/> until the central repository
/// observes the expected rows.
/// </para>
/// <para>
/// Each test uses a unique <c>SourceSiteId</c> (Guid suffix) so concurrent tests
/// and the per-fixture MSSQL database lifetime don't interfere with each other.
/// </para>
/// </remarks>
public class SyncCallEmissionEndToEndTests : TestKit, IClassFixture<MsSqlMigrationFixture>
{
    /// <summary>
    /// Upper bound used by every polling assertion: covers the 1s telemetry
    /// tick plus generous slack for SQLite, the actor round-trip and
    /// EF/MSSQL latency on slow CI agents.
    /// </summary>
    private static readonly TimeSpan PipelineTimeout = TimeSpan.FromSeconds(15);

    private readonly MsSqlMigrationFixture _fixture;

    public SyncCallEmissionEndToEndTests(MsSqlMigrationFixture fixture)
    {
        _fixture = fixture;
    }

    /// <summary>
    /// Builds a site id that is unique per test so rows written by one test
    /// can be filtered out from rows written by any other.
    /// </summary>
    private static string NewSiteId() =>
        "test-bundle-h-" + Guid.NewGuid().ToString("N")[..8];

    /// <summary>
    /// Opens a new <see cref="ScadaLinkDbContext"/> against the fixture's
    /// MSSQL database. The caller owns (and must dispose) the context.
    /// </summary>
    private ScadaLinkDbContext CreateContext()
    {
        var options = new DbContextOptionsBuilder<ScadaLinkDbContext>()
            .UseSqlServer(_fixture.ConnectionString)
            .Options;
        return new ScadaLinkDbContext(options);
    }

    /// <summary>
    /// Creates a delivered outbound API-call audit event for
    /// <paramref name="siteId"/>; <paramref name="id"/> pins the EventId when
    /// a test needs two events to collide, otherwise a fresh Guid is used.
    /// </summary>
    private static AuditEvent NewEvent(string siteId, Guid? id = null) => new()
    {
        EventId = id ?? Guid.NewGuid(),
        OccurredAtUtc = DateTime.UtcNow,
        Channel = AuditChannel.ApiOutbound,
        Kind = AuditKind.ApiCall,
        Status = AuditStatus.Delivered,
        SourceSiteId = siteId,
        Target = "external-system-a/method",
    };

    /// <summary>
    /// Writer options for the in-memory SQLite database used by the tests.
    /// </summary>
    private static IOptions<SqliteAuditWriterOptions> InMemorySqliteOptions() =>
        Options.Create(new SqliteAuditWriterOptions
        {
            // DatabasePath is unused: CreateInMemorySqliteWriter supplies a
            // connection string override that points at an in-memory database.
            DatabasePath = "ignored",
            BatchSize = 64,
            ChannelCapacity = 1024,
        });

    /// <summary>
    /// Creates the real <see cref="SqliteAuditWriter"/> over a uniquely named
    /// in-memory database.
    /// </summary>
    private static SqliteAuditWriter CreateInMemorySqliteWriter() =>
        // A unique database name + mode=memory + cache=shared keeps the
        // in-memory database alive for the duration of the test (even though
        // Microsoft.Data.Sqlite tears it down with the last connection) while
        // scoping the schema to this writer instance.
        new SqliteAuditWriter(
            InMemorySqliteOptions(),
            NullLogger<SqliteAuditWriter>.Instance,
            connectionStringOverride: $"Data Source=file:auditlog-h-{Guid.NewGuid():N}?mode=memory&cache=shared");

    /// <summary>
    /// Wraps <paramref name="sqliteWriter"/> in the production
    /// <see cref="FallbackAuditWriter"/> hot-path with a fresh ring buffer and
    /// a no-op failure counter.
    /// </summary>
    private static FallbackAuditWriter CreateFallbackWriter(SqliteAuditWriter sqliteWriter) =>
        new FallbackAuditWriter(
            sqliteWriter,
            new RingBufferFallback(),
            new NoOpAuditWriteFailureCounter(),
            NullLogger<FallbackAuditWriter>.Instance);

    /// <summary>
    /// Telemetry options with both intervals at 1s so the initial scheduled
    /// tick and any failure-driven re-tick fire fast, without requiring a
    /// public Drain message to be exposed.
    /// </summary>
    private static IOptions<SiteAuditTelemetryOptions> FastTelemetryOptions() =>
        Options.Create(new SiteAuditTelemetryOptions
        {
            BatchSize = 256,
            BusyIntervalSeconds = 1,
            IdleIntervalSeconds = 1,
        });

    /// <summary>Starts the real central ingest actor over <paramref name="repo"/>.</summary>
    private IActorRef CreateIngestActor(IAuditLogRepository repo) =>
        Sys.ActorOf(Props.Create(() => new AuditLogIngestActor(
            repo,
            NullLogger<AuditLogIngestActor>.Instance)));

    /// <summary>
    /// Starts the real site telemetry actor draining <paramref name="queue"/>
    /// through <paramref name="client"/>.
    /// </summary>
    private IActorRef CreateTelemetryActor(
        ISiteAuditQueue queue,
        ISiteStreamAuditClient client) =>
        Sys.ActorOf(Props.Create(() => new SiteAuditTelemetryActor(
            queue,
            client,
            FastTelemetryOptions(),
            NullLogger<SiteAuditTelemetryActor>.Instance)));

    /// <summary>
    /// Asserts that the central AuditLog holds exactly one row for
    /// <paramref name="siteId"/> and that it carries
    /// <paramref name="expectedEventId"/>. Each call opens its own context,
    /// separate from the one handed to the ingest actor. Intended to be
    /// polled from <c>AwaitAssertAsync</c>.
    /// </summary>
    /// <param name="siteId">The unique SourceSiteId the test wrote under.</param>
    /// <param name="expectedEventId">EventId the single row must carry.</param>
    /// <param name="expectIngestedStamp">
    /// When <c>true</c>, additionally asserts that IngestedAtUtc is populated.
    /// </param>
    private async Task AssertSingleCentralRowAsync(
        string siteId,
        Guid expectedEventId,
        bool expectIngestedStamp = false)
    {
        await using var readContext = CreateContext();
        var readRepo = new AuditLogRepository(readContext);
        var rows = await readRepo.QueryAsync(
            new AuditLogQueryFilter(SourceSiteId: siteId),
            new AuditLogPaging(PageSize: 10));

        Assert.Single(rows);
        Assert.Equal(expectedEventId, rows[0].EventId);
        if (expectIngestedStamp)
        {
            // Central stamps IngestedAtUtc; site never sets it.
            Assert.NotNull(rows[0].IngestedAtUtc);
        }
    }

    /// <summary>
    /// Happy path: one event written on the site hot-path shows up as exactly
    /// one central row, stamped with IngestedAtUtc.
    /// </summary>
    [SkippableFact]
    public async Task EndToEnd_OneWrittenEvent_Reaches_Central_AuditLog_Within_Reasonable_Time()
    {
        Skip.IfNot(_fixture.Available, _fixture.SkipReason);
        var siteId = NewSiteId();

        // Real central wiring: repo + ingest actor.
        await using var ingestContext = CreateContext();
        var ingestActor = CreateIngestActor(new AuditLogRepository(ingestContext));

        // Real site wiring: SQLite (in-memory) + ring + fallback + telemetry.
        await using var sqliteWriter = CreateInMemorySqliteWriter();
        var fallback = CreateFallbackWriter(sqliteWriter);
        CreateTelemetryActor(sqliteWriter, new DirectActorSiteStreamAuditClient(ingestActor));

        // Act — one fresh event written via the FallbackAuditWriter hot-path.
        var evt = NewEvent(siteId);
        await fallback.WriteAsync(evt);

        // Assert — the central AuditLog row materialises within the window.
        await AwaitAssertAsync(
            async () => await AssertSingleCentralRowAsync(siteId, evt.EventId, expectIngestedStamp: true),
            PipelineTimeout);
    }

    /// <summary>
    /// Failure path: the first push throws, the row stays queued on the site,
    /// and a later tick delivers it and clears it from the Pending set.
    /// </summary>
    [SkippableFact]
    public async Task EndToEnd_GrpcStubError_RowStays_Pending_NextTick_Succeeds()
    {
        Skip.IfNot(_fixture.Available, _fixture.SkipReason);
        var siteId = NewSiteId();

        await using var ingestContext = CreateContext();
        var ingestActor = CreateIngestActor(new AuditLogRepository(ingestContext));

        await using var sqliteWriter = CreateInMemorySqliteWriter();
        var fallback = CreateFallbackWriter(sqliteWriter);

        // Stub fails the first push; subsequent calls flow through. The
        // telemetry actor's on-failure branch keeps rows in Pending state, so
        // the next tick re-reads them and tries again.
        var stubClient = new DirectActorSiteStreamAuditClient(ingestActor)
        {
            FailNextCallCount = 1,
        };
        CreateTelemetryActor(sqliteWriter, stubClient);

        var evt = NewEvent(siteId);
        await fallback.WriteAsync(evt);

        // Wait long enough for at least one failure-then-success cycle. With
        // both intervals = 1s the actor retries quickly.
        await AwaitAssertAsync(
            async () => await AssertSingleCentralRowAsync(siteId, evt.EventId),
            PipelineTimeout);

        Assert.True(stubClient.CallCount >= 2,
            $"Expected at least one failed push + one successful push; saw {stubClient.CallCount} total client calls.");

        // The site SQLite row must flip to Forwarded after the successful
        // retry, i.e. ReadPendingAsync must stop returning it. The central row
        // becomes visible BEFORE the ack travels back to the telemetry actor,
        // so the site-side flip can lag slightly behind the assertion above:
        // poll instead of checking once to avoid a race-induced false failure.
        await AwaitAssertAsync(async () =>
        {
            var stillPending = await sqliteWriter.ReadPendingAsync(64);
            Assert.DoesNotContain(stillPending, p => p.EventId == evt.EventId);
        }, PipelineTimeout);
    }

    /// <summary>
    /// Idempotency: two writes sharing one EventId yield a single central row.
    /// </summary>
    [SkippableFact]
    public async Task EndToEnd_DuplicateSubmit_OnlyOneCentralRow()
    {
        Skip.IfNot(_fixture.Available, _fixture.SkipReason);
        var siteId = NewSiteId();

        await using var ingestContext = CreateContext();
        var ingestActor = CreateIngestActor(new AuditLogRepository(ingestContext));

        await using var sqliteWriter = CreateInMemorySqliteWriter();
        var fallback = CreateFallbackWriter(sqliteWriter);
        CreateTelemetryActor(sqliteWriter, new DirectActorSiteStreamAuditClient(ingestActor));

        // Both writes carry the SAME EventId. Site SQLite's PRIMARY KEY
        // constraint and the central repo's InsertIfNotExistsAsync both
        // enforce first-write-wins, so only one central row must materialise.
        var sharedId = Guid.NewGuid();
        await fallback.WriteAsync(NewEvent(siteId, sharedId));
        await fallback.WriteAsync(NewEvent(siteId, sharedId));

        await AwaitAssertAsync(
            async () => await AssertSingleCentralRowAsync(siteId, sharedId),
            PipelineTimeout);
    }

    /// <summary>
    /// Test double for <see cref="ISiteStreamAuditClient"/> that short-circuits
    /// the gRPC wire and forwards the batch directly to a central
    /// <see cref="AuditLogIngestActor"/> via Akka <see cref="Futures.Ask"/>. The
    /// Akka <see cref="IngestAuditEventsReply"/> is converted to the proto
    /// <see cref="IngestAck"/> that the telemetry actor expects.
    /// </summary>
    private sealed class DirectActorSiteStreamAuditClient : ISiteStreamAuditClient
    {
        /// <summary>How long a single Ask to the ingest actor may take.</summary>
        private static readonly TimeSpan AskTimeout = TimeSpan.FromSeconds(10);

        private readonly IActorRef _ingestActor;
        private int _failsRemaining;
        private int _callCount;

        public DirectActorSiteStreamAuditClient(IActorRef ingestActor)
        {
            _ingestActor = ingestActor ?? throw new ArgumentNullException(nameof(ingestActor));
        }

        /// <summary>
        /// When &gt; 0, the next <c>FailNextCallCount</c> invocations of
        /// <see cref="IngestAuditEventsAsync"/> throw to simulate a gRPC error;
        /// after that count is exhausted, calls succeed normally. The value
        /// never drops below zero; negative assignments are treated as zero.
        /// </summary>
        public int FailNextCallCount
        {
            get => Volatile.Read(ref _failsRemaining);
            set => Volatile.Write(ref _failsRemaining, Math.Max(0, value));
        }

        /// <summary>Total number of invocations, failed ones included.</summary>
        public int CallCount => Volatile.Read(ref _callCount);

        /// <summary>
        /// Forwards <paramref name="batch"/> to the ingest actor and maps the
        /// reply to an <see cref="IngestAck"/>.
        /// </summary>
        /// <param name="batch">Proto batch produced by the telemetry actor.</param>
        /// <param name="ct">Cancels the pending Ask to the ingest actor.</param>
        /// <returns>An ack listing the EventIds the ingest actor accepted.</returns>
        /// <exception cref="InvalidOperationException">
        /// Thrown while armed failures remain (see <see cref="FailNextCallCount"/>).
        /// </exception>
        public async Task<IngestAck> IngestAuditEventsAsync(AuditEventBatch batch, CancellationToken ct)
        {
            Interlocked.Increment(ref _callCount);

            if (TryConsumeArmedFailure())
            {
                throw new InvalidOperationException("simulated gRPC failure for test");
            }

            // Decode the proto batch back into AuditEvent records — this
            // mirrors what the production SiteStreamGrpcServer does before
            // dispatching to the ingest actor (see Bundle D's gRPC handler).
            var events = new List<AuditEvent>(batch.Events.Count);
            foreach (var dto in batch.Events)
            {
                events.Add(ScadaLink.AuditLog.Telemetry.AuditEventMapper.FromDto(dto));
            }

            // Ask the central actor; the reply carries the accepted EventIds.
            // The caller's token is honoured so a cancelled push does not sit
            // out the full Ask timeout.
            var reply = await _ingestActor
                .Ask<IngestAuditEventsReply>(
                    new IngestAuditEventsCommand(events),
                    AskTimeout,
                    ct)
                .ConfigureAwait(false);

            var ack = new IngestAck();
            foreach (var id in reply.AcceptedEventIds)
            {
                ack.AcceptedEventIds.Add(id.ToString());
            }

            return ack;
        }

        /// <summary>
        /// Atomically consumes one armed failure, if any. A compare-exchange
        /// loop is used (rather than decrement-then-clamp) so the counter
        /// never goes negative and a value armed concurrently through
        /// <see cref="FailNextCallCount"/> is never overwritten.
        /// </summary>
        /// <returns><c>true</c> when this call must simulate a failure.</returns>
        private bool TryConsumeArmedFailure()
        {
            while (true)
            {
                var remaining = Volatile.Read(ref _failsRemaining);
                if (remaining <= 0)
                {
                    return false;
                }

                if (Interlocked.CompareExchange(ref _failsRemaining, remaining - 1, remaining) == remaining)
                {
                    return true;
                }
            }
        }
    }
}