using System.Collections.Concurrent;
using System.Threading.Channels;
using Akka.Actor;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Services;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Observability;
using GrpcStatus = Grpc.Core.Status;

namespace ZB.MOM.WW.ScadaBridge.Communication.Grpc;

/// <summary>
/// gRPC service that accepts instance stream subscriptions from central nodes.
/// Creates a <c>StreamRelayActor</c> per subscription to bridge Akka domain events
/// through a <see cref="Channel{T}"/> to the gRPC response stream.
/// </summary>
public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
{
    /// <summary>
    /// Capacity of the per-stream buffer between the relay actor and the gRPC
    /// response writer. When the buffer is full the oldest buffered event is
    /// dropped (<see cref="BoundedChannelFullMode.DropOldest"/>).
    /// </summary>
    private const int StreamChannelCapacity = 1000;

    /// <summary>
    /// Session lifetime applied by the test-only constructor (the documented
    /// 4-hour zombie-stream session timeout).
    /// </summary>
    private static readonly TimeSpan DefaultMaxStreamLifetime = TimeSpan.FromHours(4);

    // Per Bundle D's brief — Ask timeout is 30 s. The ingest actor's repo
    // calls are sub-100 ms in steady state; a generous timeout absorbs a slow
    // MSSQL connection without surfacing as a gRPC failure on a healthy site.
    private static readonly TimeSpan AuditIngestAskTimeout = TimeSpan.FromSeconds(30);

    private readonly ISiteStreamSubscriber _streamSubscriber;
    private readonly ILogger _logger;
    private readonly ConcurrentDictionary<string, StreamEntry> _activeStreams = new();
    private readonly int _maxConcurrentStreams;
    private readonly TimeSpan _maxStreamLifetime;

    // Injected by SetReady once the site actor system exists; read only after
    // the volatile _ready flag has been observed as true.
    private ActorSystem? _actorSystem;
    private volatile bool _ready;

    // Host-017 / REQ-HOST-7: flipped by CancelAllStreams() when the host enters
    // CoordinatedShutdown so SubscribeInstance refuses new streams with
    // Unavailable before the actor system tears down. Strictly monotonic — once
    // true, never reset (the server is single-lifetime per host).
    private volatile bool _shuttingDown;

    // Monotonic suffix that keeps relay-actor names unique even when the same
    // correlation id is re-subscribed before the previous actor has stopped.
    private long _actorCounter;

    // Audit Log (#23 M2): central-side ingest actor proxy. Set by the host
    // after the cluster singleton starts (see Bundle E wiring). When null the
    // IngestAuditEvents RPC replies with an empty IngestAck so sites can
    // safely retry — wiring-incomplete is treated as transient, never fatal.
    private IActorRef? _auditIngestActor;

    // Audit Log (#23 M6): site-local queue handed in by AkkaHostedService on
    // site roles so the central reconciliation puller's PullAuditEvents RPC
    // can read Pending/Forwarded rows. Null when not wired (e.g. central-only
    // host or test composing the server in isolation) — the handler treats
    // the missing queue as "nothing to ship" and returns an empty response so
    // central retries on its next reconciliation cycle.
    private ISiteAuditQueue? _siteAuditQueue;

    /// <summary>
    /// Test-only constructor — kept internal so the DI container sees a
    /// single public constructor and is not faced with an ambiguous choice.
    /// Applies the default 4-hour per-stream session lifetime.
    /// </summary>
    /// <param name="streamSubscriber">The stream subscriber for managing subscriptions.</param>
    /// <param name="logger">The logger instance (typed or untyped).</param>
    /// <param name="maxConcurrentStreams">The maximum concurrent streams (default 100).</param>
    internal SiteStreamGrpcServer(
        ISiteStreamSubscriber streamSubscriber,
        ILogger logger,
        int maxConcurrentStreams = 100)
        : this(streamSubscriber, logger, maxConcurrentStreams, DefaultMaxStreamLifetime)
    {
    }

    /// <summary>
    /// DI constructor — binds the configured <c>GrpcMaxConcurrentStreams</c> and
    /// <c>GrpcMaxStreamLifetime</c> options so the documented concurrency limit
    /// and the 4-hour zombie-stream session timeout are honoured rather than
    /// hard-coded.
    /// </summary>
    /// <param name="streamSubscriber">The stream subscriber for managing subscriptions.</param>
    /// <param name="logger">The logger instance.</param>
    /// <param name="options">Communication options containing stream limits and timeouts.</param>
    public SiteStreamGrpcServer(
        ISiteStreamSubscriber streamSubscriber,
        ILogger<SiteStreamGrpcServer> logger,
        IOptions<CommunicationOptions> options)
        : this(streamSubscriber, logger, options.Value.GrpcMaxConcurrentStreams, options.Value.GrpcMaxStreamLifetime)
    {
    }

    private SiteStreamGrpcServer(
        ISiteStreamSubscriber streamSubscriber,
        ILogger logger,
        int maxConcurrentStreams,
        TimeSpan maxStreamLifetime)
    {
        _streamSubscriber = streamSubscriber;
        _logger = logger;
        _maxConcurrentStreams = maxConcurrentStreams;
        _maxStreamLifetime = maxStreamLifetime;
    }

    /// <summary>
    /// Marks the server as ready to accept subscriptions and injects the ActorSystem.
    /// Called after the site runtime actor system is fully initialized.
    /// </summary>
    /// <remarks>
    /// The ActorSystem is set here rather than via the constructor so that
    /// the gRPC server can be created by DI before the actor system exists.
    /// The actor system is stored before the volatile ready flag is set, so any
    /// caller that observes <c>_ready == true</c> also observes the actor system.
    /// </remarks>
    /// <param name="actorSystem">The initialized Akka actor system.</param>
    /// <exception cref="ArgumentNullException"><paramref name="actorSystem"/> is null.</exception>
    public void SetReady(ActorSystem actorSystem)
    {
        ArgumentNullException.ThrowIfNull(actorSystem);
        _actorSystem = actorSystem;
        _ready = true;
    }

    /// <summary>
    /// Hands the central-side <c>AuditLogIngestActor</c> proxy to the gRPC
    /// server so the <see cref="IngestAuditEvents"/> RPC can route incoming
    /// site batches. Audit Log (#23) M2 wiring point — mirrors the way
    /// <c>CommunicationService.SetNotificationOutbox</c> takes the Notification
    /// Outbox singleton proxy. Bundle E supplies the actor after the cluster
    /// singleton starts.
    /// </summary>
    /// <param name="proxy">The audit log ingest actor proxy.</param>
    /// <exception cref="ArgumentNullException"><paramref name="proxy"/> is null.</exception>
    public void SetAuditIngestActor(IActorRef proxy)
    {
        ArgumentNullException.ThrowIfNull(proxy);
        _auditIngestActor = proxy;
    }

    /// <summary>
    /// Hands the site-local <see cref="ISiteAuditQueue"/> (the same
    /// <c>SqliteAuditWriter</c> singleton that backs audit writes on the script
    /// thread) to the gRPC server so the M6 <see cref="PullAuditEvents"/> RPC
    /// can serve central's reconciliation pulls. Mirrors
    /// <see cref="SetAuditIngestActor"/>: wired post-construction because the
    /// queue and the gRPC server are both DI singletons brought up in
    /// independent orders on site startup.
    /// </summary>
    /// <param name="queue">The site audit queue for serving reconciliation pulls.</param>
    /// <exception cref="ArgumentNullException"><paramref name="queue"/> is null.</exception>
    public void SetSiteAuditQueue(ISiteAuditQueue queue)
    {
        ArgumentNullException.ThrowIfNull(queue);
        _siteAuditQueue = queue;
    }

    /// <summary>
    /// Host-017 / REQ-HOST-7: signals the gRPC server to begin its part of the
    /// site shutdown sequence — refuse new <see cref="SubscribeInstance"/>
    /// streams with <see cref="StatusCode.Unavailable"/> and cancel every
    /// active stream so its <c>await foreach</c> observes
    /// <see cref="OperationCanceledException"/> and the response stream
    /// completes with <c>Cancelled</c> on the client. Idempotent — safe to call
    /// more than once. Invoked from the site host's
    /// <c>IHostApplicationLifetime.ApplicationStopping</c> callback BEFORE
    /// Akka's <c>CoordinatedShutdown</c> runs, so in-flight clients get a
    /// clean cancellation they can reconnect on rather than a silent stream
    /// that only times out via gRPC keepalive.
    /// </summary>
    public void CancelAllStreams()
    {
        _shuttingDown = true;

        foreach (var entry in _activeStreams.Values)
        {
            CancelQuietly(entry);
        }
    }

    /// <summary>
    /// Host-017: exposed for test assertions on the shutdown state.
    /// </summary>
    internal bool IsShuttingDown => _shuttingDown;

    /// <summary>
    /// Number of currently active streaming subscriptions. Exposed for diagnostics.
    /// </summary>
    public int ActiveStreamCount => _activeStreams.Count;

    /// <summary>Effective max concurrent stream limit. Exposed for tests.</summary>
    internal int MaxConcurrentStreams => _maxConcurrentStreams;

    /// <summary>Effective per-stream session lifetime. Exposed for tests.</summary>
    internal TimeSpan MaxStreamLifetime => _maxStreamLifetime;

    /// <inheritdoc />
    public override async Task SubscribeInstance(
        InstanceStreamRequest request,
        IServerStreamWriter<SiteStreamEvent> responseStream,
        ServerCallContext context)
    {
        if (!_ready)
            throw new RpcException(new GrpcStatus(StatusCode.Unavailable, "Server not ready"));

        // Host-017 / REQ-HOST-7: refuse new subscriptions during shutdown so
        // CoordinatedShutdown can quiesce without racing fresh streams.
        if (_shuttingDown)
            throw new RpcException(new GrpcStatus(StatusCode.Unavailable, "Server shutting down"));

        // Communication-014: correlation_id arrives off the wire on a public gRPC
        // endpoint and is used (below) to compose an Akka actor name. Akka actor names
        // have a restricted character set — an id containing '/', whitespace, or other
        // disallowed characters would make ActorOf throw InvalidActorNameException,
        // escaping as an unhandled RPC fault. Reject unsafe ids cleanly up front.
        if (string.IsNullOrEmpty(request.CorrelationId) || !ActorPath.IsValidPathElement(request.CorrelationId))
        {
            throw new RpcException(new GrpcStatus(
                StatusCode.InvalidArgument,
                "correlation_id is missing or not a valid identifier"));
        }

        // Duplicate prevention: cancel any existing stream for this correlation id.
        // Its token source is deliberately NOT disposed here — it is owned (and
        // disposed via `using`) by the SubscribeInstance call that created it.
        // Disposing it from this thread could make that call's next
        // `streamCts.Token` read throw ObjectDisposedException instead of
        // observing a clean cancellation.
        if (_activeStreams.TryRemove(request.CorrelationId, out var existingEntry))
            CancelQuietly(existingEntry);

        // Check max concurrent streams after duplicate removal.
        if (_activeStreams.Count >= _maxConcurrentStreams)
            throw new RpcException(new GrpcStatus(StatusCode.ResourceExhausted, "Max concurrent streams reached"));

        using var streamCts = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken);

        // Session timeout (design doc "gRPC Connection Keepalive": 4-hour third layer
        // of dead-client detection) — forces a long-lived zombie stream to terminate
        // even if keepalive PINGs never detect the loss.
        if (_maxStreamLifetime > TimeSpan.Zero && _maxStreamLifetime != Timeout.InfiniteTimeSpan)
            streamCts.CancelAfter(_maxStreamLifetime);

        var entry = new StreamEntry(streamCts);
        var ownEntry = KeyValuePair.Create(request.CorrelationId, entry);
        _activeStreams[request.CorrelationId] = entry;

        // Host-017: CancelAllStreams may have swept the registry between the
        // shutdown check above and the registration just made. Re-check so such
        // a stream is refused instead of being left running through shutdown.
        if (_shuttingDown)
        {
            _activeStreams.TryRemove(ownEntry);
            throw new RpcException(new GrpcStatus(StatusCode.Unavailable, "Server shutting down"));
        }

        var channel = Channel.CreateBounded<SiteStreamEvent>(
            new BoundedChannelOptions(StreamChannelCapacity) { FullMode = BoundedChannelFullMode.DropOldest });

        // Completes the relay channel and removes THIS call's registry entry only —
        // a replacement stream for the same correlation id may already own the slot.
        void ReleaseStreamSlot()
        {
            channel.Writer.TryComplete();
            _activeStreams.TryRemove(ownEntry);
        }

        // ActorOf can throw (e.g. while the actor system is terminating). Without
        // this guard the registry entry registered above would leak, holding a
        // disposed token source and permanently consuming a concurrency slot.
        var actorSeq = Interlocked.Increment(ref _actorCounter);
        IActorRef relayActor;
        try
        {
            relayActor = _actorSystem!.ActorOf(
                Props.Create(typeof(Actors.StreamRelayActor), request.CorrelationId, channel.Writer),
                $"stream-relay-{request.CorrelationId}-{actorSeq}");
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex,
                "Relay actor creation failed for {Instance} (correlation {CorrelationId}); releasing stream slot.",
                request.InstanceUniqueName, request.CorrelationId);
            ReleaseStreamSlot();
            throw;
        }

        // Communication-021: Subscribe can throw (stale instance name, index lookup
        // fault, site runtime shutting down). Any throw deterministically stops the
        // relay actor, completes the channel and removes the activeStreams entry
        // before the exception escapes to the caller, so nothing leaks.
        string subscriptionId;
        try
        {
            subscriptionId = _streamSubscriber.Subscribe(request.InstanceUniqueName, relayActor);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex,
                "Subscribe failed for {Instance} (correlation {CorrelationId}); cleaning up relay actor.",
                request.InstanceUniqueName, request.CorrelationId);
            _actorSystem!.Stop(relayActor);
            ReleaseStreamSlot();
            throw;
        }

        _logger.LogInformation(
            "Stream {CorrelationId} started for {Instance} (subscription {SubscriptionId})",
            request.CorrelationId, request.InstanceUniqueName, subscriptionId);

        // Telemetry follow-on: the connection is now fully established. Count it up
        // here and balance it in the finally below so the
        // scadabridge.site.connection.up gauge is decremented on EVERY exit path —
        // normal completion, client-cancel / duplicate-replacement
        // (OperationCanceledException), server shutdown (CancelAllStreams ->
        // Cts.Cancel), and any other exception — guaranteeing exactly one Closed
        // per Opened and a gauge that never drifts up.
        ScadaBridgeTelemetry.SiteConnectionOpened();
        try
        {
            await foreach (var evt in channel.Reader.ReadAllAsync(streamCts.Token))
            {
                await responseStream.WriteAsync(evt, streamCts.Token);
            }
        }
        catch (OperationCanceledException)
        {
            // Normal cancellation: client disconnect, duplicate replacement,
            // session timeout, or server shutdown.
        }
        finally
        {
            ScadaBridgeTelemetry.SiteConnectionClosed();
            _streamSubscriber.RemoveSubscriber(relayActor);
            _actorSystem!.Stop(relayActor);
            ReleaseStreamSlot();

            _logger.LogInformation(
                "Stream {CorrelationId} for {Instance} ended",
                request.CorrelationId, request.InstanceUniqueName);
        }
    }

    /// <summary>
    /// Audit Log (#23) M2 site→central push RPC. Decodes a site batch into
    /// audit event rows, Asks the central <c>AuditLogIngestActor</c> proxy to
    /// persist them, and echoes the accepted EventIds back so the site can flip
    /// its local rows to <c>Forwarded</c>.
    /// </summary>
    /// <remarks>
    /// <para>
    /// The DTO→entity conversion uses the shared <c>AuditEventDtoMapper</c>
    /// (hosted in <c>ZB.MOM.WW.ScadaBridge.Communication</c> so both this server and
    /// <c>ZB.MOM.WW.ScadaBridge.AuditLog</c> share one implementation without a
    /// project-reference cycle).
    /// </para>
    /// <para>
    /// When the ingest actor has not yet been wired via
    /// <see cref="SetAuditIngestActor"/> (host startup race window), or the Ask
    /// fails, the RPC returns an empty <see cref="IngestAck"/> rather than
    /// failing — the site treats the missing ack as a transient outcome and
    /// retries on the next drain.
    /// </para>
    /// </remarks>
    /// <param name="request">The audit event batch to ingest.</param>
    /// <param name="context">The server call context.</param>
    /// <returns>An ack listing the EventIds central accepted (possibly empty).</returns>
    public override async Task<IngestAck> IngestAuditEvents(
        AuditEventBatch request,
        ServerCallContext context)
    {
        // Empty batch is a no-op; reply immediately so the client moves on.
        if (request.Events.Count == 0)
        {
            return new IngestAck();
        }

        var actor = _auditIngestActor;
        if (actor is null)
        {
            // Wiring incomplete (host startup race). Sites treat an empty
            // ack as "nothing was acked, leave rows Pending, retry next
            // drain" — exactly the right behaviour during host bring-up.
            _logger.LogWarning(
                "IngestAuditEvents received {Count} events before SetAuditIngestActor was called; returning empty ack.",
                request.Events.Count);
            return new IngestAck();
        }

        var entities = new List<AuditEvent>(request.Events.Count);
        foreach (var dto in request.Events)
        {
            entities.Add(AuditEventDtoMapper.FromDto(dto));
        }

        var cmd = new IngestAuditEventsCommand(entities);
        IngestAuditEventsReply reply;
        try
        {
            reply = await actor.Ask<IngestAuditEventsReply>(
                cmd, AuditIngestAskTimeout, context.CancellationToken);
        }
        catch (Exception ex)
        {
            // Audit ingest is best-effort; failing this RPC at the gRPC layer
            // would surface as a transport error and force the site to retry
            // (which it would do anyway). Logging + an empty ack keeps the
            // semantics consistent with the "wiring incomplete" path above.
            _logger.LogError(ex,
                "AuditLogIngestActor Ask failed for batch of {Count} events; returning empty ack.",
                request.Events.Count);
            return new IngestAck();
        }

        var ack = new IngestAck();
        foreach (var id in reply.AcceptedEventIds)
        {
            ack.AcceptedEventIds.Add(id.ToString());
        }
        return ack;
    }

    /// <summary>
    /// Audit Log (#23) M3 site→central combined-telemetry push RPC. Decodes a
    /// batch of cached telemetry packets into matched (AuditEvent, SiteCall)
    /// pairs, Asks the central <c>AuditLogIngestActor</c> proxy to persist them
    /// in dual-write transactions, and echoes the AuditEvent EventIds that
    /// committed back so the site can flip its local rows to <c>Forwarded</c>.
    /// </summary>
    /// <remarks>
    /// Same fallback as <see cref="IngestAuditEvents"/>: when the actor proxy
    /// has not been set, or the Ask fails, the RPC replies with an empty ack so
    /// sites treat the outcome as transient and retry, never a hard fault.
    /// </remarks>
    /// <param name="request">The cached telemetry batch to ingest.</param>
    /// <param name="context">The server call context.</param>
    /// <returns>An ack listing the AuditEvent EventIds that committed (possibly empty).</returns>
    public override async Task<IngestAck> IngestCachedTelemetry(
        CachedTelemetryBatch request,
        ServerCallContext context)
    {
        if (request.Packets.Count == 0)
        {
            return new IngestAck();
        }

        var actor = _auditIngestActor;
        if (actor is null)
        {
            _logger.LogWarning(
                "IngestCachedTelemetry received {Count} packets before SetAuditIngestActor was called; returning empty ack.",
                request.Packets.Count);
            return new IngestAck();
        }

        var entries = new List<CachedTelemetryEntry>(request.Packets.Count);
        foreach (var packet in request.Packets)
        {
            var auditEvent = AuditEventDtoMapper.FromDto(packet.AuditEvent);
            var siteCall = SiteCallDtoMapper.FromDto(packet.Operational);
            entries.Add(new CachedTelemetryEntry(auditEvent, siteCall));
        }

        var cmd = new IngestCachedTelemetryCommand(entries);
        IngestCachedTelemetryReply reply;
        try
        {
            reply = await actor.Ask<IngestCachedTelemetryReply>(
                cmd, AuditIngestAskTimeout, context.CancellationToken);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "AuditLogIngestActor Ask failed for combined telemetry batch of {Count} packets; returning empty ack.",
                request.Packets.Count);
            return new IngestAck();
        }

        var ack = new IngestAck();
        foreach (var id in reply.AcceptedEventIds)
        {
            ack.AcceptedEventIds.Add(id.ToString());
        }
        return ack;
    }

    /// <summary>
    /// Audit Log (#23) M6 reconciliation pull RPC. Central asks the site for any
    /// AuditLog rows whose <c>OccurredAtUtc &gt;= since_utc</c> and whose
    /// <c>ForwardState</c> is still <c>Pending</c> or <c>Forwarded</c> (i.e. not
    /// yet confirmed reconciled), bounded by <c>batch_size</c>. The site projects
    /// the rows into the response and then flips them to <c>Reconciled</c>
    /// before returning the response to the gRPC runtime for sending. The flip
    /// is best-effort — if it fails (e.g. SQLite disposed mid-call), rows stay
    /// Pending/Forwarded and central pulls them again on the next
    /// reconciliation cycle.
    /// </summary>
    /// <remarks>
    /// <para>
    /// When the site audit queue has not been wired via
    /// <see cref="SetSiteAuditQueue"/> (central-only host or a composition-root
    /// test exercising the server in isolation), or the read fails, the RPC
    /// returns an empty response — central treats that as "nothing to ship" and
    /// retries on its next cycle.
    /// </para>
    /// <para>
    /// NOTE(review): because the flip happens before the response is sent, a
    /// transport failure after a successful flip leaves those rows Reconciled
    /// without central having received them, and this RPC will not offer them
    /// again — confirm this is acceptable for the reconciliation design.
    /// </para>
    /// </remarks>
    /// <param name="request">The pull request with time bounds and batch size.</param>
    /// <param name="context">The server call context.</param>
    /// <returns>The pending rows (at most <c>batch_size</c>) and a more-available flag.</returns>
    /// <exception cref="RpcException"><c>batch_size</c> is not positive (<see cref="StatusCode.InvalidArgument"/>).</exception>
    public override async Task<PullAuditEventsResponse> PullAuditEvents(
        PullAuditEventsRequest request,
        ServerCallContext context)
    {
        var queue = _siteAuditQueue;
        if (queue is null)
        {
            _logger.LogWarning(
                "PullAuditEvents invoked before SetSiteAuditQueue was called; returning empty response.");
            return new PullAuditEventsResponse();
        }

        if (request.BatchSize <= 0)
        {
            // Mirrors the SubscribeInstance guard: reject malformed requests
            // cleanly with InvalidArgument so the caller doesn't see a generic
            // RpcException from the underlying SQLite parameter validation.
            throw new RpcException(new GrpcStatus(
                StatusCode.InvalidArgument,
                "batch_size must be > 0"));
        }

        // sinceUtc defaults to DateTime.MinValue when the wrapper is absent —
        // i.e. "pull from the beginning of recorded history", which is the
        // intended behaviour for the very first reconciliation cycle.
        var since = request.SinceUtc?.ToDateTime().ToUniversalTime() ?? DateTime.MinValue;

        IReadOnlyList<AuditEvent> events;
        try
        {
            events = await queue.ReadPendingSinceAsync(
                since, request.BatchSize, context.CancellationToken);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "ReadPendingSinceAsync failed for since={Since} batch={Batch}; returning empty response.",
                since, request.BatchSize);
            return new PullAuditEventsResponse();
        }

        var response = new PullAuditEventsResponse
        {
            // batch_size saturated → tell central to issue a follow-up pull
            // with an advanced cursor. The site doesn't compute the cursor —
            // central walks it forward from the last returned OccurredAtUtc.
            MoreAvailable = events.Count >= request.BatchSize,
        };
        foreach (var evt in events)
        {
            response.Events.Add(AuditEventDtoMapper.ToDto(evt));
        }

        // Flip to Reconciled only AFTER the response has been fully projected, so
        // a projection fault leaves the rows Pending/Forwarded for the next
        // cycle. The flip itself is best-effort — its failure is a warning, not
        // a fault; the rows are simply offered again on the next pull.
        if (events.Count > 0)
        {
            var ids = events.Select(evt => evt.EventId).ToList();
            try
            {
                await queue.MarkReconciledAsync(ids, context.CancellationToken);
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex,
                    "MarkReconciledAsync failed after PullAuditEvents response of {Count} rows; rows stay Pending for retry.",
                    ids.Count);
            }
        }

        return response;
    }

    /// <summary>
    /// Cancels a stream's token source, tolerating the case where the owning
    /// <see cref="SubscribeInstance"/> call has already finished and disposed it.
    /// Never disposes the source — disposal belongs to the owning call.
    /// </summary>
    /// <param name="entry">The stream registry entry to cancel.</param>
    private static void CancelQuietly(StreamEntry entry)
    {
        try
        {
            entry.Cts.Cancel();
        }
        catch (ObjectDisposedException)
        {
            // Already cleaned up by its own finally — nothing to do.
        }
    }

    /// <summary>
    /// Tracks a single active stream so cleanup only removes its own entry
    /// (record value equality lets <c>TryRemove(KeyValuePair)</c> match this
    /// exact instance's token source).
    /// </summary>
    private sealed record StreamEntry(CancellationTokenSource Cts);
}