using Akka.TestKit.Xunit2;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Microsoft.Extensions.Logging.Abstractions;
using NSubstitute;
using NSubstitute.ExceptionExtensions;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Services;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.Communication.Grpc;

namespace ScadaLink.Communication.Tests;

/// <summary>
/// Bundle A A2 tests for <see cref="SiteStreamGrpcServer.PullAuditEvents"/>.
/// Verifies the request → ISiteAuditQueue.ReadPendingSinceAsync → response →
/// MarkReconciledAsync round-trip through the gRPC handler. The queue is an
/// NSubstitute stub so the tests never touch SQLite.
/// </summary>
public class SiteStreamPullAuditEventsTests : TestKit
{
    // NOTE(review): the argument matchers below assume
    //   ReadPendingSinceAsync(DateTime, int, CancellationToken) -> Task<IReadOnlyList<AuditEvent>>
    //   MarkReconciledAsync(IReadOnlyList<Guid>, CancellationToken)
    // Only the DateTime first argument, the Guid element type and the AuditEvent
    // element type are established by this file; confirm the rest against
    // ISiteAuditQueue.

    private readonly ISiteStreamSubscriber _subscriber = Substitute.For<ISiteStreamSubscriber>();

    /// <summary>
    /// Creates a server over the stub subscriber with a no-op logger. No site
    /// audit queue is attached; tests that need one call SetSiteAuditQueue.
    /// </summary>
    private SiteStreamGrpcServer CreateServer() =>
        // NOTE(review): NullLogger<SiteStreamGrpcServer> satisfies both an
        // ILogger<SiteStreamGrpcServer> and a plain ILogger constructor parameter.
        new(_subscriber, NullLogger<SiteStreamGrpcServer>.Instance);

    /// <summary>
    /// Builds a substitute <see cref="ServerCallContext"/> whose CancellationToken
    /// is configured to return <paramref name="ct"/>.
    /// </summary>
    /// <param name="ct">Token the context should expose; defaults to an empty token.</param>
    private static ServerCallContext NewContext(CancellationToken ct = default)
    {
        var context = Substitute.For<ServerCallContext>();
        context.CancellationToken.Returns(ct);
        return context;
    }

    /// <summary>
    /// Builds a pending outbound API-call audit event with a fresh EventId.
    /// </summary>
    /// <param name="occurredAt">
    /// Value for OccurredAtUtc; when null, 2026-05-20 10:00:00 UTC is used.
    /// </param>
    private static AuditEvent NewEvent(DateTime? occurredAt = null) => new()
    {
        EventId = Guid.NewGuid(),
        OccurredAtUtc = occurredAt
            ?? DateTime.SpecifyKind(new DateTime(2026, 5, 20, 10, 0, 0), DateTimeKind.Utc),
        Channel = AuditChannel.ApiOutbound,
        Kind = AuditKind.ApiCall,
        Status = AuditStatus.Delivered,
        SourceSiteId = "site-1",
        PayloadTruncated = false,
        ForwardState = AuditForwardState.Pending,
    };

    /// <summary>
    /// With no queue attached, the handler is expected to answer with an empty
    /// batch and MoreAvailable = false.
    /// </summary>
    [Fact]
    public async Task PullAuditEvents_NoQueueWired_ReturnsEmptyResponse()
    {
        var server = CreateServer();
        // Intentionally do NOT call SetSiteAuditQueue — simulates a central-only
        // host or a wiring-incomplete startup window.
        var request = new PullAuditEventsRequest
        {
            SinceUtc = Timestamp.FromDateTime(DateTime.UtcNow.AddMinutes(-5)),
            BatchSize = 100,
        };

        var response = await server.PullAuditEvents(request, NewContext());

        Assert.Empty(response.Events);
        Assert.False(response.MoreAvailable);
    }

    /// <summary>
    /// Five pending rows come back as five DTOs with matching ids, and the same
    /// five ids are handed to MarkReconciledAsync exactly once.
    /// </summary>
    [Fact]
    public async Task PullAuditEvents_With5PendingRows_ReturnsAllFiveDtos_AndFlipsToReconciled()
    {
        var queue = Substitute.For<ISiteAuditQueue>();
        var events = Enumerable.Range(0, 5).Select(_ => NewEvent()).ToList();
        queue.ReadPendingSinceAsync(Arg.Any<DateTime>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
            .Returns((IReadOnlyList<AuditEvent>)events);

        var server = CreateServer();
        server.SetSiteAuditQueue(queue);

        var request = new PullAuditEventsRequest
        {
            SinceUtc = Timestamp.FromDateTime(DateTime.UtcNow.AddHours(-1)),
            BatchSize = 100, // larger than returned count so MoreAvailable should be false
        };

        var response = await server.PullAuditEvents(request, NewContext());

        Assert.Equal(5, response.Events.Count);
        Assert.False(response.MoreAvailable); // 5 < 100

        // The DTO carries the event id as a string, so compare on Guid.ToString().
        var expectedIds = events.Select(e => e.EventId.ToString()).ToHashSet();
        Assert.True(expectedIds.SetEquals(response.Events.Select(d => d.EventId).ToHashSet()));

        // Verify MarkReconciledAsync received the same 5 ids (best-effort flip).
        await queue.Received(1).MarkReconciledAsync(
            Arg.Is<IReadOnlyList<Guid>>(ids =>
                ids.Count == 5 && ids.ToHashSet().SetEquals(events.Select(e => e.EventId))),
            Arg.Any<CancellationToken>());
    }

    /// <summary>
    /// The request's SinceUtc reaches ReadPendingSinceAsync unchanged, and an
    /// empty queue result produces an empty response with no reconcile call.
    /// </summary>
    [Fact]
    public async Task PullAuditEvents_RowsOlderThanSinceUtc_Excluded()
    {
        // The handler delegates the since-utc filter to ReadPendingSinceAsync;
        // this test verifies it passes the request value through verbatim
        // (no clock skew, no off-by-one) and that an empty queue response
        // yields an empty gRPC response.
        var queue = Substitute.For<ISiteAuditQueue>();
        var capturedSince = DateTime.MinValue;
        queue.ReadPendingSinceAsync(Arg.Any<DateTime>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
            .Returns(call =>
            {
                capturedSince = call.ArgAt<DateTime>(0);
                return (IReadOnlyList<AuditEvent>)Array.Empty<AuditEvent>();
            });

        var server = CreateServer();
        server.SetSiteAuditQueue(queue);

        var since = DateTime.SpecifyKind(new DateTime(2026, 5, 20, 9, 30, 0), DateTimeKind.Utc);
        var request = new PullAuditEventsRequest
        {
            SinceUtc = Timestamp.FromDateTime(since),
            BatchSize = 50,
        };

        var response = await server.PullAuditEvents(request, NewContext());

        Assert.Empty(response.Events);
        Assert.False(response.MoreAvailable);
        Assert.Equal(since, capturedSince);

        // Empty result → no MarkReconciledAsync call (no rows to flip).
        await queue.DidNotReceive().MarkReconciledAsync(
            Arg.Any<IReadOnlyList<Guid>>(),
            Arg.Any<CancellationToken>());
    }

    /// <summary>
    /// When the queue returns exactly BatchSize rows, the response reports
    /// MoreAvailable = true.
    /// </summary>
    [Fact]
    public async Task PullAuditEvents_BatchSize3_Returns3Rows_MoreAvailableTrue()
    {
        var queue = Substitute.For<ISiteAuditQueue>();
        var events = Enumerable.Range(0, 3).Select(_ => NewEvent()).ToList();
        queue.ReadPendingSinceAsync(Arg.Any<DateTime>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
            .Returns((IReadOnlyList<AuditEvent>)events);

        var server = CreateServer();
        server.SetSiteAuditQueue(queue);

        var request = new PullAuditEventsRequest
        {
            SinceUtc = Timestamp.FromDateTime(DateTime.UtcNow.AddHours(-1)),
            BatchSize = 3,
        };

        var response = await server.PullAuditEvents(request, NewContext());

        Assert.Equal(3, response.Events.Count);
        // saturated batch → central needs to know to issue a follow-up pull
        Assert.True(response.MoreAvailable);
    }

    /// <summary>
    /// A failing MarkReconciledAsync must not prevent the pulled rows from being
    /// returned to the caller.
    /// </summary>
    [Fact]
    public async Task PullAuditEvents_MarkReconciledThrows_ResponseStillReturned()
    {
        // The Reconciled flip is best-effort — if it fails, the response must
        // still surface so central can ingest the rows (and dedup on EventId
        // when it pulls them again).
        var queue = Substitute.For<ISiteAuditQueue>();
        var events = Enumerable.Range(0, 2).Select(_ => NewEvent()).ToList();
        queue.ReadPendingSinceAsync(Arg.Any<DateTime>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
            .Returns((IReadOnlyList<AuditEvent>)events);
        queue.MarkReconciledAsync(Arg.Any<IReadOnlyList<Guid>>(), Arg.Any<CancellationToken>())
            .ThrowsAsync(new InvalidOperationException("SQLite disposed mid-call"));

        var server = CreateServer();
        server.SetSiteAuditQueue(queue);

        var request = new PullAuditEventsRequest
        {
            SinceUtc = Timestamp.FromDateTime(DateTime.UtcNow.AddHours(-1)),
            BatchSize = 100,
        };

        // Must NOT throw — the response is built before the flip and returned
        // regardless of the flip outcome.
        var response = await server.PullAuditEvents(request, NewContext());

        Assert.Equal(2, response.Events.Count);
    }
}