feat(audit): OtOpcUa adopt canonical ZB.MOM.WW.Audit.AuditEvent + AuditWriterActor:IAuditWriter + Outcome derivation (Task 2.1)
Deep-adopt the shared audit record. Deletes the bespoke 8-field positional Commons AuditEvent and repoints the writer path at ZB.MOM.WW.Audit.AuditEvent (0.1.0, feed-mapped via dohertj2-gitea). Adds the package reference to both Commons and ControlPlane. - AuditWriterActor now implements IAuditWriter: WriteAsync(evt, ct) is a best-effort, never-throwing entry point that Self.Tell()s the event onto the same batching/dedup/flush pipeline and returns Task.CompletedTask. Existing Receive<AuditEvent> + 500/5s batching + two-layer dedup unchanged. - Flush mapping updated for the canonical field types: OccurredAtUtc is now DateTimeOffset (.UtcDateTime into the datetime2 column), SourceNode is string? (was NodeId.Value), CorrelationId is Guid? (stored null when null). Outcome is NOT yet persisted (column lands in Task 2.2). - New AuditOutcomeMapper.FromAction maps the OtOpcUa action vocabulary to the required canonical Outcome: OpcUaAccessDenied / CrossClusterNamespaceAttempt -> Denied; config verbs (DraftCreated/Edited, Published, RolledBack, NodeApplied, ClusterCreated, NodeAdded, CredentialAdded/Disabled, ExternalIdReleased) -> Success. OtOpcUa emits no Failure events. The Akka message shape changed, but the structured audit path is dormant (zero production emit/Tell sites; all live audit flows through the bespoke SP path), so there is no rolling-deploy wire-compat concern. Tested-not-exercised by design. ControlPlane.Tests: 44/44 green (AuditWriterActor suite rewritten to construct the canonical record + assert the Outcome derivation table + the WriteAsync best-effort/mailbox-routing contract + null SourceNode/CorrelationId handling).
This commit is contained in:
@@ -1,17 +0,0 @@
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Commons.Messages.Audit;
|
||||
|
||||
/// <summary>
|
||||
/// Cluster-broadcast audit event consumed by the <c>AuditWriterActor</c> singleton, which
|
||||
/// batches and idempotently inserts into <c>ConfigAuditLog</c>.
|
||||
/// </summary>
|
||||
public sealed record AuditEvent(
|
||||
Guid EventId,
|
||||
string Category,
|
||||
string Action,
|
||||
string Actor,
|
||||
DateTime OccurredAtUtc,
|
||||
string? DetailsJson,
|
||||
NodeId SourceNode,
|
||||
CorrelationId CorrelationId);
|
||||
@@ -7,6 +7,7 @@
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Akka"/>
|
||||
<PackageReference Include="ZB.MOM.WW.Audit"/>
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
|
||||
@@ -0,0 +1,44 @@
|
||||
using ZB.MOM.WW.Audit;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Audit;
|
||||
|
||||
/// <summary>
|
||||
/// Maps OtOpcUa's audit <c>Action</c> vocabulary onto the canonical
|
||||
/// <see cref="AuditOutcome"/>. The vocabulary is the set of values documented on
|
||||
/// <c>ConfigAuditLog.EventType</c>: config verbs are <see cref="AuditOutcome.Success"/>,
|
||||
/// the two authorization-rejection events are <see cref="AuditOutcome.Denied"/>. OtOpcUa
|
||||
/// emits no <see cref="AuditOutcome.Failure"/> events today.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Pure function — no live emit sites construct an <see cref="AuditEvent"/> in production
|
||||
/// (the structured audit path is dormant; all live audit flows through the bespoke stored
|
||||
/// procedure path). This helper exists so that when the structured path is wired up, the
|
||||
/// required <c>Outcome</c> field is derived consistently from the action verb. Tested, not
|
||||
/// yet exercised in production.
|
||||
/// </remarks>
|
||||
public static class AuditOutcomeMapper
|
||||
{
|
||||
/// <summary>
|
||||
/// Derives the canonical <see cref="AuditOutcome"/> for an OtOpcUa audit action verb.
|
||||
/// Unknown verbs default to <see cref="AuditOutcome.Success"/> (config writes are the
|
||||
/// overwhelming majority and the only non-success cases are the two explicit
|
||||
/// authorization rejections enumerated below).
|
||||
/// </summary>
|
||||
/// <param name="action">The audit action verb (e.g. <c>DraftCreated</c>, <c>OpcUaAccessDenied</c>).</param>
|
||||
/// <returns>The mapped outcome.</returns>
|
||||
public static AuditOutcome FromAction(string action) => action switch
|
||||
{
|
||||
"OpcUaAccessDenied" or "CrossClusterNamespaceAttempt" => AuditOutcome.Denied,
|
||||
"DraftCreated"
|
||||
or "DraftEdited"
|
||||
or "Published"
|
||||
or "RolledBack"
|
||||
or "NodeApplied"
|
||||
or "ClusterCreated"
|
||||
or "NodeAdded"
|
||||
or "CredentialAdded"
|
||||
or "CredentialDisabled"
|
||||
or "ExternalIdReleased" => AuditOutcome.Success,
|
||||
_ => AuditOutcome.Success,
|
||||
};
|
||||
}
|
||||
@@ -1,7 +1,7 @@
|
||||
using Akka.Actor;
|
||||
using Akka.Event;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Audit;
|
||||
using ZB.MOM.WW.Audit;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||
|
||||
@@ -19,8 +19,13 @@ namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Audit;
|
||||
/// <c>UX_ConfigAuditLog_EventId</c> (cross-restart safety — a retry of an already-flushed
|
||||
/// batch hits the constraint and we drop the duplicate insert without losing the rest of
|
||||
/// the batch).
|
||||
///
|
||||
/// Implements the shared <see cref="IAuditWriter"/> seam: <see cref="WriteAsync"/> is a
|
||||
/// best-effort, never-throwing entry point that simply <c>Tell</c>s this actor and returns
|
||||
/// a completed task, so non-Akka callers can emit canonical audit events through the same
|
||||
/// batching/dedup pipeline as in-cluster <c>Tell</c> traffic.
|
||||
/// </summary>
|
||||
public sealed class AuditWriterActor : ReceiveActor, IWithTimers
|
||||
public sealed class AuditWriterActor : ReceiveActor, IWithTimers, IAuditWriter
|
||||
{
|
||||
public const int FlushBatchSize = 500;
|
||||
public static readonly TimeSpan FlushInterval = TimeSpan.FromSeconds(5);
|
||||
@@ -52,6 +57,23 @@ public sealed class AuditWriterActor : ReceiveActor, IWithTimers
|
||||
Timers.StartPeriodicTimer("flush", Flush.Instance, FlushInterval);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// <see cref="IAuditWriter"/> seam. Best-effort and never throws: routes the event onto this
|
||||
/// actor's mailbox via <c>Tell</c> (thread-safe from any caller) so it flows through the same
|
||||
/// batching + dedup pipeline as in-cluster traffic, then returns immediately. The actual
|
||||
/// persistence happens asynchronously on the next flush; a write failure there is logged and
|
||||
/// the batch dropped (per the best-effort audit contract).
|
||||
/// </summary>
|
||||
/// <param name="evt">The canonical audit event to persist.</param>
|
||||
/// <param name="ct">Unused — enqueue is synchronous and non-blocking.</param>
|
||||
/// <returns>A completed task.</returns>
|
||||
public Task WriteAsync(AuditEvent evt, CancellationToken ct = default)
|
||||
{
|
||||
// Akka Tell is safe to call from any thread and never throws to the caller.
|
||||
Self.Tell(evt);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
private void HandleEvent(AuditEvent evt)
|
||||
{
|
||||
// In-buffer dedup. Last write wins on duplicate EventId within the batch — events
|
||||
@@ -74,13 +96,13 @@ public sealed class AuditWriterActor : ReceiveActor, IWithTimers
|
||||
{
|
||||
db.ConfigAuditLogs.Add(new ConfigAuditLog
|
||||
{
|
||||
Timestamp = evt.OccurredAtUtc,
|
||||
Timestamp = evt.OccurredAtUtc.UtcDateTime,
|
||||
Principal = evt.Actor,
|
||||
EventType = $"{evt.Category}:{evt.Action}",
|
||||
NodeId = evt.SourceNode.Value,
|
||||
NodeId = evt.SourceNode,
|
||||
DetailsJson = evt.DetailsJson,
|
||||
EventId = evt.EventId,
|
||||
CorrelationId = evt.CorrelationId.Value,
|
||||
CorrelationId = evt.CorrelationId,
|
||||
});
|
||||
}
|
||||
db.SaveChanges();
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
<PackageReference Include="Akka.Cluster.Hosting"/>
|
||||
<PackageReference Include="Akka.Cluster.Tools"/>
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore"/>
|
||||
<PackageReference Include="ZB.MOM.WW.Audit"/>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
using Akka.Actor;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Audit;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||
using ZB.MOM.WW.Audit;
|
||||
using ZB.MOM.WW.OtOpcUa.ControlPlane.Audit;
|
||||
using ZB.MOM.WW.OtOpcUa.ControlPlane.Tests.Harness;
|
||||
|
||||
@@ -11,15 +10,18 @@ namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Tests;
|
||||
public sealed class AuditWriterActorTests : ControlPlaneActorTestBase
|
||||
{
|
||||
private static AuditEvent NewEvent(Guid eventId, string action = "Edit", string actor = "joe") =>
|
||||
new(
|
||||
eventId,
|
||||
"Config",
|
||||
action,
|
||||
actor,
|
||||
DateTime.UtcNow,
|
||||
DetailsJson: "{\"field\":\"value\"}",
|
||||
SourceNode: NodeId.Parse("node-a"),
|
||||
CorrelationId: CorrelationId.NewId());
|
||||
new()
|
||||
{
|
||||
EventId = eventId,
|
||||
Category = "Config",
|
||||
Action = action,
|
||||
Actor = actor,
|
||||
OccurredAtUtc = DateTimeOffset.UtcNow,
|
||||
Outcome = AuditOutcomeMapper.FromAction(action),
|
||||
DetailsJson = "{\"field\":\"value\"}",
|
||||
SourceNode = "node-a",
|
||||
CorrelationId = Guid.NewGuid(),
|
||||
};
|
||||
|
||||
/// <summary>Verifies that buffered events flush when count threshold is reached.</summary>
|
||||
[Fact]
|
||||
@@ -102,4 +104,91 @@ public sealed class AuditWriterActorTests : ControlPlaneActorTestBase
|
||||
row.EventType.ShouldBe("Config:Edit");
|
||||
row.NodeId.ShouldBe("node-a");
|
||||
}
|
||||
|
||||
/// <summary>Verifies that a null SourceNode/CorrelationId on the canonical event persists as null
|
||||
/// (the canonical fields are now nullable; the actor must not assume they are set).</summary>
|
||||
[Fact]
|
||||
public void Null_sourceNode_and_correlationId_persist_as_null()
|
||||
{
|
||||
var dbFactory = NewInMemoryDbFactory();
|
||||
var actor = Sys.ActorOf(AuditWriterActor.Props(dbFactory));
|
||||
|
||||
actor.Tell(new AuditEvent
|
||||
{
|
||||
EventId = Guid.NewGuid(),
|
||||
Category = "Config",
|
||||
Action = "Published",
|
||||
Actor = "joe",
|
||||
OccurredAtUtc = DateTimeOffset.UtcNow,
|
||||
Outcome = AuditOutcome.Success,
|
||||
SourceNode = null,
|
||||
CorrelationId = null,
|
||||
});
|
||||
|
||||
Watch(actor);
|
||||
actor.Tell(PoisonPill.Instance);
|
||||
ExpectTerminated(actor);
|
||||
|
||||
using var db = dbFactory.CreateDbContext();
|
||||
var row = db.ConfigAuditLogs.Single();
|
||||
row.NodeId.ShouldBeNull();
|
||||
row.CorrelationId.ShouldBeNull();
|
||||
}
|
||||
|
||||
/// <summary>Verifies the IAuditWriter.WriteAsync seam is best-effort: it completes
|
||||
/// synchronously, never throws, and routes the event onto the actor's own mailbox
|
||||
/// (<c>Self.Tell</c>) — i.e. the same buffer + dedup + flush pipeline asserted by the Tell
|
||||
/// tests above. Reaches the concrete instance via a TestActorRef.</summary>
|
||||
[Fact]
|
||||
public async Task WriteAsync_is_best_effort_and_routes_onto_the_actor_mailbox()
|
||||
{
|
||||
var dbFactory = NewInMemoryDbFactory();
|
||||
var testRef = ActorOfAsTestActorRef<AuditWriterActor>(AuditWriterActor.Props(dbFactory));
|
||||
IAuditWriter writer = testRef.UnderlyingActor;
|
||||
|
||||
var task = writer.WriteAsync(NewEvent(Guid.NewGuid(), action: "Published"));
|
||||
task.IsCompletedSuccessfully.ShouldBeTrue("WriteAsync must be best-effort and complete synchronously");
|
||||
await Should.NotThrowAsync(async () => await task);
|
||||
}
|
||||
|
||||
/// <summary>Verifies that an AuditEvent delivered to the actor's mailbox — which is exactly
|
||||
/// what the WriteAsync seam does via Self.Tell — is buffered and persisted with the canonical
|
||||
/// fields intact.</summary>
|
||||
[Fact]
|
||||
public void Mailbox_delivery_persists_the_canonical_fields()
|
||||
{
|
||||
var dbFactory = NewInMemoryDbFactory();
|
||||
var actor = Sys.ActorOf(AuditWriterActor.Props(dbFactory));
|
||||
|
||||
var eventId = Guid.NewGuid();
|
||||
actor.Tell(NewEvent(eventId, action: "Published"));
|
||||
|
||||
Watch(actor);
|
||||
actor.Tell(PoisonPill.Instance);
|
||||
ExpectTerminated(actor);
|
||||
|
||||
using var db = dbFactory.CreateDbContext();
|
||||
var row = db.ConfigAuditLogs.Single();
|
||||
row.EventId.ShouldBe(eventId);
|
||||
row.EventType.ShouldBe("Config:Published");
|
||||
row.NodeId.ShouldBe("node-a");
|
||||
}
|
||||
|
||||
/// <summary>Verifies the Outcome derivation table: config verbs → Success, the two
|
||||
/// authorization-rejection events → Denied.</summary>
|
||||
[Theory]
|
||||
[InlineData("DraftCreated", AuditOutcome.Success)]
|
||||
[InlineData("DraftEdited", AuditOutcome.Success)]
|
||||
[InlineData("Published", AuditOutcome.Success)]
|
||||
[InlineData("RolledBack", AuditOutcome.Success)]
|
||||
[InlineData("NodeApplied", AuditOutcome.Success)]
|
||||
[InlineData("ClusterCreated", AuditOutcome.Success)]
|
||||
[InlineData("NodeAdded", AuditOutcome.Success)]
|
||||
[InlineData("CredentialAdded", AuditOutcome.Success)]
|
||||
[InlineData("CredentialDisabled", AuditOutcome.Success)]
|
||||
[InlineData("ExternalIdReleased", AuditOutcome.Success)]
|
||||
[InlineData("OpcUaAccessDenied", AuditOutcome.Denied)]
|
||||
[InlineData("CrossClusterNamespaceAttempt", AuditOutcome.Denied)]
|
||||
public void Outcome_is_derived_from_the_action_vocabulary(string action, AuditOutcome expected) =>
|
||||
AuditOutcomeMapper.FromAction(action).ShouldBe(expected);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user