feat(adminui): FleetDiagnosticsClient real Akka ActorSelection round-trip (F17)
Some checks failed
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been cancelled
v2-ci / build (push) Has been cancelled
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been cancelled
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been cancelled
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been cancelled
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been cancelled
v2-ci / integration (push) Has been cancelled
Some checks failed
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been cancelled
v2-ci / build (push) Has been cancelled
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been cancelled
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been cancelled
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been cancelled
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been cancelled
v2-ci / integration (push) Has been cancelled
- New Commons.Messages.Fleet.GetDiagnostics request record.
- DriverHostActor handles GetDiagnostics in all three states (Steady, Applying,
Stale); replies with a NodeDiagnosticsSnapshot built from _currentRevision
+ the local NodeId. Drivers list is empty until F7 wires the per-instance
children.
- FleetDiagnosticsClient now resolves the target via ActorSelection at
akka.tcp://{system}@{nodeId}/user/driver-host and Asks with a 3s timeout.
On timeout/peer-down it returns an empty snapshot so the UI degrades
gracefully rather than throwing.
Two new integration tests in Host.IntegrationTests:
- GetDiagnostics_returns_snapshot_with_target_NodeId verifies the
cross-node Ask/Reply works.
- GetDiagnostics_after_deploy_reports_current_revision exercises the
end-to-end path: AdminOps starts a deployment, both DriverHostActors
apply, then diagnostics reports the new revision on both nodes.
All 98 v2 tests pass (was 96 + 2 new).
This commit is contained in:
@@ -0,0 +1,10 @@
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet;
|
||||
|
||||
/// <summary>
|
||||
/// Request a diagnostic snapshot from a per-node <c>DriverHostActor</c>. Sent by the admin UI's
|
||||
/// <c>IFleetDiagnosticsClient</c> via <c>ActorSelection</c> over the cluster; the local
|
||||
/// DriverHostActor responds with a <c>NodeDiagnosticsSnapshot</c>.
|
||||
/// </summary>
|
||||
public sealed record GetDiagnostics(CorrelationId CorrelationId);
|
||||
@@ -1,36 +1,50 @@
|
||||
using Akka.Actor;
|
||||
using Microsoft.Extensions.Options;
|
||||
using ZB.MOM.WW.OtOpcUa.Cluster;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Interfaces;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.AdminUI.Clients;
|
||||
|
||||
/// <summary>
|
||||
/// <see cref="IFleetDiagnosticsClient"/> that targets a named node's <c>DriverHostActor</c> over
|
||||
/// Akka cluster <see cref="ActorSelection"/>.
|
||||
/// <see cref="IFleetDiagnosticsClient"/> backed by an Akka <see cref="ActorSelection"/> against
|
||||
/// the target node's <c>DriverHostActor</c> at <c>akka.tcp://otopcua@<host>:<port>/user/driver-host</c>.
|
||||
/// Sends <see cref="GetDiagnostics"/>; expects a <see cref="NodeDiagnosticsSnapshot"/> reply.
|
||||
///
|
||||
/// The actual <c>GetDiagnosticsRequest</c>/<c>NodeDiagnosticsSnapshot</c> round-trip on the
|
||||
/// driver side is staged for follow-up F17 (depends on DriverHostActor exposing the request
|
||||
/// handler; right now it only handles DispatchDeployment). For now the client returns an empty
|
||||
/// snapshot so the UI can render a "no data yet" state.
|
||||
/// On timeout or any other failure (peer down, GetDiagnostics handler missing) returns an
|
||||
/// empty snapshot so the UI degrades to "no data" instead of throwing.
|
||||
/// </summary>
|
||||
public sealed class FleetDiagnosticsClient : IFleetDiagnosticsClient
|
||||
{
|
||||
private readonly ActorSystem _system;
|
||||
private static readonly TimeSpan AskTimeout = TimeSpan.FromSeconds(3);
|
||||
|
||||
public FleetDiagnosticsClient(ActorSystem system)
|
||||
private readonly ActorSystem _system;
|
||||
private readonly string _systemName;
|
||||
|
||||
public FleetDiagnosticsClient(ActorSystem system, IOptions<AkkaClusterOptions> options)
|
||||
{
|
||||
_system = system;
|
||||
_systemName = options.Value.SystemName;
|
||||
}
|
||||
|
||||
public Task<NodeDiagnosticsSnapshot> GetDiagnosticsAsync(NodeId nodeId, CancellationToken ct)
|
||||
public async Task<NodeDiagnosticsSnapshot> GetDiagnosticsAsync(NodeId nodeId, CancellationToken ct)
|
||||
{
|
||||
// F17: ActorSelection at $"akka.tcp://{system}@{nodeId.Value}:4053/user/driver-host"
|
||||
// → Ask<NodeDiagnosticsSnapshot>(new GetDiagnostics(), timeout).
|
||||
var snapshot = new NodeDiagnosticsSnapshot(
|
||||
nodeId,
|
||||
CurrentRevision: null,
|
||||
Drivers: Array.Empty<DriverInstanceDiagnostics>(),
|
||||
AsOfUtc: DateTime.UtcNow);
|
||||
return Task.FromResult(snapshot);
|
||||
var selection = _system.ActorSelection($"akka.tcp://{_systemName}@{nodeId.Value}/user/driver-host");
|
||||
try
|
||||
{
|
||||
using var linked = CancellationTokenSource.CreateLinkedTokenSource(ct);
|
||||
linked.CancelAfter(AskTimeout);
|
||||
return await selection.Ask<NodeDiagnosticsSnapshot>(
|
||||
new GetDiagnostics(CorrelationId.NewId()), AskTimeout, linked.Token);
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
return new NodeDiagnosticsSnapshot(
|
||||
nodeId,
|
||||
CurrentRevision: null,
|
||||
Drivers: Array.Empty<DriverInstanceDiagnostics>(),
|
||||
AsOfUtc: DateTime.UtcNow);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,7 +2,9 @@ using Akka.Actor;
|
||||
using Akka.Cluster.Tools.PublishSubscribe;
|
||||
using Akka.Event;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Interfaces;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||
@@ -134,6 +136,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
private void Steady()
|
||||
{
|
||||
Receive<DispatchDeployment>(HandleDispatchFromSteady);
|
||||
Receive<GetDiagnostics>(HandleGetDiagnostics);
|
||||
Receive<SubscribeAck>(_ => { /* PubSub ack */ });
|
||||
}
|
||||
|
||||
@@ -151,6 +154,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
_localNode, msg.DeploymentId, _applyingDeploymentId);
|
||||
Self.Forward(msg); // re-deliver after we transition back
|
||||
});
|
||||
Receive<GetDiagnostics>(HandleGetDiagnostics);
|
||||
Receive<SubscribeAck>(_ => { /* PubSub ack */ });
|
||||
}
|
||||
|
||||
@@ -160,11 +164,24 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
{
|
||||
_log.Warning("DriverHost {Node}: ignoring DispatchDeployment while Stale (DB unreachable)", _localNode);
|
||||
});
|
||||
Receive<GetDiagnostics>(HandleGetDiagnostics);
|
||||
Receive<RetryConfigDbConnection>(_ => TryRecoverFromStale());
|
||||
Receive<SubscribeAck>(_ => { /* PubSub ack */ });
|
||||
Timers.StartPeriodicTimer("retry-db", RetryConfigDbConnection.Instance, ReconnectInterval);
|
||||
}
|
||||
|
||||
private void HandleGetDiagnostics(GetDiagnostics msg)
|
||||
{
|
||||
// Driver-instance children aren't spawned yet (F7); the snapshot reports an empty driver
|
||||
// list. CurrentRevision is real — it's what the host believes is its applied revision.
|
||||
var snapshot = new NodeDiagnosticsSnapshot(
|
||||
NodeId: _localNode,
|
||||
CurrentRevision: _currentRevision,
|
||||
Drivers: Array.Empty<DriverInstanceDiagnostics>(),
|
||||
AsOfUtc: DateTime.UtcNow);
|
||||
Sender.Tell(snapshot);
|
||||
}
|
||||
|
||||
private void HandleDispatchFromSteady(DispatchDeployment msg)
|
||||
{
|
||||
if (_currentRevision is { } cur && cur == msg.RevisionHash)
|
||||
|
||||
@@ -0,0 +1,81 @@
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Interfaces;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Host.IntegrationTests;
|
||||
|
||||
/// <summary>
|
||||
/// End-to-end <see cref="IFleetDiagnosticsClient"/> round-trip via the cluster:
|
||||
/// admin node asks node-B's <c>DriverHostActor</c> for a snapshot via <c>ActorSelection</c>.
|
||||
/// Verifies the cross-node Ask/Reply works and the snapshot reflects the target node's
|
||||
/// view (NodeId + CurrentRevision after a deploy).
|
||||
/// </summary>
|
||||
public sealed class FleetDiagnosticsRoundTripTests
|
||||
{
|
||||
private static CancellationToken Ct => TestContext.Current.CancellationToken;
|
||||
|
||||
[Fact]
|
||||
public async Task GetDiagnostics_returns_snapshot_with_target_NodeId()
|
||||
{
|
||||
await using var harness = await TwoNodeClusterHarness.StartAsync();
|
||||
|
||||
// Resolve target NodeId from the cluster — second member ordered by address.
|
||||
var members = Akka.Cluster.Cluster.Get(harness.NodeASystem).State.Members
|
||||
.OrderBy(m => m.Address.ToString())
|
||||
.ToArray();
|
||||
members.Length.ShouldBe(2);
|
||||
var targetAddress = members[1].Address;
|
||||
var targetNodeId = Commons.Types.NodeId.Parse($"{targetAddress.Host}:{targetAddress.Port}");
|
||||
|
||||
await using var scope = harness.NodeA.Services.CreateAsyncScope();
|
||||
var client = scope.ServiceProvider.GetRequiredService<IFleetDiagnosticsClient>();
|
||||
|
||||
var snapshot = await client.GetDiagnosticsAsync(targetNodeId, Ct);
|
||||
|
||||
snapshot.NodeId.ShouldBe(targetNodeId);
|
||||
snapshot.Drivers.ShouldBeEmpty(); // No driver children yet (F7).
|
||||
snapshot.AsOfUtc.ShouldBeGreaterThan(DateTime.UtcNow.AddSeconds(-30));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task GetDiagnostics_after_deploy_reports_current_revision()
|
||||
{
|
||||
await using var harness = await TwoNodeClusterHarness.StartAsync();
|
||||
|
||||
await using var scope = harness.NodeA.Services.CreateAsyncScope();
|
||||
var adminOps = scope.ServiceProvider.GetRequiredService<IAdminOperationsClient>();
|
||||
var diagnostics = scope.ServiceProvider.GetRequiredService<IFleetDiagnosticsClient>();
|
||||
|
||||
var deploy = await adminOps.StartDeploymentAsync(createdBy: "alice@test", Ct);
|
||||
deploy.Outcome.ShouldBe(StartDeploymentOutcome.Accepted);
|
||||
var expectedRev = deploy.RevisionHash!.Value;
|
||||
|
||||
// Wait until both DriverHostActors have caught up to the deployed revision.
|
||||
var members = Akka.Cluster.Cluster.Get(harness.NodeASystem).State.Members
|
||||
.OrderBy(m => m.Address.ToString())
|
||||
.Select(m => Commons.Types.NodeId.Parse($"{m.Address.Host}:{m.Address.Port}"))
|
||||
.ToArray();
|
||||
|
||||
foreach (var nodeId in members)
|
||||
{
|
||||
await WaitForAsync(async () =>
|
||||
{
|
||||
var snap = await diagnostics.GetDiagnosticsAsync(nodeId, Ct);
|
||||
return snap.CurrentRevision == expectedRev;
|
||||
}, TimeSpan.FromSeconds(15));
|
||||
}
|
||||
}
|
||||
|
||||
private static async Task WaitForAsync(Func<Task<bool>> condition, TimeSpan timeout)
|
||||
{
|
||||
var deadline = DateTime.UtcNow + timeout;
|
||||
while (DateTime.UtcNow < deadline)
|
||||
{
|
||||
if (await condition()) return;
|
||||
await Task.Delay(200);
|
||||
}
|
||||
throw new TimeoutException($"Condition not met within {timeout}");
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user