using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.ScadaBridge.AuditLog.Central;
using ZB.MOM.WW.ScadaBridge.Communication.Grpc;
using ProtoPullRequest = ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsRequest;
using ProtoPullResponse = ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsResponse;

namespace ZB.MOM.WW.ScadaBridge.AuditLog.Tests.Central;

/// <summary>
/// Tests for <see cref="GrpcPullSiteCallsClient"/> — the production client that
/// dials a site over gRPC and issues the <c>PullSiteCalls</c> unary RPC for the
/// Site Call Audit (#22) reconciliation loop. The real <c>GrpcChannel</c> is
/// replaced by an injected <see cref="GrpcPullSiteCallsClient.IPullSiteCallsInvoker"/>
/// seam so the client's mapping / ordering / SourceSite-restamp / fault-swallowing
/// behaviour can be asserted without standing up a Kestrel HTTP/2 endpoint.
/// </summary>
public class GrpcPullSiteCallsClientTests
{
    /// <summary>Site id used by every test when calling <c>PullAsync</c>.</summary>
    private const string SiteId = "site-a";

    /// <summary>gRPC endpoint registered for <see cref="SiteId"/>.</summary>
    private const string SiteEndpoint = "http://site-a:8083";

    private static readonly DateTime BaseTime = new(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc);

    /// <summary>A fresh registration mapping <see cref="SiteId"/> to <see cref="SiteEndpoint"/>.</summary>
    private static SiteEntry SiteA => new(SiteId, SiteEndpoint);

    /// <summary>
    /// <see cref="ISiteEnumerator"/> stub that always returns the fixed set of
    /// sites handed to its constructor.
    /// </summary>
    private sealed class StaticEnumerator : ISiteEnumerator
    {
        private readonly IReadOnlyList<SiteEntry> _sites;

        public StaticEnumerator(params SiteEntry[] sites) => _sites = sites;

        public Task<IReadOnlyList<SiteEntry>> EnumerateAsync(CancellationToken ct = default) =>
            Task.FromResult(_sites);
    }

    /// <summary>
    /// Invoker test double. Records the endpoint and request of the most recent
    /// call and counts calls, then either returns the canned response or throws
    /// the canned exception.
    /// </summary>
    private sealed class FakeInvoker : GrpcPullSiteCallsClient.IPullSiteCallsInvoker
    {
        private readonly ProtoPullResponse? _response;
        private readonly Exception? _throw;

        private FakeInvoker(ProtoPullResponse? response, Exception? toThrow)
        {
            _response = response;
            _throw = toThrow;
        }

        /// <summary>Endpoint passed to the most recent call; null until first called.</summary>
        public string? Endpoint { get; private set; }

        /// <summary>Request passed to the most recent call; null until first called.</summary>
        public ProtoPullRequest? Request { get; private set; }

        /// <summary>Number of times <see cref="InvokeAsync"/> has been entered.</summary>
        public int CallCount { get; private set; }

        /// <summary>Creates an invoker whose every call completes with <paramref name="response"/>.</summary>
        public static FakeInvoker Returning(ProtoPullResponse response) => new(response, null);

        /// <summary>Creates an invoker whose every call throws <paramref name="ex"/>.</summary>
        public static FakeInvoker Throwing(Exception ex) => new(null, ex);

        public Task<ProtoPullResponse> InvokeAsync(
            string endpoint,
            ProtoPullRequest request,
            CancellationToken ct)
        {
            // Capture the call before faulting so tests can still inspect what was dialled.
            CallCount++;
            Endpoint = endpoint;
            Request = request;

            if (_throw is not null)
            {
                throw _throw;
            }

            return Task.FromResult(_response!);
        }
    }

    /// <summary>
    /// Builds the client under test over the given invoker and registered sites.
    /// Passing no sites yields an enumerator that knows no endpoints.
    /// </summary>
    private static GrpcPullSiteCallsClient CreateSut(FakeInvoker invoker, params SiteEntry[] sites) =>
        new(new StaticEnumerator(sites), invoker, NullLogger<GrpcPullSiteCallsClient>.Instance);

    // The site leaves SourceSite empty (it is not a tracking-store column); the
    // client re-stamps it from the dialed siteId. Mint DTOs with empty SourceSite
    // to prove that re-stamp.
    private static SiteCallOperationalDto Dto(Guid id, DateTime updatedAtUtc) => new()
    {
        TrackedOperationId = id.ToString(),
        Channel = "ApiOutbound",
        Target = "ERP.GetOrder",
        SourceSite = string.Empty,
        SourceNode = "node-a",
        Status = "Attempted",
        RetryCount = 1,
        LastError = string.Empty,
        CreatedAtUtc = Timestamp.FromDateTime(BaseTime),
        UpdatedAtUtc = Timestamp.FromDateTime(updatedAtUtc),
    };

    [Fact]
    public async Task PullAsync_dials_resolved_endpoint_maps_oldest_first_and_restamps_source_site()
    {
        var older = Guid.NewGuid();
        var newer = Guid.NewGuid();

        // Wire delivered newest-first on purpose to prove the client sorts.
        var proto = new ProtoPullResponse { MoreAvailable = true };
        proto.Operationals.Add(Dto(newer, BaseTime.AddMinutes(5)));
        proto.Operationals.Add(Dto(older, BaseTime));

        var invoker = FakeInvoker.Returning(proto);
        var sut = CreateSut(invoker, SiteA);

        var result = await sut.PullAsync(SiteId, BaseTime, batchSize: 256, CancellationToken.None);

        // Endpoint resolution + request shaping.
        Assert.Equal(SiteEndpoint, invoker.Endpoint);
        Assert.NotNull(invoker.Request);
        Assert.Equal(256, invoker.Request!.BatchSize);
        Assert.Equal(BaseTime, invoker.Request.SinceUtc.ToDateTime());

        // Mapping + ordering + MoreAvailable surface.
        Assert.True(result.MoreAvailable);
        Assert.Equal(2, result.SiteCalls.Count);
        Assert.Equal(older, result.SiteCalls[0].TrackedOperationId.Value);
        Assert.Equal(newer, result.SiteCalls[1].TrackedOperationId.Value);

        // SourceSite re-stamped from the dialed siteId (DTO carried empty).
        Assert.Equal(SiteId, result.SiteCalls[0].SourceSite);
        Assert.Equal(SiteId, result.SiteCalls[1].SourceSite);

        // Round-tripped fields survive FromDto.
        Assert.Equal("ApiOutbound", result.SiteCalls[0].Channel);
        Assert.Equal("node-a", result.SiteCalls[0].SourceNode);
        Assert.Equal(1, result.SiteCalls[0].RetryCount);
    }

    [Fact]
    public async Task PullAsync_returns_empty_when_site_endpoint_is_unknown()
    {
        var invoker = FakeInvoker.Returning(new ProtoPullResponse());
        var sut = CreateSut(invoker); // no sites registered

        var result = await sut.PullAsync(SiteId, BaseTime, batchSize: 256, CancellationToken.None);

        Assert.Empty(result.SiteCalls);
        Assert.False(result.MoreAvailable);
        Assert.Equal(0, invoker.CallCount); // never dialled — nothing to dial
    }

    [Theory]
    [InlineData(StatusCode.Unavailable)]
    [InlineData(StatusCode.DeadlineExceeded)]
    [InlineData(StatusCode.Cancelled)]
    public async Task PullAsync_swallows_tolerable_transport_faults_to_empty_response(StatusCode code)
    {
        var invoker = FakeInvoker.Throwing(new RpcException(new Status(code, "transport fault")));
        var sut = CreateSut(invoker, SiteA);

        var result = await sut.PullAsync(SiteId, BaseTime, batchSize: 256, CancellationToken.None);

        // Guard against a vacuous pass: an empty result is also what the
        // unknown-site path returns, so prove the fault came from a real dial.
        Assert.Equal(SiteEndpoint, invoker.Endpoint);
        Assert.Empty(result.SiteCalls);
        Assert.False(result.MoreAvailable);
    }

    [Fact]
    public async Task PullAsync_swallows_connection_layer_faults_to_empty_response()
    {
        var invoker = FakeInvoker.Throwing(new HttpRequestException("connection refused"));
        var sut = CreateSut(invoker, SiteA);

        var result = await sut.PullAsync(SiteId, BaseTime, batchSize: 256, CancellationToken.None);

        // Guard against a vacuous pass: the invoker must actually have been dialled.
        Assert.Equal(SiteEndpoint, invoker.Endpoint);
        Assert.Empty(result.SiteCalls);
        Assert.False(result.MoreAvailable);
    }

    [Fact]
    public async Task PullAsync_swallows_unexpected_faults_to_empty_response()
    {
        var invoker = FakeInvoker.Throwing(new InvalidOperationException("boom"));
        var sut = CreateSut(invoker, SiteA);

        var result = await sut.PullAsync(SiteId, BaseTime, batchSize: 256, CancellationToken.None);

        // Guard against a vacuous pass: the invoker must actually have been dialled.
        Assert.Equal(SiteEndpoint, invoker.Endpoint);
        Assert.Empty(result.SiteCalls);
        Assert.False(result.MoreAvailable);
    }

    [Fact]
    public async Task PullAsync_skips_poison_row_and_returns_the_good_rows()
    {
        // Poison-row resilience: one malformed operational (an unparseable
        // TrackedOperationId fails SiteCallDtoMapper.FromDto → Guid.Parse) must be
        // skipped+logged PER ROW rather than sinking the whole batch through the
        // outer catch-all. The two good rows survive, re-stamped + oldest-first.
        var older = Guid.NewGuid();
        var newer = Guid.NewGuid();

        var proto = new ProtoPullResponse { MoreAvailable = false };
        proto.Operationals.Add(Dto(newer, BaseTime.AddMinutes(5)));

        // Malformed row in the middle of the batch.
        var bad = Dto(Guid.NewGuid(), BaseTime.AddMinutes(2));
        bad.TrackedOperationId = "not-a-guid";
        proto.Operationals.Add(bad);

        proto.Operationals.Add(Dto(older, BaseTime));

        var invoker = FakeInvoker.Returning(proto);
        var sut = CreateSut(invoker, SiteA);

        // Must NOT throw — the bad row is dropped, the good rows are returned.
        var result = await sut.PullAsync(SiteId, BaseTime, batchSize: 256, CancellationToken.None);

        Assert.Equal(2, result.SiteCalls.Count);

        // Survivors are oldest-first and SourceSite re-stamped from the dialed siteId.
        Assert.Equal(older, result.SiteCalls[0].TrackedOperationId.Value);
        Assert.Equal(newer, result.SiteCalls[1].TrackedOperationId.Value);
        Assert.Equal(SiteId, result.SiteCalls[0].SourceSite);
        Assert.Equal(SiteId, result.SiteCalls[1].SourceSite);
        Assert.False(result.MoreAvailable);
    }

    [Fact]
    public async Task PullAsync_with_minvalue_unspecified_cursor_does_not_throw_and_dials()
    {
        // The reconciliation cursor starts at DateTime.MinValue with
        // Kind=Unspecified. EnsureUtc must treat it AS UTC (per the system-wide
        // invariant) and NOT call ToUniversalTime() — on a host with a positive
        // UTC offset that underflows and Timestamp.FromDateTime throws, crashing
        // the FIRST pull for every site.
        var minUnspecified = default(DateTime);
        Assert.Equal(DateTimeKind.Unspecified, minUnspecified.Kind);

        var invoker = FakeInvoker.Returning(new ProtoPullResponse());
        var sut = CreateSut(invoker, SiteA);

        var result = await sut.PullAsync(SiteId, minUnspecified, batchSize: 256, CancellationToken.None);

        Assert.Equal(1, invoker.CallCount);
        Assert.Equal(SiteEndpoint, invoker.Endpoint);
        Assert.NotNull(invoker.Request);
        Assert.Equal(DateTime.MinValue, invoker.Request!.SinceUtc.ToDateTime());
        Assert.Empty(result.SiteCalls);
        Assert.False(result.MoreAvailable);
    }
}