using Akka.Actor;
using Akka.Event;
using Microsoft.EntityFrameworkCore;
using ZB.MOM.WW.Audit;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Audit;
///
/// Cluster-singleton actor that batches messages from the cluster
/// and bulk-inserts them into ConfigAuditLog. Flush triggers:
/// - Buffer reaches events.
/// - elapses with a non-empty buffer.
/// - PreRestart / PostStop (supervisor swap or coordinated shutdown).
///
/// Dedup is two-layer: in-buffer (the below collapses
/// duplicate EventIds before flush) and at the database via the filtered unique index
/// UX_ConfigAuditLog_EventId (cross-restart safety — a retry of an already-flushed
/// batch hits the constraint and we drop the duplicate insert without losing the rest of
/// the batch).
///
/// Implements the shared seam: is a
/// best-effort, never-throwing entry point that simply Tells this actor and returns
/// a completed task, so non-Akka callers can emit canonical audit events through the same
/// batching/dedup pipeline as in-cluster Tell traffic.
///
public sealed class AuditWriterActor : ReceiveActor, IWithTimers, IAuditWriter
{
public const int FlushBatchSize = 500;
public static readonly TimeSpan FlushInterval = TimeSpan.FromSeconds(5);
private readonly IDbContextFactory _dbFactory;
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly Dictionary _buffer = new();
/// Gets or sets the timer scheduler for the actor.
public ITimerScheduler Timers { get; set; } = null!;
/// Creates a Props factory for the AuditWriterActor.
/// The database context factory for creating ConfigDb connections.
public static Props Props(IDbContextFactory dbFactory) =>
Akka.Actor.Props.Create(() => new AuditWriterActor(dbFactory));
/// Initializes a new instance of the AuditWriterActor class.
/// The database context factory for creating ConfigDb connections.
public AuditWriterActor(IDbContextFactory dbFactory)
{
_dbFactory = dbFactory;
Receive(HandleEvent);
Receive(_ => FlushBuffer());
}
///
protected override void PreStart()
{
Timers.StartPeriodicTimer("flush", Flush.Instance, FlushInterval);
}
///
/// seam. Best-effort and never throws: routes the event onto this
/// actor's mailbox via Tell (thread-safe from any caller) so it flows through the same
/// batching + dedup pipeline as in-cluster traffic, then returns immediately. The actual
/// persistence happens asynchronously on the next flush; a write failure there is logged and
/// the batch dropped (per the best-effort audit contract).
///
/// The canonical audit event to persist.
/// Unused — enqueue is synchronous and non-blocking.
/// A completed task.
public Task WriteAsync(AuditEvent evt, CancellationToken ct = default)
{
// Akka Tell is safe to call from any thread and never throws to the caller.
Self.Tell(evt);
return Task.CompletedTask;
}
private void HandleEvent(AuditEvent evt)
{
// In-buffer dedup. Last write wins on duplicate EventId within the batch — events
// with the same EventId are by contract identical, so this is a no-op.
_buffer[evt.EventId] = evt;
if (_buffer.Count >= FlushBatchSize) FlushBuffer();
}
private void FlushBuffer()
{
if (_buffer.Count == 0) return;
var snapshot = _buffer.Values.ToList();
_buffer.Clear();
try
{
using var db = _dbFactory.CreateDbContext();
foreach (var evt in snapshot)
{
db.ConfigAuditLogs.Add(new ConfigAuditLog
{
Timestamp = evt.OccurredAtUtc.UtcDateTime,
Principal = evt.Actor,
EventType = $"{evt.Category}:{evt.Action}",
NodeId = evt.SourceNode,
DetailsJson = evt.DetailsJson,
EventId = evt.EventId,
CorrelationId = evt.CorrelationId,
Outcome = evt.Outcome.ToString(),
});
}
db.SaveChanges();
_log.Debug("AuditWriter flushed {Count} events", snapshot.Count);
}
catch (Exception ex)
{
_log.Error(ex, "AuditWriter flush failed; {Count} events dropped", snapshot.Count);
}
}
///
protected override void PreRestart(Exception reason, object message)
{
FlushBuffer();
base.PreRestart(reason, message);
}
///
protected override void PostStop()
{
FlushBuffer();
base.PostStop();
}
public sealed class Flush
{
public static readonly Flush Instance = new();
private Flush() { }
}
}