diff --git a/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/AuditLogPurgeOptions.cs b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/AuditLogPurgeOptions.cs
index 43d5bc74..0ba1bc00 100644
--- a/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/AuditLogPurgeOptions.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/AuditLogPurgeOptions.cs
@@ -17,8 +17,10 @@ namespace ZB.MOM.WW.ScadaBridge.AuditLog.Central;
///
///
/// exists for tests to drop the cadence to
-/// milliseconds without polluting the production config surface; production
-/// binds only.
+/// milliseconds; production config is expected to set
+/// only. Because this options class is Bind-ed wholesale, a config value
+/// at AuditLog:Purge:IntervalOverride would bind if present (and would
+/// bypass the minimum clamp) — operators must not set it.
///
///
public sealed class AuditLogPurgeOptions
@@ -29,15 +31,44 @@ public sealed class AuditLogPurgeOptions
///
/// Test-only override for finer control over the tick cadence than
/// whole-hour resolution allows. When non-null, takes precedence over
- /// . Not bound from config — production
- /// config exposes only.
+ /// AND bypasses the
+ /// minimum clamp (so tests can use millisecond cadences). Production
+ /// config exposes only and never sets this
+ /// knob — but because the options class is Bind-ed wholesale, a
+ /// config value at AuditLog:Purge:IntervalOverride WOULD bind if
+ /// present; operators must not set it.
///
public TimeSpan? IntervalOverride { get; set; }
///
- /// Resolves the effective tick interval, honouring the test override
- /// when set. Falls back to .
+ /// Minimum interval the config-bound can
+ /// resolve to. Clamps a misconfigured IntervalHours: 0 (or a
+ /// negative value) away from — a zero
+ /// interval would make Akka's ScheduleTellRepeatedlyCancelable
+ /// spin, looping the partition drop/rebuild dance into a sustained SQL
+ /// outage. The test-only bypasses this
+ /// clamp so unit tests can still drop the cadence to milliseconds.
///
- public TimeSpan Interval =>
- IntervalOverride ?? TimeSpan.FromHours(IntervalHours);
+ private static readonly TimeSpan MinConfiguredInterval = TimeSpan.FromMinutes(1);
+
+ ///
+ /// Resolves the effective tick interval, honouring the test override
+ /// when set. Falls back to , clamped to at
+ /// least so a zero/negative config
+ /// value can never yield (which would spin
+ /// the scheduler).
+ ///
+ public TimeSpan Interval
+ {
+ get
+ {
+ if (IntervalOverride is { } overrideValue)
+ {
+ return overrideValue;
+ }
+
+ var resolved = TimeSpan.FromHours(IntervalHours);
+ return resolved < MinConfiguredInterval ? MinConfiguredInterval : resolved;
+ }
+ }
}
diff --git a/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/GrpcPullAuditEventsClient.cs b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/GrpcPullAuditEventsClient.cs
new file mode 100644
index 00000000..bad3e88f
--- /dev/null
+++ b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/GrpcPullAuditEventsClient.cs
@@ -0,0 +1,289 @@
+using System.Collections.Concurrent;
+using Google.Protobuf.WellKnownTypes;
+using Grpc.Core;
+using Grpc.Net.Client;
+using Microsoft.Extensions.Logging;
+using ZB.MOM.WW.ScadaBridge.Communication;
+using ZB.MOM.WW.ScadaBridge.Communication.Grpc;
+using ProtoPullRequest = ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsRequest;
+using ProtoPullResponse = ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsResponse;
+using PullAuditEventsResponse = ZB.MOM.WW.ScadaBridge.Commons.Messages.Integration.PullAuditEventsResponse;
+
+namespace ZB.MOM.WW.ScadaBridge.AuditLog.Central;
+
+///
+/// Production (Audit Log #23, M6) that the
+/// central uses to pull the next
+/// reconciliation batch from a site over the PullAuditEvents unary gRPC
+/// RPC served by SiteStreamGrpcServer.
+///
+///
+///
+/// Endpoint resolution. The actor passes only a siteId; this
+/// client resolves it to a gRPC authority via
+/// () on every call so a NodeA→NodeB
+/// failover flip or an edited site address takes effect on the next tick — the
+/// same liveness guarantee SiteStreamGrpcClientFactory gives the
+/// real-time stream. A site with no registered endpoint yields an empty
+/// response (no dial); reconciliation simply has nothing to pull from it.
+///
+///
+/// Fault tolerance. Per the
+/// contract, tolerable transport faults (connection refused / site offline =
+/// , slow site = ,
+/// shutdown = , plus bare
+/// / SocketException before a gRPC
+/// status is established) are caught and collapsed to an empty response — one
+/// offline site must never sink the rest of the reconciliation tick. Any other
+/// fault (e.g. a malformed reply that fails DTO mapping) is also swallowed to
+/// empty: audit reconciliation is best-effort and a throw would only get
+/// re-caught by the actor's own per-site guard.
+///
+///
+/// Testability. The unary call is reached through the
+/// seam. Production binds
+/// (one cached
+/// per endpoint, keepalive from ); unit tests
+/// inject a fake invoker so no real HTTP/2 endpoint is required.
+///
+///
+public sealed class GrpcPullAuditEventsClient : IPullAuditEventsClient
+{
+ private readonly ISiteEnumerator _sites;
+ private readonly IPullAuditEventsInvoker _invoker;
+ private readonly ILogger _logger;
+
+ ///
+ /// Creates the client over the given site enumerator and unary-call invoker.
+ ///
+ /// Resolves a siteId to its gRPC endpoint.
+ /// Seam that issues the PullAuditEvents unary RPC against a resolved endpoint.
+ /// Logger for transport-fault diagnostics.
+ public GrpcPullAuditEventsClient(
+ ISiteEnumerator sites,
+ IPullAuditEventsInvoker invoker,
+ ILogger logger)
+ {
+ _sites = sites ?? throw new ArgumentNullException(nameof(sites));
+ _invoker = invoker ?? throw new ArgumentNullException(nameof(invoker));
+ _logger = logger ?? throw new ArgumentNullException(nameof(logger));
+ }
+
+ ///
+ public async Task PullAsync(
+ string siteId,
+ DateTime sinceUtc,
+ int batchSize,
+ CancellationToken ct)
+ {
+ var endpoint = await ResolveEndpointAsync(siteId, ct).ConfigureAwait(false);
+ if (endpoint is null)
+ {
+ // No gRPC address registered for the site — absence of an address is
+ // a configuration decision (mirrors ISiteEnumerator's own contract),
+ // not a runtime error, so there is simply nothing to pull.
+ _logger.LogDebug(
+ "PullAuditEvents skipped: no gRPC endpoint registered for site {SiteId}.", siteId);
+ return Empty;
+ }
+
+ var request = new ProtoPullRequest
+ {
+ // ReadPendingSinceAsync treats DateTime.MinValue as "from the start";
+ // EnsureUtc keeps Timestamp.FromDateTime happy (it requires UTC kind).
+ SinceUtc = Timestamp.FromDateTime(EnsureUtc(sinceUtc)),
+ BatchSize = batchSize,
+ };
+
+ ProtoPullResponse reply;
+ try
+ {
+ reply = await _invoker.InvokeAsync(endpoint, request, ct).ConfigureAwait(false);
+ }
+ catch (RpcException ex) when (IsTolerable(ex.StatusCode))
+ {
+ _logger.LogDebug(ex,
+ "PullAuditEvents tolerable transport fault for site {SiteId} ({Endpoint}): {Status}. Returning empty batch.",
+ siteId, endpoint, ex.StatusCode);
+ return Empty;
+ }
+ catch (Exception ex) when (ex is HttpRequestException or System.Net.Sockets.SocketException)
+ {
+ _logger.LogDebug(ex,
+ "PullAuditEvents connection-layer fault for site {SiteId} ({Endpoint}). Returning empty batch.",
+ siteId, endpoint);
+ return Empty;
+ }
+ catch (OperationCanceledException)
+ {
+ // Reconciliation tick was cancelled — either the caller's token
+ // (host shutdown / scope dispose) or an internal gRPC deadline /
+ // linked-CTS cancellation. Both are tolerable for a best-effort
+ // pull; collapse to empty rather than letting an internal
+ // cancellation land noisily in the catch-all below.
+ return Empty;
+ }
+ catch (Exception ex)
+ {
+ // Any other fault (e.g. a malformed reply that fails DTO mapping
+ // below would actually surface here only if mapping moved inline,
+ // but a non-RpcException transport fault wrapper lands here too).
+ // Audit reconciliation is best-effort; swallow to empty rather than
+ // throw — the actor's per-site guard would only re-catch it.
+ _logger.LogWarning(ex,
+ "PullAuditEvents unexpected fault for site {SiteId} ({Endpoint}). Returning empty batch.",
+ siteId, endpoint);
+ return Empty;
+ }
+
+ // Map proto DTOs to canonical AuditEvent records and order oldest-first
+ // (the wire is already ordered by the site queue, but the
+ // IPullAuditEventsClient contract is explicit, so sort defensively).
+ var events = reply.Events
+ .Select(AuditEventDtoMapper.FromDto)
+ .OrderBy(e => e.OccurredAtUtc)
+ .ToList();
+
+ return new PullAuditEventsResponse(events, reply.MoreAvailable);
+ }
+
+ private async Task ResolveEndpointAsync(string siteId, CancellationToken ct)
+ {
+ var sites = await _sites.EnumerateAsync(ct).ConfigureAwait(false);
+ foreach (var site in sites)
+ {
+ if (string.Equals(site.SiteId, siteId, StringComparison.Ordinal) &&
+ !string.IsNullOrWhiteSpace(site.GrpcEndpoint))
+ {
+ return site.GrpcEndpoint;
+ }
+ }
+ return null;
+ }
+
+ private static readonly PullAuditEventsResponse Empty =
+ new(Array.Empty(), MoreAvailable: false);
+
+ private static bool IsTolerable(StatusCode code) => code is
+ StatusCode.Unavailable or
+ StatusCode.DeadlineExceeded or
+ StatusCode.Cancelled;
+
+ // All ScadaBridge timestamps are UTC by invariant. A non-UTC cursor (the
+ // reconciliation cursor starts at DateTime.MinValue, Kind=Unspecified) is
+ // therefore treated AS UTC — never ToUniversalTime()-converted: on a host
+ // with a positive UTC offset MinValue.ToUniversalTime() underflows and
+ // Timestamp.FromDateTime throws, crashing the first pull for every site.
+ private static DateTime EnsureUtc(DateTime value) =>
+ value.Kind == DateTimeKind.Utc ? value : DateTime.SpecifyKind(value, DateTimeKind.Utc);
+
+ ///
+ /// Seam over the PullAuditEvents unary gRPC call against a resolved
+ /// site endpoint. Extracted so can
+ /// be unit-tested without a real . Production binds
+ /// .
+ ///
+ public interface IPullAuditEventsInvoker
+ {
+ ///
+ /// Issues the PullAuditEvents unary RPC against .
+ /// May throw /
+ /// on transport faults — the caller classifies and swallows tolerable ones.
+ ///
+ /// The site gRPC authority (e.g. http://site-a:8083).
+ /// The wire-format pull request.
+ /// Cancellation token.
+ /// The wire-format pull response.
+ Task InvokeAsync(string endpoint, ProtoPullRequest request, CancellationToken ct);
+ }
+}
+
+///
+/// Production :
+/// caches one per endpoint (keepalive from
+/// , mirroring SiteStreamGrpcClient)
+/// and issues the unary PullAuditEventsAsync call. The cache is keyed by
+/// endpoint string, so a changed site address (NodeA→NodeB failover flip / an
+/// edited gRPC address) is reached as soon as the resolver hands the new
+/// endpoint to — it creates a fresh channel for the
+/// new address. Unlike SiteStreamGrpcClientFactory (keyed by siteId,
+/// which actively evicts a re-keyed client), the channel for the previous
+/// address is NOT actively evicted here; it lingers idle until
+/// . Idle channels hold no streams, so this is a minor
+/// cache footprint cost, not a correctness or liveness gap.
+///
+public sealed class GrpcPullAuditEventsInvoker
+ : GrpcPullAuditEventsClient.IPullAuditEventsInvoker, IDisposable
+{
+ private readonly ConcurrentDictionary _channels = new(StringComparer.Ordinal);
+ private readonly CommunicationOptions _options;
+
+ ///
+ /// Creates the invoker using default .
+ ///
+ public GrpcPullAuditEventsInvoker()
+ : this(new CommunicationOptions())
+ {
+ }
+
+ ///
+ /// Creates the invoker, applying the configured gRPC keepalive settings to
+ /// every channel it opens.
+ ///
+ /// Communication options supplying gRPC keepalive timings.
+ public GrpcPullAuditEventsInvoker(CommunicationOptions options)
+ {
+ _options = options ?? throw new ArgumentNullException(nameof(options));
+ }
+
+ ///
+ public async Task InvokeAsync(
+ string endpoint, ProtoPullRequest request, CancellationToken ct)
+ {
+ var channel = GetOrCreateChannel(endpoint);
+ var client = new SiteStreamService.SiteStreamServiceClient(channel);
+ using var call = client.PullAuditEventsAsync(request, cancellationToken: ct);
+ return await call.ResponseAsync.ConfigureAwait(false);
+ }
+
+ // Race-safe channel cache. ConcurrentDictionary.GetOrAdd(key, valueFactory)
+ // does NOT serialize the factory, so two concurrent first dials of the same
+ // endpoint can both build a GrpcChannel (each holds an HTTP/2 connection
+ // pool) and the loser would leak. Create-then-GetOrAdd-then-dispose-if-lost
+ // mirrors SiteStreamGrpcClientFactory: only the channel actually installed
+ // survives; a channel that lost the race is disposed immediately.
+ private GrpcChannel GetOrCreateChannel(string endpoint)
+ {
+ if (!_channels.TryGetValue(endpoint, out var channel))
+ {
+ var created = CreateChannel(endpoint);
+ channel = _channels.GetOrAdd(endpoint, created);
+ if (!ReferenceEquals(channel, created))
+ {
+ created.Dispose();
+ }
+ }
+ return channel;
+ }
+
+ private GrpcChannel CreateChannel(string endpoint) =>
+ GrpcChannel.ForAddress(endpoint, new GrpcChannelOptions
+ {
+ HttpHandler = new SocketsHttpHandler
+ {
+ KeepAlivePingDelay = _options.GrpcKeepAlivePingDelay,
+ KeepAlivePingTimeout = _options.GrpcKeepAlivePingTimeout,
+ KeepAlivePingPolicy = HttpKeepAlivePingPolicy.Always,
+ },
+ });
+
+ /// Disposes all cached channels.
+ public void Dispose()
+ {
+ foreach (var channel in _channels.Values)
+ {
+ channel.Dispose();
+ }
+ _channels.Clear();
+ }
+}
diff --git a/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/GrpcPullSiteCallsClient.cs b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/GrpcPullSiteCallsClient.cs
new file mode 100644
index 00000000..350ee1ac
--- /dev/null
+++ b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/GrpcPullSiteCallsClient.cs
@@ -0,0 +1,304 @@
+using System.Collections.Concurrent;
+using Google.Protobuf.WellKnownTypes;
+using Grpc.Core;
+using Grpc.Net.Client;
+using Microsoft.Extensions.Logging;
+using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit;
+using ZB.MOM.WW.ScadaBridge.Communication;
+using ZB.MOM.WW.ScadaBridge.Communication.Grpc;
+using ProtoPullRequest = ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsRequest;
+using ProtoPullResponse = ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsResponse;
+using PullSiteCallsResponse = ZB.MOM.WW.ScadaBridge.Commons.Messages.Integration.PullSiteCallsResponse;
+
+namespace ZB.MOM.WW.ScadaBridge.AuditLog.Central;
+
+///
+/// Production (Site Call Audit #22) that the
+/// central reconciliation tick (a separate follow-up component) uses to pull the
+/// next batch of cached-call operational rows from a site over the
+/// PullSiteCalls unary gRPC RPC served by SiteStreamGrpcServer.
+/// A near-exact sibling of .
+///
+///
+///
+/// Endpoint resolution. The caller passes only a siteId; this
+/// client resolves it to a gRPC authority via
+/// () on every call so a NodeA→NodeB
+/// failover flip or an edited site address takes effect on the next tick. A site
+/// with no registered endpoint yields an empty response (no dial).
+///
+///
+/// SourceSite re-stamp. The site leaves
+/// SiteCallOperationalDto.SourceSite empty (the tracking store has no
+/// site-id column). This client is the authority that knows which site it
+/// dialed, so it re-stamps the mapped from
+/// siteId — the same "re-stamp from the forwarder's own id" pattern the
+/// site push path uses.
+///
+///
+/// Fault tolerance. Per the contract,
+/// tolerable transport faults (,
+/// , ,
+/// bare / SocketException) are caught
+/// and collapsed to an empty response so one offline site never sinks the rest
+/// of the reconciliation tick. Any other transport/protocol fault is also
+/// swallowed to empty: reconciliation is best-effort. Per-row DTO mapping faults
+/// (e.g. a single unparseable TrackedOperationId) are narrower still —
+/// the offending row is skipped+logged and the rest of the batch is returned.
+///
+///
+/// Testability. The unary call is reached through the
+/// seam. Production binds
+/// (one cached
+/// per endpoint, keepalive from ); unit tests
+/// inject a fake invoker so no real HTTP/2 endpoint is required.
+///
+///
+public sealed class GrpcPullSiteCallsClient : IPullSiteCallsClient
+{
+ private readonly ISiteEnumerator _sites;
+ private readonly IPullSiteCallsInvoker _invoker;
+ private readonly ILogger _logger;
+
+ ///
+ /// Creates the client over the given site enumerator and unary-call invoker.
+ ///
+ /// Resolves a siteId to its gRPC endpoint.
+ /// Seam that issues the PullSiteCalls unary RPC against a resolved endpoint.
+ /// Logger for transport-fault diagnostics.
+ public GrpcPullSiteCallsClient(
+ ISiteEnumerator sites,
+ IPullSiteCallsInvoker invoker,
+ ILogger logger)
+ {
+ _sites = sites ?? throw new ArgumentNullException(nameof(sites));
+ _invoker = invoker ?? throw new ArgumentNullException(nameof(invoker));
+ _logger = logger ?? throw new ArgumentNullException(nameof(logger));
+ }
+
+ ///
+ public async Task PullAsync(
+ string siteId,
+ DateTime sinceUtc,
+ int batchSize,
+ CancellationToken ct)
+ {
+ var endpoint = await ResolveEndpointAsync(siteId, ct).ConfigureAwait(false);
+ if (endpoint is null)
+ {
+ // No gRPC address registered for the site — a configuration decision
+ // (mirrors ISiteEnumerator's own contract), not a runtime error, so
+ // there is simply nothing to pull.
+ _logger.LogDebug(
+ "PullSiteCalls skipped: no gRPC endpoint registered for site {SiteId}.", siteId);
+ return Empty;
+ }
+
+ var request = new ProtoPullRequest
+ {
+ // ReadChangedSinceAsync treats DateTime.MinValue as "from the start";
+ // EnsureUtc keeps Timestamp.FromDateTime happy (it requires UTC kind).
+ SinceUtc = Timestamp.FromDateTime(EnsureUtc(sinceUtc)),
+ BatchSize = batchSize,
+ };
+
+ ProtoPullResponse reply;
+ try
+ {
+ reply = await _invoker.InvokeAsync(endpoint, request, ct).ConfigureAwait(false);
+ }
+ catch (RpcException ex) when (IsTolerable(ex.StatusCode))
+ {
+ _logger.LogDebug(ex,
+ "PullSiteCalls tolerable transport fault for site {SiteId} ({Endpoint}): {Status}. Returning empty batch.",
+ siteId, endpoint, ex.StatusCode);
+ return Empty;
+ }
+ catch (Exception ex) when (ex is HttpRequestException or System.Net.Sockets.SocketException)
+ {
+ _logger.LogDebug(ex,
+ "PullSiteCalls connection-layer fault for site {SiteId} ({Endpoint}). Returning empty batch.",
+ siteId, endpoint);
+ return Empty;
+ }
+ catch (OperationCanceledException)
+ {
+ // Reconciliation tick cancelled — caller token (host shutdown) or an
+ // internal gRPC deadline / linked-CTS cancellation. Both tolerable for
+ // a best-effort pull; collapse to empty rather than landing noisily in
+ // the catch-all below.
+ return Empty;
+ }
+ catch (Exception ex)
+ {
+ // Any other fault. Reconciliation is best-effort; swallow to empty
+ // rather than throw — the (future) actor's per-site guard would only
+ // re-catch it.
+ _logger.LogWarning(ex,
+ "PullSiteCalls unexpected fault for site {SiteId} ({Endpoint}). Returning empty batch.",
+ siteId, endpoint);
+ return Empty;
+ }
+
+ // Map proto DTOs to central SiteCall entities PER-ROW so one malformed
+ // operational (e.g. an unparseable TrackedOperationId) is skipped+logged
+ // rather than sinking the whole batch through the outer catch-all. Each
+ // survivor is re-stamped with SourceSite from the dialed siteId (the site
+ // leaves it empty).
+ var siteCalls = new List(reply.Operationals.Count);
+ foreach (var dto in reply.Operationals)
+ {
+ try
+ {
+ var sc = SiteCallDtoMapper.FromDto(dto) with { SourceSite = siteId };
+ siteCalls.Add(sc);
+ }
+ catch (Exception ex)
+ {
+ _logger.LogWarning(ex,
+ "PullSiteCalls dropped a malformed operational row from site {SiteId} (id='{Id}'); continuing with the rest of the batch.",
+ siteId, dto.TrackedOperationId);
+ }
+ }
+
+ // Order oldest-first by UpdatedAtUtc (the wire is already ordered by the
+ // site read, but the contract is explicit, so sort defensively).
+ siteCalls.Sort((a, b) => a.UpdatedAtUtc.CompareTo(b.UpdatedAtUtc));
+
+ return new PullSiteCallsResponse(siteCalls, reply.MoreAvailable);
+ }
+
+ private async Task ResolveEndpointAsync(string siteId, CancellationToken ct)
+ {
+ var sites = await _sites.EnumerateAsync(ct).ConfigureAwait(false);
+ foreach (var site in sites)
+ {
+ if (string.Equals(site.SiteId, siteId, StringComparison.Ordinal) &&
+ !string.IsNullOrWhiteSpace(site.GrpcEndpoint))
+ {
+ return site.GrpcEndpoint;
+ }
+ }
+ return null;
+ }
+
+ private static readonly PullSiteCallsResponse Empty =
+ new(Array.Empty(), MoreAvailable: false);
+
+ private static bool IsTolerable(StatusCode code) => code is
+ StatusCode.Unavailable or
+ StatusCode.DeadlineExceeded or
+ StatusCode.Cancelled;
+
+ // All ScadaBridge timestamps are UTC by invariant. A non-UTC cursor (the
+ // reconciliation cursor starts at DateTime.MinValue, Kind=Unspecified) is
+ // treated AS UTC — never ToUniversalTime()-converted: on a host with a
+ // positive UTC offset MinValue.ToUniversalTime() underflows and
+ // Timestamp.FromDateTime throws, crashing the first pull for every site.
+ private static DateTime EnsureUtc(DateTime value) =>
+ value.Kind == DateTimeKind.Utc ? value : DateTime.SpecifyKind(value, DateTimeKind.Utc);
+
+ ///
+ /// Seam over the PullSiteCalls unary gRPC call against a resolved site
+ /// endpoint. Extracted so can be
+ /// unit-tested without a real . Production binds
+ /// .
+ ///
+ public interface IPullSiteCallsInvoker
+ {
+ ///
+ /// Issues the PullSiteCalls unary RPC against .
+ /// May throw /
+ /// on transport faults — the caller classifies and swallows tolerable ones.
+ ///
+ /// The site gRPC authority (e.g. http://site-a:8083).
+ /// The wire-format pull request.
+ /// Cancellation token.
+ /// The wire-format pull response.
+ Task InvokeAsync(string endpoint, ProtoPullRequest request, CancellationToken ct);
+ }
+}
+
+///
+/// Production : caches
+/// one per endpoint (keepalive from
+/// , mirroring SiteStreamGrpcClient) and
+/// issues the unary PullSiteCallsAsync call. The cache is keyed by
+/// endpoint string, so a changed site address (NodeA→NodeB failover flip / an
+/// edited gRPC address) is reached as soon as the resolver hands the new endpoint
+/// to . The channel for a previous address lingers idle
+/// until (idle channels hold no streams — a minor cache
+/// footprint cost, not a correctness or liveness gap). Sibling of
+/// .
+///
+public sealed class GrpcPullSiteCallsInvoker
+ : GrpcPullSiteCallsClient.IPullSiteCallsInvoker, IDisposable
+{
+ private readonly ConcurrentDictionary _channels = new(StringComparer.Ordinal);
+ private readonly CommunicationOptions _options;
+
+ /// Creates the invoker using default .
+ public GrpcPullSiteCallsInvoker()
+ : this(new CommunicationOptions())
+ {
+ }
+
+ ///
+ /// Creates the invoker, applying the configured gRPC keepalive settings to
+ /// every channel it opens.
+ ///
+ /// Communication options supplying gRPC keepalive timings.
+ public GrpcPullSiteCallsInvoker(CommunicationOptions options)
+ {
+ _options = options ?? throw new ArgumentNullException(nameof(options));
+ }
+
+ ///
+ public async Task InvokeAsync(
+ string endpoint, ProtoPullRequest request, CancellationToken ct)
+ {
+ var channel = GetOrCreateChannel(endpoint);
+ var client = new SiteStreamService.SiteStreamServiceClient(channel);
+ using var call = client.PullSiteCallsAsync(request, cancellationToken: ct);
+ return await call.ResponseAsync.ConfigureAwait(false);
+ }
+
+ // Race-safe channel cache (create-then-GetOrAdd-then-dispose-if-lost): two
+ // concurrent first dials of the same endpoint can both build a GrpcChannel;
+ // only the channel actually installed survives, the loser is disposed.
+ // Mirrors SiteStreamGrpcClientFactory / GrpcPullAuditEventsInvoker.
+ private GrpcChannel GetOrCreateChannel(string endpoint)
+ {
+ if (!_channels.TryGetValue(endpoint, out var channel))
+ {
+ var created = CreateChannel(endpoint);
+ channel = _channels.GetOrAdd(endpoint, created);
+ if (!ReferenceEquals(channel, created))
+ {
+ created.Dispose();
+ }
+ }
+ return channel;
+ }
+
+ private GrpcChannel CreateChannel(string endpoint) =>
+ GrpcChannel.ForAddress(endpoint, new GrpcChannelOptions
+ {
+ HttpHandler = new SocketsHttpHandler
+ {
+ KeepAlivePingDelay = _options.GrpcKeepAlivePingDelay,
+ KeepAlivePingTimeout = _options.GrpcKeepAlivePingTimeout,
+ KeepAlivePingPolicy = HttpKeepAlivePingPolicy.Always,
+ },
+ });
+
+ /// Disposes all cached channels.
+ public void Dispose()
+ {
+ foreach (var channel in _channels.Values)
+ {
+ channel.Dispose();
+ }
+ _channels.Clear();
+ }
+}
diff --git a/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/IPullSiteCallsClient.cs b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/IPullSiteCallsClient.cs
new file mode 100644
index 00000000..c22d5706
--- /dev/null
+++ b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/IPullSiteCallsClient.cs
@@ -0,0 +1,57 @@
+using ZB.MOM.WW.ScadaBridge.Commons.Messages.Integration;
+
+namespace ZB.MOM.WW.ScadaBridge.AuditLog.Central;
+
+///
+/// Mockable abstraction over the central-side PullSiteCalls gRPC client
+/// surface used by the Site Call Audit (#22) reconciliation tick to fetch the
+/// next batch of cached-call operational rows from a specific site — the
+/// documented periodic self-heal pull that backfills the eventually-consistent
+/// central SiteCalls mirror when best-effort push telemetry is lost.
+/// Extracted so the (separate, follow-up) reconciliation actor can be
+/// unit-tested against an in-memory stub without standing up a real
+/// GrpcChannel per site.
+///
+///
+///
+/// The home is ZB.MOM.WW.ScadaBridge.AuditLog.Central rather than the
+/// ZB.MOM.WW.ScadaBridge.SiteCallAudit project so it can reuse the
+/// / endpoint-resolution
+/// abstraction that already lives here (and that the sibling
+/// uses) — SiteCallAudit does not reference
+/// AuditLog, so hosting the client there would mean duplicating the enumerator.
+/// This mirrors the decision to keep in
+/// ZB.MOM.WW.ScadaBridge.Communication.
+///
+///
+/// Implementations MUST NOT throw on transport faults the reconciliation tick
+/// can tolerate (connection refused, deadline exceeded, cancellation) — one
+/// offline site must never sink the rest of the tick. The
+/// are returned oldest-first by
+/// UpdatedAtUtc with the SourceSite re-stamped from the dialed
+/// site id (the site leaves it empty, being unaware of its own id), and a
+/// MoreAvailable flag the caller uses to decide whether to fire another
+/// pull immediately.
+///
+///
+public interface IPullSiteCallsClient
+{
+ ///
+ /// Issues a PullSiteCalls RPC against the site whose gRPC endpoint is
+ /// registered against . Returns the next batch of
+ /// rows
+ /// ordered oldest-first (with SourceSite re-stamped from
+ /// ) AND a MoreAvailable flag the caller uses
+ /// to decide whether to fire another pull immediately.
+ ///
+ /// The identifier of the site to pull cached-call operational rows from.
+ /// Only rows with an UpdatedAtUtc at or after this cursor time are returned.
+ /// Maximum number of rows to return per call.
+ /// Cancellation token.
+ /// A task that resolves to the next reconciliation batch with a MoreAvailable flag.
+ Task PullAsync(
+ string siteId,
+ DateTime sinceUtc,
+ int batchSize,
+ CancellationToken ct);
+}
diff --git a/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/ISiteEnumerator.cs b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/ISiteEnumerator.cs
index cc8cae1f..25a5a4c7 100644
--- a/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/ISiteEnumerator.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/ISiteEnumerator.cs
@@ -9,11 +9,12 @@ namespace ZB.MOM.WW.ScadaBridge.AuditLog.Central;
///
/// The production implementation wraps ISiteRepository.GetAllSitesAsync
/// and projects each Site to a using the
-/// site's configured GrpcNodeAAddress (falling back to
-/// GrpcNodeBAddress when NodeA is unset). Sites with NO gRPC address
-/// configured are silently skipped — the reconciliation pull cannot reach
-/// them, but absence of an address is a configuration decision, not a runtime
-/// error.
+/// site's configured GrpcNodeAAddress. This is a NodeA-only first cut:
+/// sites with a blank GrpcNodeAAddress are silently SKIPPED — the
+/// reconciliation pull cannot reach them, but absence of an address is a
+/// configuration decision, not a runtime error. NodeB-fallback endpoint
+/// selection (dial NodeB when NodeA is unset/unreachable) is a follow-up
+/// (mirrors the comment in SiteEnumerator.cs).
///
public interface ISiteEnumerator
{
diff --git a/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/SiteAuditReconciliationActor.cs b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/SiteAuditReconciliationActor.cs
index fb08bc57..8c6a297b 100644
--- a/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/SiteAuditReconciliationActor.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/SiteAuditReconciliationActor.cs
@@ -182,6 +182,10 @@ public class SiteAuditReconciliationActor : ReceiveActor
IReadOnlyList sites;
try
{
+ // No ambient CancellationToken in a ReceiveActor message handler —
+ // CancellationToken.None (the EnumerateAsync default) is intentional.
+ // The work is bounded by the 5-min reconciliation tick plus the
+ // 10s graceful-stop drain on PhaseClusterLeave.
sites = await _sites.EnumerateAsync().ConfigureAwait(false);
}
catch (Exception ex)
diff --git a/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/SiteAuditReconciliationOptions.cs b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/SiteAuditReconciliationOptions.cs
index 31796ad9..b58b3fb4 100644
--- a/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/SiteAuditReconciliationOptions.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/SiteAuditReconciliationOptions.cs
@@ -31,18 +31,45 @@ public sealed class SiteAuditReconciliationOptions
///
/// Test-only override for finer control over the tick cadence than
/// whole-second resolution allows. When non-null, takes precedence over
- /// . Not bound from config —
- /// production config exposes
- /// only.
+ /// AND bypasses the
+ /// minimum clamp (so tests can use
+ /// millisecond cadences). Production config exposes
+ /// only and never sets this
+ /// knob — but because the options class is Bind-ed wholesale, a
+ /// config value at AuditLog:Reconciliation:ReconciliationIntervalOverride
+ /// WOULD bind if present; operators must not set it.
///
public TimeSpan? ReconciliationIntervalOverride { get; set; }
///
- /// Resolves the effective tick interval, honouring the test override when
- /// set. Falls back to .
+ /// Minimum interval the config-bound
+ /// can resolve to. Clamps a misconfigured ReconciliationIntervalSeconds: 0
+ /// (or a negative value) away from , which would make
+ /// Akka's ScheduleTellRepeatedlyCancelable spin. The test-only
+ /// bypasses this clamp so unit tests
+ /// can still drop the cadence to milliseconds.
///
- public TimeSpan ReconciliationInterval =>
- ReconciliationIntervalOverride ?? TimeSpan.FromSeconds(ReconciliationIntervalSeconds);
+ private static readonly TimeSpan MinConfiguredInterval = TimeSpan.FromSeconds(1);
+
+ ///
+ /// Resolves the effective tick interval, honouring the test override when
+ /// set. Falls back to , clamped to at
+ /// least so a zero/negative config value can
+ /// never yield (which would spin the scheduler).
+ ///
+ public TimeSpan ReconciliationInterval
+ {
+ get
+ {
+ if (ReconciliationIntervalOverride is { } overrideValue)
+ {
+ return overrideValue;
+ }
+
+ var resolved = TimeSpan.FromSeconds(ReconciliationIntervalSeconds);
+ return resolved < MinConfiguredInterval ? MinConfiguredInterval : resolved;
+ }
+ }
///
/// Maximum number of
diff --git a/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/SiteEnumerator.cs b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/SiteEnumerator.cs
new file mode 100644
index 00000000..357ee4ee
--- /dev/null
+++ b/src/ZB.MOM.WW.ScadaBridge.AuditLog/Central/SiteEnumerator.cs
@@ -0,0 +1,77 @@
+using Microsoft.Extensions.DependencyInjection;
+using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
+
+namespace ZB.MOM.WW.ScadaBridge.AuditLog.Central;
+
+///
+/// Production backing the central
+/// . Enumerates the configured sites
+/// from the config DB via and
+/// projects each site to a using the site's
+/// SiteIdentifier as the cursor key and its GrpcNodeAAddress as
+/// the dial target.
+///
+///
+///
+/// Scope-per-call. is a SCOPED EF Core
+/// service (registered by AddConfigurationDatabase); resolving it from
+/// the root provider would fail DI scope validation. The enumerator therefore
+/// takes the root and opens one
+/// CreateAsyncScope per call — mirroring the
+/// per-tick scope pattern in .
+///
+///
+/// Blank-address skip. Sites with no GrpcNodeAAddress configured
+/// are silently skipped: the reconciliation pull cannot dial them, but absence
+/// of an address is a configuration decision, not a runtime error (per the
+/// contract).
+///
+///
+/// NodeA-only first cut. This implementation always uses NodeA's gRPC
+/// address. NodeA/NodeB failover endpoint selection (dial NodeB when NodeA is
+/// unreachable) is a follow-up — the shape already
+/// carries a single endpoint, so failover will live in the puller/client, not
+/// here.
+///
+///
+public sealed class SiteEnumerator : ISiteEnumerator
+{
+ private readonly IServiceProvider _services;
+
+ ///
+ /// Initializes the enumerator with the root service provider used to open a
+ /// fresh DI scope per enumeration call.
+ ///
+ /// Root service provider for resolving the scoped .
+ public SiteEnumerator(IServiceProvider services)
+ {
+ ArgumentNullException.ThrowIfNull(services);
+ _services = services;
+ }
+
+ ///
+ public async Task> EnumerateAsync(CancellationToken ct = default)
+ {
+ await using var scope = _services.CreateAsyncScope();
+ var repository = scope.ServiceProvider.GetRequiredService();
+
+ var sites = await repository.GetAllSitesAsync(ct).ConfigureAwait(false);
+
+ var entries = new List(sites.Count);
+ foreach (var site in sites)
+ {
+ // First cut: NodeA's gRPC address is the dial target. NodeA/NodeB
+ // failover endpoint selection is a follow-up.
+ if (string.IsNullOrWhiteSpace(site.GrpcNodeAAddress))
+ {
+ continue;
+ }
+
+ // The IsNullOrWhiteSpace guard above proves GrpcNodeAAddress is
+ // non-null here; explicit null-forgiving for clarity.
+ entries.Add(new SiteEntry(site.SiteIdentifier, site.GrpcNodeAAddress!));
+ }
+
+ return entries;
+ }
+}
diff --git a/src/ZB.MOM.WW.ScadaBridge.AuditLog/ServiceCollectionExtensions.cs b/src/ZB.MOM.WW.ScadaBridge.AuditLog/ServiceCollectionExtensions.cs
index 6b1e0255..631200a1 100644
--- a/src/ZB.MOM.WW.ScadaBridge.AuditLog/ServiceCollectionExtensions.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.AuditLog/ServiceCollectionExtensions.cs
@@ -50,6 +50,12 @@ public static class ServiceCollectionExtensions
/// Configuration section bound to .
public const string PartitionMaintenanceSectionName = "AuditLog:PartitionMaintenance";
+ /// Configuration section bound to .
+ public const string PurgeSectionName = "AuditLog:Purge";
+
+ /// Configuration section bound to .
+ public const string ReconciliationSectionName = "AuditLog:Reconciliation";
+
///
/// Registers the Audit Log (#23) component services: options, the site
/// SQLite writer chain (primary + ring fallback + failure-counter sink),
@@ -327,6 +333,24 @@ public static class ServiceCollectionExtensions
.Bind(config.GetSection(PartitionMaintenanceSectionName));
services.AddHostedService();
+ // I1 (review): bind the two central-singleton options HERE rather than in
+ // AddAuditLogCentralReconciliationClient. AkkaHostedService.RegisterCentralActors
+ // resolves IOptions /
+ // via GetRequiredService when it wires the AuditLogPurgeActor +
+ // SiteAuditReconciliationActor singletons; AddAuditLogCentralMaintenance is
+ // ALWAYS called on the central path (the reconciliation-client helper is the
+ // one that could in principle be dropped), so binding the options here means
+ // the singletons get a valid IOptions even if the gRPC-client helper is not
+ // wired — instead of a cryptic InvalidOperationException at GetRequiredService.
+ // Defaults are fine when the section is absent (24 h purge cadence /
+ // 5 min reconciliation tick); production exposes IntervalHours /
+ // ReconciliationIntervalSeconds only — the test-only *Override knobs are
+ // not intended to be set from config (see the options classes' remarks).
+ services.AddOptions()
+ .Bind(config.GetSection(PurgeSectionName));
+ services.AddOptions()
+ .Bind(config.GetSection(ReconciliationSectionName));
+
// M6 Bundle E (T8 + T9): central health snapshot — a single object
// that owns the CentralAuditWriteFailures + AuditRedactionFailure
// Interlocked counters AND surfaces them on
@@ -362,4 +386,118 @@ public static class ServiceCollectionExtensions
return services;
}
+
+ ///
+ /// Audit Log (#23) M6 — central-only registration of the production
+ /// ()
+ /// and its unary-call invoker () used
+ /// by to pull reconciliation
+ /// batches from each site over the PullAuditEvents gRPC RPC.
+ ///
+ ///
+ ///
+ /// Kept out of — which also runs on site
+ /// composition roots — because the client dials sites and resolves
+ /// (a central-only collaborator wired
+ /// alongside the reconciliation singleton). Folding it into
+ /// would register a site-dialing client on every
+ /// site host, violating the "every Add* call is safe from any
+ /// composition root" invariant. This helper is the central analogue of
+ /// .
+ ///
+ ///
+ /// The binds with default
+ ///
+ /// keepalive unless an IOptions<CommunicationOptions> is
+ /// already registered, in which case the configured timings flow through —
+ /// matching how SiteStreamGrpcClientFactory takes its keepalive from
+ /// the same options.
+ ///
+ ///
+ /// The production (,
+ /// wrapping the scoped ISiteRepository) IS registered here — so the
+ /// singleton wired in the Host can
+ /// resolve its enumerator + gRPC client from this central-only helper. Keeping
+ /// the enumerator on this central path preserves the "every Add* call is
+ /// safe from any composition root" invariant: a site host never calls this
+ /// helper, so it never registers a site-dialing enumerator. The
+ /// +
+ /// bindings live in instead (I1
+ /// review fix) — that helper is unconditionally called on the central path, so
+ /// the two maintenance singletons get a valid IOptions even if this
+ /// gRPC-client helper is ever dropped.
+ ///
+ ///
+ /// The service collection to register into.
+ /// Application configuration used to bind the gRPC client's communication options (purge + reconciliation options are bound by ).
+ /// The same for chaining.
+ public static IServiceCollection AddAuditLogCentralReconciliationClient(
+ this IServiceCollection services,
+ IConfiguration config)
+ {
+ ArgumentNullException.ThrowIfNull(services);
+ ArgumentNullException.ThrowIfNull(config);
+
+ // Production ISiteEnumerator: projects the config-DB Site rows into the
+ // reconciliation targets the SiteAuditReconciliationActor polls. Scoped
+ // ISiteRepository is resolved per call inside the enumerator, so the
+ // singleton takes the ROOT provider (mirrors the per-tick scope pattern
+ // in SiteAuditReconciliationActor / AuditLogPurgeActor).
+ services.TryAddSingleton(sp => new SiteEnumerator(sp));
+
+ // I1 (review): the AuditLogPurgeOptions / SiteAuditReconciliationOptions
+ // bindings moved to AddAuditLogCentralMaintenance — that helper is always
+ // called on the central path, so the two maintenance singletons resolve a
+ // valid IOptions even if this gRPC-client helper is ever dropped. Keep the
+ // ISiteEnumerator + gRPC client registrations here (they dial sites and are
+ // central-only by design).
+
+ // The invoker owns the per-endpoint GrpcChannel cache, so it must be a
+ // singleton — a fresh invoker per resolution would leak channels.
+ // Resolve CommunicationOptions if present (the central Host binds it),
+ // otherwise fall back to defaults so this helper stays standalone.
+ services.TryAddSingleton(sp =>
+ {
+ var options = sp
+ .GetService>();
+ return options is null
+ ? new GrpcPullAuditEventsInvoker()
+ : new GrpcPullAuditEventsInvoker(options.Value);
+ });
+ services.TryAddSingleton(
+ sp => sp.GetRequiredService());
+
+ services.TryAddSingleton(sp => new GrpcPullAuditEventsClient(
+ sp.GetRequiredService(),
+ sp.GetRequiredService(),
+ sp.GetRequiredService>()));
+
+ // Site Call Audit (#22) reconciliation pull client — central-only, the
+ // sibling of the audit pull client above. Lives here (not in the
+ // SiteCallAudit project) so it can reuse the central-only
+ // ISiteEnumerator registered just above; SiteCallAudit does not
+ // reference AuditLog. The invoker owns the per-endpoint GrpcChannel
+ // cache, so it must be a singleton (a fresh invoker per resolution
+ // would leak channels). CommunicationOptions flow through when bound by
+ // the central Host, else defaults — mirrors the audit invoker.
+ services.TryAddSingleton(sp =>
+ {
+ var options = sp
+ .GetService>();
+ return options is null
+ ? new GrpcPullSiteCallsInvoker()
+ : new GrpcPullSiteCallsInvoker(options.Value);
+ });
+ services.TryAddSingleton(
+ sp => sp.GetRequiredService());
+
+ services.TryAddSingleton(sp => new GrpcPullSiteCallsClient(
+ sp.GetRequiredService(),
+ sp.GetRequiredService(),
+ sp.GetRequiredService>()));
+
+ return services;
+ }
}
diff --git a/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Services/IOperationTrackingStore.cs b/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Services/IOperationTrackingStore.cs
index b85bff5c..1d69023a 100644
--- a/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Services/IOperationTrackingStore.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Services/IOperationTrackingStore.cs
@@ -118,4 +118,40 @@ public interface IOperationTrackingStore
Task PurgeTerminalAsync(
DateTime olderThanUtc,
CancellationToken ct = default);
+
+ ///
+ /// Reconciliation read (Site Call Audit #22): return tracking rows whose
+ /// UpdatedAtUtc is at or after as
+ /// projections, ordered by
+ /// UpdatedAtUtc ascending and capped at .
+ /// This is the site-side feed for central's PullSiteCalls RPC — the
+ /// documented periodic self-heal pull that backfills the eventually-consistent
+ /// central SiteCalls mirror when best-effort push telemetry is lost.
+ ///
+ ///
+ ///
+ /// The lower bound is inclusive so a caller can resume from the last
+ /// returned UpdatedAtUtc without skipping a row that shares that
+ /// instant; central ingest is insert-if-not-exists then upsert-on-newer, so
+ /// re-reading the boundary row is a harmless no-op. The oldest-first cap lets
+ /// the caller advance the cursor monotonically across follow-up pulls.
+ ///
+ ///
+ /// is left as the empty string:
+ /// the site id is not a tracking-store column, and the central client re-stamps
+ /// it from the siteId it dialed (the only authority that knows which
+ /// site the rows came from). is
+ /// projected from the row's Kind (DbWriteCached → DbOutbound,
+ /// otherwise ApiOutbound) and
+ /// from TargetSummary.
+ ///
+ ///
+ /// Inclusive lower bound on UpdatedAtUtc; reads from the start.
+ /// Maximum number of rows to return (oldest first).
+ /// Cancellation token.
+ /// The matching rows projected to , oldest-first, capped at .
+ Task> ReadChangedSinceAsync(
+ DateTime sinceUtc,
+ int batchSize,
+ CancellationToken ct = default);
}
diff --git a/src/ZB.MOM.WW.ScadaBridge.Commons/Messages/Integration/PullSiteCallsResponse.cs b/src/ZB.MOM.WW.ScadaBridge.Commons/Messages/Integration/PullSiteCallsResponse.cs
new file mode 100644
index 00000000..fa3949cb
--- /dev/null
+++ b/src/ZB.MOM.WW.ScadaBridge.Commons/Messages/Integration/PullSiteCallsResponse.cs
@@ -0,0 +1,17 @@
+using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit;
+
+namespace ZB.MOM.WW.ScadaBridge.Commons.Messages.Integration;
+
+///
+/// Site Call Audit (#22) periodic reconciliation pull response: the next batch of
+/// site cached-call operational rows (the eventually-consistent SiteCalls
+/// mirror's self-heal feed) plus a flag signalling
+/// the caller to advance the watermark and pull again. Mirrors
+/// ; carries the central
+/// entity the ingest path upserts. See Component-SiteCallAudit.md.
+///
+/// The next batch of operational rows, ordered oldest-first by .
+/// True when the site saturated the requested batch size — the caller should advance the cursor and pull again.
+public sealed record PullSiteCallsResponse(
+ IReadOnlyList SiteCalls,
+ bool MoreAvailable);
diff --git a/src/ZB.MOM.WW.ScadaBridge.Communication/Grpc/SiteCallDtoMapper.cs b/src/ZB.MOM.WW.ScadaBridge.Communication/Grpc/SiteCallDtoMapper.cs
index ec7a0dbd..265a37eb 100644
--- a/src/ZB.MOM.WW.ScadaBridge.Communication/Grpc/SiteCallDtoMapper.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.Communication/Grpc/SiteCallDtoMapper.cs
@@ -1,5 +1,6 @@
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Types;
+using Timestamp = Google.Protobuf.WellKnownTypes.Timestamp;
namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc;
@@ -20,10 +21,15 @@ namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc;
/// Mirrors the sibling .
///
///
-/// Only the DTO→entity direction is provided: nothing in the system maps a
-/// back onto the wire (sites emit the operational state
-/// from SiteCallOperational, never from the central
-/// entity), so an entity→DTO method would be dead code.
+/// Two directions are provided. rehydrates the central
+/// entity central writes into the SiteCalls table.
+/// projects a site-local
+/// onto the wire — used by the Site Call Audit (#22) PullSiteCalls
+/// reconciliation handler (the central→site self-heal pull). The
+/// entity itself is never mapped back onto the wire:
+/// sites emit operational state from , never
+/// from the central , so a SiteCall→DTO method
+/// would be dead code.
///
///
/// String nullability convention: proto3 scalar strings cannot be absent, so the
@@ -70,4 +76,54 @@ public static class SiteCallDtoMapper
IngestedAtUtc = DateTime.UtcNow, // overwritten by AuditLogIngestActor
};
}
+
+ ///
+ /// Projects a site-local onto its
+ /// wire-format DTO for the Site Call Audit (#22) PullSiteCalls
+ /// reconciliation RPC. The inverse of ; null
+ /// /
+ /// collapse to empty strings (proto3 scalar strings cannot be absent), while
+ /// the nullable HttpStatus and TerminalAtUtc stay unset on the
+ /// wire so true-null semantics survive the round-trip back through
+ /// .
+ ///
+ /// The site-local operational state to project to wire format.
+ /// A populated ready for transmission.
+ public static SiteCallOperationalDto ToDto(SiteCallOperational operational)
+ {
+ ArgumentNullException.ThrowIfNull(operational);
+
+ var dto = new SiteCallOperationalDto
+ {
+ TrackedOperationId = operational.TrackedOperationId.ToString(),
+ Channel = operational.Channel,
+ Target = operational.Target,
+ SourceSite = operational.SourceSite,
+ SourceNode = operational.SourceNode ?? string.Empty,
+ Status = operational.Status,
+ RetryCount = operational.RetryCount,
+ LastError = operational.LastError ?? string.Empty,
+ CreatedAtUtc = Timestamp.FromDateTime(EnsureUtc(operational.CreatedAtUtc)),
+ UpdatedAtUtc = Timestamp.FromDateTime(EnsureUtc(operational.UpdatedAtUtc)),
+ };
+
+ if (operational.HttpStatus.HasValue)
+ {
+ dto.HttpStatus = operational.HttpStatus.Value;
+ }
+
+ if (operational.TerminalAtUtc.HasValue)
+ {
+ dto.TerminalAtUtc = Timestamp.FromDateTime(EnsureUtc(operational.TerminalAtUtc.Value));
+ }
+
+ return dto;
+ }
+
+ // All ScadaBridge timestamps are UTC by invariant; Timestamp.FromDateTime
+ // requires UTC kind. Specify (never convert) so a row read back from SQLite
+ // with Kind=Utc passes through and a defensively-unspecified value is
+ // treated as the UTC it already is. Mirrors AuditEventDtoMapper.EnsureUtc.
+ private static DateTime EnsureUtc(DateTime value) =>
+ value.Kind == DateTimeKind.Utc ? value : DateTime.SpecifyKind(value, DateTimeKind.Utc);
}
diff --git a/src/ZB.MOM.WW.ScadaBridge.Communication/Grpc/SiteStreamGrpcServer.cs b/src/ZB.MOM.WW.ScadaBridge.Communication/Grpc/SiteStreamGrpcServer.cs
index 7cb82444..7aedd140 100644
--- a/src/ZB.MOM.WW.ScadaBridge.Communication/Grpc/SiteStreamGrpcServer.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.Communication/Grpc/SiteStreamGrpcServer.cs
@@ -5,7 +5,9 @@ using Grpc.Core;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.Audit;
+using ZB.MOM.WW.ScadaBridge.Commons.Interfaces;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
+using ZB.MOM.WW.ScadaBridge.Commons.Types;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
using GrpcStatus = Grpc.Core.Status;
@@ -48,6 +50,14 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
// the missing queue as "nothing to ship" and returns an empty response so
// central retries on its next reconciliation cycle.
private ISiteAuditQueue? _siteAuditQueue;
+ // Site Call Audit (#22): site-local operation-tracking store handed in by
+ // AkkaHostedService on site roles so the central reconciliation puller's
+ // PullSiteCalls RPC can read tracking rows changed since a cursor. Null
+ // when not wired (central-only host or test composing the server in
+ // isolation) — the handler treats the missing store as "nothing to ship"
+ // and returns an empty response so central retries on its next cycle.
+ // Mirrors _siteAuditQueue.
+ private IOperationTrackingStore? _operationTrackingStore;
///
/// Test-only constructor — kept internal so the DI container sees a
@@ -137,6 +147,21 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
_siteAuditQueue = queue;
}
+ ///
+ /// Hands the site-local (the same
+ /// OperationTrackingStore singleton that backs
+ /// Tracking.Status(id) on the script thread) to the gRPC server so
+ /// the Site Call Audit (#22) RPC can serve
+ /// central's reconciliation pulls. Mirrors :
+ /// wired post-construction because the store and the gRPC server are both
+ /// DI singletons brought up in independent orders on site startup.
+ ///
+ /// The site operation-tracking store for serving reconciliation pulls.
+ public void SetOperationTrackingStore(IOperationTrackingStore store)
+ {
+ _operationTrackingStore = store;
+ }
+
///
/// Host-017 / REQ-HOST-7: signals the gRPC server to begin its part of the
/// site shutdown sequence — refuse new
@@ -432,7 +457,9 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
// sinceUtc defaults to DateTime.MinValue when the wrapper is absent —
// i.e. "pull from the beginning of recorded history", which is the
// intended behaviour for the very first reconciliation cycle.
- var since = request.SinceUtc?.ToDateTime().ToUniversalTime() ?? DateTime.MinValue;
+ var since = request.SinceUtc is not null
+ ? DateTime.SpecifyKind(request.SinceUtc.ToDateTime(), DateTimeKind.Utc)
+ : DateTime.MinValue;
IReadOnlyList events;
try
@@ -488,6 +515,69 @@ public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
return response;
}
+ ///
+ public override async Task PullSiteCalls(
+ PullSiteCallsRequest request,
+ ServerCallContext context)
+ {
+ var store = _operationTrackingStore;
+ if (store is null)
+ {
+ _logger.LogWarning(
+ "PullSiteCalls invoked before SetOperationTrackingStore was called; returning empty response.");
+ return new PullSiteCallsResponse();
+ }
+
+ if (request.BatchSize <= 0)
+ {
+ // Mirrors PullAuditEvents: reject malformed requests cleanly with
+ // InvalidArgument so the caller doesn't see a generic RpcException
+ // from the underlying SQLite parameter validation.
+ throw new RpcException(new GrpcStatus(
+ StatusCode.InvalidArgument, "batch_size must be > 0"));
+ }
+
+ // since_utc defaults to DateTime.MinValue when the wrapper is absent —
+ // i.e. "pull from the beginning of recorded history", the intended
+ // behaviour for the very first reconciliation cycle.
+ var since = request.SinceUtc is not null
+ ? DateTime.SpecifyKind(request.SinceUtc.ToDateTime(), DateTimeKind.Utc)
+ : DateTime.MinValue;
+
+ IReadOnlyList operationals;
+ try
+ {
+ operationals = await store.ReadChangedSinceAsync(
+ since, request.BatchSize, context.CancellationToken);
+ }
+ catch (Exception ex)
+ {
+ // Best-effort, like PullAuditEvents: a read fault must never abort
+ // the reconciliation tick — central retries on its next cycle.
+ _logger.LogError(ex,
+ "ReadChangedSinceAsync failed for since={Since} batch={Batch}; returning empty response.",
+ since, request.BatchSize);
+ return new PullSiteCallsResponse();
+ }
+
+ var response = new PullSiteCallsResponse
+ {
+ // batch_size saturated → tell central to issue a follow-up pull with
+ // an advanced cursor. The site doesn't compute the cursor — central
+ // walks it forward from the last returned UpdatedAtUtc. Unlike
+ // PullAuditEvents there is no MarkReconciled step: the tracking store
+ // is the operational source of truth and the central SiteCalls mirror
+ // is upsert-on-newer, so re-reading rows is a harmless no-op.
+ MoreAvailable = operationals.Count >= request.BatchSize,
+ };
+ foreach (var op in operationals)
+ {
+ response.Operationals.Add(SiteCallDtoMapper.ToDto(op));
+ }
+
+ return response;
+ }
+
///
/// Tracks a single active stream so cleanup only removes its own entry.
///
diff --git a/src/ZB.MOM.WW.ScadaBridge.Communication/Protos/sitestream.proto b/src/ZB.MOM.WW.ScadaBridge.Communication/Protos/sitestream.proto
index 6beae55c..df9ee7af 100644
--- a/src/ZB.MOM.WW.ScadaBridge.Communication/Protos/sitestream.proto
+++ b/src/ZB.MOM.WW.ScadaBridge.Communication/Protos/sitestream.proto
@@ -10,6 +10,7 @@ service SiteStreamService {
rpc IngestAuditEvents(AuditEventBatch) returns (IngestAck);
rpc IngestCachedTelemetry(CachedTelemetryBatch) returns (IngestAck);
rpc PullAuditEvents(PullAuditEventsRequest) returns (PullAuditEventsResponse);
+ rpc PullSiteCalls(PullSiteCallsRequest) returns (PullSiteCallsResponse);
}
message InstanceStreamRequest {
@@ -157,3 +158,20 @@ message PullAuditEventsResponse {
repeated AuditEventDto events = 1;
bool more_available = 2;
}
+
+// Site Call Audit (#22) reconciliation pull: central→site request for any
+// site-local operation-tracking rows whose UpdatedAtUtc >= since_utc — the
+// self-heal feed that backfills the eventually-consistent central SiteCalls
+// mirror when best-effort push telemetry is lost. Mirrors PullAuditEvents
+// but is a SEPARATE RPC (the tracking store is the operational source of
+// truth, distinct from the site audit queue). more_available signals
+// batch_size was saturated so the caller advances since_utc and pulls again.
+message PullSiteCallsRequest {
+ google.protobuf.Timestamp since_utc = 1;
+ int32 batch_size = 2;
+}
+
+message PullSiteCallsResponse {
+ repeated SiteCallOperationalDto operationals = 1;
+ bool more_available = 2;
+}
diff --git a/src/ZB.MOM.WW.ScadaBridge.Communication/SiteStreamGrpc/Sitestream.cs b/src/ZB.MOM.WW.ScadaBridge.Communication/SiteStreamGrpc/Sitestream.cs
index cebfccab..a0e79003 100644
--- a/src/ZB.MOM.WW.ScadaBridge.Communication/SiteStreamGrpc/Sitestream.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.Communication/SiteStreamGrpc/Sitestream.cs
@@ -81,23 +81,30 @@ namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc {
"dWVzdBItCglzaW5jZV91dGMYASABKAsyGi5nb29nbGUucHJvdG9idWYuVGlt",
"ZXN0YW1wEhIKCmJhdGNoX3NpemUYAiABKAUiXAoXUHVsbEF1ZGl0RXZlbnRz",
"UmVzcG9uc2USKQoGZXZlbnRzGAEgAygLMhkuc2l0ZXN0cmVhbS5BdWRpdEV2",
- "ZW50RHRvEhYKDm1vcmVfYXZhaWxhYmxlGAIgASgIKlwKB1F1YWxpdHkSFwoT",
- "UVVBTElUWV9VTlNQRUNJRklFRBAAEhAKDFFVQUxJVFlfR09PRBABEhUKEVFV",
- "QUxJVFlfVU5DRVJUQUlOEAISDwoLUVVBTElUWV9CQUQQAypdCg5BbGFybVN0",
- "YXRlRW51bRIbChdBTEFSTV9TVEFURV9VTlNQRUNJRklFRBAAEhYKEkFMQVJN",
- "X1NUQVRFX05PUk1BTBABEhYKEkFMQVJNX1NUQVRFX0FDVElWRRACKoUBCg5B",
- "bGFybUxldmVsRW51bRIUChBBTEFSTV9MRVZFTF9OT05FEAASEwoPQUxBUk1f",
- "TEVWRUxfTE9XEAESFwoTQUxBUk1fTEVWRUxfTE9XX0xPVxACEhQKEEFMQVJN",
- "X0xFVkVMX0hJR0gQAxIZChVBTEFSTV9MRVZFTF9ISUdIX0hJR0gQBDLhAgoR",
- "U2l0ZVN0cmVhbVNlcnZpY2USVQoRU3Vic2NyaWJlSW5zdGFuY2USIS5zaXRl",
- "c3RyZWFtLkluc3RhbmNlU3RyZWFtUmVxdWVzdBobLnNpdGVzdHJlYW0uU2l0",
- "ZVN0cmVhbUV2ZW50MAESRwoRSW5nZXN0QXVkaXRFdmVudHMSGy5zaXRlc3Ry",
- "ZWFtLkF1ZGl0RXZlbnRCYXRjaBoVLnNpdGVzdHJlYW0uSW5nZXN0QWNrElAK",
- "FUluZ2VzdENhY2hlZFRlbGVtZXRyeRIgLnNpdGVzdHJlYW0uQ2FjaGVkVGVs",
- "ZW1ldHJ5QmF0Y2gaFS5zaXRlc3RyZWFtLkluZ2VzdEFjaxJaCg9QdWxsQXVk",
- "aXRFdmVudHMSIi5zaXRlc3RyZWFtLlB1bGxBdWRpdEV2ZW50c1JlcXVlc3Qa",
- "Iy5zaXRlc3RyZWFtLlB1bGxBdWRpdEV2ZW50c1Jlc3BvbnNlQiuqAihaQi5N",
- "T00uV1cuU2NhZGFCcmlkZ2UuQ29tbXVuaWNhdGlvbi5HcnBjYgZwcm90bzM="));
+ "ZW50RHRvEhYKDm1vcmVfYXZhaWxhYmxlGAIgASgIIlkKFFB1bGxTaXRlQ2Fs",
+ "bHNSZXF1ZXN0Ei0KCXNpbmNlX3V0YxgBIAEoCzIaLmdvb2dsZS5wcm90b2J1",
+ "Zi5UaW1lc3RhbXASEgoKYmF0Y2hfc2l6ZRgCIAEoBSJpChVQdWxsU2l0ZUNh",
+ "bGxzUmVzcG9uc2USOAoMb3BlcmF0aW9uYWxzGAEgAygLMiIuc2l0ZXN0cmVh",
+ "bS5TaXRlQ2FsbE9wZXJhdGlvbmFsRHRvEhYKDm1vcmVfYXZhaWxhYmxlGAIg",
+ "ASgIKlwKB1F1YWxpdHkSFwoTUVVBTElUWV9VTlNQRUNJRklFRBAAEhAKDFFV",
+ "QUxJVFlfR09PRBABEhUKEVFVQUxJVFlfVU5DRVJUQUlOEAISDwoLUVVBTElU",
+ "WV9CQUQQAypdCg5BbGFybVN0YXRlRW51bRIbChdBTEFSTV9TVEFURV9VTlNQ",
+ "RUNJRklFRBAAEhYKEkFMQVJNX1NUQVRFX05PUk1BTBABEhYKEkFMQVJNX1NU",
+ "QVRFX0FDVElWRRACKoUBCg5BbGFybUxldmVsRW51bRIUChBBTEFSTV9MRVZF",
+ "TF9OT05FEAASEwoPQUxBUk1fTEVWRUxfTE9XEAESFwoTQUxBUk1fTEVWRUxf",
+ "TE9XX0xPVxACEhQKEEFMQVJNX0xFVkVMX0hJR0gQAxIZChVBTEFSTV9MRVZF",
+ "TF9ISUdIX0hJR0gQBDK3AwoRU2l0ZVN0cmVhbVNlcnZpY2USVQoRU3Vic2Ny",
+ "aWJlSW5zdGFuY2USIS5zaXRlc3RyZWFtLkluc3RhbmNlU3RyZWFtUmVxdWVz",
+ "dBobLnNpdGVzdHJlYW0uU2l0ZVN0cmVhbUV2ZW50MAESRwoRSW5nZXN0QXVk",
+ "aXRFdmVudHMSGy5zaXRlc3RyZWFtLkF1ZGl0RXZlbnRCYXRjaBoVLnNpdGVz",
+ "dHJlYW0uSW5nZXN0QWNrElAKFUluZ2VzdENhY2hlZFRlbGVtZXRyeRIgLnNp",
+ "dGVzdHJlYW0uQ2FjaGVkVGVsZW1ldHJ5QmF0Y2gaFS5zaXRlc3RyZWFtLklu",
+ "Z2VzdEFjaxJaCg9QdWxsQXVkaXRFdmVudHMSIi5zaXRlc3RyZWFtLlB1bGxB",
+ "dWRpdEV2ZW50c1JlcXVlc3QaIy5zaXRlc3RyZWFtLlB1bGxBdWRpdEV2ZW50",
+ "c1Jlc3BvbnNlElQKDVB1bGxTaXRlQ2FsbHMSIC5zaXRlc3RyZWFtLlB1bGxT",
+ "aXRlQ2FsbHNSZXF1ZXN0GiEuc2l0ZXN0cmVhbS5QdWxsU2l0ZUNhbGxzUmVz",
+ "cG9uc2VCK6oCKFpCLk1PTS5XVy5TY2FkYUJyaWRnZS5Db21tdW5pY2F0aW9u",
+ "LkdycGNiBnByb3RvMw=="));
descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData,
new pbr::FileDescriptor[] { global::Google.Protobuf.WellKnownTypes.TimestampReflection.Descriptor, global::Google.Protobuf.WellKnownTypes.WrappersReflection.Descriptor, },
new pbr::GeneratedClrTypeInfo(new[] {typeof(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.Quality), typeof(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.AlarmStateEnum), typeof(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.AlarmLevelEnum), }, null, new pbr::GeneratedClrTypeInfo[] {
@@ -112,7 +119,9 @@ namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc {
new pbr::GeneratedClrTypeInfo(typeof(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.CachedTelemetryPacket), global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.CachedTelemetryPacket.Parser, new[]{ "AuditEvent", "Operational" }, null, null, null, null),
new pbr::GeneratedClrTypeInfo(typeof(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.CachedTelemetryBatch), global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.CachedTelemetryBatch.Parser, new[]{ "Packets" }, null, null, null, null),
new pbr::GeneratedClrTypeInfo(typeof(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsRequest), global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsRequest.Parser, new[]{ "SinceUtc", "BatchSize" }, null, null, null, null),
- new pbr::GeneratedClrTypeInfo(typeof(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsResponse), global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsResponse.Parser, new[]{ "Events", "MoreAvailable" }, null, null, null, null)
+ new pbr::GeneratedClrTypeInfo(typeof(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsResponse), global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsResponse.Parser, new[]{ "Events", "MoreAvailable" }, null, null, null, null),
+ new pbr::GeneratedClrTypeInfo(typeof(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsRequest), global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsRequest.Parser, new[]{ "SinceUtc", "BatchSize" }, null, null, null, null),
+ new pbr::GeneratedClrTypeInfo(typeof(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsResponse), global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsResponse.Parser, new[]{ "Operationals", "MoreAvailable" }, null, null, null, null)
}));
}
#endregion
@@ -5064,6 +5073,483 @@ namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc {
}
+ ///
+ /// Site Call Audit (#22) reconciliation pull: central→site request for any
+ /// site-local operation-tracking rows whose UpdatedAtUtc >= since_utc — the
+ /// self-heal feed that backfills the eventually-consistent central SiteCalls
+ /// mirror when best-effort push telemetry is lost. Mirrors PullAuditEvents
+ /// but is a SEPARATE RPC (the tracking store is the operational source of
+ /// truth, distinct from the site audit queue). more_available signals
+ /// batch_size was saturated so the caller advances since_utc and pulls again.
+ ///
+ [global::System.Diagnostics.DebuggerDisplayAttribute("{ToString(),nq}")]
+ public sealed partial class PullSiteCallsRequest : pb::IMessage
+ #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
+ , pb::IBufferMessage
+ #endif
+ {
+ private static readonly pb::MessageParser _parser = new pb::MessageParser(() => new PullSiteCallsRequest());
+ private pb::UnknownFieldSet _unknownFields;
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ public static pb::MessageParser Parser { get { return _parser; } }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ public static pbr::MessageDescriptor Descriptor {
+ get { return global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.SitestreamReflection.Descriptor.MessageTypes[12]; }
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ pbr::MessageDescriptor pb::IMessage.Descriptor {
+ get { return Descriptor; }
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ public PullSiteCallsRequest() {
+ OnConstruction();
+ }
+
+ partial void OnConstruction();
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ public PullSiteCallsRequest(PullSiteCallsRequest other) : this() {
+ sinceUtc_ = other.sinceUtc_ != null ? other.sinceUtc_.Clone() : null;
+ batchSize_ = other.batchSize_;
+ _unknownFields = pb::UnknownFieldSet.Clone(other._unknownFields);
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ public PullSiteCallsRequest Clone() {
+ return new PullSiteCallsRequest(this);
+ }
+
+ /// Field number for the "since_utc" field.
+ public const int SinceUtcFieldNumber = 1;
+ private global::Google.Protobuf.WellKnownTypes.Timestamp sinceUtc_;
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ public global::Google.Protobuf.WellKnownTypes.Timestamp SinceUtc {
+ get { return sinceUtc_; }
+ set {
+ sinceUtc_ = value;
+ }
+ }
+
+ /// Field number for the "batch_size" field.
+ public const int BatchSizeFieldNumber = 2;
+ private int batchSize_;
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ public int BatchSize {
+ get { return batchSize_; }
+ set {
+ batchSize_ = value;
+ }
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ public override bool Equals(object other) {
+ return Equals(other as PullSiteCallsRequest);
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ public bool Equals(PullSiteCallsRequest other) {
+ if (ReferenceEquals(other, null)) {
+ return false;
+ }
+ if (ReferenceEquals(other, this)) {
+ return true;
+ }
+ if (!object.Equals(SinceUtc, other.SinceUtc)) return false;
+ if (BatchSize != other.BatchSize) return false;
+ return Equals(_unknownFields, other._unknownFields);
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ public override int GetHashCode() {
+ int hash = 1;
+ if (sinceUtc_ != null) hash ^= SinceUtc.GetHashCode();
+ if (BatchSize != 0) hash ^= BatchSize.GetHashCode();
+ if (_unknownFields != null) {
+ hash ^= _unknownFields.GetHashCode();
+ }
+ return hash;
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ public override string ToString() {
+ return pb::JsonFormatter.ToDiagnosticString(this);
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ public void WriteTo(pb::CodedOutputStream output) {
+ #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
+ output.WriteRawMessage(this);
+ #else
+ if (sinceUtc_ != null) {
+ output.WriteRawTag(10);
+ output.WriteMessage(SinceUtc);
+ }
+ if (BatchSize != 0) {
+ output.WriteRawTag(16);
+ output.WriteInt32(BatchSize);
+ }
+ if (_unknownFields != null) {
+ _unknownFields.WriteTo(output);
+ }
+ #endif
+ }
+
+ #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ void pb::IBufferMessage.InternalWriteTo(ref pb::WriteContext output) {
+ if (sinceUtc_ != null) {
+ output.WriteRawTag(10);
+ output.WriteMessage(SinceUtc);
+ }
+ if (BatchSize != 0) {
+ output.WriteRawTag(16);
+ output.WriteInt32(BatchSize);
+ }
+ if (_unknownFields != null) {
+ _unknownFields.WriteTo(ref output);
+ }
+ }
+ #endif
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ public int CalculateSize() {
+ int size = 0;
+ if (sinceUtc_ != null) {
+ size += 1 + pb::CodedOutputStream.ComputeMessageSize(SinceUtc);
+ }
+ if (BatchSize != 0) {
+ size += 1 + pb::CodedOutputStream.ComputeInt32Size(BatchSize);
+ }
+ if (_unknownFields != null) {
+ size += _unknownFields.CalculateSize();
+ }
+ return size;
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ public void MergeFrom(PullSiteCallsRequest other) {
+ if (other == null) {
+ return;
+ }
+ if (other.sinceUtc_ != null) {
+ if (sinceUtc_ == null) {
+ SinceUtc = new global::Google.Protobuf.WellKnownTypes.Timestamp();
+ }
+ SinceUtc.MergeFrom(other.SinceUtc);
+ }
+ if (other.BatchSize != 0) {
+ BatchSize = other.BatchSize;
+ }
+ _unknownFields = pb::UnknownFieldSet.MergeFrom(_unknownFields, other._unknownFields);
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ public void MergeFrom(pb::CodedInputStream input) {
+ #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
+ input.ReadRawMessage(this);
+ #else
+ uint tag;
+ while ((tag = input.ReadTag()) != 0) {
+ if ((tag & 7) == 4) {
+ // Abort on any end group tag.
+ return;
+ }
+ switch(tag) {
+ default:
+ _unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, input);
+ break;
+ case 10: {
+ if (sinceUtc_ == null) {
+ SinceUtc = new global::Google.Protobuf.WellKnownTypes.Timestamp();
+ }
+ input.ReadMessage(SinceUtc);
+ break;
+ }
+ case 16: {
+ BatchSize = input.ReadInt32();
+ break;
+ }
+ }
+ }
+ #endif
+ }
+
+ #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ void pb::IBufferMessage.InternalMergeFrom(ref pb::ParseContext input) {
+ uint tag;
+ while ((tag = input.ReadTag()) != 0) {
+ if ((tag & 7) == 4) {
+ // Abort on any end group tag.
+ return;
+ }
+ switch(tag) {
+ default:
+ _unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, ref input);
+ break;
+ case 10: {
+ if (sinceUtc_ == null) {
+ SinceUtc = new global::Google.Protobuf.WellKnownTypes.Timestamp();
+ }
+ input.ReadMessage(SinceUtc);
+ break;
+ }
+ case 16: {
+ BatchSize = input.ReadInt32();
+ break;
+ }
+ }
+ }
+ }
+ #endif
+
+ }
+
+ [global::System.Diagnostics.DebuggerDisplayAttribute("{ToString(),nq}")]
+ public sealed partial class PullSiteCallsResponse : pb::IMessage
+ #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
+ , pb::IBufferMessage
+ #endif
+ {
+ private static readonly pb::MessageParser _parser = new pb::MessageParser(() => new PullSiteCallsResponse());
+ private pb::UnknownFieldSet _unknownFields;
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ public static pb::MessageParser Parser { get { return _parser; } }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ public static pbr::MessageDescriptor Descriptor {
+ get { return global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.SitestreamReflection.Descriptor.MessageTypes[13]; }
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ pbr::MessageDescriptor pb::IMessage.Descriptor {
+ get { return Descriptor; }
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ public PullSiteCallsResponse() {
+ OnConstruction();
+ }
+
+ partial void OnConstruction();
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ public PullSiteCallsResponse(PullSiteCallsResponse other) : this() {
+ operationals_ = other.operationals_.Clone();
+ moreAvailable_ = other.moreAvailable_;
+ _unknownFields = pb::UnknownFieldSet.Clone(other._unknownFields);
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ public PullSiteCallsResponse Clone() {
+ return new PullSiteCallsResponse(this);
+ }
+
+ /// Field number for the "operationals" field.
+ public const int OperationalsFieldNumber = 1;
+ private static readonly pb::FieldCodec _repeated_operationals_codec
+ = pb::FieldCodec.ForMessage(10, global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.SiteCallOperationalDto.Parser);
+ private readonly pbc::RepeatedField operationals_ = new pbc::RepeatedField();
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ public pbc::RepeatedField Operationals {
+ get { return operationals_; }
+ }
+
+ /// Field number for the "more_available" field.
+ public const int MoreAvailableFieldNumber = 2;
+ private bool moreAvailable_;
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ public bool MoreAvailable {
+ get { return moreAvailable_; }
+ set {
+ moreAvailable_ = value;
+ }
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ public override bool Equals(object other) {
+ return Equals(other as PullSiteCallsResponse);
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ public bool Equals(PullSiteCallsResponse other) {
+ if (ReferenceEquals(other, null)) {
+ return false;
+ }
+ if (ReferenceEquals(other, this)) {
+ return true;
+ }
+ if(!operationals_.Equals(other.operationals_)) return false;
+ if (MoreAvailable != other.MoreAvailable) return false;
+ return Equals(_unknownFields, other._unknownFields);
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ public override int GetHashCode() {
+ int hash = 1;
+ hash ^= operationals_.GetHashCode();
+ if (MoreAvailable != false) hash ^= MoreAvailable.GetHashCode();
+ if (_unknownFields != null) {
+ hash ^= _unknownFields.GetHashCode();
+ }
+ return hash;
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ public override string ToString() {
+ return pb::JsonFormatter.ToDiagnosticString(this);
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ public void WriteTo(pb::CodedOutputStream output) {
+ #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
+ output.WriteRawMessage(this);
+ #else
+ operationals_.WriteTo(output, _repeated_operationals_codec);
+ if (MoreAvailable != false) {
+ output.WriteRawTag(16);
+ output.WriteBool(MoreAvailable);
+ }
+ if (_unknownFields != null) {
+ _unknownFields.WriteTo(output);
+ }
+ #endif
+ }
+
+ #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ void pb::IBufferMessage.InternalWriteTo(ref pb::WriteContext output) {
+ operationals_.WriteTo(ref output, _repeated_operationals_codec);
+ if (MoreAvailable != false) {
+ output.WriteRawTag(16);
+ output.WriteBool(MoreAvailable);
+ }
+ if (_unknownFields != null) {
+ _unknownFields.WriteTo(ref output);
+ }
+ }
+ #endif
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ public int CalculateSize() {
+ int size = 0;
+ size += operationals_.CalculateSize(_repeated_operationals_codec);
+ if (MoreAvailable != false) {
+ size += 1 + 1;
+ }
+ if (_unknownFields != null) {
+ size += _unknownFields.CalculateSize();
+ }
+ return size;
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ public void MergeFrom(PullSiteCallsResponse other) {
+ if (other == null) {
+ return;
+ }
+ operationals_.Add(other.operationals_);
+ if (other.MoreAvailable != false) {
+ MoreAvailable = other.MoreAvailable;
+ }
+ _unknownFields = pb::UnknownFieldSet.MergeFrom(_unknownFields, other._unknownFields);
+ }
+
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ public void MergeFrom(pb::CodedInputStream input) {
+ #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
+ input.ReadRawMessage(this);
+ #else
+ uint tag;
+ while ((tag = input.ReadTag()) != 0) {
+ if ((tag & 7) == 4) {
+ // Abort on any end group tag.
+ return;
+ }
+ switch(tag) {
+ default:
+ _unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, input);
+ break;
+ case 10: {
+ operationals_.AddEntriesFrom(input, _repeated_operationals_codec);
+ break;
+ }
+ case 16: {
+ MoreAvailable = input.ReadBool();
+ break;
+ }
+ }
+ }
+ #endif
+ }
+
+ #if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
+ [global::System.Diagnostics.DebuggerNonUserCodeAttribute]
+ [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
+ void pb::IBufferMessage.InternalMergeFrom(ref pb::ParseContext input) {
+ uint tag;
+ while ((tag = input.ReadTag()) != 0) {
+ if ((tag & 7) == 4) {
+ // Abort on any end group tag.
+ return;
+ }
+ switch(tag) {
+ default:
+ _unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, ref input);
+ break;
+ case 10: {
+ operationals_.AddEntriesFrom(ref input, _repeated_operationals_codec);
+ break;
+ }
+ case 16: {
+ MoreAvailable = input.ReadBool();
+ break;
+ }
+ }
+ }
+ }
+ #endif
+
+ }
+
#endregion
}
diff --git a/src/ZB.MOM.WW.ScadaBridge.Communication/SiteStreamGrpc/SitestreamGrpc.cs b/src/ZB.MOM.WW.ScadaBridge.Communication/SiteStreamGrpc/SitestreamGrpc.cs
index 8993b16a..b57de38e 100644
--- a/src/ZB.MOM.WW.ScadaBridge.Communication/SiteStreamGrpc/SitestreamGrpc.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.Communication/SiteStreamGrpc/SitestreamGrpc.cs
@@ -59,6 +59,10 @@ namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc {
static readonly grpc::Marshaller __Marshaller_sitestream_PullAuditEventsRequest = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsRequest.Parser));
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Marshaller __Marshaller_sitestream_PullAuditEventsResponse = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullAuditEventsResponse.Parser));
+ [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
+ static readonly grpc::Marshaller __Marshaller_sitestream_PullSiteCallsRequest = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsRequest.Parser));
+ [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
+ static readonly grpc::Marshaller __Marshaller_sitestream_PullSiteCallsResponse = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsResponse.Parser));
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Method __Method_SubscribeInstance = new grpc::Method(
@@ -92,6 +96,14 @@ namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc {
__Marshaller_sitestream_PullAuditEventsRequest,
__Marshaller_sitestream_PullAuditEventsResponse);
+ [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
+ static readonly grpc::Method __Method_PullSiteCalls = new grpc::Method(
+ grpc::MethodType.Unary,
+ __ServiceName,
+ "PullSiteCalls",
+ __Marshaller_sitestream_PullSiteCallsRequest,
+ __Marshaller_sitestream_PullSiteCallsResponse);
+
/// Service descriptor
public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor
{
@@ -126,6 +138,12 @@ namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc {
throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, ""));
}
+ [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
+ public virtual global::System.Threading.Tasks.Task PullSiteCalls(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsRequest request, grpc::ServerCallContext context)
+ {
+ throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, ""));
+ }
+
}
/// Client for SiteStreamService
@@ -225,6 +243,26 @@ namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc {
{
return CallInvoker.AsyncUnaryCall(__Method_PullAuditEvents, null, options, request);
}
+ [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
+ public virtual global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsResponse PullSiteCalls(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsRequest request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
+ {
+ return PullSiteCalls(request, new grpc::CallOptions(headers, deadline, cancellationToken));
+ }
+ [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
+ public virtual global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsResponse PullSiteCalls(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsRequest request, grpc::CallOptions options)
+ {
+ return CallInvoker.BlockingUnaryCall(__Method_PullSiteCalls, null, options, request);
+ }
+ [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
+ public virtual grpc::AsyncUnaryCall PullSiteCallsAsync(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsRequest request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
+ {
+ return PullSiteCallsAsync(request, new grpc::CallOptions(headers, deadline, cancellationToken));
+ }
+ [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
+ public virtual grpc::AsyncUnaryCall PullSiteCallsAsync(global::ZB.MOM.WW.ScadaBridge.Communication.Grpc.PullSiteCallsRequest request, grpc::CallOptions options)
+ {
+ return CallInvoker.AsyncUnaryCall(__Method_PullSiteCalls, null, options, request);
+ }
/// Creates a new instance of client from given ClientBaseConfiguration.
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
protected override SiteStreamServiceClient NewInstance(ClientBaseConfiguration configuration)
@@ -242,7 +280,8 @@ namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc {
.AddMethod(__Method_SubscribeInstance, serviceImpl.SubscribeInstance)
.AddMethod(__Method_IngestAuditEvents, serviceImpl.IngestAuditEvents)
.AddMethod(__Method_IngestCachedTelemetry, serviceImpl.IngestCachedTelemetry)
- .AddMethod(__Method_PullAuditEvents, serviceImpl.PullAuditEvents).Build();
+ .AddMethod(__Method_PullAuditEvents, serviceImpl.PullAuditEvents)
+ .AddMethod(__Method_PullSiteCalls, serviceImpl.PullSiteCalls).Build();
}
/// Register service method with a service binder with or without implementation. Useful when customizing the service binding logic.
@@ -256,6 +295,7 @@ namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc {
serviceBinder.AddMethod(__Method_IngestAuditEvents, serviceImpl == null ? null : new grpc::UnaryServerMethod(serviceImpl.IngestAuditEvents));
serviceBinder.AddMethod(__Method_IngestCachedTelemetry, serviceImpl == null ? null : new grpc::UnaryServerMethod(serviceImpl.IngestCachedTelemetry));
serviceBinder.AddMethod(__Method_PullAuditEvents, serviceImpl == null ? null : new grpc::UnaryServerMethod(serviceImpl.PullAuditEvents));
+ serviceBinder.AddMethod(__Method_PullSiteCalls, serviceImpl == null ? null : new grpc::UnaryServerMethod(serviceImpl.PullSiteCalls));
}
}
diff --git a/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs b/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs
index ab93ce56..2c45478f 100644
--- a/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.Host/Actors/AkkaHostedService.cs
@@ -588,6 +588,117 @@ akka {{
_logger.LogInformation(
"SiteCallAuditActor singleton created and registered with CentralCommunicationActor");
+ // Audit Log (#23) M6 Bundle B/C — start the two central-only maintenance
+ // singletons that were fully implemented but never instantiated: the
+ // daily AuditLog partition-switch purge (AuditLogPurgeActor) and the
+ // periodic per-site audit-event reconciliation pull
+ // (SiteAuditReconciliationActor). Both mirror the SiteCallAudit /
+ // NotificationOutbox singleton pattern above: a ClusterSingletonManager
+ // pins the actor to the active central node, a ClusterSingletonProxy
+ // gives a stable address, and a PhaseClusterLeave graceful-stop task
+ // drains the in-flight tick before handover. Options + the production
+ // ISiteEnumerator + IPullAuditEventsClient come from
+ // AddAuditLogCentralReconciliationClient (central composition root only).
+ // Both actors take the root IServiceProvider and open their own per-tick
+ // DI scope because IAuditLogRepository / ISiteRepository are scoped EF
+ // Core services.
+ var auditPurgeLogger = _serviceProvider.GetRequiredService()
+ .CreateLogger();
+ var auditPurgeOptions = _serviceProvider
+ .GetRequiredService>();
+ var auditLogOptions = _serviceProvider
+ .GetRequiredService>();
+
+ var auditPurgeSingletonProps = ClusterSingletonManager.Props(
+ singletonProps: Props.Create(() => new ZB.MOM.WW.ScadaBridge.AuditLog.Central.AuditLogPurgeActor(
+ _serviceProvider,
+ auditPurgeOptions,
+ auditLogOptions,
+ auditPurgeLogger)),
+ terminationMessage: PoisonPill.Instance,
+ settings: ClusterSingletonManagerSettings.Create(_actorSystem!)
+ .WithSingletonName("audit-log-purge"));
+ var auditPurgeSingletonManager =
+ _actorSystem!.ActorOf(auditPurgeSingletonProps, "audit-log-purge-singleton");
+
+ var auditPurgeShutdown = Akka.Actor.CoordinatedShutdown.Get(_actorSystem);
+ auditPurgeShutdown.AddTask(
+ Akka.Actor.CoordinatedShutdown.PhaseClusterLeave,
+ "drain-audit-log-purge-singleton",
+ async () =>
+ {
+ try
+ {
+ await auditPurgeSingletonManager.GracefulStop(TimeSpan.FromSeconds(10));
+ }
+ catch (Exception ex)
+ {
+ _logger.LogWarning(ex,
+ "AuditLogPurge singleton did not drain within the graceful-stop "
+ + "timeout; falling through to PoisonPill handover");
+ }
+ return Akka.Done.Instance;
+ });
+
+ var auditPurgeProxyProps = ClusterSingletonProxy.Props(
+ singletonManagerPath: "/user/audit-log-purge-singleton",
+ settings: ClusterSingletonProxySettings.Create(_actorSystem)
+ .WithSingletonName("audit-log-purge"));
+ _actorSystem.ActorOf(auditPurgeProxyProps, "audit-log-purge-proxy");
+ _logger.LogInformation("AuditLogPurgeActor singleton created");
+
+ // SiteAuditReconciliationActor — self-healing fallback puller. Resolves
+ // its production ISiteEnumerator (config-DB Site projection) and
+ // IPullAuditEventsClient (gRPC) from the central reconciliation-client
+ // helper registered in Program.cs.
+ var auditReconLogger = _serviceProvider.GetRequiredService()
+ .CreateLogger();
+ var auditReconOptions = _serviceProvider
+ .GetRequiredService>();
+ var auditReconSites = _serviceProvider
+ .GetRequiredService();
+ var auditReconClient = _serviceProvider
+ .GetRequiredService();
+
+ var auditReconSingletonProps = ClusterSingletonManager.Props(
+ singletonProps: Props.Create(() => new ZB.MOM.WW.ScadaBridge.AuditLog.Central.SiteAuditReconciliationActor(
+ auditReconSites,
+ auditReconClient,
+ _serviceProvider,
+ auditReconOptions,
+ auditReconLogger)),
+ terminationMessage: PoisonPill.Instance,
+ settings: ClusterSingletonManagerSettings.Create(_actorSystem!)
+ .WithSingletonName("site-audit-reconciliation"));
+ var auditReconSingletonManager =
+ _actorSystem!.ActorOf(auditReconSingletonProps, "site-audit-reconciliation-singleton");
+
+ var auditReconShutdown = Akka.Actor.CoordinatedShutdown.Get(_actorSystem);
+ auditReconShutdown.AddTask(
+ Akka.Actor.CoordinatedShutdown.PhaseClusterLeave,
+ "drain-site-audit-reconciliation-singleton",
+ async () =>
+ {
+ try
+ {
+ await auditReconSingletonManager.GracefulStop(TimeSpan.FromSeconds(10));
+ }
+ catch (Exception ex)
+ {
+ _logger.LogWarning(ex,
+ "SiteAuditReconciliation singleton did not drain within the graceful-stop "
+ + "timeout; falling through to PoisonPill handover");
+ }
+ return Akka.Done.Instance;
+ });
+
+ var auditReconProxyProps = ClusterSingletonProxy.Props(
+ singletonManagerPath: "/user/site-audit-reconciliation-singleton",
+ settings: ClusterSingletonProxySettings.Create(_actorSystem)
+ .WithSingletonName("site-audit-reconciliation"));
+ _actorSystem.ActorOf(auditReconProxyProps, "site-audit-reconciliation-proxy");
+ _logger.LogInformation("SiteAuditReconciliationActor singleton created");
+
_logger.LogInformation("Central actors registered. CentralCommunicationActor created.");
}
@@ -898,6 +1009,18 @@ akka {{
// direction one-way (Host knows both; Communication doesn't reach back
// into AuditLog).
grpcServer?.SetSiteAuditQueue(siteAuditQueue);
+ // Site Call Audit (#22): hand the site-local OperationTrackingStore to
+ // the gRPC server so the PullSiteCalls reconciliation RPC can serve
+ // central's self-heal pulls. siteTrackingStore is resolved above with
+ // GetService — present on site composition roots, null on central — so
+ // wire the seam only when the store exists. Like SetSiteAuditQueue, both
+ // the store and the gRPC server are singletons; wiring here keeps the
+ // dependency direction one-way (Host knows both; Communication doesn't
+ // reach back into SiteRuntime).
+ if (siteTrackingStore is not null)
+ {
+ grpcServer?.SetOperationTrackingStore(siteTrackingStore);
+ }
grpcServer?.SetReady(_actorSystem!);
}
}
diff --git a/src/ZB.MOM.WW.ScadaBridge.Host/Program.cs b/src/ZB.MOM.WW.ScadaBridge.Host/Program.cs
index 8d3f36aa..5af0f3b5 100644
--- a/src/ZB.MOM.WW.ScadaBridge.Host/Program.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.Host/Program.cs
@@ -97,6 +97,13 @@ try
// pf_AuditLog_Month forward monthly. Depends on IPartitionMaintenance
// (registered below by AddConfigurationDatabase).
builder.Services.AddAuditLogCentralMaintenance(builder.Configuration);
+ // #23 M6 Bundle B/C — central-only registration backing the two
+ // maintenance singletons started in AkkaHostedService: the production
+ // ISiteEnumerator + IPullAuditEventsClient (gRPC) used by the
+ // SiteAuditReconciliationActor, plus the AuditLogPurgeOptions /
+ // SiteAuditReconciliationOptions bindings consumed by both singletons.
+ // Central-only by design (it dials sites), kept out of AddAuditLog.
+ builder.Services.AddAuditLogCentralReconciliationClient(builder.Configuration);
// Site Call Audit (#22) — central node owns the SiteCallAuditActor
// singleton (M3 Bundle F). The extension itself currently registers
// nothing — actor Props are constructed inline in AkkaHostedService —
diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditActor.cs b/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditActor.cs
index 320a7227..b89ae01a 100644
--- a/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditActor.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditActor.cs
@@ -1,6 +1,7 @@
using Akka.Actor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
+using ZB.MOM.WW.ScadaBridge.AuditLog.Central;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Audit;
@@ -24,13 +25,17 @@ namespace ZB.MOM.WW.ScadaBridge.SiteCallAudit;
///
///
/// Implemented: direct telemetry ingest,
-/// query, detail and KPI handlers (Task 4), and the central→site Retry/Discard
-/// relay (Task 5 — the relay handlers live in this actor). Deferred (per
-/// CLAUDE.md scope discipline — both land in a later follow-up): the periodic
-/// per-site reconciliation puller that backfills lost telemetry, and the daily
-/// terminal-row purge scheduler (the repository exposes
-/// PurgeTerminalAsync but nothing in this module currently invokes it
-/// on a schedule).
+/// query, detail and KPI handlers (Task 4), the central→site Retry/Discard
+/// relay (Task 5 — the relay handlers live in this actor), the periodic
+/// per-site reconciliation puller that backfills lost telemetry (Piece A —
+/// , the documented self-heal pull), and
+/// the daily terminal-row purge scheduler (Piece B —
+/// , which invokes
+/// on a timer). Both
+/// background timers are started in and gate on the
+/// reconciliation collaborators ( +
+/// ) being available — the repo-only test ctor
+/// injects neither, so neither timer runs there.
///
///
/// Per CLAUDE.md "audit-write failure NEVER aborts the user-facing action" —
@@ -68,6 +73,36 @@ public class SiteCallAuditActor : ReceiveActor
private readonly SiteCallAuditOptions _options;
private readonly ILogger _logger;
+ ///
+ /// Reconciliation collaborators (Piece A). The per-site self-heal pull
+ /// () and the site list
+ /// (). On the production path these are
+ /// resolved once from the root (central
+ /// singletons registered by AddAuditLogCentralReconciliationClient);
+ /// in the test path they are injected directly. They are null when
+ /// the actor was built via the repo-only test ctor — in that case the
+ /// reconciliation tick is NOT started (see );
+ /// the purge tick gates on the same collaborators (see ).
+ ///
+ private readonly IPullSiteCallsClient? _pullClient;
+ private readonly ISiteEnumerator? _siteEnumerator;
+
+ ///
+ /// Per-site reconciliation watermark — the highest
+ /// seen for that site on a previous
+ /// tick. The next tick asks for rows at or after this cursor; idempotent
+ /// monotonic swallows any
+ /// duplicate-with-same-timestamp rows. In-memory for the singleton's
+ /// lifetime — a failover / restart resets every cursor to
+ /// , which is conservative but correct
+ /// (the next tick re-pulls and idempotent upsert dedupes). Mirrors
+ /// SiteAuditReconciliationActor.
+ ///
+ private readonly Dictionary _reconciliationCursors = new();
+
+ private ICancelable? _reconciliationTimer;
+ private ICancelable? _purgeTimer;
+
///
/// Task 5 (#22): the central→site command transport — the
/// CentralCommunicationActor, which owns the per-site
@@ -87,6 +122,11 @@ public class SiteCallAuditActor : ReceiveActor
/// across every message. Used by Bundle C's MSSQL-backed TestKit fixture.
/// An optional lets a test pin the stuck/KPI
/// windows; when omitted the production defaults apply.
+ ///
+ /// This ctor injects NO reconciliation client/enumerator, so the
+ /// reconciliation tick is gated off (see )
+ /// — the MSSQL-backed read/upsert tests must not fire phantom pulls.
+ ///
///
/// Concrete repository instance to use for all messages.
/// Logger for diagnostics and error reporting.
@@ -106,6 +146,49 @@ public class SiteCallAuditActor : ReceiveActor
RegisterHandlers();
}
+ ///
+ /// Test-mode constructor for the reconciliation tick (Piece A) — injects a
+ /// concrete repository PLUS the two reconciliation collaborators directly,
+ /// so the per-site self-heal pull is unit-testable in-memory without a DI
+ /// container or a live gRPC channel. Because the client + enumerator are
+ /// present, the reconciliation tick IS started; the purge tick is also
+ /// started (both gate on the collaborators being available — see
+ /// / ).
+ ///
+ /// Concrete repository instance used for upserts and purges.
+ /// Enumerates the sites to reconcile each tick.
+ /// Pull client used to fetch changed rows from each site.
+ /// Logger for diagnostics and error reporting.
+ /// Optional configuration overrides; production defaults apply when null.
+ ///
+ /// Public (not internal) because Akka's default ActivatorProducer
+ /// instantiates the actor via reflection with public-only binding flags —
+ /// an internal ctor yields a MissingMethodException at actor
+ /// creation. Distinguished from the production
+ /// ctor by its concrete-collaborator parameter list; only the test project
+ /// (or a host that hand-resolves the collaborators) constructs it this way.
+ ///
+ public SiteCallAuditActor(
+ ISiteCallAuditRepository repository,
+ ISiteEnumerator siteEnumerator,
+ IPullSiteCallsClient pullClient,
+ ILogger logger,
+ SiteCallAuditOptions? options = null)
+ {
+ ArgumentNullException.ThrowIfNull(repository);
+ ArgumentNullException.ThrowIfNull(siteEnumerator);
+ ArgumentNullException.ThrowIfNull(pullClient);
+ ArgumentNullException.ThrowIfNull(logger);
+
+ _injectedRepository = repository;
+ _siteEnumerator = siteEnumerator;
+ _pullClient = pullClient;
+ _logger = logger;
+ _options = options ?? new SiteCallAuditOptions();
+
+ RegisterHandlers();
+ }
+
///
/// Production constructor — resolves
/// from a fresh DI scope per message because the repository is a scoped EF
@@ -129,6 +212,17 @@ public class SiteCallAuditActor : ReceiveActor
_options = options;
_logger = logger;
+ // Reconciliation collaborators (Piece A) are central-only singletons
+ // registered by AddAuditLogCentralReconciliationClient — always on the
+ // central composition root (Program.cs). Resolve them once here (the
+ // actor itself is a long-lived singleton; the repository is the only
+ // scoped service and is still resolved per-tick/per-message). GetService
+ // (not GetRequiredService) so a host that somehow omits the helper
+ // degrades to "no reconciliation tick" rather than a startup crash —
+ // the tick startup gates on both being non-null.
+ _pullClient = serviceProvider.GetService();
+ _siteEnumerator = serviceProvider.GetService();
+
RegisterHandlers();
}
@@ -154,6 +248,75 @@ public class SiteCallAuditActor : ReceiveActor
});
Receive(HandleRetrySiteCall);
Receive(HandleDiscardSiteCall);
+
+ // Piece A/B (#22): self-ticks for the periodic reconciliation pull and
+ // the daily terminal-row purge. Handlers stay alive across faults via
+ // their own per-site / per-tick try/catch (mirroring the ingest path);
+ // the timers are only started when their collaborators are available.
+ ReceiveAsync(_ => OnReconciliationTickAsync());
+ ReceiveAsync(_ => OnPurgeTickAsync());
+ }
+
+ ///
+ protected override void PreStart()
+ {
+ base.PreStart();
+ StartReconciliationTimer();
+ StartPurgeTimer();
+ }
+
+ ///
+ protected override void PostStop()
+ {
+ _reconciliationTimer?.Cancel();
+ _purgeTimer?.Cancel();
+ base.PostStop();
+ }
+
+ ///
+ /// Starts the periodic reconciliation tick — but ONLY when both the pull
+ /// client and the site enumerator are available. The repo-only test ctor
+ /// injects neither, so the tick is gated off there (the MSSQL read/upsert
+ /// tests must not fire phantom pulls); the reconciliation test ctor and the
+ /// production ctor (which resolves both from the SP) start it.
+ ///
+ private void StartReconciliationTimer()
+ {
+ if (_pullClient is null || _siteEnumerator is null)
+ {
+ return;
+ }
+
+ var interval = _options.ResolvedReconciliationInterval;
+ _reconciliationTimer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(
+ initialDelay: interval,
+ interval: interval,
+ receiver: Self,
+ message: ReconciliationTick.Instance,
+ sender: Self);
+ }
+
+ ///
+ /// Starts the daily purge tick — gated on the same collaborator presence as
+ /// the reconciliation tick. The purge itself only needs the repository, but
+ /// gating both schedulers together keeps the repo-only test ctor (no
+ /// client/enumerator) free of BOTH background timers, so the MSSQL read/
+ /// upsert tests see no scheduled side effects.
+ ///
+ private void StartPurgeTimer()
+ {
+ if (_pullClient is null || _siteEnumerator is null)
+ {
+ return;
+ }
+
+ var interval = _options.ResolvedPurgeInterval;
+ _purgeTimer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(
+ initialDelay: interval,
+ interval: interval,
+ receiver: Self,
+ message: PurgeTick.Instance,
+ sender: Self);
}
///
@@ -212,6 +375,228 @@ public class SiteCallAuditActor : ReceiveActor
}
}
+ // ── Piece A: periodic per-site reconciliation pull (self-heal) ──
+
+ ///
+ /// One reconciliation pass: enumerate every known site and, per site, pull
+ /// changed rows since that site's cursor and upsert
+ /// them idempotently — the documented self-heal when best-effort gRPC push
+ /// telemetry is lost. This is a mirror, NOT a dispatcher: cached-call
+ /// delivery stays site-local; upserting reconciled rows only refreshes the
+ /// eventually-consistent central SiteCalls mirror.
+ ///
+ ///
+ /// Mirrors SiteAuditReconciliationActor's structure (per-site cursor,
+ /// per-site try/catch failure isolation, advance the cursor by the max
+ /// observed ) but is deliberately simpler:
+ /// no stalled-detection EventStream machinery — just cursor + pull + upsert
+ /// + advance. One DI scope per tick is opened and the same repository reused
+ /// across every site in that tick.
+ ///
+ private async Task OnReconciliationTickAsync()
+ {
+ // The collaborators are guaranteed non-null: the tick is only scheduled
+ // when both are present (StartReconciliationTimer). Assert via the
+ // local copies so a future refactor that drops the gate fails loudly.
+ var enumerator = _siteEnumerator!;
+ var client = _pullClient!;
+
+ IReadOnlyList sites;
+ try
+ {
+ // No ambient CancellationToken in a ReceiveActor handler — None is
+ // intentional; the work is bounded by the reconciliation interval
+ // plus the singleton's graceful-stop drain on PhaseClusterLeave.
+ sites = await enumerator.EnumerateAsync().ConfigureAwait(false);
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "SiteCallAudit site enumeration failed; skipping reconciliation tick.");
+ return;
+ }
+
+ if (sites.Count == 0)
+ {
+ return;
+ }
+
+ // AuditLog-003: open the scope INLINE with CreateAsyncScope + await using
+ // so the scoped EF Core repository (an IAsyncDisposable DbContext) disposes
+ // asynchronously at end of tick rather than blocking the Akka dispatcher
+ // thread on a synchronous Dispose() of pending connection cleanup — the tick
+ // holds the scope across many awaited UpsertAsync calls. Mirrors the sibling
+ // SiteAuditReconciliationActor.OnTickAsync. ResolveRepository() (sync Dispose)
+ // is retained for the synchronous message-handler paths. In the injected-
+ // repository test path there is no scope to open and the test repo is reused.
+ if (_injectedRepository is not null)
+ {
+ await ReconcileSitesAsync(sites, client, _injectedRepository).ConfigureAwait(false);
+ return;
+ }
+
+ await using var scope = _serviceProvider!.CreateAsyncScope();
+ var repository = scope.ServiceProvider.GetRequiredService();
+ await ReconcileSitesAsync(sites, client, repository).ConfigureAwait(false);
+ }
+
+ ///
+ /// Reconciles every site in the tick against a single resolved repository,
+ /// isolating per-site faults so one bad site never sinks the rest of the
+ /// pass (the failing site's cursor is left at its previous value so the next
+ /// tick retries the same window).
+ ///
+ private async Task ReconcileSitesAsync(
+ IReadOnlyList sites, IPullSiteCallsClient client, ISiteCallAuditRepository repository)
+ {
+ foreach (var site in sites)
+ {
+ try
+ {
+ await ReconcileSiteAsync(site, client, repository).ConfigureAwait(false);
+ }
+ catch (Exception ex)
+ {
+ // Failure-isolation invariant: one site's fault (transport,
+ // repository write) must NOT sink the rest of the tick. The
+ // failing site's cursor is left at its previous value so the
+ // next tick retries the same window.
+ _logger.LogWarning(
+ ex,
+ "SiteCallAudit reconciliation pull failed for site {SiteId}; other sites continue.",
+ site.SiteId);
+ }
+ }
+ }
+
+ ///
+ /// Issues one PullSiteCalls RPC against the site, upserts the
+ /// returned rows idempotently, and advances the site's cursor to the maximum
+ /// observed. The pull client returns rows
+ /// oldest-first with SourceSite already re-stamped from the dialed
+ /// site id, so the actor upserts them verbatim (re-stamping
+ /// IngestedAtUtc at central persist time, as the telemetry path does).
+ ///
+ ///
+ ///
+ /// Coarse per-site retry — a deliberate divergence from
+ /// SiteAuditReconciliationActor. That sibling (AuditLog-004) tracks
+ /// a per-EventId attempt counter and permanently abandons a row after a
+ /// threshold so a single un-insertable row cannot block a site's cursor
+ /// forever. This actor deliberately does NOT: any throw inside the loop
+ /// propagates to 's per-site catch,
+ /// which leaves the site's cursor at its previous value, so the next tick
+ /// re-pulls the whole batch from since. A persistently-bad row therefore
+ /// holds the site's cursor and re-pulls the batch every tick. This is
+ /// acceptable here because is
+ /// monotonic and idempotent — re-pulling already-ingested rows is a cheap
+ /// no-op — and the SiteCalls table is an eventually-consistent mirror,
+ /// not the source of truth, so a slow site simply lags rather than corrupts.
+ ///
+ ///
+ /// Inclusive cursor boundary. The cursor is advanced to the maximum
+ /// seen, and the pull asks for rows at or
+ /// after it (since is >=, not >). The row whose
+ /// timestamp equals the cursor is therefore re-pulled on the next tick and
+ /// deduplicated by the idempotent monotonic upsert — the same inclusive-boundary
+ /// contract as SiteAuditReconciliationActor's cursor.
+ ///
+ ///
+ private async Task ReconcileSiteAsync(
+ SiteEntry site, IPullSiteCallsClient client, ISiteCallAuditRepository repository)
+ {
+ var since = _reconciliationCursors.TryGetValue(site.SiteId, out var c) ? c : DateTime.MinValue;
+ var response = await client
+ .PullAsync(site.SiteId, since, _options.ReconciliationBatchSize, CancellationToken.None)
+ .ConfigureAwait(false);
+
+ var maxUpdated = since;
+ var nowUtc = DateTime.UtcNow;
+ foreach (var row in response.SiteCalls)
+ {
+ // IngestedAtUtc is the "central ingested (or last refreshed) this
+ // row" stamp — owned by the central actor, exactly as OnUpsertAsync
+ // does for the telemetry path. Monotonic UpsertAsync makes a row
+ // already present (from a prior push) a silent no-op.
+ var siteCall = row with { IngestedAtUtc = nowUtc };
+ await repository.UpsertAsync(siteCall).ConfigureAwait(false);
+
+ if (row.UpdatedAtUtc > maxUpdated)
+ {
+ maxUpdated = row.UpdatedAtUtc;
+ }
+ }
+
+ // Advance the cursor to the newest row seen. A MoreAvailable response
+ // means the site saturated the batch; the next tick continues draining
+ // from the advanced cursor (no immediate re-pull loop — the natural
+ // tick cadence drains the backlog, matching SiteAuditReconciliationActor).
+ _reconciliationCursors[site.SiteId] = maxUpdated;
+ }
+
+ // ── Piece B: daily terminal-row purge scheduler ──
+
+ ///
+ /// One purge pass: drops terminal SiteCalls rows whose
+ /// is older than
+ /// UtcNow - RetentionDays via
+ /// . Non-terminal
+ /// rows are never purged (enforced in the repository). The threshold is
+ /// computed each tick so an operator who lowers RetentionDays sees it
+ /// applied on the next purge without an actor restart. Mirrors
+ /// AuditLogPurgeActor's daily cadence + continue-on-error posture: a
+ /// purge fault is logged and swallowed so the singleton stays alive.
+ ///
+ private async Task OnPurgeTickAsync()
+ {
+ var threshold = DateTime.UtcNow - TimeSpan.FromDays(_options.RetentionDays);
+
+ // AuditLog-003: open the scope INLINE with CreateAsyncScope + await using
+ // so the scoped EF Core repository (an IAsyncDisposable DbContext) disposes
+ // asynchronously rather than blocking the Akka dispatcher thread on a
+ // synchronous Dispose(). Mirrors SiteAuditReconciliationActor; the
+ // injected-repository test path reuses the test repo with no scope.
+ if (_injectedRepository is not null)
+ {
+ await PurgeWithRepositoryAsync(_injectedRepository, threshold).ConfigureAwait(false);
+ return;
+ }
+
+ await using var scope = _serviceProvider!.CreateAsyncScope();
+ var repository = scope.ServiceProvider.GetRequiredService();
+ await PurgeWithRepositoryAsync(repository, threshold).ConfigureAwait(false);
+ }
+
+ ///
+ /// Runs one terminal-row purge against the resolved repository, logging and
+ /// swallowing any fault (continue-on-error) so a transient SQL failure or
+ /// contention never crashes the central singleton — the next tick retries
+ /// the same window.
+ ///
+ private async Task PurgeWithRepositoryAsync(ISiteCallAuditRepository repository, DateTime threshold)
+ {
+ try
+ {
+ var rowsDeleted = await repository.PurgeTerminalAsync(threshold).ConfigureAwait(false);
+ if (rowsDeleted > 0)
+ {
+ _logger.LogInformation(
+ "SiteCallAudit purged {RowsDeleted} terminal SiteCalls rows older than {ThresholdUtc:o}.",
+ rowsDeleted,
+ threshold);
+ }
+ }
+ catch (Exception ex)
+ {
+ // Continue-on-error: a purge fault (transient SQL failure,
+ // contention) must NOT crash the central singleton. The next tick
+ // retries the same window.
+ _logger.LogError(
+ ex,
+ "SiteCallAudit terminal-row purge failed (threshold {ThresholdUtc:o}); will retry next tick.",
+ threshold);
+ }
+ }
+
// ── Task 4: read-side (query / detail / KPI) ──
///
@@ -693,6 +1078,20 @@ public class SiteCallAuditActor : ReceiveActor
{
return string.IsNullOrWhiteSpace(value) ? null : value;
}
+
+ /// Self-tick triggering a reconciliation pass across all sites (Piece A).
+ internal sealed class ReconciliationTick
+ {
+ public static readonly ReconciliationTick Instance = new();
+ private ReconciliationTick() { }
+ }
+
+ /// Self-tick triggering a terminal-row purge pass (Piece B).
+ internal sealed class PurgeTick
+ {
+ public static readonly PurgeTick Instance = new();
+ private PurgeTick() { }
+ }
}
///
diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditOptions.cs b/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditOptions.cs
index a5db3102..317b29f9 100644
--- a/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditOptions.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditOptions.cs
@@ -1,11 +1,13 @@
namespace ZB.MOM.WW.ScadaBridge.SiteCallAudit;
///
-/// Configuration options for the Site Call Audit (#22) read-side: stuck-call
-/// detection and KPI windowing. Mirrors the KPI-relevant subset of
-/// NotificationOutboxOptions — the reconciliation, purge and dispatch
-/// cadence options the Notification Outbox carries are not part of the Site
-/// Call Audit read-side backend and are deliberately omitted here.
+/// Configuration options for the Site Call Audit (#22): stuck-call detection +
+/// KPI windowing for the read-side, plus the cadence/retention knobs for the
+/// two central-singleton schedulers — the periodic per-site reconciliation
+/// pull (self-heal for lost telemetry) and the daily terminal-row purge.
+/// Mirrors the KPI-relevant subset of NotificationOutboxOptions and the
+/// scheduler-cadence shape of SiteAuditReconciliationOptions /
+/// AuditLogPurgeOptions.
///
public class SiteCallAuditOptions
{
@@ -44,4 +46,99 @@ public class SiteCallAuditOptions
///
///
public TimeSpan RelayTimeout { get; set; } = TimeSpan.FromSeconds(10);
+
+ // ── Reconciliation tick (#22): periodic per-site self-heal pull ──
+
+ ///
+ /// Period of the reconciliation tick. Each tick visits every known site
+ /// once, pulls changed SiteCall rows since a per-site cursor, and
+ /// upserts them idempotently — the documented self-heal when best-effort
+ /// push telemetry is lost. Default 5 minutes, matching the sibling
+ /// SiteAuditReconciliationOptions (#23) cadence. Clamped to at least
+ /// via .
+ ///
+ public TimeSpan ReconciliationInterval { get; set; } = TimeSpan.FromMinutes(5);
+
+ ///
+ /// Test-only override for the reconciliation tick cadence — bypasses the
+ /// clamp so unit tests can drop the
+ /// cadence to milliseconds. Production config never sets this; leave null.
+ ///
+ public TimeSpan? ReconciliationIntervalOverride { get; set; }
+
+ ///
+ /// Maximum number of SiteCall rows requested per PullSiteCalls
+ /// RPC. Default 500. A MoreAvailable=true response signals the cursor
+ /// advanced and the next tick should keep draining the backlog.
+ ///
+ public int ReconciliationBatchSize { get; set; } = 500;
+
+ ///
+ /// Minimum interval the config-bound can
+ /// resolve to. Clamps a misconfigured 0 (or negative) value away from
+ /// , which would make Akka's
+ /// ScheduleTellRepeatedlyCancelable spin — the exact footgun flagged in
+ /// a prior review of the sibling reconciliation options.
+ ///
+ private static readonly TimeSpan MinReconciliationInterval = TimeSpan.FromSeconds(1);
+
+ ///
+ /// Resolves the effective reconciliation tick interval: the test override
+ /// when set (bypassing the clamp), otherwise
+ /// clamped to at least so a
+ /// zero/negative config value can never yield .
+ ///
+ public TimeSpan ResolvedReconciliationInterval =>
+ ReconciliationIntervalOverride is { } o
+ ? o
+ : ReconciliationInterval < MinReconciliationInterval
+ ? MinReconciliationInterval
+ : ReconciliationInterval;
+
+ // ── Purge scheduler (#22): daily terminal-row purge ──
+
+ ///
+ /// Period of the purge tick. Each tick drops terminal SiteCalls rows
+ /// older than the retention window via
+ /// .
+ /// Default 24 hours, matching AuditLogPurgeOptions. Clamped to at
+ /// least via .
+ ///
+ public TimeSpan PurgeInterval { get; set; } = TimeSpan.FromHours(24);
+
+ ///
+ /// Test-only override for the purge tick cadence — bypasses the
+ /// clamp so unit tests can drop the cadence
+ /// to milliseconds. Production config never sets this; leave null.
+ ///
+ public TimeSpan? PurgeIntervalOverride { get; set; }
+
+ ///
+ /// Retention window for terminal rows. On each purge tick a row whose
+ /// TerminalAtUtc is older than UtcNow - RetentionDays is
+ /// deleted; non-terminal rows are never purged. Default 365 days, matching
+ /// the central audit-store retention policy.
+ ///
+ public int RetentionDays { get; set; } = 365;
+
+ ///
+ /// Minimum interval the config-bound can resolve
+ /// to. Clamps a misconfigured 0 (or negative) value away from
+ /// for the same scheduler-spin reason as
+ /// ; the purge is daily so the floor
+ /// is a more generous 1 minute.
+ ///
+ private static readonly TimeSpan MinPurgeInterval = TimeSpan.FromMinutes(1);
+
+ ///
+ /// Resolves the effective purge tick interval: the test override when set
+ /// (bypassing the clamp), otherwise clamped to at
+ /// least .
+ ///
+ public TimeSpan ResolvedPurgeInterval =>
+ PurgeIntervalOverride is { } o
+ ? o
+ : PurgeInterval < MinPurgeInterval
+ ? MinPurgeInterval
+ : PurgeInterval;
}
diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/ZB.MOM.WW.ScadaBridge.SiteCallAudit.csproj b/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/ZB.MOM.WW.ScadaBridge.SiteCallAudit.csproj
index cca4f3eb..c2de6728 100644
--- a/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/ZB.MOM.WW.ScadaBridge.SiteCallAudit.csproj
+++ b/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/ZB.MOM.WW.ScadaBridge.SiteCallAudit.csproj
@@ -29,6 +29,15 @@
the same transport every other central→site command uses. SiteEnvelope is defined
in ZB.MOM.WW.ScadaBridge.Communication (no cycle: Communication does not reference SiteCallAudit). -->
+
+
diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteEventLogging/ISiteEventLogger.cs b/src/ZB.MOM.WW.ScadaBridge.SiteEventLogging/ISiteEventLogger.cs
index 07fc9ea4..630822ef 100644
--- a/src/ZB.MOM.WW.ScadaBridge.SiteEventLogging/ISiteEventLogger.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.SiteEventLogging/ISiteEventLogger.cs
@@ -11,7 +11,7 @@ public interface ISiteEventLogger
/// completes once the event is durably persisted and faults if
/// the write fails, so callers that await it observe success or failure.
///
- /// Category: script, alarm, deployment, connection, store_and_forward, instance_lifecycle
+ /// Category: script, alarm, deployment, connection, store_and_forward, instance_lifecycle, notification
/// Info, Warning, or Error
/// Optional instance ID associated with the event
/// Source identifier, e.g., "ScriptActor:MonitorSpeed"
diff --git a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/AlarmActor.cs b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/AlarmActor.cs
index bafd20d9..ca9e2db6 100644
--- a/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/AlarmActor.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Actors/AlarmActor.cs
@@ -1,10 +1,12 @@
using Akka.Actor;
using Microsoft.CodeAnalysis.Scripting;
+using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Streaming;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Flattening;
using ZB.MOM.WW.ScadaBridge.HealthMonitoring;
+using ZB.MOM.WW.ScadaBridge.SiteEventLogging;
using ZB.MOM.WW.ScadaBridge.SiteRuntime.Scripts;
using System.Globalization;
using System.Text.Json;
@@ -37,6 +39,25 @@ public class AlarmActor : ReceiveActor
private readonly SiteRuntimeOptions _options;
private readonly ILogger _logger;
private readonly ISiteHealthCollector? _healthCollector;
+ private readonly IServiceProvider? _serviceProvider;
+
+ ///
+ /// M1.5: the optional site operational-event log, resolved once from
+ /// at construction and cached. The
+ /// registration is process-lifetime (a singleton), so resolving once on
+ /// the actor's own thread is both correct and cheaper than a per-event
+ /// GetService on the hot path. null when no provider was
+ /// supplied (the test/no-logging path) — then
+ /// no-ops.
+ ///
+ private readonly ISiteEventLogger? _siteEventLogger;
+
+ ///
+ /// M1.5: priority at or above which a computed-alarm raise is logged as
+ /// Error to the site event log; below it, raises log as Warning.
+ /// Mirrors the 0–1000 alarm-severity scale.
+ ///
+ private const int ErrorPriorityThreshold = 700;
private AlarmState _currentState = AlarmState.Normal;
///
@@ -83,6 +104,9 @@ public class AlarmActor : ReceiveActor
/// Pre-compiled trigger expression, or null for non-expression triggers.
/// Seed attribute snapshot so static attributes evaluate correctly at startup.
/// Optional health collector for surfacing alarm execution metrics.
+ /// Optional DI service provider used to resolve the optional
+ /// for M1.5 alarm operational events. Fire-and-forget;
+ /// a logging failure never affects alarm evaluation.
public AlarmActor(
string alarmName,
string instanceName,
@@ -94,7 +118,8 @@ public class AlarmActor : ReceiveActor
ILogger logger,
Script