diff --git a/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/GrpcPullSiteCallsClient.cs b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/GrpcPullSiteCallsClient.cs new file mode 100644 index 00000000..b483b1a5 --- /dev/null +++ b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/GrpcPullSiteCallsClient.cs @@ -0,0 +1,287 @@ +using System.Collections.Concurrent; +using Google.Protobuf.WellKnownTypes; +using Grpc.Core; +using Grpc.Net.Client; +using Microsoft.Extensions.Logging; +using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit; +using ZB.MOM.WW.ScadaBridge.Communication; +using ZB.MOM.WW.ScadaBridge.Communication.Grpc; +using ProtoPullRequest = ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsRequest; +using ProtoPullResponse = ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsResponse; +using PullSiteCallsResponse = ZB.MOM.WW.ScadaBridge.Commons.Messages.Integration.PullSiteCallsResponse; + +namespace ZB.MOM.WW.ScadaBridge.AuditLog.Central; + +/// +/// Production (Site Call Audit #22) that the +/// central reconciliation tick (a separate follow-up component) uses to pull the +/// next batch of cached-call operational rows from a site over the +/// PullSiteCalls unary gRPC RPC served by SiteStreamGrpcServer. +/// A near-exact sibling of . +/// +/// +/// +/// Endpoint resolution. The caller passes only a siteId; this +/// client resolves it to a gRPC authority via +/// () on every call so a NodeA→NodeB +/// failover flip or an edited site address takes effect on the next tick. A site +/// with no registered endpoint yields an empty response (no dial). +/// +/// +/// SourceSite re-stamp. The site leaves +/// SiteCallOperationalDto.SourceSite empty (the tracking store has no +/// site-id column). This client is the authority that knows which site it +/// dialed, so it re-stamps the mapped from +/// siteId — the same "re-stamp from the forwarder's own id" pattern the +/// site push path uses. +/// +/// +/// Fault tolerance. 
Per the contract, +/// tolerable transport faults (, +/// , , +/// bare / SocketException) are caught +/// and collapsed to an empty response so one offline site never sinks the rest +/// of the reconciliation tick. Any other fault (e.g. a malformed reply that +/// fails DTO mapping) is also swallowed to empty: reconciliation is best-effort. +/// +/// +/// Testability. The unary call is reached through the +/// seam. Production binds +/// (one cached +/// per endpoint, keepalive from ); unit tests +/// inject a fake invoker so no real HTTP/2 endpoint is required. +/// +/// +public sealed class GrpcPullSiteCallsClient : IPullSiteCallsClient +{ + private readonly ISiteEnumerator _sites; + private readonly IPullSiteCallsInvoker _invoker; + private readonly ILogger _logger; + + /// + /// Creates the client over the given site enumerator and unary-call invoker. + /// + /// Resolves a siteId to its gRPC endpoint. + /// Seam that issues the PullSiteCalls unary RPC against a resolved endpoint. + /// Logger for transport-fault diagnostics. + public GrpcPullSiteCallsClient( + ISiteEnumerator sites, + IPullSiteCallsInvoker invoker, + ILogger logger) + { + _sites = sites ?? throw new ArgumentNullException(nameof(sites)); + _invoker = invoker ?? throw new ArgumentNullException(nameof(invoker)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + /// + public async Task PullAsync( + string siteId, + DateTime sinceUtc, + int batchSize, + CancellationToken ct) + { + var endpoint = await ResolveEndpointAsync(siteId, ct).ConfigureAwait(false); + if (endpoint is null) + { + // No gRPC address registered for the site — a configuration decision + // (mirrors ISiteEnumerator's own contract), not a runtime error, so + // there is simply nothing to pull. 
+ _logger.LogDebug( + "PullSiteCalls skipped: no gRPC endpoint registered for site {SiteId}.", siteId); + return Empty; + } + + var request = new ProtoPullRequest + { + // ReadChangedSinceAsync treats DateTime.MinValue as "from the start"; + // EnsureUtc keeps Timestamp.FromDateTime happy (it requires UTC kind). + SinceUtc = Timestamp.FromDateTime(EnsureUtc(sinceUtc)), + BatchSize = batchSize, + }; + + ProtoPullResponse reply; + try + { + reply = await _invoker.InvokeAsync(endpoint, request, ct).ConfigureAwait(false); + } + catch (RpcException ex) when (IsTolerable(ex.StatusCode)) + { + _logger.LogDebug(ex, + "PullSiteCalls tolerable transport fault for site {SiteId} ({Endpoint}): {Status}. Returning empty batch.", + siteId, endpoint, ex.StatusCode); + return Empty; + } + catch (Exception ex) when (ex is HttpRequestException or System.Net.Sockets.SocketException) + { + _logger.LogDebug(ex, + "PullSiteCalls connection-layer fault for site {SiteId} ({Endpoint}). Returning empty batch.", + siteId, endpoint); + return Empty; + } + catch (OperationCanceledException) + { + // Reconciliation tick cancelled — caller token (host shutdown) or an + // internal gRPC deadline / linked-CTS cancellation. Both tolerable for + // a best-effort pull; collapse to empty rather than landing noisily in + // the catch-all below. + return Empty; + } + catch (Exception ex) + { + // Any other fault. Reconciliation is best-effort; swallow to empty + // rather than throw — the (future) actor's per-site guard would only + // re-catch it. + _logger.LogWarning(ex, + "PullSiteCalls unexpected fault for site {SiteId} ({Endpoint}). Returning empty batch.", + siteId, endpoint); + return Empty; + } + + // Map proto DTOs to central SiteCall entities, re-stamp SourceSite from + // the dialed siteId (the site leaves it empty), and order oldest-first by + // UpdatedAtUtc (the wire is already ordered by the site read, but the + // contract is explicit, so sort defensively). 
+        // The class remarks promise that a malformed reply which fails DTO
+        // mapping collapses to an empty batch. The projection is therefore
+        // guarded as well: it used to run after every catch, so a mapping
+        // failure escaped to the caller and could sink the rest of the tick.
+        // ToList() forces the lazy LINQ pipeline to execute inside the try.
+        try
+        {
+            var siteCalls = reply.Operationals
+                .Select(SiteCallDtoMapper.FromDto)
+                .Select(sc => sc with { SourceSite = siteId })
+                .OrderBy(sc => sc.UpdatedAtUtc)
+                .ToList();
+
+            return new PullSiteCallsResponse(siteCalls, reply.MoreAvailable);
+        }
+        catch (Exception ex)
+        {
+            // Best-effort reconciliation: a reply that cannot be mapped is
+            // dropped as a whole; the next tick pulls from the same cursor.
+            _logger.LogWarning(ex,
+                "PullSiteCalls reply mapping failed for site {SiteId} ({Endpoint}). Returning empty batch.",
+                siteId, endpoint);
+            return Empty;
+        }
+    }
+
+    private async Task ResolveEndpointAsync(string siteId, CancellationToken ct)
+    {
+        var sites = await _sites.EnumerateAsync(ct).ConfigureAwait(false);
+        foreach (var site in sites)
+        {
+            if (string.Equals(site.SiteId, siteId, StringComparison.Ordinal) &&
+                !string.IsNullOrWhiteSpace(site.GrpcEndpoint))
+            {
+                return site.GrpcEndpoint;
+            }
+        }
+        return null;
+    }
+
+    private static readonly PullSiteCallsResponse Empty =
+        new(Array.Empty(), MoreAvailable: false);
+
+    private static bool IsTolerable(StatusCode code) => code is
+        StatusCode.Unavailable or
+        StatusCode.DeadlineExceeded or
+        StatusCode.Cancelled;
+
+    // All ScadaBridge timestamps are UTC by invariant. A non-UTC cursor (the
+    // reconciliation cursor starts at DateTime.MinValue, Kind=Unspecified) is
+    // treated AS UTC — never ToUniversalTime()-converted: on a host with a
+    // positive UTC offset MinValue.ToUniversalTime() underflows and
+    // Timestamp.FromDateTime throws, crashing the first pull for every site.
+    private static DateTime EnsureUtc(DateTime value) =>
+        value.Kind == DateTimeKind.Utc ? value : DateTime.SpecifyKind(value, DateTimeKind.Utc);
+
+    ///
+    /// Seam over the PullSiteCalls unary gRPC call against a resolved site
+    /// endpoint. Extracted so can be
+    /// unit-tested without a real . Production binds
+    /// .
+    ///
+    public interface IPullSiteCallsInvoker
+    {
+        ///
+        /// Issues the PullSiteCalls unary RPC against .
+        /// May throw /
+        /// on transport faults — the caller classifies and swallows tolerable ones.
+        ///
+        /// The site gRPC authority (e.g. http://site-a:8083).
+        /// The wire-format pull request.
+        /// Cancellation token.
+        /// The wire-format pull response.
+ Task InvokeAsync(string endpoint, ProtoPullRequest request, CancellationToken ct); + } +} + +/// +/// Production : caches +/// one per endpoint (keepalive from +/// , mirroring SiteStreamGrpcClient) and +/// issues the unary PullSiteCallsAsync call. The cache is keyed by +/// endpoint string, so a changed site address (NodeA→NodeB failover flip / an +/// edited gRPC address) is reached as soon as the resolver hands the new endpoint +/// to . The channel for a previous address lingers idle +/// until (idle channels hold no streams — a minor cache +/// footprint cost, not a correctness or liveness gap). Sibling of +/// . +/// +public sealed class GrpcPullSiteCallsInvoker + : GrpcPullSiteCallsClient.IPullSiteCallsInvoker, IDisposable +{ + private readonly ConcurrentDictionary _channels = new(StringComparer.Ordinal); + private readonly CommunicationOptions _options; + + /// Creates the invoker using default . + public GrpcPullSiteCallsInvoker() + : this(new CommunicationOptions()) + { + } + + /// + /// Creates the invoker, applying the configured gRPC keepalive settings to + /// every channel it opens. + /// + /// Communication options supplying gRPC keepalive timings. + public GrpcPullSiteCallsInvoker(CommunicationOptions options) + { + _options = options ?? throw new ArgumentNullException(nameof(options)); + } + + /// + public async Task InvokeAsync( + string endpoint, ProtoPullRequest request, CancellationToken ct) + { + var channel = GetOrCreateChannel(endpoint); + var client = new SiteStreamService.SiteStreamServiceClient(channel); + using var call = client.PullSiteCallsAsync(request, cancellationToken: ct); + return await call.ResponseAsync.ConfigureAwait(false); + } + + // Race-safe channel cache (create-then-GetOrAdd-then-dispose-if-lost): two + // concurrent first dials of the same endpoint can both build a GrpcChannel; + // only the channel actually installed survives, the loser is disposed. 
+    // Mirrors SiteStreamGrpcClientFactory / GrpcPullAuditEventsInvoker.
+    private GrpcChannel GetOrCreateChannel(string endpoint)
+    {
+        if (!_channels.TryGetValue(endpoint, out var channel))
+        {
+            var created = CreateChannel(endpoint);
+            channel = _channels.GetOrAdd(endpoint, created);
+            if (!ReferenceEquals(channel, created))
+            {
+                created.Dispose();
+            }
+        }
+        return channel;
+    }
+
+    // Builds a channel whose SocketsHttpHandler carries the configured HTTP/2
+    // keepalive settings. GrpcChannelOptions.DisposeHttpClient defaults to
+    // false, which would leave the handler created here (and its pooled
+    // connections) alive after the owning channel is disposed. The invoker
+    // creates the handler, so it hands ownership to the channel: disposing a
+    // channel — in Dispose() or when it loses the GetOrAdd race — now
+    // releases the handler as well.
+    private GrpcChannel CreateChannel(string endpoint) =>
+        GrpcChannel.ForAddress(endpoint, new GrpcChannelOptions
+        {
+            HttpHandler = new SocketsHttpHandler
+            {
+                KeepAlivePingDelay = _options.GrpcKeepAlivePingDelay,
+                KeepAlivePingTimeout = _options.GrpcKeepAlivePingTimeout,
+                KeepAlivePingPolicy = HttpKeepAlivePingPolicy.Always,
+            },
+            DisposeHttpClient = true,
+        });
+
+    /// Disposes all cached channels.
+    public void Dispose()
+    {
+        foreach (var channel in _channels.Values)
+        {
+            channel.Dispose();
+        }
+        _channels.Clear();
+    }
+}
diff --git a/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/IPullSiteCallsClient.cs b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/IPullSiteCallsClient.cs
new file mode 100644
index 00000000..c22d5706
--- /dev/null
+++ b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/IPullSiteCallsClient.cs
@@ -0,0 +1,57 @@
+using ZB.MOM.WW.ScadaBridge.Commons.Messages.Integration;
+
+namespace ZB.MOM.WW.ScadaBridge.AuditLog.Central;
+
+///
+/// Mockable abstraction over the central-side PullSiteCalls gRPC client
+/// surface used by the Site Call Audit (#22) reconciliation tick to fetch the
+/// next batch of cached-call operational rows from a specific site — the
+/// documented periodic self-heal pull that backfills the eventually-consistent
+/// central SiteCalls mirror when best-effort push telemetry is lost.
+/// Extracted so the (separate, follow-up) reconciliation actor can be
+/// unit-tested against an in-memory stub without standing up a real
+/// GrpcChannel per site.
+/// +/// +/// +/// The home is ZB.MOM.WW.ScadaBridge.AuditLog.Central rather than the +/// ZB.MOM.WW.ScadaBridge.SiteCallAudit project so it can reuse the +/// / endpoint-resolution +/// abstraction that already lives here (and that the sibling +/// uses) — SiteCallAudit does not reference +/// AuditLog, so hosting the client there would mean duplicating the enumerator. +/// This mirrors the decision to keep in +/// ZB.MOM.WW.ScadaBridge.Communication. +/// +/// +/// Implementations MUST NOT throw on transport faults the reconciliation tick +/// can tolerate (connection refused, deadline exceeded, cancellation) — one +/// offline site must never sink the rest of the tick. The +/// are returned oldest-first by +/// UpdatedAtUtc with the SourceSite re-stamped from the dialed +/// site id (the site leaves it empty, being unaware of its own id), and a +/// MoreAvailable flag the caller uses to decide whether to fire another +/// pull immediately. +/// +/// +public interface IPullSiteCallsClient +{ + /// + /// Issues a PullSiteCalls RPC against the site whose gRPC endpoint is + /// registered against . Returns the next batch of + /// rows + /// ordered oldest-first (with SourceSite re-stamped from + /// ) AND a MoreAvailable flag the caller uses + /// to decide whether to fire another pull immediately. + /// + /// The identifier of the site to pull cached-call operational rows from. + /// Only rows with an UpdatedAtUtc at or after this cursor time are returned. + /// Maximum number of rows to return per call. + /// Cancellation token. + /// A task that resolves to the next reconciliation batch with a MoreAvailable flag. 
+ Task PullAsync( + string siteId, + DateTime sinceUtc, + int batchSize, + CancellationToken ct); +} diff --git a/src/ZB.MOM.WW.ScadaBridge.AuditLog/ServiceCollectionExtensions.cs b/src/ZB.MOM.WW.ScadaBridge.AuditLog/ServiceCollectionExtensions.cs index 632e2317..631200a1 100644 --- a/src/ZB.MOM.WW.ScadaBridge.AuditLog/ServiceCollectionExtensions.cs +++ b/src/ZB.MOM.WW.ScadaBridge.AuditLog/ServiceCollectionExtensions.cs @@ -473,6 +473,31 @@ public static class ServiceCollectionExtensions sp.GetRequiredService(), sp.GetRequiredService>())); + // Site Call Audit (#22) reconciliation pull client — central-only, the + // sibling of the audit pull client above. Lives here (not in the + // SiteCallAudit project) so it can reuse the central-only + // ISiteEnumerator registered just above; SiteCallAudit does not + // reference AuditLog. The invoker owns the per-endpoint GrpcChannel + // cache, so it must be a singleton (a fresh invoker per resolution + // would leak channels). CommunicationOptions flow through when bound by + // the central Host, else defaults — mirrors the audit invoker. + services.TryAddSingleton(sp => + { + var options = sp + .GetService>(); + return options is null + ? 
new GrpcPullSiteCallsInvoker() + : new GrpcPullSiteCallsInvoker(options.Value); + }); + services.TryAddSingleton( + sp => sp.GetRequiredService()); + + services.TryAddSingleton(sp => new GrpcPullSiteCallsClient( + sp.GetRequiredService(), + sp.GetRequiredService(), + sp.GetRequiredService>())); + return services; } } diff --git a/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Services/IOperationTrackingStore.cs b/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Services/IOperationTrackingStore.cs index b85bff5c..1d69023a 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Services/IOperationTrackingStore.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Services/IOperationTrackingStore.cs @@ -118,4 +118,40 @@ public interface IOperationTrackingStore Task PurgeTerminalAsync( DateTime olderThanUtc, CancellationToken ct = default); + + /// + /// Reconciliation read (Site Call Audit #22): return tracking rows whose + /// UpdatedAtUtc is at or after as + /// projections, ordered by + /// UpdatedAtUtc ascending and capped at . + /// This is the site-side feed for central's PullSiteCalls RPC — the + /// documented periodic self-heal pull that backfills the eventually-consistent + /// central SiteCalls mirror when best-effort push telemetry is lost. + /// + /// + /// + /// The lower bound is inclusive so a caller can resume from the last + /// returned UpdatedAtUtc without skipping a row that shares that + /// instant; central ingest is insert-if-not-exists then upsert-on-newer, so + /// re-reading the boundary row is a harmless no-op. The oldest-first cap lets + /// the caller advance the cursor monotonically across follow-up pulls. + /// + /// + /// is left as the empty string: + /// the site id is not a tracking-store column, and the central client re-stamps + /// it from the siteId it dialed (the only authority that knows which + /// site the rows came from). 
is + /// projected from the row's Kind (DbWriteCached → DbOutbound, + /// otherwise ApiOutbound) and + /// from TargetSummary. + /// + /// + /// Inclusive lower bound on UpdatedAtUtc; reads from the start. + /// Maximum number of rows to return (oldest first). + /// Cancellation token. + /// The matching rows projected to , oldest-first, capped at . + Task> ReadChangedSinceAsync( + DateTime sinceUtc, + int batchSize, + CancellationToken ct = default); } diff --git a/src/ZB.MOM.WW.ScadaBridge.Commons/Messages/Integration/PullSiteCallsResponse.cs b/src/ZB.MOM.WW.ScadaBridge.Commons/Messages/Integration/PullSiteCallsResponse.cs new file mode 100644 index 00000000..fa3949cb --- /dev/null +++ b/src/ZB.MOM.WW.ScadaBridge.Commons/Messages/Integration/PullSiteCallsResponse.cs @@ -0,0 +1,17 @@ +using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit; + +namespace ZB.MOM.WW.ScadaBridge.Commons.Messages.Integration; + +/// +/// Site Call Audit (#22) periodic reconciliation pull response: the next batch of +/// site cached-call operational rows (the eventually-consistent SiteCalls +/// mirror's self-heal feed) plus a flag signalling +/// the caller to advance the watermark and pull again. Mirrors +/// ; carries the central +/// entity the ingest path upserts. See Component-SiteCallAudit.md. +/// +/// The next batch of operational rows, ordered oldest-first by . +/// True when the site saturated the requested batch size — the caller should advance the cursor and pull again. 
+public sealed record PullSiteCallsResponse( + IReadOnlyList SiteCalls, + bool MoreAvailable); diff --git a/src/ZB.MOM.WW.ScadaBridge.Communication/Grpc/SiteCallDtoMapper.cs b/src/ZB.MOM.WW.ScadaBridge.Communication/Grpc/SiteCallDtoMapper.cs index ec7a0dbd..265a37eb 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Communication/Grpc/SiteCallDtoMapper.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Communication/Grpc/SiteCallDtoMapper.cs @@ -1,5 +1,6 @@ using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit; using ZB.MOM.WW.ScadaBridge.Commons.Types; +using Timestamp = Google.Protobuf.WellKnownTypes.Timestamp; namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc; @@ -20,10 +21,15 @@ namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc; /// Mirrors the sibling . /// /// -/// Only the DTO→entity direction is provided: nothing in the system maps a -/// back onto the wire (sites emit the operational state -/// from SiteCallOperational, never from the central -/// entity), so an entity→DTO method would be dead code. +/// Two directions are provided. rehydrates the central +/// entity central writes into the SiteCalls table. +/// projects a site-local +/// onto the wire — used by the Site Call Audit (#22) PullSiteCalls +/// reconciliation handler (the central→site self-heal pull). The +/// entity itself is never mapped back onto the wire: +/// sites emit operational state from , never +/// from the central , so a SiteCall→DTO method +/// would be dead code. /// /// /// String nullability convention: proto3 scalar strings cannot be absent, so the @@ -70,4 +76,54 @@ public static class SiteCallDtoMapper IngestedAtUtc = DateTime.UtcNow, // overwritten by AuditLogIngestActor }; } + + /// + /// Projects a site-local onto its + /// wire-format DTO for the Site Call Audit (#22) PullSiteCalls + /// reconciliation RPC. 
The inverse of ; null + /// / + /// collapse to empty strings (proto3 scalar strings cannot be absent), while + /// the nullable HttpStatus and TerminalAtUtc stay unset on the + /// wire so true-null semantics survive the round-trip back through + /// . + /// + /// The site-local operational state to project to wire format. + /// A populated ready for transmission. + public static SiteCallOperationalDto ToDto(SiteCallOperational operational) + { + ArgumentNullException.ThrowIfNull(operational); + + var dto = new SiteCallOperationalDto + { + TrackedOperationId = operational.TrackedOperationId.ToString(), + Channel = operational.Channel, + Target = operational.Target, + SourceSite = operational.SourceSite, + SourceNode = operational.SourceNode ?? string.Empty, + Status = operational.Status, + RetryCount = operational.RetryCount, + LastError = operational.LastError ?? string.Empty, + CreatedAtUtc = Timestamp.FromDateTime(EnsureUtc(operational.CreatedAtUtc)), + UpdatedAtUtc = Timestamp.FromDateTime(EnsureUtc(operational.UpdatedAtUtc)), + }; + + if (operational.HttpStatus.HasValue) + { + dto.HttpStatus = operational.HttpStatus.Value; + } + + if (operational.TerminalAtUtc.HasValue) + { + dto.TerminalAtUtc = Timestamp.FromDateTime(EnsureUtc(operational.TerminalAtUtc.Value)); + } + + return dto; + } + + // All ScadaBridge timestamps are UTC by invariant; Timestamp.FromDateTime + // requires UTC kind. Specify (never convert) so a row read back from SQLite + // with Kind=Utc passes through and a defensively-unspecified value is + // treated as the UTC it already is. Mirrors AuditEventDtoMapper.EnsureUtc. + private static DateTime EnsureUtc(DateTime value) => + value.Kind == DateTimeKind.Utc ? 
value : DateTime.SpecifyKind(value, DateTimeKind.Utc); } diff --git a/src/ZB.MOM.WW.ScadaBridge.Communication/Grpc/SiteStreamGrpcServer.cs b/src/ZB.MOM.WW.ScadaBridge.Communication/Grpc/SiteStreamGrpcServer.cs index 7cb82444..f9d9aec0 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Communication/Grpc/SiteStreamGrpcServer.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Communication/Grpc/SiteStreamGrpcServer.cs @@ -5,7 +5,9 @@ using Grpc.Core; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using ZB.MOM.WW.Audit; +using ZB.MOM.WW.ScadaBridge.Commons.Interfaces; using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services; +using ZB.MOM.WW.ScadaBridge.Commons.Types; using ZB.MOM.WW.ScadaBridge.Commons.Messages.Audit; using ZB.MOM.WW.ScadaBridge.Commons.Observability; using GrpcStatus = Grpc.Core.Status; @@ -48,6 +50,14 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase // the missing queue as "nothing to ship" and returns an empty response so // central retries on its next reconciliation cycle. private ISiteAuditQueue? _siteAuditQueue; + // Site Call Audit (#22): site-local operation-tracking store handed in by + // AkkaHostedService on site roles so the central reconciliation puller's + // PullSiteCalls RPC can read tracking rows changed since a cursor. Null + // when not wired (central-only host or test composing the server in + // isolation) — the handler treats the missing store as "nothing to ship" + // and returns an empty response so central retries on its next cycle. + // Mirrors _siteAuditQueue. + private IOperationTrackingStore? 
_operationTrackingStore; /// /// Test-only constructor — kept internal so the DI container sees a @@ -137,6 +147,21 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase _siteAuditQueue = queue; } + /// + /// Hands the site-local (the same + /// OperationTrackingStore singleton that backs + /// Tracking.Status(id) on the script thread) to the gRPC server so + /// the Site Call Audit (#22) RPC can serve + /// central's reconciliation pulls. Mirrors : + /// wired post-construction because the store and the gRPC server are both + /// DI singletons brought up in independent orders on site startup. + /// + /// The site operation-tracking store for serving reconciliation pulls. + public void SetOperationTrackingStore(IOperationTrackingStore store) + { + _operationTrackingStore = store; + } + /// /// Host-017 / REQ-HOST-7: signals the gRPC server to begin its part of the /// site shutdown sequence — refuse new @@ -488,6 +513,69 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase return response; } + /// + public override async Task PullSiteCalls( + PullSiteCallsRequest request, + ServerCallContext context) + { + var store = _operationTrackingStore; + if (store is null) + { + _logger.LogWarning( + "PullSiteCalls invoked before SetOperationTrackingStore was called; returning empty response."); + return new PullSiteCallsResponse(); + } + + if (request.BatchSize <= 0) + { + // Mirrors PullAuditEvents: reject malformed requests cleanly with + // InvalidArgument so the caller doesn't see a generic RpcException + // from the underlying SQLite parameter validation. + throw new RpcException(new GrpcStatus( + StatusCode.InvalidArgument, "batch_size must be > 0")); + } + + // since_utc defaults to DateTime.MinValue when the wrapper is absent — + // i.e. "pull from the beginning of recorded history", the intended + // behaviour for the very first reconciliation cycle. 
ToUniversalTime + // is safe here (the wire value is always a real UTC Timestamp, never the + // unspecified-MinValue the central client guards against on its side). + var since = request.SinceUtc?.ToDateTime().ToUniversalTime() ?? DateTime.MinValue; + + IReadOnlyList operationals; + try + { + operationals = await store.ReadChangedSinceAsync( + since, request.BatchSize, context.CancellationToken); + } + catch (Exception ex) + { + // Best-effort, like PullAuditEvents: a read fault must never abort + // the reconciliation tick — central retries on its next cycle. + _logger.LogError(ex, + "ReadChangedSinceAsync failed for since={Since} batch={Batch}; returning empty response.", + since, request.BatchSize); + return new PullSiteCallsResponse(); + } + + var response = new PullSiteCallsResponse + { + // batch_size saturated → tell central to issue a follow-up pull with + // an advanced cursor. The site doesn't compute the cursor — central + // walks it forward from the last returned UpdatedAtUtc. Unlike + // PullAuditEvents there is no MarkReconciled step: the tracking store + // is the operational source of truth and the central SiteCalls mirror + // is upsert-on-newer, so re-reading rows is a harmless no-op. + MoreAvailable = operationals.Count >= request.BatchSize, + }; + foreach (var op in operationals) + { + response.Operationals.Add(SiteCallDtoMapper.ToDto(op)); + } + + return response; + } + /// /// Tracks a single active stream so cleanup only removes its own entry. 
/// diff --git a/src/ZB.MOM.WW.ScadaBridge.Communication/Protos/sitestream.proto b/src/ZB.MOM.WW.ScadaBridge.Communication/Protos/sitestream.proto index 6beae55c..df9ee7af 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Communication/Protos/sitestream.proto +++ b/src/ZB.MOM.WW.ScadaBridge.Communication/Protos/sitestream.proto @@ -10,6 +10,7 @@ service SiteStreamService { rpc IngestAuditEvents(AuditEventBatch) returns (IngestAck); rpc IngestCachedTelemetry(CachedTelemetryBatch) returns (IngestAck); rpc PullAuditEvents(PullAuditEventsRequest) returns (PullAuditEventsResponse); + rpc PullSiteCalls(PullSiteCallsRequest) returns (PullSiteCallsResponse); } message InstanceStreamRequest { @@ -157,3 +158,20 @@ message PullAuditEventsResponse { repeated AuditEventDto events = 1; bool more_available = 2; } + +// Site Call Audit (#22) reconciliation pull: central→site request for any +// site-local operation-tracking rows whose UpdatedAtUtc >= since_utc — the +// self-heal feed that backfills the eventually-consistent central SiteCalls +// mirror when best-effort push telemetry is lost. Mirrors PullAuditEvents +// but is a SEPARATE RPC (the tracking store is the operational source of +// truth, distinct from the site audit queue). more_available signals +// batch_size was saturated so the caller advances since_utc and pulls again. 
+message PullSiteCallsRequest { + google.protobuf.Timestamp since_utc = 1; + int32 batch_size = 2; +} + +message PullSiteCallsResponse { + repeated SiteCallOperationalDto operationals = 1; + bool more_available = 2; +} diff --git a/src/ZB.MOM.WW.ScadaBridge.Communication/SiteStreamGrpc/Sitestream.cs b/src/ZB.MOM.WW.ScadaBridge.Communication/SiteStreamGrpc/Sitestream.cs index cebfccab..a0e79003 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Communication/SiteStreamGrpc/Sitestream.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Communication/SiteStreamGrpc/Sitestream.cs @@ -81,23 +81,30 @@ namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc { "dWVzdBItCglzaW5jZV91dGMYASABKAsyGi5nb29nbGUucHJvdG9idWYuVGlt", "ZXN0YW1wEhIKCmJhdGNoX3NpemUYAiABKAUiXAoXUHVsbEF1ZGl0RXZlbnRz", "UmVzcG9uc2USKQoGZXZlbnRzGAEgAygLMhkuc2l0ZXN0cmVhbS5BdWRpdEV2", - "ZW50RHRvEhYKDm1vcmVfYXZhaWxhYmxlGAIgASgIKlwKB1F1YWxpdHkSFwoT", - "UVVBTElUWV9VTlNQRUNJRklFRBAAEhAKDFFVQUxJVFlfR09PRBABEhUKEVFV", - "QUxJVFlfVU5DRVJUQUlOEAISDwoLUVVBTElUWV9CQUQQAypdCg5BbGFybVN0", - "YXRlRW51bRIbChdBTEFSTV9TVEFURV9VTlNQRUNJRklFRBAAEhYKEkFMQVJN", - "X1NUQVRFX05PUk1BTBABEhYKEkFMQVJNX1NUQVRFX0FDVElWRRACKoUBCg5B", - "bGFybUxldmVsRW51bRIUChBBTEFSTV9MRVZFTF9OT05FEAASEwoPQUxBUk1f", - "TEVWRUxfTE9XEAESFwoTQUxBUk1fTEVWRUxfTE9XX0xPVxACEhQKEEFMQVJN", - "X0xFVkVMX0hJR0gQAxIZChVBTEFSTV9MRVZFTF9ISUdIX0hJR0gQBDLhAgoR", - "U2l0ZVN0cmVhbVNlcnZpY2USVQoRU3Vic2NyaWJlSW5zdGFuY2USIS5zaXRl", - "c3RyZWFtLkluc3RhbmNlU3RyZWFtUmVxdWVzdBobLnNpdGVzdHJlYW0uU2l0", - "ZVN0cmVhbUV2ZW50MAESRwoRSW5nZXN0QXVkaXRFdmVudHMSGy5zaXRlc3Ry", - "ZWFtLkF1ZGl0RXZlbnRCYXRjaBoVLnNpdGVzdHJlYW0uSW5nZXN0QWNrElAK", - "FUluZ2VzdENhY2hlZFRlbGVtZXRyeRIgLnNpdGVzdHJlYW0uQ2FjaGVkVGVs", - "ZW1ldHJ5QmF0Y2gaFS5zaXRlc3RyZWFtLkluZ2VzdEFjaxJaCg9QdWxsQXVk", - "aXRFdmVudHMSIi5zaXRlc3RyZWFtLlB1bGxBdWRpdEV2ZW50c1JlcXVlc3Qa", - "Iy5zaXRlc3RyZWFtLlB1bGxBdWRpdEV2ZW50c1Jlc3BvbnNlQiuqAihaQi5N", - "T00uV1cuU2NhZGFCcmlkZ2UuQ29tbXVuaWNhdGlvbi5HcnBjYgZwcm90bzM=")); + 
"ZW50RHRvEhYKDm1vcmVfYXZhaWxhYmxlGAIgASgIIlkKFFB1bGxTaXRlQ2Fs", + "bHNSZXF1ZXN0Ei0KCXNpbmNlX3V0YxgBIAEoCzIaLmdvb2dsZS5wcm90b2J1", + "Zi5UaW1lc3RhbXASEgoKYmF0Y2hfc2l6ZRgCIAEoBSJpChVQdWxsU2l0ZUNh", + "bGxzUmVzcG9uc2USOAoMb3BlcmF0aW9uYWxzGAEgAygLMiIuc2l0ZXN0cmVh", + "bS5TaXRlQ2FsbE9wZXJhdGlvbmFsRHRvEhYKDm1vcmVfYXZhaWxhYmxlGAIg", + "ASgIKlwKB1F1YWxpdHkSFwoTUVVBTElUWV9VTlNQRUNJRklFRBAAEhAKDFFV", + "QUxJVFlfR09PRBABEhUKEVFVQUxJVFlfVU5DRVJUQUlOEAISDwoLUVVBTElU", + "WV9CQUQQAypdCg5BbGFybVN0YXRlRW51bRIbChdBTEFSTV9TVEFURV9VTlNQ", + "RUNJRklFRBAAEhYKEkFMQVJNX1NUQVRFX05PUk1BTBABEhYKEkFMQVJNX1NU", + "QVRFX0FDVElWRRACKoUBCg5BbGFybUxldmVsRW51bRIUChBBTEFSTV9MRVZF", + "TF9OT05FEAASEwoPQUxBUk1fTEVWRUxfTE9XEAESFwoTQUxBUk1fTEVWRUxf", + "TE9XX0xPVxACEhQKEEFMQVJNX0xFVkVMX0hJR0gQAxIZChVBTEFSTV9MRVZF", + "TF9ISUdIX0hJR0gQBDK3AwoRU2l0ZVN0cmVhbVNlcnZpY2USVQoRU3Vic2Ny", + "aWJlSW5zdGFuY2USIS5zaXRlc3RyZWFtLkluc3RhbmNlU3RyZWFtUmVxdWVz", + "dBobLnNpdGVzdHJlYW0uU2l0ZVN0cmVhbUV2ZW50MAESRwoRSW5nZXN0QXVk", + "aXRFdmVudHMSGy5zaXRlc3RyZWFtLkF1ZGl0RXZlbnRCYXRjaBoVLnNpdGVz", + "dHJlYW0uSW5nZXN0QWNrElAKFUluZ2VzdENhY2hlZFRlbGVtZXRyeRIgLnNp", + "dGVzdHJlYW0uQ2FjaGVkVGVsZW1ldHJ5QmF0Y2gaFS5zaXRlc3RyZWFtLklu", + "Z2VzdEFjaxJaCg9QdWxsQXVkaXRFdmVudHMSIi5zaXRlc3RyZWFtLlB1bGxB", + "dWRpdEV2ZW50c1JlcXVlc3QaIy5zaXRlc3RyZWFtLlB1bGxBdWRpdEV2ZW50", + "c1Jlc3BvbnNlElQKDVB1bGxTaXRlQ2FsbHMSIC5zaXRlc3RyZWFtLlB1bGxT", + "aXRlQ2FsbHNSZXF1ZXN0GiEuc2l0ZXN0cmVhbS5QdWxsU2l0ZUNhbGxzUmVz", + "cG9uc2VCK6oCKFpCLk1PTS5XVy5TY2FkYUJyaWRnZS5Db21tdW5pY2F0aW9u", + "LkdycGNiBnByb3RvMw==")); descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, new pbr::FileDescriptor[] { global::Google.Protobuf.WellKnownTypes.TimestampReflection.Descriptor, global::Google.Protobuf.WellKnownTypes.WrappersReflection.Descriptor, }, new pbr::GeneratedClrTypeInfo(new[] {typeof(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.Quality), typeof(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.AlarmStateEnum), 
typeof(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.AlarmLevelEnum), }, null, new pbr::GeneratedClrTypeInfo[] { @@ -112,7 +119,9 @@ namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc { new pbr::GeneratedClrTypeInfo(typeof(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.CachedTelemetryPacket), global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.CachedTelemetryPacket.Parser, new[]{ "AuditEvent", "Operational" }, null, null, null, null), new pbr::GeneratedClrTypeInfo(typeof(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.CachedTelemetryBatch), global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.CachedTelemetryBatch.Parser, new[]{ "Packets" }, null, null, null, null), new pbr::GeneratedClrTypeInfo(typeof(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsRequest), global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsRequest.Parser, new[]{ "SinceUtc", "BatchSize" }, null, null, null, null), - new pbr::GeneratedClrTypeInfo(typeof(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsResponse), global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsResponse.Parser, new[]{ "Events", "MoreAvailable" }, null, null, null, null) + new pbr::GeneratedClrTypeInfo(typeof(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsResponse), global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsResponse.Parser, new[]{ "Events", "MoreAvailable" }, null, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsRequest), global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsRequest.Parser, new[]{ "SinceUtc", "BatchSize" }, null, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsResponse), global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsResponse.Parser, new[]{ "Operationals", "MoreAvailable" }, null, null, null, null) })); } #endregion @@ -5064,6 +5073,483 @@ namespace 
ZB.MOM.WW.ScadaBridge.Communication.Grpc { } + /// + /// Site Call Audit (#22) reconciliation pull: central→site request for any + /// site-local operation-tracking rows whose UpdatedAtUtc >= since_utc — the + /// self-heal feed that backfills the eventually-consistent central SiteCalls + /// mirror when best-effort push telemetry is lost. Mirrors PullAuditEvents + /// but is a SEPARATE RPC (the tracking store is the operational source of + /// truth, distinct from the site audit queue). more_available signals + /// batch_size was saturated so the caller advances since_utc and pulls again. + /// + [global::System.Diagnostics.DebuggerDisplayAttribute("{ToString(),nq}")] + public sealed partial class PullSiteCallsRequest : pb::IMessage + #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE + , pb::IBufferMessage + #endif + { + private static readonly pb::MessageParser _parser = new pb::MessageParser(() => new PullSiteCallsRequest()); + private pb::UnknownFieldSet _unknownFields; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public static pb::MessageParser Parser { get { return _parser; } } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public static pbr::MessageDescriptor Descriptor { + get { return global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.SitestreamReflection.Descriptor.MessageTypes[12]; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + pbr::MessageDescriptor pb::IMessage.Descriptor { + get { return Descriptor; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public PullSiteCallsRequest() { + OnConstruction(); + } + + partial void OnConstruction(); + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + 
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public PullSiteCallsRequest(PullSiteCallsRequest other) : this() { + sinceUtc_ = other.sinceUtc_ != null ? other.sinceUtc_.Clone() : null; + batchSize_ = other.batchSize_; + _unknownFields = pb::UnknownFieldSet.Clone(other._unknownFields); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public PullSiteCallsRequest Clone() { + return new PullSiteCallsRequest(this); + } + + /// Field number for the "since_utc" field. + public const int SinceUtcFieldNumber = 1; + private global::Google.Protobuf.WellKnownTypes.Timestamp sinceUtc_; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public global::Google.Protobuf.WellKnownTypes.Timestamp SinceUtc { + get { return sinceUtc_; } + set { + sinceUtc_ = value; + } + } + + /// Field number for the "batch_size" field. + public const int BatchSizeFieldNumber = 2; + private int batchSize_; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public int BatchSize { + get { return batchSize_; } + set { + batchSize_ = value; + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public override bool Equals(object other) { + return Equals(other as PullSiteCallsRequest); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public bool Equals(PullSiteCallsRequest other) { + if (ReferenceEquals(other, null)) { + return false; + } + if (ReferenceEquals(other, this)) { + return true; + } + if (!object.Equals(SinceUtc, other.SinceUtc)) return false; + if (BatchSize != other.BatchSize) return false; + return Equals(_unknownFields, other._unknownFields); + } + + 
[global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public override int GetHashCode() { + int hash = 1; + if (sinceUtc_ != null) hash ^= SinceUtc.GetHashCode(); + if (BatchSize != 0) hash ^= BatchSize.GetHashCode(); + if (_unknownFields != null) { + hash ^= _unknownFields.GetHashCode(); + } + return hash; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public override string ToString() { + return pb::JsonFormatter.ToDiagnosticString(this); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public void WriteTo(pb::CodedOutputStream output) { + #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE + output.WriteRawMessage(this); + #else + if (sinceUtc_ != null) { + output.WriteRawTag(10); + output.WriteMessage(SinceUtc); + } + if (BatchSize != 0) { + output.WriteRawTag(16); + output.WriteInt32(BatchSize); + } + if (_unknownFields != null) { + _unknownFields.WriteTo(output); + } + #endif + } + + #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + void pb::IBufferMessage.InternalWriteTo(ref pb::WriteContext output) { + if (sinceUtc_ != null) { + output.WriteRawTag(10); + output.WriteMessage(SinceUtc); + } + if (BatchSize != 0) { + output.WriteRawTag(16); + output.WriteInt32(BatchSize); + } + if (_unknownFields != null) { + _unknownFields.WriteTo(ref output); + } + } + #endif + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public int CalculateSize() { + int size = 0; + if (sinceUtc_ != null) { + size += 1 + pb::CodedOutputStream.ComputeMessageSize(SinceUtc); + } + if (BatchSize != 0) { + size += 1 + 
pb::CodedOutputStream.ComputeInt32Size(BatchSize); + } + if (_unknownFields != null) { + size += _unknownFields.CalculateSize(); + } + return size; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public void MergeFrom(PullSiteCallsRequest other) { + if (other == null) { + return; + } + if (other.sinceUtc_ != null) { + if (sinceUtc_ == null) { + SinceUtc = new global::Google.Protobuf.WellKnownTypes.Timestamp(); + } + SinceUtc.MergeFrom(other.SinceUtc); + } + if (other.BatchSize != 0) { + BatchSize = other.BatchSize; + } + _unknownFields = pb::UnknownFieldSet.MergeFrom(_unknownFields, other._unknownFields); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public void MergeFrom(pb::CodedInputStream input) { + #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE + input.ReadRawMessage(this); + #else + uint tag; + while ((tag = input.ReadTag()) != 0) { + if ((tag & 7) == 4) { + // Abort on any end group tag. + return; + } + switch(tag) { + default: + _unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, input); + break; + case 10: { + if (sinceUtc_ == null) { + SinceUtc = new global::Google.Protobuf.WellKnownTypes.Timestamp(); + } + input.ReadMessage(SinceUtc); + break; + } + case 16: { + BatchSize = input.ReadInt32(); + break; + } + } + } + #endif + } + + #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + void pb::IBufferMessage.InternalMergeFrom(ref pb::ParseContext input) { + uint tag; + while ((tag = input.ReadTag()) != 0) { + if ((tag & 7) == 4) { + // Abort on any end group tag. 
+ return; + } + switch(tag) { + default: + _unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, ref input); + break; + case 10: { + if (sinceUtc_ == null) { + SinceUtc = new global::Google.Protobuf.WellKnownTypes.Timestamp(); + } + input.ReadMessage(SinceUtc); + break; + } + case 16: { + BatchSize = input.ReadInt32(); + break; + } + } + } + } + #endif + + } + + [global::System.Diagnostics.DebuggerDisplayAttribute("{ToString(),nq}")] + public sealed partial class PullSiteCallsResponse : pb::IMessage + #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE + , pb::IBufferMessage + #endif + { + private static readonly pb::MessageParser _parser = new pb::MessageParser(() => new PullSiteCallsResponse()); + private pb::UnknownFieldSet _unknownFields; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public static pb::MessageParser Parser { get { return _parser; } } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public static pbr::MessageDescriptor Descriptor { + get { return global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.SitestreamReflection.Descriptor.MessageTypes[13]; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + pbr::MessageDescriptor pb::IMessage.Descriptor { + get { return Descriptor; } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public PullSiteCallsResponse() { + OnConstruction(); + } + + partial void OnConstruction(); + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public PullSiteCallsResponse(PullSiteCallsResponse other) : this() { + operationals_ = other.operationals_.Clone(); + moreAvailable_ = other.moreAvailable_; + _unknownFields = 
pb::UnknownFieldSet.Clone(other._unknownFields); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public PullSiteCallsResponse Clone() { + return new PullSiteCallsResponse(this); + } + + /// Field number for the "operationals" field. + public const int OperationalsFieldNumber = 1; + private static readonly pb::FieldCodec _repeated_operationals_codec + = pb::FieldCodec.ForMessage(10, global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.SiteCallOperationalDto.Parser); + private readonly pbc::RepeatedField operationals_ = new pbc::RepeatedField(); + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public pbc::RepeatedField Operationals { + get { return operationals_; } + } + + /// Field number for the "more_available" field. + public const int MoreAvailableFieldNumber = 2; + private bool moreAvailable_; + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public bool MoreAvailable { + get { return moreAvailable_; } + set { + moreAvailable_ = value; + } + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public override bool Equals(object other) { + return Equals(other as PullSiteCallsResponse); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public bool Equals(PullSiteCallsResponse other) { + if (ReferenceEquals(other, null)) { + return false; + } + if (ReferenceEquals(other, this)) { + return true; + } + if(!operationals_.Equals(other.operationals_)) return false; + if (MoreAvailable != other.MoreAvailable) return false; + return Equals(_unknownFields, other._unknownFields); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + 
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public override int GetHashCode() { + int hash = 1; + hash ^= operationals_.GetHashCode(); + if (MoreAvailable != false) hash ^= MoreAvailable.GetHashCode(); + if (_unknownFields != null) { + hash ^= _unknownFields.GetHashCode(); + } + return hash; + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public override string ToString() { + return pb::JsonFormatter.ToDiagnosticString(this); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public void WriteTo(pb::CodedOutputStream output) { + #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE + output.WriteRawMessage(this); + #else + operationals_.WriteTo(output, _repeated_operationals_codec); + if (MoreAvailable != false) { + output.WriteRawTag(16); + output.WriteBool(MoreAvailable); + } + if (_unknownFields != null) { + _unknownFields.WriteTo(output); + } + #endif + } + + #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + void pb::IBufferMessage.InternalWriteTo(ref pb::WriteContext output) { + operationals_.WriteTo(ref output, _repeated_operationals_codec); + if (MoreAvailable != false) { + output.WriteRawTag(16); + output.WriteBool(MoreAvailable); + } + if (_unknownFields != null) { + _unknownFields.WriteTo(ref output); + } + } + #endif + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public int CalculateSize() { + int size = 0; + size += operationals_.CalculateSize(_repeated_operationals_codec); + if (MoreAvailable != false) { + size += 1 + 1; + } + if (_unknownFields != null) { + size += _unknownFields.CalculateSize(); + } + return size; + } + + 
[global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public void MergeFrom(PullSiteCallsResponse other) { + if (other == null) { + return; + } + operationals_.Add(other.operationals_); + if (other.MoreAvailable != false) { + MoreAvailable = other.MoreAvailable; + } + _unknownFields = pb::UnknownFieldSet.MergeFrom(_unknownFields, other._unknownFields); + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public void MergeFrom(pb::CodedInputStream input) { + #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE + input.ReadRawMessage(this); + #else + uint tag; + while ((tag = input.ReadTag()) != 0) { + if ((tag & 7) == 4) { + // Abort on any end group tag. + return; + } + switch(tag) { + default: + _unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, input); + break; + case 10: { + operationals_.AddEntriesFrom(input, _repeated_operationals_codec); + break; + } + case 16: { + MoreAvailable = input.ReadBool(); + break; + } + } + } + #endif + } + + #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + void pb::IBufferMessage.InternalMergeFrom(ref pb::ParseContext input) { + uint tag; + while ((tag = input.ReadTag()) != 0) { + if ((tag & 7) == 4) { + // Abort on any end group tag. 
+ return; + } + switch(tag) { + default: + _unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, ref input); + break; + case 10: { + operationals_.AddEntriesFrom(ref input, _repeated_operationals_codec); + break; + } + case 16: { + MoreAvailable = input.ReadBool(); + break; + } + } + } + } + #endif + + } + #endregion } diff --git a/src/ZB.MOM.WW.ScadaBridge.Communication/SiteStreamGrpc/SitestreamGrpc.cs b/src/ZB.MOM.WW.ScadaBridge.Communication/SiteStreamGrpc/SitestreamGrpc.cs index 8993b16a..b57de38e 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Communication/SiteStreamGrpc/SitestreamGrpc.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Communication/SiteStreamGrpc/SitestreamGrpc.cs @@ -59,6 +59,10 @@ namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc { static readonly grpc::Marshaller __Marshaller_sitestream_PullAuditEventsRequest = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsRequest.Parser)); [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] static readonly grpc::Marshaller __Marshaller_sitestream_PullAuditEventsResponse = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsResponse.Parser)); + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] + static readonly grpc::Marshaller __Marshaller_sitestream_PullSiteCallsRequest = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsRequest.Parser)); + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] + static readonly grpc::Marshaller __Marshaller_sitestream_PullSiteCallsResponse = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, 
global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsResponse.Parser)); [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] static readonly grpc::Method __Method_SubscribeInstance = new grpc::Method( @@ -92,6 +96,14 @@ namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc { __Marshaller_sitestream_PullAuditEventsRequest, __Marshaller_sitestream_PullAuditEventsResponse); + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] + static readonly grpc::Method __Method_PullSiteCalls = new grpc::Method( + grpc::MethodType.Unary, + __ServiceName, + "PullSiteCalls", + __Marshaller_sitestream_PullSiteCallsRequest, + __Marshaller_sitestream_PullSiteCallsResponse); + /// Service descriptor public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor { @@ -126,6 +138,12 @@ namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc { throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, "")); } + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] + public virtual global::System.Threading.Tasks.Task PullSiteCalls(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsRequest request, grpc::ServerCallContext context) + { + throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, "")); + } + } /// Client for SiteStreamService @@ -225,6 +243,26 @@ namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc { { return CallInvoker.AsyncUnaryCall(__Method_PullAuditEvents, null, options, request); } + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] + public virtual global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsResponse PullSiteCalls(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsRequest request, grpc::Metadata headers = null, global::System.DateTime? 
deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken)) + { + return PullSiteCalls(request, new grpc::CallOptions(headers, deadline, cancellationToken)); + } + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] + public virtual global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsResponse PullSiteCalls(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsRequest request, grpc::CallOptions options) + { + return CallInvoker.BlockingUnaryCall(__Method_PullSiteCalls, null, options, request); + } + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] + public virtual grpc::AsyncUnaryCall PullSiteCallsAsync(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsRequest request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken)) + { + return PullSiteCallsAsync(request, new grpc::CallOptions(headers, deadline, cancellationToken)); + } + [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] + public virtual grpc::AsyncUnaryCall PullSiteCallsAsync(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsRequest request, grpc::CallOptions options) + { + return CallInvoker.AsyncUnaryCall(__Method_PullSiteCalls, null, options, request); + } /// Creates a new instance of client from given ClientBaseConfiguration. 
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] protected override SiteStreamServiceClient NewInstance(ClientBaseConfiguration configuration) @@ -242,7 +280,8 @@ namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc { .AddMethod(__Method_SubscribeInstance, serviceImpl.SubscribeInstance) .AddMethod(__Method_IngestAuditEvents, serviceImpl.IngestAuditEvents) .AddMethod(__Method_IngestCachedTelemetry, serviceImpl.IngestCachedTelemetry) - .AddMethod(__Method_PullAuditEvents, serviceImpl.PullAuditEvents).Build(); + .AddMethod(__Method_PullAuditEvents, serviceImpl.PullAuditEvents) + .AddMethod(__Method_PullSiteCalls, serviceImpl.PullSiteCalls).Build(); } /// Register service method with a service binder with or without implementation. Useful when customizing the service binding logic. @@ -256,6 +295,7 @@ namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc { serviceBinder.AddMethod(__Method_IngestAuditEvents, serviceImpl == null ? null : new grpc::UnaryServerMethod(serviceImpl.IngestAuditEvents)); serviceBinder.AddMethod(__Method_IngestCachedTelemetry, serviceImpl == null ? null : new grpc::UnaryServerMethod(serviceImpl.IngestCachedTelemetry)); serviceBinder.AddMethod(__Method_PullAuditEvents, serviceImpl == null ? null : new grpc::UnaryServerMethod(serviceImpl.PullAuditEvents)); + serviceBinder.AddMethod(__Method_PullSiteCalls, serviceImpl == null ? null : new grpc::UnaryServerMethod(serviceImpl.PullSiteCalls)); } } diff --git a/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs b/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs index 864f2e63..2c45478f 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs @@ -1009,6 +1009,18 @@ akka {{ // direction one-way (Host knows both; Communication doesn't reach back // into AuditLog). 
grpcServer?.SetSiteAuditQueue(siteAuditQueue); + // Site Call Audit (#22): hand the site-local OperationTrackingStore to + // the gRPC server so the PullSiteCalls reconciliation RPC can serve + // central's self-heal pulls. siteTrackingStore is resolved above with + // GetService — present on site composition roots, null on central — so + // wire the seam only when the store exists. Like SetSiteAuditQueue, both + // the store and the gRPC server are singletons; wiring here keeps the + // dependency direction one-way (Host knows both; Communication doesn't + // reach back into SiteRuntime). + if (siteTrackingStore is not null) + { + grpcServer?.SetOperationTrackingStore(siteTrackingStore); + } grpcServer?.SetReady(_actorSystem!); } } diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Tracking/OperationTrackingStore.cs b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Tracking/OperationTrackingStore.cs index 8cec600b..b3e48c54 100644 --- a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Tracking/OperationTrackingStore.cs +++ b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Tracking/OperationTrackingStore.cs @@ -360,6 +360,76 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable, } } + /// + public async Task> ReadChangedSinceAsync( + DateTime sinceUtc, + int batchSize, + CancellationToken ct = default) + { + ObjectDisposedException.ThrowIf(Volatile.Read(ref _disposeState) != 0, this); + + // SiteRuntime-024: like GetStatusAsync, the reconciliation pull opens a + // fresh, ungated read connection so a long-running write never blocks + // central's PullSiteCalls. The query is a bounded, ordered scan over the + // (Status, UpdatedAtUtc) index range — UpdatedAtUtc is the cursor. 
+ await using var readConnection = new SqliteConnection(_connectionString); + await readConnection.OpenAsync(ct).ConfigureAwait(false); + + await using var cmd = readConnection.CreateCommand(); + // Inclusive lower bound on UpdatedAtUtc (>=) so a caller resuming from + // the last returned timestamp does not skip a row sharing that instant; + // central ingest is insert-if-not-exists + upsert-on-newer, so the + // boundary row re-read is a no-op. ORDER BY ... ASC + LIMIT yields the + // OLDEST matching rows so the cursor advances monotonically. + cmd.CommandText = """ + SELECT TrackedOperationId, Kind, TargetSummary, Status, + RetryCount, LastError, HttpStatus, + CreatedAtUtc, UpdatedAtUtc, TerminalAtUtc, SourceNode + FROM OperationTracking + WHERE UpdatedAtUtc >= $since + ORDER BY UpdatedAtUtc ASC + LIMIT $batchSize; + """; + cmd.Parameters.AddWithValue( + "$since", + sinceUtc.ToString("o", CultureInfo.InvariantCulture)); + cmd.Parameters.AddWithValue("$batchSize", batchSize); + + var rows = new List(); + await using var reader = await cmd.ExecuteReaderAsync(ct).ConfigureAwait(false); + while (await reader.ReadAsync(ct).ConfigureAwait(false)) + { + var kind = reader.GetString(1); + rows.Add(new SiteCallOperational( + TrackedOperationId: TrackedOperationId.Parse(reader.GetString(0)), + Channel: KindToChannel(kind), + Target: reader.IsDBNull(2) ? string.Empty : reader.GetString(2), + // The site id is not a tracking-store column; the central client + // re-stamps SourceSite from the siteId it dialed. + SourceSite: string.Empty, + SourceNode: reader.IsDBNull(10) ? null : reader.GetString(10), + Status: reader.GetString(3), + RetryCount: reader.GetInt32(4), + LastError: reader.IsDBNull(5) ? null : reader.GetString(5), + HttpStatus: reader.IsDBNull(6) ? null : reader.GetInt32(6), + CreatedAtUtc: ParseUtc(reader.GetString(7)), + UpdatedAtUtc: ParseUtc(reader.GetString(8)), + TerminalAtUtc: reader.IsDBNull(9) ? 
null : ParseUtc(reader.GetString(9)))); + } + + return rows; + } + + // Cached-call Kind → SiteCalls Channel. Only ApiCallCached / DbWriteCached + // ever reach the tracking store (RecordEnqueueAsync is the cached-call + // entry point); DbWriteCached maps to DbOutbound, everything else to the + // ApiOutbound default. Mirrors CachedCallLifecycleBridge's channel handling. + private static string KindToChannel(string kind) => kind switch + { + nameof(Commons.Types.Enums.AuditKind.DbWriteCached) => nameof(Commons.Types.Enums.AuditChannel.DbOutbound), + _ => nameof(Commons.Types.Enums.AuditChannel.ApiOutbound), + }; + private static DateTime ParseUtc(string raw) { return DateTime.Parse( diff --git a/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Central/GrpcPullSiteCallsClientTests.cs b/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Central/GrpcPullSiteCallsClientTests.cs new file mode 100644 index 00000000..982a9923 --- /dev/null +++ b/tests/ZB.MOM.WW.ScadaBridge.AuditLog.Tests/Central/GrpcPullSiteCallsClientTests.cs @@ -0,0 +1,215 @@ +using Google.Protobuf.WellKnownTypes; +using Grpc.Core; +using Microsoft.Extensions.Logging.Abstractions; +using ZB.MOM.WW.ScadaBridge.AuditLog.Central; +using ZB.MOM.WW.ScadaBridge.Communication.Grpc; +using ProtoPullRequest = ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsRequest; +using ProtoPullResponse = ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsResponse; + +namespace ZB.MOM.WW.ScadaBridge.AuditLog.Tests.Central; + +/// +/// Tests for — the production +/// that dials a site over gRPC and issues the +/// PullSiteCalls unary RPC for the Site Call Audit (#22) reconciliation +/// loop. The real GrpcChannel is replaced by an injected +/// seam so the +/// client's mapping / ordering / SourceSite-restamp / fault-swallowing behaviour +/// can be asserted without standing up a Kestrel HTTP/2 endpoint. Mirrors +/// . 
+/// +public class GrpcPullSiteCallsClientTests +{ + private static readonly DateTime BaseTime = + new(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc); + + private sealed class StaticEnumerator : ISiteEnumerator + { + private readonly IReadOnlyList _sites; + public StaticEnumerator(params SiteEntry[] sites) => _sites = sites; + public Task> EnumerateAsync(CancellationToken ct = default) => + Task.FromResult(_sites); + } + + private sealed class FakeInvoker : GrpcPullSiteCallsClient.IPullSiteCallsInvoker + { + public string? Endpoint { get; private set; } + public ProtoPullRequest? Request { get; private set; } + public int CallCount { get; private set; } + + private readonly ProtoPullResponse? _response; + private readonly Exception? _throw; + + private FakeInvoker(ProtoPullResponse? response, Exception? toThrow) + { + _response = response; + _throw = toThrow; + } + + public static FakeInvoker Returning(ProtoPullResponse response) => new(response, null); + public static FakeInvoker Throwing(Exception ex) => new(null, ex); + + public Task InvokeAsync( + string endpoint, ProtoPullRequest request, CancellationToken ct) + { + CallCount++; + Endpoint = endpoint; + Request = request; + if (_throw is not null) + { + throw _throw; + } + return Task.FromResult(_response!); + } + } + + // The site leaves SourceSite empty (it is not a tracking-store column); the + // client re-stamps it from the dialed siteId. Mint DTOs with empty SourceSite + // to prove that re-stamp. 
+ private static SiteCallOperationalDto Dto(Guid id, DateTime updatedAtUtc) => + new() + { + TrackedOperationId = id.ToString(), + Channel = "ApiOutbound", + Target = "ERP.GetOrder", + SourceSite = string.Empty, + SourceNode = "node-a", + Status = "Attempted", + RetryCount = 1, + LastError = string.Empty, + CreatedAtUtc = Timestamp.FromDateTime(BaseTime), + UpdatedAtUtc = Timestamp.FromDateTime(updatedAtUtc), + }; + + [Fact] + public async Task PullAsync_dials_resolved_endpoint_maps_oldest_first_and_restamps_source_site() + { + var older = Guid.NewGuid(); + var newer = Guid.NewGuid(); + + // Wire delivered newest-first on purpose to prove the client sorts oldest-first. + var proto = new ProtoPullResponse { MoreAvailable = true }; + proto.Operationals.Add(Dto(newer, BaseTime.AddMinutes(5))); + proto.Operationals.Add(Dto(older, BaseTime)); + + var invoker = FakeInvoker.Returning(proto); + var sut = new GrpcPullSiteCallsClient( + new StaticEnumerator(new SiteEntry("site-a", "http://site-a:8083")), + invoker, + NullLogger.Instance); + + var result = await sut.PullAsync("site-a", BaseTime, batchSize: 256, CancellationToken.None); + + // Endpoint resolution + request shaping. + Assert.Equal("http://site-a:8083", invoker.Endpoint); + Assert.NotNull(invoker.Request); + Assert.Equal(256, invoker.Request!.BatchSize); + Assert.Equal(BaseTime, invoker.Request.SinceUtc.ToDateTime()); + + // Mapping + ordering + MoreAvailable surface. + Assert.True(result.MoreAvailable); + Assert.Equal(2, result.SiteCalls.Count); + Assert.Equal(older, result.SiteCalls[0].TrackedOperationId.Value); + Assert.Equal(newer, result.SiteCalls[1].TrackedOperationId.Value); + + // SourceSite re-stamped from the dialed siteId (DTO carried empty). + Assert.Equal("site-a", result.SiteCalls[0].SourceSite); + Assert.Equal("site-a", result.SiteCalls[1].SourceSite); + + // Round-tripped fields survive FromDto. 
+ Assert.Equal("ApiOutbound", result.SiteCalls[0].Channel); + Assert.Equal("node-a", result.SiteCalls[0].SourceNode); + Assert.Equal(1, result.SiteCalls[0].RetryCount); + } + + [Fact] + public async Task PullAsync_returns_empty_when_site_endpoint_is_unknown() + { + var invoker = FakeInvoker.Returning(new ProtoPullResponse()); + var sut = new GrpcPullSiteCallsClient( + new StaticEnumerator(), // no sites registered + invoker, + NullLogger.Instance); + + var result = await sut.PullAsync("site-a", BaseTime, batchSize: 256, CancellationToken.None); + + Assert.Empty(result.SiteCalls); + Assert.False(result.MoreAvailable); + Assert.Equal(0, invoker.CallCount); // never dialed — no endpoint was resolved, so there is nothing to dial + } + + [Theory] + [InlineData(StatusCode.Unavailable)] + [InlineData(StatusCode.DeadlineExceeded)] + [InlineData(StatusCode.Cancelled)] + public async Task PullAsync_swallows_tolerable_transport_faults_to_empty_response(StatusCode code) + { + var invoker = FakeInvoker.Throwing(new RpcException(new Status(code, "transport fault"))); + var sut = new GrpcPullSiteCallsClient( + new StaticEnumerator(new SiteEntry("site-a", "http://site-a:8083")), + invoker, + NullLogger.Instance); + + var result = await sut.PullAsync("site-a", BaseTime, batchSize: 256, CancellationToken.None); + + Assert.Empty(result.SiteCalls); + Assert.False(result.MoreAvailable); + } + + [Fact] + public async Task PullAsync_swallows_connection_layer_faults_to_empty_response() + { + var invoker = FakeInvoker.Throwing(new HttpRequestException("connection refused")); + var sut = new GrpcPullSiteCallsClient( + new StaticEnumerator(new SiteEntry("site-a", "http://site-a:8083")), + invoker, + NullLogger.Instance); + + var result = await sut.PullAsync("site-a", BaseTime, batchSize: 256, CancellationToken.None); + + Assert.Empty(result.SiteCalls); + Assert.False(result.MoreAvailable); + } + + [Fact] + public async Task PullAsync_swallows_unexpected_faults_to_empty_response() + { + var invoker = FakeInvoker.Throwing(new 
InvalidOperationException("boom")); + var sut = new GrpcPullSiteCallsClient( + new StaticEnumerator(new SiteEntry("site-a", "http://site-a:8083")), + invoker, + NullLogger.Instance); + + var result = await sut.PullAsync("site-a", BaseTime, batchSize: 256, CancellationToken.None); + + Assert.Empty(result.SiteCalls); + Assert.False(result.MoreAvailable); + } + + [Fact] + public async Task PullAsync_with_minvalue_unspecified_cursor_does_not_throw_and_dials() + { + // The reconciliation cursor starts at DateTime.MinValue with + // Kind=Unspecified. EnsureUtc must treat it AS UTC (per the system-wide + // invariant) and NOT call ToUniversalTime() — on a host with a positive + // UTC offset that underflows and Timestamp.FromDateTime throws, crashing + // the FIRST pull for every site. + var minUnspecified = default(DateTime); + Assert.Equal(DateTimeKind.Unspecified, minUnspecified.Kind); + + var invoker = FakeInvoker.Returning(new ProtoPullResponse()); + var sut = new GrpcPullSiteCallsClient( + new StaticEnumerator(new SiteEntry("site-a", "http://site-a:8083")), + invoker, + NullLogger.Instance); + + var result = await sut.PullAsync("site-a", minUnspecified, batchSize: 256, CancellationToken.None); + + Assert.Equal(1, invoker.CallCount); + Assert.Equal("http://site-a:8083", invoker.Endpoint); + Assert.NotNull(invoker.Request); + Assert.Equal(DateTime.MinValue, invoker.Request!.SinceUtc.ToDateTime()); + Assert.Empty(result.SiteCalls); + Assert.False(result.MoreAvailable); + } +} diff --git a/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/SiteStreamPullSiteCallsTests.cs b/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/SiteStreamPullSiteCallsTests.cs new file mode 100644 index 00000000..e6b2bb99 --- /dev/null +++ b/tests/ZB.MOM.WW.ScadaBridge.Communication.Tests/SiteStreamPullSiteCallsTests.cs @@ -0,0 +1,189 @@ +using Akka.TestKit.Xunit2; +using Google.Protobuf.WellKnownTypes; +using Grpc.Core; +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; +using 
NSubstitute.ExceptionExtensions; +using ZB.MOM.WW.ScadaBridge.Commons.Interfaces; +using ZB.MOM.WW.ScadaBridge.Commons.Types; +using ZB.MOM.WW.ScadaBridge.Communication.Grpc; + +namespace ZB.MOM.WW.ScadaBridge.Communication.Tests; + +/// <summary> +/// Tests for <see cref="SiteStreamGrpcServer.PullSiteCalls"/> (Site Call Audit +/// #22 reconciliation handler). Verifies the request → +/// <c>ReadChangedSinceAsync</c> (operation tracking store) → response +/// round-trip through the gRPC handler. The store is an NSubstitute stub so the +/// tests never touch SQLite. Mirrors +/// a sibling pull-handler test suite (NOTE(review): the original cross-reference is missing here — confirm the target) — but there is no MarkReconciled step (the tracking store is the operational +/// source of truth; the central SiteCalls mirror is upsert-on-newer). +/// </summary> +public class SiteStreamPullSiteCallsTests : TestKit +{ + private readonly ISiteStreamSubscriber _subscriber = Substitute.For(); + + private SiteStreamGrpcServer CreateServer() => + new(_subscriber, NullLogger.Instance); + + private static ServerCallContext NewContext(CancellationToken ct = default) + { + var context = Substitute.For(); + context.CancellationToken.Returns(ct); + return context; + } + + private static SiteCallOperational NewOperational() => + new( + TrackedOperationId: TrackedOperationId.New(), + Channel: "ApiOutbound", + Target: "ERP.GetOrder", + SourceSite: string.Empty, + SourceNode: "node-a", + Status: "Attempted", + RetryCount: 1, + LastError: null, + HttpStatus: 503, + CreatedAtUtc: DateTime.SpecifyKind(new DateTime(2026, 5, 20, 10, 0, 0), DateTimeKind.Utc), + UpdatedAtUtc: DateTime.SpecifyKind(new DateTime(2026, 5, 20, 10, 1, 0), DateTimeKind.Utc), + TerminalAtUtc: null); + + [Fact] + public async Task PullSiteCalls_NoStoreWired_ReturnsEmptyResponse() + { + var server = CreateServer(); + // Intentionally do NOT call SetOperationTrackingStore — simulates a + // central-only host or a wiring-incomplete startup window. 
+ + var request = new PullSiteCallsRequest + { + SinceUtc = Timestamp.FromDateTime(DateTime.UtcNow.AddMinutes(-5)), + BatchSize = 100, + }; + + var response = await server.PullSiteCalls(request, NewContext()); + + Assert.Empty(response.Operationals); + Assert.False(response.MoreAvailable); + } + + [Fact] + public async Task PullSiteCalls_With5Rows_ReturnsAllFiveDtos() + { + var store = Substitute.For(); + var rows = Enumerable.Range(0, 5).Select(_ => NewOperational()).ToList(); + store.ReadChangedSinceAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns((IReadOnlyList)rows); + + var server = CreateServer(); + server.SetOperationTrackingStore(store); + + var request = new PullSiteCallsRequest + { + SinceUtc = Timestamp.FromDateTime(DateTime.UtcNow.AddHours(-1)), + BatchSize = 100, // larger than returned count so MoreAvailable should be false + }; + + var response = await server.PullSiteCalls(request, NewContext()); + + Assert.Equal(5, response.Operationals.Count); + Assert.False(response.MoreAvailable); // 5 < 100 + var expectedIds = rows.Select(r => r.TrackedOperationId.ToString()).ToHashSet(); + Assert.True(expectedIds.SetEquals(response.Operationals.Select(d => d.TrackedOperationId).ToHashSet())); + } + + [Fact] + public async Task PullSiteCalls_PassesSinceUtcThroughVerbatim() + { + var store = Substitute.For(); + var capturedSince = DateTime.MinValue; + store.ReadChangedSinceAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(call => + { + capturedSince = call.ArgAt(0); + return (IReadOnlyList)Array.Empty(); + }); + + var server = CreateServer(); + server.SetOperationTrackingStore(store); + + var since = DateTime.SpecifyKind(new DateTime(2026, 5, 20, 9, 30, 0), DateTimeKind.Utc); + var request = new PullSiteCallsRequest + { + SinceUtc = Timestamp.FromDateTime(since), + BatchSize = 50, + }; + + var response = await server.PullSiteCalls(request, NewContext()); + + Assert.Empty(response.Operationals); + Assert.False(response.MoreAvailable); + Assert.Equal(since, 
capturedSince); + } + + [Fact] + public async Task PullSiteCalls_BatchSize3_Returns3Rows_MoreAvailableTrue() + { + var store = Substitute.For(); + var rows = Enumerable.Range(0, 3).Select(_ => NewOperational()).ToList(); + store.ReadChangedSinceAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns((IReadOnlyList)rows); + + var server = CreateServer(); + server.SetOperationTrackingStore(store); + + var request = new PullSiteCallsRequest + { + SinceUtc = Timestamp.FromDateTime(DateTime.UtcNow.AddHours(-1)), + BatchSize = 3, + }; + + var response = await server.PullSiteCalls(request, NewContext()); + + Assert.Equal(3, response.Operationals.Count); + // saturated batch → central needs to know to issue a follow-up pull + Assert.True(response.MoreAvailable); + } + + [Fact] + public async Task PullSiteCalls_NonPositiveBatchSize_ThrowsInvalidArgument() + { + var store = Substitute.For(); + var server = CreateServer(); + server.SetOperationTrackingStore(store); + + var request = new PullSiteCallsRequest + { + SinceUtc = Timestamp.FromDateTime(DateTime.UtcNow.AddHours(-1)), + BatchSize = 0, + }; + + var ex = await Assert.ThrowsAsync( + () => server.PullSiteCalls(request, NewContext())); + Assert.Equal(StatusCode.InvalidArgument, ex.StatusCode); + } + + [Fact] + public async Task PullSiteCalls_ReadThrows_ReturnsEmptyResponse() + { + // Best-effort: a read fault must never abort the reconciliation tick. + var store = Substitute.For(); + store.ReadChangedSinceAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .ThrowsAsync(new InvalidOperationException("SQLite disposed mid-call")); + + var server = CreateServer(); + server.SetOperationTrackingStore(store); + + var request = new PullSiteCallsRequest + { + SinceUtc = Timestamp.FromDateTime(DateTime.UtcNow.AddHours(-1)), + BatchSize = 100, + }; + + // Must NOT throw — the handler swallows the fault to an empty response. 
+ var response = await server.PullSiteCalls(request, NewContext()); + + Assert.Empty(response.Operationals); + Assert.False(response.MoreAvailable); + } +} diff --git a/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Tracking/OperationTrackingStoreTests.cs b/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Tracking/OperationTrackingStoreTests.cs index f9425ec8..a89fc398 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Tracking/OperationTrackingStoreTests.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests/Tracking/OperationTrackingStoreTests.cs @@ -439,6 +439,138 @@ public class OperationTrackingStoreTests Assert.NotNull(await store.GetStatusAsync(cId)); // kept (non-terminal) } + // ── Site Call Audit #22: ReadChangedSinceAsync (reconciliation pull) ─── + + [Fact] + public async Task ReadChangedSinceAsync_ReturnsRowsAtOrAfterCursor_OldestFirst() + { + var (store, dataSource) = CreateStore(nameof(ReadChangedSinceAsync_ReturnsRowsAtOrAfterCursor_OldestFirst)); + await using var _store = store; + + // Three rows with distinct UpdatedAtUtc, written out of chronological + // order to prove the read sorts by UpdatedAtUtc ascending. + var older = TrackedOperationId.New(); + var middle = TrackedOperationId.New(); + var newer = TrackedOperationId.New(); + await store.RecordEnqueueAsync(older, nameof(AuditKind.ApiCallCached), "ERP.A", null, null, "node-a"); + await store.RecordEnqueueAsync(middle, nameof(AuditKind.DbWriteCached), "DB.B", null, null, "node-b"); + await store.RecordEnqueueAsync(newer, nameof(AuditKind.ApiCallCached), "ERP.C", null, null, null); + + // Backdate UpdatedAtUtc so the ordering is deterministic and a cursor + // can be placed cleanly between rows. (Enqueue stamps DateTime.UtcNow; + // we cannot inject the clock, so set the timestamps directly.) 
+ var t0 = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc); + SetUpdatedAt(dataSource, older, t0); + SetUpdatedAt(dataSource, middle, t0.AddMinutes(10)); + SetUpdatedAt(dataSource, newer, t0.AddMinutes(20)); + + // Cursor at the middle row's UpdatedAtUtc: inclusive lower bound, so + // middle + newer come back, older is excluded. + var result = await store.ReadChangedSinceAsync(t0.AddMinutes(10), batchSize: 100, CancellationToken.None); + + Assert.Equal(2, result.Count); + Assert.Equal(middle, result[0].TrackedOperationId); + Assert.Equal(newer, result[1].TrackedOperationId); + Assert.True(result[0].UpdatedAtUtc <= result[1].UpdatedAtUtc); + } + + [Fact] + public async Task ReadChangedSinceAsync_FromMinValue_ReturnsAllRows() + { + var (store, _) = CreateStore(nameof(ReadChangedSinceAsync_FromMinValue_ReturnsAllRows)); + await using var _store = store; + + await store.RecordEnqueueAsync(TrackedOperationId.New(), nameof(AuditKind.ApiCallCached), "A", null, null, null); + await store.RecordEnqueueAsync(TrackedOperationId.New(), nameof(AuditKind.ApiCallCached), "B", null, null, null); + + var result = await store.ReadChangedSinceAsync(DateTime.MinValue, batchSize: 100, CancellationToken.None); + + Assert.Equal(2, result.Count); + } + + [Fact] + public async Task ReadChangedSinceAsync_IsBatchCapped() + { + var (store, dataSource) = CreateStore(nameof(ReadChangedSinceAsync_IsBatchCapped)); + await using var _store = store; + + var ids = new List(); + var t0 = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc); + for (var i = 0; i < 5; i++) + { + var id = TrackedOperationId.New(); + ids.Add(id); + await store.RecordEnqueueAsync(id, nameof(AuditKind.ApiCallCached), $"T{i}", null, null, null); + SetUpdatedAt(dataSource, id, t0.AddMinutes(i)); + } + + var result = await store.ReadChangedSinceAsync(DateTime.MinValue, batchSize: 3, CancellationToken.None); + + // Capped to 3 — and the cap takes the OLDEST 3 (asc order) so the + // caller can advance the cursor 
monotonically across follow-up pulls. + Assert.Equal(3, result.Count); + Assert.Equal(ids[0], result[0].TrackedOperationId); + Assert.Equal(ids[1], result[1].TrackedOperationId); + Assert.Equal(ids[2], result[2].TrackedOperationId); + } + + [Fact] + public async Task ReadChangedSinceAsync_MapsTrackingRowOntoSiteCallOperational() + { + var (store, _) = CreateStore(nameof(ReadChangedSinceAsync_MapsTrackingRowOntoSiteCallOperational)); + await using var _store = store; + + var apiId = TrackedOperationId.New(); + var dbId = TrackedOperationId.New(); + await store.RecordEnqueueAsync(apiId, nameof(AuditKind.ApiCallCached), "ERP.GetOrder", "inst-1", "ScriptActor:OnTick", "node-a"); + await store.RecordEnqueueAsync(dbId, nameof(AuditKind.DbWriteCached), "Historian.Write", null, null, "node-b"); + await store.RecordAttemptAsync(apiId, nameof(AuditStatus.Attempted), 2, "HTTP 503", 503); + await store.RecordTerminalAsync(dbId, nameof(AuditStatus.Parked), "max retries", null); + + var result = await store.ReadChangedSinceAsync(DateTime.MinValue, batchSize: 100, CancellationToken.None); + var api = result.Single(r => r.TrackedOperationId == apiId); + var db = result.Single(r => r.TrackedOperationId == dbId); + + // Kind → Channel projection. + Assert.Equal("ApiOutbound", api.Channel); + Assert.Equal("DbOutbound", db.Channel); + + // TargetSummary → Target; SourceNode carried verbatim. + Assert.Equal("ERP.GetOrder", api.Target); + Assert.Equal("node-a", api.SourceNode); + Assert.Equal("node-b", db.SourceNode); + + // Status / RetryCount / LastError / HttpStatus carried through. + Assert.Equal(nameof(AuditStatus.Attempted), api.Status); + Assert.Equal(2, api.RetryCount); + Assert.Equal("HTTP 503", api.LastError); + Assert.Equal(503, api.HttpStatus); + + // SourceSite is left empty by the store (the site id is not a tracking + // column); the central client re-stamps it from the dialed siteId. 
+ Assert.Equal(string.Empty, api.SourceSite); + + // Terminal row carries TerminalAtUtc (UTC kind); active row leaves it null. + Assert.Null(api.TerminalAtUtc); + Assert.NotNull(db.TerminalAtUtc); + Assert.Equal(DateTimeKind.Utc, db.TerminalAtUtc!.Value.Kind); + + // Timestamps round-trip as UTC. + Assert.Equal(DateTimeKind.Utc, api.CreatedAtUtc.Kind); + Assert.Equal(DateTimeKind.Utc, api.UpdatedAtUtc.Kind); + } + + /// <summary>Directly sets a row's UpdatedAtUtc (written as a round-trip "o" string, invariant culture) with a raw UPDATE on a verifier connection, so cursor/ordering tests are deterministic.</summary> + private static void SetUpdatedAt(string dataSource, TrackedOperationId id, DateTime updatedAtUtc) + { + using var connection = OpenVerifierConnection(dataSource); + using var cmd = connection.CreateCommand(); + cmd.CommandText = "UPDATE OperationTracking SET UpdatedAtUtc = $u WHERE TrackedOperationId = $id;"; + cmd.Parameters.AddWithValue("$u", updatedAtUtc.ToString("o", System.Globalization.CultureInfo.InvariantCulture)); + cmd.Parameters.AddWithValue("$id", id.ToString()); + cmd.ExecuteNonQuery(); + } + // ── SiteRuntime-024: read/write split + sync-safe Dispose ────────────── [Fact]