using Akka.Actor;
using Akka.TestKit.Xunit2;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Microsoft.Extensions.Logging.Abstractions;
using NSubstitute;
using ScadaLink.Commons.Messages.Audit;
using ScadaLink.Communication.Grpc;

namespace ScadaLink.Communication.Tests;

/// <summary>
/// Bundle D D2 tests for <see cref="SiteStreamGrpcServer.IngestCachedTelemetry"/>.
/// Verifies the DTO→entity→actor→ack round-trip through the gRPC handler. A
/// tiny <c>EchoCachedIngestActor</c> stands in for the central
/// <c>AuditLogIngestActor</c>, replying with the EventIds it received so the
/// test asserts the wiring without depending on MSSQL.
/// </summary>
public class SiteStreamIngestCachedTelemetryTests : TestKit
{
    private readonly ISiteStreamSubscriber _subscriber = Substitute.For<ISiteStreamSubscriber>();

    /// <summary>
    /// Builds a server under test around the substituted subscriber and a no-op logger.
    /// </summary>
    /// <returns>A fresh <see cref="SiteStreamGrpcServer"/> with no audit ingest actor wired.</returns>
    private SiteStreamGrpcServer CreateServer() =>
        new(_subscriber, NullLogger<SiteStreamGrpcServer>.Instance);

    /// <summary>
    /// Creates a substituted <see cref="ServerCallContext"/> whose
    /// <c>CancellationToken</c> returns <paramref name="ct"/>.
    /// </summary>
    /// <param name="ct">Token the substituted context should expose; defaults to <c>default</c>.</param>
    /// <returns>The configured substitute.</returns>
    private static ServerCallContext NewContext(CancellationToken ct = default)
    {
        var context = Substitute.For<ServerCallContext>();
        context.CancellationToken.Returns(ct);
        return context;
    }

    /// <summary>
    /// Builds a <see cref="CachedTelemetryPacket"/> with a fixed timestamp and
    /// canned channel/status values.
    /// </summary>
    /// <param name="eventId">Audit event id; a new GUID is generated when <c>null</c>.</param>
    /// <param name="trackedId">
    /// Tracked-operation id; a new GUID is generated when <c>null</c>. The same
    /// resolved value is written to both <c>AuditEvent.CorrelationId</c> and
    /// <c>Operational.TrackedOperationId</c>.
    /// </param>
    /// <returns>A packet carrying both the audit event and the operational row.</returns>
    private static CachedTelemetryPacket NewPacket(Guid? eventId = null, Guid? trackedId = null)
    {
        var now = Timestamp.FromDateTime(new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc));

        // Resolve each id exactly once. Evaluating `trackedId ?? Guid.NewGuid()`
        // separately per property would give the audit event and the operational
        // row two unrelated ids whenever the caller omits trackedId.
        var resolvedEventId = (eventId ?? Guid.NewGuid()).ToString();
        var resolvedTrackedId = (trackedId ?? Guid.NewGuid()).ToString();

        return new CachedTelemetryPacket
        {
            AuditEvent = new AuditEventDto
            {
                EventId = resolvedEventId,
                OccurredAtUtc = now,
                Channel = "ApiOutbound",
                Kind = "CachedSubmit",
                Status = "Submitted",
                SourceSiteId = "site-1",
                CorrelationId = resolvedTrackedId,
            },
            Operational = new SiteCallOperationalDto
            {
                TrackedOperationId = resolvedTrackedId,
                Channel = "ApiOutbound",
                Target = "ERP.GetOrder",
                SourceSite = "site-1",
                Status = "Submitted",
                RetryCount = 0,
                CreatedAtUtc = now,
                UpdatedAtUtc = now,
            },
        };
    }

    [Fact]
    public async Task IngestCachedTelemetry_RoutesToActor_ReturnsReply()
    {
        // Arrange — stub actor that echoes every received EventId back.
        var stubActor = Sys.ActorOf(Props.Create(() => new EchoCachedIngestActor()));
        var server = CreateServer();
        server.SetAuditIngestActor(stubActor);

        var packets = Enumerable.Range(0, 3)
            .Select(_ => NewPacket())
            .ToList();
        var batch = new CachedTelemetryBatch();
        batch.Packets.AddRange(packets);

        // Act
        var ack = await server.IngestCachedTelemetry(batch, NewContext());

        // Assert — every packet's EventId appears in the ack, demonstrating
        // end-to-end routing through the actor.
        Assert.Equal(3, ack.AcceptedEventIds.Count);
        var expectedIds = packets.Select(p => p.AuditEvent.EventId).ToHashSet();
        Assert.True(expectedIds.SetEquals(ack.AcceptedEventIds.ToHashSet()));
    }

    [Fact]
    public async Task IngestCachedTelemetry_NoActorWired_ReturnsEmptyAck()
    {
        // Arrange
        var server = CreateServer();
        // Intentionally do NOT call SetAuditIngestActor — simulates host
        // startup race window.
        var batch = new CachedTelemetryBatch();
        batch.Packets.Add(NewPacket());

        // Act
        var ack = await server.IngestCachedTelemetry(batch, NewContext());

        // Assert
        Assert.Empty(ack.AcceptedEventIds);
    }

    /// <summary>
    /// Tiny ReceiveActor that echoes every EventId in an incoming
    /// <see cref="IngestCachedTelemetryCommand"/> back as an
    /// <see cref="IngestCachedTelemetryReply"/>. Stands in for the central
    /// <c>AuditLogIngestActor</c> so this test never touches MSSQL.
    /// </summary>
    private sealed class EchoCachedIngestActor : ReceiveActor
    {
        public EchoCachedIngestActor()
        {
            // NOTE(review): the message type argument was lost in the mangled
            // source; IngestCachedTelemetryCommand is inferred from the reply
            // type's name — confirm against ScadaLink.Commons.Messages.Audit.
            Receive<IngestCachedTelemetryCommand>(cmd =>
            {
                var ids = cmd.Entries.Select(e => e.Audit.EventId).ToList();
                Sender.Tell(new IngestCachedTelemetryReply(ids));
            });
        }
    }
}