feat(mgmt): secured-write approve relays to site MxGateway write with CAS race guard (T14b)
This commit is contained in:
@@ -54,4 +54,29 @@ public interface ISecuredWriteRepository
|
||||
int skip,
|
||||
int take,
|
||||
CancellationToken ct = default);
|
||||
|
||||
/// <summary>
/// Compare-and-swap guard for the two-verifier approval race (M7 / T14b).
/// Moves a secured write from <c>Pending</c> to <c>Approved</c> and records the
/// verifier identity, comment, and decision time — but only when the row is
/// still <c>Pending</c> at the moment of the update. When two verifiers approve
/// the same write concurrently, the conditional <c>WHERE Status='Pending'</c>
/// lets exactly one of them win; the other sees <c>false</c> and must not relay
/// the write.
/// </summary>
/// <param name="id">Identity of the pending secured write.</param>
/// <param name="verifierUser">Username of the verifier approving the write.</param>
/// <param name="verifierComment">Free-text verifier comment; may be <c>null</c>.</param>
/// <param name="decidedAtUtc">UTC instant at which the approval decision was taken.</param>
/// <param name="ct">Cancellation token.</param>
/// <returns>
/// A task yielding <c>true</c> when this call performed the transition (exactly
/// one row changed), or <c>false</c> when no row transitioned — typically because
/// another verifier already decided it.
/// </returns>
Task<bool> TryMarkApprovedAsync(
    long id, string verifierUser, string? verifierComment,
    DateTime decidedAtUtc, CancellationToken ct = default);
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Artifacts;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Audit;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Messages.DataConnection;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Messages.DebugView;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Health;
|
||||
@@ -417,6 +418,33 @@ public class CommunicationService
|
||||
envelope, _options.QueryTimeout, cancellationToken);
|
||||
}
|
||||
|
||||
// ── Secured Write Relay (M7 / T14b — approve → site MxGateway write) ──
|
||||
|
||||
/// <summary>
/// Sends one tag write to a site and awaits its result. The request is wrapped
/// in a <see cref="SiteEnvelope"/> addressed to <paramref name="siteId"/> and
/// sent with an Ask to the actor returned by <c>GetActor()</c>; the Ask is
/// limited by <see cref="CommunicationOptions.QueryTimeout"/>. The secured-write
/// approve flow (M7 / T14b) calls this after an approval wins the
/// compare-and-swap. The method is <c>virtual</c> so tests can substitute the
/// relay without a live actor system.
/// </summary>
/// <remarks>
/// NOTE(review): per the original author's note, the site already handles
/// <see cref="WriteTagRequest"/> (routed to the MxGateway adapter), so no
/// site-side change is needed — confirm against the site-side actor.
/// </remarks>
/// <param name="siteId">Identifier of the site that should perform the write.</param>
/// <param name="request">The tag write request (correlation id, connection, tag, value, timestamp).</param>
/// <param name="ct">Cancellation token forwarded to the Ask.</param>
/// <returns>The site's write response (success flag plus optional error message).</returns>
public virtual Task<WriteTagResponse> WriteTagAsync(
    string siteId,
    WriteTagRequest request,
    CancellationToken ct = default)
{
    // Build the site-addressed message first, then resolve the actor — same
    // evaluation order as the other one-shot site queries.
    var siteMessage = new SiteEnvelope(siteId, request);
    var target = GetActor();

    return target.Ask<WriteTagResponse>(siteMessage, _options.QueryTimeout, ct);
}
|
||||
|
||||
// ── Pattern 8: Heartbeat (site→central, Tell) ──
|
||||
// Heartbeats are received by central, not sent. No method needed here.
|
||||
|
||||
|
||||
+27
@@ -79,4 +79,31 @@ public class SecuredWriteRepository : ISecuredWriteRepository
|
||||
.Take(take)
|
||||
.ToListAsync(ct);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
public async Task<bool> TryMarkApprovedAsync(
    long id,
    string verifierUser,
    string? verifierComment,
    DateTime decidedAtUtc,
    CancellationToken ct = default)
{
    // One conditional UPDATE acts as the compare-and-swap: the row only changes
    // while Status is still 'Pending', so of two concurrent approvers one sees
    // a single affected row (winner) and the other sees none (loser).
    // ExecuteSqlInterpolatedAsync turns each interpolation hole into a SQL
    // parameter, so the user-supplied values are never concatenated into the text.
    int transitionedRows = await _context.Database.ExecuteSqlInterpolatedAsync(
        $@"UPDATE dbo.PendingSecuredWrites
SET Status = 'Approved',
VerifierUser = {verifierUser},
VerifierComment = {verifierComment},
DecidedAtUtc = {decidedAtUtc}
WHERE Id = {id}
AND Status = 'Pending';",
        ct);

    // Exactly one row changed <=> this caller performed the transition.
    return transitionedRows == 1;
}
|
||||
}
|
||||
|
||||
@@ -378,10 +378,11 @@ public class ManagementActor : ReceiveActor
|
||||
DiscardParkedMessageCommand cmd => await HandleDiscardParkedMessage(sp, cmd, user),
|
||||
DebugSnapshotCommand cmd => await HandleDebugSnapshot(sp, cmd, user),
|
||||
|
||||
// Secured writes (M7 / T14b). Approve (execute) is intentionally NOT
|
||||
// dispatched here — the approve->execute relay is Task C3; it would
|
||||
// 'NotSupported' at runtime until then.
|
||||
// Secured writes (M7 / T14b). Approve executes the write — once a
|
||||
// Verifier wins the compare-and-swap the value is relayed to the site
|
||||
// MxGateway connection (Task C3).
|
||||
SubmitSecuredWriteCommand cmd => await HandleSubmitSecuredWrite(sp, cmd, user),
|
||||
ApproveSecuredWriteCommand cmd => await HandleApproveSecuredWrite(sp, cmd, user),
|
||||
RejectSecuredWriteCommand cmd => await HandleRejectSecuredWrite(sp, cmd, user),
|
||||
ListSecuredWritesCommand cmd => await HandleListSecuredWrites(sp, cmd),
|
||||
|
||||
@@ -928,6 +929,87 @@ public class ManagementActor : ReceiveActor
|
||||
return ToSecuredWriteDto(entity);
|
||||
}
|
||||
|
||||
/// <summary>
/// Approves a pending secured write and relays it to the site (M7 / T14b).
/// Flow: load the row, reject non-Pending rows and self-approval, win the
/// Pending-&gt;Approved compare-and-swap, then decode the stored value and relay
/// it through <see cref="CommunicationService.WriteTagAsync"/>. The relay
/// outcome is persisted on the row as <c>Executed</c> or <c>Failed</c>.
/// </summary>
/// <param name="sp">Service provider used to resolve the repository and the communication service.</param>
/// <param name="cmd">The approve command (row id and optional verifier comment).</param>
/// <param name="user">The authenticated verifier issuing the approval.</param>
/// <returns>The DTO of the row after the approval and relay attempt.</returns>
/// <exception cref="ManagementCommandException">
/// The row does not exist, is not Pending, is being approved by its own
/// submitter, or was decided by another verifier first (lost the CAS).
/// </exception>
private static async Task<object?> HandleApproveSecuredWrite(
    IServiceProvider sp, ApproveSecuredWriteCommand cmd, AuthenticatedUser user)
{
    var repo = sp.GetRequiredService<ISecuredWriteRepository>();
    var row = await repo.GetAsync(cmd.Id)
        ?? throw new ManagementCommandException($"Secured write {cmd.Id} not found.");

    if (!string.Equals(row.Status, "Pending", StringComparison.Ordinal))
        throw new ManagementCommandException(
            $"Secured write {cmd.Id} is '{row.Status}', not Pending; it cannot be approved.");

    // Separation of duties: a write may not be verified by its submitter. Checked
    // BEFORE the CAS so a self-approval never consumes the Pending->Approved
    // transition.
    if (string.Equals(row.OperatorUser, user.Username, StringComparison.OrdinalIgnoreCase))
        throw new ManagementCommandException(
            "A secured write cannot be verified by its submitter.");

    // Compare-and-swap: guards the two-verifier race. Exactly one concurrent
    // approver flips Pending->Approved; the loser observes false here and must
    // not relay the write.
    var decidedAtUtc = DateTime.UtcNow;
    if (!await repo.TryMarkApprovedAsync(cmd.Id, user.Username, cmd.Comment, decidedAtUtc))
        throw new ManagementCommandException(
            $"Secured write {cmd.Id} is no longer pending — already decided.");

    // We won the race. Mirror the committed verifier decision onto the loaded
    // entity, because the UpdateAsync calls below write the whole entity back.
    row.Status = "Approved";
    row.VerifierUser = user.Username;
    row.VerifierComment = cmd.Comment;
    row.DecidedAtUtc = decidedAtUtc;

    // Validate the value type BEFORE attempting the relay. An unknown type can
    // never be decoded/written, so fail the row deterministically rather than
    // leaving it stuck Approved. Enum.TryParse also accepts numeric strings that
    // map to no declared member, hence the additional IsDefined check.
    if (!Enum.TryParse<Commons.Types.Enums.DataType>(row.ValueType, ignoreCase: true, out var dataType)
        || !Enum.IsDefined(typeof(Commons.Types.Enums.DataType), dataType))
    {
        row.Status = "Failed";
        row.ExecutedAtUtc = DateTime.UtcNow;
        row.ExecutionError = "unknown value type";
        await repo.UpdateAsync(row);
        return ToSecuredWriteDto(row);
    }

    // Everything that can throw after the CAS committed runs inside this try:
    // service resolution, value decoding, and the relay itself. Any exception is
    // contained and recorded so the row is never left stuck Approved.
    bool success;
    string? error;
    try
    {
        var commService = sp.GetRequiredService<CommunicationService>();
        var value = Commons.Types.AttributeValueCodec.Decode(row.ValueJson, dataType, elementType: null);

        var resp = await commService.WriteTagAsync(
            row.SiteId,
            new Commons.Messages.DataConnection.WriteTagRequest(
                CorrelationId: Guid.NewGuid().ToString("N"),
                ConnectionName: row.ConnectionName,
                TagPath: row.TagPath,
                Value: value,
                Timestamp: DateTimeOffset.UtcNow));

        success = resp.Success;
        error = resp.Success ? null : resp.ErrorMessage;
    }
    catch (Exception ex)
    {
        success = false;
        error = ex.Message;
    }

    row.Status = success ? "Executed" : "Failed";
    row.ExecutedAtUtc = DateTime.UtcNow;
    row.ExecutionError = error;

    // UpdateAsync overwrites all columns -> pass the fully-populated entity.
    await repo.UpdateAsync(row);
    return ToSecuredWriteDto(row);
}
|
||||
|
||||
private static async Task<object?> HandleRejectSecuredWrite(
|
||||
IServiceProvider sp, RejectSecuredWriteCommand cmd, AuthenticatedUser user)
|
||||
{
|
||||
|
||||
@@ -2,12 +2,15 @@ using Akka.Actor;
|
||||
using Akka.TestKit.Xunit2;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Microsoft.Extensions.Options;
|
||||
using NSubstitute;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Entities.SecuredWrites;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Sites;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Messages.DataConnection;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Management;
|
||||
using ZB.MOM.WW.ScadaBridge.Commons.Types;
|
||||
using ZB.MOM.WW.ScadaBridge.Communication;
|
||||
|
||||
namespace ZB.MOM.WW.ScadaBridge.ManagementService.Tests;
|
||||
|
||||
@@ -21,16 +24,54 @@ public class SecuredWriteHandlerTests : TestKit, IDisposable
|
||||
{
|
||||
private readonly ISiteRepository _siteRepo;
|
||||
private readonly ISecuredWriteRepository _securedWriteRepo;
|
||||
private readonly StubCommunicationService _comms;
|
||||
private readonly ServiceCollection _services;
|
||||
|
||||
/// <summary>
/// Builds the test doubles (two NSubstitute repositories and the stub
/// communication service) and registers them in the service collection.
/// </summary>
public SecuredWriteHandlerTests()
{
    // Test doubles: substitutes for the repositories, a hand-written stub for
    // the site-write seam.
    _comms = new StubCommunicationService();
    _securedWriteRepo = Substitute.For<ISecuredWriteRepository>();
    _siteRepo = Substitute.For<ISiteRepository>();

    // Repositories are scoped, the communication service is a singleton.
    var registrations = new ServiceCollection();
    registrations.AddScoped(_ => _siteRepo);
    registrations.AddScoped(_ => _securedWriteRepo);
    registrations.AddSingleton<CommunicationService>(_comms);
    _services = registrations;
}
|
||||
|
||||
/// <summary>
/// Hand-written double for the site-write seam. It overrides the virtual
/// <see cref="CommunicationService.WriteTagAsync"/> so the approve relay can run
/// without a live actor system: each call is counted, the request is remembered,
/// and the result is either a canned response or a faulted task.
/// </summary>
private sealed class StubCommunicationService : CommunicationService
{
    public StubCommunicationService()
        : base(Options.Create(new CommunicationOptions()), NullLogger<CommunicationService>.Instance)
    {
    }

    /// <summary>The most recent request passed to <see cref="WriteTagAsync"/>, if any.</summary>
    public WriteTagRequest? LastRequest { get; private set; }

    /// <summary>How many times <see cref="WriteTagAsync"/> has been invoked.</summary>
    public int CallCount { get; private set; }

    /// <summary>
    /// Produces the response for a relayed request. Defaults to a successful
    /// response echoing the request's correlation id.
    /// </summary>
    public Func<WriteTagRequest, WriteTagResponse> Responder { get; set; } =
        req => new WriteTagResponse(req.CorrelationId, Success: true, ErrorMessage: null, DateTimeOffset.UtcNow);

    /// <summary>When set, the relay returns a task faulted with this exception instead of a response.</summary>
    public Exception? ThrowOnWrite { get; set; }

    /// <summary>Records the call, then returns the faulted task or the canned response.</summary>
    public override Task<WriteTagResponse> WriteTagAsync(
        string siteId, WriteTagRequest request, CancellationToken ct = default)
    {
        CallCount += 1;
        LastRequest = request;

        return ThrowOnWrite is { } failure
            ? Task.FromException<WriteTagResponse>(failure)
            : Task.FromResult(Responder(request));
    }
}
|
||||
|
||||
private IActorRef CreateActor()
|
||||
@@ -270,4 +311,160 @@ public class SecuredWriteHandlerTests : TestKit, IDisposable
|
||||
ExpectMsg<ManagementSuccess>(TimeSpan.FromSeconds(5));
|
||||
_securedWriteRepo.Received(1).QueryAsync(null, null, 0, 200, Arg.Any<CancellationToken>());
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------
|
||||
// Approve (execute) — Task C3
|
||||
// ------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
/// Configures the repository substitute so that <c>TryMarkApprovedAsync</c> for
/// <paramref name="id"/> returns <c>true</c> regardless of the other arguments,
/// i.e. the approving caller wins the compare-and-swap.
/// </summary>
private void ArmCasSuccess(long id)
{
    _securedWriteRepo
        .TryMarkApprovedAsync(
            id,
            Arg.Any<string>(),
            Arg.Any<string?>(),
            Arg.Any<DateTime>(),
            Arg.Any<CancellationToken>())
        .Returns(true);
}
|
||||
|
||||
/// <summary>
/// Happy path: a verifier other than the submitter approves, the CAS is invoked
/// once, the write is relayed once, and the persisted row ends up Executed.
/// </summary>
[Fact]
public void Approve_ByDifferentVerifier_RelaysWrite_FlipsStatusToExecuted()
{
    SeedPendingWrite(7, operatorUser: "alice");
    ArmCasSuccess(7);

    // Capture the entity handed to UpdateAsync so its final state can be inspected.
    PendingSecuredWrite? persisted = null;
    _securedWriteRepo
        .When(repo => repo.UpdateAsync(Arg.Any<PendingSecuredWrite>(), Arg.Any<CancellationToken>()))
        .Do(call => persisted = call.Arg<PendingSecuredWrite>());

    var sut = CreateActor();
    var request = Envelope(new ApproveSecuredWriteCommand(7, "approved"), "bob", "Verifier");

    sut.Tell(request);

    var reply = ExpectMsg<ManagementSuccess>(TimeSpan.FromSeconds(5));
    Assert.Equal(request.CorrelationId, reply.CorrelationId);

    // The transition went through the CAS exactly once, stamped with the verifier.
    _securedWriteRepo.Received(1).TryMarkApprovedAsync(
        7, "bob", "approved", Arg.Any<DateTime>(), Arg.Any<CancellationToken>());

    // Exactly one relay reached the communication seam, carrying the seeded
    // connection and tag (values presumably set by SeedPendingWrite — defined elsewhere).
    Assert.Equal(1, _comms.CallCount);
    Assert.NotNull(_comms.LastRequest);
    Assert.Equal("Mx1", _comms.LastRequest!.ConnectionName);
    Assert.Equal("Tag.A", _comms.LastRequest.TagPath);

    // The row was persisted as Executed with a timestamp and no error.
    Assert.NotNull(persisted);
    Assert.Equal("Executed", persisted!.Status);
    Assert.NotNull(persisted.ExecutedAtUtc);
    Assert.Null(persisted.ExecutionError);
}
|
||||
|
||||
/// <summary>
/// Separation of duties: when the submitter tries to approve their own write the
/// command fails, the CAS is never attempted, and nothing is relayed.
/// </summary>
[Fact]
public void Approve_BySubmittingUser_ReturnsError_CasNotCalled_NoRelay()
{
    SeedPendingWrite(7, operatorUser: "alice");

    var sut = CreateActor();
    // "alice" submitted the write and is now also the approving principal.
    var request = Envelope(new ApproveSecuredWriteCommand(7, "self"), "alice", "Verifier");

    sut.Tell(request);

    var failure = ExpectMsg<ManagementError>(TimeSpan.FromSeconds(5));
    Assert.Equal("COMMAND_FAILED", failure.ErrorCode);

    // Neither the CAS nor the relay may have been touched.
    _securedWriteRepo.DidNotReceiveWithAnyArgs().TryMarkApprovedAsync(
        default, default!, default, default, default);
    Assert.Equal(0, _comms.CallCount);
}
|
||||
|
||||
/// <summary>
/// Lost race: when the CAS reports that the row is no longer Pending, the
/// command fails with an "already decided" error and nothing is relayed.
/// </summary>
[Fact]
public void Approve_WhenCasLosesRace_ReturnsError_NoRelay()
{
    SeedPendingWrite(7, operatorUser: "alice");

    // Another verifier got there first: the CAS reports no transition.
    _securedWriteRepo
        .TryMarkApprovedAsync(
            7, Arg.Any<string>(), Arg.Any<string?>(), Arg.Any<DateTime>(), Arg.Any<CancellationToken>())
        .Returns(false);

    var sut = CreateActor();
    var request = Envelope(new ApproveSecuredWriteCommand(7, "race"), "bob", "Verifier");

    sut.Tell(request);

    var failure = ExpectMsg<ManagementError>(TimeSpan.FromSeconds(5));
    Assert.Equal("COMMAND_FAILED", failure.ErrorCode);
    Assert.Contains("already decided", failure.Error);
    Assert.Equal(0, _comms.CallCount);
}
|
||||
|
||||
/// <summary>
/// Relay reports failure: the approve command still succeeds (the row was
/// decided), while the persisted row is Failed and carries the site's error.
/// </summary>
[Fact]
public void Approve_WhenRelayFails_FlipsStatusToFailed_WithError()
{
    SeedPendingWrite(7, operatorUser: "alice");
    ArmCasSuccess(7);

    // The site answers, but with an unsuccessful write response.
    _comms.Responder = relayed =>
        new WriteTagResponse(relayed.CorrelationId, Success: false, ErrorMessage: "device offline", DateTimeOffset.UtcNow);

    PendingSecuredWrite? persisted = null;
    _securedWriteRepo
        .When(repo => repo.UpdateAsync(Arg.Any<PendingSecuredWrite>(), Arg.Any<CancellationToken>()))
        .Do(call => persisted = call.Arg<PendingSecuredWrite>());

    var sut = CreateActor();
    var request = Envelope(new ApproveSecuredWriteCommand(7, "approved"), "bob", "Verifier");

    sut.Tell(request);

    // Approval itself is a success; the relay outcome lives on the row.
    ExpectMsg<ManagementSuccess>(TimeSpan.FromSeconds(5));
    Assert.Equal(1, _comms.CallCount);
    Assert.NotNull(persisted);
    Assert.Equal("Failed", persisted!.Status);
    Assert.Equal("device offline", persisted.ExecutionError);
    Assert.NotNull(persisted.ExecutedAtUtc);
}
|
||||
|
||||
/// <summary>
/// Relay throws: the exception is contained, the approve command still
/// succeeds, and the persisted row is Failed with the exception message.
/// </summary>
[Fact]
public void Approve_WhenRelayThrows_FlipsStatusToFailed_WithExceptionMessage()
{
    SeedPendingWrite(7, operatorUser: "alice");
    ArmCasSuccess(7);

    // Make the relay seam fault instead of answering.
    _comms.ThrowOnWrite = new InvalidOperationException("transport down");

    PendingSecuredWrite? persisted = null;
    _securedWriteRepo
        .When(repo => repo.UpdateAsync(Arg.Any<PendingSecuredWrite>(), Arg.Any<CancellationToken>()))
        .Do(call => persisted = call.Arg<PendingSecuredWrite>());

    var sut = CreateActor();
    var request = Envelope(new ApproveSecuredWriteCommand(7, "approved"), "bob", "Verifier");

    sut.Tell(request);

    ExpectMsg<ManagementSuccess>(TimeSpan.FromSeconds(5));
    Assert.NotNull(persisted);
    Assert.Equal("Failed", persisted!.Status);
    Assert.Contains("transport down", persisted.ExecutionError);
}
|
||||
|
||||
/// <summary>
/// Unknown value type: after winning the CAS the row is failed with
/// "unknown value type" and no relay is attempted.
/// </summary>
[Fact]
public void Approve_UnknownValueType_FlipsStatusToFailed_NoRelay()
{
    // Seed a pending row, then corrupt its value type so it cannot be parsed.
    var seeded = SeedPendingWrite(7, operatorUser: "alice");
    seeded.ValueType = "Bogus";
    ArmCasSuccess(7);

    PendingSecuredWrite? persisted = null;
    _securedWriteRepo
        .When(repo => repo.UpdateAsync(Arg.Any<PendingSecuredWrite>(), Arg.Any<CancellationToken>()))
        .Do(call => persisted = call.Arg<PendingSecuredWrite>());

    var sut = CreateActor();
    var request = Envelope(new ApproveSecuredWriteCommand(7, "approved"), "bob", "Verifier");

    sut.Tell(request);

    ExpectMsg<ManagementSuccess>(TimeSpan.FromSeconds(5));
    Assert.Equal(0, _comms.CallCount);
    Assert.NotNull(persisted);
    Assert.Equal("Failed", persisted!.Status);
    Assert.Equal("unknown value type", persisted.ExecutionError);
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user