From 23f669c376bf403b465df599403438e4a95ff436 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 26 May 2026 04:44:01 -0400 Subject: [PATCH] feat(controlplane): AuditWriterActor with batched in-buffer-dedup insert --- .../Audit/AuditWriterActor.cs | 113 ++++++++++++++++++ .../AuditWriterActorTests.cs | 101 ++++++++++++++++ 2 files changed, 214 insertions(+) create mode 100644 src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Audit/AuditWriterActor.cs create mode 100644 tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/AuditWriterActorTests.cs diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Audit/AuditWriterActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Audit/AuditWriterActor.cs new file mode 100644 index 0000000..af08463 --- /dev/null +++ b/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Audit/AuditWriterActor.cs @@ -0,0 +1,113 @@ +using Akka.Actor; +using Akka.Event; +using Microsoft.EntityFrameworkCore; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Audit; +using ZB.MOM.WW.OtOpcUa.Configuration; +using ZB.MOM.WW.OtOpcUa.Configuration.Entities; + +namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Audit; + +/// +/// Cluster-singleton actor that batches messages from the cluster +/// and bulk-inserts them into ConfigAuditLog. Flush triggers: +/// - Buffer reaches events. +/// - elapses with a non-empty buffer. +/// - PreRestart / PostStop (supervisor swap or coordinated shutdown). +/// +/// Dedup is in-buffer only — once a batch is flushed, the actor accepts a duplicate +/// as a new row. True cross-restart idempotency needs an +/// EventId column with a unique index on ConfigAuditLog; tracked as follow-up F3. +/// +public sealed class AuditWriterActor : ReceiveActor, IWithTimers +{ + public const int FlushBatchSize = 500; + public static readonly TimeSpan FlushInterval = TimeSpan.FromSeconds(5); + + private readonly IDbContextFactory _dbFactory; + private readonly ILoggingAdapter _log = Context.GetLogger(); + private readonly Dictionary _buffer = new(); + + public ITimerScheduler Timers { get; set; } = null!; + + public static Props Props(IDbContextFactory dbFactory) => + Akka.Actor.Props.Create(() => new AuditWriterActor(dbFactory)); + + public AuditWriterActor(IDbContextFactory dbFactory) + { + _dbFactory = dbFactory; + Receive(HandleEvent); + Receive(_ => FlushBuffer()); + } + + protected override void PreStart() + { + Timers.StartPeriodicTimer("flush", Flush.Instance, FlushInterval); + } + + private void HandleEvent(AuditEvent evt) + { + // In-buffer dedup. Last write wins on duplicate EventId within the batch — events + // with the same EventId are by contract identical, so this is a no-op. + _buffer[evt.EventId] = evt; + if (_buffer.Count >= FlushBatchSize) FlushBuffer(); + } + + private void FlushBuffer() + { + if (_buffer.Count == 0) return; + + var snapshot = _buffer.Values.ToList(); + _buffer.Clear(); + + try + { + using var db = _dbFactory.CreateDbContext(); + foreach (var evt in snapshot) + { + db.ConfigAuditLogs.Add(new ConfigAuditLog + { + Timestamp = evt.OccurredAtUtc, + Principal = evt.Actor, + EventType = $"{evt.Category}:{evt.Action}", + NodeId = evt.SourceNode.Value, + DetailsJson = WrapDetails(evt), + }); + } + db.SaveChanges(); + _log.Debug("AuditWriter flushed {Count} events", snapshot.Count); + } + catch (Exception ex) + { + _log.Error(ex, "AuditWriter flush failed; {Count} events dropped", snapshot.Count); + } + } + + /// + /// Wraps caller-supplied details with the EventId + CorrelationId so audit consumers can + /// reconstruct the original message. Until ConfigAuditLog gains a first-class EventId column + /// (follow-up F3), this is the only place these correlation IDs are persisted. + /// + private static string WrapDetails(AuditEvent evt) + { + var details = evt.DetailsJson ?? "null"; + return $"{{\"eventId\":\"{evt.EventId:N}\",\"correlationId\":\"{evt.CorrelationId.Value:N}\",\"details\":{details}}}"; + } + + protected override void PreRestart(Exception reason, object message) + { + FlushBuffer(); + base.PreRestart(reason, message); + } + + protected override void PostStop() + { + FlushBuffer(); + base.PostStop(); + } + + public sealed class Flush + { + public static readonly Flush Instance = new(); + private Flush() { } + } +} diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/AuditWriterActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/AuditWriterActorTests.cs new file mode 100644 index 0000000..a838217 --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/AuditWriterActorTests.cs @@ -0,0 +1,101 @@ +using Akka.Actor; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Audit; +using ZB.MOM.WW.OtOpcUa.Commons.Types; +using ZB.MOM.WW.OtOpcUa.ControlPlane.Audit; +using ZB.MOM.WW.OtOpcUa.ControlPlane.Tests.Harness; + +namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Tests; + +public sealed class AuditWriterActorTests : ControlPlaneActorTestBase +{ + private static AuditEvent NewEvent(Guid eventId, string action = "Edit", string actor = "joe") => + new( + eventId, + "Config", + action, + actor, + DateTime.UtcNow, + DetailsJson: "{\"field\":\"value\"}", + SourceNode: NodeId.Parse("node-a"), + CorrelationId: CorrelationId.NewId()); + + [Fact] + public void Buffered_events_flush_on_count_threshold() + { + var dbFactory = NewInMemoryDbFactory(); + var actor = Sys.ActorOf(AuditWriterActor.Props(dbFactory)); + + // Sending exactly FlushBatchSize events triggers a flush. + for (var i = 0; i < AuditWriterActor.FlushBatchSize; i++) + actor.Tell(NewEvent(Guid.NewGuid())); + + // Give the actor a beat to process the messages. + AwaitAssert(() => + { + using var db = dbFactory.CreateDbContext(); + db.ConfigAuditLogs.Count().ShouldBe(AuditWriterActor.FlushBatchSize); + }, duration: TimeSpan.FromSeconds(2)); + } + + [Fact] + public void Duplicate_eventIds_within_a_batch_dedup_in_buffer() + { + var dbFactory = NewInMemoryDbFactory(); + var actor = Sys.ActorOf(AuditWriterActor.Props(dbFactory)); + + // Send 1000 messages, but only 100 unique EventIds (10x duplication). + var uniqueIds = Enumerable.Range(0, 100).Select(_ => Guid.NewGuid()).ToArray(); + for (var i = 0; i < 1000; i++) + actor.Tell(NewEvent(uniqueIds[i % 100])); + + // Force a flush — send PoisonPill, which triggers PostStop → FlushBuffer. + Watch(actor); + actor.Tell(PoisonPill.Instance); + ExpectTerminated(actor); + + using var db = dbFactory.CreateDbContext(); + db.ConfigAuditLogs.Count().ShouldBe(100, "in-buffer dedup should collapse duplicate EventIds"); + } + + [Fact] + public void PostStop_flushes_pending_buffer() + { + var dbFactory = NewInMemoryDbFactory(); + var actor = Sys.ActorOf(AuditWriterActor.Props(dbFactory)); + + // 10 events — well below the threshold, so they sit in-buffer. + for (var i = 0; i < 10; i++) + actor.Tell(NewEvent(Guid.NewGuid())); + + Watch(actor); + actor.Tell(PoisonPill.Instance); + ExpectTerminated(actor); + + using var db = dbFactory.CreateDbContext(); + db.ConfigAuditLogs.Count().ShouldBe(10); + } + + [Fact] + public void Details_wrapper_embeds_eventId_and_correlationId() + { + var dbFactory = NewInMemoryDbFactory(); + var actor = Sys.ActorOf(AuditWriterActor.Props(dbFactory)); + + var eventId = Guid.NewGuid(); + actor.Tell(NewEvent(eventId)); + + Watch(actor); + actor.Tell(PoisonPill.Instance); + ExpectTerminated(actor); + + using var db = dbFactory.CreateDbContext(); + var row = db.ConfigAuditLogs.Single(); + row.DetailsJson.ShouldNotBeNull(); + row.DetailsJson.ShouldContain(eventId.ToString("N")); + row.DetailsJson.ShouldContain("\"correlationId\":"); + row.EventType.ShouldBe("Config:Edit"); + row.NodeId.ShouldBe("node-a"); + } +}