using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using NSubstitute;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.SecuredWrites;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Sites;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.DataConnection;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Management;
using ZB.MOM.WW.ScadaBridge.Commons.Types;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
using ZB.MOM.WW.ScadaBridge.Communication;
using AuditEvent = ZB.MOM.WW.Audit.AuditEvent;
namespace ZB.MOM.WW.ScadaBridge.ManagementService.Tests;
///
/// Handler tests for the two-person ("secured") write submit/reject/list flow
/// (M7 OPC UA / MxGateway UX, Task T14b). Submit is gated to Operator, reject to
/// Verifier; the no-self-approval rule and the MxGateway-protocol precondition
/// are enforced inside the handlers and surface as ManagementError.
///
public class SecuredWriteHandlerTests : TestKit, IDisposable
{
private readonly ISiteRepository _siteRepo;
private readonly ISecuredWriteRepository _securedWriteRepo;
private readonly IAuditLogRepository _auditRepo;
private readonly StubCommunicationService _comms;
private readonly ServiceCollection _services;
public SecuredWriteHandlerTests()
{
_siteRepo = Substitute.For();
_securedWriteRepo = Substitute.For();
_auditRepo = Substitute.For();
_comms = new StubCommunicationService();
_services = new ServiceCollection();
_services.AddScoped(_ => _siteRepo);
_services.AddScoped(_ => _securedWriteRepo);
_services.AddScoped(_ => _auditRepo);
_services.AddSingleton(_comms);
}
///
/// Captures every handed to the substituted
/// . Audit emission is
/// best-effort and asynchronous off the actor thread, so a short await-condition
/// poll lets the captured list settle before assertions.
///
private List CaptureAuditEvents()
{
var captured = new List();
_auditRepo
.When(r => r.InsertIfNotExistsAsync(Arg.Any(), Arg.Any()))
.Do(ci => captured.Add(ci.Arg()));
return captured;
}
/// Spins briefly until the captured audit list reaches rows.
private static void WaitForAuditRows(List captured, int count)
{
var deadline = DateTime.UtcNow.AddSeconds(5);
while (captured.Count < count && DateTime.UtcNow < deadline)
{
Thread.Sleep(10);
}
}
private static AuditEvent SingleOfKind(List captured, AuditKind kind) =>
Assert.Single(captured, e => e.Action == $"{AuditChannel.SecuredWrite}.{kind}");
///
/// Test double for the site-write seam.
/// is virtual so the approve relay can be exercised without a live actor system;
/// records the relayed request and returns a canned response.
///
private sealed class StubCommunicationService : CommunicationService
{
public StubCommunicationService()
: base(Options.Create(new CommunicationOptions()), NullLogger.Instance)
{
}
public WriteTagRequest? LastRequest { get; private set; }
public int CallCount { get; private set; }
public Func Responder { get; set; } =
req => new WriteTagResponse(req.CorrelationId, Success: true, ErrorMessage: null, DateTimeOffset.UtcNow);
public Exception? ThrowOnWrite { get; set; }
public override Task WriteTagAsync(
string siteId, WriteTagRequest request, CancellationToken ct = default)
{
CallCount++;
LastRequest = request;
if (ThrowOnWrite is not null)
{
return Task.FromException(ThrowOnWrite);
}
return Task.FromResult(Responder(request));
}
}
private IActorRef CreateActor()
{
var sp = _services.BuildServiceProvider();
return Sys.ActorOf(Props.Create(() => new ManagementActor(
sp, NullLogger.Instance)));
}
private static ManagementEnvelope Envelope(object command, string username, params string[] roles) =>
new(new AuthenticatedUser(username, username, roles, Array.Empty()),
command, Guid.NewGuid().ToString("N"));
void IDisposable.Dispose() => Shutdown();
/// Wires a site whose single connection uses the given protocol.
private void SeedSiteWithConnection(int siteId, string identifier, string connectionName, string protocol)
{
_siteRepo.GetSiteByIdentifierAsync(identifier, Arg.Any())
.Returns(new Site($"Site{siteId}", identifier) { Id = siteId });
_siteRepo.GetDataConnectionsBySiteIdAsync(siteId, Arg.Any())
.Returns(new List
{
new(connectionName, protocol, siteId) { Id = 100 }
});
}
// ------------------------------------------------------------------------
// Submit
// ------------------------------------------------------------------------
[Fact]
public void Submit_OnNonMxGatewayConnection_ReturnsError_NoRowInserted()
{
SeedSiteWithConnection(1, "SITE1", "Plc1", "OpcUa");
var actor = CreateActor();
var envelope = Envelope(
new SubmitSecuredWriteCommand("SITE1", "Plc1", "Tag.A", "true", "Boolean", "go"),
"alice", "Operator");
actor.Tell(envelope);
var response = ExpectMsg(TimeSpan.FromSeconds(5));
Assert.Equal("COMMAND_FAILED", response.ErrorCode);
Assert.Contains("MxGateway", response.Error);
_securedWriteRepo.DidNotReceiveWithAnyArgs().AddAsync(default!, default);
}
[Fact]
public void Submit_OnUnknownConnection_ReturnsError_NoRowInserted()
{
SeedSiteWithConnection(1, "SITE1", "Plc1", "MxGateway");
var actor = CreateActor();
var envelope = Envelope(
new SubmitSecuredWriteCommand("SITE1", "DoesNotExist", "Tag.A", "true", "Boolean", null),
"alice", "Operator");
actor.Tell(envelope);
var response = ExpectMsg(TimeSpan.FromSeconds(5));
Assert.Equal("COMMAND_FAILED", response.ErrorCode);
_securedWriteRepo.DidNotReceiveWithAnyArgs().AddAsync(default!, default);
}
[Fact]
public void Submit_OnMxGatewayConnection_InsertsPendingRowWithOperatorUser()
{
SeedSiteWithConnection(1, "SITE1", "Mx1", "MxGateway");
PendingSecuredWrite? inserted = null;
_securedWriteRepo
.When(r => r.AddAsync(Arg.Any(), Arg.Any()))
.Do(ci => inserted = ci.Arg());
_securedWriteRepo.AddAsync(Arg.Any(), Arg.Any())
.Returns(55L);
var actor = CreateActor();
var envelope = Envelope(
new SubmitSecuredWriteCommand("SITE1", "Mx1", "Tag.Setpoint", "42.5", "Double", "raise setpoint"),
"alice", "Operator");
actor.Tell(envelope);
var response = ExpectMsg(TimeSpan.FromSeconds(5));
Assert.Equal(envelope.CorrelationId, response.CorrelationId);
Assert.NotNull(inserted);
Assert.Equal("Pending", inserted!.Status);
Assert.Equal("alice", inserted.OperatorUser);
Assert.Equal("Mx1", inserted.ConnectionName);
Assert.Equal("SITE1", inserted.SiteId);
Assert.Equal("Tag.Setpoint", inserted.TagPath);
Assert.Equal("42.5", inserted.ValueJson);
Assert.Equal("Double", inserted.ValueType);
Assert.Equal("raise setpoint", inserted.OperatorComment);
// The returned DTO carries the store-assigned id and operator.
Assert.Contains("\"id\":55", response.JsonData);
Assert.Contains("alice", response.JsonData);
}
// ------------------------------------------------------------------------
// Reject
// ------------------------------------------------------------------------
private PendingSecuredWrite SeedPendingWrite(long id, string operatorUser)
{
var row = new PendingSecuredWrite
{
Id = id,
SiteId = "SITE1",
ConnectionName = "Mx1",
TagPath = "Tag.A",
ValueJson = "true",
ValueType = "Boolean",
Status = "Pending",
OperatorUser = operatorUser,
SubmittedAtUtc = DateTime.UtcNow
};
_securedWriteRepo.GetAsync(id, Arg.Any()).Returns(row);
// The approve handler re-asserts the MxGateway protocol AT EXECUTE (D2 / T14
// TOCTOU guard), so the connection named on the row must still resolve to an
// MxGateway connection at approve-time for the relay to proceed. Seed it here
// so every approve test that should relay sees a valid MxGateway target; tests
// exercising the reconfigured/missing-connection case re-stub this lookup.
SeedSiteWithConnection(1, "SITE1", "Mx1", "MxGateway");
return row;
}
[Fact]
public void Reject_BySubmittingUser_ReturnsError_StatusUnchanged()
{
SeedPendingWrite(7, operatorUser: "alice");
var actor = CreateActor();
// Same principal that submitted attempts to reject — separation of duties.
var envelope = Envelope(new RejectSecuredWriteCommand(7, "no"), "alice", "Verifier");
actor.Tell(envelope);
var response = ExpectMsg(TimeSpan.FromSeconds(5));
Assert.Equal("COMMAND_FAILED", response.ErrorCode);
_securedWriteRepo.DidNotReceiveWithAnyArgs().UpdateAsync(default!, default);
}
[Fact]
public void Reject_ByDifferentUser_FlipsStatusToRejected()
{
SeedPendingWrite(7, operatorUser: "alice");
PendingSecuredWrite? updated = null;
_securedWriteRepo
.When(r => r.UpdateAsync(Arg.Any(), Arg.Any()))
.Do(ci => updated = ci.Arg());
var actor = CreateActor();
var envelope = Envelope(new RejectSecuredWriteCommand(7, "not authorized"), "bob", "Verifier");
actor.Tell(envelope);
var response = ExpectMsg(TimeSpan.FromSeconds(5));
Assert.Equal(envelope.CorrelationId, response.CorrelationId);
Assert.NotNull(updated);
Assert.Equal("Rejected", updated!.Status);
Assert.Equal("bob", updated.VerifierUser);
Assert.Equal("not authorized", updated.VerifierComment);
Assert.NotNull(updated.DecidedAtUtc);
// Operator fields are preserved (fully-populated update).
Assert.Equal("alice", updated.OperatorUser);
}
[Fact]
public void Reject_NonPendingRow_ReturnsError()
{
var row = SeedPendingWrite(7, operatorUser: "alice");
row.Status = "Approved";
var actor = CreateActor();
var envelope = Envelope(new RejectSecuredWriteCommand(7, null), "bob", "Verifier");
actor.Tell(envelope);
var response = ExpectMsg(TimeSpan.FromSeconds(5));
Assert.Equal("COMMAND_FAILED", response.ErrorCode);
_securedWriteRepo.DidNotReceiveWithAnyArgs().UpdateAsync(default!, default);
}
[Fact]
public void Reject_MissingRow_ReturnsError()
{
_securedWriteRepo.GetAsync(99, Arg.Any()).Returns((PendingSecuredWrite?)null);
var actor = CreateActor();
var envelope = Envelope(new RejectSecuredWriteCommand(99, null), "bob", "Verifier");
actor.Tell(envelope);
var response = ExpectMsg(TimeSpan.FromSeconds(5));
Assert.Equal("COMMAND_FAILED", response.ErrorCode);
}
// ------------------------------------------------------------------------
// List
// ------------------------------------------------------------------------
[Fact]
public void List_FiltersByStatusAndSite()
{
_securedWriteRepo.QueryAsync("Pending", "SITE1", 0, 200, Arg.Any())
.Returns(new List
{
new()
{
Id = 3, SiteId = "SITE1", ConnectionName = "Mx1", TagPath = "Tag.A",
ValueJson = "true", ValueType = "Boolean", Status = "Pending",
OperatorUser = "alice", SubmittedAtUtc = DateTime.UtcNow
}
});
var actor = CreateActor();
var envelope = Envelope(new ListSecuredWritesCommand("Pending", "SITE1"), "carol", "Viewer");
actor.Tell(envelope);
var response = ExpectMsg(TimeSpan.FromSeconds(5));
Assert.Equal(envelope.CorrelationId, response.CorrelationId);
Assert.Contains("\"id\":3", response.JsonData);
Assert.Contains("alice", response.JsonData);
_securedWriteRepo.Received(1).QueryAsync("Pending", "SITE1", 0, 200, Arg.Any());
}
[Fact]
public void List_NoFilters_PassesNullsToRepository()
{
_securedWriteRepo.QueryAsync(null, null, 0, 200, Arg.Any())
.Returns(new List());
var actor = CreateActor();
var envelope = Envelope(new ListSecuredWritesCommand(null, null), "carol");
actor.Tell(envelope);
ExpectMsg(TimeSpan.FromSeconds(5));
_securedWriteRepo.Received(1).QueryAsync(null, null, 0, 200, Arg.Any());
}
// ------------------------------------------------------------------------
// Approve (execute) — Task C3
// ------------------------------------------------------------------------
/// Arms the CAS to succeed (a different verifier wins the race).
private void ArmCasSuccess(long id) =>
_securedWriteRepo.TryMarkApprovedAsync(
id, Arg.Any(), Arg.Any(), Arg.Any(), Arg.Any())
.Returns(true);
[Fact]
public void Approve_ByDifferentVerifier_RelaysWrite_FlipsStatusToExecuted()
{
SeedPendingWrite(7, operatorUser: "alice");
ArmCasSuccess(7);
PendingSecuredWrite? updated = null;
_securedWriteRepo
.When(r => r.UpdateAsync(Arg.Any(), Arg.Any()))
.Do(ci => updated = ci.Arg());
var actor = CreateActor();
var envelope = Envelope(new ApproveSecuredWriteCommand(7, "approved"), "bob", "Verifier");
actor.Tell(envelope);
var response = ExpectMsg(TimeSpan.FromSeconds(5));
Assert.Equal(envelope.CorrelationId, response.CorrelationId);
// The CAS guarded the transition (exactly once).
_securedWriteRepo.Received(1).TryMarkApprovedAsync(
7, "bob", "approved", Arg.Any(), Arg.Any());
// The write was relayed to the site MxGateway connection.
Assert.Equal(1, _comms.CallCount);
Assert.NotNull(_comms.LastRequest);
Assert.Equal("Mx1", _comms.LastRequest!.ConnectionName);
Assert.Equal("Tag.A", _comms.LastRequest.TagPath);
Assert.NotNull(updated);
Assert.Equal("Executed", updated!.Status);
Assert.NotNull(updated.ExecutedAtUtc);
Assert.Null(updated.ExecutionError);
}
[Fact]
public void Approve_BySubmittingUser_ReturnsError_CasNotCalled_NoRelay()
{
SeedPendingWrite(7, operatorUser: "alice");
var actor = CreateActor();
// Same principal that submitted attempts to approve — separation of duties.
var envelope = Envelope(new ApproveSecuredWriteCommand(7, "self"), "alice", "Verifier");
actor.Tell(envelope);
var response = ExpectMsg(TimeSpan.FromSeconds(5));
Assert.Equal("COMMAND_FAILED", response.ErrorCode);
_securedWriteRepo.DidNotReceiveWithAnyArgs().TryMarkApprovedAsync(
default, default!, default, default, default);
Assert.Equal(0, _comms.CallCount);
}
[Fact]
public void Approve_WhenCasLosesRace_ReturnsError_NoRelay()
{
SeedPendingWrite(7, operatorUser: "alice");
_securedWriteRepo.TryMarkApprovedAsync(
7, Arg.Any(), Arg.Any(), Arg.Any(), Arg.Any())
.Returns(false);
var actor = CreateActor();
var envelope = Envelope(new ApproveSecuredWriteCommand(7, "race"), "bob", "Verifier");
actor.Tell(envelope);
var response = ExpectMsg(TimeSpan.FromSeconds(5));
Assert.Equal("COMMAND_FAILED", response.ErrorCode);
Assert.Contains("already decided", response.Error);
Assert.Equal(0, _comms.CallCount);
}
[Fact]
public void Approve_WhenRelayFails_FlipsStatusToFailed_WithError()
{
SeedPendingWrite(7, operatorUser: "alice");
ArmCasSuccess(7);
_comms.Responder = req =>
new WriteTagResponse(req.CorrelationId, Success: false, ErrorMessage: "device offline", DateTimeOffset.UtcNow);
PendingSecuredWrite? updated = null;
_securedWriteRepo
.When(r => r.UpdateAsync(Arg.Any(), Arg.Any()))
.Do(ci => updated = ci.Arg());
var actor = CreateActor();
var envelope = Envelope(new ApproveSecuredWriteCommand(7, "approved"), "bob", "Verifier");
actor.Tell(envelope);
// The approve itself succeeds (the row was decided); the relay outcome is recorded on the row.
ExpectMsg(TimeSpan.FromSeconds(5));
Assert.Equal(1, _comms.CallCount);
Assert.NotNull(updated);
Assert.Equal("Failed", updated!.Status);
Assert.Equal("device offline", updated.ExecutionError);
Assert.NotNull(updated.ExecutedAtUtc);
}
[Fact]
public void Approve_WhenRelayThrows_FlipsStatusToFailed_WithExceptionMessage()
{
SeedPendingWrite(7, operatorUser: "alice");
ArmCasSuccess(7);
_comms.ThrowOnWrite = new InvalidOperationException("transport down");
PendingSecuredWrite? updated = null;
_securedWriteRepo
.When(r => r.UpdateAsync(Arg.Any(), Arg.Any()))
.Do(ci => updated = ci.Arg());
var actor = CreateActor();
var envelope = Envelope(new ApproveSecuredWriteCommand(7, "approved"), "bob", "Verifier");
actor.Tell(envelope);
ExpectMsg(TimeSpan.FromSeconds(5));
Assert.NotNull(updated);
Assert.Equal("Failed", updated!.Status);
Assert.Contains("transport down", updated.ExecutionError);
}
[Fact]
public void Approve_UnknownValueType_FlipsStatusToFailed_NoRelay()
{
var row = SeedPendingWrite(7, operatorUser: "alice");
row.ValueType = "Bogus";
ArmCasSuccess(7);
PendingSecuredWrite? updated = null;
_securedWriteRepo
.When(r => r.UpdateAsync(Arg.Any(), Arg.Any()))
.Do(ci => updated = ci.Arg());
var actor = CreateActor();
var envelope = Envelope(new ApproveSecuredWriteCommand(7, "approved"), "bob", "Verifier");
actor.Tell(envelope);
ExpectMsg(TimeSpan.FromSeconds(5));
Assert.Equal(0, _comms.CallCount);
Assert.NotNull(updated);
Assert.Equal("Failed", updated!.Status);
Assert.Equal("unknown value type", updated.ExecutionError);
}
[Fact]
public void Approve_WhenConnectionNoLongerMxGateway_FlipsStatusToFailed_NoRelay()
{
// D2 / T14 TOCTOU guard: the MxGateway protocol is re-asserted AT EXECUTE, not
// only at submit. The named connection was reconfigured/recreated as a
// non-MxGateway (OPC UA) connection between submit and approval; relaying then
// would execute the secured write against a non-MxGateway adapter, violating the
// feature's core safety invariant. The row must fail deterministically with the
// protocol error and NO relay — even though the CAS already won.
SeedPendingWrite(7, operatorUser: "alice");
ArmCasSuccess(7);
// Re-stub the approve-time connection lookup to a non-MxGateway protocol,
// overriding the MxGateway seed inside SeedPendingWrite.
SeedSiteWithConnection(1, "SITE1", "Mx1", "OpcUa");
var captured = CaptureAuditEvents();
PendingSecuredWrite? updated = null;
_securedWriteRepo
.When(r => r.UpdateAsync(Arg.Any(), Arg.Any()))
.Do(ci => updated = ci.Arg());
var actor = CreateActor();
var envelope = Envelope(new ApproveSecuredWriteCommand(7, "approved"), "bob", "Verifier");
actor.Tell(envelope);
ExpectMsg(TimeSpan.FromSeconds(5));
// No relay — the connection is no longer an MxGateway target.
Assert.Equal(0, _comms.CallCount);
Assert.NotNull(updated);
Assert.Equal("Failed", updated!.Status);
Assert.NotNull(updated.ExecutedAtUtc);
Assert.Contains("no longer an MxGateway connection", updated.ExecutionError);
// The Execute audit row records the failure (canonical Failure outcome).
WaitForAuditRows(captured, 2);
var execute = SingleOfKind(captured, AuditKind.SecuredWriteExecute);
Assert.Equal(ZB.MOM.WW.Audit.AuditOutcome.Failure, execute.Outcome);
}
[Fact]
public void Approve_WhenConnectionDeleted_FlipsStatusToFailed_NotFound_NoRelay()
{
// TOCTOU companion: the connection named on the row was deleted between submit
// and approval. The re-load at execute returns no match → fail with "not found"
// and no relay.
SeedPendingWrite(7, operatorUser: "alice");
ArmCasSuccess(7);
// Re-stub the approve-time lookup so the site resolves but exposes no connections.
_siteRepo.GetDataConnectionsBySiteIdAsync(1, Arg.Any())
.Returns(new List());
PendingSecuredWrite? updated = null;
_securedWriteRepo
.When(r => r.UpdateAsync(Arg.Any(), Arg.Any()))
.Do(ci => updated = ci.Arg());
var actor = CreateActor();
var envelope = Envelope(new ApproveSecuredWriteCommand(7, "approved"), "bob", "Verifier");
actor.Tell(envelope);
ExpectMsg(TimeSpan.FromSeconds(5));
Assert.Equal(0, _comms.CallCount);
Assert.NotNull(updated);
Assert.Equal("Failed", updated!.Status);
Assert.Contains("not found", updated.ExecutionError);
}
// ------------------------------------------------------------------------
// Audit emission (T14b — SecuredWrite AuditLog rows)
// ------------------------------------------------------------------------
///
/// The correlation id stamped on a secured-write audit row encodes the row's long Id
/// as a big-endian value in the final 8 bytes of an otherwise-zero Guid — mirrors the
/// production encoding in ManagementActor.SecuredWriteCorrelation.
///
private static Guid CorrelationFor(long id)
{
Span bytes = stackalloc byte[16];
System.Buffers.Binary.BinaryPrimitives.WriteInt64BigEndian(bytes[8..], id);
return new Guid(bytes);
}
[Fact]
public void Submit_EmitsExactlyOneSubmitAuditRow_WithOperatorAndCorrelation()
{
SeedSiteWithConnection(1, "SITE1", "Mx1", "MxGateway");
_securedWriteRepo.AddAsync(Arg.Any(), Arg.Any())
.Returns(55L);
var captured = CaptureAuditEvents();
var actor = CreateActor();
var envelope = Envelope(
new SubmitSecuredWriteCommand("SITE1", "Mx1", "Tag.Setpoint", "42.5", "Double", "raise"),
"alice", "Operator");
actor.Tell(envelope);
ExpectMsg(TimeSpan.FromSeconds(5));
WaitForAuditRows(captured, 1);
var evt = SingleOfKind(captured, AuditKind.SecuredWriteSubmit);
Assert.Equal("alice", evt.Actor);
Assert.Equal(CorrelationFor(55L), evt.CorrelationId);
Assert.Equal("SecuredWrite", evt.Category);
Assert.Equal("SITE1/Mx1/Tag.Setpoint", evt.Target);
// Exactly one row — no stray duplicates.
Assert.Single(captured);
}
[Fact]
public void Reject_EmitsOneRejectAuditRow_WithVerifier()
{
SeedPendingWrite(7, operatorUser: "alice");
var captured = CaptureAuditEvents();
var actor = CreateActor();
var envelope = Envelope(new RejectSecuredWriteCommand(7, "not authorized"), "bob", "Verifier");
actor.Tell(envelope);
ExpectMsg(TimeSpan.FromSeconds(5));
WaitForAuditRows(captured, 1);
var evt = SingleOfKind(captured, AuditKind.SecuredWriteReject);
Assert.Equal("bob", evt.Actor);
Assert.Equal(CorrelationFor(7L), evt.CorrelationId);
Assert.Single(captured);
}
[Fact]
public void Approve_Success_EmitsApproveThenExecuteDeliveredAuditRows()
{
SeedPendingWrite(7, operatorUser: "alice");
ArmCasSuccess(7);
var captured = CaptureAuditEvents();
var actor = CreateActor();
var envelope = Envelope(new ApproveSecuredWriteCommand(7, "approved"), "bob", "Verifier");
actor.Tell(envelope);
ExpectMsg(TimeSpan.FromSeconds(5));
WaitForAuditRows(captured, 2);
var approve = SingleOfKind(captured, AuditKind.SecuredWriteApprove);
Assert.Equal("bob", approve.Actor);
Assert.Equal(CorrelationFor(7L), approve.CorrelationId);
var execute = SingleOfKind(captured, AuditKind.SecuredWriteExecute);
Assert.Equal("bob", execute.Actor);
Assert.Equal(CorrelationFor(7L), execute.CorrelationId);
// Delivered outcome → canonical Success.
Assert.Equal(ZB.MOM.WW.Audit.AuditOutcome.Success, execute.Outcome);
Assert.Equal(2, captured.Count);
}
[Fact]
public void Submit_AuditInsertThrows_StillSucceeds_RowStillPersisted()
{
SeedSiteWithConnection(1, "SITE1", "Mx1", "MxGateway");
PendingSecuredWrite? inserted = null;
_securedWriteRepo
.When(r => r.AddAsync(Arg.Any(), Arg.Any()))
.Do(ci => inserted = ci.Arg());
_securedWriteRepo.AddAsync(Arg.Any(), Arg.Any())
.Returns(55L);
// Best-effort: a thrown audit insert must NOT abort the secured-write action.
_auditRepo.InsertIfNotExistsAsync(Arg.Any(), Arg.Any())
.Returns(Task.FromException(new InvalidOperationException("audit db down")));
var actor = CreateActor();
var envelope = Envelope(
new SubmitSecuredWriteCommand("SITE1", "Mx1", "Tag.A", "true", "Boolean", null),
"alice", "Operator");
actor.Tell(envelope);
// The action still succeeds and the row is still persisted despite the audit failure.
var response = ExpectMsg(TimeSpan.FromSeconds(5));
Assert.Equal(envelope.CorrelationId, response.CorrelationId);
Assert.NotNull(inserted);
Assert.Equal("Pending", inserted!.Status);
Assert.Equal("alice", inserted.OperatorUser);
}
[Fact]
public void Approve_CorruptListValueJson_FlipsStatusToFailed_DecodeError_NoRelay()
{
// C3 robustness fix: a List-typed value with corrupt JSON must not throw
// out of the handler and leave the row stuck Approved — it is contained,
// the row flips to Failed with a "value decode error", and no relay occurs.
var row = SeedPendingWrite(7, operatorUser: "alice");
row.ValueType = "List";
row.ValueJson = "{not valid json";
ArmCasSuccess(7);
var captured = CaptureAuditEvents();
PendingSecuredWrite? updated = null;
_securedWriteRepo
.When(r => r.UpdateAsync(Arg.Any(), Arg.Any()))
.Do(ci => updated = ci.Arg());
var actor = CreateActor();
var envelope = Envelope(new ApproveSecuredWriteCommand(7, "approved"), "bob", "Verifier");
actor.Tell(envelope);
ExpectMsg(TimeSpan.FromSeconds(5));
// No relay — the value never decoded.
Assert.Equal(0, _comms.CallCount);
Assert.NotNull(updated);
Assert.Equal("Failed", updated!.Status);
Assert.NotNull(updated.ExecutedAtUtc);
Assert.StartsWith("value decode error:", updated.ExecutionError);
// The Execute audit row records the failure (canonical Failure outcome).
WaitForAuditRows(captured, 2);
var execute = SingleOfKind(captured, AuditKind.SecuredWriteExecute);
Assert.Equal(ZB.MOM.WW.Audit.AuditOutcome.Failure, execute.Outcome);
}
}