using Grpc.Core;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.Audit;
using ZB.MOM.WW.ScadaBridge.AuditLog.Central;
using ZB.MOM.WW.ScadaBridge.Communication.Grpc;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
using Google.Protobuf.WellKnownTypes;
using ProtoPullRequest = ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsRequest;
using ProtoPullResponse = ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsResponse;

namespace ZB.MOM.WW.ScadaBridge.AuditLog.Tests.Central;

/// <summary>
/// Bundle (M6) tests for <see cref="GrpcPullAuditEventsClient"/> — the
/// production <see cref="IPullAuditEventsClient"/> that dials a site over gRPC
/// and issues the <c>PullAuditEvents</c> unary RPC for the reconciliation loop.
/// The real <c>GrpcChannel</c> is replaced by an injected
/// <see cref="GrpcPullAuditEventsClient.IPullAuditEventsInvoker"/> seam so the
/// client's mapping / ordering / fault-swallowing behaviour can be asserted
/// without standing up a Kestrel HTTP/2 endpoint.
/// </summary>
public class GrpcPullAuditEventsClientTests
{
    /// <summary>Fixed UTC instant used as the pull cursor and as the base for event timestamps.</summary>
    private static readonly DateTime BaseTime = new(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc);

    /// <summary>Static enumerator returning a fixed site→endpoint map.</summary>
    private sealed class StaticEnumerator : ISiteEnumerator
    {
        private readonly IReadOnlyList<SiteEntry> _sites;

        public StaticEnumerator(params SiteEntry[] sites) => _sites = sites;

        public Task<IReadOnlyList<SiteEntry>> EnumerateAsync(CancellationToken ct = default)
            => Task.FromResult(_sites);
    }

    /// <summary>
    /// Test invoker: records the endpoint + request it was asked to dial, then
    /// returns a scripted proto response (or throws a scripted exception so the
    /// fault-swallowing path can be exercised).
    /// </summary>
    private sealed class FakeInvoker : GrpcPullAuditEventsClient.IPullAuditEventsInvoker
    {
        /// <summary>Endpoint passed to the most recent call; <c>null</c> until first dialled.</summary>
        public string? Endpoint { get; private set; }

        /// <summary>Request passed to the most recent call; <c>null</c> until first dialled.</summary>
        public ProtoPullRequest? Request { get; private set; }

        /// <summary>Number of times <see cref="InvokeAsync"/> has been called.</summary>
        public int CallCount { get; private set; }

        private readonly ProtoPullResponse? _response;
        private readonly Exception? _throw;

        private FakeInvoker(ProtoPullResponse? response, Exception? toThrow)
        {
            _response = response;
            _throw = toThrow;
        }

        /// <summary>Creates an invoker that answers every call with <paramref name="response"/>.</summary>
        public static FakeInvoker Returning(ProtoPullResponse response) => new(response, null);

        /// <summary>Creates an invoker that fails every call with <paramref name="ex"/>.</summary>
        public static FakeInvoker Throwing(Exception ex) => new(null, ex);

        /// <summary>
        /// Records the call, then either throws the scripted exception or
        /// returns the scripted response as an already-completed task.
        /// </summary>
        public Task<ProtoPullResponse> InvokeAsync(
            string endpoint,
            ProtoPullRequest request,
            CancellationToken ct)
        {
            // Record before failing so tests can observe that a faulting call was dialled.
            CallCount++;
            Endpoint = endpoint;
            Request = request;

            if (_throw is not null)
            {
                // Thrown synchronously (this method is not async), not via a faulted task.
                throw _throw;
            }

            return Task.FromResult(_response!);
        }
    }

    /// <summary>
    /// Builds a wire DTO for a delivered outbound API-call audit event from
    /// "site-a" with the given id and occurrence time.
    /// </summary>
    private static AuditEventDto Dto(Guid id, DateTime occurredAtUtc) =>
        AuditEventDtoMapper.ToDto(ScadaBridgeAuditEventFactory.Create(
            eventId: id,
            occurredAtUtc: occurredAtUtc,
            channel: AuditChannel.ApiOutbound,
            kind: AuditKind.ApiCall,
            status: AuditStatus.Delivered,
            sourceSiteId: "site-a"));

    [Fact]
    public async Task PullAsync_dials_the_resolved_endpoint_and_maps_events_oldest_first()
    {
        var older = Guid.NewGuid();
        var newer = Guid.NewGuid();

        // Wire is delivered newest-first on purpose to prove the client sorts.
        var proto = new ProtoPullResponse { MoreAvailable = true };
        proto.Events.Add(Dto(newer, BaseTime.AddMinutes(5)));
        proto.Events.Add(Dto(older, BaseTime));

        var invoker = FakeInvoker.Returning(proto);
        var sut = new GrpcPullAuditEventsClient(
            new StaticEnumerator(new SiteEntry("site-a", "http://site-a:8083")),
            invoker,
            NullLogger<GrpcPullAuditEventsClient>.Instance);

        var result = await sut.PullAsync("site-a", BaseTime, batchSize: 256, CancellationToken.None);

        // Endpoint resolution + request shaping.
        Assert.Equal("http://site-a:8083", invoker.Endpoint);
        Assert.NotNull(invoker.Request);
        Assert.Equal(256, invoker.Request!.BatchSize);
        Assert.Equal(BaseTime, invoker.Request.SinceUtc.ToDateTime());

        // Mapping + ordering + MoreAvailable surface.
        Assert.True(result.MoreAvailable);
        Assert.Equal(2, result.Events.Count);
        Assert.Equal(older, result.Events[0].EventId);
        Assert.Equal(newer, result.Events[1].EventId);
    }

    [Fact]
    public async Task PullAsync_returns_empty_when_site_endpoint_is_unknown()
    {
        var invoker = FakeInvoker.Returning(new ProtoPullResponse());
        var sut = new GrpcPullAuditEventsClient(
            new StaticEnumerator(), // no sites registered
            invoker,
            NullLogger<GrpcPullAuditEventsClient>.Instance);

        var result = await sut.PullAsync("site-a", BaseTime, batchSize: 256, CancellationToken.None);

        Assert.Empty(result.Events);
        Assert.False(result.MoreAvailable);
        Assert.Equal(0, invoker.CallCount); // never dialled — nothing to dial
    }

    [Theory]
    [InlineData(StatusCode.Unavailable)]      // connection refused / site offline
    [InlineData(StatusCode.DeadlineExceeded)] // slow site / network blip
    [InlineData(StatusCode.Cancelled)]
    public async Task PullAsync_swallows_tolerable_transport_faults_to_empty_response(StatusCode code)
    {
        var invoker = FakeInvoker.Throwing(new RpcException(new Status(code, "transport fault")));
        var sut = new GrpcPullAuditEventsClient(
            new StaticEnumerator(new SiteEntry("site-a", "http://site-a:8083")),
            invoker,
            NullLogger<GrpcPullAuditEventsClient>.Instance);

        // MUST NOT throw — per the IPullAuditEventsClient contract.
        var result = await sut.PullAsync("site-a", BaseTime, batchSize: 256, CancellationToken.None);

        // Guard against a vacuous pass: the empty result must come from the
        // swallowed fault, not from the client never reaching the invoker.
        Assert.NotEqual(0, invoker.CallCount);
        Assert.Empty(result.Events);
        Assert.False(result.MoreAvailable);
    }

    [Fact]
    public async Task PullAsync_swallows_connection_layer_faults_to_empty_response()
    {
        // A bare HttpRequestException (e.g. DNS / refused socket before a gRPC
        // status is established) is also tolerable.
        var invoker = FakeInvoker.Throwing(new HttpRequestException("connection refused"));
        var sut = new GrpcPullAuditEventsClient(
            new StaticEnumerator(new SiteEntry("site-a", "http://site-a:8083")),
            invoker,
            NullLogger<GrpcPullAuditEventsClient>.Instance);

        var result = await sut.PullAsync("site-a", BaseTime, batchSize: 256, CancellationToken.None);

        // Guard against a vacuous pass: the faulting call must have been dialled.
        Assert.NotEqual(0, invoker.CallCount);
        Assert.Empty(result.Events);
        Assert.False(result.MoreAvailable);
    }

    [Fact]
    public async Task PullAsync_swallows_unexpected_faults_to_empty_response()
    {
        // I3(a): the catch-all path. A non-transport fault (e.g. a mapping/
        // protocol error surfacing as InvalidOperationException) must still be
        // swallowed to empty — audit reconciliation is best-effort and a throw
        // would only get re-caught by the actor's per-site guard.
        var invoker = FakeInvoker.Throwing(new InvalidOperationException("boom"));
        var sut = new GrpcPullAuditEventsClient(
            new StaticEnumerator(new SiteEntry("site-a", "http://site-a:8083")),
            invoker,
            NullLogger<GrpcPullAuditEventsClient>.Instance);

        var result = await sut.PullAsync("site-a", BaseTime, batchSize: 256, CancellationToken.None);

        // Guard against a vacuous pass: the faulting call must have been dialled.
        Assert.NotEqual(0, invoker.CallCount);
        Assert.Empty(result.Events);
        Assert.False(result.MoreAvailable);
    }

    [Fact]
    public async Task PullAsync_with_minvalue_unspecified_cursor_does_not_throw_and_dials()
    {
        // I3(b) / guards I2: the reconciliation cursor starts at DateTime.MinValue
        // with Kind=Unspecified. EnsureUtc must treat it AS UTC (per the system-wide
        // "all timestamps are UTC" invariant) and NOT call ToUniversalTime() — on a
        // host with a positive UTC offset that underflows and Timestamp.FromDateTime
        // throws ArgumentOutOfRangeException, crashing the FIRST pull for every site.
        var minUnspecified = default(DateTime); // DateTime.MinValue, Kind=Unspecified
        Assert.Equal(DateTimeKind.Unspecified, minUnspecified.Kind);

        var invoker = FakeInvoker.Returning(new ProtoPullResponse());
        var sut = new GrpcPullAuditEventsClient(
            new StaticEnumerator(new SiteEntry("site-a", "http://site-a:8083")),
            invoker,
            NullLogger<GrpcPullAuditEventsClient>.Instance);

        // MUST NOT throw — must dial successfully.
        var result = await sut.PullAsync("site-a", minUnspecified, batchSize: 256, CancellationToken.None);

        Assert.Equal(1, invoker.CallCount);
        Assert.Equal("http://site-a:8083", invoker.Endpoint);
        Assert.NotNull(invoker.Request);

        // The unspecified-MinValue cursor is carried through verbatim as UTC
        // MinValue (no local-TZ conversion).
        Assert.Equal(DateTime.MinValue, invoker.Request!.SinceUtc.ToDateTime());
        Assert.Empty(result.Events);
        Assert.False(result.MoreAvailable);
    }
}