586 lines
25 KiB
C#
586 lines
25 KiB
C#
using System.Collections.Concurrent;
|
|
using System.Threading.Channels;
|
|
using Akka.Actor;
|
|
using Grpc.Core;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Options;
|
|
using ZB.MOM.WW.Audit;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Types;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Audit;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
|
|
using GrpcStatus = Grpc.Core.Status;
|
|
|
|
namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc;
|
|
|
|
/// <summary>
|
|
/// gRPC service that accepts instance stream subscriptions from central nodes.
|
|
/// Creates a StreamRelayActor per subscription to bridge Akka domain events
|
|
/// through a <see cref="Channel{T}"/> to the gRPC response stream.
|
|
/// </summary>
|
|
public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
|
|
{
|
|
// Subscription source: SubscribeInstance registers each relay actor with it
// and removes the actor again when the stream ends.
private readonly ISiteStreamSubscriber _streamSubscriber;

// Assigned by SetReady rather than the constructor, so the server can be
// built by DI before the actor system exists.
private ActorSystem? _actorSystem;

private readonly ILogger<SiteStreamGrpcServer> _logger;

// Registered streams keyed by correlation id. The value wraps the stream's
// CancellationTokenSource so a stream can be cancelled from outside.
private readonly ConcurrentDictionary<string, StreamEntry> _activeStreams =
    new ConcurrentDictionary<string, StreamEntry>();

// Limit checked by SubscribeInstance before a new stream is registered.
private readonly int _maxConcurrentStreams;

// Session lifetime applied to each stream via CancelAfter (skipped when the
// value is zero, negative or infinite).
private readonly TimeSpan _maxStreamLifetime;

// Set to true by SetReady; SubscribeInstance answers Unavailable until then.
private volatile bool _ready;

// Host-017 / REQ-HOST-7: flipped by CancelAllStreams() when the host enters
// CoordinatedShutdown so SubscribeInstance refuses new streams with
// Unavailable before the actor system tears down. Strictly monotonic — once
// true it is never reset (the server is single-lifetime per host).
private volatile bool _shuttingDown;

// Incremented once per subscription and appended to the relay actor's name.
private long _actorCounter;

// Audit Log (#23 M2): central-side ingest actor proxy. The host sets it
// after the cluster singleton starts (see Bundle E wiring). While it is null
// the ingest RPCs reply with an empty IngestAck so sites can safely retry —
// incomplete wiring is treated as transient, never fatal.
private IActorRef? _auditIngestActor;

// Per Bundle D's brief the Ask timeout is 30 s. The ingest actor's repo calls
// are sub-100 ms in steady state; the generous timeout absorbs a slow MSSQL
// connection without surfacing as a gRPC failure on a healthy site.
private static readonly TimeSpan AuditIngestAskTimeout = TimeSpan.FromSeconds(30);

// Audit Log (#23 M6): site-local queue handed in by AkkaHostedService on site
// roles so the central reconciliation puller's PullAuditEvents RPC can read
// Pending/Forwarded rows. Null when not wired (e.g. a central-only host or a
// test composing the server in isolation) — the handler treats the missing
// queue as "nothing to ship" and returns an empty response so central retries
// on its next reconciliation cycle.
private ISiteAuditQueue? _siteAuditQueue;

// Site Call Audit (#22): site-local operation-tracking store handed in by
// AkkaHostedService on site roles so the central reconciliation puller's
// PullSiteCalls RPC can read tracking rows changed since a cursor. Null when
// not wired (central-only host or a test composing the server in isolation) —
// the handler treats the missing store as "nothing to ship" and returns an
// empty response so central retries on its next cycle. Mirrors _siteAuditQueue.
private IOperationTrackingStore? _operationTrackingStore;
|
|
|
|
/// <summary>
/// Test-only constructor that applies a fixed four-hour stream lifetime.
/// Kept <c>internal</c> so the DI container sees a single public constructor
/// and is not faced with an ambiguous choice.
/// </summary>
/// <param name="streamSubscriber">The stream subscriber for managing subscriptions.</param>
/// <param name="logger">The logger instance.</param>
/// <param name="maxConcurrentStreams">The maximum concurrent streams (default 100).</param>
internal SiteStreamGrpcServer(
    ISiteStreamSubscriber streamSubscriber,
    ILogger<SiteStreamGrpcServer> logger,
    int maxConcurrentStreams = 100)
    : this(
        streamSubscriber: streamSubscriber,
        logger: logger,
        maxConcurrentStreams: maxConcurrentStreams,
        maxStreamLifetime: TimeSpan.FromHours(4))
{
}
|
|
|
|
/// <summary>
/// DI constructor. Reads <see cref="CommunicationOptions.GrpcMaxConcurrentStreams"/>
/// and <see cref="CommunicationOptions.GrpcMaxStreamLifetime"/> from the bound
/// options so the documented concurrency limit and the 4-hour zombie-stream
/// session timeout are honoured rather than hard-coded.
/// </summary>
/// <param name="streamSubscriber">The stream subscriber for managing subscriptions.</param>
/// <param name="logger">The logger instance.</param>
/// <param name="options">Communication options containing stream limits and timeouts.</param>
public SiteStreamGrpcServer(
    ISiteStreamSubscriber streamSubscriber,
    ILogger<SiteStreamGrpcServer> logger,
    IOptions<CommunicationOptions> options)
    : this(
        streamSubscriber: streamSubscriber,
        logger: logger,
        maxConcurrentStreams: options.Value.GrpcMaxConcurrentStreams,
        maxStreamLifetime: options.Value.GrpcMaxStreamLifetime)
{
}
|
|
|
|
/// <summary>
/// Shared constructor that both the test-only and the DI constructor delegate
/// to; it simply stores its arguments in the corresponding fields.
/// </summary>
/// <param name="streamSubscriber">The stream subscriber for managing subscriptions.</param>
/// <param name="logger">The logger instance.</param>
/// <param name="maxConcurrentStreams">The maximum number of concurrently registered streams.</param>
/// <param name="maxStreamLifetime">The per-stream session lifetime.</param>
private SiteStreamGrpcServer(
    ISiteStreamSubscriber streamSubscriber,
    ILogger<SiteStreamGrpcServer> logger,
    int maxConcurrentStreams,
    TimeSpan maxStreamLifetime)
    => (_streamSubscriber, _logger, _maxConcurrentStreams, _maxStreamLifetime)
        = (streamSubscriber, logger, maxConcurrentStreams, maxStreamLifetime);
|
|
|
|
/// <summary>
/// Marks the server as ready to accept subscriptions and injects the ActorSystem.
/// Called after the site runtime actor system is fully initialized.
/// The ActorSystem is set here rather than via the constructor so that
/// the gRPC server can be created by DI before the actor system exists.
/// </summary>
/// <param name="actorSystem">The initialized Akka actor system.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="actorSystem"/> is <c>null</c>. Without this guard the server
/// would report itself ready and then fail inside <see cref="SubscribeInstance"/>
/// when it dereferences the missing actor system.
/// </exception>
public void SetReady(ActorSystem actorSystem)
{
    ArgumentNullException.ThrowIfNull(actorSystem);

    // Store the actor system BEFORE raising the volatile _ready flag, so a
    // request that observes _ready == true also observes the actor system.
    _actorSystem = actorSystem;
    _ready = true;
}
|
|
|
|
/// <summary>
/// Gives the gRPC server the central-side <c>AuditLogIngestActor</c> proxy
/// that the <see cref="IngestAuditEvents"/> RPC forwards incoming site
/// batches to. Audit Log (#23) M2 wiring point — mirrors the way
/// <c>CommunicationService.SetNotificationOutbox</c> takes the Notification
/// Outbox singleton proxy. Bundle E supplies the actor after the cluster
/// singleton starts.
/// </summary>
/// <param name="proxy">The audit log ingest actor proxy.</param>
public void SetAuditIngestActor(IActorRef proxy) => _auditIngestActor = proxy;
|
|
|
|
/// <summary>
/// Gives the gRPC server the site-local <see cref="ISiteAuditQueue"/> (the
/// same <c>SqliteAuditWriter</c> singleton that backs <see cref="IAuditWriter"/>
/// on the script thread) so the M6 <see cref="PullAuditEvents"/> RPC can
/// serve central's reconciliation pulls. Mirrors
/// <see cref="SetAuditIngestActor"/>: wired post-construction because the
/// queue and the gRPC server are both DI singletons brought up in
/// independent orders on site startup.
/// </summary>
/// <param name="queue">The site audit queue for serving reconciliation pulls.</param>
public void SetSiteAuditQueue(ISiteAuditQueue queue) => _siteAuditQueue = queue;
|
|
|
|
/// <summary>
/// Gives the gRPC server the site-local <see cref="IOperationTrackingStore"/>
/// (the same <c>OperationTrackingStore</c> singleton that backs
/// <c>Tracking.Status(id)</c> on the script thread) so the Site Call Audit
/// (#22) <see cref="PullSiteCalls"/> RPC can serve central's reconciliation
/// pulls. Mirrors <see cref="SetSiteAuditQueue"/>: wired post-construction
/// because the store and the gRPC server are both DI singletons brought up
/// in independent orders on site startup.
/// </summary>
/// <param name="store">The site operation-tracking store for serving reconciliation pulls.</param>
public void SetOperationTrackingStore(IOperationTrackingStore store) => _operationTrackingStore = store;
|
|
|
|
/// <summary>
/// Host-017 / REQ-HOST-7: starts the gRPC server's part of the site shutdown
/// sequence. New <see cref="SubscribeInstance"/> calls are refused with
/// <see cref="StatusCode.Unavailable"/>, and every active stream is cancelled
/// so its <c>await foreach</c> observes
/// <see cref="OperationCanceledException"/> and the response stream
/// completes with <c>Cancelled</c> on the client. Idempotent — safe to call
/// more than once. Invoked from the site host's
/// <c>IHostApplicationLifetime.ApplicationStopping</c> callback BEFORE
/// Akka's <c>CoordinatedShutdown</c> runs, so in-flight clients get a clean
/// cancellation they can reconnect on rather than a silent stream that only
/// times out via gRPC keepalive.
/// </summary>
public void CancelAllStreams()
{
    // Raise the flag before touching the streams so no new subscription is
    // admitted while the existing ones are being cancelled.
    _shuttingDown = true;

    foreach (StreamEntry stream in _activeStreams.Values)
    {
        CancelQuietly(stream.Cts);
    }

    static void CancelQuietly(CancellationTokenSource cts)
    {
        try
        {
            cts.Cancel();
        }
        catch (ObjectDisposedException)
        {
            // Already cleaned up by its own finally — nothing to do.
        }
    }
}
|
|
|
|
/// <summary>
/// Host-017: current value of the shutdown flag, exposed for test assertions.
/// </summary>
internal bool IsShuttingDown { get => _shuttingDown; }

/// <summary>
/// Number of streaming subscriptions currently registered. Exposed for diagnostics.
/// </summary>
public int ActiveStreamCount { get => _activeStreams.Count; }

/// <summary>Effective max concurrent stream limit. Exposed for tests.</summary>
internal int MaxConcurrentStreams { get => _maxConcurrentStreams; }

/// <summary>Effective per-stream session lifetime. Exposed for tests.</summary>
internal TimeSpan MaxStreamLifetime { get => _maxStreamLifetime; }
|
|
|
|
/// <inheritdoc />
/// <remarks>
/// Flow: validate readiness, shutdown state and the correlation id; cancel
/// any existing stream registered under the same correlation id; enforce the
/// concurrent-stream limit; register this stream; create a relay actor and
/// subscribe it; then pump events from the bounded channel to
/// <paramref name="responseStream"/> until the stream is cancelled. The
/// <c>finally</c> block releases everything this call created.
/// </remarks>
/// <exception cref="RpcException">
/// <see cref="StatusCode.Unavailable"/> when the server is not ready or is
/// shutting down; <see cref="StatusCode.InvalidArgument"/> when
/// <c>correlation_id</c> is missing or not a valid actor path element;
/// <see cref="StatusCode.ResourceExhausted"/> when the concurrent-stream
/// limit has been reached.
/// </exception>
public override async Task SubscribeInstance(
    InstanceStreamRequest request,
    IServerStreamWriter<SiteStreamEvent> responseStream,
    ServerCallContext context)
{
    if (!_ready)
    {
        throw new RpcException(new GrpcStatus(StatusCode.Unavailable, "Server not ready"));
    }

    // Host-017 / REQ-HOST-7: refuse new subscriptions during shutdown so
    // CoordinatedShutdown can quiesce without racing fresh streams.
    if (_shuttingDown)
    {
        throw new RpcException(new GrpcStatus(StatusCode.Unavailable, "Server shutting down"));
    }

    // Communication-014: correlation_id arrives off the wire on a public gRPC
    // endpoint and is used (below) to compose an Akka actor name. Akka actor names
    // have a restricted character set — an id containing '/', whitespace, or other
    // disallowed characters would make ActorOf throw InvalidActorNameException,
    // escaping as an unhandled RPC fault. Reject unsafe ids cleanly up front.
    if (string.IsNullOrEmpty(request.CorrelationId) ||
        !ActorPath.IsValidPathElement(request.CorrelationId))
    {
        throw new RpcException(new GrpcStatus(
            StatusCode.InvalidArgument, "correlation_id is missing or not a valid identifier"));
    }

    // SetReady stores the actor system before raising _ready, so this is
    // normally non-null here; answer Unavailable instead of dereferencing
    // null if it is not.
    var actorSystem = _actorSystem
        ?? throw new RpcException(new GrpcStatus(StatusCode.Unavailable, "Server not ready"));

    // Duplicate prevention -- cancel existing stream for this correlationId.
    // Cancel only, never Dispose: the CancellationTokenSource belongs to the
    // SubscribeInstance call that created it (its `using` disposes it on exit).
    // Disposing it here while that call is still running would make any later
    // access to its Token throw ObjectDisposedException.
    if (_activeStreams.TryRemove(request.CorrelationId, out var existingEntry))
    {
        try
        {
            existingEntry.Cts.Cancel();
        }
        catch (ObjectDisposedException)
        {
            // The previous stream finished and disposed its own source between
            // our TryRemove and Cancel — it is already gone, nothing to cancel.
        }
    }

    // Check max concurrent streams after duplicate removal
    if (_activeStreams.Count >= _maxConcurrentStreams)
    {
        throw new RpcException(new GrpcStatus(StatusCode.ResourceExhausted, "Max concurrent streams reached"));
    }

    using var streamCts = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken);

    // Session timeout (design doc "gRPC Connection Keepalive": 4-hour third layer
    // of dead-client detection) — forces a long-lived zombie stream to terminate
    // even if keepalive PINGs never detect the loss.
    if (_maxStreamLifetime > TimeSpan.Zero && _maxStreamLifetime != Timeout.InfiniteTimeSpan)
    {
        streamCts.CancelAfter(_maxStreamLifetime);
    }

    // Read the token once. The pump loop below must not touch streamCts.Token
    // again: the getter throws ObjectDisposedException on a disposed source,
    // and that exception would bypass the OperationCanceledException handler.
    var streamToken = streamCts.Token;

    var entry = new StreamEntry(streamCts);
    _activeStreams[request.CorrelationId] = entry;

    // Re-check the shutdown flag now that the entry is registered. The first
    // check ran before registration, so CancelAllStreams may have taken its
    // snapshot in between and missed this stream; without this re-check it
    // would keep running through shutdown.
    if (_shuttingDown)
    {
        _activeStreams.TryRemove(
            new KeyValuePair<string, StreamEntry>(request.CorrelationId, entry));
        throw new RpcException(new GrpcStatus(StatusCode.Unavailable, "Server shutting down"));
    }

    var channel = Channel.CreateBounded<SiteStreamEvent>(
        new BoundedChannelOptions(1000) { FullMode = BoundedChannelFullMode.DropOldest });

    // Creating the relay actor can itself fail (presumably e.g. when the actor
    // system is already terminating — TODO confirm the exact failure modes).
    // Undo the registration and complete the channel before the fault escapes,
    // otherwise the _activeStreams slot stays occupied.
    IActorRef relayActor;
    try
    {
        var actorSeq = Interlocked.Increment(ref _actorCounter);
        relayActor = actorSystem.ActorOf(
            Props.Create(typeof(Actors.StreamRelayActor), request.CorrelationId, channel.Writer),
            $"stream-relay-{request.CorrelationId}-{actorSeq}");
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex,
            "Relay actor creation failed for {Instance} (correlation {CorrelationId}); releasing stream slot.",
            request.InstanceUniqueName, request.CorrelationId);
        channel.Writer.TryComplete();
        _activeStreams.TryRemove(
            new KeyValuePair<string, StreamEntry>(request.CorrelationId, entry));
        throw;
    }

    // Communication-021: the previous code called _streamSubscriber.Subscribe
    // OUTSIDE the try block that owns relay-actor cleanup. If Subscribe threw
    // (stale instance name, index lookup fault, site runtime shutting down),
    // the freshly-created relay actor, the _activeStreams entry, the
    // StreamEntry.Cts, and the Channel<SiteStreamEvent> all leaked because the
    // finally never ran. Wrap Subscribe in its own try so any throw deterministically
    // stops the relay actor, removes the activeStreams entry, and completes the
    // channel before the exception escapes to the caller.
    string subscriptionId;
    try
    {
        subscriptionId = _streamSubscriber.Subscribe(request.InstanceUniqueName, relayActor);
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex,
            "Subscribe failed for {Instance} (correlation {CorrelationId}); cleaning up relay actor.",
            request.InstanceUniqueName, request.CorrelationId);
        actorSystem.Stop(relayActor);
        channel.Writer.TryComplete();
        _activeStreams.TryRemove(
            new KeyValuePair<string, StreamEntry>(request.CorrelationId, entry));
        throw;
    }

    _logger.LogInformation(
        "Stream {CorrelationId} started for {Instance} (subscription {SubscriptionId})",
        request.CorrelationId, request.InstanceUniqueName, subscriptionId);

    // Telemetry follow-on: the connection is now fully established (Subscribe
    // succeeded, so no leak via the catch above). Count it up here and balance
    // it in the finally below so the scadabridge.site.connection.up gauge is
    // decremented on EVERY exit path — normal completion, client-cancel /
    // duplicate-replacement (OperationCanceledException), server shutdown
    // (CancelAllStreams -> Cts.Cancel), and any other exception — guaranteeing
    // exactly one Closed per Opened and a gauge that never drifts up.
    ScadaBridgeTelemetry.SiteConnectionOpened();
    try
    {
        await foreach (var evt in channel.Reader.ReadAllAsync(streamToken))
        {
            await responseStream.WriteAsync(evt, streamToken);
        }
    }
    catch (OperationCanceledException)
    {
        // Normal cancellation (client disconnect, duplicate replacement,
        // session timeout, or server shutdown).
    }
    finally
    {
        ScadaBridgeTelemetry.SiteConnectionClosed();
        _streamSubscriber.RemoveSubscriber(relayActor);
        actorSystem.Stop(relayActor);
        channel.Writer.TryComplete();

        // Only remove our own entry -- a replacement stream may have already taken the slot
        _activeStreams.TryRemove(
            new KeyValuePair<string, StreamEntry>(request.CorrelationId, entry));

        _logger.LogInformation(
            "Stream {CorrelationId} for {Instance} ended",
            request.CorrelationId, request.InstanceUniqueName);
    }
}
|
|
|
|
/// <inheritdoc />
/// <remarks>
/// Maps the incoming DTOs to <see cref="AuditEvent"/> entities, asks the
/// audit ingest actor to persist them, and returns the ids the actor reports
/// as accepted. An empty <see cref="IngestAck"/> is returned when the batch
/// is empty, when no ingest actor has been wired yet, or when the Ask fails.
/// </remarks>
public override async Task<IngestAck> IngestAuditEvents(
    AuditEventBatch request,
    ServerCallContext context)
{
    var eventCount = request.Events.Count;

    // Empty batch is a no-op; reply immediately so the client moves on.
    if (eventCount == 0)
    {
        return new IngestAck();
    }

    if (_auditIngestActor is not { } ingestActor)
    {
        // Wiring incomplete (host startup race). Sites treat an empty
        // ack as "nothing was acked, leave rows Pending, retry next
        // drain" — exactly the right behaviour during host bring-up.
        _logger.LogWarning(
            "IngestAuditEvents received {Count} events before SetAuditIngestActor was called; returning empty ack.",
            eventCount);
        return new IngestAck();
    }

    var auditEvents = new List<AuditEvent>(eventCount);
    foreach (var eventDto in request.Events)
    {
        auditEvents.Add(AuditEventDtoMapper.FromDto(eventDto));
    }

    var command = new IngestAuditEventsCommand(auditEvents);

    IngestAuditEventsReply ingestReply;
    try
    {
        ingestReply = await ingestActor.Ask<IngestAuditEventsReply>(
            command, AuditIngestAskTimeout, context.CancellationToken);
    }
    catch (Exception ex)
    {
        // Audit ingest is best-effort; failing this RPC at the gRPC layer
        // would surface as a transport error and force the site to retry
        // (which it would do anyway). Logging + an empty ack keeps the
        // semantics consistent with the "wiring incomplete" path above.
        _logger.LogError(ex,
            "AuditLogIngestActor Ask failed for batch of {Count} events; returning empty ack.",
            eventCount);
        return new IngestAck();
    }

    // Echo back, as strings, the ids the actor reports as accepted.
    var result = new IngestAck();
    foreach (var acceptedId in ingestReply.AcceptedEventIds)
    {
        result.AcceptedEventIds.Add(acceptedId.ToString());
    }

    return result;
}
|
|
|
|
/// <inheritdoc />
/// <remarks>
/// Combined-telemetry counterpart of <see cref="IngestAuditEvents"/>: each
/// packet carries an audit event and a site-call record, both of which are
/// mapped and sent to the audit ingest actor as one command. An empty
/// <see cref="IngestAck"/> is returned when the batch is empty, when no
/// ingest actor has been wired yet, or when the Ask fails.
/// </remarks>
public override async Task<IngestAck> IngestCachedTelemetry(
    CachedTelemetryBatch request,
    ServerCallContext context)
{
    var packetCount = request.Packets.Count;

    // Nothing to ingest — acknowledge straight away.
    if (packetCount == 0)
    {
        return new IngestAck();
    }

    if (_auditIngestActor is not { } ingestActor)
    {
        // No ingest actor wired yet: answer with an empty ack.
        _logger.LogWarning(
            "IngestCachedTelemetry received {Count} packets before SetAuditIngestActor was called; returning empty ack.",
            packetCount);
        return new IngestAck();
    }

    var telemetryEntries = new List<CachedTelemetryEntry>(packetCount);
    foreach (var telemetryPacket in request.Packets)
    {
        var mappedEvent = AuditEventDtoMapper.FromDto(telemetryPacket.AuditEvent);
        var mappedCall = SiteCallDtoMapper.FromDto(telemetryPacket.Operational);
        telemetryEntries.Add(new CachedTelemetryEntry(mappedEvent, mappedCall));
    }

    var command = new IngestCachedTelemetryCommand(telemetryEntries);

    IngestCachedTelemetryReply ingestReply;
    try
    {
        ingestReply = await ingestActor.Ask<IngestCachedTelemetryReply>(
            command, AuditIngestAskTimeout, context.CancellationToken);
    }
    catch (Exception ex)
    {
        // Any Ask failure is logged and answered with an empty ack rather
        // than failing the RPC.
        _logger.LogError(ex,
            "AuditLogIngestActor Ask failed for combined telemetry batch of {Count} packets; returning empty ack.",
            packetCount);
        return new IngestAck();
    }

    // Echo back, as strings, the ids the actor reports as accepted.
    var result = new IngestAck();
    foreach (var acceptedId in ingestReply.AcceptedEventIds)
    {
        result.AcceptedEventIds.Add(acceptedId.ToString());
    }

    return result;
}
|
|
|
|
/// <inheritdoc />
/// <remarks>
/// Reads up to <c>batch_size</c> audit rows at or after the requested cursor
/// from the site audit queue, returns them, and then marks the returned rows
/// as reconciled on a best-effort basis. An empty response is returned when
/// no queue has been wired yet or when the read fails.
/// </remarks>
/// <exception cref="RpcException">
/// <see cref="StatusCode.InvalidArgument"/> when <c>batch_size</c> is not
/// positive or <c>since_utc</c> cannot be converted to a
/// <see cref="DateTime"/>.
/// </exception>
public override async Task<PullAuditEventsResponse> PullAuditEvents(
    PullAuditEventsRequest request,
    ServerCallContext context)
{
    var queue = _siteAuditQueue;
    if (queue is null)
    {
        _logger.LogWarning(
            "PullAuditEvents invoked before SetSiteAuditQueue was called; returning empty response.");
        return new PullAuditEventsResponse();
    }

    if (request.BatchSize <= 0)
    {
        // Mirrors the SubscribeInstance guard: reject malformed requests
        // cleanly with InvalidArgument so the caller doesn't see a generic
        // RpcException from the underlying SQLite parameter validation.
        throw new RpcException(new GrpcStatus(
            StatusCode.InvalidArgument, "batch_size must be > 0"));
    }

    // sinceUtc defaults to DateTime.MinValue when the wrapper is absent —
    // i.e. "pull from the beginning of recorded history", which is the
    // intended behaviour for the very first reconciliation cycle.
    //
    // The timestamp comes straight off the wire and protobuf parsing does not
    // range-check it; Timestamp.ToDateTime() throws InvalidOperationException
    // for a non-normalized or out-of-range value. Surface that as
    // InvalidArgument, like the batch_size guard above, instead of letting it
    // escape as an unhandled RPC fault.
    DateTime since;
    try
    {
        since = request.SinceUtc is not null
            ? DateTime.SpecifyKind(request.SinceUtc.ToDateTime(), DateTimeKind.Utc)
            : DateTime.MinValue;
    }
    catch (InvalidOperationException)
    {
        throw new RpcException(new GrpcStatus(
            StatusCode.InvalidArgument, "since_utc is not a valid timestamp"));
    }

    IReadOnlyList<AuditEvent> events;
    try
    {
        events = await queue.ReadPendingSinceAsync(
            since, request.BatchSize, context.CancellationToken);
    }
    catch (Exception ex)
    {
        // Best-effort: a read fault yields an empty response rather than a
        // failed RPC.
        _logger.LogError(ex,
            "ReadPendingSinceAsync failed for since={Since} batch={Batch}; returning empty response.",
            since, request.BatchSize);
        return new PullAuditEventsResponse();
    }

    var response = new PullAuditEventsResponse
    {
        // batch_size saturated → tell central to issue a follow-up pull
        // with an advanced cursor. The site doesn't compute the cursor —
        // central walks it forward from the last returned OccurredAtUtc.
        MoreAvailable = events.Count >= request.BatchSize,
    };

    // Project the rows and collect their ids in a single pass.
    var ids = new List<Guid>(events.Count);
    foreach (var evt in events)
    {
        response.Events.Add(AuditEventDtoMapper.ToDto(evt));
        ids.Add(evt.EventId);
    }

    // Flip to Reconciled AFTER projecting the response so a fault during
    // projection leaves the rows in Pending/Forwarded and central pulls them
    // again next cycle. The flip itself is best-effort — its failure is a
    // warning, not a fault, because central will dedup on EventId on the
    // next pull.
    if (ids.Count > 0)
    {
        try
        {
            await queue.MarkReconciledAsync(ids, context.CancellationToken);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex,
                "MarkReconciledAsync failed after PullAuditEvents response of {Count} rows; rows stay Pending for retry.",
                ids.Count);
        }
    }

    return response;
}
|
|
|
|
/// <inheritdoc />
/// <remarks>
/// Reads up to <c>batch_size</c> operation-tracking rows changed at or after
/// the requested cursor and returns them. An empty response is returned when
/// no store has been wired yet or when the read fails.
/// </remarks>
/// <exception cref="RpcException">
/// <see cref="StatusCode.InvalidArgument"/> when <c>batch_size</c> is not
/// positive or <c>since_utc</c> cannot be converted to a
/// <see cref="DateTime"/>.
/// </exception>
public override async Task<PullSiteCallsResponse> PullSiteCalls(
    PullSiteCallsRequest request,
    ServerCallContext context)
{
    var store = _operationTrackingStore;
    if (store is null)
    {
        _logger.LogWarning(
            "PullSiteCalls invoked before SetOperationTrackingStore was called; returning empty response.");
        return new PullSiteCallsResponse();
    }

    if (request.BatchSize <= 0)
    {
        // Mirrors PullAuditEvents: reject malformed requests cleanly with
        // InvalidArgument so the caller doesn't see a generic RpcException
        // from the underlying SQLite parameter validation.
        throw new RpcException(new GrpcStatus(
            StatusCode.InvalidArgument, "batch_size must be > 0"));
    }

    // since_utc defaults to DateTime.MinValue when the wrapper is absent —
    // i.e. "pull from the beginning of recorded history", the intended
    // behaviour for the very first reconciliation cycle.
    //
    // The timestamp comes straight off the wire and protobuf parsing does not
    // range-check it; Timestamp.ToDateTime() throws InvalidOperationException
    // for a non-normalized or out-of-range value. Surface that as
    // InvalidArgument, like the batch_size guard above, instead of letting it
    // escape as an unhandled RPC fault.
    DateTime since;
    try
    {
        since = request.SinceUtc is not null
            ? DateTime.SpecifyKind(request.SinceUtc.ToDateTime(), DateTimeKind.Utc)
            : DateTime.MinValue;
    }
    catch (InvalidOperationException)
    {
        throw new RpcException(new GrpcStatus(
            StatusCode.InvalidArgument, "since_utc is not a valid timestamp"));
    }

    IReadOnlyList<SiteCallOperational> operationals;
    try
    {
        operationals = await store.ReadChangedSinceAsync(
            since, request.BatchSize, context.CancellationToken);
    }
    catch (Exception ex)
    {
        // Best-effort, like PullAuditEvents: a read fault must never abort
        // the reconciliation tick — central retries on its next cycle.
        _logger.LogError(ex,
            "ReadChangedSinceAsync failed for since={Since} batch={Batch}; returning empty response.",
            since, request.BatchSize);
        return new PullSiteCallsResponse();
    }

    var response = new PullSiteCallsResponse
    {
        // batch_size saturated → tell central to issue a follow-up pull with
        // an advanced cursor. The site doesn't compute the cursor — central
        // walks it forward from the last returned UpdatedAtUtc. Unlike
        // PullAuditEvents there is no MarkReconciled step: the tracking store
        // is the operational source of truth and the central SiteCalls mirror
        // is upsert-on-newer, so re-reading rows is a harmless no-op.
        MoreAvailable = operationals.Count >= request.BatchSize,
    };
    foreach (var op in operationals)
    {
        response.Operationals.Add(SiteCallDtoMapper.ToDto(op));
    }

    return response;
}
|
|
|
|
/// <summary>
/// Registry value for one active stream. Each <see cref="SubscribeInstance"/>
/// call creates its own entry and later removes it by key AND value, so a
/// stream that has been replaced under the same correlation id cannot evict
/// its replacement's entry.
/// </summary>
/// <param name="Cts">The cancellation source that stops the stream when cancelled.</param>
private sealed record class StreamEntry(CancellationTokenSource Cts);
|
|
}
|