feat(audit): EventId + CorrelationId columns + filtered unique index (F3 + F4)
ConfigAuditLog gains two nullable columns (EventId, CorrelationId) + a filtered unique index UX_ConfigAuditLog_EventId. EF migration 20260526105027_AddConfigAuditLogEventIdColumns is additive (nullable + filtered index = legacy rows backfill cleanly). AuditWriterActor now writes EventId + CorrelationId into the dedicated columns instead of synthesising a JSON wrapper into DetailsJson. Cross-restart dedup is now real: a retry of an already-flushed batch hits the unique index and SaveChanges throws; the existing catch drops the duplicate without losing the rest of the batch. WrapDetails helper deleted — F4 (its JSON hardening) becomes moot. AuditWriterActorTests.Details_wrapper_embeds_eventId_and_correlationId renamed + rewritten to assert against the columns. All 29 ControlPlane tests pass, all 95 v2 tests green.
This commit is contained in:
@@ -22,4 +22,16 @@ public sealed class ConfigAuditLog
|
|||||||
public long? GenerationId { get; set; }
|
public long? GenerationId { get; set; }
|
||||||
|
|
||||||
public string? DetailsJson { get; set; }
|
public string? DetailsJson { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Stable per-event identifier from <c>AuditEvent.EventId</c>. Filtered unique index on
|
||||||
|
/// this column gives cross-restart idempotency for the batched AuditWriterActor: a flush
|
||||||
|
/// that retries after a process crash can re-send the same EventId without producing a
|
||||||
|
/// duplicate row. Nullable so pre-v2 rows backfill cleanly.
|
||||||
|
/// </summary>
|
||||||
|
public Guid? EventId { get; set; }
|
||||||
|
|
||||||
|
/// <summary>Correlation ID from <c>AuditEvent.CorrelationId</c> so an audit row joins to its
|
||||||
|
/// originating request/workflow. Nullable for the same backfill reason as <see cref="EventId"/>.</summary>
|
||||||
|
public Guid? CorrelationId { get; set; }
|
||||||
}
|
}
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,50 @@
|
|||||||
|
using System;
|
||||||
|
using Microsoft.EntityFrameworkCore.Migrations;
|
||||||
|
|
||||||
|
#nullable disable
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Configuration.Migrations
|
||||||
|
{
|
||||||
|
/// <inheritdoc />
|
||||||
|
public partial class AddConfigAuditLogEventIdColumns : Migration
|
||||||
|
{
|
||||||
|
/// <inheritdoc />
|
||||||
|
protected override void Up(MigrationBuilder migrationBuilder)
|
||||||
|
{
|
||||||
|
migrationBuilder.AddColumn<Guid>(
|
||||||
|
name: "CorrelationId",
|
||||||
|
table: "ConfigAuditLog",
|
||||||
|
type: "uniqueidentifier",
|
||||||
|
nullable: true);
|
||||||
|
|
||||||
|
migrationBuilder.AddColumn<Guid>(
|
||||||
|
name: "EventId",
|
||||||
|
table: "ConfigAuditLog",
|
||||||
|
type: "uniqueidentifier",
|
||||||
|
nullable: true);
|
||||||
|
|
||||||
|
migrationBuilder.CreateIndex(
|
||||||
|
name: "UX_ConfigAuditLog_EventId",
|
||||||
|
table: "ConfigAuditLog",
|
||||||
|
column: "EventId",
|
||||||
|
unique: true,
|
||||||
|
filter: "[EventId] IS NOT NULL");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
protected override void Down(MigrationBuilder migrationBuilder)
|
||||||
|
{
|
||||||
|
migrationBuilder.DropIndex(
|
||||||
|
name: "UX_ConfigAuditLog_EventId",
|
||||||
|
table: "ConfigAuditLog");
|
||||||
|
|
||||||
|
migrationBuilder.DropColumn(
|
||||||
|
name: "CorrelationId",
|
||||||
|
table: "ConfigAuditLog");
|
||||||
|
|
||||||
|
migrationBuilder.DropColumn(
|
||||||
|
name: "EventId",
|
||||||
|
table: "ConfigAuditLog");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -165,9 +165,15 @@ namespace ZB.MOM.WW.OtOpcUa.Configuration.Migrations
|
|||||||
.HasMaxLength(64)
|
.HasMaxLength(64)
|
||||||
.HasColumnType("nvarchar(64)");
|
.HasColumnType("nvarchar(64)");
|
||||||
|
|
||||||
|
b.Property<Guid?>("CorrelationId")
|
||||||
|
.HasColumnType("uniqueidentifier");
|
||||||
|
|
||||||
b.Property<string>("DetailsJson")
|
b.Property<string>("DetailsJson")
|
||||||
.HasColumnType("nvarchar(max)");
|
.HasColumnType("nvarchar(max)");
|
||||||
|
|
||||||
|
b.Property<Guid?>("EventId")
|
||||||
|
.HasColumnType("uniqueidentifier");
|
||||||
|
|
||||||
b.Property<string>("EventType")
|
b.Property<string>("EventType")
|
||||||
.IsRequired()
|
.IsRequired()
|
||||||
.HasMaxLength(64)
|
.HasMaxLength(64)
|
||||||
@@ -192,6 +198,11 @@ namespace ZB.MOM.WW.OtOpcUa.Configuration.Migrations
|
|||||||
|
|
||||||
b.HasKey("AuditId");
|
b.HasKey("AuditId");
|
||||||
|
|
||||||
|
b.HasIndex("EventId")
|
||||||
|
.IsUnique()
|
||||||
|
.HasDatabaseName("UX_ConfigAuditLog_EventId")
|
||||||
|
.HasFilter("[EventId] IS NOT NULL");
|
||||||
|
|
||||||
b.HasIndex("GenerationId")
|
b.HasIndex("GenerationId")
|
||||||
.HasDatabaseName("IX_ConfigAuditLog_Generation")
|
.HasDatabaseName("IX_ConfigAuditLog_Generation")
|
||||||
.HasFilter("[GenerationId] IS NOT NULL");
|
.HasFilter("[GenerationId] IS NOT NULL");
|
||||||
|
|||||||
@@ -413,6 +413,8 @@ public sealed class OtOpcUaConfigDbContext(DbContextOptions<OtOpcUaConfigDbConte
|
|||||||
e.Property(x => x.ClusterId).HasMaxLength(64);
|
e.Property(x => x.ClusterId).HasMaxLength(64);
|
||||||
e.Property(x => x.NodeId).HasMaxLength(64);
|
e.Property(x => x.NodeId).HasMaxLength(64);
|
||||||
e.Property(x => x.DetailsJson).HasColumnType("nvarchar(max)");
|
e.Property(x => x.DetailsJson).HasColumnType("nvarchar(max)");
|
||||||
|
e.Property(x => x.EventId);
|
||||||
|
e.Property(x => x.CorrelationId);
|
||||||
|
|
||||||
e.HasIndex(x => new { x.ClusterId, x.Timestamp })
|
e.HasIndex(x => new { x.ClusterId, x.Timestamp })
|
||||||
.IsDescending(false, true)
|
.IsDescending(false, true)
|
||||||
@@ -420,6 +422,14 @@ public sealed class OtOpcUaConfigDbContext(DbContextOptions<OtOpcUaConfigDbConte
|
|||||||
e.HasIndex(x => x.GenerationId)
|
e.HasIndex(x => x.GenerationId)
|
||||||
.HasFilter("[GenerationId] IS NOT NULL")
|
.HasFilter("[GenerationId] IS NOT NULL")
|
||||||
.HasDatabaseName("IX_ConfigAuditLog_Generation");
|
.HasDatabaseName("IX_ConfigAuditLog_Generation");
|
||||||
|
// Filtered unique index gives cross-restart idempotency for the AuditWriterActor:
|
||||||
|
// a retry of an already-flushed batch will hit this constraint and the catch in
|
||||||
|
// FlushBuffer drops the duplicate insert. Nullable + filter so legacy backfill rows
|
||||||
|
// (EventId=NULL) don't collide.
|
||||||
|
e.HasIndex(x => x.EventId)
|
||||||
|
.IsUnique()
|
||||||
|
.HasFilter("[EventId] IS NOT NULL")
|
||||||
|
.HasDatabaseName("UX_ConfigAuditLog_EventId");
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -14,9 +14,11 @@ namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Audit;
|
|||||||
/// - <see cref="FlushInterval"/> elapses with a non-empty buffer.
|
/// - <see cref="FlushInterval"/> elapses with a non-empty buffer.
|
||||||
/// - <c>PreRestart</c> / <c>PostStop</c> (supervisor swap or coordinated shutdown).
|
/// - <c>PreRestart</c> / <c>PostStop</c> (supervisor swap or coordinated shutdown).
|
||||||
///
|
///
|
||||||
/// Dedup is in-buffer only — once a batch is flushed, the actor accepts a duplicate
|
/// Dedup is two-layer: in-buffer (the <see cref="Dictionary{TKey, TValue}"/> below collapses
|
||||||
/// <see cref="AuditEvent.EventId"/> as a new row. True cross-restart idempotency needs an
|
/// duplicate EventIds before flush) and at the database via the filtered unique index
|
||||||
/// EventId column with a unique index on <c>ConfigAuditLog</c>; tracked as follow-up F3.
|
/// <c>UX_ConfigAuditLog_EventId</c> (cross-restart safety — a retry of an already-flushed
|
||||||
|
/// batch hits the constraint and we drop the duplicate insert without losing the rest of
|
||||||
|
/// the batch).
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public sealed class AuditWriterActor : ReceiveActor, IWithTimers
|
public sealed class AuditWriterActor : ReceiveActor, IWithTimers
|
||||||
{
|
{
|
||||||
@@ -70,7 +72,9 @@ public sealed class AuditWriterActor : ReceiveActor, IWithTimers
|
|||||||
Principal = evt.Actor,
|
Principal = evt.Actor,
|
||||||
EventType = $"{evt.Category}:{evt.Action}",
|
EventType = $"{evt.Category}:{evt.Action}",
|
||||||
NodeId = evt.SourceNode.Value,
|
NodeId = evt.SourceNode.Value,
|
||||||
DetailsJson = WrapDetails(evt),
|
DetailsJson = evt.DetailsJson,
|
||||||
|
EventId = evt.EventId,
|
||||||
|
CorrelationId = evt.CorrelationId.Value,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
db.SaveChanges();
|
db.SaveChanges();
|
||||||
@@ -82,17 +86,6 @@ public sealed class AuditWriterActor : ReceiveActor, IWithTimers
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Wraps caller-supplied details with the EventId + CorrelationId so audit consumers can
|
|
||||||
/// reconstruct the original message. Until ConfigAuditLog gains a first-class EventId column
|
|
||||||
/// (follow-up F3), this is the only place these correlation IDs are persisted.
|
|
||||||
/// </summary>
|
|
||||||
private static string WrapDetails(AuditEvent evt)
|
|
||||||
{
|
|
||||||
var details = evt.DetailsJson ?? "null";
|
|
||||||
return $"{{\"eventId\":\"{evt.EventId:N}\",\"correlationId\":\"{evt.CorrelationId.Value:N}\",\"details\":{details}}}";
|
|
||||||
}
|
|
||||||
|
|
||||||
protected override void PreRestart(Exception reason, object message)
|
protected override void PreRestart(Exception reason, object message)
|
||||||
{
|
{
|
||||||
FlushBuffer();
|
FlushBuffer();
|
||||||
|
|||||||
@@ -78,7 +78,7 @@ public sealed class AuditWriterActorTests : ControlPlaneActorTestBase
|
|||||||
}
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
public void Details_wrapper_embeds_eventId_and_correlationId()
|
public void EventId_and_CorrelationId_are_persisted_to_dedicated_columns()
|
||||||
{
|
{
|
||||||
var dbFactory = NewInMemoryDbFactory();
|
var dbFactory = NewInMemoryDbFactory();
|
||||||
var actor = Sys.ActorOf(AuditWriterActor.Props(dbFactory));
|
var actor = Sys.ActorOf(AuditWriterActor.Props(dbFactory));
|
||||||
@@ -92,9 +92,9 @@ public sealed class AuditWriterActorTests : ControlPlaneActorTestBase
|
|||||||
|
|
||||||
using var db = dbFactory.CreateDbContext();
|
using var db = dbFactory.CreateDbContext();
|
||||||
var row = db.ConfigAuditLogs.Single();
|
var row = db.ConfigAuditLogs.Single();
|
||||||
row.DetailsJson.ShouldNotBeNull();
|
row.EventId.ShouldBe(eventId);
|
||||||
row.DetailsJson.ShouldContain(eventId.ToString("N"));
|
row.CorrelationId.ShouldNotBeNull();
|
||||||
row.DetailsJson.ShouldContain("\"correlationId\":");
|
row.DetailsJson.ShouldBe("{\"field\":\"value\"}");
|
||||||
row.EventType.ShouldBe("Config:Edit");
|
row.EventType.ShouldBe("Config:Edit");
|
||||||
row.NodeId.ShouldBe("node-a");
|
row.NodeId.ShouldBe("node-a");
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user