feat(audit): SecuredWrite audit kinds + best-effort per-lifecycle central direct-write; guard approve Decode (T14b)
This commit is contained in:
@@ -2,12 +2,14 @@ namespace ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
|
||||
|
||||
/// <summary>
|
||||
/// Top-level Audit Log (#23) channel — the trust boundary the audited action crosses.
|
||||
/// One of: outbound API call, outbound DB write, notification send/deliver, or inbound API request.
|
||||
/// One of: outbound API call, outbound DB write, notification send/deliver, inbound API request,
|
||||
/// or a two-person ("secured") write through its submit/approve/reject/execute lifecycle.
|
||||
/// </summary>
|
||||
public enum AuditChannel
|
||||
{
|
||||
ApiOutbound,
|
||||
DbOutbound,
|
||||
Notification,
|
||||
ApiInbound
|
||||
ApiInbound,
|
||||
SecuredWrite
|
||||
}
|
||||
|
||||
@@ -3,6 +3,8 @@ namespace ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
|
||||
/// <summary>
|
||||
/// Specific Audit Log (#23) event kind within a channel — what action produced the row.
|
||||
/// Cached variants emit multiple rows per operation (submit → forward → attempt → resolve).
|
||||
/// The <c>SecuredWrite*</c> kinds emit one row per two-person-write lifecycle event
|
||||
/// (submit → approve → execute, or submit → reject).
|
||||
/// See alog.md §4 for the full taxonomy.
|
||||
/// </summary>
|
||||
public enum AuditKind
|
||||
@@ -16,5 +18,9 @@ public enum AuditKind
|
||||
InboundRequest,
|
||||
InboundAuthFailure,
|
||||
CachedSubmit,
|
||||
CachedResolve
|
||||
CachedResolve,
|
||||
SecuredWriteSubmit,
|
||||
SecuredWriteApprove,
|
||||
SecuredWriteReject,
|
||||
SecuredWriteExecute
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
using System.Buffers.Binary;
|
||||
using System.Security.Cryptography;
|
||||
using System.Text.Json;
|
||||
using System.Text.Json.Serialization;
|
||||
@@ -21,6 +22,8 @@ using ZB.MOM.WW.ScadaBridge.Commons.Messages.DebugView;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Transport;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Management;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Messages.RemoteQuery;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Types.Audit;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Types.InboundApi;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Types.Transport;
|
||||
using ZB.MOM.WW.ScadaBridge.DeploymentManager;
|
||||
@@ -893,6 +896,74 @@ public class ManagementActor : ReceiveActor
|
||||
e.Status, e.OperatorUser, e.OperatorComment, e.SubmittedAtUtc,
|
||||
e.VerifierUser, e.VerifierComment, e.DecidedAtUtc, e.ExecutedAtUtc, e.ExecutionError);
|
||||
|
||||
/// <summary>
|
||||
/// Deterministic, reversible map from a <see cref="PendingSecuredWrite.Id"/> (a
|
||||
/// store-assigned <see cref="long"/>) to the canonical AuditLog
|
||||
/// <c>CorrelationId</c> (a <see cref="Guid"/>): the 8-byte big-endian id occupies
|
||||
/// the final 8 bytes of an otherwise-zero Guid. Every row across one secured-write
|
||||
/// lifecycle (submit → approve → execute, or submit → reject) shares this value so
|
||||
/// they join into one operation; the encoding is stable (same id ⇒ same Guid).
|
||||
/// </summary>
|
||||
private static Guid SecuredWriteCorrelation(long id)
|
||||
{
|
||||
Span<byte> bytes = stackalloc byte[16];
|
||||
BinaryPrimitives.WriteInt64BigEndian(bytes[8..], id);
|
||||
return new Guid(bytes);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Best-effort emission of ONE secured-write AuditLog row via the central direct-write
|
||||
/// path (<see cref="IAuditLogRepository.InsertIfNotExistsAsync"/>) — mirrors the
|
||||
/// Notification Outbox / Inbound API central-origin pattern. The row is built through
|
||||
/// the canonical <see cref="ScadaBridgeAuditEventFactory"/> so Action/Category/Outcome
|
||||
/// map identically to every other emit site.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Standing audit invariant: an audit-write failure NEVER aborts the secured-write
|
||||
/// action. Every exception (repository resolution OR the insert) is caught, logged at
|
||||
/// warning, and swallowed — the caller's own success/failure path is authoritative.
|
||||
/// </remarks>
|
||||
private static async Task EmitSecuredWriteAuditAsync(
|
||||
IServiceProvider sp,
|
||||
AuditKind kind,
|
||||
AuditStatus status,
|
||||
PendingSecuredWrite row,
|
||||
string actor,
|
||||
string? errorMessage = null)
|
||||
{
|
||||
try
|
||||
{
|
||||
var evt = ScadaBridgeAuditEventFactory.Create(
|
||||
channel: AuditChannel.SecuredWrite,
|
||||
kind: kind,
|
||||
status: status,
|
||||
actor: actor,
|
||||
target: $"{row.SiteId}/{row.ConnectionName}/{row.TagPath}",
|
||||
correlationId: SecuredWriteCorrelation(row.Id),
|
||||
sourceSiteId: row.SiteId,
|
||||
errorMessage: errorMessage,
|
||||
// Carry the counterparty (operator on a verifier-actioned row, and
|
||||
// vice-versa) so a single row names both parties to the two-person write.
|
||||
extra: JsonSerializer.Serialize(new
|
||||
{
|
||||
operatorUser = row.OperatorUser,
|
||||
verifierUser = row.VerifierUser
|
||||
}));
|
||||
|
||||
using var scope = sp.CreateScope();
|
||||
var auditRepo = scope.ServiceProvider.GetRequiredService<IAuditLogRepository>();
|
||||
await auditRepo.InsertIfNotExistsAsync(evt);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Audit is best-effort — swallow + log; never abort the secured-write action.
|
||||
sp.GetService<ILogger<ManagementActor>>()?.LogWarning(
|
||||
ex,
|
||||
"Best-effort secured-write audit emission failed (kind={Kind}, id={Id}); the write itself is unaffected.",
|
||||
kind, row.Id);
|
||||
}
|
||||
}
|
||||
|
||||
private static async Task<object?> HandleSubmitSecuredWrite(
|
||||
IServiceProvider sp, SubmitSecuredWriteCommand cmd, AuthenticatedUser user)
|
||||
{
|
||||
@@ -926,6 +997,12 @@ public class ManagementActor : ReceiveActor
|
||||
|
||||
var repo = sp.GetRequiredService<ISecuredWriteRepository>();
|
||||
entity.Id = await repo.AddAsync(entity);
|
||||
|
||||
// T14b — one append-only audit row per lifecycle event. Emitted AFTER the row is
|
||||
// persisted (so it carries the store-assigned id); best-effort — see helper.
|
||||
await EmitSecuredWriteAuditAsync(
|
||||
sp, AuditKind.SecuredWriteSubmit, AuditStatus.Submitted, entity, actor: entity.OperatorUser);
|
||||
|
||||
return ToSecuredWriteDto(entity);
|
||||
}
|
||||
|
||||
@@ -962,6 +1039,12 @@ public class ManagementActor : ReceiveActor
|
||||
row.VerifierComment = cmd.Comment;
|
||||
row.DecidedAtUtc = decidedAtUtc;
|
||||
|
||||
// T14b — the approval decision is itself an audited lifecycle event (the
|
||||
// verifier won the CAS). Emitted with the in-flight Submitted status; the
|
||||
// Execute row below records the terminal write outcome.
|
||||
await EmitSecuredWriteAuditAsync(
|
||||
sp, AuditKind.SecuredWriteApprove, AuditStatus.Submitted, row, actor: user.Username);
|
||||
|
||||
// Validate the value type BEFORE attempting the relay. An unknown type can
|
||||
// never be decoded/written, so fail the row deterministically rather than
|
||||
// leaving it stuck Approved. (Addresses the C2 reviewer's deferred
|
||||
@@ -972,10 +1055,32 @@ public class ManagementActor : ReceiveActor
|
||||
row.ExecutedAtUtc = DateTime.UtcNow;
|
||||
row.ExecutionError = "unknown value type";
|
||||
await repo.UpdateAsync(row);
|
||||
await EmitSecuredWriteAuditAsync(
|
||||
sp, AuditKind.SecuredWriteExecute, AuditStatus.Failed, row,
|
||||
actor: user.Username, errorMessage: row.ExecutionError);
|
||||
return ToSecuredWriteDto(row);
|
||||
}
|
||||
|
||||
var value = Commons.Types.AttributeValueCodec.Decode(row.ValueJson, dataType, elementType: null);
|
||||
// C3 robustness fix: Decode is UNGUARDED in the pre-T14b code — a List-typed
|
||||
// value carrying corrupt JSON throws out of the handler and leaves the row
|
||||
// stuck Approved. Contain it: fail the row deterministically with the decode
|
||||
// error, audit the failure, and return WITHOUT relaying (nothing to write).
|
||||
object? value;
|
||||
try
|
||||
{
|
||||
value = Commons.Types.AttributeValueCodec.Decode(row.ValueJson, dataType, elementType: null);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
row.Status = "Failed";
|
||||
row.ExecutedAtUtc = DateTime.UtcNow;
|
||||
row.ExecutionError = $"value decode error: {ex.Message}";
|
||||
await repo.UpdateAsync(row);
|
||||
await EmitSecuredWriteAuditAsync(
|
||||
sp, AuditKind.SecuredWriteExecute, AuditStatus.Failed, row,
|
||||
actor: user.Username, errorMessage: row.ExecutionError);
|
||||
return ToSecuredWriteDto(row);
|
||||
}
|
||||
|
||||
// Relay the write to the site MxGateway connection. A transport exception is
|
||||
// contained so the row is never left stuck Approved.
|
||||
@@ -1007,6 +1112,17 @@ public class ManagementActor : ReceiveActor
|
||||
|
||||
// UpdateAsync overwrites all columns -> pass the fully-populated entity.
|
||||
await repo.UpdateAsync(row);
|
||||
|
||||
// T14b — terminal execute outcome: Delivered (relay succeeded) maps to canonical
|
||||
// Success, Failed maps to canonical Failure (the error rides in the row detail).
|
||||
await EmitSecuredWriteAuditAsync(
|
||||
sp,
|
||||
AuditKind.SecuredWriteExecute,
|
||||
success ? AuditStatus.Delivered : AuditStatus.Failed,
|
||||
row,
|
||||
actor: user.Username,
|
||||
errorMessage: error);
|
||||
|
||||
return ToSecuredWriteDto(row);
|
||||
}
|
||||
|
||||
@@ -1033,6 +1149,12 @@ public class ManagementActor : ReceiveActor
|
||||
|
||||
// UpdateAsync overwrites all columns -> pass the fully-populated entity.
|
||||
await repo.UpdateAsync(entity);
|
||||
|
||||
// T14b — reject is a terminal lifecycle event (canonical Discarded outcome).
|
||||
// Actor = the verifier; the operator is carried in the row's extra detail.
|
||||
await EmitSecuredWriteAuditAsync(
|
||||
sp, AuditKind.SecuredWriteReject, AuditStatus.Discarded, entity, actor: user.Username);
|
||||
|
||||
return ToSecuredWriteDto(entity);
|
||||
}
|
||||
|
||||
|
||||
@@ -10,7 +10,9 @@ using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Messages.DataConnection;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Management;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Types;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
|
||||
using ZB.MOM.WW.ScadaBridge.Communication;
|
||||
using AuditEvent = ZB.MOM.WW.Audit.AuditEvent;
|
||||
|
||||
namespace ZB.MOM.WW.ScadaBridge.ManagementService.Tests;
|
||||
|
||||
@@ -24,6 +26,7 @@ public class SecuredWriteHandlerTests : TestKit, IDisposable
|
||||
{
|
||||
private readonly ISiteRepository _siteRepo;
|
||||
private readonly ISecuredWriteRepository _securedWriteRepo;
|
||||
private readonly IAuditLogRepository _auditRepo;
|
||||
private readonly StubCommunicationService _comms;
|
||||
private readonly ServiceCollection _services;
|
||||
|
||||
@@ -31,14 +34,44 @@ public class SecuredWriteHandlerTests : TestKit, IDisposable
|
||||
{
|
||||
_siteRepo = Substitute.For<ISiteRepository>();
|
||||
_securedWriteRepo = Substitute.For<ISecuredWriteRepository>();
|
||||
_auditRepo = Substitute.For<IAuditLogRepository>();
|
||||
_comms = new StubCommunicationService();
|
||||
|
||||
_services = new ServiceCollection();
|
||||
_services.AddScoped(_ => _siteRepo);
|
||||
_services.AddScoped(_ => _securedWriteRepo);
|
||||
_services.AddScoped(_ => _auditRepo);
|
||||
_services.AddSingleton<CommunicationService>(_comms);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Captures every <see cref="AuditEvent"/> handed to the substituted
|
||||
/// <see cref="IAuditLogRepository.InsertIfNotExistsAsync"/>. Audit emission is
|
||||
/// best-effort and asynchronous off the actor thread, so a short await-condition
|
||||
/// poll lets the captured list settle before assertions.
|
||||
/// </summary>
|
||||
private List<AuditEvent> CaptureAuditEvents()
|
||||
{
|
||||
var captured = new List<AuditEvent>();
|
||||
_auditRepo
|
||||
.When(r => r.InsertIfNotExistsAsync(Arg.Any<AuditEvent>(), Arg.Any<CancellationToken>()))
|
||||
.Do(ci => captured.Add(ci.Arg<AuditEvent>()));
|
||||
return captured;
|
||||
}
|
||||
|
||||
/// <summary>Spins briefly until the captured audit list reaches <paramref name="count"/> rows.</summary>
|
||||
private static void WaitForAuditRows(List<AuditEvent> captured, int count)
|
||||
{
|
||||
var deadline = DateTime.UtcNow.AddSeconds(5);
|
||||
while (captured.Count < count && DateTime.UtcNow < deadline)
|
||||
{
|
||||
Thread.Sleep(10);
|
||||
}
|
||||
}
|
||||
|
||||
private static AuditEvent SingleOfKind(List<AuditEvent> captured, AuditKind kind) =>
|
||||
Assert.Single(captured, e => e.Action == $"{AuditChannel.SecuredWrite}.{kind}");
|
||||
|
||||
/// <summary>
|
||||
/// Test double for the site-write seam. <see cref="CommunicationService.WriteTagAsync"/>
|
||||
/// is virtual so the approve relay can be exercised without a live actor system;
|
||||
@@ -467,4 +500,159 @@ public class SecuredWriteHandlerTests : TestKit, IDisposable
|
||||
Assert.Equal("Failed", updated!.Status);
|
||||
Assert.Equal("unknown value type", updated.ExecutionError);
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------
|
||||
// Audit emission (T14b — SecuredWrite AuditLog rows)
|
||||
// ------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// The correlation id stamped on a secured-write audit row encodes the row's long Id
|
||||
/// as a big-endian value in the final 8 bytes of an otherwise-zero Guid — mirrors the
|
||||
/// production encoding in <c>ManagementActor.SecuredWriteCorrelation</c>.
|
||||
/// </summary>
|
||||
private static Guid CorrelationFor(long id)
|
||||
{
|
||||
Span<byte> bytes = stackalloc byte[16];
|
||||
System.Buffers.Binary.BinaryPrimitives.WriteInt64BigEndian(bytes[8..], id);
|
||||
return new Guid(bytes);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Submit_EmitsExactlyOneSubmitAuditRow_WithOperatorAndCorrelation()
|
||||
{
|
||||
SeedSiteWithConnection(1, "SITE1", "Mx1", "MxGateway");
|
||||
_securedWriteRepo.AddAsync(Arg.Any<PendingSecuredWrite>(), Arg.Any<CancellationToken>())
|
||||
.Returns(55L);
|
||||
var captured = CaptureAuditEvents();
|
||||
|
||||
var actor = CreateActor();
|
||||
var envelope = Envelope(
|
||||
new SubmitSecuredWriteCommand("SITE1", "Mx1", "Tag.Setpoint", "42.5", "Double", "raise"),
|
||||
"alice", "Operator");
|
||||
|
||||
actor.Tell(envelope);
|
||||
|
||||
ExpectMsg<ManagementSuccess>(TimeSpan.FromSeconds(5));
|
||||
WaitForAuditRows(captured, 1);
|
||||
|
||||
var evt = SingleOfKind(captured, AuditKind.SecuredWriteSubmit);
|
||||
Assert.Equal("alice", evt.Actor);
|
||||
Assert.Equal(CorrelationFor(55L), evt.CorrelationId);
|
||||
Assert.Equal("SecuredWrite", evt.Category);
|
||||
Assert.Equal("SITE1/Mx1/Tag.Setpoint", evt.Target);
|
||||
// Exactly one row — no stray duplicates.
|
||||
Assert.Single(captured);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Reject_EmitsOneRejectAuditRow_WithVerifier()
|
||||
{
|
||||
SeedPendingWrite(7, operatorUser: "alice");
|
||||
var captured = CaptureAuditEvents();
|
||||
|
||||
var actor = CreateActor();
|
||||
var envelope = Envelope(new RejectSecuredWriteCommand(7, "not authorized"), "bob", "Verifier");
|
||||
|
||||
actor.Tell(envelope);
|
||||
|
||||
ExpectMsg<ManagementSuccess>(TimeSpan.FromSeconds(5));
|
||||
WaitForAuditRows(captured, 1);
|
||||
|
||||
var evt = SingleOfKind(captured, AuditKind.SecuredWriteReject);
|
||||
Assert.Equal("bob", evt.Actor);
|
||||
Assert.Equal(CorrelationFor(7L), evt.CorrelationId);
|
||||
Assert.Single(captured);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Approve_Success_EmitsApproveThenExecuteDeliveredAuditRows()
|
||||
{
|
||||
SeedPendingWrite(7, operatorUser: "alice");
|
||||
ArmCasSuccess(7);
|
||||
var captured = CaptureAuditEvents();
|
||||
|
||||
var actor = CreateActor();
|
||||
var envelope = Envelope(new ApproveSecuredWriteCommand(7, "approved"), "bob", "Verifier");
|
||||
|
||||
actor.Tell(envelope);
|
||||
|
||||
ExpectMsg<ManagementSuccess>(TimeSpan.FromSeconds(5));
|
||||
WaitForAuditRows(captured, 2);
|
||||
|
||||
var approve = SingleOfKind(captured, AuditKind.SecuredWriteApprove);
|
||||
Assert.Equal("bob", approve.Actor);
|
||||
Assert.Equal(CorrelationFor(7L), approve.CorrelationId);
|
||||
|
||||
var execute = SingleOfKind(captured, AuditKind.SecuredWriteExecute);
|
||||
Assert.Equal("bob", execute.Actor);
|
||||
Assert.Equal(CorrelationFor(7L), execute.CorrelationId);
|
||||
// Delivered outcome → canonical Success.
|
||||
Assert.Equal(ZB.MOM.WW.Audit.AuditOutcome.Success, execute.Outcome);
|
||||
Assert.Equal(2, captured.Count);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Submit_AuditInsertThrows_StillSucceeds_RowStillPersisted()
|
||||
{
|
||||
SeedSiteWithConnection(1, "SITE1", "Mx1", "MxGateway");
|
||||
PendingSecuredWrite? inserted = null;
|
||||
_securedWriteRepo
|
||||
.When(r => r.AddAsync(Arg.Any<PendingSecuredWrite>(), Arg.Any<CancellationToken>()))
|
||||
.Do(ci => inserted = ci.Arg<PendingSecuredWrite>());
|
||||
_securedWriteRepo.AddAsync(Arg.Any<PendingSecuredWrite>(), Arg.Any<CancellationToken>())
|
||||
.Returns(55L);
|
||||
// Best-effort: a thrown audit insert must NOT abort the secured-write action.
|
||||
_auditRepo.InsertIfNotExistsAsync(Arg.Any<AuditEvent>(), Arg.Any<CancellationToken>())
|
||||
.Returns(Task.FromException(new InvalidOperationException("audit db down")));
|
||||
|
||||
var actor = CreateActor();
|
||||
var envelope = Envelope(
|
||||
new SubmitSecuredWriteCommand("SITE1", "Mx1", "Tag.A", "true", "Boolean", null),
|
||||
"alice", "Operator");
|
||||
|
||||
actor.Tell(envelope);
|
||||
|
||||
// The action still succeeds and the row is still persisted despite the audit failure.
|
||||
var response = ExpectMsg<ManagementSuccess>(TimeSpan.FromSeconds(5));
|
||||
Assert.Equal(envelope.CorrelationId, response.CorrelationId);
|
||||
Assert.NotNull(inserted);
|
||||
Assert.Equal("Pending", inserted!.Status);
|
||||
Assert.Equal("alice", inserted.OperatorUser);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Approve_CorruptListValueJson_FlipsStatusToFailed_DecodeError_NoRelay()
|
||||
{
|
||||
// C3 robustness fix: a List-typed value with corrupt JSON must not throw
|
||||
// out of the handler and leave the row stuck Approved — it is contained,
|
||||
// the row flips to Failed with a "value decode error", and no relay occurs.
|
||||
var row = SeedPendingWrite(7, operatorUser: "alice");
|
||||
row.ValueType = "List";
|
||||
row.ValueJson = "{not valid json";
|
||||
ArmCasSuccess(7);
|
||||
var captured = CaptureAuditEvents();
|
||||
|
||||
PendingSecuredWrite? updated = null;
|
||||
_securedWriteRepo
|
||||
.When(r => r.UpdateAsync(Arg.Any<PendingSecuredWrite>(), Arg.Any<CancellationToken>()))
|
||||
.Do(ci => updated = ci.Arg<PendingSecuredWrite>());
|
||||
|
||||
var actor = CreateActor();
|
||||
var envelope = Envelope(new ApproveSecuredWriteCommand(7, "approved"), "bob", "Verifier");
|
||||
|
||||
actor.Tell(envelope);
|
||||
|
||||
ExpectMsg<ManagementSuccess>(TimeSpan.FromSeconds(5));
|
||||
// No relay — the value never decoded.
|
||||
Assert.Equal(0, _comms.CallCount);
|
||||
Assert.NotNull(updated);
|
||||
Assert.Equal("Failed", updated!.Status);
|
||||
Assert.NotNull(updated.ExecutedAtUtc);
|
||||
Assert.StartsWith("value decode error:", updated.ExecutionError);
|
||||
|
||||
// The Execute audit row records the failure (canonical Failure outcome).
|
||||
WaitForAuditRows(captured, 2);
|
||||
var execute = SingleOfKind(captured, AuditKind.SecuredWriteExecute);
|
||||
Assert.Equal(ZB.MOM.WW.Audit.AuditOutcome.Failure, execute.Outcome);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user