using Akka.Actor;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ScadaLink.AuditLog.Telemetry;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Communication.Grpc;
namespace ScadaLink.AuditLog.Site.Telemetry;
///
/// Site-side actor that drains the local SQLite audit queue and pushes Pending
/// rows to central via the IngestAuditEvents gRPC RPC. On a successful
/// ack the matching EventIds flip to
/// ; on
/// a gRPC failure the rows stay Pending and the next drain retries.
///
///
///
/// The drain self-tick is a private Drain message scheduled via the
/// actor system scheduler. The cadence is options-driven: BusyIntervalSeconds
/// when the previous drain found rows (or faulted — we want quick recovery),
/// IdleIntervalSeconds when the queue was empty.
///
///
/// Both collaborators are injected as interfaces (
/// and ) so unit tests substitute with
/// NSubstitute and never touch real SQLite or gRPC.
///
///
/// Per Bundle D's brief, audit-write paths must be fail-safe — a thrown
/// exception inside the actor MUST NOT crash it. The Drain handler wraps the
/// pipeline in a top-level try/catch that logs and re-schedules, and the
/// actor's defaults to
/// 's Restart for
/// child actors — but this actor has no children, so the catch is what matters.
///
///
public class SiteAuditTelemetryActor : ReceiveActor
{
private readonly ISiteAuditQueue _queue;
private readonly ISiteStreamAuditClient _client;
private readonly SiteAuditTelemetryOptions _options;
private readonly ILogger _logger;
private ICancelable? _pendingTick;
public SiteAuditTelemetryActor(
ISiteAuditQueue queue,
ISiteStreamAuditClient client,
IOptions options,
ILogger logger)
{
ArgumentNullException.ThrowIfNull(queue);
ArgumentNullException.ThrowIfNull(client);
ArgumentNullException.ThrowIfNull(options);
ArgumentNullException.ThrowIfNull(logger);
_queue = queue;
_client = client;
_options = options.Value;
_logger = logger;
ReceiveAsync(_ => OnDrainAsync());
}
protected override void PreStart()
{
base.PreStart();
// Initial tick fires on the busy interval so the actor starts polling
// soon after host startup. A subsequent empty drain will move to the
// idle interval naturally.
ScheduleNext(TimeSpan.FromSeconds(_options.BusyIntervalSeconds));
}
protected override void PostStop()
{
_pendingTick?.Cancel();
base.PostStop();
}
private async Task OnDrainAsync()
{
var nextDelay = TimeSpan.FromSeconds(_options.BusyIntervalSeconds);
try
{
var pending = await _queue.ReadPendingAsync(_options.BatchSize, CancellationToken.None)
.ConfigureAwait(false);
if (pending.Count == 0)
{
// No rows — settle into the idle cadence until the next write
// bumps us back into the busy cadence.
nextDelay = TimeSpan.FromSeconds(_options.IdleIntervalSeconds);
return;
}
var batch = BuildBatch(pending);
IngestAck ack;
try
{
ack = await _client.IngestAuditEventsAsync(batch, CancellationToken.None)
.ConfigureAwait(false);
}
catch (Exception ex)
{
// gRPC fault — leave the rows in Pending so the next drain
// retries. Bundle D's brief: "On gRPC exception (any), log
// Warning, schedule next Drain in BusyIntervalSeconds."
_logger.LogWarning(ex,
"IngestAuditEvents push failed for {Count} pending events; will retry next drain.",
pending.Count);
return;
}
var acceptedIds = ParseAcceptedIds(ack);
if (acceptedIds.Count > 0)
{
await _queue.MarkForwardedAsync(acceptedIds, CancellationToken.None)
.ConfigureAwait(false);
}
}
catch (Exception ex)
{
// Catch-all so a SQLite hiccup or mapper bug never crashes the
// actor. The next tick is still scheduled in the finally block.
_logger.LogError(ex, "Unexpected error during audit-log telemetry drain.");
}
finally
{
ScheduleNext(nextDelay);
}
}
private static AuditEventBatch BuildBatch(IReadOnlyList events)
{
var batch = new AuditEventBatch();
foreach (var e in events)
{
batch.Events.Add(AuditEventMapper.ToDto(e));
}
return batch;
}
private static IReadOnlyList ParseAcceptedIds(IngestAck ack)
{
if (ack.AcceptedEventIds.Count == 0)
{
return Array.Empty();
}
var list = new List(ack.AcceptedEventIds.Count);
foreach (var raw in ack.AcceptedEventIds)
{
if (Guid.TryParse(raw, out var id))
{
list.Add(id);
}
// Malformed ids are ignored — central should never emit them, but
// we refuse to crash the actor over a bad string.
}
return list;
}
private void ScheduleNext(TimeSpan delay)
{
_pendingTick?.Cancel();
_pendingTick = Context.System.Scheduler.ScheduleTellOnceCancelable(
delay,
Self,
Drain.Instance,
Self);
}
/// Self-tick message that triggers a drain cycle.
private sealed class Drain
{
public static readonly Drain Instance = new();
private Drain() { }
}
}