using System.Collections.Concurrent;
using System.Threading.Channels;
using Akka.Actor;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Messages.Audit;
using ScadaLink.Commons.Types;
using ScadaLink.Commons.Types.Enums;
using GrpcStatus = Grpc.Core.Status;
namespace ScadaLink.Communication.Grpc;
///
/// gRPC service that accepts instance stream subscriptions from central nodes.
/// Creates a StreamRelayActor per subscription to bridge Akka domain events
/// through a Channel<T> to the gRPC response stream.
///
public class SiteStreamGrpcServer : SiteStreamService.SiteStreamServiceBase
{
// Used to attach a relay actor to an instance's event stream (Subscribe) and to
// detach it again when the gRPC stream ends (RemoveSubscriber).
private readonly ISiteStreamSubscriber _streamSubscriber;
// Assigned by SetReady rather than the constructor; null until that call.
private ActorSystem? _actorSystem;
private readonly ILogger _logger;
// Active subscriptions keyed by the request's correlation id. Each value wraps the
// cancellation source of the stream that currently owns that id.
private readonly ConcurrentDictionary _activeStreams = new();
// Upper bound on entries in _activeStreams; exceeding it yields ResourceExhausted.
private readonly int _maxConcurrentStreams;
// Per-stream session lifetime; applied with CancelAfter unless zero/negative or infinite.
private readonly TimeSpan _maxStreamLifetime;
// Flipped to true by SetReady after _actorSystem is assigned. SubscribeInstance
// rejects calls with Unavailable while this is false.
private volatile bool _ready;
// Incremented atomically per subscription and appended to the relay actor's name,
// so actor names stay unique even when a correlation id is reused.
private long _actorCounter;
// Audit Log (#23 M2): central-side ingest actor proxy. Set by the host
// after the cluster singleton starts (see Bundle E wiring). When null the
// IngestAuditEvents and IngestCachedTelemetry RPCs reply with an empty
// IngestAck so sites can safely retry — wiring-incomplete is treated as
// transient, never fatal.
private IActorRef? _auditIngestActor;
// Per Bundle D's brief — Ask timeout is 30 s. The ingest actor's repo
// calls are sub-100 ms in steady state; a generous timeout absorbs a slow
// MSSQL connection without surfacing as a gRPC failure on a healthy site.
private static readonly TimeSpan AuditIngestAskTimeout = TimeSpan.FromSeconds(30);
/// <summary>
/// Test-only constructor. It stays internal so the DI container sees exactly one
/// public constructor and never has to choose between candidates. Only the
/// concurrency limit can be overridden; the stream lifetime is fixed at four hours.
/// </summary>
/// <param name="streamSubscriber">Subscriber registry used to attach relay actors.</param>
/// <param name="logger">Logger for stream and ingest diagnostics.</param>
/// <param name="maxConcurrentStreams">Maximum number of simultaneously active streams.</param>
internal SiteStreamGrpcServer(
    ISiteStreamSubscriber streamSubscriber,
    ILogger logger,
    int maxConcurrentStreams = 100)
    : this(
        streamSubscriber,
        logger,
        maxConcurrentStreams,
        maxStreamLifetime: TimeSpan.FromHours(4))
{
}
/// <summary>
/// DI constructor. Reads <c>GrpcMaxConcurrentStreams</c> and
/// <c>GrpcMaxStreamLifetime</c> from the supplied options so the documented
/// concurrency limit and the 4-hour zombie-stream session timeout come from
/// configuration instead of being hard-coded.
/// </summary>
/// <param name="streamSubscriber">Subscriber registry used to attach relay actors.</param>
/// <param name="logger">Logger for stream and ingest diagnostics.</param>
/// <param name="options">Options wrapper whose <c>Value</c> supplies both limits.</param>
public SiteStreamGrpcServer(
    ISiteStreamSubscriber streamSubscriber,
    ILogger logger,
    IOptions options)
    : this(
        streamSubscriber,
        logger,
        maxConcurrentStreams: options.Value.GrpcMaxConcurrentStreams,
        maxStreamLifetime: options.Value.GrpcMaxStreamLifetime)
{
}
/// <summary>
/// Shared initializer that both the test-only and the DI constructor delegate to.
/// Stores the four dependencies/limits verbatim; no validation is performed here.
/// </summary>
private SiteStreamGrpcServer(
    ISiteStreamSubscriber streamSubscriber,
    ILogger logger,
    int maxConcurrentStreams,
    TimeSpan maxStreamLifetime)
{
    (_streamSubscriber, _logger, _maxConcurrentStreams, _maxStreamLifetime) =
        (streamSubscriber, logger, maxConcurrentStreams, maxStreamLifetime);
}
/// <summary>
/// Marks the server as ready to accept subscriptions and injects the ActorSystem.
/// Called after the site runtime actor system is fully initialized.
/// The ActorSystem is set here rather than via the constructor so that
/// the gRPC server can be created by DI before the actor system exists.
/// </summary>
/// <param name="actorSystem">The initialized actor system used to host relay actors.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="actorSystem"/> is null. Rejecting it here prevents the server from
/// reporting ready while every subscription would fail on a null actor system.
/// </exception>
public void SetReady(ActorSystem actorSystem)
{
    ArgumentNullException.ThrowIfNull(actorSystem);

    // Order matters: _ready is volatile, so writing it last guarantees that a reader
    // which observes _ready == true also observes the assigned actor system.
    _actorSystem = actorSystem;
    _ready = true;
}
/// <summary>
/// Hands the central-side AuditLogIngestActor proxy to the gRPC server so the audit
/// ingest RPCs can route incoming site batches to it. Audit Log (#23) M2 wiring
/// point — mirrors the way CommunicationService.SetNotificationOutbox takes the
/// Notification Outbox singleton proxy. Bundle E supplies the actor after the
/// cluster singleton starts.
/// </summary>
/// <param name="proxy">Actor reference that subsequent ingest RPCs will Ask.</param>
public void SetAuditIngestActor(IActorRef proxy) => _auditIngestActor = proxy;
/// <summary>
/// Count of streaming subscriptions currently registered. Diagnostic use only.
/// </summary>
public int ActiveStreamCount
{
    get { return _activeStreams.Count; }
}
/// <summary>The concurrent-stream limit in effect for this instance. Exposed for tests.</summary>
internal int MaxConcurrentStreams
{
    get { return _maxConcurrentStreams; }
}
/// <summary>The per-stream session lifetime in effect for this instance. Exposed for tests.</summary>
internal TimeSpan MaxStreamLifetime
{
    get { return _maxStreamLifetime; }
}
/// <summary>
/// Server-streaming RPC: relays events for one instance to the caller until the call is
/// cancelled, a newer subscription with the same correlation id replaces this one, or the
/// configured maximum stream lifetime elapses. A relay actor is created per call and
/// bridges events through a bounded channel to the response stream.
/// </summary>
/// <param name="request">Carries the correlation id and the unique name of the instance to subscribe to.</param>
/// <param name="responseStream">Response stream that relayed events are written to.</param>
/// <param name="context">Call context; its cancellation token is linked into the stream's own token.</param>
/// <exception cref="RpcException">
/// <c>Unavailable</c> when the server is not ready, <c>InvalidArgument</c> when the
/// correlation id is missing or not a valid actor path element, <c>ResourceExhausted</c>
/// when the concurrent stream limit has been reached.
/// </exception>
public override async Task SubscribeInstance(
    InstanceStreamRequest request,
    IServerStreamWriter responseStream,
    ServerCallContext context)
{
    // SetReady assigns _actorSystem before the volatile _ready flag, so a true read here
    // normally implies a visible actor system. The null pattern additionally turns a
    // missing actor system into a clean Unavailable instead of a NullReferenceException.
    if (!_ready || _actorSystem is not { } actorSystem)
    {
        throw new RpcException(new GrpcStatus(StatusCode.Unavailable, "Server not ready"));
    }

    // Communication-014: correlation_id arrives off the wire on a public gRPC
    // endpoint and is used (below) to compose an Akka actor name. Akka actor names
    // have a restricted character set — an id containing '/', whitespace, or other
    // disallowed characters would make ActorOf throw InvalidActorNameException,
    // escaping as an unhandled RPC fault. Reject unsafe ids cleanly up front.
    if (string.IsNullOrEmpty(request.CorrelationId) ||
        !ActorPath.IsValidPathElement(request.CorrelationId))
    {
        throw new RpcException(new GrpcStatus(
            StatusCode.InvalidArgument, "correlation_id is missing or not a valid identifier"));
    }

    // Duplicate prevention -- cancel the existing stream for this correlation id.
    if (_activeStreams.TryRemove(request.CorrelationId, out var existingEntry))
    {
        try
        {
            // Cancel only. The source is owned (and disposed) by the handler that created
            // it; disposing it from here could make that handler's next use of the source
            // throw ObjectDisposedException instead of unwinding via normal cancellation.
            existingEntry.Cts.Cancel();
        }
        catch (ObjectDisposedException)
        {
            // The previous handler finished and disposed its source between our
            // TryRemove and Cancel; there is nothing left to cancel.
        }
    }

    // Check max concurrent streams after duplicate removal.
    if (_activeStreams.Count >= _maxConcurrentStreams)
    {
        throw new RpcException(new GrpcStatus(StatusCode.ResourceExhausted, "Max concurrent streams reached"));
    }

    using var streamCts = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken);

    // Session timeout (design doc "gRPC Connection Keepalive": 4-hour third layer
    // of dead-client detection) — forces a long-lived zombie stream to terminate
    // even if keepalive PINGs never detect the loss.
    if (_maxStreamLifetime > TimeSpan.Zero && _maxStreamLifetime != Timeout.InfiniteTimeSpan)
    {
        streamCts.CancelAfter(_maxStreamLifetime);
    }

    // Read the token once; the loop below must not touch the source again.
    var streamToken = streamCts.Token;

    var channel = Channel.CreateBounded(
        new BoundedChannelOptions(1000) { FullMode = BoundedChannelFullMode.DropOldest });

    var entry = new StreamEntry(streamCts);
    IActorRef? relayActor = null;
    var subscribed = false;

    // Everything after registration runs inside the try so that a failure while creating
    // the relay actor or subscribing still removes this entry and frees its slot.
    _activeStreams[request.CorrelationId] = entry;
    try
    {
        var actorSeq = Interlocked.Increment(ref _actorCounter);
        relayActor = actorSystem.ActorOf(
            Props.Create(typeof(Actors.StreamRelayActor), request.CorrelationId, channel.Writer),
            $"stream-relay-{request.CorrelationId}-{actorSeq}");

        var subscriptionId = _streamSubscriber.Subscribe(request.InstanceUniqueName, relayActor);
        subscribed = true;

        _logger.LogInformation(
            "Stream {CorrelationId} started for {Instance} (subscription {SubscriptionId})",
            request.CorrelationId, request.InstanceUniqueName, subscriptionId);

        await foreach (var evt in channel.Reader.ReadAllAsync(streamToken))
        {
            await responseStream.WriteAsync(evt, streamToken);
        }
    }
    catch (OperationCanceledException)
    {
        // Normal cancellation (client disconnect, session timeout, or duplicate replacement)
    }
    finally
    {
        if (relayActor is not null)
        {
            if (subscribed)
            {
                _streamSubscriber.RemoveSubscriber(relayActor);
            }

            actorSystem.Stop(relayActor);
        }

        channel.Writer.TryComplete();

        // Only remove our own entry -- a replacement stream may have already taken the slot
        _activeStreams.TryRemove(
            new KeyValuePair(request.CorrelationId, entry));

        _logger.LogInformation(
            "Stream {CorrelationId} for {Instance} ended",
            request.CorrelationId, request.InstanceUniqueName);
    }
}
/// <summary>
/// Audit Log (#23) M2 site→central push RPC. Decodes a site batch into AuditEvent
/// rows, Asks the central AuditLogIngestActor proxy to persist them, and echoes the
/// accepted EventIds back so the site can flip its local rows to Forwarded.
/// </summary>
/// <remarks>
/// <para>
/// The DTO→entity conversion lives in this class (MapAuditEventFromDto) rather than
/// calling the AuditLog mapper, to avoid a project-reference cycle: ScadaLink.AuditLog
/// already references ScadaLink.Communication, so the gRPC server cannot reach back
/// into AuditLog for its mapper. The shape mirrors AuditEventMapper.FromDto in
/// ScadaLink.AuditLog.Telemetry; the two must evolve together.
/// </para>
/// <para>
/// When the ingest actor proxy is not yet wired (host startup race window), or the
/// Ask fails, the RPC returns an empty IngestAck rather than failing — the site
/// treats the missing ack as a transient outcome and retries on the next drain.
/// Mapping runs before the guarded Ask, so an exception thrown while parsing a
/// malformed field propagates to the gRPC layer instead of producing an empty ack.
/// </para>
/// </remarks>
/// <param name="request">Batch of audit event DTOs pushed by a site.</param>
/// <param name="context">Call context; its cancellation token is passed to the Ask.</param>
/// <returns>An ack listing the string form of every event id the ingest actor accepted.</returns>
public override async Task IngestAuditEvents(
    AuditEventBatch request,
    ServerCallContext context)
{
    // Empty batch is a no-op; reply immediately so the client moves on.
    if (request.Events.Count == 0)
    {
        return new IngestAck();
    }

    var actor = _auditIngestActor;
    if (actor is null)
    {
        // Wiring incomplete (host startup race). Sites treat an empty
        // ack as "nothing was acked, leave rows Pending, retry next
        // drain" — exactly the right behaviour during host bring-up.
        _logger.LogWarning(
            "IngestAuditEvents received {Count} events before SetAuditIngestActor was called; returning empty ack.",
            request.Events.Count);
        return new IngestAck();
    }

    // Reuse the shared DTO→entity helper instead of carrying a second inline copy,
    // so this RPC and the combined-telemetry RPC cannot drift apart.
    var entities = new List(request.Events.Count);
    foreach (var dto in request.Events)
    {
        entities.Add(MapAuditEventFromDto(dto));
    }

    var cmd = new IngestAuditEventsCommand(entities);
    IngestAuditEventsReply reply;
    try
    {
        reply = await actor.Ask(
            cmd, AuditIngestAskTimeout, context.CancellationToken);
    }
    catch (Exception ex)
    {
        // Audit ingest is best-effort; failing this RPC at the gRPC layer
        // would surface as a transport error and force the site to retry
        // (which it would do anyway). Logging + an empty ack keeps the
        // semantics consistent with the "wiring incomplete" path above.
        _logger.LogError(ex,
            "AuditLogIngestActor Ask failed for batch of {Count} events; returning empty ack.",
            request.Events.Count);
        return new IngestAck();
    }

    var ack = new IngestAck();
    foreach (var id in reply.AcceptedEventIds)
    {
        ack.AcceptedEventIds.Add(id.ToString());
    }

    return ack;
}
/// <summary>
/// Audit Log (#23) M3 site→central combined-telemetry push RPC. Every packet in the
/// batch is mapped to an (AuditEvent, SiteCall) pair; the pairs are sent to the central
/// AuditLogIngestActor proxy in one IngestCachedTelemetryCommand, and the event ids in
/// the actor's reply are echoed back so the site can flip its local rows to Forwarded.
/// </summary>
/// <remarks>
/// An empty batch, an unset actor proxy, and a failed Ask all produce an empty ack, so
/// sites see a transient outcome and retry instead of a hard fault.
/// </remarks>
/// <param name="request">Batch of combined telemetry packets pushed by a site.</param>
/// <param name="context">Call context; its cancellation token is passed to the Ask.</param>
/// <returns>An ack listing the string form of every event id in the actor's reply.</returns>
public override async Task IngestCachedTelemetry(
    CachedTelemetryBatch request,
    ServerCallContext context)
{
    var packetCount = request.Packets.Count;
    if (packetCount == 0)
    {
        return new IngestAck();
    }

    // Snapshot the proxy once so the null check and the Ask use the same reference.
    if (_auditIngestActor is not { } ingestProxy)
    {
        _logger.LogWarning(
            "IngestCachedTelemetry received {Count} packets before SetAuditIngestActor was called; returning empty ack.",
            packetCount);
        return new IngestAck();
    }

    var pairs = new List(packetCount);
    foreach (var packet in request.Packets)
    {
        var mappedEvent = MapAuditEventFromDto(packet.AuditEvent);
        var mappedCall = MapSiteCallFromDto(packet.Operational);
        pairs.Add(new CachedTelemetryEntry(mappedEvent, mappedCall));
    }

    // Built outside the try on purpose: only the Ask itself is treated as best-effort.
    var command = new IngestCachedTelemetryCommand(pairs);

    IngestCachedTelemetryReply reply;
    try
    {
        reply = await ingestProxy.Ask(
            command, AuditIngestAskTimeout, context.CancellationToken);
    }
    catch (Exception askFailure)
    {
        _logger.LogError(askFailure,
            "AuditLogIngestActor Ask failed for combined telemetry batch of {Count} packets; returning empty ack.",
            packetCount);
        return new IngestAck();
    }

    var result = new IngestAck();
    foreach (var acceptedId in reply.AcceptedEventIds)
    {
        result.AcceptedEventIds.Add(acceptedId.ToString());
    }

    return result;
}
/// <summary>
/// Normalizes a string: null and the empty string both become null; any other value
/// (including whitespace-only text) is returned unchanged.
/// </summary>
private static string? NullIfEmpty(string? value)
{
    return value is { Length: > 0 } ? value : null;
}
/// <summary>
/// Translates an audit-event DTO into an AuditEvent entity. Ids and enum-valued fields
/// are parsed from their string form, the timestamp is tagged as UTC, empty strings
/// become null, and IngestedAtUtc / ForwardState are left null.
/// </summary>
/// <remarks>
/// The shape mirrors AuditEventMapper.FromDto in ScadaLink.AuditLog.Telemetry; the two
/// must evolve together. The AuditLog mapper cannot be called directly because
/// ScadaLink.AuditLog already references ScadaLink.Communication (project-reference cycle).
/// </remarks>
/// <param name="dto">The wire DTO to convert.</param>
/// <returns>A new entity populated from <paramref name="dto"/>.</returns>
private static AuditEvent MapAuditEventFromDto(AuditEventDto dto)
{
    // Initializer order is kept as-is so parse failures surface in the same order.
    return new AuditEvent
    {
        EventId = Guid.Parse(dto.EventId),
        OccurredAtUtc = DateTime.SpecifyKind(dto.OccurredAtUtc.ToDateTime(), DateTimeKind.Utc),
        IngestedAtUtc = null,
        Channel = Enum.Parse(dto.Channel),
        Kind = Enum.Parse(dto.Kind),
        CorrelationId = string.IsNullOrEmpty(dto.CorrelationId) ? null : Guid.Parse(dto.CorrelationId),
        SourceSiteId = NullIfEmpty(dto.SourceSiteId),
        SourceInstanceId = NullIfEmpty(dto.SourceInstanceId),
        SourceScript = NullIfEmpty(dto.SourceScript),
        Actor = NullIfEmpty(dto.Actor),
        Target = NullIfEmpty(dto.Target),
        Status = Enum.Parse(dto.Status),
        HttpStatus = dto.HttpStatus,
        DurationMs = dto.DurationMs,
        ErrorMessage = NullIfEmpty(dto.ErrorMessage),
        ErrorDetail = NullIfEmpty(dto.ErrorDetail),
        RequestSummary = NullIfEmpty(dto.RequestSummary),
        ResponseSummary = NullIfEmpty(dto.ResponseSummary),
        PayloadTruncated = dto.PayloadTruncated,
        Extra = NullIfEmpty(dto.Extra),
        ForwardState = null,
    };
}
/// <summary>
/// Translates a SiteCallOperationalDto into the SiteCall persistence entity.
/// Timestamps are tagged as UTC, an empty LastError becomes null, and a missing
/// TerminalAtUtc stays null. IngestedAtUtc is stamped with the current UTC time as a
/// placeholder; per the ingest design the central ingest actor overwrites it inside the
/// dual-write transaction so the AuditLog and SiteCalls rows share one instant.
/// </summary>
/// <param name="dto">The operational DTO to convert.</param>
/// <returns>A new entity populated from <paramref name="dto"/>.</returns>
private static SiteCall MapSiteCallFromDto(SiteCallOperationalDto dto)
{
    return new SiteCall
    {
        TrackedOperationId = TrackedOperationId.Parse(dto.TrackedOperationId),
        Channel = dto.Channel,
        Target = dto.Target,
        SourceSite = dto.SourceSite,
        Status = dto.Status,
        RetryCount = dto.RetryCount,
        LastError = NullIfEmpty(dto.LastError),
        HttpStatus = dto.HttpStatus,
        CreatedAtUtc = DateTime.SpecifyKind(dto.CreatedAtUtc.ToDateTime(), DateTimeKind.Utc),
        UpdatedAtUtc = DateTime.SpecifyKind(dto.UpdatedAtUtc.ToDateTime(), DateTimeKind.Utc),
        TerminalAtUtc = dto.TerminalAtUtc is { } terminalAt
            ? DateTime.SpecifyKind(terminalAt.ToDateTime(), DateTimeKind.Utc)
            : null,
        IngestedAtUtc = DateTime.UtcNow, // overwritten by AuditLogIngestActor
    };
}
/// <summary>
/// Tracks a single active stream so cleanup only removes its own entry.
/// Holds the stream's cancellation source: a replacing subscription uses it to cancel
/// the stream it supersedes, and each handler removes its dictionary entry by
/// key-and-value pair so it never evicts an entry registered by a later stream.
/// </summary>
private sealed record StreamEntry(CancellationTokenSource Cts);
}