feat(sitecallaudit): PullSiteCalls reconciliation plumbing (store read + RPC + site handler + central client)

Site Call Audit (#22): build the documented periodic reconciliation PULL
self-heal path for the eventually-consistent central SiteCalls mirror, as a
dedicated PullSiteCalls gRPC RPC kept separate from the audit pull. This is the
pull PLUMBING only; the central reconciliation tick is a separate follow-up.

- IOperationTrackingStore.ReadChangedSinceAsync(sinceUtc, batchSize): inclusive
  UpdatedAtUtc cursor, oldest-first, batch-capped; SQLite impl projects tracking
  rows onto SiteCallOperational (Kind->Channel, TargetSummary->Target, SourceSite
  left empty - the store has no site-id column).
- sitestream.proto: rpc PullSiteCalls + PullSiteCallsRequest/Response, mirroring
  PullAuditEvents; regenerated checked-in SiteStreamGrpc/*.cs.
- SiteCallDtoMapper.ToDto(SiteCallOperational): inverse of FromDto for the handler.
- SiteStreamGrpcServer.PullSiteCalls handler + SetOperationTrackingStore seam;
  Host wires the seam alongside SetSiteAuditQueue (site roles only).
- Central IPullSiteCallsClient + GrpcPullSiteCallsClient (home: AuditLog/Central to
  reuse ISiteEnumerator; SiteCallAudit does not reference AuditLog). Re-stamps
  SourceSite from the dialed siteId; no-throw on tolerable transport faults;
  SpecifyKind (not ToUniversalTime) cursor handling. Central-only DI registration.

Tests: ReadChangedSinceAsync (4), PullSiteCalls handler (6), GrpcPullSiteCallsClient
(8). Full solution build 0 warnings/0 errors (TreatWarningsAsErrors).
This commit is contained in:
Joseph Doherty
2026-06-15 10:39:06 -04:00
parent c092e89fd1
commit 963e3427da
15 changed files with 1751 additions and 23 deletions
@@ -0,0 +1,287 @@
using System.Collections.Concurrent;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Grpc.Net.Client;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit;
using ZB.MOM.WW.ScadaBridge.Communication;
using ZB.MOM.WW.ScadaBridge.Communication.Grpc;
using ProtoPullRequest = ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsRequest;
using ProtoPullResponse = ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsResponse;
using PullSiteCallsResponse = ZB.MOM.WW.ScadaBridge.Commons.Messages.Integration.PullSiteCallsResponse;
namespace ZB.MOM.WW.ScadaBridge.AuditLog.Central;
/// <summary>
/// Production <see cref="IPullSiteCallsClient"/> (Site Call Audit #22) that the
/// central reconciliation tick (a separate follow-up component) uses to pull the
/// next batch of cached-call operational rows from a site over the
/// <c>PullSiteCalls</c> unary gRPC RPC served by <c>SiteStreamGrpcServer</c>.
/// A near-exact sibling of <see cref="GrpcPullAuditEventsClient"/>.
/// </summary>
/// <remarks>
/// <para>
/// <b>Endpoint resolution.</b> The caller passes only a <c>siteId</c>; this
/// client resolves it to a gRPC authority via <see cref="ISiteEnumerator"/>
/// (<see cref="SiteEntry.GrpcEndpoint"/>) on every call so a NodeA→NodeB
/// failover flip or an edited site address takes effect on the next tick. A site
/// with no registered endpoint yields an empty response (no dial).
/// </para>
/// <para>
/// <b>SourceSite re-stamp.</b> The site leaves
/// <c>SiteCallOperationalDto.SourceSite</c> empty (the tracking store has no
/// site-id column). This client is the authority that knows which site it
/// dialed, so it re-stamps the mapped <see cref="SiteCall.SourceSite"/> from
/// <c>siteId</c> — the same "re-stamp from the forwarder's own id" pattern the
/// site push path uses.
/// </para>
/// <para>
/// <b>Fault tolerance.</b> Per the <see cref="IPullSiteCallsClient"/> contract,
/// tolerable transport faults (<see cref="StatusCode.Unavailable"/>,
/// <see cref="StatusCode.DeadlineExceeded"/>, <see cref="StatusCode.Cancelled"/>,
/// bare <see cref="HttpRequestException"/> / <c>SocketException</c>) are caught
/// and collapsed to an empty response so one offline site never sinks the rest
/// of the reconciliation tick. Any other fault (e.g. a malformed reply that
/// fails DTO mapping) is also swallowed to empty: reconciliation is best-effort.
/// </para>
/// <para>
/// <b>Testability.</b> The unary call is reached through the
/// <see cref="IPullSiteCallsInvoker"/> seam. Production binds
/// <see cref="GrpcPullSiteCallsInvoker"/> (one cached <see cref="GrpcChannel"/>
/// per endpoint, keepalive from <see cref="CommunicationOptions"/>); unit tests
/// inject a fake invoker so no real HTTP/2 endpoint is required.
/// </para>
/// </remarks>
public sealed class GrpcPullSiteCallsClient : IPullSiteCallsClient
{
private readonly ISiteEnumerator _sites;
private readonly IPullSiteCallsInvoker _invoker;
private readonly ILogger<GrpcPullSiteCallsClient> _logger;
/// <summary>
/// Creates the client over the given site enumerator and unary-call invoker.
/// </summary>
/// <param name="sites">Resolves a <c>siteId</c> to its gRPC endpoint.</param>
/// <param name="invoker">Seam that issues the <c>PullSiteCalls</c> unary RPC against a resolved endpoint.</param>
/// <param name="logger">Logger for transport-fault diagnostics.</param>
public GrpcPullSiteCallsClient(
ISiteEnumerator sites,
IPullSiteCallsInvoker invoker,
ILogger<GrpcPullSiteCallsClient> logger)
{
_sites = sites ?? throw new ArgumentNullException(nameof(sites));
_invoker = invoker ?? throw new ArgumentNullException(nameof(invoker));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <inheritdoc />
public async Task<PullSiteCallsResponse> PullAsync(
string siteId,
DateTime sinceUtc,
int batchSize,
CancellationToken ct)
{
var endpoint = await ResolveEndpointAsync(siteId, ct).ConfigureAwait(false);
if (endpoint is null)
{
// No gRPC address registered for the site — a configuration decision
// (mirrors ISiteEnumerator's own contract), not a runtime error, so
// there is simply nothing to pull.
_logger.LogDebug(
"PullSiteCalls skipped: no gRPC endpoint registered for site {SiteId}.", siteId);
return Empty;
}
var request = new ProtoPullRequest
{
// ReadChangedSinceAsync treats DateTime.MinValue as "from the start";
// EnsureUtc keeps Timestamp.FromDateTime happy (it requires UTC kind).
SinceUtc = Timestamp.FromDateTime(EnsureUtc(sinceUtc)),
BatchSize = batchSize,
};
ProtoPullResponse reply;
try
{
reply = await _invoker.InvokeAsync(endpoint, request, ct).ConfigureAwait(false);
}
catch (RpcException ex) when (IsTolerable(ex.StatusCode))
{
_logger.LogDebug(ex,
"PullSiteCalls tolerable transport fault for site {SiteId} ({Endpoint}): {Status}. Returning empty batch.",
siteId, endpoint, ex.StatusCode);
return Empty;
}
catch (Exception ex) when (ex is HttpRequestException or System.Net.Sockets.SocketException)
{
_logger.LogDebug(ex,
"PullSiteCalls connection-layer fault for site {SiteId} ({Endpoint}). Returning empty batch.",
siteId, endpoint);
return Empty;
}
catch (OperationCanceledException)
{
// Reconciliation tick cancelled — caller token (host shutdown) or an
// internal gRPC deadline / linked-CTS cancellation. Both tolerable for
// a best-effort pull; collapse to empty rather than landing noisily in
// the catch-all below.
return Empty;
}
catch (Exception ex)
{
// Any other fault. Reconciliation is best-effort; swallow to empty
// rather than throw — the (future) actor's per-site guard would only
// re-catch it.
_logger.LogWarning(ex,
"PullSiteCalls unexpected fault for site {SiteId} ({Endpoint}). Returning empty batch.",
siteId, endpoint);
return Empty;
}
// Map proto DTOs to central SiteCall entities, re-stamp SourceSite from
// the dialed siteId (the site leaves it empty), and order oldest-first by
// UpdatedAtUtc (the wire is already ordered by the site read, but the
// contract is explicit, so sort defensively).
var siteCalls = reply.Operationals
.Select(SiteCallDtoMapper.FromDto)
.Select(sc => sc with { SourceSite = siteId })
.OrderBy(sc => sc.UpdatedAtUtc)
.ToList();
return new PullSiteCallsResponse(siteCalls, reply.MoreAvailable);
}
private async Task<string?> ResolveEndpointAsync(string siteId, CancellationToken ct)
{
var sites = await _sites.EnumerateAsync(ct).ConfigureAwait(false);
foreach (var site in sites)
{
if (string.Equals(site.SiteId, siteId, StringComparison.Ordinal) &&
!string.IsNullOrWhiteSpace(site.GrpcEndpoint))
{
return site.GrpcEndpoint;
}
}
return null;
}
private static readonly PullSiteCallsResponse Empty =
new(Array.Empty<SiteCall>(), MoreAvailable: false);
private static bool IsTolerable(StatusCode code) => code is
StatusCode.Unavailable or
StatusCode.DeadlineExceeded or
StatusCode.Cancelled;
// All ScadaBridge timestamps are UTC by invariant. A non-UTC cursor (the
// reconciliation cursor starts at DateTime.MinValue, Kind=Unspecified) is
// treated AS UTC — never ToUniversalTime()-converted: on a host with a
// positive UTC offset MinValue.ToUniversalTime() underflows and
// Timestamp.FromDateTime throws, crashing the first pull for every site.
private static DateTime EnsureUtc(DateTime value) =>
value.Kind == DateTimeKind.Utc ? value : DateTime.SpecifyKind(value, DateTimeKind.Utc);
/// <summary>
/// Seam over the <c>PullSiteCalls</c> unary gRPC call against a resolved site
/// endpoint. Extracted so <see cref="GrpcPullSiteCallsClient"/> can be
/// unit-tested without a real <see cref="GrpcChannel"/>. Production binds
/// <see cref="GrpcPullSiteCallsInvoker"/>.
/// </summary>
public interface IPullSiteCallsInvoker
{
/// <summary>
/// Issues the <c>PullSiteCalls</c> unary RPC against <paramref name="endpoint"/>.
/// May throw <see cref="RpcException"/> / <see cref="HttpRequestException"/>
/// on transport faults — the caller classifies and swallows tolerable ones.
/// </summary>
/// <param name="endpoint">The site gRPC authority (e.g. <c>http://site-a:8083</c>).</param>
/// <param name="request">The wire-format pull request.</param>
/// <param name="ct">Cancellation token.</param>
/// <returns>The wire-format pull response.</returns>
Task<ProtoPullResponse> InvokeAsync(string endpoint, ProtoPullRequest request, CancellationToken ct);
}
}
/// <summary>
/// Production <see cref="GrpcPullSiteCallsClient.IPullSiteCallsInvoker"/>: caches
/// one <see cref="GrpcChannel"/> per endpoint (keepalive from
/// <see cref="CommunicationOptions"/>, mirroring <c>SiteStreamGrpcClient</c>) and
/// issues the unary <c>PullSiteCallsAsync</c> call. The cache is keyed by
/// endpoint string, so a changed site address (NodeA→NodeB failover flip / an
/// edited gRPC address) is reached as soon as the resolver hands the new endpoint
/// to <see cref="InvokeAsync"/>. The channel for a previous address lingers idle
/// until <see cref="Dispose"/> (idle channels hold no streams — a minor cache
/// footprint cost, not a correctness or liveness gap). Sibling of
/// <see cref="GrpcPullAuditEventsInvoker"/>.
/// </summary>
public sealed class GrpcPullSiteCallsInvoker
: GrpcPullSiteCallsClient.IPullSiteCallsInvoker, IDisposable
{
private readonly ConcurrentDictionary<string, GrpcChannel> _channels = new(StringComparer.Ordinal);
private readonly CommunicationOptions _options;
/// <summary>Creates the invoker using default <see cref="CommunicationOptions"/>.</summary>
public GrpcPullSiteCallsInvoker()
: this(new CommunicationOptions())
{
}
/// <summary>
/// Creates the invoker, applying the configured gRPC keepalive settings to
/// every channel it opens.
/// </summary>
/// <param name="options">Communication options supplying gRPC keepalive timings.</param>
public GrpcPullSiteCallsInvoker(CommunicationOptions options)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
}
/// <inheritdoc />
public async Task<ProtoPullResponse> InvokeAsync(
string endpoint, ProtoPullRequest request, CancellationToken ct)
{
var channel = GetOrCreateChannel(endpoint);
var client = new SiteStreamService.SiteStreamServiceClient(channel);
using var call = client.PullSiteCallsAsync(request, cancellationToken: ct);
return await call.ResponseAsync.ConfigureAwait(false);
}
// Race-safe channel cache (create-then-GetOrAdd-then-dispose-if-lost): two
// concurrent first dials of the same endpoint can both build a GrpcChannel;
// only the channel actually installed survives, the loser is disposed.
// Mirrors SiteStreamGrpcClientFactory / GrpcPullAuditEventsInvoker.
private GrpcChannel GetOrCreateChannel(string endpoint)
{
if (!_channels.TryGetValue(endpoint, out var channel))
{
var created = CreateChannel(endpoint);
channel = _channels.GetOrAdd(endpoint, created);
if (!ReferenceEquals(channel, created))
{
created.Dispose();
}
}
return channel;
}
private GrpcChannel CreateChannel(string endpoint) =>
GrpcChannel.ForAddress(endpoint, new GrpcChannelOptions
{
HttpHandler = new SocketsHttpHandler
{
KeepAlivePingDelay = _options.GrpcKeepAlivePingDelay,
KeepAlivePingTimeout = _options.GrpcKeepAlivePingTimeout,
KeepAlivePingPolicy = HttpKeepAlivePingPolicy.Always,
},
});
/// <summary>Disposes all cached channels.</summary>
public void Dispose()
{
foreach (var channel in _channels.Values)
{
channel.Dispose();
}
_channels.Clear();
}
}
@@ -0,0 +1,57 @@
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Integration;
namespace ZB.MOM.WW.ScadaBridge.AuditLog.Central;
/// <summary>
/// Mockable abstraction over the central-side <c>PullSiteCalls</c> gRPC client
/// surface used by the Site Call Audit (#22) reconciliation tick to fetch the
/// next batch of cached-call operational rows from a specific site — the
/// documented periodic self-heal pull that backfills the eventually-consistent
/// central <c>SiteCalls</c> mirror when best-effort push telemetry is lost.
/// Extracted so the (separate, follow-up) reconciliation actor can be
/// unit-tested against an in-memory stub without standing up a real
/// <c>GrpcChannel</c> per site.
/// </summary>
/// <remarks>
/// <para>
/// The home is <c>ZB.MOM.WW.ScadaBridge.AuditLog.Central</c> rather than the
/// <c>ZB.MOM.WW.ScadaBridge.SiteCallAudit</c> project so it can reuse the
/// <see cref="ISiteEnumerator"/> / <see cref="SiteEntry"/> endpoint-resolution
/// abstraction that already lives here (and that the sibling
/// <see cref="IPullAuditEventsClient"/> uses) — SiteCallAudit does not reference
/// AuditLog, so hosting the client there would mean duplicating the enumerator.
/// This mirrors the decision to keep <see cref="SiteCallDtoMapper"/> in
/// <c>ZB.MOM.WW.ScadaBridge.Communication</c>.
/// </para>
/// <para>
/// Implementations MUST NOT throw on transport faults the reconciliation tick
/// can tolerate (connection refused, deadline exceeded, cancellation) — one
/// offline site must never sink the rest of the tick. The
/// <see cref="PullSiteCallsResponse.SiteCalls"/> are returned oldest-first by
/// <c>UpdatedAtUtc</c> with the <c>SourceSite</c> re-stamped from the dialed
/// site id (the site leaves it empty, being unaware of its own id), and a
/// <c>MoreAvailable</c> flag the caller uses to decide whether to fire another
/// pull immediately.
/// </para>
/// </remarks>
public interface IPullSiteCallsClient
{
/// <summary>
/// Issues a <c>PullSiteCalls</c> RPC against the site whose gRPC endpoint is
/// registered against <paramref name="siteId"/>. Returns the next batch of
/// <see cref="ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit.SiteCall"/> rows
/// ordered oldest-first (with <c>SourceSite</c> re-stamped from
/// <paramref name="siteId"/>) AND a <c>MoreAvailable</c> flag the caller uses
/// to decide whether to fire another pull immediately.
/// </summary>
/// <param name="siteId">The identifier of the site to pull cached-call operational rows from.</param>
/// <param name="sinceUtc">Only rows with an <c>UpdatedAtUtc</c> at or after this cursor time are returned.</param>
/// <param name="batchSize">Maximum number of rows to return per call.</param>
/// <param name="ct">Cancellation token.</param>
/// <returns>A task that resolves to the next reconciliation batch with a <c>MoreAvailable</c> flag.</returns>
Task<PullSiteCallsResponse> PullAsync(
string siteId,
DateTime sinceUtc,
int batchSize,
CancellationToken ct);
}
@@ -473,6 +473,31 @@ public static class ServiceCollectionExtensions
sp.GetRequiredService<GrpcPullAuditEventsClient.IPullAuditEventsInvoker>(),
sp.GetRequiredService<ILogger<GrpcPullAuditEventsClient>>()));
// Site Call Audit (#22) reconciliation pull client — central-only, the
// sibling of the audit pull client above. Lives here (not in the
// SiteCallAudit project) so it can reuse the central-only
// ISiteEnumerator registered just above; SiteCallAudit does not
// reference AuditLog. The invoker owns the per-endpoint GrpcChannel
// cache, so it must be a singleton (a fresh invoker per resolution
// would leak channels). CommunicationOptions flow through when bound by
// the central Host, else defaults — mirrors the audit invoker.
services.TryAddSingleton<GrpcPullSiteCallsInvoker>(sp =>
{
var options = sp
.GetService<Microsoft.Extensions.Options.IOptions<
ZB.MOM.WW.ScadaBridge.Communication.CommunicationOptions>>();
return options is null
? new GrpcPullSiteCallsInvoker()
: new GrpcPullSiteCallsInvoker(options.Value);
});
services.TryAddSingleton<GrpcPullSiteCallsClient.IPullSiteCallsInvoker>(
sp => sp.GetRequiredService<GrpcPullSiteCallsInvoker>());
services.TryAddSingleton<IPullSiteCallsClient>(sp => new GrpcPullSiteCallsClient(
sp.GetRequiredService<ISiteEnumerator>(),
sp.GetRequiredService<GrpcPullSiteCallsClient.IPullSiteCallsInvoker>(),
sp.GetRequiredService<ILogger<GrpcPullSiteCallsClient>>()));
return services;
}
}