using Akka.Actor;
using Akka.Event;
using Microsoft.EntityFrameworkCore;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Audit;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;

namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Audit;

/// <summary>
/// Cluster-singleton actor that batches <see cref="AuditEvent"/> messages from the cluster
/// and bulk-inserts them into ConfigAuditLog. Flush triggers:
/// - Buffer reaches <see cref="FlushBatchSize"/> events.
/// - <see cref="FlushInterval"/> elapses with a non-empty buffer.
/// - PreRestart / PostStop (supervisor swap or coordinated shutdown).
///
/// Dedup is two-layer: in-buffer (the dictionary keyed by EventId collapses duplicate
/// EventIds before flush) and at the database via the filtered unique index
/// UX_ConfigAuditLog_EventId (cross-restart safety). When the database rejects the batched
/// insert, the flush falls back to inserting row by row so that only the rejected rows
/// (e.g. a retry of an already-flushed event) are dropped and the rest of the batch is kept.
/// </summary>
public sealed class AuditWriterActor : ReceiveActor, IWithTimers
{
    /// <summary>Number of buffered events that triggers an immediate flush.</summary>
    public const int FlushBatchSize = 500;

    /// <summary>Period of the timer that flushes a non-empty buffer.</summary>
    public static readonly TimeSpan FlushInterval = TimeSpan.FromSeconds(5);

    // NOTE(review): the generic type arguments were missing from the reviewed text.
    // OtOpcUaConfigDbContext (context type) and Guid (EventId key type) are reconstructed
    // from usage — confirm against the Configuration and Commons projects.
    private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;
    private readonly ILoggingAdapter _log = Context.GetLogger();
    private readonly Dictionary<Guid, AuditEvent> _buffer = new();

    /// <summary>Timer scheduler required by <see cref="IWithTimers"/>.</summary>
    public ITimerScheduler Timers { get; set; } = null!;

    /// <summary>Creates the <see cref="Akka.Actor.Props"/> used to spawn this actor.</summary>
    /// <param name="dbFactory">Factory for the contexts the flush writes through.</param>
    public static Props Props(IDbContextFactory<OtOpcUaConfigDbContext> dbFactory) =>
        Akka.Actor.Props.Create(() => new AuditWriterActor(dbFactory));

    /// <summary>Registers the handlers for audit events and flush ticks.</summary>
    /// <param name="dbFactory">Factory for the contexts the flush writes through.</param>
    public AuditWriterActor(IDbContextFactory<OtOpcUaConfigDbContext> dbFactory)
    {
        _dbFactory = dbFactory;
        Receive<AuditEvent>(HandleEvent);
        Receive<Flush>(_ => FlushBuffer());
    }

    /// <summary>Starts the periodic flush timer.</summary>
    protected override void PreStart()
    {
        Timers.StartPeriodicTimer("flush", Flush.Instance, FlushInterval);
    }

    /// <summary>Buffers one event and flushes once the buffer reaches the batch size.</summary>
    private void HandleEvent(AuditEvent evt)
    {
        // In-buffer dedup. Last write wins on duplicate EventId within the batch — events
        // with the same EventId are by contract identical, so this is a no-op.
        _buffer[evt.EventId] = evt;

        if (_buffer.Count >= FlushBatchSize)
        {
            FlushBuffer();
        }
    }

    /// <summary>
    /// Writes every buffered event to the database and empties the buffer. Never throws:
    /// failures are logged and the affected events are dropped.
    /// </summary>
    private void FlushBuffer()
    {
        if (_buffer.Count == 0)
        {
            return;
        }

        // Snapshot first so the buffer is empty regardless of how the write turns out.
        var snapshot = _buffer.Values.ToList();
        _buffer.Clear();

        try
        {
            InsertBatch(snapshot);
            _log.Debug("AuditWriter flushed {Count} events", snapshot.Count);
        }
        catch (DbUpdateException ex)
        {
            // The database rejected at least one row (typically a duplicate EventId).
            // NOTE(review): the retry assumes the failed SaveChanges committed nothing
            // (EF Core wraps a save in a transaction by default for relational providers);
            // if it partially committed, the unique index still rejects the re-inserted rows.
            _log.Warning(
                "AuditWriter batch insert of {Count} events was rejected ({Reason}); retrying row by row",
                snapshot.Count,
                ex.GetBaseException().Message);
            InsertIndividually(snapshot);
        }
        catch (Exception ex)
        {
            _log.Error(ex, "AuditWriter flush failed; {Count} events dropped", snapshot.Count);
        }
    }

    /// <summary>Inserts all events with a single SaveChanges call.</summary>
    private void InsertBatch(IReadOnlyList<AuditEvent> events)
    {
        using var db = _dbFactory.CreateDbContext();
        foreach (var evt in events)
        {
            db.ConfigAuditLogs.Add(ToEntity(evt));
        }

        db.SaveChanges();
    }

    /// <summary>
    /// Fallback path: inserts events one at a time so a rejected row costs only itself.
    /// Never throws; a non-update failure aborts the loop and drops the remaining events.
    /// </summary>
    private void InsertIndividually(IReadOnlyList<AuditEvent> events)
    {
        var written = 0;
        var rejected = 0;

        try
        {
            using var db = _dbFactory.CreateDbContext();
            foreach (var evt in events)
            {
                db.ConfigAuditLogs.Add(ToEntity(evt));
                try
                {
                    db.SaveChanges();
                    written++;
                }
                catch (DbUpdateException ex)
                {
                    rejected++;
                    _log.Warning(
                        "AuditWriter dropped event {EventId}: insert rejected ({Reason})",
                        evt.EventId,
                        ex.GetBaseException().Message);
                }
                finally
                {
                    // Stop tracking the row (saved or rejected) so it is not re-sent
                    // with the next SaveChanges.
                    db.ChangeTracker.Clear();
                }
            }

            _log.Debug("AuditWriter flushed {Written} of {Count} events row by row", written, events.Count);
        }
        catch (Exception ex)
        {
            _log.Error(
                ex,
                "AuditWriter flush failed; {Count} events dropped",
                events.Count - written - rejected);
        }
    }

    /// <summary>Maps an audit message onto a ConfigAuditLog row.</summary>
    private static ConfigAuditLog ToEntity(AuditEvent evt) => new()
    {
        Timestamp = evt.OccurredAtUtc,
        Principal = evt.Actor,
        EventType = $"{evt.Category}:{evt.Action}",
        NodeId = evt.SourceNode.Value,
        DetailsJson = evt.DetailsJson,
        EventId = evt.EventId,
        CorrelationId = evt.CorrelationId.Value,
    };

    /// <summary>Flushes whatever is buffered before the actor instance is replaced.</summary>
    protected override void PreRestart(Exception reason, object message)
    {
        FlushBuffer();
        base.PreRestart(reason, message);
    }

    /// <summary>Flushes whatever is buffered when the actor stops.</summary>
    protected override void PostStop()
    {
        FlushBuffer();
        base.PostStop();
    }

    /// <summary>Timer tick message that asks the actor to flush its buffer.</summary>
    public sealed class Flush
    {
        public static readonly Flush Instance = new();

        private Flush()
        {
        }
    }
}