feat(controlplane): AuditWriterActor with batched in-buffer-dedup insert
This commit is contained in:
@@ -0,0 +1,113 @@
|
|||||||
|
using Akka.Actor;
|
||||||
|
using Akka.Event;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Audit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Audit;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Cluster-singleton actor that batches <see cref="AuditEvent"/> messages from the cluster
|
||||||
|
/// and bulk-inserts them into <c>ConfigAuditLog</c>. Flush triggers:
|
||||||
|
/// - Buffer reaches <see cref="FlushBatchSize"/> events.
|
||||||
|
/// - <see cref="FlushInterval"/> elapses with a non-empty buffer.
|
||||||
|
/// - <c>PreRestart</c> / <c>PostStop</c> (supervisor swap or coordinated shutdown).
|
||||||
|
///
|
||||||
|
/// Dedup is in-buffer only — once a batch is flushed, the actor accepts a duplicate
|
||||||
|
/// <see cref="AuditEvent.EventId"/> as a new row. True cross-restart idempotency needs an
|
||||||
|
/// EventId column with a unique index on <c>ConfigAuditLog</c>; tracked as follow-up F3.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class AuditWriterActor : ReceiveActor, IWithTimers
|
||||||
|
{
|
||||||
|
public const int FlushBatchSize = 500;
|
||||||
|
public static readonly TimeSpan FlushInterval = TimeSpan.FromSeconds(5);
|
||||||
|
|
||||||
|
private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;
|
||||||
|
private readonly ILoggingAdapter _log = Context.GetLogger();
|
||||||
|
private readonly Dictionary<Guid, AuditEvent> _buffer = new();
|
||||||
|
|
||||||
|
public ITimerScheduler Timers { get; set; } = null!;
|
||||||
|
|
||||||
|
public static Props Props(IDbContextFactory<OtOpcUaConfigDbContext> dbFactory) =>
|
||||||
|
Akka.Actor.Props.Create(() => new AuditWriterActor(dbFactory));
|
||||||
|
|
||||||
|
public AuditWriterActor(IDbContextFactory<OtOpcUaConfigDbContext> dbFactory)
|
||||||
|
{
|
||||||
|
_dbFactory = dbFactory;
|
||||||
|
Receive<AuditEvent>(HandleEvent);
|
||||||
|
Receive<Flush>(_ => FlushBuffer());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected override void PreStart()
|
||||||
|
{
|
||||||
|
Timers.StartPeriodicTimer("flush", Flush.Instance, FlushInterval);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void HandleEvent(AuditEvent evt)
|
||||||
|
{
|
||||||
|
// In-buffer dedup. Last write wins on duplicate EventId within the batch — events
|
||||||
|
// with the same EventId are by contract identical, so this is a no-op.
|
||||||
|
_buffer[evt.EventId] = evt;
|
||||||
|
if (_buffer.Count >= FlushBatchSize) FlushBuffer();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void FlushBuffer()
|
||||||
|
{
|
||||||
|
if (_buffer.Count == 0) return;
|
||||||
|
|
||||||
|
var snapshot = _buffer.Values.ToList();
|
||||||
|
_buffer.Clear();
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
using var db = _dbFactory.CreateDbContext();
|
||||||
|
foreach (var evt in snapshot)
|
||||||
|
{
|
||||||
|
db.ConfigAuditLogs.Add(new ConfigAuditLog
|
||||||
|
{
|
||||||
|
Timestamp = evt.OccurredAtUtc,
|
||||||
|
Principal = evt.Actor,
|
||||||
|
EventType = $"{evt.Category}:{evt.Action}",
|
||||||
|
NodeId = evt.SourceNode.Value,
|
||||||
|
DetailsJson = WrapDetails(evt),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
db.SaveChanges();
|
||||||
|
_log.Debug("AuditWriter flushed {Count} events", snapshot.Count);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_log.Error(ex, "AuditWriter flush failed; {Count} events dropped", snapshot.Count);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Wraps caller-supplied details with the EventId + CorrelationId so audit consumers can
|
||||||
|
/// reconstruct the original message. Until ConfigAuditLog gains a first-class EventId column
|
||||||
|
/// (follow-up F3), this is the only place these correlation IDs are persisted.
|
||||||
|
/// </summary>
|
||||||
|
private static string WrapDetails(AuditEvent evt)
|
||||||
|
{
|
||||||
|
var details = evt.DetailsJson ?? "null";
|
||||||
|
return $"{{\"eventId\":\"{evt.EventId:N}\",\"correlationId\":\"{evt.CorrelationId.Value:N}\",\"details\":{details}}}";
|
||||||
|
}
|
||||||
|
|
||||||
|
protected override void PreRestart(Exception reason, object message)
|
||||||
|
{
|
||||||
|
FlushBuffer();
|
||||||
|
base.PreRestart(reason, message);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected override void PostStop()
|
||||||
|
{
|
||||||
|
FlushBuffer();
|
||||||
|
base.PostStop();
|
||||||
|
}
|
||||||
|
|
||||||
|
public sealed class Flush
|
||||||
|
{
|
||||||
|
public static readonly Flush Instance = new();
|
||||||
|
private Flush() { }
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,101 @@
|
|||||||
|
using Akka.Actor;
|
||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Audit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.ControlPlane.Audit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.ControlPlane.Tests.Harness;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Tests;
|
||||||
|
|
||||||
|
public sealed class AuditWriterActorTests : ControlPlaneActorTestBase
|
||||||
|
{
|
||||||
|
private static AuditEvent NewEvent(Guid eventId, string action = "Edit", string actor = "joe") =>
|
||||||
|
new(
|
||||||
|
eventId,
|
||||||
|
"Config",
|
||||||
|
action,
|
||||||
|
actor,
|
||||||
|
DateTime.UtcNow,
|
||||||
|
DetailsJson: "{\"field\":\"value\"}",
|
||||||
|
SourceNode: NodeId.Parse("node-a"),
|
||||||
|
CorrelationId: CorrelationId.NewId());
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Buffered_events_flush_on_count_threshold()
|
||||||
|
{
|
||||||
|
var dbFactory = NewInMemoryDbFactory();
|
||||||
|
var actor = Sys.ActorOf(AuditWriterActor.Props(dbFactory));
|
||||||
|
|
||||||
|
// Sending exactly FlushBatchSize events triggers a flush.
|
||||||
|
for (var i = 0; i < AuditWriterActor.FlushBatchSize; i++)
|
||||||
|
actor.Tell(NewEvent(Guid.NewGuid()));
|
||||||
|
|
||||||
|
// Give the actor a beat to process the messages.
|
||||||
|
AwaitAssert(() =>
|
||||||
|
{
|
||||||
|
using var db = dbFactory.CreateDbContext();
|
||||||
|
db.ConfigAuditLogs.Count().ShouldBe(AuditWriterActor.FlushBatchSize);
|
||||||
|
}, duration: TimeSpan.FromSeconds(2));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Duplicate_eventIds_within_a_batch_dedup_in_buffer()
|
||||||
|
{
|
||||||
|
var dbFactory = NewInMemoryDbFactory();
|
||||||
|
var actor = Sys.ActorOf(AuditWriterActor.Props(dbFactory));
|
||||||
|
|
||||||
|
// Send 1000 messages, but only 100 unique EventIds (10x duplication).
|
||||||
|
var uniqueIds = Enumerable.Range(0, 100).Select(_ => Guid.NewGuid()).ToArray();
|
||||||
|
for (var i = 0; i < 1000; i++)
|
||||||
|
actor.Tell(NewEvent(uniqueIds[i % 100]));
|
||||||
|
|
||||||
|
// Force a flush — send PoisonPill, which triggers PostStop → FlushBuffer.
|
||||||
|
Watch(actor);
|
||||||
|
actor.Tell(PoisonPill.Instance);
|
||||||
|
ExpectTerminated(actor);
|
||||||
|
|
||||||
|
using var db = dbFactory.CreateDbContext();
|
||||||
|
db.ConfigAuditLogs.Count().ShouldBe(100, "in-buffer dedup should collapse duplicate EventIds");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void PostStop_flushes_pending_buffer()
|
||||||
|
{
|
||||||
|
var dbFactory = NewInMemoryDbFactory();
|
||||||
|
var actor = Sys.ActorOf(AuditWriterActor.Props(dbFactory));
|
||||||
|
|
||||||
|
// 10 events — well below the threshold, so they sit in-buffer.
|
||||||
|
for (var i = 0; i < 10; i++)
|
||||||
|
actor.Tell(NewEvent(Guid.NewGuid()));
|
||||||
|
|
||||||
|
Watch(actor);
|
||||||
|
actor.Tell(PoisonPill.Instance);
|
||||||
|
ExpectTerminated(actor);
|
||||||
|
|
||||||
|
using var db = dbFactory.CreateDbContext();
|
||||||
|
db.ConfigAuditLogs.Count().ShouldBe(10);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Details_wrapper_embeds_eventId_and_correlationId()
|
||||||
|
{
|
||||||
|
var dbFactory = NewInMemoryDbFactory();
|
||||||
|
var actor = Sys.ActorOf(AuditWriterActor.Props(dbFactory));
|
||||||
|
|
||||||
|
var eventId = Guid.NewGuid();
|
||||||
|
actor.Tell(NewEvent(eventId));
|
||||||
|
|
||||||
|
Watch(actor);
|
||||||
|
actor.Tell(PoisonPill.Instance);
|
||||||
|
ExpectTerminated(actor);
|
||||||
|
|
||||||
|
using var db = dbFactory.CreateDbContext();
|
||||||
|
var row = db.ConfigAuditLogs.Single();
|
||||||
|
row.DetailsJson.ShouldNotBeNull();
|
||||||
|
row.DetailsJson.ShouldContain(eventId.ToString("N"));
|
||||||
|
row.DetailsJson.ShouldContain("\"correlationId\":");
|
||||||
|
row.EventType.ShouldBe("Config:Edit");
|
||||||
|
row.NodeId.ShouldBe("node-a");
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user