diff --git a/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/GrpcPullAuditEventsClient.cs b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/GrpcPullAuditEventsClient.cs
new file mode 100644
index 00000000..f836945b
--- /dev/null
+++ b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/GrpcPullAuditEventsClient.cs
@@ -0,0 +1,257 @@
+using System.Collections.Concurrent;
+using Google.Protobuf.WellKnownTypes;
+using Grpc.Core;
+using Grpc.Net.Client;
+using Microsoft.Extensions.Logging;
+using ZB.MOM.WW.ScadaBridge.Communication;
+using ZB.MOM.WW.ScadaBridge.Communication.Grpc;
+using ProtoPullRequest = ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsRequest;
+using ProtoPullResponse = ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsResponse;
+using PullAuditEventsResponse = ZB.MOM.WW.ScadaBridge.Commons.Messages.Integration.PullAuditEventsResponse;
+
+namespace ZB.MOM.WW.ScadaBridge.AuditLog.Central;
+
+/// <summary>
+/// Production <see cref="IPullAuditEventsClient"/> (Audit Log #23, M6) that the
+/// central reconciliation actor uses to pull the next reconciliation batch
+/// from a site over the <c>PullAuditEvents</c> unary gRPC RPC served by
+/// <c>SiteStreamGrpcServer</c>.
+/// </summary>
+/// <remarks>
+/// <para>
+/// <b>Endpoint resolution.</b> The caller passes only a <c>siteId</c>; this
+/// client resolves it to a gRPC authority through <see cref="ISiteEnumerator"/>
+/// on every call, so a NodeA→NodeB failover flip or an edited site address
+/// takes effect on the next tick — the same liveness guarantee
+/// <c>SiteStreamGrpcClientFactory</c> gives the real-time stream. A site with
+/// no registered endpoint yields an empty response without dialling;
+/// reconciliation simply has nothing to pull from it. A failure of the
+/// enumerator itself is not swallowed and propagates to the caller.
+/// </para>
+/// <para>
+/// <b>Fault tolerance.</b> Tolerable transport faults
+/// (<see cref="StatusCode.Unavailable"/>, <see cref="StatusCode.DeadlineExceeded"/>,
+/// <see cref="StatusCode.Cancelled"/>, plus a bare
+/// <see cref="HttpRequestException"/> / <c>SocketException</c> raised before a
+/// gRPC status is established) are caught and collapsed to an empty response —
+/// one offline site must never sink the rest of the reconciliation tick. Any
+/// other fault from the call, and any failure while mapping the reply DTOs, is
+/// logged at warning level and likewise collapsed to empty: audit
+/// reconciliation is best-effort and a throw would only get re-caught by the
+/// actor's own per-site guard.
+/// </para>
+/// <para>
+/// <b>Testability.</b> The unary call is reached through the
+/// <see cref="IPullAuditEventsInvoker"/> seam. Production binds
+/// <see cref="GrpcPullAuditEventsInvoker"/>; unit tests inject a fake invoker
+/// so no real HTTP/2 endpoint is required.
+/// </para>
+/// </remarks>
+public sealed class GrpcPullAuditEventsClient : IPullAuditEventsClient
+{
+    private readonly ISiteEnumerator _sites;
+    private readonly IPullAuditEventsInvoker _invoker;
+    private readonly ILogger<GrpcPullAuditEventsClient> _logger;
+
+    /// <summary>
+    /// Creates the client over the given site enumerator and unary-call invoker.
+    /// </summary>
+    /// <param name="sites">Resolves a siteId to its gRPC endpoint.</param>
+    /// <param name="invoker">Seam that issues the PullAuditEvents unary RPC against a resolved endpoint.</param>
+    /// <param name="logger">Logger for transport-fault diagnostics.</param>
+    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
+    public GrpcPullAuditEventsClient(
+        ISiteEnumerator sites,
+        IPullAuditEventsInvoker invoker,
+        ILogger<GrpcPullAuditEventsClient> logger)
+    {
+        _sites = sites ?? throw new ArgumentNullException(nameof(sites));
+        _invoker = invoker ?? throw new ArgumentNullException(nameof(invoker));
+        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
+    }
+
+    /// <inheritdoc />
+    public async Task<PullAuditEventsResponse> PullAsync(
+        string siteId,
+        DateTime sinceUtc,
+        int batchSize,
+        CancellationToken ct)
+    {
+        var endpoint = await ResolveEndpointAsync(siteId, ct).ConfigureAwait(false);
+        if (endpoint is null)
+        {
+            // No gRPC address registered for the site — absence of an address is
+            // a configuration decision, not a runtime error, so there is simply
+            // nothing to pull.
+            _logger.LogDebug(
+                "PullAuditEvents skipped: no gRPC endpoint registered for site {SiteId}.", siteId);
+            return Empty;
+        }
+
+        var request = new ProtoPullRequest
+        {
+            // Timestamp.FromDateTime requires a UTC-kind value; EnsureUtc
+            // normalises the kind without moving a cursor that is already UTC.
+            SinceUtc = Timestamp.FromDateTime(EnsureUtc(sinceUtc)),
+            BatchSize = batchSize,
+        };
+
+        ProtoPullResponse reply;
+        try
+        {
+            reply = await _invoker.InvokeAsync(endpoint, request, ct).ConfigureAwait(false);
+        }
+        catch (RpcException ex) when (IsTolerable(ex.StatusCode))
+        {
+            _logger.LogDebug(ex,
+                "PullAuditEvents tolerable transport fault for site {SiteId} ({Endpoint}): {Status}. Returning empty batch.",
+                siteId, endpoint, ex.StatusCode);
+            return Empty;
+        }
+        catch (Exception ex) when (ex is HttpRequestException or System.Net.Sockets.SocketException)
+        {
+            _logger.LogDebug(ex,
+                "PullAuditEvents connection-layer fault for site {SiteId} ({Endpoint}). Returning empty batch.",
+                siteId, endpoint);
+            return Empty;
+        }
+        catch (OperationCanceledException) when (ct.IsCancellationRequested)
+        {
+            // Reconciliation tick was cancelled (host shutdown / scope dispose).
+            return Empty;
+        }
+        catch (Exception ex)
+        {
+            // Anything else the invoker raised (non-tolerable gRPC status, a
+            // non-RpcException transport wrapper, ...). Audit reconciliation is
+            // best-effort; swallow to empty rather than throw.
+            _logger.LogWarning(ex,
+                "PullAuditEvents unexpected fault for site {SiteId} ({Endpoint}). Returning empty batch.",
+                siteId, endpoint);
+            return Empty;
+        }
+
+        try
+        {
+            // Map proto DTOs to canonical AuditEvent records and order
+            // oldest-first. Mapping sits inside its own guard so a malformed
+            // reply is collapsed to empty like every other fault instead of
+            // escaping PullAsync.
+            var events = reply.Events
+                .Select(AuditEventDtoMapper.FromDto)
+                .OrderBy(e => e.OccurredAtUtc)
+                .ToList();
+
+            return new PullAuditEventsResponse(events, reply.MoreAvailable);
+        }
+        catch (Exception ex)
+        {
+            _logger.LogWarning(ex,
+                "PullAuditEvents reply from site {SiteId} ({Endpoint}) could not be mapped. Returning empty batch.",
+                siteId, endpoint);
+            return Empty;
+        }
+    }
+
+    /// <summary>
+    /// Looks up the gRPC endpoint registered for <paramref name="siteId"/>.
+    /// </summary>
+    /// <param name="siteId">Site identifier, compared ordinally.</param>
+    /// <param name="ct">Cancellation token forwarded to the enumerator.</param>
+    /// <returns>
+    /// The first non-blank endpoint of a matching site, or <c>null</c> when no
+    /// matching site has one.
+    /// </returns>
+    private async Task<string?> ResolveEndpointAsync(string siteId, CancellationToken ct)
+    {
+        var sites = await _sites.EnumerateAsync(ct).ConfigureAwait(false);
+        foreach (var site in sites)
+        {
+            if (string.Equals(site.SiteId, siteId, StringComparison.Ordinal) &&
+                !string.IsNullOrWhiteSpace(site.GrpcEndpoint))
+            {
+                return site.GrpcEndpoint;
+            }
+        }
+        return null;
+    }
+
+    // Shared "nothing to pull" result returned by every swallowed-fault path.
+    private static readonly PullAuditEventsResponse Empty =
+        new(Array.Empty<AuditEvent>(), MoreAvailable: false);
+
+    // gRPC status codes that mean "site not reachable right now" rather than a defect.
+    private static bool IsTolerable(StatusCode code) => code is
+        StatusCode.Unavailable or
+        StatusCode.DeadlineExceeded or
+        StatusCode.Cancelled;
+
+    /// <summary>
+    /// Returns <paramref name="value"/> as a UTC-kind <see cref="DateTime"/>.
+    /// </summary>
+    /// <remarks>
+    /// A <see cref="DateTimeKind.Local"/> value is converted. An
+    /// <see cref="DateTimeKind.Unspecified"/> value (including
+    /// <see cref="DateTime.MinValue"/>) is stamped as UTC without shifting it:
+    /// the parameter it normalises is named <c>sinceUtc</c>, and
+    /// <see cref="DateTime.ToUniversalTime"/> would treat it as local time and
+    /// move the cursor by the host's UTC offset.
+    /// </remarks>
+    private static DateTime EnsureUtc(DateTime value) => value.Kind switch
+    {
+        DateTimeKind.Utc => value,
+        DateTimeKind.Local => value.ToUniversalTime(),
+        _ => DateTime.SpecifyKind(value, DateTimeKind.Utc),
+    };
+
+    /// <summary>
+    /// Seam over the <c>PullAuditEvents</c> unary gRPC call against a resolved
+    /// site endpoint. Extracted so <see cref="GrpcPullAuditEventsClient"/> can
+    /// be unit-tested without a real <see cref="GrpcChannel"/>. Production binds
+    /// <see cref="GrpcPullAuditEventsInvoker"/>.
+    /// </summary>
+    public interface IPullAuditEventsInvoker
+    {
+        /// <summary>
+        /// Issues the <c>PullAuditEvents</c> unary RPC against <paramref name="endpoint"/>.
+        /// May throw <see cref="RpcException"/> / <see cref="HttpRequestException"/>
+        /// on transport faults — the caller classifies and swallows tolerable ones.
+        /// </summary>
+        /// <param name="endpoint">The site gRPC authority (e.g. <c>http://site-a:8083</c>).</param>
+        /// <param name="request">The wire-format pull request.</param>
+        /// <param name="ct">Cancellation token.</param>
+        /// <returns>The wire-format pull response.</returns>
+        Task<ProtoPullResponse> InvokeAsync(string endpoint, ProtoPullRequest request, CancellationToken ct);
+    }
+}
+
+/// <summary>
+/// Production <see cref="GrpcPullAuditEventsClient.IPullAuditEventsInvoker"/>:
+/// caches one <see cref="GrpcChannel"/> per endpoint (keepalive from
+/// <see cref="CommunicationOptions"/>, mirroring <c>SiteStreamGrpcClient</c>)
+/// and issues the unary <c>PullAuditEventsAsync</c> call. The cache is keyed by
+/// the endpoint string, so a re-keyed endpoint (NodeA→NodeB failover / address
+/// edit) dials through a fresh channel on the next call; the channel for the
+/// previous endpoint is not evicted and stays cached until the invoker is
+/// disposed.
+/// </summary>
+public sealed class GrpcPullAuditEventsInvoker
+    : GrpcPullAuditEventsClient.IPullAuditEventsInvoker, IDisposable
+{
+    private readonly ConcurrentDictionary<string, GrpcChannel> _channels = new(StringComparer.Ordinal);
+    private readonly CommunicationOptions _options;
+
+    /// <summary>
+    /// Creates the invoker using default <see cref="CommunicationOptions"/>.
+    /// </summary>
+    public GrpcPullAuditEventsInvoker()
+        : this(new CommunicationOptions())
+    {
+    }
+
+    /// <summary>
+    /// Creates the invoker, applying the configured gRPC keepalive settings to
+    /// every channel it opens.
+    /// </summary>
+    /// <param name="options">Communication options supplying gRPC keepalive timings.</param>
+    /// <exception cref="ArgumentNullException"><paramref name="options"/> is <c>null</c>.</exception>
+    public GrpcPullAuditEventsInvoker(CommunicationOptions options)
+    {
+        _options = options ?? throw new ArgumentNullException(nameof(options));
+    }
+
+    /// <inheritdoc />
+    public async Task<ProtoPullResponse> InvokeAsync(
+        string endpoint, ProtoPullRequest request, CancellationToken ct)
+    {
+        var channel = GetOrCreateChannel(endpoint);
+        var client = new SiteStreamService.SiteStreamServiceClient(channel);
+
+        // NOTE(review): no per-call deadline is set, so a site that accepts the
+        // connection but never answers holds this call until ct is cancelled.
+        // Consider a configurable deadline.
+        using var call = client.PullAuditEventsAsync(request, cancellationToken: ct);
+        return await call.ResponseAsync.ConfigureAwait(false);
+    }
+
+    /// <summary>
+    /// Returns the cached channel for <paramref name="endpoint"/>, creating it on
+    /// first use.
+    /// </summary>
+    /// <remarks>
+    /// <c>GetOrAdd</c> with a factory delegate may run the factory more than once
+    /// when two callers race on the same key, and the losing channel would never
+    /// be disposed. The channel is therefore built up front and disposed here if
+    /// another caller published one first.
+    /// </remarks>
+    private GrpcChannel GetOrCreateChannel(string endpoint)
+    {
+        if (_channels.TryGetValue(endpoint, out var cached))
+        {
+            return cached;
+        }
+
+        var created = CreateChannel(endpoint);
+        var published = _channels.GetOrAdd(endpoint, created);
+        if (!ReferenceEquals(published, created))
+        {
+            created.Dispose();
+        }
+        return published;
+    }
+
+    // Builds a channel whose HTTP/2 keepalive pings follow the configured timings.
+    private GrpcChannel CreateChannel(string endpoint) =>
+        GrpcChannel.ForAddress(endpoint, new GrpcChannelOptions
+        {
+            HttpHandler = new SocketsHttpHandler
+            {
+                KeepAlivePingDelay = _options.GrpcKeepAlivePingDelay,
+                KeepAlivePingTimeout = _options.GrpcKeepAlivePingTimeout,
+                KeepAlivePingPolicy = HttpKeepAlivePingPolicy.Always,
+            },
+            // The handler above belongs to this channel alone. A caller-supplied
+            // handler is only disposed with the channel when this flag is set.
+            DisposeHttpClient = true,
+        });
+
+    /// <summary>Disposes all cached channels.</summary>
+    public void Dispose()
+    {
+        // Remove-then-dispose per key, so a channel added while this loop runs
+        // is never dropped from the cache without being disposed.
+        foreach (var endpoint in _channels.Keys)
+        {
+            if (_channels.TryRemove(endpoint, out var channel))
+            {
+                channel.Dispose();
+            }
+        }
+    }
+}
diff --git a/src/ZB.MOM.WW.ScadaBridge.AuditLog/ServiceCollectionExtensions.cs b/src/ZB.MOM.WW.ScadaBridge.AuditLog/ServiceCollectionExtensions.cs
index 6b1e0255..764ee91a 100644
--- a/src/ZB.MOM.WW.ScadaBridge.AuditLog/ServiceCollectionExtensions.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.AuditLog/ServiceCollectionExtensions.cs
@@ -362,4 +362,69 @@ public static class ServiceCollectionExtensions
         return services;
     }
+
+    /// <summary>
+    /// Audit Log (#23) M6 — central-only registration of the production
+    /// <see cref="IPullAuditEventsClient"/> (<see cref="GrpcPullAuditEventsClient"/>)
+    /// and its unary-call invoker (<see cref="GrpcPullAuditEventsInvoker"/>),
+    /// used by the central reconciliation actor to pull reconciliation batches
+    /// from each site over the <c>PullAuditEvents</c> gRPC RPC.
+    /// </summary>
+    /// <remarks>
+    /// <para>
+    /// Kept out of the registration helper that also runs on site composition
+    /// roots, because the client dials sites and resolves
+    /// <see cref="ISiteEnumerator"/> (a central-only collaborator wired
+    /// alongside the reconciliation singleton). Folding it into that helper
+    /// would register a site-dialing client on every site host, violating the
+    /// "every <c>Add*</c> call is safe from any composition root" invariant.
+    /// </para>
+    /// <para>
+    /// The <see cref="GrpcPullAuditEventsInvoker"/> binds with default
+    /// <c>CommunicationOptions</c> keepalive unless an
+    /// <c>IOptions&lt;CommunicationOptions&gt;</c> is already registered, in
+    /// which case the configured timings flow through — matching how
+    /// <c>SiteStreamGrpcClientFactory</c> takes its keepalive from the same
+    /// options.
+    /// </para>
+    /// <para>
+    /// <see cref="ISiteEnumerator"/> is NOT registered here: its production
+    /// implementation (wrapping <c>ISiteRepository</c>) ships with the
+    /// reconciliation-singleton wiring in the Host. The enumerator is resolved
+    /// inside the client's factory delegate, i.e. only when the client is first
+    /// resolved from the container, so this binding is safe to issue before the
+    /// enumerator binding lands.
+    /// </para>
+    /// </remarks>
+    /// <param name="services">The service collection to register into.</param>
+    /// <returns>The same <see cref="IServiceCollection"/> for chaining.</returns>
+    /// <exception cref="ArgumentNullException"><paramref name="services"/> is <c>null</c>.</exception>
+    public static IServiceCollection AddAuditLogCentralReconciliationClient(
+        this IServiceCollection services)
+    {
+        ArgumentNullException.ThrowIfNull(services);
+
+        // The invoker owns the per-endpoint GrpcChannel cache, so it must be a
+        // singleton — a fresh invoker per resolution would leak channels.
+        // Resolve CommunicationOptions if present (the central Host binds it),
+        // otherwise fall back to defaults so this helper stays standalone.
+        services.TryAddSingleton<GrpcPullAuditEventsInvoker>(sp =>
+        {
+            var options = sp
+                .GetService<Microsoft.Extensions.Options.IOptions<ZB.MOM.WW.ScadaBridge.Communication.CommunicationOptions>>();
+            return options is null
+                ? new GrpcPullAuditEventsInvoker()
+                : new GrpcPullAuditEventsInvoker(options.Value);
+        });
+
+        // Expose the same singleton through the seam interface the client consumes.
+        services.TryAddSingleton<GrpcPullAuditEventsClient.IPullAuditEventsInvoker>(
+            sp => sp.GetRequiredService<GrpcPullAuditEventsInvoker>());
+
+        services.TryAddSingleton<IPullAuditEventsClient>(sp => new GrpcPullAuditEventsClient(
+            sp.GetRequiredService<ISiteEnumerator>(),
+            sp.GetRequiredService<GrpcPullAuditEventsClient.IPullAuditEventsInvoker>(),
+            sp.GetRequiredService<ILogger<GrpcPullAuditEventsClient>>()));
+
+        return services;
+    }
 }
diff --git a/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Central/GrpcPullAuditEventsClientTests.cs b/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Central/GrpcPullAuditEventsClientTests.cs
new file mode 100644
index 00000000..c6a42b7d
--- /dev/null
+++ b/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Central/GrpcPullAuditEventsClientTests.cs
@@ -0,0 +1,166 @@
+using Grpc.Core;
+using Microsoft.Extensions.Logging.Abstractions;
+using ZB.MOM.WW.Audit;
+using ZB.MOM.WW.ScadaBridge.AuditLog.Central;
+using ZB.MOM.WW.ScadaBridge.Communication.Grpc;
+using ZB.MOM.WW.ScadaBridge.Commons.Types.Audit;
+using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
+using Google.Protobuf.WellKnownTypes;
+using ProtoPullRequest = ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsRequest;
+using ProtoPullResponse = ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsResponse;
+
+namespace ZB.MOM.WW.ScadaBridge.AuditLog.Tests.Central;
+
+/// <summary>
+/// Bundle (M6) tests for <see cref="GrpcPullAuditEventsClient"/> — the
+/// production <see cref="IPullAuditEventsClient"/> that dials a site over gRPC
+/// and issues the <c>PullAuditEvents</c> unary RPC for the reconciliation loop.
+/// The real <c>GrpcChannel</c> is replaced by an injected
+/// <see cref="GrpcPullAuditEventsClient.IPullAuditEventsInvoker"/> seam so the
+/// client's mapping / ordering / fault-swallowing behaviour can be asserted
+/// without standing up a Kestrel HTTP/2 endpoint.
+/// </summary>
+public class GrpcPullAuditEventsClientTests
+{
+    private static readonly DateTime BaseTime =
+        new(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc);
+
+    /// <summary>Static enumerator returning a fixed site→endpoint map.</summary>
+    private sealed class StaticEnumerator : ISiteEnumerator
+    {
+        private readonly IReadOnlyList<SiteEntry> _sites;
+        public StaticEnumerator(params SiteEntry[] sites) => _sites = sites;
+        public Task<IReadOnlyList<SiteEntry>> EnumerateAsync(CancellationToken ct = default) =>
+            Task.FromResult(_sites);
+    }
+
+    /// <summary>
+    /// Test invoker: records the endpoint + request it was asked to dial, then
+    /// returns a scripted proto response (or throws a scripted exception so the
+    /// fault-swallowing path can be exercised).
+    /// </summary>
+    private sealed class FakeInvoker : GrpcPullAuditEventsClient.IPullAuditEventsInvoker
+    {
+        public string? Endpoint { get; private set; }
+        public ProtoPullRequest? Request { get; private set; }
+        public int CallCount { get; private set; }
+
+        private readonly ProtoPullResponse? _response;
+        private readonly Exception? _throw;
+
+        private FakeInvoker(ProtoPullResponse? response, Exception? toThrow)
+        {
+            _response = response;
+            _throw = toThrow;
+        }
+
+        public static FakeInvoker Returning(ProtoPullResponse response) => new(response, null);
+        public static FakeInvoker Throwing(Exception ex) => new(null, ex);
+
+        public Task<ProtoPullResponse> InvokeAsync(
+            string endpoint, ProtoPullRequest request, CancellationToken ct)
+        {
+            CallCount++;
+            Endpoint = endpoint;
+            Request = request;
+            if (_throw is not null)
+            {
+                throw _throw;
+            }
+            return Task.FromResult(_response!);
+        }
+    }
+
+    // Builds a wire DTO for a delivered outbound API-call event on site-a.
+    private static AuditEventDto Dto(Guid id, DateTime occurredAtUtc) =>
+        AuditEventDtoMapper.ToDto(ScadaBridgeAuditEventFactory.Create(
+            eventId: id,
+            occurredAtUtc: occurredAtUtc,
+            channel: AuditChannel.ApiOutbound,
+            kind: AuditKind.ApiCall,
+            status: AuditStatus.Delivered,
+            sourceSiteId: "site-a"));
+
+    // Client under test wired to a single registered site ("site-a").
+    private static GrpcPullAuditEventsClient ClientForSiteA(FakeInvoker invoker) =>
+        new(
+            new StaticEnumerator(new SiteEntry("site-a", "http://site-a:8083")),
+            invoker,
+            NullLogger<GrpcPullAuditEventsClient>.Instance);
+
+    [Fact]
+    public async Task PullAsync_dials_the_resolved_endpoint_and_maps_events_oldest_first()
+    {
+        var older = Guid.NewGuid();
+        var newer = Guid.NewGuid();
+
+        // Wire is delivered newest-first on purpose to prove the client sorts.
+        var proto = new ProtoPullResponse { MoreAvailable = true };
+        proto.Events.Add(Dto(newer, BaseTime.AddMinutes(5)));
+        proto.Events.Add(Dto(older, BaseTime));
+
+        var invoker = FakeInvoker.Returning(proto);
+        var sut = ClientForSiteA(invoker);
+
+        var result = await sut.PullAsync("site-a", BaseTime, batchSize: 256, CancellationToken.None);
+
+        // Endpoint resolution + request shaping.
+        Assert.Equal("http://site-a:8083", invoker.Endpoint);
+        Assert.NotNull(invoker.Request);
+        Assert.Equal(256, invoker.Request!.BatchSize);
+        Assert.Equal(BaseTime, invoker.Request.SinceUtc.ToDateTime());
+
+        // Mapping + ordering + MoreAvailable surface.
+        Assert.True(result.MoreAvailable);
+        Assert.Equal(2, result.Events.Count);
+        Assert.Equal(older, result.Events[0].EventId);
+        Assert.Equal(newer, result.Events[1].EventId);
+    }
+
+    [Fact]
+    public async Task PullAsync_returns_empty_when_site_endpoint_is_unknown()
+    {
+        var invoker = FakeInvoker.Returning(new ProtoPullResponse());
+        var sut = new GrpcPullAuditEventsClient(
+            new StaticEnumerator(), // no sites registered
+            invoker,
+            NullLogger<GrpcPullAuditEventsClient>.Instance);
+
+        var result = await sut.PullAsync("site-a", BaseTime, batchSize: 256, CancellationToken.None);
+
+        Assert.Empty(result.Events);
+        Assert.False(result.MoreAvailable);
+        Assert.Equal(0, invoker.CallCount); // never dialled — nothing to dial
+    }
+
+    [Theory]
+    [InlineData(StatusCode.Unavailable)]       // connection refused / site offline
+    [InlineData(StatusCode.DeadlineExceeded)]  // slow site / network blip
+    [InlineData(StatusCode.Cancelled)]
+    public async Task PullAsync_swallows_tolerable_transport_faults_to_empty_response(StatusCode code)
+    {
+        var invoker = FakeInvoker.Throwing(new RpcException(new Status(code, "transport fault")));
+        var sut = ClientForSiteA(invoker);
+
+        // MUST NOT throw — per the IPullAuditEventsClient contract.
+        var result = await sut.PullAsync("site-a", BaseTime, batchSize: 256, CancellationToken.None);
+
+        Assert.Empty(result.Events);
+        Assert.False(result.MoreAvailable);
+    }
+
+    [Fact]
+    public async Task PullAsync_swallows_connection_layer_faults_to_empty_response()
+    {
+        // A bare HttpRequestException (e.g. DNS / refused socket before a gRPC
+        // status is established) is also tolerable.
+        var invoker = FakeInvoker.Throwing(new HttpRequestException("connection refused"));
+        var sut = ClientForSiteA(invoker);
+
+        var result = await sut.PullAsync("site-a", BaseTime, batchSize: 256, CancellationToken.None);
+
+        Assert.Empty(result.Events);
+        Assert.False(result.MoreAvailable);
+    }
+
+    [Fact]
+    public async Task PullAsync_swallows_unexpected_faults_to_empty_response()
+    {
+        // Anything the invoker throws that is neither a tolerable gRPC status nor
+        // a connection-layer fault lands in the client's catch-all branch.
+        var invoker = FakeInvoker.Throwing(new InvalidOperationException("unexpected"));
+        var sut = ClientForSiteA(invoker);
+
+        var result = await sut.PullAsync("site-a", BaseTime, batchSize: 256, CancellationToken.None);
+
+        Assert.Empty(result.Events);
+        Assert.False(result.MoreAvailable);
+        Assert.Equal(1, invoker.CallCount);
+    }
+}