using System.Collections.Concurrent;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Grpc.Net.Client;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.ScadaBridge.Communication;
using ZB.MOM.WW.ScadaBridge.Communication.Grpc;
using ProtoPullRequest = ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsRequest;
using ProtoPullResponse = ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsResponse;
using PullAuditEventsResponse = ZB.MOM.WW.ScadaBridge.Commons.Messages.Integration.PullAuditEventsResponse;
namespace ZB.MOM.WW.ScadaBridge.AuditLog.Central;
/// <summary>
/// Production <see cref="IPullAuditEventsClient"/> (Audit Log #23, M6) that central
/// reconciliation uses to pull the next reconciliation batch from a site over the
/// <c>PullAuditEvents</c> unary gRPC RPC served by <c>SiteStreamGrpcServer</c>.
/// </summary>
/// <remarks>
/// <para>
/// <b>Endpoint resolution.</b> The caller passes only a <c>siteId</c>; this client
/// resolves it to a gRPC authority via <see cref="ISiteEnumerator"/> on every call, so
/// a NodeA→NodeB failover flip or an edited site address takes effect on the next
/// tick — the same liveness guarantee <c>SiteStreamGrpcClientFactory</c> gives the
/// real-time stream. A site with no registered endpoint yields an empty response
/// without dialling; reconciliation simply has nothing to pull from it. Failures
/// raised by the site enumerator itself are not caught here and propagate.
/// </para>
/// <para>
/// <b>Fault tolerance.</b> Tolerable transport faults (connection refused / site
/// offline = <see cref="StatusCode.Unavailable"/>, slow site =
/// <see cref="StatusCode.DeadlineExceeded"/>, shutdown =
/// <see cref="StatusCode.Cancelled"/>, plus a bare <see cref="HttpRequestException"/>
/// or <see cref="System.Net.Sockets.SocketException"/> raised before a gRPC status is
/// established) are caught and collapsed to an empty response — one offline site must
/// never sink the rest of the reconciliation tick. Any other fault from the RPC, a
/// missing reply, or a reply whose DTOs fail to map is logged as a warning and also
/// collapsed to empty: audit reconciliation is best-effort and a throw would only be
/// re-caught by the caller's own per-site guard.
/// </para>
/// <para>
/// <b>Time handling.</b> <c>sinceUtc</c> is UTC by contract. A value whose
/// <see cref="DateTime.Kind"/> is <see cref="DateTimeKind.Unspecified"/> (including the
/// <see cref="DateTime.MinValue"/> "from the start" sentinel) is relabelled as UTC
/// without shifting; only an explicitly <see cref="DateTimeKind.Local"/> value is
/// converted.
/// </para>
/// <para>
/// <b>Testability.</b> The unary call is reached through the
/// <see cref="IPullAuditEventsInvoker"/> seam. Production binds
/// <see cref="GrpcPullAuditEventsInvoker"/> (one cached <see cref="GrpcChannel"/> per
/// endpoint, keepalive from <see cref="CommunicationOptions"/>); unit tests inject a
/// fake invoker so no real HTTP/2 endpoint is required.
/// </para>
/// </remarks>
public sealed class GrpcPullAuditEventsClient : IPullAuditEventsClient
{
    /// <summary>Shared "nothing to pull" result: no events, nothing more available.</summary>
    private static readonly PullAuditEventsResponse Empty =
        new(Array.Empty<AuditEvent>(), MoreAvailable: false);

    private readonly ISiteEnumerator _sites;
    private readonly IPullAuditEventsInvoker _invoker;
    private readonly ILogger<GrpcPullAuditEventsClient> _logger;

    /// <summary>
    /// Creates the client over the given site enumerator and unary-call invoker.
    /// </summary>
    /// <param name="sites">Resolves a <c>siteId</c> to its gRPC endpoint.</param>
    /// <param name="invoker">Seam that issues the <c>PullAuditEvents</c> unary RPC against a resolved endpoint.</param>
    /// <param name="logger">Logger for transport-fault diagnostics.</param>
    /// <exception cref="ArgumentNullException">Any argument is <see langword="null"/>.</exception>
    public GrpcPullAuditEventsClient(
        ISiteEnumerator sites,
        IPullAuditEventsInvoker invoker,
        ILogger<GrpcPullAuditEventsClient> logger)
    {
        _sites = sites ?? throw new ArgumentNullException(nameof(sites));
        _invoker = invoker ?? throw new ArgumentNullException(nameof(invoker));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <inheritdoc />
    public async Task<PullAuditEventsResponse> PullAsync(
        string siteId,
        DateTime sinceUtc,
        int batchSize,
        CancellationToken ct)
    {
        // Resolved on every call (never cached) so failovers / address edits apply
        // on the next reconciliation tick.
        var endpoint = await ResolveEndpointAsync(siteId, ct).ConfigureAwait(false);
        if (endpoint is null)
        {
            // No gRPC address registered for the site — absence of an address is
            // a configuration decision (mirrors ISiteEnumerator's own contract),
            // not a runtime error, so there is simply nothing to pull.
            _logger.LogDebug(
                "PullAuditEvents skipped: no gRPC endpoint registered for site {SiteId}.", siteId);
            return Empty;
        }

        var request = new ProtoPullRequest
        {
            // ReadPendingSinceAsync treats DateTime.MinValue as "from the start";
            // EnsureUtc keeps Timestamp.FromDateTime happy (it requires UTC kind)
            // without shifting that sentinel.
            SinceUtc = Timestamp.FromDateTime(EnsureUtc(sinceUtc)),
            BatchSize = batchSize,
        };

        ProtoPullResponse reply;
        try
        {
            reply = await _invoker.InvokeAsync(endpoint, request, ct).ConfigureAwait(false);
        }
        catch (RpcException ex) when (IsTolerable(ex.StatusCode))
        {
            _logger.LogDebug(ex,
                "PullAuditEvents tolerable transport fault for site {SiteId} ({Endpoint}): {Status}. Returning empty batch.",
                siteId, endpoint, ex.StatusCode);
            return Empty;
        }
        catch (Exception ex) when (ex is HttpRequestException or System.Net.Sockets.SocketException)
        {
            _logger.LogDebug(ex,
                "PullAuditEvents connection-layer fault for site {SiteId} ({Endpoint}). Returning empty batch.",
                siteId, endpoint);
            return Empty;
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Reconciliation tick was cancelled (host shutdown / scope dispose).
            return Empty;
        }
        catch (Exception ex)
        {
            // Any other fault raised by the invoker (e.g. a non-RpcException
            // transport wrapper). Audit reconciliation is best-effort; swallow to
            // empty rather than throw — the caller's per-site guard would only
            // re-catch it.
            _logger.LogWarning(ex,
                "PullAuditEvents unexpected fault for site {SiteId} ({Endpoint}). Returning empty batch.",
                siteId, endpoint);
            return Empty;
        }

        if (reply is null)
        {
            // Defensive: a misbehaving (e.g. fake) invoker returned no reply at all.
            _logger.LogWarning(
                "PullAuditEvents returned no reply for site {SiteId} ({Endpoint}). Returning empty batch.",
                siteId, endpoint);
            return Empty;
        }

        try
        {
            return MapReply(reply);
        }
        catch (Exception ex)
        {
            // A malformed reply that fails DTO mapping is treated like any other
            // unexpected fault: logged and collapsed to an empty batch, as the
            // class contract promises.
            _logger.LogWarning(ex,
                "PullAuditEvents reply mapping failed for site {SiteId} ({Endpoint}). Returning empty batch.",
                siteId, endpoint);
            return Empty;
        }
    }

    /// <summary>
    /// Maps the wire reply to the canonical response with events ordered oldest-first.
    /// </summary>
    /// <param name="reply">The non-null wire reply.</param>
    /// <returns>The mapped response, carrying the reply's <c>MoreAvailable</c> flag.</returns>
    private static PullAuditEventsResponse MapReply(ProtoPullResponse reply)
    {
        // The wire is already ordered by the site queue, but the
        // IPullAuditEventsClient contract is explicit, so sort defensively
        // (OrderBy is stable, so equal timestamps keep wire order).
        var events = reply.Events
            .Select(AuditEventDtoMapper.FromDto)
            .OrderBy(e => e.OccurredAtUtc)
            .ToList();
        return new PullAuditEventsResponse(events, reply.MoreAvailable);
    }

    /// <summary>
    /// Returns the gRPC endpoint of the first enumerated site whose id matches
    /// <paramref name="siteId"/> ordinally and whose endpoint is non-blank, or
    /// <see langword="null"/> when there is none (or <paramref name="siteId"/> is
    /// null/empty).
    /// </summary>
    private async Task<string?> ResolveEndpointAsync(string siteId, CancellationToken ct)
    {
        if (string.IsNullOrEmpty(siteId))
        {
            // Nothing can legitimately match; avoid pairing with a site whose id is null.
            return null;
        }

        var sites = await _sites.EnumerateAsync(ct).ConfigureAwait(false);
        foreach (var site in sites)
        {
            if (string.Equals(site.SiteId, siteId, StringComparison.Ordinal) &&
                !string.IsNullOrWhiteSpace(site.GrpcEndpoint))
            {
                return site.GrpcEndpoint;
            }
        }

        return null;
    }

    /// <summary>
    /// Status codes that indicate an offline, slow, or shutting-down site and are
    /// therefore collapsed to an empty batch at debug level.
    /// </summary>
    private static bool IsTolerable(StatusCode code) => code is
        StatusCode.Unavailable or
        StatusCode.DeadlineExceeded or
        StatusCode.Cancelled;

    /// <summary>
    /// Normalises <paramref name="value"/> to <see cref="DateTimeKind.Utc"/> for
    /// <see cref="Timestamp.FromDateTime(DateTime)"/>, which rejects non-UTC kinds.
    /// </summary>
    /// <remarks>
    /// The parameter is UTC by contract, so an <see cref="DateTimeKind.Unspecified"/>
    /// value (e.g. <see cref="DateTime.MinValue"/>) is relabelled without shifting.
    /// Converting it with <see cref="DateTime.ToUniversalTime"/> would treat it as
    /// local time and move the cursor by the host's UTC offset. Only an explicitly
    /// <see cref="DateTimeKind.Local"/> value is converted.
    /// </remarks>
    private static DateTime EnsureUtc(DateTime value) => value.Kind switch
    {
        DateTimeKind.Utc => value,
        DateTimeKind.Local => value.ToUniversalTime(),
        _ => DateTime.SpecifyKind(value, DateTimeKind.Utc),
    };

    /// <summary>
    /// Seam over the <c>PullAuditEvents</c> unary gRPC call against a resolved site
    /// endpoint. Extracted so <see cref="GrpcPullAuditEventsClient"/> can be unit-tested
    /// without a real <see cref="GrpcChannel"/>. Production binds
    /// <see cref="GrpcPullAuditEventsInvoker"/>.
    /// </summary>
    public interface IPullAuditEventsInvoker
    {
        /// <summary>
        /// Issues the <c>PullAuditEvents</c> unary RPC against <paramref name="endpoint"/>.
        /// May throw <see cref="RpcException"/> / <see cref="HttpRequestException"/> on
        /// transport faults — the caller classifies and swallows tolerable ones.
        /// </summary>
        /// <param name="endpoint">The site gRPC authority (e.g. <c>http://site-a:8083</c>).</param>
        /// <param name="request">The wire-format pull request.</param>
        /// <param name="ct">Cancellation token.</param>
        /// <returns>The wire-format pull response.</returns>
        Task<ProtoPullResponse> InvokeAsync(string endpoint, ProtoPullRequest request, CancellationToken ct);
    }
}
/// <summary>
/// Production <see cref="GrpcPullAuditEventsClient.IPullAuditEventsInvoker"/>:
/// caches one <see cref="GrpcChannel"/> per endpoint (keepalive from
/// <see cref="CommunicationOptions"/>, mirroring <c>SiteStreamGrpcClient</c>) and
/// issues the unary <c>PullAuditEventsAsync</c> call.
/// </summary>
/// <remarks>
/// Channels are keyed by endpoint string (ordinal). Because the client re-resolves
/// the endpoint on every call, a NodeA→NodeB failover or an edited site address
/// yields a different key and therefore a fresh channel on the next call. The
/// previous endpoint's channel is not evicted: it stays cached, and is reused if the
/// site flips back, until <see cref="Dispose"/> releases every channel.
/// </remarks>
public sealed class GrpcPullAuditEventsInvoker
    : GrpcPullAuditEventsClient.IPullAuditEventsInvoker, IDisposable
{
    private readonly ConcurrentDictionary<string, GrpcChannel> _channels = new(StringComparer.Ordinal);
    private readonly CommunicationOptions _options;
    private volatile bool _disposed;

    /// <summary>
    /// Creates the invoker using default <see cref="CommunicationOptions"/>.
    /// </summary>
    public GrpcPullAuditEventsInvoker()
        : this(new CommunicationOptions())
    {
    }

    /// <summary>
    /// Creates the invoker, applying the configured gRPC keepalive settings to
    /// every channel it opens.
    /// </summary>
    /// <param name="options">Communication options supplying gRPC keepalive timings.</param>
    /// <exception cref="ArgumentNullException"><paramref name="options"/> is <see langword="null"/>.</exception>
    public GrpcPullAuditEventsInvoker(CommunicationOptions options)
    {
        _options = options ?? throw new ArgumentNullException(nameof(options));
    }

    /// <inheritdoc />
    /// <exception cref="ObjectDisposedException">The invoker has already been disposed.</exception>
    public async Task<ProtoPullResponse> InvokeAsync(
        string endpoint, ProtoPullRequest request, CancellationToken ct)
    {
        // Without this guard a post-dispose call would silently open a new channel
        // that nothing would ever dispose.
        if (_disposed)
        {
            throw new ObjectDisposedException(nameof(GrpcPullAuditEventsInvoker));
        }

        var channel = GetOrCreateChannel(endpoint);
        var client = new SiteStreamService.SiteStreamServiceClient(channel);
        using var call = client.PullAuditEventsAsync(request, cancellationToken: ct);
        return await call.ResponseAsync.ConfigureAwait(false);
    }

    /// <summary>
    /// Returns the cached channel for <paramref name="endpoint"/>, creating and caching
    /// one if absent. Exactly one channel per key survives; any losing candidate is disposed.
    /// </summary>
    private GrpcChannel GetOrCreateChannel(string endpoint)
    {
        if (_channels.TryGetValue(endpoint, out var existing))
        {
            return existing;
        }

        // GetOrAdd(key, factory) may run the factory more than once under contention
        // and discard the extra results; a discarded GrpcChannel would never be
        // disposed. Build the candidate explicitly and dispose it if we lost the race.
        var created = CreateChannel(endpoint);
        var winner = _channels.GetOrAdd(endpoint, created);
        if (!ReferenceEquals(winner, created))
        {
            created.Dispose();
        }

        return winner;
    }

    /// <summary>
    /// Opens a channel to <paramref name="endpoint"/> over a
    /// <see cref="SocketsHttpHandler"/> that always sends HTTP/2 keepalive pings
    /// using the configured delay and timeout.
    /// </summary>
    private GrpcChannel CreateChannel(string endpoint) =>
        GrpcChannel.ForAddress(endpoint, new GrpcChannelOptions
        {
            HttpHandler = new SocketsHttpHandler
            {
                KeepAlivePingDelay = _options.GrpcKeepAlivePingDelay,
                KeepAlivePingTimeout = _options.GrpcKeepAlivePingTimeout,
                KeepAlivePingPolicy = HttpKeepAlivePingPolicy.Always,
            },
        });

    /// <summary>
    /// Marks the invoker disposed and disposes all cached channels. Calling it again
    /// is harmless because the cache is cleared on the first call.
    /// </summary>
    public void Dispose()
    {
        _disposed = true;
        foreach (var channel in _channels.Values)
        {
            channel.Dispose();
        }

        _channels.Clear();
    }
}