From 1f7bb7ace30d9ae87e73735e6025ff7ec2c0bb48 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 18 Jun 2026 02:59:43 -0400 Subject: [PATCH] feat(mgmt): secured-write approve relays to site MxGateway write with CAS race guard (T14b) --- .../Repositories/ISecuredWriteRepository.cs | 25 +++ .../CommunicationService.cs | 28 +++ .../Repositories/SecuredWriteRepository.cs | 27 +++ .../ManagementActor.cs | 88 +++++++- .../SecuredWriteHandlerTests.cs | 197 ++++++++++++++++++ 5 files changed, 362 insertions(+), 3 deletions(-) diff --git a/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Repositories/ISecuredWriteRepository.cs b/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Repositories/ISecuredWriteRepository.cs index f2247f59..d7f35b11 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Repositories/ISecuredWriteRepository.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Commons/Interfaces/Repositories/ISecuredWriteRepository.cs @@ -54,4 +54,29 @@ public interface ISecuredWriteRepository int skip, int take, CancellationToken ct = default); + + /// + /// Atomically flips a row from Pending to Approved, stamping the + /// verifier identity, comment, and decision time, but ONLY if the row is still + /// Pending. This is the compare-and-swap guard for the two-verifier race + /// (M7 / T14b): two verifiers may approve the same write concurrently, but the + /// conditional WHERE Status='Pending' guarantees exactly one wins. The + /// loser observes false and must not relay the write. + /// + /// Identity of the pending secured write. + /// The approving verifier's username. + /// Optional free-text comment from the verifier. + /// UTC instant the approval decision was made. + /// Cancellation token. + /// + /// A task resolving to true if this caller won the approval (exactly one + /// row transitioned), or false if the row was no longer Pending + /// (already decided by another verifier). + /// + Task TryMarkApprovedAsync( + long id, + string verifierUser, + string? verifierComment, + DateTime decidedAtUtc, + CancellationToken ct = default); } diff --git a/src/ZB.MOM.WW.ScadaBridge.Communication/CommunicationService.cs b/src/ZB.MOM.WW.ScadaBridge.Communication/CommunicationService.cs index b80f627e..b78da1af 100644 --- a/src/ZB.MOM.WW.ScadaBridge.Communication/CommunicationService.cs +++ b/src/ZB.MOM.WW.ScadaBridge.Communication/CommunicationService.cs @@ -3,6 +3,7 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using ZB.MOM.WW.ScadaBridge.Commons.Messages.Artifacts; using ZB.MOM.WW.ScadaBridge.Commons.Messages.Audit; +using ZB.MOM.WW.ScadaBridge.Commons.Messages.DataConnection; using ZB.MOM.WW.ScadaBridge.Commons.Messages.DebugView; using ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment; using ZB.MOM.WW.ScadaBridge.Commons.Messages.Health; @@ -417,6 +418,33 @@ public class CommunicationService envelope, _options.QueryTimeout, cancellationToken); } + // ── Secured Write Relay (M7 / T14b — approve → site MxGateway write) ── + + /// + /// Relays a single tag write to the site's data connection and awaits the + /// outcome. Used by the secured-write (two-person) approve flow: once a + /// Verifier's approval wins the compare-and-swap, the central ManagementActor + /// calls this to execute the write against the site's MxGateway connection. + /// The request is the existing , already handled + /// site-side by DataConnectionActor (routed to the MxGateway adapter) — + /// no site-side change is required. The Ask is bounded by + /// , mirroring + /// and the other one-shot site queries. + /// + /// The target site identifier. + /// The tag write request (correlation id + connection + tag + value + timestamp). + /// Cancellation token. + /// The write response (success flag + optional error message). + public virtual Task WriteTagAsync( + string siteId, + WriteTagRequest request, + CancellationToken ct = default) + { + var envelope = new SiteEnvelope(siteId, request); + return GetActor().Ask( + envelope, _options.QueryTimeout, ct); + } + // ── Pattern 8: Heartbeat (site→central, Tell) ── // Heartbeats are received by central, not sent. No method needed here. diff --git a/src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/Repositories/SecuredWriteRepository.cs b/src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/Repositories/SecuredWriteRepository.cs index 5fa53fe0..cc34852b 100644 --- a/src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/Repositories/SecuredWriteRepository.cs +++ b/src/ZB.MOM.WW.ScadaBridge.ConfigurationDatabase/Repositories/SecuredWriteRepository.cs @@ -79,4 +79,31 @@ public class SecuredWriteRepository : ISecuredWriteRepository .Take(take) .ToListAsync(ct); } + + /// + public async Task TryMarkApprovedAsync( + long id, + string verifierUser, + string? verifierComment, + DateTime decidedAtUtc, + CancellationToken ct = default) + { + // Single-statement compare-and-swap: the conditional WHERE Status='Pending' + // makes the Pending->Approved transition atomic at the row level, so two + // verifiers approving concurrently produce exactly one rowsAffected==1 (the + // winner) and one rowsAffected==0 (the loser). Parameterised via + // ExecuteSqlInterpolatedAsync — same raw-SQL conditional-update pattern as + // SiteCallAuditRepository's upsert-on-newer-status path. + var rowsAffected = await _context.Database.ExecuteSqlInterpolatedAsync( + $@"UPDATE dbo.PendingSecuredWrites +SET Status = 'Approved', + VerifierUser = {verifierUser}, + VerifierComment = {verifierComment}, + DecidedAtUtc = {decidedAtUtc} +WHERE Id = {id} + AND Status = 'Pending';", + ct); + + return rowsAffected == 1; + } } diff --git a/src/ZB.MOM.WW.ScadaBridge.ManagementService/ManagementActor.cs b/src/ZB.MOM.WW.ScadaBridge.ManagementService/ManagementActor.cs index 8ef7e113..5a1db8db 100644 --- a/src/ZB.MOM.WW.ScadaBridge.ManagementService/ManagementActor.cs +++ b/src/ZB.MOM.WW.ScadaBridge.ManagementService/ManagementActor.cs @@ -378,10 +378,11 @@ public class ManagementActor : ReceiveActor DiscardParkedMessageCommand cmd => await HandleDiscardParkedMessage(sp, cmd, user), DebugSnapshotCommand cmd => await HandleDebugSnapshot(sp, cmd, user), - // Secured writes (M7 / T14b). Approve (execute) is intentionally NOT - // dispatched here — the approve->execute relay is Task C3; it would - // 'NotSupported' at runtime until then. + // Secured writes (M7 / T14b). Approve executes the write — once a + // Verifier wins the compare-and-swap the value is relayed to the site + // MxGateway connection (Task C3). SubmitSecuredWriteCommand cmd => await HandleSubmitSecuredWrite(sp, cmd, user), + ApproveSecuredWriteCommand cmd => await HandleApproveSecuredWrite(sp, cmd, user), RejectSecuredWriteCommand cmd => await HandleRejectSecuredWrite(sp, cmd, user), ListSecuredWritesCommand cmd => await HandleListSecuredWrites(sp, cmd), @@ -928,6 +929,87 @@ public class ManagementActor : ReceiveActor return ToSecuredWriteDto(entity); } + private static async Task HandleApproveSecuredWrite( + IServiceProvider sp, ApproveSecuredWriteCommand cmd, AuthenticatedUser user) + { + var repo = sp.GetRequiredService(); + var row = await repo.GetAsync(cmd.Id) + ?? throw new ManagementCommandException($"Secured write {cmd.Id} not found."); + + if (!string.Equals(row.Status, "Pending", StringComparison.Ordinal)) + throw new ManagementCommandException( + $"Secured write {cmd.Id} is '{row.Status}', not Pending; it cannot be approved."); + + // Separation of duties: a write may not be verified by its submitter. Checked + // BEFORE the CAS so a self-approval never consumes the Pending->Approved + // transition. + if (string.Equals(row.OperatorUser, user.Username, StringComparison.OrdinalIgnoreCase)) + throw new ManagementCommandException( + "A secured write cannot be verified by its submitter."); + + // Compare-and-swap: guards the two-verifier race. Exactly one concurrent + // approver flips Pending->Approved; the loser observes false here and must + // not relay the write. + var decidedAtUtc = DateTime.UtcNow; + if (!await repo.TryMarkApprovedAsync(cmd.Id, user.Username, cmd.Comment, decidedAtUtc)) + throw new ManagementCommandException( + $"Secured write {cmd.Id} is no longer pending — already decided."); + + // We won the race. Stamp the verifier decision locally so the entity we + // persist below carries the same values the CAS committed. + row.Status = "Approved"; + row.VerifierUser = user.Username; + row.VerifierComment = cmd.Comment; + row.DecidedAtUtc = decidedAtUtc; + + // Validate the value type BEFORE attempting the relay. An unknown type can + // never be decoded/written, so fail the row deterministically rather than + // leaving it stuck Approved. (Addresses the C2 reviewer's deferred + // ValueType-validation note.) + if (!Enum.TryParse(row.ValueType, ignoreCase: true, out var dataType)) + { + row.Status = "Failed"; + row.ExecutedAtUtc = DateTime.UtcNow; + row.ExecutionError = "unknown value type"; + await repo.UpdateAsync(row); + return ToSecuredWriteDto(row); + } + + var value = Commons.Types.AttributeValueCodec.Decode(row.ValueJson, dataType, elementType: null); + + // Relay the write to the site MxGateway connection. A transport exception is + // contained so the row is never left stuck Approved. + var commService = sp.GetRequiredService(); + bool success; + string? error; + try + { + var resp = await commService.WriteTagAsync( + row.SiteId, + new Commons.Messages.DataConnection.WriteTagRequest( + CorrelationId: Guid.NewGuid().ToString("N"), + ConnectionName: row.ConnectionName, + TagPath: row.TagPath, + Value: value, + Timestamp: DateTimeOffset.UtcNow)); + success = resp.Success; + error = resp.Success ? null : resp.ErrorMessage; + } + catch (Exception ex) + { + success = false; + error = ex.Message; + } + + row.Status = success ? "Executed" : "Failed"; + row.ExecutedAtUtc = DateTime.UtcNow; + row.ExecutionError = error; + + // UpdateAsync overwrites all columns -> pass the fully-populated entity. + await repo.UpdateAsync(row); + return ToSecuredWriteDto(row); + } + private static async Task HandleRejectSecuredWrite( IServiceProvider sp, RejectSecuredWriteCommand cmd, AuthenticatedUser user) { diff --git a/tests/ZB.MOM.WW.ScadaBridge.ManagementService.Tests/SecuredWriteHandlerTests.cs b/tests/ZB.MOM.WW.ScadaBridge.ManagementService.Tests/SecuredWriteHandlerTests.cs index aa30bda8..d9065278 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.ManagementService.Tests/SecuredWriteHandlerTests.cs +++ b/tests/ZB.MOM.WW.ScadaBridge.ManagementService.Tests/SecuredWriteHandlerTests.cs @@ -2,12 +2,15 @@ using Akka.Actor; using Akka.TestKit.Xunit2; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; using NSubstitute; using ZB.MOM.WW.ScadaBridge.Commons.Entities.SecuredWrites; using ZB.MOM.WW.ScadaBridge.Commons.Entities.Sites; using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories; +using ZB.MOM.WW.ScadaBridge.Commons.Messages.DataConnection; using ZB.MOM.WW.ScadaBridge.Commons.Messages.Management; using ZB.MOM.WW.ScadaBridge.Commons.Types; +using ZB.MOM.WW.ScadaBridge.Communication; namespace ZB.MOM.WW.ScadaBridge.ManagementService.Tests; @@ -21,16 +24,54 @@ public class SecuredWriteHandlerTests : TestKit, IDisposable { private readonly ISiteRepository _siteRepo; private readonly ISecuredWriteRepository _securedWriteRepo; + private readonly StubCommunicationService _comms; private readonly ServiceCollection _services; public SecuredWriteHandlerTests() { _siteRepo = Substitute.For(); _securedWriteRepo = Substitute.For(); + _comms = new StubCommunicationService(); _services = new ServiceCollection(); _services.AddScoped(_ => _siteRepo); _services.AddScoped(_ => _securedWriteRepo); + _services.AddSingleton(_comms); + } + + /// + /// Test double for the site-write seam. + /// is virtual so the approve relay can be exercised without a live actor system; + /// records the relayed request and returns a canned response. + /// + private sealed class StubCommunicationService : CommunicationService + { + public StubCommunicationService() + : base(Options.Create(new CommunicationOptions()), NullLogger.Instance) + { + } + + public WriteTagRequest? LastRequest { get; private set; } + + public int CallCount { get; private set; } + + public Func Responder { get; set; } = + req => new WriteTagResponse(req.CorrelationId, Success: true, ErrorMessage: null, DateTimeOffset.UtcNow); + + public Exception? ThrowOnWrite { get; set; } + + public override Task WriteTagAsync( + string siteId, WriteTagRequest request, CancellationToken ct = default) + { + CallCount++; + LastRequest = request; + if (ThrowOnWrite is not null) + { + return Task.FromException(ThrowOnWrite); + } + + return Task.FromResult(Responder(request)); + } } private IActorRef CreateActor() @@ -270,4 +311,160 @@ public class SecuredWriteHandlerTests : TestKit, IDisposable ExpectMsg(TimeSpan.FromSeconds(5)); _securedWriteRepo.Received(1).QueryAsync(null, null, 0, 200, Arg.Any()); } + + // ------------------------------------------------------------------------ + // Approve (execute) — Task C3 + // ------------------------------------------------------------------------ + + /// Arms the CAS to succeed (a different verifier wins the race). + private void ArmCasSuccess(long id) => + _securedWriteRepo.TryMarkApprovedAsync( + id, Arg.Any(), Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(true); + + [Fact] + public void Approve_ByDifferentVerifier_RelaysWrite_FlipsStatusToExecuted() + { + SeedPendingWrite(7, operatorUser: "alice"); + ArmCasSuccess(7); + + PendingSecuredWrite? updated = null; + _securedWriteRepo + .When(r => r.UpdateAsync(Arg.Any(), Arg.Any())) + .Do(ci => updated = ci.Arg()); + + var actor = CreateActor(); + var envelope = Envelope(new ApproveSecuredWriteCommand(7, "approved"), "bob", "Verifier"); + + actor.Tell(envelope); + + var response = ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.Equal(envelope.CorrelationId, response.CorrelationId); + + // The CAS guarded the transition (exactly once). + _securedWriteRepo.Received(1).TryMarkApprovedAsync( + 7, "bob", "approved", Arg.Any(), Arg.Any()); + + // The write was relayed to the site MxGateway connection. + Assert.Equal(1, _comms.CallCount); + Assert.NotNull(_comms.LastRequest); + Assert.Equal("Mx1", _comms.LastRequest!.ConnectionName); + Assert.Equal("Tag.A", _comms.LastRequest.TagPath); + + Assert.NotNull(updated); + Assert.Equal("Executed", updated!.Status); + Assert.NotNull(updated.ExecutedAtUtc); + Assert.Null(updated.ExecutionError); + } + + [Fact] + public void Approve_BySubmittingUser_ReturnsError_CasNotCalled_NoRelay() + { + SeedPendingWrite(7, operatorUser: "alice"); + + var actor = CreateActor(); + // Same principal that submitted attempts to approve — separation of duties. + var envelope = Envelope(new ApproveSecuredWriteCommand(7, "self"), "alice", "Verifier"); + + actor.Tell(envelope); + + var response = ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.Equal("COMMAND_FAILED", response.ErrorCode); + _securedWriteRepo.DidNotReceiveWithAnyArgs().TryMarkApprovedAsync( + default, default!, default, default, default); + Assert.Equal(0, _comms.CallCount); + } + + [Fact] + public void Approve_WhenCasLosesRace_ReturnsError_NoRelay() + { + SeedPendingWrite(7, operatorUser: "alice"); + _securedWriteRepo.TryMarkApprovedAsync( + 7, Arg.Any(), Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(false); + + var actor = CreateActor(); + var envelope = Envelope(new ApproveSecuredWriteCommand(7, "race"), "bob", "Verifier"); + + actor.Tell(envelope); + + var response = ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.Equal("COMMAND_FAILED", response.ErrorCode); + Assert.Contains("already decided", response.Error); + Assert.Equal(0, _comms.CallCount); + } + + [Fact] + public void Approve_WhenRelayFails_FlipsStatusToFailed_WithError() + { + SeedPendingWrite(7, operatorUser: "alice"); + ArmCasSuccess(7); + _comms.Responder = req => + new WriteTagResponse(req.CorrelationId, Success: false, ErrorMessage: "device offline", DateTimeOffset.UtcNow); + + PendingSecuredWrite? updated = null; + _securedWriteRepo + .When(r => r.UpdateAsync(Arg.Any(), Arg.Any())) + .Do(ci => updated = ci.Arg()); + + var actor = CreateActor(); + var envelope = Envelope(new ApproveSecuredWriteCommand(7, "approved"), "bob", "Verifier"); + + actor.Tell(envelope); + + // The approve itself succeeds (the row was decided); the relay outcome is recorded on the row. + ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.Equal(1, _comms.CallCount); + Assert.NotNull(updated); + Assert.Equal("Failed", updated!.Status); + Assert.Equal("device offline", updated.ExecutionError); + Assert.NotNull(updated.ExecutedAtUtc); + } + + [Fact] + public void Approve_WhenRelayThrows_FlipsStatusToFailed_WithExceptionMessage() + { + SeedPendingWrite(7, operatorUser: "alice"); + ArmCasSuccess(7); + _comms.ThrowOnWrite = new InvalidOperationException("transport down"); + + PendingSecuredWrite? updated = null; + _securedWriteRepo + .When(r => r.UpdateAsync(Arg.Any(), Arg.Any())) + .Do(ci => updated = ci.Arg()); + + var actor = CreateActor(); + var envelope = Envelope(new ApproveSecuredWriteCommand(7, "approved"), "bob", "Verifier"); + + actor.Tell(envelope); + + ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.NotNull(updated); + Assert.Equal("Failed", updated!.Status); + Assert.Contains("transport down", updated.ExecutionError); + } + + [Fact] + public void Approve_UnknownValueType_FlipsStatusToFailed_NoRelay() + { + var row = SeedPendingWrite(7, operatorUser: "alice"); + row.ValueType = "Bogus"; + ArmCasSuccess(7); + + PendingSecuredWrite? updated = null; + _securedWriteRepo + .When(r => r.UpdateAsync(Arg.Any(), Arg.Any())) + .Do(ci => updated = ci.Arg()); + + var actor = CreateActor(); + var envelope = Envelope(new ApproveSecuredWriteCommand(7, "approved"), "bob", "Verifier"); + + actor.Tell(envelope); + + ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.Equal(0, _comms.CallCount); + Assert.NotNull(updated); + Assert.Equal("Failed", updated!.Status); + Assert.Equal("unknown value type", updated.ExecutionError); + } }