diff --git a/tests/ScadaLink.AuditLog.Tests/Integration/SyncCallEmissionEndToEndTests.cs b/tests/ScadaLink.AuditLog.Tests/Integration/SyncCallEmissionEndToEndTests.cs new file mode 100644 index 0000000..5f16ce9 --- /dev/null +++ b/tests/ScadaLink.AuditLog.Tests/Integration/SyncCallEmissionEndToEndTests.cs @@ -0,0 +1,341 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using ScadaLink.AuditLog.Central; +using ScadaLink.AuditLog.Site; +using ScadaLink.AuditLog.Site.Telemetry; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Messages.Audit; +using ScadaLink.Commons.Types.Audit; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.ConfigurationDatabase; +using ScadaLink.ConfigurationDatabase.Repositories; +using ScadaLink.ConfigurationDatabase.Tests.Migrations; +using ScadaLink.Communication.Grpc; + +namespace ScadaLink.AuditLog.Tests.Integration; + +/// +/// Bundle H — end-to-end test wiring the full Audit Log #23 M2 sync-call pipeline: +/// over a backed by +/// an in-memory SQLite database; the drains +/// Pending rows and pushes them through a stub +/// that forwards directly to the central backed +/// by a real on the . +/// +/// +/// +/// This is a component-level integration test, not a full Akka-cluster +/// test (per the M2 brainstorm decision). The stub gRPC client short-circuits +/// the wire so we exercise the real telemetry actor, the real ingest actor, the +/// real SQLite writer, and the real MSSQL repository — without standing up a +/// Kestrel host or two-cluster topology. +/// +/// +/// The site-side telemetry actor's Drain message is private; rather than +/// expose it we drive the drain by setting BusyIntervalSeconds = 1 so the +/// initial scheduled tick fires within a second of actor start. Tests then +/// until the central repository +/// observes the expected rows. +/// +/// +/// Each test uses a unique SourceSiteId (Guid suffix) so concurrent tests +/// and the per-fixture MSSQL database lifetime don't interfere with each other. +/// +/// +public class SyncCallEmissionEndToEndTests : TestKit, IClassFixture +{ + private readonly MsSqlMigrationFixture _fixture; + + public SyncCallEmissionEndToEndTests(MsSqlMigrationFixture fixture) + { + _fixture = fixture; + } + + private static string NewSiteId() => + "test-bundle-h-" + Guid.NewGuid().ToString("N").Substring(0, 8); + + private ScadaLinkDbContext CreateContext() + { + var options = new DbContextOptionsBuilder() + .UseSqlServer(_fixture.ConnectionString) + .Options; + return new ScadaLinkDbContext(options); + } + + private static AuditEvent NewEvent(string siteId, Guid? id = null) => new() + { + EventId = id ?? Guid.NewGuid(), + OccurredAtUtc = DateTime.UtcNow, + Channel = AuditChannel.ApiOutbound, + Kind = AuditKind.ApiCall, + Status = AuditStatus.Delivered, + SourceSiteId = siteId, + Target = "external-system-a/method", + }; + + private static IOptions InMemorySqliteOptions() => + Options.Create(new SqliteAuditWriterOptions + { + // Per-test unique database name + Mode=Memory + Cache=Shared keeps + // the in-memory database alive for the duration of the test even + // though Microsoft.Data.Sqlite tears the file down with the last + // connection. The DatabasePath field is unused because we override + // the connection string below. + DatabasePath = "ignored", + BatchSize = 64, + ChannelCapacity = 1024, + }); + + private static SqliteAuditWriter CreateInMemorySqliteWriter() => + // The 3rd constructor argument is connectionStringOverride. A unique + // shared-cache in-memory URI keeps the schema scoped to this writer + // instance and torn down when the writer is disposed. + new SqliteAuditWriter( + InMemorySqliteOptions(), + NullLogger.Instance, + connectionStringOverride: $"Data Source=file:auditlog-h-{Guid.NewGuid():N}?mode=memory&cache=shared"); + + private static IOptions FastTelemetryOptions() => + Options.Create(new SiteAuditTelemetryOptions + { + BatchSize = 256, + // 1s for both intervals so the initial scheduled tick fires fast + // and any failure-driven re-tick also fires fast — without + // requiring a public Drain message to be exposed. + BusyIntervalSeconds = 1, + IdleIntervalSeconds = 1, + }); + + private IActorRef CreateIngestActor(IAuditLogRepository repo) => + Sys.ActorOf(Props.Create(() => new AuditLogIngestActor( + repo, + NullLogger.Instance))); + + private IActorRef CreateTelemetryActor( + ISiteAuditQueue queue, + ISiteStreamAuditClient client) => + Sys.ActorOf(Props.Create(() => new SiteAuditTelemetryActor( + queue, + client, + FastTelemetryOptions(), + NullLogger.Instance))); + + [SkippableFact] + public async Task EndToEnd_OneWrittenEvent_Reaches_Central_AuditLog_Within_Reasonable_Time() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = NewSiteId(); + + // Real central wiring: repo + ingest actor. + await using var ingestContext = CreateContext(); + var ingestRepo = new AuditLogRepository(ingestContext); + var ingestActor = CreateIngestActor(ingestRepo); + + // Real site wiring: SQLite (in-memory) + ring + fallback + telemetry. + await using var sqliteWriter = CreateInMemorySqliteWriter(); + var ring = new RingBufferFallback(); + var fallback = new FallbackAuditWriter( + sqliteWriter, + ring, + new NoOpAuditWriteFailureCounter(), + NullLogger.Instance); + + var stubClient = new DirectActorSiteStreamAuditClient(ingestActor); + CreateTelemetryActor(sqliteWriter, stubClient); + + // Act — one fresh event written via the FallbackAuditWriter hot-path. + var evt = NewEvent(siteId); + await fallback.WriteAsync(evt); + + // Assert — the central AuditLog row materialises within a window that + // covers initial tick (1s) + a generous slack for SQLite + the actor + // round-trip + EF/MSSQL latency. + await AwaitAssertAsync(async () => + { + await using var readContext = CreateContext(); + var readRepo = new AuditLogRepository(readContext); + var rows = await readRepo.QueryAsync( + new AuditLogQueryFilter(SourceSiteId: siteId), + new AuditLogPaging(PageSize: 10)); + Assert.Single(rows); + Assert.Equal(evt.EventId, rows[0].EventId); + // Central stamps IngestedAtUtc; site never sets it. + Assert.NotNull(rows[0].IngestedAtUtc); + }, TimeSpan.FromSeconds(15)); + } + + [SkippableFact] + public async Task EndToEnd_GrpcStubError_RowStays_Pending_NextTick_Succeeds() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = NewSiteId(); + + await using var ingestContext = CreateContext(); + var ingestRepo = new AuditLogRepository(ingestContext); + var ingestActor = CreateIngestActor(ingestRepo); + + await using var sqliteWriter = CreateInMemorySqliteWriter(); + var ring = new RingBufferFallback(); + var fallback = new FallbackAuditWriter( + sqliteWriter, + ring, + new NoOpAuditWriteFailureCounter(), + NullLogger.Instance); + + // Stub fails the first push; subsequent calls flow through. The + // telemetry actor's on-failure branch keeps rows in Pending state, so + // the next tick re-reads them and tries again. + var stubClient = new DirectActorSiteStreamAuditClient(ingestActor) + { + FailNextCallCount = 1, + }; + CreateTelemetryActor(sqliteWriter, stubClient); + + var evt = NewEvent(siteId); + await fallback.WriteAsync(evt); + + // Wait long enough for at least one failure-then-success cycle. With + // both intervals = 1s the actor retries quickly; allow 15s for slow CI. + await AwaitAssertAsync(async () => + { + await using var readContext = CreateContext(); + var readRepo = new AuditLogRepository(readContext); + var rows = await readRepo.QueryAsync( + new AuditLogQueryFilter(SourceSiteId: siteId), + new AuditLogPaging(PageSize: 10)); + Assert.Single(rows); + Assert.Equal(evt.EventId, rows[0].EventId); + }, TimeSpan.FromSeconds(15)); + + Assert.True(stubClient.CallCount >= 2, + $"Expected at least one failed push + one successful push; saw {stubClient.CallCount} total client calls."); + + // The site SQLite row must have flipped to Forwarded after the + // successful retry. ReadPendingAsync only returns Pending rows; the + // row should NOT show up there anymore. + var stillPending = await sqliteWriter.ReadPendingAsync(64); + Assert.DoesNotContain(stillPending, p => p.EventId == evt.EventId); + } + + [SkippableFact] + public async Task EndToEnd_DuplicateSubmit_OnlyOneCentralRow() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = NewSiteId(); + + await using var ingestContext = CreateContext(); + var ingestRepo = new AuditLogRepository(ingestContext); + var ingestActor = CreateIngestActor(ingestRepo); + + await using var sqliteWriter = CreateInMemorySqliteWriter(); + var ring = new RingBufferFallback(); + var fallback = new FallbackAuditWriter( + sqliteWriter, + ring, + new NoOpAuditWriteFailureCounter(), + NullLogger.Instance); + + var stubClient = new DirectActorSiteStreamAuditClient(ingestActor); + CreateTelemetryActor(sqliteWriter, stubClient); + + // Both writes carry the SAME EventId. Site SQLite's PRIMARY KEY + // constraint and the central repo's InsertIfNotExistsAsync both + // enforce first-write-wins, so only one central row must materialise. + var sharedId = Guid.NewGuid(); + var evt1 = NewEvent(siteId, sharedId); + var evt2 = NewEvent(siteId, sharedId); + + await fallback.WriteAsync(evt1); + await fallback.WriteAsync(evt2); + + await AwaitAssertAsync(async () => + { + await using var readContext = CreateContext(); + var readRepo = new AuditLogRepository(readContext); + var rows = await readRepo.QueryAsync( + new AuditLogQueryFilter(SourceSiteId: siteId), + new AuditLogPaging(PageSize: 10)); + Assert.Single(rows); + Assert.Equal(sharedId, rows[0].EventId); + }, TimeSpan.FromSeconds(15)); + } + + /// + /// Test double for that short-circuits + /// the gRPC wire and forwards the batch directly to a central + /// via Akka . The + /// Akka is converted to the proto + /// that the telemetry actor expects. + /// + private sealed class DirectActorSiteStreamAuditClient : ISiteStreamAuditClient + { + private readonly IActorRef _ingestActor; + private int _failsRemaining; + private int _callCount; + + public DirectActorSiteStreamAuditClient(IActorRef ingestActor) + { + _ingestActor = ingestActor ?? throw new ArgumentNullException(nameof(ingestActor)); + } + + /// + /// When > 0, the next FailNextCallCount invocations of + /// throw to simulate a gRPC error; + /// after that count is exhausted, calls succeed normally. + /// + public int FailNextCallCount + { + get => _failsRemaining; + set => _failsRemaining = value; + } + + public int CallCount => Volatile.Read(ref _callCount); + + public async Task IngestAuditEventsAsync(AuditEventBatch batch, CancellationToken ct) + { + Interlocked.Increment(ref _callCount); + + // Atomically consume one of the queued failures, if any. This + // lets the test arm a deterministic number of failures before the + // stub recovers. + if (Interlocked.Decrement(ref _failsRemaining) >= 0) + { + throw new InvalidOperationException("simulated gRPC failure for test"); + } + + // Decrement under-ran into negative territory; clamp at -1 to keep + // the field bounded even under many calls. + Interlocked.Exchange(ref _failsRemaining, -1); + + // Decode the proto batch back into AuditEvent records — this + // mirrors what the production SiteStreamGrpcServer does before + // dispatching to the ingest actor (see Bundle D's gRPC handler). + var events = new List(batch.Events.Count); + foreach (var dto in batch.Events) + { + events.Add(ScadaLink.AuditLog.Telemetry.AuditEventMapper.FromDto(dto)); + } + + // Ask the central actor; the reply carries the accepted EventIds. + var reply = await _ingestActor + .Ask( + new IngestAuditEventsCommand(events), + TimeSpan.FromSeconds(10)) + .ConfigureAwait(false); + + var ack = new IngestAck(); + foreach (var id in reply.AcceptedEventIds) + { + ack.AcceptedEventIds.Add(id.ToString()); + } + return ack; + } + } +}