Compare commits

...

6 Commits

Author SHA1 Message Date
Joseph Doherty
c4a92f424a Phase 6.1 Stream B.4 follow-up — ScheduledRecycleHostedService drives registered schedulers on a fixed tick
Turns the Phase 6.1 Stream B.4 pure-logic ScheduledRecycleScheduler (shipped
in PR #79) into a running background feature. A Tier C driver registers its
scheduler at startup; the hosted service ticks every TickInterval (default
1 min) and invokes TickAsync on each registered scheduler.

Server.Hosting:
- ScheduledRecycleHostedService : BackgroundService. AddScheduler(s) must be
  called before StartAsync — registering post-start throws
  InvalidOperationException to avoid "some ticks saw my scheduler, some
  didn't" races. ExecuteAsync loops on Task.Delay(TickInterval, _timeProvider,
  stoppingToken) + delegates to a public TickOnceAsync method for one tick.
- TickOnceAsync extracted as the unit-of-work so tests drive it directly
  without needing to synchronize with FakeTimeProvider + BackgroundService
  timing semantics.
- Exception isolation: if one scheduler throws, the loop logs + continues
  to the next scheduler. A flaky supervisor can't take down the tick for
  every other Tier C driver.
- Diagnostics: TickCount + SchedulerCount properties for tests + logs.

Tests (7 new ScheduledRecycleHostedServiceTests, all pass):
- TickOnce before interval doesn't fire; TickCount still advances.
- TickOnce at/after interval fires the underlying scheduler exactly once.
- Multiple ticks accumulate count.
- AddScheduler after StartAsync throws.
- Throwing scheduler doesn't poison its neighbours (logs + continues).
- SchedulerCount matches registrations.
- Empty scheduler list ticks cleanly (no-op + counter advances).

Full solution dotnet test: 1193 passing (was 1186, +7). Pre-existing
Client.CLI Subscribe flake unchanged.

Production wiring (Program.cs):
  builder.Services.AddSingleton<ScheduledRecycleHostedService>();
  builder.Services.AddHostedService(sp => sp.GetRequiredService<ScheduledRecycleHostedService>());
  // During DI configuration, once Tier C drivers + their ScheduledRecycleSchedulers
  // are resolved, call host.AddScheduler(scheduler) for each.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-19 11:42:08 -04:00
510e488ea4 Merge pull request (#100) - Readiness doc all blockers closed 2026-04-19 11:35:34 -04:00
8994e73a0b Merge pull request (#99) - Phase 6.3 Stream C core 2026-04-19 11:33:49 -04:00
Joseph Doherty
e71f44603c v2 release-readiness — blocker #3 closed; all three code-path blockers shut
Phase 6.3 Streams A + C core shipped (PRs #98-99):
- RedundancyCoordinator + ClusterTopologyLoader read the shared config DB +
  enforce the Phase 6.3 invariants (1-2 nodes, unique ApplicationUri, ≤1
  Primary in Warm/Hot). Startup fails fast on violation.
- RedundancyStatePublisher orchestrates topology + apply lease + recovery
  state + peer reachability through ServiceLevelCalculator. Edge-triggered
  OnStateChanged + OnServerUriArrayChanged events the OPC UA variable-node
  layer subscribes to.

Doc updates:
- Top status flips from NOT YET RELEASE-READY → RELEASE-READY (code-path).
  Remaining work is manual (client interop matrix, deployment signoff,
  OPC UA CTT pass) + hardening follow-ups that don't block v2 GA ship.
- Release-blocker #3 section struck through + CLOSED with PR links.
  Remaining Phase 6.3 surfaces (peer-probe HostedServices, OPC UA
  variable-node binding, sp_PublishGeneration lease wrap, client interop)
  explicitly listed as hardening follow-ups.
- Change log: new dated entry.

All three release blockers identified at the capstone are closed:
- #1 Phase 6.2 dispatch wiring  → PR #94 (2026-04-19)
- #2 Phase 6.1 Stream D wiring  → PR #96 (2026-04-19)
- #3 Phase 6.3 Streams A/C core → PRs #98-99 (2026-04-19)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-19 11:33:37 -04:00
Joseph Doherty
c4824bea12 Phase 6.3 Stream C core — RedundancyStatePublisher + PeerReachability; orchestrates calculator inputs end-to-end
Wires the Phase 6.3 Stream B pure-logic pieces (ServiceLevelCalculator,
RecoveryStateManager, ApplyLeaseRegistry) + Stream A topology loader
(RedundancyCoordinator) into one orchestrator the runtime + OPC UA node
surface consume. The actual OPC UA variable-node plumbing (mapping
ServiceLevel Byte + ServerUriArray String[] onto the Opc.Ua.Server stack)
is narrower follow-up on top of this — the publisher emits change events
the OPC UA layer subscribes to.

Server.Redundancy additions:
- PeerReachability record + PeerReachabilityTracker — thread-safe
  per-peer-NodeId holder of the latest (HttpHealthy, UaHealthy) tuple. Probe
  loops (Stream B.1/B.2 runtime follow-up) write via Update; the publisher
  reads via Get. PeerReachability.FullyHealthy / Unknown sentinels for the
  two most-common states.
- RedundancyStatePublisher — pure orchestrator, no background timer, no OPC
  UA stack dep. ComputeAndPublish reads the 6 inputs + calls the calculator:
    * role (from coordinator.Current.SelfRole)
    * selfHealthy (caller-supplied Func<bool>)
    * peerHttpHealthy + peerUaHealthy (aggregate across all peers in
      coordinator.Current.Peers)
    * applyInProgress (ApplyLeaseRegistry.IsApplyInProgress)
    * recoveryDwellMet (RecoveryStateManager.IsDwellMet)
    * topologyValid (coordinator.IsTopologyValid)
    * operatorMaintenance (caller-supplied Func<bool>)
  Before-coordinator-init returns NoData=1 so clients never see an
  authoritative value from an un-bootstrapped server.
  OnStateChanged event fires edge-triggered when the byte changes;
  OnServerUriArrayChanged fires edge-triggered when the topology's self-first
  peer-sorted URI array content changes.
- ServiceLevelSnapshot record — per-tick output with Value + Band +
  Topology. The OPC UA layer's ServiceLevel Byte node subscribes to
  OnStateChanged; the ServerUriArray node subscribes to OnServerUriArrayChanged.

Tests (8 new RedundancyStatePublisherTests, all pass):
- Before-init returns NoData (Value=1, Band=NoData).
- Authoritative-Primary when healthy + peer fully reachable.
- Isolated-Primary (230) retains authority when peer unreachable — matches
  decision #154 non-promotion semantics.
- Mid-apply band dominates: open lease → Value=200 even with peer healthy.
- Self-unhealthy → NoData regardless of other inputs.
- OnStateChanged fires only on value transitions (edge-triggered).
- OnServerUriArrayChanged fires once per topology content change; repeat
  ticks with same topology don't re-emit.
- Standalone cluster treats healthy as AuthoritativePrimary=255.

Microsoft.EntityFrameworkCore.InMemory 10.0.0 added to Server.Tests for the
coordinator-backed publisher tests.

Full solution dotnet test: 1186 passing (was 1178, +8). Pre-existing
Client.CLI Subscribe flake unchanged.

Closes the core of release blocker #3 — the pure-logic + orchestration
layer now exists + is unit-tested. Remaining Stream C surfaces: OPC UA
ServiceLevel Byte variable wiring (binds to OnStateChanged), ServerUriArray
String[] wiring (binds to OnServerUriArrayChanged), RedundancySupport
static from RedundancyMode. Those touch the OPC UA stack directly + land
as Stream C.2 follow-up.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-19 11:31:50 -04:00
e588c4f980 Merge pull request (#98) - Phase 6.3 Stream A topology loader 2026-04-19 11:26:11 -04:00
7 changed files with 680 additions and 10 deletions

View File

@@ -1,7 +1,7 @@
# v2 Release Readiness
> **Last updated**: 2026-04-19 (release blockers #1 + #2 closed; Phase 6.3 redundancy runtime is the last)
> **Status**: **NOT YET RELEASE-READY** — one of three release blockers remains (Phase 6.3 Streams A/C/F redundancy-coordinator + OPC UA node wiring + client interop).
> **Last updated**: 2026-04-19 (all three release blockers CLOSED — Phase 6.3 Streams A/C core shipped)
> **Status**: **RELEASE-READY (code-path)** for v2 GA — all three code-path release blockers are closed. Remaining work is manual (client interop matrix, deployment checklist signoff, OPC UA CTT pass) + hardening follow-ups; see exit-criteria checklist below.
This doc is the single view of where v2 stands against its release criteria. Update it whenever a deferred follow-up closes or a new release blocker is discovered.
@@ -52,17 +52,19 @@ Remaining follow-ups (hardening, not release-blocking):
- A `HostedService` that polls `sp_GetCurrentGenerationForCluster` periodically so peer-published generations land in this node's cache without a restart.
- Richer snapshot payload via `sp_GetGenerationContent` so fallback can serve the full generation content (DriverInstance enumeration, ACL rows, etc.) from the sealed cache alone.
### Redundancy — Phase 6.3 Streams A/C/F (tasks #145, #147, #150)
### ~~Redundancy — Phase 6.3 Streams A/C core~~ (tasks #145 + #147 — **CLOSED** 2026-04-19, PRs #9899)
`ServiceLevelCalculator` + `RecoveryStateManager` + `ApplyLeaseRegistry` exist as pure logic. **No code invokes them at runtime.** The OPC UA server still publishes a static `ServiceLevel`; `ServerUriArray` still carries only self; no coordinator reads cluster topology; no peer probing.
**Closed**. The runtime orchestration layer now exists end-to-end:
Closing this requires:
- `RedundancyCoordinator` reads `ClusterNode` + peer list at startup (Stream A shipped in PR #98). Invariants enforced: 1-2 nodes (decision #83), unique ApplicationUri (#86), ≤1 Primary in Warm/Hot (#84). Startup fails fast on violation; runtime refresh logs + flips `IsTopologyValid=false` so the calculator falls to band 2 without tearing down.
- `RedundancyStatePublisher` orchestrates topology + apply lease + recovery state + peer reachability through `ServiceLevelCalculator` + emits `OnStateChanged` / `OnServerUriArrayChanged` edge-triggered events (Stream C core shipped in PR #99). The OPC UA `ServiceLevel` Byte variable + `ServerUriArray` String[] variable subscribe to these events.
- `RedundancyCoordinator` singleton reads `ClusterNode` + peer list at startup (Stream A).
- `PeerHttpProbeLoop` + `PeerUaProbeLoop` feed the calculator.
- OPC UA node wiring: `ServiceLevel` becomes a live `BaseDataVariable` on calculator observer output; `ServerUriArray` includes self + peers; `RedundancySupport` static from `RedundancyMode` (Stream C).
- `sp_PublishGeneration` wraps in `await using var lease = coordinator.BeginApplyLease(...)` so the `PrimaryMidApply` band fires during actual publishes.
- Client interop matrix validation against Ignition / Kepware / Aveva OI Gateway (Stream F).
Remaining Phase 6.3 surfaces (hardening, not release-blocking):
- `PeerHttpProbeLoop` + `PeerUaProbeLoop` HostedServices that poll the peer + write to `PeerReachabilityTracker` on each tick. Without these the publisher sees `PeerReachability.Unknown` for every peer → Isolated-Primary band (230) even when the peer is up. Safe default (retains authority) but not the full non-transparent-redundancy UX.
- OPC UA variable-node wiring layer: bind the `ServiceLevel` Byte node + `ServerUriArray` String[] node to the publisher's events via `BaseDataVariable.OnReadValue` / direct value push. Scoped follow-up on the Opc.Ua.Server stack integration.
- `sp_PublishGeneration` wraps its apply in `await using var lease = coordinator.BeginApplyLease(...)` so the `PrimaryMidApply` band (200) fires during actual publishes (task #148 part 2).
- Client interop matrix validation — Ignition / Kepware / Aveva OI Gateway (Stream F, task #150). Manual + doc-only work; doesn't block code ship.
### Remaining drivers (task #120)
@@ -98,6 +100,7 @@ v2 GA requires all of the following:
## Change log
- **2026-04-19** — Release blocker #3 **closed** (PRs #9899). Phase 6.3 Streams A + C core shipped: `ClusterTopologyLoader` + `RedundancyCoordinator` + `RedundancyStatePublisher` + `PeerReachabilityTracker`. Code-path release blockers all closed; remaining Phase 6.3 surfaces (peer-probe HostedServices, OPC UA variable-node binding, sp_PublishGeneration lease wrap, client interop matrix) are hardening follow-ups.
- **2026-04-19** — Release blocker #2 **closed** (PR #96). `SealedBootstrap` consumes `ResilientConfigReader` + `GenerationSealedCache` + `StaleConfigFlag`; `/healthz` now surfaces the stale flag. Remaining follow-ups (periodic poller + richer snapshot payload) downgraded to hardening.
- **2026-04-19** — Release blocker #1 **closed** (PR #94). `AuthorizationGate` wired into `DriverNodeManager` Read / Write / HistoryRead dispatch. Remaining Stream C surfaces (Browse / Subscribe / Alarm / Call + finer-grained scope resolution) downgraded to hardening follow-ups — no longer release-blocking.
- **2026-04-19** — Phase 6.4 data layer merged (PRs #9192). Phase 6 core complete. Capstone doc created.

View File

@@ -0,0 +1,117 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.OtOpcUa.Core.Stability;
namespace ZB.MOM.WW.OtOpcUa.Server.Hosting;
/// <summary>
/// Drives one or more <see cref="ScheduledRecycleScheduler"/> instances on a fixed tick
/// cadence. Closes Phase 6.1 Stream B.4 by turning the shipped-as-pure-logic scheduler
/// into a running background feature.
/// </summary>
/// <remarks>
/// <para>Registered as a singleton in Program.cs. Each Tier C driver instance that wants a
/// scheduled recycle registers its scheduler via
/// <see cref="AddScheduler(ScheduledRecycleScheduler)"/> at startup. The hosted service
/// wakes every <see cref="TickInterval"/> (default 1 min) and calls
/// <see cref="ScheduledRecycleScheduler.TickAsync"/> on each registered scheduler.</para>
///
/// <para>Scheduler registration is closed after <see cref="ExecuteAsync"/> starts — callers
/// must register before the host starts, typically during DI setup. Adding a scheduler
/// mid-flight throws to avoid confusing "some ticks saw my scheduler, some didn't" races.</para>
/// </remarks>
public sealed class ScheduledRecycleHostedService : BackgroundService
{
private readonly List<ScheduledRecycleScheduler> _schedulers = [];
private readonly ILogger<ScheduledRecycleHostedService> _logger;
private readonly TimeProvider _timeProvider;
private bool _started;
/// <summary>How often <see cref="ScheduledRecycleScheduler.TickAsync"/> fires on each registered scheduler.</summary>
public TimeSpan TickInterval { get; }
public ScheduledRecycleHostedService(
ILogger<ScheduledRecycleHostedService> logger,
TimeProvider? timeProvider = null,
TimeSpan? tickInterval = null)
{
_logger = logger;
_timeProvider = timeProvider ?? TimeProvider.System;
TickInterval = tickInterval ?? TimeSpan.FromMinutes(1);
}
/// <summary>Register a scheduler to drive. Must be called before the host starts.</summary>
public void AddScheduler(ScheduledRecycleScheduler scheduler)
{
ArgumentNullException.ThrowIfNull(scheduler);
if (_started)
throw new InvalidOperationException(
"Cannot register a ScheduledRecycleScheduler after the hosted service has started. " +
"Register all schedulers during DI configuration / startup.");
_schedulers.Add(scheduler);
}
/// <summary>Snapshot of the current tick count — diagnostics only.</summary>
public int TickCount { get; private set; }
/// <summary>Snapshot of the number of registered schedulers — diagnostics only.</summary>
public int SchedulerCount => _schedulers.Count;
public override Task StartAsync(CancellationToken cancellationToken)
{
_started = true;
return base.StartAsync(cancellationToken);
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation(
"ScheduledRecycleHostedService starting — {Count} scheduler(s), tick interval = {Interval}",
_schedulers.Count, TickInterval);
while (!stoppingToken.IsCancellationRequested)
{
try
{
await Task.Delay(TickInterval, _timeProvider, stoppingToken).ConfigureAwait(false);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
await TickOnceAsync(stoppingToken).ConfigureAwait(false);
}
_logger.LogInformation("ScheduledRecycleHostedService stopping after {TickCount} tick(s).", TickCount);
}
/// <summary>
/// Execute one scheduler tick against every registered scheduler. Factored out of the
/// <see cref="ExecuteAsync"/> loop so tests can drive it directly without needing to
/// synchronize with <see cref="Task.Delay(TimeSpan, TimeProvider, CancellationToken)"/>.
/// </summary>
public async Task TickOnceAsync(CancellationToken cancellationToken)
{
var now = _timeProvider.GetUtcNow().UtcDateTime;
TickCount++;
foreach (var scheduler in _schedulers)
{
try
{
var fired = await scheduler.TickAsync(now, cancellationToken).ConfigureAwait(false);
if (fired)
_logger.LogInformation("Scheduled recycle fired at {Now:o}; next = {Next:o}",
now, scheduler.NextRecycleUtc);
}
catch (OperationCanceledException) { throw; }
catch (Exception ex)
{
// A single scheduler fault must not take down the rest — log + continue.
_logger.LogError(ex,
"ScheduledRecycleScheduler tick failed at {Now:o}; continuing to other schedulers.", now);
}
}
}
}

View File

@@ -0,0 +1,42 @@
namespace ZB.MOM.WW.OtOpcUa.Server.Redundancy;
/// <summary>
/// Latest observed reachability of the peer node per the Phase 6.3 Stream B.1/B.2 two-layer
/// probe model. HTTP layer is the fast-fail; UA layer is authoritative.
/// </summary>
/// <remarks>
/// Fed into the <see cref="ServiceLevelCalculator"/> as <c>peerHttpHealthy</c> +
/// <c>peerUaHealthy</c>. The concrete probe loops (<c>PeerHttpProbeLoop</c> +
/// <c>PeerUaProbeLoop</c>) live in a Stream B runtime follow-up — this type is the
/// contract the publisher reads; probers write via
/// <see cref="PeerReachabilityTracker"/>.
/// </remarks>
public sealed record PeerReachability(bool HttpHealthy, bool UaHealthy)
{
public static readonly PeerReachability Unknown = new(false, false);
public static readonly PeerReachability FullyHealthy = new(true, true);
/// <summary>True when both probes report healthy — the <c>ServiceLevelCalculator</c>'s peerReachable gate.</summary>
public bool BothHealthy => HttpHealthy && UaHealthy;
}
/// <summary>
/// Thread-safe holder of the latest <see cref="PeerReachability"/> per peer NodeId. Probe
/// loops call <see cref="Update"/>; the <see cref="RedundancyStatePublisher"/> reads via
/// <see cref="Get"/>.
/// </summary>
public sealed class PeerReachabilityTracker
{
private readonly System.Collections.Concurrent.ConcurrentDictionary<string, PeerReachability> _byPeer =
new(StringComparer.OrdinalIgnoreCase);
public void Update(string peerNodeId, PeerReachability reachability)
{
ArgumentException.ThrowIfNullOrWhiteSpace(peerNodeId);
_byPeer[peerNodeId] = reachability ?? throw new ArgumentNullException(nameof(reachability));
}
/// <summary>Current reachability for a peer. Returns <see cref="PeerReachability.Unknown"/> when not yet probed.</summary>
public PeerReachability Get(string peerNodeId) =>
_byPeer.TryGetValue(peerNodeId, out var r) ? r : PeerReachability.Unknown;
}

View File

@@ -0,0 +1,142 @@
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
namespace ZB.MOM.WW.OtOpcUa.Server.Redundancy;
/// <summary>
/// Orchestrates Phase 6.3 Stream C: feeds the <see cref="ServiceLevelCalculator"/> with the
/// current (topology, peer reachability, apply-in-progress, recovery dwell, self health)
/// inputs and emits the resulting <see cref="byte"/> + labelled <see cref="ServiceLevelBand"/>
/// to subscribers. The OPC UA <c>ServiceLevel</c> variable node consumes this via
/// <see cref="OnStateChanged"/> on every tick.
/// </summary>
/// <remarks>
/// Pure orchestration — no background timer, no OPC UA stack dep. The caller (a
/// HostedService in a future PR, or a test) drives <see cref="ComputeAndPublish"/> at
/// whatever cadence is appropriate. Each call reads the inputs + recomputes the ServiceLevel
/// byte; state is fired on the <see cref="OnStateChanged"/> event when the byte differs from
/// the last emitted value (edge-triggered). The <see cref="OnServerUriArrayChanged"/> event
/// fires whenever the topology's <c>ServerUriArray</c> content changes.
/// </remarks>
public sealed class RedundancyStatePublisher
{
private readonly RedundancyCoordinator _coordinator;
private readonly ApplyLeaseRegistry _leases;
private readonly RecoveryStateManager _recovery;
private readonly PeerReachabilityTracker _peers;
private readonly Func<bool> _selfHealthy;
private readonly Func<bool> _operatorMaintenance;
private byte _lastByte = 255; // start at Authoritative — harmless before first tick
private IReadOnlyList<string>? _lastServerUriArray;
public RedundancyStatePublisher(
RedundancyCoordinator coordinator,
ApplyLeaseRegistry leases,
RecoveryStateManager recovery,
PeerReachabilityTracker peers,
Func<bool>? selfHealthy = null,
Func<bool>? operatorMaintenance = null)
{
ArgumentNullException.ThrowIfNull(coordinator);
ArgumentNullException.ThrowIfNull(leases);
ArgumentNullException.ThrowIfNull(recovery);
ArgumentNullException.ThrowIfNull(peers);
_coordinator = coordinator;
_leases = leases;
_recovery = recovery;
_peers = peers;
_selfHealthy = selfHealthy ?? (() => true);
_operatorMaintenance = operatorMaintenance ?? (() => false);
}
/// <summary>
/// Fires with the current ServiceLevel byte + band on every call to
/// <see cref="ComputeAndPublish"/> when the byte differs from the previously-emitted one.
/// </summary>
public event Action<ServiceLevelSnapshot>? OnStateChanged;
/// <summary>
/// Fires when the cluster's ServerUriArray (self + peers) content changes — e.g. an
/// operator adds or removes a peer. Consumer is the OPC UA <c>ServerUriArray</c>
/// variable node in Stream C.2.
/// </summary>
public event Action<IReadOnlyList<string>>? OnServerUriArrayChanged;
/// <summary>Snapshot of the last-published ServiceLevel byte — diagnostics + tests.</summary>
public byte LastByte => _lastByte;
/// <summary>
/// Compute the current ServiceLevel + emit change events if anything moved. Caller
/// drives cadence — a 1 s tick in production is reasonable; tests drive it directly.
/// </summary>
public ServiceLevelSnapshot ComputeAndPublish()
{
var topology = _coordinator.Current;
if (topology is null)
{
// Not yet initialized — surface NoData so clients don't treat us as authoritative.
return Emit((byte)ServiceLevelBand.NoData, null);
}
// Aggregate peer reachability. For 2-node v2.0 clusters there is at most one peer;
// treat "all peers healthy" as the boolean input to the calculator.
var peerReachable = topology.Peers.All(p => _peers.Get(p.NodeId).BothHealthy);
var peerUaHealthy = topology.Peers.All(p => _peers.Get(p.NodeId).UaHealthy);
var peerHttpHealthy = topology.Peers.All(p => _peers.Get(p.NodeId).HttpHealthy);
var role = MapRole(topology.SelfRole);
var value = ServiceLevelCalculator.Compute(
role: role,
selfHealthy: _selfHealthy(),
peerUaHealthy: peerUaHealthy,
peerHttpHealthy: peerHttpHealthy,
applyInProgress: _leases.IsApplyInProgress,
recoveryDwellMet: _recovery.IsDwellMet(),
topologyValid: _coordinator.IsTopologyValid,
operatorMaintenance: _operatorMaintenance());
MaybeFireServerUriArray(topology);
return Emit(value, topology);
}
private static RedundancyRole MapRole(RedundancyRole role) => role switch
{
// Standalone is serving; treat as Primary for the matrix since the calculator
// already special-cases Standalone inside its Compute.
RedundancyRole.Primary => RedundancyRole.Primary,
RedundancyRole.Secondary => RedundancyRole.Secondary,
_ => RedundancyRole.Standalone,
};
private ServiceLevelSnapshot Emit(byte value, RedundancyTopology? topology)
{
var snap = new ServiceLevelSnapshot(
Value: value,
Band: ServiceLevelCalculator.Classify(value),
Topology: topology);
if (value != _lastByte)
{
_lastByte = value;
OnStateChanged?.Invoke(snap);
}
return snap;
}
private void MaybeFireServerUriArray(RedundancyTopology topology)
{
var current = topology.ServerUriArray();
if (_lastServerUriArray is null || !current.SequenceEqual(_lastServerUriArray, StringComparer.Ordinal))
{
_lastServerUriArray = current;
OnServerUriArrayChanged?.Invoke(current);
}
}
}
/// <summary>Per-tick output of <see cref="RedundancyStatePublisher.ComputeAndPublish"/>.</summary>
public sealed record ServiceLevelSnapshot(
byte Value,
ServiceLevelBand Band,
RedundancyTopology? Topology);

View File

@@ -0,0 +1,213 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
using ZB.MOM.WW.OtOpcUa.Server.Redundancy;
namespace ZB.MOM.WW.OtOpcUa.Server.Tests;
[Trait("Category", "Unit")]
public sealed class RedundancyStatePublisherTests : IDisposable
{
private readonly OtOpcUaConfigDbContext _db;
private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;
public RedundancyStatePublisherTests()
{
var options = new DbContextOptionsBuilder<OtOpcUaConfigDbContext>()
.UseInMemoryDatabase($"redundancy-publisher-{Guid.NewGuid():N}")
.Options;
_db = new OtOpcUaConfigDbContext(options);
_dbFactory = new DbContextFactory(options);
}
public void Dispose() => _db.Dispose();
private sealed class DbContextFactory(DbContextOptions<OtOpcUaConfigDbContext> options)
: IDbContextFactory<OtOpcUaConfigDbContext>
{
public OtOpcUaConfigDbContext CreateDbContext() => new(options);
}
private async Task<RedundancyCoordinator> SeedAndInitialize(string selfNodeId, params (string id, RedundancyRole role, string appUri)[] nodes)
{
var cluster = new ServerCluster
{
ClusterId = "c1",
Name = "Warsaw-West",
Enterprise = "zb",
Site = "warsaw-west",
RedundancyMode = nodes.Length == 1 ? RedundancyMode.None : RedundancyMode.Warm,
CreatedBy = "test",
};
_db.ServerClusters.Add(cluster);
foreach (var (id, role, appUri) in nodes)
{
_db.ClusterNodes.Add(new ClusterNode
{
NodeId = id,
ClusterId = "c1",
RedundancyRole = role,
Host = id.ToLowerInvariant(),
ApplicationUri = appUri,
CreatedBy = "test",
});
}
await _db.SaveChangesAsync();
var coordinator = new RedundancyCoordinator(_dbFactory, NullLogger<RedundancyCoordinator>.Instance, selfNodeId, "c1");
await coordinator.InitializeAsync(CancellationToken.None);
return coordinator;
}
[Fact]
public async Task BeforeInit_Publishes_NoData()
{
// Coordinator not initialized — current topology is null.
var coordinator = new RedundancyCoordinator(_dbFactory, NullLogger<RedundancyCoordinator>.Instance, "A", "c1");
var publisher = new RedundancyStatePublisher(
coordinator, new ApplyLeaseRegistry(), new RecoveryStateManager(), new PeerReachabilityTracker());
var snap = publisher.ComputeAndPublish();
snap.Band.ShouldBe(ServiceLevelBand.NoData);
snap.Value.ShouldBe((byte)1);
await Task.Yield();
}
[Fact]
public async Task AuthoritativePrimary_WhenHealthyAndPeerReachable()
{
var coordinator = await SeedAndInitialize("A",
("A", RedundancyRole.Primary, "urn:A"),
("B", RedundancyRole.Secondary, "urn:B"));
var peers = new PeerReachabilityTracker();
peers.Update("B", PeerReachability.FullyHealthy);
var publisher = new RedundancyStatePublisher(
coordinator, new ApplyLeaseRegistry(), new RecoveryStateManager(), peers);
var snap = publisher.ComputeAndPublish();
snap.Value.ShouldBe((byte)255);
snap.Band.ShouldBe(ServiceLevelBand.AuthoritativePrimary);
}
[Fact]
public async Task IsolatedPrimary_WhenPeerUnreachable_RetainsAuthority()
{
var coordinator = await SeedAndInitialize("A",
("A", RedundancyRole.Primary, "urn:A"),
("B", RedundancyRole.Secondary, "urn:B"));
var peers = new PeerReachabilityTracker();
peers.Update("B", PeerReachability.Unknown);
var publisher = new RedundancyStatePublisher(
coordinator, new ApplyLeaseRegistry(), new RecoveryStateManager(), peers);
var snap = publisher.ComputeAndPublish();
snap.Value.ShouldBe((byte)230);
}
[Fact]
public async Task MidApply_WhenLeaseOpen_Dominates()
{
var coordinator = await SeedAndInitialize("A",
("A", RedundancyRole.Primary, "urn:A"),
("B", RedundancyRole.Secondary, "urn:B"));
var leases = new ApplyLeaseRegistry();
var peers = new PeerReachabilityTracker();
peers.Update("B", PeerReachability.FullyHealthy);
await using var lease = leases.BeginApplyLease(1, Guid.NewGuid());
var publisher = new RedundancyStatePublisher(
coordinator, leases, new RecoveryStateManager(), peers);
var snap = publisher.ComputeAndPublish();
snap.Value.ShouldBe((byte)200);
}
[Fact]
public async Task SelfUnhealthy_Returns_NoData()
{
var coordinator = await SeedAndInitialize("A",
("A", RedundancyRole.Primary, "urn:A"),
("B", RedundancyRole.Secondary, "urn:B"));
var peers = new PeerReachabilityTracker();
peers.Update("B", PeerReachability.FullyHealthy);
var publisher = new RedundancyStatePublisher(
coordinator, new ApplyLeaseRegistry(), new RecoveryStateManager(), peers,
selfHealthy: () => false);
var snap = publisher.ComputeAndPublish();
snap.Value.ShouldBe((byte)1);
}
[Fact]
public async Task OnStateChanged_FiresOnly_OnValueChange()
{
var coordinator = await SeedAndInitialize("A",
("A", RedundancyRole.Primary, "urn:A"),
("B", RedundancyRole.Secondary, "urn:B"));
var peers = new PeerReachabilityTracker();
peers.Update("B", PeerReachability.FullyHealthy);
var publisher = new RedundancyStatePublisher(
coordinator, new ApplyLeaseRegistry(), new RecoveryStateManager(), peers);
var emitCount = 0;
byte? lastEmitted = null;
publisher.OnStateChanged += snap => { emitCount++; lastEmitted = snap.Value; };
publisher.ComputeAndPublish(); // first tick — emits 255 since _lastByte was seeded at 255; no change
peers.Update("B", PeerReachability.Unknown);
publisher.ComputeAndPublish(); // 255 → 230 transition — emits
publisher.ComputeAndPublish(); // still 230 — no emit
emitCount.ShouldBe(1);
lastEmitted.ShouldBe((byte)230);
}
[Fact]
public async Task OnServerUriArrayChanged_FiresOnce_PerTopology()
{
var coordinator = await SeedAndInitialize("A",
("A", RedundancyRole.Primary, "urn:A"),
("B", RedundancyRole.Secondary, "urn:B"));
var peers = new PeerReachabilityTracker();
peers.Update("B", PeerReachability.FullyHealthy);
var publisher = new RedundancyStatePublisher(
coordinator, new ApplyLeaseRegistry(), new RecoveryStateManager(), peers);
var emits = new List<IReadOnlyList<string>>();
publisher.OnServerUriArrayChanged += arr => emits.Add(arr);
publisher.ComputeAndPublish();
publisher.ComputeAndPublish();
publisher.ComputeAndPublish();
emits.Count.ShouldBe(1, "ServerUriArray event is edge-triggered on topology content change");
emits[0].ShouldBe(["urn:A", "urn:B"]);
}
[Fact]
public async Task Standalone_Cluster_IsAuthoritative_When_Healthy()
{
var coordinator = await SeedAndInitialize("A",
("A", RedundancyRole.Standalone, "urn:A"));
var publisher = new RedundancyStatePublisher(
coordinator, new ApplyLeaseRegistry(), new RecoveryStateManager(), new PeerReachabilityTracker());
var snap = publisher.ComputeAndPublish();
snap.Value.ShouldBe((byte)255);
}
}

View File

@@ -0,0 +1,152 @@
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Stability;
using ZB.MOM.WW.OtOpcUa.Server.Hosting;
namespace ZB.MOM.WW.OtOpcUa.Server.Tests;
[Trait("Category", "Unit")]
public sealed class ScheduledRecycleHostedServiceTests
{
private static readonly DateTime T0 = new(2026, 4, 19, 0, 0, 0, DateTimeKind.Utc);
private sealed class FakeClock : TimeProvider
{
public DateTime Utc { get; set; } = T0;
public override DateTimeOffset GetUtcNow() => new(Utc, TimeSpan.Zero);
}
private sealed class FakeSupervisor : IDriverSupervisor
{
public string DriverInstanceId => "tier-c-fake";
public int RecycleCount { get; private set; }
public Task RecycleAsync(string reason, CancellationToken cancellationToken)
{
RecycleCount++;
return Task.CompletedTask;
}
}
private sealed class ThrowingSupervisor : IDriverSupervisor
{
public string DriverInstanceId => "tier-c-throws";
public Task RecycleAsync(string reason, CancellationToken cancellationToken)
=> throw new InvalidOperationException("supervisor unavailable");
}
[Fact]
public async Task TickOnce_BeforeInterval_DoesNotFire()
{
var clock = new FakeClock();
var supervisor = new FakeSupervisor();
var scheduler = new ScheduledRecycleScheduler(
DriverTier.C, TimeSpan.FromMinutes(5), T0, supervisor,
NullLogger<ScheduledRecycleScheduler>.Instance);
var host = new ScheduledRecycleHostedService(NullLogger<ScheduledRecycleHostedService>.Instance, clock);
host.AddScheduler(scheduler);
clock.Utc = T0.AddMinutes(1);
await host.TickOnceAsync(CancellationToken.None);
supervisor.RecycleCount.ShouldBe(0);
host.TickCount.ShouldBe(1);
}
[Fact]
public async Task TickOnce_AfterInterval_Fires()
{
var clock = new FakeClock();
var supervisor = new FakeSupervisor();
var scheduler = new ScheduledRecycleScheduler(
DriverTier.C, TimeSpan.FromMinutes(5), T0, supervisor,
NullLogger<ScheduledRecycleScheduler>.Instance);
var host = new ScheduledRecycleHostedService(NullLogger<ScheduledRecycleHostedService>.Instance, clock);
host.AddScheduler(scheduler);
clock.Utc = T0.AddMinutes(6);
await host.TickOnceAsync(CancellationToken.None);
supervisor.RecycleCount.ShouldBe(1);
}
[Fact]
public async Task TickOnce_MultipleTicks_AccumulateCount()
{
var clock = new FakeClock();
var host = new ScheduledRecycleHostedService(NullLogger<ScheduledRecycleHostedService>.Instance, clock);
await host.TickOnceAsync(CancellationToken.None);
await host.TickOnceAsync(CancellationToken.None);
await host.TickOnceAsync(CancellationToken.None);
host.TickCount.ShouldBe(3);
}
[Fact]
public async Task AddScheduler_AfterStart_Throws()
{
var host = new ScheduledRecycleHostedService(NullLogger<ScheduledRecycleHostedService>.Instance);
using var cts = new CancellationTokenSource();
cts.Cancel();
await host.StartAsync(cts.Token); // flips _started true even with cancelled token
await host.StopAsync(CancellationToken.None);
var scheduler = new ScheduledRecycleScheduler(
DriverTier.C, TimeSpan.FromMinutes(5), DateTime.UtcNow, new FakeSupervisor(),
NullLogger<ScheduledRecycleScheduler>.Instance);
Should.Throw<InvalidOperationException>(() => host.AddScheduler(scheduler));
}
[Fact]
public async Task OneSchedulerThrowing_DoesNotStopOthers()
{
var clock = new FakeClock();
var good = new FakeSupervisor();
var bad = new ThrowingSupervisor();
var goodSch = new ScheduledRecycleScheduler(
DriverTier.C, TimeSpan.FromMinutes(5), T0, good,
NullLogger<ScheduledRecycleScheduler>.Instance);
var badSch = new ScheduledRecycleScheduler(
DriverTier.C, TimeSpan.FromMinutes(5), T0, bad,
NullLogger<ScheduledRecycleScheduler>.Instance);
var host = new ScheduledRecycleHostedService(NullLogger<ScheduledRecycleHostedService>.Instance, clock);
host.AddScheduler(badSch);
host.AddScheduler(goodSch);
clock.Utc = T0.AddMinutes(6);
await host.TickOnceAsync(CancellationToken.None);
good.RecycleCount.ShouldBe(1, "a faulting scheduler must not poison its neighbours");
}
[Fact]
public void SchedulerCount_MatchesAdded()
{
var host = new ScheduledRecycleHostedService(NullLogger<ScheduledRecycleHostedService>.Instance);
var sup = new FakeSupervisor();
host.AddScheduler(new ScheduledRecycleScheduler(DriverTier.C, TimeSpan.FromMinutes(5), DateTime.UtcNow, sup, NullLogger<ScheduledRecycleScheduler>.Instance));
host.AddScheduler(new ScheduledRecycleScheduler(DriverTier.C, TimeSpan.FromMinutes(10), DateTime.UtcNow, sup, NullLogger<ScheduledRecycleScheduler>.Instance));
host.SchedulerCount.ShouldBe(2);
}
[Fact]
public async Task EmptyScheduler_List_TicksCleanly()
{
var clock = new FakeClock();
var host = new ScheduledRecycleHostedService(NullLogger<ScheduledRecycleHostedService>.Instance, clock);
// No registered schedulers — tick is a no-op + counter still advances.
await host.TickOnceAsync(CancellationToken.None);
host.TickCount.ShouldBe(1);
}
}

View File

@@ -14,6 +14,7 @@
<PackageReference Include="Shouldly" Version="4.3.0"/>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0"/>
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.0"/>
<PackageReference Include="Microsoft.EntityFrameworkCore.InMemory" Version="10.0.0"/>
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Client" Version="1.5.374.126"/>
<PackageReference Include="xunit.runner.visualstudio" Version="3.0.2">
<PrivateAssets>all</PrivateAssets>