feat(mgmt): secured-write submit/reject/list handlers + Operator/Verifier gating (T14b)

This commit is contained in:
Joseph Doherty
2026-06-18 02:29:29 -04:00
parent 586d54359c
commit 25c9240415
3 changed files with 458 additions and 0 deletions
@@ -0,0 +1,83 @@
namespace ZB.MOM.WW.ScadaBridge.Commons.Messages.Management;
// ============================================================================
// Two-person ("secured") write commands (M7 OPC UA / MxGateway UX, Task T14b).
//
// An Operator SUBMITS a pending secured write against an MxGateway data
// connection; a distinct Verifier later APPROVES (executes — Task C3) or
// REJECTS it. Separation of duties is enforced at the handler: a write may not
// be verified by the same principal that submitted it. ListSecuredWrites is a
// read-only query (any authenticated user). Role gating lives in
// ManagementActor.GetRequiredRole (Operator for submit; Verifier for
// reject/approve).
// ============================================================================
/// <summary>
/// Operator request to submit a pending secured write. The target connection
/// must exist within the site and use the MxGateway protocol; the value is
/// captured as <paramref name="ValueJson"/> interpreted per <paramref name="ValueType"/>.
/// Returns the newly-created <see cref="SecuredWriteDto"/>.
/// </summary>
/// <param name="SiteId">Site identifier the write targets.</param>
/// <param name="ConnectionName">Data connection name within the site.</param>
/// <param name="TagPath">Fully-qualified tag path the value is written to.</param>
/// <param name="ValueJson">JSON-serialised value to write.</param>
/// <param name="ValueType">Target data type name (e.g. <c>Boolean</c>, <c>Double</c>).</param>
/// <param name="Comment">Optional free-text comment supplied by the operator.</param>
public record SubmitSecuredWriteCommand(
string SiteId,
string ConnectionName,
string TagPath,
string ValueJson,
string ValueType,
string? Comment);
/// <summary>
/// Verifier request to approve (and execute — handled by Task C3) a pending
/// secured write. Declared here so the secured-write contract is complete; the
/// approve→execute relay handler and dispatch arm are implemented in C3.
/// </summary>
/// <param name="Id">Identity of the pending secured write.</param>
/// <param name="Comment">Optional free-text comment supplied by the verifier.</param>
public record ApproveSecuredWriteCommand(long Id, string? Comment);
/// <summary>
/// Verifier request to reject a pending secured write. The write must still be
/// <c>Pending</c>, and the verifier must differ from the submitting operator
/// (separation of duties). Returns the updated <see cref="SecuredWriteDto"/>.
/// </summary>
/// <param name="Id">Identity of the pending secured write.</param>
/// <param name="Comment">Optional free-text comment supplied by the verifier.</param>
public record RejectSecuredWriteCommand(long Id, string? Comment);
/// <summary>
/// Read-only query for secured writes, optionally filtered by status and site.
/// A <c>null</c> filter matches every row.
/// </summary>
/// <param name="Status">Status filter; <c>null</c> matches every status.</param>
/// <param name="SiteId">Site id filter; <c>null</c> matches every site.</param>
public record ListSecuredWritesCommand(string? Status, string? SiteId);
/// <summary>
/// Result projection of a single pending secured write row, mirroring the
/// <c>PendingSecuredWrite</c> entity shape.
/// </summary>
public record SecuredWriteDto(
long Id,
string SiteId,
string ConnectionName,
string TagPath,
string ValueJson,
string ValueType,
string Status,
string OperatorUser,
string? OperatorComment,
DateTime SubmittedAtUtc,
string? VerifierUser,
string? VerifierComment,
DateTime? DecidedAtUtc,
DateTime? ExecutedAtUtc,
string? ExecutionError);
/// <summary>Result wrapper for <see cref="ListSecuredWritesCommand"/>.</summary>
public record SecuredWriteListResult(IReadOnlyList<SecuredWriteDto> Items);
@@ -11,6 +11,7 @@ using ZB.MOM.WW.ScadaBridge.Commons.Entities.Instances;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Scripts;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Templates;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Notifications;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.SecuredWrites;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Security;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Sites;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
@@ -216,6 +217,15 @@ public class ManagementActor : ReceiveActor
or RetryParkedMessageCommand or DiscardParkedMessageCommand
or DebugSnapshotCommand => Roles.Deployer,
// Two-person secured write (M7 / T14b). Submit is an Operator action;
// approve/reject are Verifier actions. Separation of duties (a write may
// not be verified by its submitter) is enforced inside the handler — the
// role gate only ensures the caller holds the right coarse role.
SubmitSecuredWriteCommand => Roles.Operator,
RejectSecuredWriteCommand or ApproveSecuredWriteCommand => Roles.Verifier,
// ListSecuredWritesCommand is read-only -> falls through to "any
// authenticated user" below, like the other list/query commands.
// Read-only queries -- any authenticated user
_ => null
};
@@ -368,6 +378,13 @@ public class ManagementActor : ReceiveActor
DiscardParkedMessageCommand cmd => await HandleDiscardParkedMessage(sp, cmd, user),
DebugSnapshotCommand cmd => await HandleDebugSnapshot(sp, cmd, user),
// Secured writes (M7 / T14b). Approve (execute) is intentionally NOT
// dispatched here — the approve->execute relay is Task C3; it would
// 'NotSupported' at runtime until then.
SubmitSecuredWriteCommand cmd => await HandleSubmitSecuredWrite(sp, cmd, user),
RejectSecuredWriteCommand cmd => await HandleRejectSecuredWrite(sp, cmd, user),
ListSecuredWritesCommand cmd => await HandleListSecuredWrites(sp, cmd),
// Transport (#24) bundle operations
ExportBundleCommand cmd => await HandleExportBundle(sp, cmd, user.Username),
PreviewBundleCommand cmd => await HandlePreviewBundle(sp, cmd),
@@ -860,6 +877,91 @@ public class ManagementActor : ReceiveActor
return await commService.DiscardParkedMessageAsync(cmd.SiteIdentifier, request);
}
// ========================================================================
// Secured-write handlers (M7 / T14b)
//
// Two-person workflow: an Operator submits a pending write against an
// MxGateway data connection; a distinct Verifier approves (Task C3) or
// rejects it. The role gate (GetRequiredRole) only verifies the coarse role;
// the separation-of-duties rule (a write may not be verified by its
// submitter) is enforced here.
// ========================================================================
private static SecuredWriteDto ToSecuredWriteDto(PendingSecuredWrite e) => new(
e.Id, e.SiteId, e.ConnectionName, e.TagPath, e.ValueJson, e.ValueType,
e.Status, e.OperatorUser, e.OperatorComment, e.SubmittedAtUtc,
e.VerifierUser, e.VerifierComment, e.DecidedAtUtc, e.ExecutedAtUtc, e.ExecutionError);
private static async Task<object?> HandleSubmitSecuredWrite(
IServiceProvider sp, SubmitSecuredWriteCommand cmd, AuthenticatedUser user)
{
var siteRepo = sp.GetRequiredService<ISiteRepository>();
var site = await siteRepo.GetSiteByIdentifierAsync(cmd.SiteId)
?? throw new ManagementCommandException($"Site '{cmd.SiteId}' not found.");
var connections = await siteRepo.GetDataConnectionsBySiteIdAsync(site.Id);
var conn = connections.FirstOrDefault(c =>
string.Equals(c.Name, cmd.ConnectionName, StringComparison.Ordinal))
?? throw new ManagementCommandException(
$"Data connection '{cmd.ConnectionName}' not found on site '{cmd.SiteId}'.");
// Secured writes only apply to MxGateway connections.
if (!string.Equals(conn.Protocol, "MxGateway", StringComparison.OrdinalIgnoreCase))
throw new ManagementCommandException(
$"Secured writes require an MxGateway connection; '{cmd.ConnectionName}' uses protocol '{conn.Protocol}'.");
var entity = new PendingSecuredWrite
{
SiteId = cmd.SiteId,
ConnectionName = cmd.ConnectionName,
TagPath = cmd.TagPath,
ValueJson = cmd.ValueJson,
ValueType = cmd.ValueType,
Status = "Pending",
OperatorUser = user.Username,
OperatorComment = cmd.Comment,
SubmittedAtUtc = DateTime.UtcNow
};
var repo = sp.GetRequiredService<ISecuredWriteRepository>();
entity.Id = await repo.AddAsync(entity);
return ToSecuredWriteDto(entity);
}
private static async Task<object?> HandleRejectSecuredWrite(
IServiceProvider sp, RejectSecuredWriteCommand cmd, AuthenticatedUser user)
{
var repo = sp.GetRequiredService<ISecuredWriteRepository>();
var entity = await repo.GetAsync(cmd.Id)
?? throw new ManagementCommandException($"Secured write {cmd.Id} not found.");
if (!string.Equals(entity.Status, "Pending", StringComparison.Ordinal))
throw new ManagementCommandException(
$"Secured write {cmd.Id} is '{entity.Status}', not Pending; it cannot be rejected.");
// Separation of duties: the verifier must differ from the submitter.
if (string.Equals(entity.OperatorUser, user.Username, StringComparison.OrdinalIgnoreCase))
throw new ManagementCommandException(
"A secured write cannot be verified by its submitter.");
entity.Status = "Rejected";
entity.VerifierUser = user.Username;
entity.VerifierComment = cmd.Comment;
entity.DecidedAtUtc = DateTime.UtcNow;
// UpdateAsync overwrites all columns -> pass the fully-populated entity.
await repo.UpdateAsync(entity);
return ToSecuredWriteDto(entity);
}
private static async Task<object?> HandleListSecuredWrites(
IServiceProvider sp, ListSecuredWritesCommand cmd)
{
var repo = sp.GetRequiredService<ISecuredWriteRepository>();
var rows = await repo.QueryAsync(cmd.Status, cmd.SiteId, skip: 0, take: 200);
return new SecuredWriteListResult(rows.Select(ToSecuredWriteDto).ToList());
}
// ========================================================================
// Site handlers
// ========================================================================
@@ -0,0 +1,273 @@
using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using NSubstitute;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.SecuredWrites;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Sites;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Management;
using ZB.MOM.WW.ScadaBridge.Commons.Types;
namespace ZB.MOM.WW.ScadaBridge.ManagementService.Tests;
/// <summary>
/// Handler tests for the two-person ("secured") write submit/reject/list flow
/// (M7 OPC UA / MxGateway UX, Task T14b). Submit is gated to Operator, reject to
/// Verifier; the no-self-approval rule and the MxGateway-protocol precondition
/// are enforced inside the handlers and surface as ManagementError.
/// </summary>
public class SecuredWriteHandlerTests : TestKit, IDisposable
{
private readonly ISiteRepository _siteRepo;
private readonly ISecuredWriteRepository _securedWriteRepo;
private readonly ServiceCollection _services;
public SecuredWriteHandlerTests()
{
_siteRepo = Substitute.For<ISiteRepository>();
_securedWriteRepo = Substitute.For<ISecuredWriteRepository>();
_services = new ServiceCollection();
_services.AddScoped(_ => _siteRepo);
_services.AddScoped(_ => _securedWriteRepo);
}
private IActorRef CreateActor()
{
var sp = _services.BuildServiceProvider();
return Sys.ActorOf(Props.Create(() => new ManagementActor(
sp, NullLogger<ManagementActor>.Instance)));
}
private static ManagementEnvelope Envelope(object command, string username, params string[] roles) =>
new(new AuthenticatedUser(username, username, roles, Array.Empty<string>()),
command, Guid.NewGuid().ToString("N"));
void IDisposable.Dispose() => Shutdown();
/// <summary>Wires a site whose single connection uses the given protocol.</summary>
private void SeedSiteWithConnection(int siteId, string identifier, string connectionName, string protocol)
{
_siteRepo.GetSiteByIdentifierAsync(identifier, Arg.Any<CancellationToken>())
.Returns(new Site($"Site{siteId}", identifier) { Id = siteId });
_siteRepo.GetDataConnectionsBySiteIdAsync(siteId, Arg.Any<CancellationToken>())
.Returns(new List<DataConnection>
{
new(connectionName, protocol, siteId) { Id = 100 }
});
}
// ------------------------------------------------------------------------
// Submit
// ------------------------------------------------------------------------
[Fact]
public void Submit_OnNonMxGatewayConnection_ReturnsError_NoRowInserted()
{
SeedSiteWithConnection(1, "SITE1", "Plc1", "OpcUa");
var actor = CreateActor();
var envelope = Envelope(
new SubmitSecuredWriteCommand("SITE1", "Plc1", "Tag.A", "true", "Boolean", "go"),
"alice", "Operator");
actor.Tell(envelope);
var response = ExpectMsg<ManagementError>(TimeSpan.FromSeconds(5));
Assert.Equal("COMMAND_FAILED", response.ErrorCode);
Assert.Contains("MxGateway", response.Error);
_securedWriteRepo.DidNotReceiveWithAnyArgs().AddAsync(default!, default);
}
[Fact]
public void Submit_OnUnknownConnection_ReturnsError_NoRowInserted()
{
SeedSiteWithConnection(1, "SITE1", "Plc1", "MxGateway");
var actor = CreateActor();
var envelope = Envelope(
new SubmitSecuredWriteCommand("SITE1", "DoesNotExist", "Tag.A", "true", "Boolean", null),
"alice", "Operator");
actor.Tell(envelope);
var response = ExpectMsg<ManagementError>(TimeSpan.FromSeconds(5));
Assert.Equal("COMMAND_FAILED", response.ErrorCode);
_securedWriteRepo.DidNotReceiveWithAnyArgs().AddAsync(default!, default);
}
[Fact]
public void Submit_OnMxGatewayConnection_InsertsPendingRowWithOperatorUser()
{
SeedSiteWithConnection(1, "SITE1", "Mx1", "MxGateway");
PendingSecuredWrite? inserted = null;
_securedWriteRepo
.When(r => r.AddAsync(Arg.Any<PendingSecuredWrite>(), Arg.Any<CancellationToken>()))
.Do(ci => inserted = ci.Arg<PendingSecuredWrite>());
_securedWriteRepo.AddAsync(Arg.Any<PendingSecuredWrite>(), Arg.Any<CancellationToken>())
.Returns(55L);
var actor = CreateActor();
var envelope = Envelope(
new SubmitSecuredWriteCommand("SITE1", "Mx1", "Tag.Setpoint", "42.5", "Double", "raise setpoint"),
"alice", "Operator");
actor.Tell(envelope);
var response = ExpectMsg<ManagementSuccess>(TimeSpan.FromSeconds(5));
Assert.Equal(envelope.CorrelationId, response.CorrelationId);
Assert.NotNull(inserted);
Assert.Equal("Pending", inserted!.Status);
Assert.Equal("alice", inserted.OperatorUser);
Assert.Equal("Mx1", inserted.ConnectionName);
Assert.Equal("SITE1", inserted.SiteId);
Assert.Equal("Tag.Setpoint", inserted.TagPath);
Assert.Equal("42.5", inserted.ValueJson);
Assert.Equal("Double", inserted.ValueType);
Assert.Equal("raise setpoint", inserted.OperatorComment);
// The returned DTO carries the store-assigned id and operator.
Assert.Contains("\"id\":55", response.JsonData);
Assert.Contains("alice", response.JsonData);
}
// ------------------------------------------------------------------------
// Reject
// ------------------------------------------------------------------------
private PendingSecuredWrite SeedPendingWrite(long id, string operatorUser)
{
var row = new PendingSecuredWrite
{
Id = id,
SiteId = "SITE1",
ConnectionName = "Mx1",
TagPath = "Tag.A",
ValueJson = "true",
ValueType = "Boolean",
Status = "Pending",
OperatorUser = operatorUser,
SubmittedAtUtc = DateTime.UtcNow
};
_securedWriteRepo.GetAsync(id, Arg.Any<CancellationToken>()).Returns(row);
return row;
}
[Fact]
public void Reject_BySubmittingUser_ReturnsError_StatusUnchanged()
{
SeedPendingWrite(7, operatorUser: "alice");
var actor = CreateActor();
// Same principal that submitted attempts to reject — separation of duties.
var envelope = Envelope(new RejectSecuredWriteCommand(7, "no"), "alice", "Verifier");
actor.Tell(envelope);
var response = ExpectMsg<ManagementError>(TimeSpan.FromSeconds(5));
Assert.Equal("COMMAND_FAILED", response.ErrorCode);
_securedWriteRepo.DidNotReceiveWithAnyArgs().UpdateAsync(default!, default);
}
[Fact]
public void Reject_ByDifferentUser_FlipsStatusToRejected()
{
SeedPendingWrite(7, operatorUser: "alice");
PendingSecuredWrite? updated = null;
_securedWriteRepo
.When(r => r.UpdateAsync(Arg.Any<PendingSecuredWrite>(), Arg.Any<CancellationToken>()))
.Do(ci => updated = ci.Arg<PendingSecuredWrite>());
var actor = CreateActor();
var envelope = Envelope(new RejectSecuredWriteCommand(7, "not authorized"), "bob", "Verifier");
actor.Tell(envelope);
var response = ExpectMsg<ManagementSuccess>(TimeSpan.FromSeconds(5));
Assert.Equal(envelope.CorrelationId, response.CorrelationId);
Assert.NotNull(updated);
Assert.Equal("Rejected", updated!.Status);
Assert.Equal("bob", updated.VerifierUser);
Assert.Equal("not authorized", updated.VerifierComment);
Assert.NotNull(updated.DecidedAtUtc);
// Operator fields are preserved (fully-populated update).
Assert.Equal("alice", updated.OperatorUser);
}
[Fact]
public void Reject_NonPendingRow_ReturnsError()
{
var row = SeedPendingWrite(7, operatorUser: "alice");
row.Status = "Approved";
var actor = CreateActor();
var envelope = Envelope(new RejectSecuredWriteCommand(7, null), "bob", "Verifier");
actor.Tell(envelope);
var response = ExpectMsg<ManagementError>(TimeSpan.FromSeconds(5));
Assert.Equal("COMMAND_FAILED", response.ErrorCode);
_securedWriteRepo.DidNotReceiveWithAnyArgs().UpdateAsync(default!, default);
}
[Fact]
public void Reject_MissingRow_ReturnsError()
{
_securedWriteRepo.GetAsync(99, Arg.Any<CancellationToken>()).Returns((PendingSecuredWrite?)null);
var actor = CreateActor();
var envelope = Envelope(new RejectSecuredWriteCommand(99, null), "bob", "Verifier");
actor.Tell(envelope);
var response = ExpectMsg<ManagementError>(TimeSpan.FromSeconds(5));
Assert.Equal("COMMAND_FAILED", response.ErrorCode);
}
// ------------------------------------------------------------------------
// List
// ------------------------------------------------------------------------
[Fact]
public void List_FiltersByStatusAndSite()
{
_securedWriteRepo.QueryAsync("Pending", "SITE1", 0, 200, Arg.Any<CancellationToken>())
.Returns(new List<PendingSecuredWrite>
{
new()
{
Id = 3, SiteId = "SITE1", ConnectionName = "Mx1", TagPath = "Tag.A",
ValueJson = "true", ValueType = "Boolean", Status = "Pending",
OperatorUser = "alice", SubmittedAtUtc = DateTime.UtcNow
}
});
var actor = CreateActor();
var envelope = Envelope(new ListSecuredWritesCommand("Pending", "SITE1"), "carol", "Viewer");
actor.Tell(envelope);
var response = ExpectMsg<ManagementSuccess>(TimeSpan.FromSeconds(5));
Assert.Equal(envelope.CorrelationId, response.CorrelationId);
Assert.Contains("\"id\":3", response.JsonData);
Assert.Contains("alice", response.JsonData);
_securedWriteRepo.Received(1).QueryAsync("Pending", "SITE1", 0, 200, Arg.Any<CancellationToken>());
}
[Fact]
public void List_NoFilters_PassesNullsToRepository()
{
_securedWriteRepo.QueryAsync(null, null, 0, 200, Arg.Any<CancellationToken>())
.Returns(new List<PendingSecuredWrite>());
var actor = CreateActor();
var envelope = Envelope(new ListSecuredWritesCommand(null, null), "carol");
actor.Tell(envelope);
ExpectMsg<ManagementSuccess>(TimeSpan.FromSeconds(5));
_securedWriteRepo.Received(1).QueryAsync(null, null, 0, 200, Arg.Any<CancellationToken>());
}
}