From 8f32b89fb9a746691086b668e38d60352784c74c Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 26 May 2026 06:58:11 -0400 Subject: [PATCH] feat(adminui): FleetDiagnosticsClient real Akka ActorSelection round-trip (F17) - New Commons.Messages.Fleet.GetDiagnostics request record. - DriverHostActor handles GetDiagnostics in all three states (Steady, Applying, Stale); replies with a NodeDiagnosticsSnapshot built from _currentRevision + the local NodeId. Drivers list is empty until F7 wires the per-instance children. - FleetDiagnosticsClient now resolves the target via ActorSelection at akka.tcp://{system}@{nodeId}/user/driver-host and Asks with a 3s timeout. On timeout/peer-down it returns an empty snapshot so the UI degrades gracefully rather than throwing. Two new integration tests in Host.IntegrationTests: - GetDiagnostics_returns_snapshot_with_target_NodeId verifies the cross-node Ask/Reply works. - GetDiagnostics_after_deploy_reports_current_revision exercises the end-to-end path: AdminOps starts a deployment, both DriverHostActors apply, then diagnostics reports the new revision on both nodes. All 98 v2 tests pass (was 96 + 2 new). --- .../Messages/Fleet/GetDiagnostics.cs | 10 +++ .../Clients/FleetDiagnosticsClient.cs | 48 +++++++---- .../Drivers/DriverHostActor.cs | 17 ++++ .../FleetDiagnosticsRoundTripTests.cs | 81 +++++++++++++++++++ 4 files changed, 139 insertions(+), 17 deletions(-) create mode 100644 src/Core/ZB.MOM.WW.OtOpcUa.Commons/Messages/Fleet/GetDiagnostics.cs create mode 100644 tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/FleetDiagnosticsRoundTripTests.cs diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Commons/Messages/Fleet/GetDiagnostics.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Commons/Messages/Fleet/GetDiagnostics.cs new file mode 100644 index 0000000..a9efac9 --- /dev/null +++ b/src/Core/ZB.MOM.WW.OtOpcUa.Commons/Messages/Fleet/GetDiagnostics.cs @@ -0,0 +1,10 @@ +using ZB.MOM.WW.OtOpcUa.Commons.Types; + +namespace ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet; + +/// +/// Request a diagnostic snapshot from a per-node DriverHostActor. Sent by the admin UI's +/// IFleetDiagnosticsClient via ActorSelection over the cluster; the local +/// DriverHostActor responds with a NodeDiagnosticsSnapshot. +/// +public sealed record GetDiagnostics(CorrelationId CorrelationId); diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.AdminUI/Clients/FleetDiagnosticsClient.cs b/src/Server/ZB.MOM.WW.OtOpcUa.AdminUI/Clients/FleetDiagnosticsClient.cs index 199445a..a29572a 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.AdminUI/Clients/FleetDiagnosticsClient.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.AdminUI/Clients/FleetDiagnosticsClient.cs @@ -1,36 +1,50 @@ using Akka.Actor; +using Microsoft.Extensions.Options; +using ZB.MOM.WW.OtOpcUa.Cluster; using ZB.MOM.WW.OtOpcUa.Commons.Interfaces; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet; using ZB.MOM.WW.OtOpcUa.Commons.Types; namespace ZB.MOM.WW.OtOpcUa.AdminUI.Clients; /// -/// that targets a named node's DriverHostActor over -/// Akka cluster . +/// backed by an Akka against +/// the target node's DriverHostActor at akka.tcp://otopcua@<host>:<port>/user/driver-host. +/// Sends ; expects a reply. /// -/// The actual GetDiagnosticsRequest/NodeDiagnosticsSnapshot round-trip on the -/// driver side is staged for follow-up F17 (depends on DriverHostActor exposing the request -/// handler; right now it only handles DispatchDeployment). For now the client returns an empty -/// snapshot so the UI can render a "no data yet" state. +/// On timeout or any other failure (peer down, GetDiagnostics handler missing) returns an +/// empty snapshot so the UI degrades to "no data" instead of throwing. /// public sealed class FleetDiagnosticsClient : IFleetDiagnosticsClient { - private readonly ActorSystem _system; + private static readonly TimeSpan AskTimeout = TimeSpan.FromSeconds(3); - public FleetDiagnosticsClient(ActorSystem system) + private readonly ActorSystem _system; + private readonly string _systemName; + + public FleetDiagnosticsClient(ActorSystem system, IOptions options) { _system = system; + _systemName = options.Value.SystemName; } - public Task GetDiagnosticsAsync(NodeId nodeId, CancellationToken ct) + public async Task GetDiagnosticsAsync(NodeId nodeId, CancellationToken ct) { - // F17: ActorSelection at $"akka.tcp://{system}@{nodeId.Value}:4053/user/driver-host" - // → Ask(new GetDiagnostics(), timeout). - var snapshot = new NodeDiagnosticsSnapshot( - nodeId, - CurrentRevision: null, - Drivers: Array.Empty(), - AsOfUtc: DateTime.UtcNow); - return Task.FromResult(snapshot); + var selection = _system.ActorSelection($"akka.tcp://{_systemName}@{nodeId.Value}/user/driver-host"); + try + { + using var linked = CancellationTokenSource.CreateLinkedTokenSource(ct); + linked.CancelAfter(AskTimeout); + return await selection.Ask( + new GetDiagnostics(CorrelationId.NewId()), AskTimeout, linked.Token); + } + catch (Exception) + { + return new NodeDiagnosticsSnapshot( + nodeId, + CurrentRevision: null, + Drivers: Array.Empty(), + AsOfUtc: DateTime.UtcNow); + } } } diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs index ab521a7..759ffa5 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs @@ -2,7 +2,9 @@ using Akka.Actor; using Akka.Cluster.Tools.PublishSubscribe; using Akka.Event; using Microsoft.EntityFrameworkCore; +using ZB.MOM.WW.OtOpcUa.Commons.Interfaces; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet; using ZB.MOM.WW.OtOpcUa.Commons.Types; using ZB.MOM.WW.OtOpcUa.Configuration; using ZB.MOM.WW.OtOpcUa.Configuration.Entities; @@ -134,6 +136,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers private void Steady() { Receive(HandleDispatchFromSteady); + Receive(HandleGetDiagnostics); Receive(_ => { /* PubSub ack */ }); } @@ -151,6 +154,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers _localNode, msg.DeploymentId, _applyingDeploymentId); Self.Forward(msg); // re-deliver after we transition back }); + Receive(HandleGetDiagnostics); Receive(_ => { /* PubSub ack */ }); } @@ -160,11 +164,24 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers { _log.Warning("DriverHost {Node}: ignoring DispatchDeployment while Stale (DB unreachable)", _localNode); }); + Receive(HandleGetDiagnostics); Receive(_ => TryRecoverFromStale()); Receive(_ => { /* PubSub ack */ }); Timers.StartPeriodicTimer("retry-db", RetryConfigDbConnection.Instance, ReconnectInterval); } + private void HandleGetDiagnostics(GetDiagnostics msg) + { + // Driver-instance children aren't spawned yet (F7); the snapshot reports an empty driver + // list. CurrentRevision is real — it's what the host believes is its applied revision. + var snapshot = new NodeDiagnosticsSnapshot( + NodeId: _localNode, + CurrentRevision: _currentRevision, + Drivers: Array.Empty(), + AsOfUtc: DateTime.UtcNow); + Sender.Tell(snapshot); + } + private void HandleDispatchFromSteady(DispatchDeployment msg) { if (_currentRevision is { } cur && cur == msg.RevisionHash) diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/FleetDiagnosticsRoundTripTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/FleetDiagnosticsRoundTripTests.cs new file mode 100644 index 0000000..093e596 --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests/FleetDiagnosticsRoundTripTests.cs @@ -0,0 +1,81 @@ +using Microsoft.Extensions.DependencyInjection; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Commons.Interfaces; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin; + +namespace ZB.MOM.WW.OtOpcUa.Host.IntegrationTests; + +/// +/// End-to-end round-trip via the cluster: +/// admin node asks node-B's DriverHostActor for a snapshot via ActorSelection. +/// Verifies the cross-node Ask/Reply works and the snapshot reflects the target node's +/// view (NodeId + CurrentRevision after a deploy). +/// +public sealed class FleetDiagnosticsRoundTripTests +{ + private static CancellationToken Ct => TestContext.Current.CancellationToken; + + [Fact] + public async Task GetDiagnostics_returns_snapshot_with_target_NodeId() + { + await using var harness = await TwoNodeClusterHarness.StartAsync(); + + // Resolve target NodeId from the cluster — second member ordered by address. + var members = Akka.Cluster.Cluster.Get(harness.NodeASystem).State.Members + .OrderBy(m => m.Address.ToString()) + .ToArray(); + members.Length.ShouldBe(2); + var targetAddress = members[1].Address; + var targetNodeId = Commons.Types.NodeId.Parse($"{targetAddress.Host}:{targetAddress.Port}"); + + await using var scope = harness.NodeA.Services.CreateAsyncScope(); + var client = scope.ServiceProvider.GetRequiredService(); + + var snapshot = await client.GetDiagnosticsAsync(targetNodeId, Ct); + + snapshot.NodeId.ShouldBe(targetNodeId); + snapshot.Drivers.ShouldBeEmpty(); // No driver children yet (F7). + snapshot.AsOfUtc.ShouldBeGreaterThan(DateTime.UtcNow.AddSeconds(-30)); + } + + [Fact] + public async Task GetDiagnostics_after_deploy_reports_current_revision() + { + await using var harness = await TwoNodeClusterHarness.StartAsync(); + + await using var scope = harness.NodeA.Services.CreateAsyncScope(); + var adminOps = scope.ServiceProvider.GetRequiredService(); + var diagnostics = scope.ServiceProvider.GetRequiredService(); + + var deploy = await adminOps.StartDeploymentAsync(createdBy: "alice@test", Ct); + deploy.Outcome.ShouldBe(StartDeploymentOutcome.Accepted); + var expectedRev = deploy.RevisionHash!.Value; + + // Wait until both DriverHostActors have caught up to the deployed revision. + var members = Akka.Cluster.Cluster.Get(harness.NodeASystem).State.Members + .OrderBy(m => m.Address.ToString()) + .Select(m => Commons.Types.NodeId.Parse($"{m.Address.Host}:{m.Address.Port}")) + .ToArray(); + + foreach (var nodeId in members) + { + await WaitForAsync(async () => + { + var snap = await diagnostics.GetDiagnosticsAsync(nodeId, Ct); + return snap.CurrentRevision == expectedRev; + }, TimeSpan.FromSeconds(15)); + } + } + + private static async Task WaitForAsync(Func> condition, TimeSpan timeout) + { + var deadline = DateTime.UtcNow + timeout; + while (DateTime.UtcNow < deadline) + { + if (await condition()) return; + await Task.Delay(200); + } + throw new TimeoutException($"Condition not met within {timeout}"); + } +}