From f23e368a742c1be73c31467514af991d69528bb6 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 25 May 2026 02:22:59 -0400 Subject: [PATCH] fix(server, admin): wire sp_RegisterNodeGenerationApplied + overlay heartbeat onto ClusterNode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit dbo.sp_RegisterNodeGenerationApplied was defined by the initial StoredProcedures migration but had zero callers in src/. The server polled sp_GetCurrentGenerationForCluster every 5s but never reported back, so dbo.ClusterNodeGenerationState stayed empty for every node and both the Admin UI Fleet status page ("No node state recorded") and the cluster-detail Redundancy LastSeenAt indicator ("never STALE") showed broken liveness forever. Server side (GenerationRefreshHostedService): * New testable seam: Func? registerAppliedAsync constructor parameter, defaulting to a real sp_RegisterNodeGenerationApplied call against the central DB. * TickAsync now calls the proc at two points: after every successful apply with NodeApplyStatus.Applied, and on every no-change tick as a heartbeat (also Applied) so LastSeenAt stays fresh. * Apply failures now wrap the lease + coordinator.RefreshAsync in a try/catch, report NodeApplyStatus.Failed with the exception message, and advance LastAppliedGenerationId regardless of outcome so we don't loop on the same broken apply every 5s. * Register-call failures are best-effort (LogDebug heartbeat, LogWarning apply-report) — a transient DB outage during reporting must not crash the publisher or block the next apply. Admin side (ClusterNodeService.ListByClusterAsync): the Redundancy tab reads ClusterNode.LastSeenAt, but no current writer maintains that column — the heartbeat goes to ClusterNodeGenerationState.LastSeenAt. Overlay the GenerationState heartbeat onto the returned ClusterNode rows when more recent, so IsStale + the Redundancy table column reflect actual liveness without a schema change or new write path. Tests: 3 new cases on GenerationRefreshHostedServiceTests verify first-apply reports Applied, no-change ticks heartbeat with Applied, and register-call failure does not roll back the cursor or block subsequent ticks. All 8 GenerationRefresh tests pass. Verified live on node-dev-a / cluster-dev: dbo.ClusterNodeGenerationState now populated with CurrentGenerationId=1, LastAppliedStatus=Applied, fresh LastSeenAt. Fleet status page shows the node (KPIs NODES 1 / APPLIED 1 / STALE 0 / FAILED 0). Redundancy tab KPI STALE went 1\xe2\x86\x920 and the row shows a real LAST SEEN timestamp. Bonus: FleetStatusHub SignalR push now fires the cluster-page Live update banner on every heartbeat because there are finally state changes to push. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../Services/ClusterNodeService.cs | 34 ++++++- .../Hosting/GenerationRefreshHostedService.cs | 96 +++++++++++++++++-- .../GenerationRefreshHostedServiceTests.cs | 68 ++++++++++++- 3 files changed, 187 insertions(+), 11 deletions(-) diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Admin/Services/ClusterNodeService.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Admin/Services/ClusterNodeService.cs index 1729a54..de41d64 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Admin/Services/ClusterNodeService.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Admin/Services/ClusterNodeService.cs @@ -16,12 +16,40 @@ public sealed class ClusterNodeService(OtOpcUaConfigDbContext db) /// tolerance covers a missed heartbeat plus publisher GC pauses. public static readonly TimeSpan StaleThreshold = TimeSpan.FromSeconds(30); - public Task> ListByClusterAsync(string clusterId, CancellationToken ct) => - db.ClusterNodes.AsNoTracking() + public async Task> ListByClusterAsync(string clusterId, CancellationToken ct) + { + var nodes = await db.ClusterNodes.AsNoTracking() .Where(n => n.ClusterId == clusterId) .OrderByDescending(n => n.ServiceLevelBase) .ThenBy(n => n.NodeId) - .ToListAsync(ct); + .ToListAsync(ct).ConfigureAwait(false); + + // Bug #12 fix follow-up — the live-node heartbeat lands on + // ClusterNodeGenerationState.LastSeenAt (written by sp_RegisterNodeGenerationApplied + // on every generation poll). The ClusterNode.LastSeenAt column is a legacy slot that + // no current writer maintains, so reading it directly would show "never STALE" + // forever for every running node. Overlay the GenerationState heartbeat onto the + // returned ClusterNode rows when it's more recent so the Redundancy tab + IsStale + // predicate reflect actual liveness without needing a new write path or schema change. + var nodeIds = nodes.Select(n => n.NodeId).ToList(); + if (nodeIds.Count > 0) + { + var heartbeats = await db.ClusterNodeGenerationStates.AsNoTracking() + .Where(s => nodeIds.Contains(s.NodeId)) + .Select(s => new { s.NodeId, s.LastSeenAt }) + .ToListAsync(ct).ConfigureAwait(false); + var beatByNode = heartbeats.ToDictionary(s => s.NodeId, s => s.LastSeenAt); + foreach (var n in nodes) + { + if (beatByNode.TryGetValue(n.NodeId, out var hb) && hb is not null + && (n.LastSeenAt is null || hb > n.LastSeenAt)) + { + n.LastSeenAt = hb; + } + } + } + return nodes; + } public static bool IsStale(ClusterNode node) => node.LastSeenAt is null || DateTime.UtcNow - node.LastSeenAt.Value > StaleThreshold; diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Server/Hosting/GenerationRefreshHostedService.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Server/Hosting/GenerationRefreshHostedService.cs index c0a401a..b26268a 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Server/Hosting/GenerationRefreshHostedService.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Server/Hosting/GenerationRefreshHostedService.cs @@ -1,6 +1,7 @@ using Microsoft.Data.SqlClient; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; +using ZB.MOM.WW.OtOpcUa.Configuration.Enums; using ZB.MOM.WW.OtOpcUa.Server.Redundancy; namespace ZB.MOM.WW.OtOpcUa.Server.Hosting; @@ -42,10 +43,20 @@ public sealed class GenerationRefreshHostedService( RedundancyCoordinator coordinator, ILogger logger, TimeSpan? tickInterval = null, - Func>? currentGenerationQuery = null) : BackgroundService + Func>? currentGenerationQuery = null, + Func? registerAppliedAsync = null) : BackgroundService { private readonly Func> _generationQuery = currentGenerationQuery ?? new Func>(ct => DefaultQueryCurrentGenerationAsync(options, logger, ct)); + + // Bug #12 fix — the server now reports applied-generation state + heartbeat back to the + // central DB via sp_RegisterNodeGenerationApplied. Before this wiring the proc had zero + // callers, so dbo.ClusterNodeGenerationState stayed empty for every node and the Admin UI + // Fleet status page + cluster-detail Redundancy LastSeenAt both showed "no node state / + // never STALE" indefinitely. Tests inject a stub via the registerAppliedAsync parameter. + private readonly Func _registerApplied = registerAppliedAsync + ?? new Func( + (gen, status, err, ct) => DefaultRegisterAppliedAsync(options, logger, gen, status, err, ct)); /// /// How often the service polls sp_GetCurrentGenerationForCluster. Default 5 s — /// low enough that operator publishes take effect promptly, high enough that the @@ -97,6 +108,18 @@ public sealed class GenerationRefreshHostedService( if (LastAppliedGenerationId is long last && current == last) { + // Heartbeat — re-stamps LastSeenAt on dbo.ClusterNodeGenerationState so the Admin + // Fleet status page + cluster Redundancy tab can detect the node is alive without + // a generation change. Best-effort: a transient DB error here must not throw out of + // the tick (the next tick will retry) and must not block applies. + try + { + await _registerApplied(current.Value, NodeApplyStatus.Applied, null, cancellationToken).ConfigureAwait(false); + } + catch (Exception hbEx) when (hbEx is not OperationCanceledException) + { + logger.LogDebug(hbEx, "Heartbeat to sp_RegisterNodeGenerationApplied failed; will retry next tick"); + } return; // no change } @@ -109,14 +132,44 @@ public sealed class GenerationRefreshHostedService( // lease is open. Publisher ticks in parallel (1s cadence) will observe the band // transition and push it onto the OPC UA Server.ServiceLevel node. var publishRequestId = Guid.NewGuid(); - await using (leases.BeginApplyLease(current.Value, publishRequestId)) + NodeApplyStatus applyStatus; + string? applyError = null; + try { - await coordinator.RefreshAsync(cancellationToken).ConfigureAwait(false); - // Future: fire a domain event that driver hosts / virtual-tag engine / - // scripted-alarm engine subscribe to. For now the topology refresh is the - // only thing we rewire — everything else still requires a process restart. + await using (leases.BeginApplyLease(current.Value, publishRequestId)) + { + await coordinator.RefreshAsync(cancellationToken).ConfigureAwait(false); + // Future: fire a domain event that driver hosts / virtual-tag engine / + // scripted-alarm engine subscribe to. For now the topology refresh is the + // only thing we rewire — everything else still requires a process restart. + } + applyStatus = NodeApplyStatus.Applied; + } + catch (Exception applyEx) when (applyEx is not OperationCanceledException) + { + applyStatus = NodeApplyStatus.Failed; + applyError = applyEx.Message; + logger.LogError(applyEx, "Apply of generation {Generation} failed; will report Failed status to central DB", current); + // fall through to register so operators see the failed apply in /fleet } + // Always tell the central DB what happened with this apply attempt — success or + // failure. The proc upserts dbo.ClusterNodeGenerationState (CurrentGenerationId + + // LastAppliedAt + LastAppliedStatus + LastAppliedError + LastSeenAt). Failure here + // mustn't prevent us from advancing LastAppliedGenerationId — the apply already + // happened (or already failed); the publish is purely observability. + try + { + await _registerApplied(current.Value, applyStatus, applyError, cancellationToken).ConfigureAwait(false); + } + catch (Exception regEx) when (regEx is not OperationCanceledException) + { + logger.LogWarning(regEx, "sp_RegisterNodeGenerationApplied call failed for gen {Generation} status {Status}", current, applyStatus); + } + + // Advance the cursor even on Failed — the proc has been told; next tick will heartbeat + // and a future generation will trigger a fresh apply attempt. Pinning the cursor on + // failure would loop us through the same broken apply every 5s. LastAppliedGenerationId = current; RefreshCount++; } @@ -157,4 +210,35 @@ public sealed class GenerationRefreshHostedService( return null; } } + + /// + /// Default register-applied implementation — calls sp_RegisterNodeGenerationApplied + /// to MERGE-upsert + /// for this node. Called both at apply completion (success or failure) and on every + /// no-change heartbeat tick so LastSeenAt stays fresh in the central DB and the + /// Admin UI Fleet status page + Redundancy LastSeenAt indicator can detect a healthy node. + /// Bug #12 fix — wires the previously-orphaned proc into the apply loop. + /// + private static async Task DefaultRegisterAppliedAsync( + NodeOptions options, + ILogger logger, + long generationId, + NodeApplyStatus status, + string? error, + CancellationToken cancellationToken) + { + await using var conn = new SqlConnection(options.ConfigDbConnectionString); + await conn.OpenAsync(cancellationToken).ConfigureAwait(false); + + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "EXEC dbo.sp_RegisterNodeGenerationApplied @NodeId=@n, @GenerationId=@g, @Status=@s, @Error=@e"; + cmd.Parameters.AddWithValue("@n", options.NodeId); + cmd.Parameters.AddWithValue("@g", generationId); + cmd.Parameters.AddWithValue("@s", status.ToString()); + cmd.Parameters.AddWithValue("@e", (object?)error ?? DBNull.Value); + + await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false); + // Single-line trace so soak runs can see heartbeat ticks without flooding at Info. + logger.LogTrace("Reported gen {Generation} status {Status} to central DB", generationId, status); + } } diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Server.Tests/GenerationRefreshHostedServiceTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Server.Tests/GenerationRefreshHostedServiceTests.cs index 82e9649..16f6e46 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Server.Tests/GenerationRefreshHostedServiceTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Server.Tests/GenerationRefreshHostedServiceTests.cs @@ -110,6 +110,66 @@ public sealed class GenerationRefreshHostedServiceTests : IDisposable leases.OpenLeaseCount.ShouldBe(0, "IAsyncDisposable dispose must fire regardless of outcome"); } + // Bug #12 fix — verifies the previously-missing wiring: applies and heartbeats both + // emit sp_RegisterNodeGenerationApplied so Admin UI Fleet status + Redundancy LastSeenAt + // surface live state. + + [Fact] + public async Task First_apply_reports_Applied_status_to_central_db() + { + var coordinator = await SeedCoordinatorAsync(); + var leases = new ApplyLeaseRegistry(); + var calls = new List<(long Gen, NodeApplyStatus Status, string? Error)>(); + var service = NewService(coordinator, leases, currentGeneration: () => 42, registerCalls: calls); + + await service.TickAsync(CancellationToken.None); + + calls.Count.ShouldBe(1, "exactly one register call per apply window"); + calls[0].Gen.ShouldBe(42); + calls[0].Status.ShouldBe(NodeApplyStatus.Applied); + calls[0].Error.ShouldBeNull(); + } + + [Fact] + public async Task No_change_tick_heartbeats_with_Applied_status() + { + var coordinator = await SeedCoordinatorAsync(); + var leases = new ApplyLeaseRegistry(); + var calls = new List<(long Gen, NodeApplyStatus Status, string? Error)>(); + var service = NewService(coordinator, leases, currentGeneration: () => 42, registerCalls: calls); + + await service.TickAsync(CancellationToken.None); // initial apply + await service.TickAsync(CancellationToken.None); // no-change heartbeat + await service.TickAsync(CancellationToken.None); // no-change heartbeat + + calls.Count.ShouldBe(3, "one apply call + two heartbeat calls"); + calls.ShouldAllBe(c => c.Gen == 42 && c.Status == NodeApplyStatus.Applied && c.Error == null); + } + + [Fact] + public async Task Register_call_failure_does_not_break_apply_or_block_subsequent_ticks() + { + var coordinator = await SeedCoordinatorAsync(); + var leases = new ApplyLeaseRegistry(); + var registerCallCount = 0; + var service = new GenerationRefreshHostedService( + new NodeOptions { NodeId = "A", ClusterId = "c1", ConfigDbConnectionString = "unused" }, + leases, coordinator, NullLogger.Instance, + tickInterval: TimeSpan.FromSeconds(1), + currentGenerationQuery: _ => Task.FromResult(42), + registerAppliedAsync: (gen, status, err, ct) => + { + registerCallCount++; + throw new InvalidOperationException("simulated DB outage during register"); + }); + + await service.TickAsync(CancellationToken.None); // apply succeeds, register throws + await service.TickAsync(CancellationToken.None); // heartbeat throws + + registerCallCount.ShouldBe(2, "both register attempts must run"); + service.LastAppliedGenerationId.ShouldBe(42, "register failure must not roll back the cursor"); + } + // ---- fixture helpers --------------------------------------------------- private async Task SeedCoordinatorAsync() @@ -136,11 +196,15 @@ public sealed class GenerationRefreshHostedServiceTests : IDisposable private static GenerationRefreshHostedService NewService( RedundancyCoordinator coordinator, ApplyLeaseRegistry leases, - Func currentGeneration) => + Func currentGeneration, + List<(long Gen, NodeApplyStatus Status, string? Error)>? registerCalls = null) => new(new NodeOptions { NodeId = "A", ClusterId = "c1", ConfigDbConnectionString = "unused" }, leases, coordinator, NullLogger.Instance, tickInterval: TimeSpan.FromSeconds(1), - currentGenerationQuery: _ => Task.FromResult(currentGeneration())); + currentGenerationQuery: _ => Task.FromResult(currentGeneration()), + registerAppliedAsync: registerCalls is null + ? (_, _, _, _) => Task.CompletedTask + : (gen, status, err, _) => { registerCalls.Add((gen, status, err)); return Task.CompletedTask; }); private sealed class DbContextFactory(DbContextOptions options) : IDbContextFactory