Compare commits: v2-release...phase-6-3-

12 commits: c4824bea12, e588c4f980, 84fe88fadb, 59f793f87c, 37ba9e8d14, a8401ab8fd, 19a0bfcc43, fc7e18c7f5, ba42967943, b912969805, f8d5b0fdbb, cc069509cd
```diff
@@ -1,7 +1,7 @@
 # v2 Release Readiness
 
-> **Last updated**: 2026-04-19 (Phase 6.4 data layer merged)
-> **Status**: **NOT YET RELEASE-READY** — four Phase 6 data-layer ships have landed, but several production-path wirings are still deferred.
+> **Last updated**: 2026-04-19 (release blockers #1 + #2 closed; Phase 6.3 redundancy runtime is the last)
+> **Status**: **NOT YET RELEASE-READY** — one of three release blockers remains (Phase 6.3 Streams A/C/F redundancy-coordinator + OPC UA node wiring + client interop).
 
 This doc is the single view of where v2 stands against its release criteria. Update it whenever a deferred follow-up closes or a new release blocker is discovered.
 
```
```diff
@@ -26,29 +26,31 @@ This doc is the single view of where v2 stands against its release criteria. Upd
 
 Ordered by severity + impact on production fitness.
 
-### Security — Phase 6.2 dispatch wiring (task #143)
+### ~~Security — Phase 6.2 dispatch wiring~~ (task #143 — **CLOSED** 2026-04-19, PR #94)
 
-The `AuthorizationGate` + `IPermissionEvaluator` + `PermissionTrie` stack is fully built and unit-tested, **but no dispatch path in `DriverNodeManager` actually calls it**. Every OPC UA Read / Write / HistoryRead / Browse / Call / CreateMonitoredItems on the live server currently runs through the pre-Phase-6.2 code path (which gates Write via `WriteAuthzPolicy` only — no per-tag ACL).
+**Closed**. `AuthorizationGate` + `NodeScopeResolver` now thread through `OpcUaApplicationHost → OtOpcUaServer → DriverNodeManager`. `OnReadValue` + `OnWriteValue` + all four HistoryRead paths call `gate.IsAllowed(identity, operation, scope)` before the invoker. Production deployments activate enforcement by constructing `OpcUaApplicationHost` with an `AuthorizationGate(StrictMode: true)` + populating the `NodeAcl` table.
 
-Closing this requires:
+Additional Stream C surfaces (not release-blocking, hardening only):
 
-- Thread `AuthorizationGate` through `OpcUaApplicationHost → OtOpcUaServer → DriverNodeManager` (the same plumbing path Phase 6.1's `DriverResiliencePipelineBuilder` took).
-- Build a `NodeScopeResolver` that maps `fullRef → NodeScope` via a live DB lookup of the tag's UnsArea / UnsLine / Equipment path. Cache per generation.
-- Call `gate.IsAllowed(identity, operation, scope)` in OnReadValue / OnWriteValue / the four HistoryRead paths / Browse / Call / Acknowledge/Confirm/Shelve / CreateMonitoredItems / TransferSubscriptions.
-- Stamp MonitoredItems with `(AuthGenerationId, MembershipVersion)` per decision #153 so revoked grants surface `BadUserAccessDenied` within one publish cycle.
-- 3-user integration matrix covering each operation × allow/deny.
+- Browse + TranslateBrowsePathsToNodeIds gating with ancestor-visibility logic per `acl-design.md` §Browse.
+- CreateMonitoredItems + TransferSubscriptions gating with per-item `(AuthGenerationId, MembershipVersion)` stamp so revoked grants surface `BadUserAccessDenied` within one publish cycle (decision #153).
+- Alarm Acknowledge / Confirm / Shelve gating.
+- Call (method invocation) gating.
+- Finer-grained scope resolution — current `NodeScopeResolver` returns a flat cluster-level scope. Joining against the live Configuration DB to populate UnsArea / UnsLine / Equipment path is tracked as Stream C.12.
+- 3-user integration matrix covering every operation × allow/deny.
 
-**Strict mode default**: start lax (`Authorization:StrictMode = false`) during rollout so deployments without populated ACLs keep working. Flip to strict once ACL seeding lands for production clusters.
+These are additional hardening — the three highest-value surfaces (Read / Write / HistoryRead) are now gated, which covers the base-security gap for v2 GA.
 
-### Config fallback — Phase 6.1 Stream D wiring (task #136)
+### ~~Config fallback — Phase 6.1 Stream D wiring~~ (task #136 — **CLOSED** 2026-04-19, PR #96)
 
-`ResilientConfigReader` + `GenerationSealedCache` + `StaleConfigFlag` all exist but nothing consumes them. The `NodeBootstrap` path still uses the original single-file `LiteDbConfigCache` via `ILocalConfigCache`; `sp_PublishGeneration` doesn't call `GenerationSealedCache.SealAsync` after commit; the Configuration read services don't wrap queries in `ResilientConfigReader.ReadAsync`.
+**Closed**. `SealedBootstrap` consumes `ResilientConfigReader` + `GenerationSealedCache` + `StaleConfigFlag` end-to-end: bootstrap calls go through the timeout → retry → fallback-to-sealed pipeline; every central-DB success writes a fresh sealed snapshot so the next cache-miss has a known-good fallback; `StaleConfigFlag.IsStale` is now consumed by `HealthEndpointsHost.usingStaleConfig` so `/healthz` body reports reality.
 
-Closing this requires:
+Production activation: Program.cs switches `NodeBootstrap → SealedBootstrap` + constructs `OpcUaApplicationHost` with the `StaleConfigFlag` as an optional ctor parameter.
 
-- `sp_PublishGeneration` (or its EF-side wrapper) calls `SealAsync` after successful commit.
-- DriverInstance enumeration, LdapGroupRoleMapping fetches, cluster + namespace metadata reads route through `ResilientConfigReader.ReadAsync`.
-- Integration test: SQL container kill mid-operation → serves sealed snapshot, `UsingStaleConfig` = true, driver stays Healthy, `/healthz` body reflects the flag.
+Remaining follow-ups (hardening, not release-blocking):
+
+- A `HostedService` that polls `sp_GetCurrentGenerationForCluster` periodically so peer-published generations land in this node's cache without a restart.
+- Richer snapshot payload via `sp_GetGenerationContent` so fallback can serve the full generation content (DriverInstance enumeration, ACL rows, etc.) from the sealed cache alone.
 
 ### Redundancy — Phase 6.3 Streams A/C/F (tasks #145, #147, #150)
 
```
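The lax-versus-strict behavior described in the closed Security blocker above can be sketched as a tiny decision function. This is a minimal stand-in assuming a three-way ACL outcome; `AclDecision` and `IsAllowed` here are illustrative names, not the shipped `AuthorizationGate` API:

```csharp
// Illustrative sketch (not the shipped AuthorizationGate): how lax vs strict
// mode treats an identity whose LDAP groups resolve to no ACL entry.
Console.WriteLine(IsAllowed(AclDecision.NoGroups, strictMode: false)); // True  (lax fallthrough)
Console.WriteLine(IsAllowed(AclDecision.NoGroups, strictMode: true));  // False (strict denies)
Console.WriteLine(IsAllowed(AclDecision.Deny, strictMode: false));     // False (explicit deny always wins)

static bool IsAllowed(AclDecision acl, bool strictMode) => acl switch
{
    AclDecision.Allow => true,
    AclDecision.Deny => false,
    // No LDAP groups on the identity: lax mode lets deployments without
    // seeded ACLs keep working; strict mode denies.
    AclDecision.NoGroups => !strictMode,
    _ => false,
};

enum AclDecision { Allow, Deny, NoGroups }
```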
```diff
@@ -96,6 +98,8 @@ v2 GA requires all of the following:
 
 ## Change log
 
+- **2026-04-19** — Release blocker #2 **closed** (PR #96). `SealedBootstrap` consumes `ResilientConfigReader` + `GenerationSealedCache` + `StaleConfigFlag`; `/healthz` now surfaces the stale flag. Remaining follow-ups (periodic poller + richer snapshot payload) downgraded to hardening.
+- **2026-04-19** — Release blocker #1 **closed** (PR #94). `AuthorizationGate` wired into `DriverNodeManager` Read / Write / HistoryRead dispatch. Remaining Stream C surfaces (Browse / Subscribe / Alarm / Call + finer-grained scope resolution) downgraded to hardening follow-ups — no longer release-blocking.
 - **2026-04-19** — Phase 6.4 data layer merged (PRs #91–92). Phase 6 core complete. Capstone doc created.
 - **2026-04-19** — Phase 6.3 core merged (PRs #89–90). `ServiceLevelCalculator` + `RecoveryStateManager` + `ApplyLeaseRegistry` land as pure logic; coordinator / UA-node wiring / Admin UI / interop deferred.
 - **2026-04-19** — Phase 6.2 core merged (PRs #84–88). `AuthorizationGate` + `TriePermissionEvaluator` + `LdapGroupRoleMapping` land; dispatch wiring + Admin UI deferred.
```
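The timeout, retry, then fallback-to-sealed read path credited with closing blocker #2 can be sketched as below. `ReadWithFallbackAsync` and its delegate parameters are simplified stand-ins, not the real `ResilientConfigReader` / `GenerationSealedCache` signatures:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Central DB is down in this run, so the read fails, retries, then serves the
// sealed snapshot and raises the stale flag (what /healthz now reports).
string? sealedSnapshot = "generation-41";
var stale = false;

var value = await ReadWithFallbackAsync(
    central: _ => Task.FromException<string>(new TimeoutException("central DB unreachable")),
    readSealed: () => sealedSnapshot,
    writeSealed: v => sealedSnapshot = v,
    setStale: s => stale = s);

Console.WriteLine($"{value} stale={stale}"); // generation-41 stale=True

static async Task<string> ReadWithFallbackAsync(
    Func<CancellationToken, Task<string>> central,
    Func<string?> readSealed,
    Action<string> writeSealed,
    Action<bool> setStale,
    int attempts = 2,
    int timeoutMs = 200)
{
    for (var i = 0; i < attempts; i++)
    {
        using var cts = new CancellationTokenSource(timeoutMs);
        try
        {
            var fresh = await central(cts.Token);
            writeSealed(fresh);  // every central success refreshes the sealed snapshot
            setStale(false);
            return fresh;
        }
        catch (Exception) { /* timeout or transient failure: retry */ }
    }
    // Central unreachable after retries: fall back to the sealed snapshot.
    setStale(true);
    return readSealed() ?? throw new InvalidOperationException("no sealed snapshot available");
}
```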
```diff
@@ -3,6 +3,7 @@ using Microsoft.Extensions.Logging;
 using Opc.Ua;
 using Opc.Ua.Server;
 using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
+using ZB.MOM.WW.OtOpcUa.Core.Authorization;
 using ZB.MOM.WW.OtOpcUa.Core.Resilience;
 using ZB.MOM.WW.OtOpcUa.Server.Security;
 using DriverWriteRequest = ZB.MOM.WW.OtOpcUa.Core.Abstractions.WriteRequest;
@@ -59,14 +60,24 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
     // returns a child builder per Folder call and the caller threads nesting through those references.
     private FolderState _currentFolder = null!;
 
+    // Phase 6.2 Stream C follow-up — optional gate + scope resolver. When both are null
+    // the old pre-Phase-6.2 dispatch path runs unchanged (backwards compat for every
+    // integration test that constructs DriverNodeManager without the gate). When wired,
+    // OnReadValue / OnWriteValue / HistoryRead all consult the gate before the invoker call.
+    private readonly AuthorizationGate? _authzGate;
+    private readonly NodeScopeResolver? _scopeResolver;
+
     public DriverNodeManager(IServerInternal server, ApplicationConfiguration configuration,
-        IDriver driver, CapabilityInvoker invoker, ILogger<DriverNodeManager> logger)
+        IDriver driver, CapabilityInvoker invoker, ILogger<DriverNodeManager> logger,
+        AuthorizationGate? authzGate = null, NodeScopeResolver? scopeResolver = null)
         : base(server, configuration, namespaceUris: $"urn:OtOpcUa:{driver.DriverInstanceId}")
     {
         _driver = driver;
         _readable = driver as IReadable;
         _writable = driver as IWritable;
         _invoker = invoker;
+        _authzGate = authzGate;
+        _scopeResolver = scopeResolver;
         _logger = logger;
     }
@@ -197,6 +208,20 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
         try
         {
             var fullRef = node.NodeId.Identifier as string ?? "";
 
+            // Phase 6.2 Stream C — authorization gate. Runs ahead of the invoker so a denied
+            // read never hits the driver. Returns true in lax mode when identity lacks LDAP
+            // groups; strict mode denies those cases. See AuthorizationGate remarks.
+            if (_authzGate is not null && _scopeResolver is not null)
+            {
+                var scope = _scopeResolver.Resolve(fullRef);
+                if (!_authzGate.IsAllowed(context.UserIdentity, OpcUaOperation.Read, scope))
+                {
+                    statusCode = StatusCodes.BadUserAccessDenied;
+                    return ServiceResult.Good;
+                }
+            }
+
             var result = _invoker.ExecuteAsync(
                 DriverCapability.Read,
                 _driver.DriverInstanceId,
@@ -390,6 +415,23 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
                 fullRef, classification, string.Join(",", roles));
             return new ServiceResult(StatusCodes.BadUserAccessDenied);
         }
 
+        // Phase 6.2 Stream C — additive gate check. The classification/role check above
+        // is the pre-Phase-6.2 baseline; the gate adds per-tag ACL enforcement on top. In
+        // lax mode (default during rollout) the gate falls through when the identity
+        // lacks LDAP groups, so existing integration tests keep passing.
+        if (_authzGate is not null && _scopeResolver is not null)
+        {
+            var scope = _scopeResolver.Resolve(fullRef!);
+            var writeOp = WriteAuthzPolicy.ToOpcUaOperation(classification);
+            if (!_authzGate.IsAllowed(context.UserIdentity, writeOp, scope))
+            {
+                _logger.LogInformation(
+                    "Write denied by ACL gate for {FullRef}: operation={Op} classification={Classification}",
+                    fullRef, writeOp, classification);
+                return new ServiceResult(StatusCodes.BadUserAccessDenied);
+            }
+        }
     }
 
     try
@@ -482,6 +524,16 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
                 continue;
             }
 
+            if (_authzGate is not null && _scopeResolver is not null)
+            {
+                var historyScope = _scopeResolver.Resolve(fullRef);
+                if (!_authzGate.IsAllowed(context.UserIdentity, OpcUaOperation.HistoryRead, historyScope))
+                {
+                    WriteAccessDenied(results, errors, i);
+                    continue;
+                }
+            }
+
             try
             {
                 var driverResult = _invoker.ExecuteAsync(
@@ -546,6 +598,16 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
                 continue;
             }
 
+            if (_authzGate is not null && _scopeResolver is not null)
+            {
+                var historyScope = _scopeResolver.Resolve(fullRef);
+                if (!_authzGate.IsAllowed(context.UserIdentity, OpcUaOperation.HistoryRead, historyScope))
+                {
+                    WriteAccessDenied(results, errors, i);
+                    continue;
+                }
+            }
+
             try
             {
                 var driverResult = _invoker.ExecuteAsync(
@@ -603,6 +665,16 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
                 continue;
             }
 
+            if (_authzGate is not null && _scopeResolver is not null)
+            {
+                var historyScope = _scopeResolver.Resolve(fullRef);
+                if (!_authzGate.IsAllowed(context.UserIdentity, OpcUaOperation.HistoryRead, historyScope))
+                {
+                    WriteAccessDenied(results, errors, i);
+                    continue;
+                }
+            }
+
             try
             {
                 var driverResult = _invoker.ExecuteAsync(
@@ -660,6 +732,19 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
             // "all sources in the driver's namespace" per the IHistoryProvider contract.
             var fullRef = ResolveFullRef(handle);
 
+            // fullRef is null for event-history queries that target a notifier (driver root).
+            // Those are cluster-wide reads + need a different scope shape; skip the gate here
+            // and let the driver-level authz handle them. Non-null path gets per-node gating.
+            if (fullRef is not null && _authzGate is not null && _scopeResolver is not null)
+            {
+                var historyScope = _scopeResolver.Resolve(fullRef);
+                if (!_authzGate.IsAllowed(context.UserIdentity, OpcUaOperation.HistoryRead, historyScope))
+                {
+                    WriteAccessDenied(results, errors, i);
+                    continue;
+                }
+            }
+
             try
             {
                 var driverResult = _invoker.ExecuteAsync(
@@ -721,6 +806,12 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
         errors[i] = StatusCodes.BadInternalError;
     }
 
+    private static void WriteAccessDenied(IList<OpcHistoryReadResult> results, IList<ServiceResult> errors, int i)
+    {
+        results[i] = new OpcHistoryReadResult { StatusCode = StatusCodes.BadUserAccessDenied };
+        errors[i] = StatusCodes.BadUserAccessDenied;
+    }
+
     private static void WriteNodeIdUnknown(IList<OpcHistoryReadResult> results, IList<ServiceResult> errors, int i)
     {
         results[i] = new OpcHistoryReadResult { StatusCode = StatusCodes.BadNodeIdUnknown };
```
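The per-item `(AuthGenerationId, MembershipVersion)` stamp from decision #153 (still a hardening follow-up per the wiring above) reduces to a value comparison: re-evaluate the grant whenever either counter has moved since the monitored item was created. `AuthStamp` and `NeedsReauthorization` are illustrative names only:

```csharp
// Illustrative names only — decision #153's revocation stamp as a value comparison.
var stampedAtCreate = new AuthStamp(AuthGenerationId: 41, MembershipVersion: 7);

Console.WriteLine(NeedsReauthorization(stampedAtCreate, new AuthStamp(41, 7))); // False (nothing changed)
Console.WriteLine(NeedsReauthorization(stampedAtCreate, new AuthStamp(42, 7))); // True  (ACLs republished)
Console.WriteLine(NeedsReauthorization(stampedAtCreate, new AuthStamp(41, 8))); // True  (group membership changed)

// Records compare by value, so a mismatch in either component triggers
// re-evaluation on the next publish cycle (surfacing BadUserAccessDenied if revoked).
static bool NeedsReauthorization(AuthStamp itemStamp, AuthStamp current) => itemStamp != current;

record AuthStamp(long AuthGenerationId, long MembershipVersion);
```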
```diff
@@ -1,6 +1,7 @@
 using Microsoft.Extensions.Logging;
 using Opc.Ua;
 using Opc.Ua.Configuration;
+using ZB.MOM.WW.OtOpcUa.Configuration.LocalCache;
 using ZB.MOM.WW.OtOpcUa.Core.Hosting;
 using ZB.MOM.WW.OtOpcUa.Core.OpcUa;
 using ZB.MOM.WW.OtOpcUa.Core.Resilience;
@@ -23,6 +24,9 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable
     private readonly DriverHost _driverHost;
     private readonly IUserAuthenticator _authenticator;
     private readonly DriverResiliencePipelineBuilder _pipelineBuilder;
+    private readonly AuthorizationGate? _authzGate;
+    private readonly NodeScopeResolver? _scopeResolver;
+    private readonly StaleConfigFlag? _staleConfigFlag;
     private readonly ILoggerFactory _loggerFactory;
     private readonly ILogger<OpcUaApplicationHost> _logger;
     private ApplicationInstance? _application;
@@ -32,12 +36,18 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable
 
     public OpcUaApplicationHost(OpcUaServerOptions options, DriverHost driverHost,
         IUserAuthenticator authenticator, ILoggerFactory loggerFactory, ILogger<OpcUaApplicationHost> logger,
-        DriverResiliencePipelineBuilder? pipelineBuilder = null)
+        DriverResiliencePipelineBuilder? pipelineBuilder = null,
+        AuthorizationGate? authzGate = null,
+        NodeScopeResolver? scopeResolver = null,
+        StaleConfigFlag? staleConfigFlag = null)
     {
         _options = options;
         _driverHost = driverHost;
         _authenticator = authenticator;
         _pipelineBuilder = pipelineBuilder ?? new DriverResiliencePipelineBuilder();
+        _authzGate = authzGate;
+        _scopeResolver = scopeResolver;
+        _staleConfigFlag = staleConfigFlag;
         _loggerFactory = loggerFactory;
         _logger = logger;
     }
@@ -64,7 +74,8 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable
             throw new InvalidOperationException(
                 $"OPC UA application certificate could not be validated or created in {_options.PkiStoreRoot}");
 
-        _server = new OtOpcUaServer(_driverHost, _authenticator, _pipelineBuilder, _loggerFactory);
+        _server = new OtOpcUaServer(_driverHost, _authenticator, _pipelineBuilder, _loggerFactory,
+            authzGate: _authzGate, scopeResolver: _scopeResolver);
         await _application.Start(_server).ConfigureAwait(false);
 
         _logger.LogInformation("OPC UA server started — endpoint={Endpoint} driverCount={Count}",
@@ -77,6 +88,7 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable
         _healthHost = new HealthEndpointsHost(
             _driverHost,
             _loggerFactory.CreateLogger<HealthEndpointsHost>(),
+            usingStaleConfig: _staleConfigFlag is null ? null : () => _staleConfigFlag.IsStale,
             prefix: _options.HealthEndpointsPrefix);
         _healthHost.Start();
     }
```
```diff
@@ -21,6 +21,8 @@ public sealed class OtOpcUaServer : StandardServer
     private readonly DriverHost _driverHost;
     private readonly IUserAuthenticator _authenticator;
     private readonly DriverResiliencePipelineBuilder _pipelineBuilder;
+    private readonly AuthorizationGate? _authzGate;
+    private readonly NodeScopeResolver? _scopeResolver;
     private readonly ILoggerFactory _loggerFactory;
     private readonly List<DriverNodeManager> _driverNodeManagers = new();
 
@@ -28,11 +30,15 @@ public sealed class OtOpcUaServer : StandardServer
         DriverHost driverHost,
         IUserAuthenticator authenticator,
         DriverResiliencePipelineBuilder pipelineBuilder,
-        ILoggerFactory loggerFactory)
+        ILoggerFactory loggerFactory,
+        AuthorizationGate? authzGate = null,
+        NodeScopeResolver? scopeResolver = null)
     {
         _driverHost = driverHost;
         _authenticator = authenticator;
         _pipelineBuilder = pipelineBuilder;
+        _authzGate = authzGate;
+        _scopeResolver = scopeResolver;
         _loggerFactory = loggerFactory;
     }
 
@@ -58,7 +64,8 @@ public sealed class OtOpcUaServer : StandardServer
         // DriverInstance row in a follow-up PR; for now every driver gets Tier A defaults.
         var options = new DriverResilienceOptions { Tier = DriverTier.A };
         var invoker = new CapabilityInvoker(_pipelineBuilder, driver.DriverInstanceId, () => options, driver.DriverType);
-        var manager = new DriverNodeManager(server, configuration, driver, invoker, logger);
+        var manager = new DriverNodeManager(server, configuration, driver, invoker, logger,
+            authzGate: _authzGate, scopeResolver: _scopeResolver);
         _driverNodeManagers.Add(manager);
     }
 
```
@@ -0,0 +1,96 @@

```csharp
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;

namespace ZB.MOM.WW.OtOpcUa.Server.Redundancy;

/// <summary>
/// Pure-function mapper from the shared config DB's <see cref="ServerCluster"/> +
/// <see cref="ClusterNode"/> rows to an immutable <see cref="RedundancyTopology"/>.
/// Validates Phase 6.3 Stream A.1 invariants and throws
/// <see cref="InvalidTopologyException"/> on violation so the coordinator can fail startup
/// fast with a clear message rather than boot into an ambiguous state.
/// </summary>
/// <remarks>
/// Stateless — the caller owns the DB round-trip + hands rows in. Keeping it pure makes
/// the invariant matrix testable without EF or SQL Server.
/// </remarks>
public static class ClusterTopologyLoader
{
    /// <summary>Build a topology snapshot for the given self node. Throws on invariant violation.</summary>
    public static RedundancyTopology Load(string selfNodeId, ServerCluster cluster, IReadOnlyList<ClusterNode> nodes)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(selfNodeId);
        ArgumentNullException.ThrowIfNull(cluster);
        ArgumentNullException.ThrowIfNull(nodes);

        ValidateClusterShape(cluster, nodes);
        ValidateUniqueApplicationUris(nodes);
        ValidatePrimaryCount(cluster, nodes);

        var self = nodes.FirstOrDefault(n => string.Equals(n.NodeId, selfNodeId, StringComparison.OrdinalIgnoreCase))
            ?? throw new InvalidTopologyException(
                $"Self node '{selfNodeId}' is not a member of cluster '{cluster.ClusterId}'. " +
                $"Members: {string.Join(", ", nodes.Select(n => n.NodeId))}.");

        var peers = nodes
            .Where(n => !string.Equals(n.NodeId, selfNodeId, StringComparison.OrdinalIgnoreCase))
            .Select(n => new RedundancyPeer(
                NodeId: n.NodeId,
                Role: n.RedundancyRole,
                Host: n.Host,
                OpcUaPort: n.OpcUaPort,
                DashboardPort: n.DashboardPort,
                ApplicationUri: n.ApplicationUri))
            .ToList();

        return new RedundancyTopology(
            ClusterId: cluster.ClusterId,
            SelfNodeId: self.NodeId,
            SelfRole: self.RedundancyRole,
            Mode: cluster.RedundancyMode,
            Peers: peers,
            SelfApplicationUri: self.ApplicationUri);
    }

    private static void ValidateClusterShape(ServerCluster cluster, IReadOnlyList<ClusterNode> nodes)
    {
        if (nodes.Count == 0)
            throw new InvalidTopologyException($"Cluster '{cluster.ClusterId}' has zero nodes.");

        // Decision #83 — v2.0 caps clusters at two nodes.
        if (nodes.Count > 2)
            throw new InvalidTopologyException(
                $"Cluster '{cluster.ClusterId}' has {nodes.Count} nodes. v2.0 supports at most 2 nodes per cluster (decision #83).");

        // Every node must belong to the given cluster.
        var wrongCluster = nodes.FirstOrDefault(n =>
            !string.Equals(n.ClusterId, cluster.ClusterId, StringComparison.OrdinalIgnoreCase));
        if (wrongCluster is not null)
            throw new InvalidTopologyException(
                $"Node '{wrongCluster.NodeId}' belongs to cluster '{wrongCluster.ClusterId}', not '{cluster.ClusterId}'.");
    }

    private static void ValidateUniqueApplicationUris(IReadOnlyList<ClusterNode> nodes)
    {
        var dup = nodes
            .GroupBy(n => n.ApplicationUri, StringComparer.Ordinal)
            .FirstOrDefault(g => g.Count() > 1);
        if (dup is not null)
            throw new InvalidTopologyException(
                $"Nodes {string.Join(", ", dup.Select(n => n.NodeId))} share ApplicationUri '{dup.Key}'. " +
                $"OPC UA Part 4 requires unique ApplicationUri per server — clients pin trust here (decision #86).");
    }

    private static void ValidatePrimaryCount(ServerCluster cluster, IReadOnlyList<ClusterNode> nodes)
    {
        // Standalone mode: any role is fine. Warm / Hot: at most one Primary per cluster.
        if (cluster.RedundancyMode == RedundancyMode.None) return;

        var primaries = nodes.Count(n => n.RedundancyRole == RedundancyRole.Primary);
        if (primaries > 1)
            throw new InvalidTopologyException(
                $"Cluster '{cluster.ClusterId}' has {primaries} Primary nodes in redundancy mode {cluster.RedundancyMode}. " +
                $"At most one Primary per cluster (decision #84). Runtime detects and demotes both to ServiceLevel 2 " +
                $"per the 8-state matrix; startup fails fast to surface the misconfiguration earlier.");
    }
}
```
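The `ValidatePrimaryCount` invariant above (decision #84) reduces to a one-line predicate. This sketch uses plain strings in place of the real `RedundancyMode` / `RedundancyRole` enums:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Plain-string stand-in for the decision #84 check: standalone clusters ("None")
// are unconstrained; any redundant mode allows at most one Primary.
Console.WriteLine(PrimaryCountValid("Warm", new[] { "Primary", "Backup" }));  // True
Console.WriteLine(PrimaryCountValid("Warm", new[] { "Primary", "Primary" })); // False (split brain, fail fast)
Console.WriteLine(PrimaryCountValid("None", new[] { "Primary", "Primary" })); // True  (standalone: any roles)

static bool PrimaryCountValid(string redundancyMode, IEnumerable<string> roles) =>
    redundancyMode == "None" || roles.Count(r => r == "Primary") <= 1;
```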
42
src/ZB.MOM.WW.OtOpcUa.Server/Redundancy/PeerReachability.cs
Normal file
42
src/ZB.MOM.WW.OtOpcUa.Server/Redundancy/PeerReachability.cs
Normal file
@@ -0,0 +1,42 @@
namespace ZB.MOM.WW.OtOpcUa.Server.Redundancy;

/// <summary>
/// Latest observed reachability of the peer node per the Phase 6.3 Stream B.1/B.2 two-layer
/// probe model. HTTP layer is the fast-fail; UA layer is authoritative.
/// </summary>
/// <remarks>
/// Fed into the <see cref="ServiceLevelCalculator"/> as <c>peerHttpHealthy</c> +
/// <c>peerUaHealthy</c>. The concrete probe loops (<c>PeerHttpProbeLoop</c> +
/// <c>PeerUaProbeLoop</c>) live in a Stream B runtime follow-up — this type is the
/// contract the publisher reads; probers write via
/// <see cref="PeerReachabilityTracker"/>.
/// </remarks>
public sealed record PeerReachability(bool HttpHealthy, bool UaHealthy)
{
    public static readonly PeerReachability Unknown = new(false, false);
    public static readonly PeerReachability FullyHealthy = new(true, true);

    /// <summary>True when both probes report healthy — the <c>ServiceLevelCalculator</c>'s peerReachable gate.</summary>
    public bool BothHealthy => HttpHealthy && UaHealthy;
}

/// <summary>
/// Thread-safe holder of the latest <see cref="PeerReachability"/> per peer NodeId. Probe
/// loops call <see cref="Update"/>; the <see cref="RedundancyStatePublisher"/> reads via
/// <see cref="Get"/>.
/// </summary>
public sealed class PeerReachabilityTracker
{
    private readonly System.Collections.Concurrent.ConcurrentDictionary<string, PeerReachability> _byPeer =
        new(StringComparer.OrdinalIgnoreCase);

    public void Update(string peerNodeId, PeerReachability reachability)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(peerNodeId);
        _byPeer[peerNodeId] = reachability ?? throw new ArgumentNullException(nameof(reachability));
    }

    /// <summary>Current reachability for a peer. Returns <see cref="PeerReachability.Unknown"/> when not yet probed.</summary>
    public PeerReachability Get(string peerNodeId) =>
        _byPeer.TryGetValue(peerNodeId, out var r) ? r : PeerReachability.Unknown;
}
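The two-layer gate described in the XML docs above (HTTP is the fast-fail, UA is authoritative, both must pass before a peer counts as reachable) can be exercised standalone. A minimal sketch — the record is re-declared here so the snippet compiles on its own; it is illustrative, not the shipped assembly:

```csharp
using System;
using System.Collections.Concurrent;

// Standalone re-declaration of the record from the diff, for illustration only.
public sealed record PeerReachability(bool HttpHealthy, bool UaHealthy)
{
    public static readonly PeerReachability Unknown = new(false, false);
    public bool BothHealthy => HttpHealthy && UaHealthy;
}

public static class Demo
{
    public static void Main()
    {
        var byPeer = new ConcurrentDictionary<string, PeerReachability>(StringComparer.OrdinalIgnoreCase);

        // Before any probe has run, a peer reads as Unknown => not reachable.
        var r0 = byPeer.TryGetValue("LINE3-OPCUA-B", out var v) ? v : PeerReachability.Unknown;
        Console.WriteLine(r0.BothHealthy); // False

        // HTTP probe succeeds but the authoritative UA probe has not confirmed yet: still gated.
        byPeer["LINE3-OPCUA-B"] = new PeerReachability(HttpHealthy: true, UaHealthy: false);
        Console.WriteLine(byPeer["LINE3-OPCUA-B"].BothHealthy); // False

        // Only when both layers report healthy does the peerReachable gate open.
        byPeer["line3-opcua-b"] = new PeerReachability(true, true); // case-insensitive key
        Console.WriteLine(byPeer["LINE3-OPCUA-B"].BothHealthy); // True
    }
}
```

The case-insensitive key comparer mirrors the tracker's `StringComparer.OrdinalIgnoreCase`, so probe loops need not agree on NodeId casing.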
107
src/ZB.MOM.WW.OtOpcUa.Server/Redundancy/RedundancyCoordinator.cs
Normal file
@@ -0,0 +1,107 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;

namespace ZB.MOM.WW.OtOpcUa.Server.Redundancy;

/// <summary>
/// Process-singleton holder of the current <see cref="RedundancyTopology"/>. Reads the
/// shared config DB at <see cref="InitializeAsync"/> time + re-reads on
/// <see cref="RefreshAsync"/> (called after <c>sp_PublishGeneration</c> completes so
/// operator role-swaps take effect without a process restart).
/// </summary>
/// <remarks>
/// <para>Per Phase 6.3 Stream A.1-A.2. The coordinator is the source of truth for the
/// <see cref="ServiceLevelCalculator"/> inputs: role (from topology), peer reachability
/// (from peer-probe loops — Stream B.1/B.2 follow-up), apply-in-progress (from
/// <see cref="ApplyLeaseRegistry"/>), topology-valid (from invariant checks at load time
/// + runtime detection of conflicting peer claims).</para>
///
/// <para>Topology refresh is CAS-style: a new <see cref="RedundancyTopology"/> instance
/// replaces the old one atomically via <see cref="Interlocked.Exchange{T}"/>. Readers
/// always see a coherent snapshot — never a partial transition.</para>
/// </remarks>
public sealed class RedundancyCoordinator
{
    private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbContextFactory;
    private readonly ILogger<RedundancyCoordinator> _logger;
    private readonly string _selfNodeId;
    private readonly string _selfClusterId;
    private RedundancyTopology? _current;
    private bool _topologyValid = true;

    public RedundancyCoordinator(
        IDbContextFactory<OtOpcUaConfigDbContext> dbContextFactory,
        ILogger<RedundancyCoordinator> logger,
        string selfNodeId,
        string selfClusterId)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(selfNodeId);
        ArgumentException.ThrowIfNullOrWhiteSpace(selfClusterId);

        _dbContextFactory = dbContextFactory;
        _logger = logger;
        _selfNodeId = selfNodeId;
        _selfClusterId = selfClusterId;
    }

    /// <summary>Last-loaded topology; null before <see cref="InitializeAsync"/> completes.</summary>
    public RedundancyTopology? Current => Volatile.Read(ref _current);

    /// <summary>
    /// True when the last load/refresh completed without an invariant violation; false
    /// forces <see cref="ServiceLevelCalculator"/> into the <see cref="ServiceLevelBand.InvalidTopology"/>
    /// band regardless of other inputs.
    /// </summary>
    public bool IsTopologyValid => Volatile.Read(ref _topologyValid);

    /// <summary>Load the topology for the first time. Throws on invariant violation.</summary>
    public async Task InitializeAsync(CancellationToken ct)
    {
        await RefreshInternalAsync(throwOnInvalid: true, ct).ConfigureAwait(false);
    }

    /// <summary>
    /// Re-read the topology from the shared DB. Called after <c>sp_PublishGeneration</c>
    /// completes or after an Admin-triggered role-swap. Never throws — on invariant
    /// violation it logs + flips <see cref="IsTopologyValid"/> false so the calculator
    /// returns <see cref="ServiceLevelBand.InvalidTopology"/> = 2.
    /// </summary>
    public async Task RefreshAsync(CancellationToken ct)
    {
        await RefreshInternalAsync(throwOnInvalid: false, ct).ConfigureAwait(false);
    }

    private async Task RefreshInternalAsync(bool throwOnInvalid, CancellationToken ct)
    {
        await using var db = await _dbContextFactory.CreateDbContextAsync(ct).ConfigureAwait(false);

        var cluster = await db.ServerClusters.AsNoTracking()
            .FirstOrDefaultAsync(c => c.ClusterId == _selfClusterId, ct).ConfigureAwait(false)
            ?? throw new InvalidTopologyException($"Cluster '{_selfClusterId}' not found in config DB.");

        var nodes = await db.ClusterNodes.AsNoTracking()
            .Where(n => n.ClusterId == _selfClusterId && n.Enabled)
            .ToListAsync(ct).ConfigureAwait(false);

        try
        {
            var topology = ClusterTopologyLoader.Load(_selfNodeId, cluster, nodes);
            Volatile.Write(ref _current, topology);
            Volatile.Write(ref _topologyValid, true);
            _logger.LogInformation(
                "Redundancy topology loaded: cluster={Cluster} self={Self} role={Role} mode={Mode} peers={PeerCount}",
                topology.ClusterId, topology.SelfNodeId, topology.SelfRole, topology.Mode, topology.PeerCount);
        }
        catch (InvalidTopologyException ex)
        {
            Volatile.Write(ref _topologyValid, false);
            _logger.LogError(ex,
                "Redundancy topology invariant violation for cluster {Cluster}: {Reason}",
                _selfClusterId, ex.Message);
            if (throwOnInvalid) throw;
        }
    }
}
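The snapshot-swap guarantee the remarks describe (readers never observe a partial transition) reduces to publishing a new immutable instance through a single volatile reference store, which is what the coordinator's `Volatile.Write` does. A standalone sketch with an illustrative `Snapshot` record standing in for `RedundancyTopology` — not the shipped coordinator:

```csharp
using System;
using System.Threading;

// Illustrative immutable snapshot — stands in for RedundancyTopology.
public sealed record Snapshot(string Role, int PeerCount);

public sealed class SnapshotHolder
{
    private Snapshot? _current;

    // Readers get whichever complete instance was last published — never a half-written
    // one, because the only mutation is a single reference store.
    public Snapshot? Current => Volatile.Read(ref _current);

    public void Publish(Snapshot next) => Volatile.Write(ref _current, next);
}

public static class Demo
{
    public static void Main()
    {
        var holder = new SnapshotHolder();
        Console.WriteLine(holder.Current is null); // True — before first load

        holder.Publish(new Snapshot("Primary", 1));
        var before = holder.Current;

        holder.Publish(new Snapshot("Secondary", 1)); // e.g. an operator role-swap
        var after = holder.Current;

        Console.WriteLine(before!.Role); // Primary — earlier readers keep their coherent snapshot
        Console.WriteLine(after!.Role);  // Secondary
    }
}
```

Because each refresh allocates a fresh instance, consumers can also detect topology change by reference comparison, as the `RedundancyTopology` docs note.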
@@ -0,0 +1,142 @@
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;

namespace ZB.MOM.WW.OtOpcUa.Server.Redundancy;

/// <summary>
/// Orchestrates Phase 6.3 Stream C: feeds the <see cref="ServiceLevelCalculator"/> with the
/// current (topology, peer reachability, apply-in-progress, recovery dwell, self health)
/// inputs and emits the resulting <see cref="byte"/> + labelled <see cref="ServiceLevelBand"/>
/// to subscribers. The OPC UA <c>ServiceLevel</c> variable node consumes this via
/// <see cref="OnStateChanged"/> on every tick.
/// </summary>
/// <remarks>
/// Pure orchestration — no background timer, no OPC UA stack dep. The caller (a
/// HostedService in a future PR, or a test) drives <see cref="ComputeAndPublish"/> at
/// whatever cadence is appropriate. Each call reads the inputs + recomputes the ServiceLevel
/// byte; state is fired on the <see cref="OnStateChanged"/> event when the byte differs from
/// the last emitted value (edge-triggered). The <see cref="OnServerUriArrayChanged"/> event
/// fires whenever the topology's <c>ServerUriArray</c> content changes.
/// </remarks>
public sealed class RedundancyStatePublisher
{
    private readonly RedundancyCoordinator _coordinator;
    private readonly ApplyLeaseRegistry _leases;
    private readonly RecoveryStateManager _recovery;
    private readonly PeerReachabilityTracker _peers;
    private readonly Func<bool> _selfHealthy;
    private readonly Func<bool> _operatorMaintenance;
    private byte _lastByte = 255; // start at Authoritative — harmless before first tick
    private IReadOnlyList<string>? _lastServerUriArray;

    public RedundancyStatePublisher(
        RedundancyCoordinator coordinator,
        ApplyLeaseRegistry leases,
        RecoveryStateManager recovery,
        PeerReachabilityTracker peers,
        Func<bool>? selfHealthy = null,
        Func<bool>? operatorMaintenance = null)
    {
        ArgumentNullException.ThrowIfNull(coordinator);
        ArgumentNullException.ThrowIfNull(leases);
        ArgumentNullException.ThrowIfNull(recovery);
        ArgumentNullException.ThrowIfNull(peers);

        _coordinator = coordinator;
        _leases = leases;
        _recovery = recovery;
        _peers = peers;
        _selfHealthy = selfHealthy ?? (() => true);
        _operatorMaintenance = operatorMaintenance ?? (() => false);
    }

    /// <summary>
    /// Fires with the current ServiceLevel byte + band on every call to
    /// <see cref="ComputeAndPublish"/> when the byte differs from the previously-emitted one.
    /// </summary>
    public event Action<ServiceLevelSnapshot>? OnStateChanged;

    /// <summary>
    /// Fires when the cluster's ServerUriArray (self + peers) content changes — e.g. an
    /// operator adds or removes a peer. Consumer is the OPC UA <c>ServerUriArray</c>
    /// variable node in Stream C.2.
    /// </summary>
    public event Action<IReadOnlyList<string>>? OnServerUriArrayChanged;

    /// <summary>Snapshot of the last-published ServiceLevel byte — diagnostics + tests.</summary>
    public byte LastByte => _lastByte;

    /// <summary>
    /// Compute the current ServiceLevel + emit change events if anything moved. Caller
    /// drives cadence — a 1 s tick in production is reasonable; tests drive it directly.
    /// </summary>
    public ServiceLevelSnapshot ComputeAndPublish()
    {
        var topology = _coordinator.Current;
        if (topology is null)
        {
            // Not yet initialized — surface NoData so clients don't treat us as authoritative.
            return Emit((byte)ServiceLevelBand.NoData, null);
        }

        // Aggregate peer reachability. For 2-node v2.0 clusters there is at most one peer;
        // treat "all peers healthy" as the boolean input to the calculator.
        var peerReachable = topology.Peers.All(p => _peers.Get(p.NodeId).BothHealthy);
        var peerUaHealthy = topology.Peers.All(p => _peers.Get(p.NodeId).UaHealthy);
        var peerHttpHealthy = topology.Peers.All(p => _peers.Get(p.NodeId).HttpHealthy);

        var role = MapRole(topology.SelfRole);

        var value = ServiceLevelCalculator.Compute(
            role: role,
            selfHealthy: _selfHealthy(),
            peerUaHealthy: peerUaHealthy,
            peerHttpHealthy: peerHttpHealthy,
            applyInProgress: _leases.IsApplyInProgress,
            recoveryDwellMet: _recovery.IsDwellMet(),
            topologyValid: _coordinator.IsTopologyValid,
            operatorMaintenance: _operatorMaintenance());

        MaybeFireServerUriArray(topology);
        return Emit(value, topology);
    }

    private static RedundancyRole MapRole(RedundancyRole role) => role switch
    {
        // Standalone is serving; treat as Primary for the matrix since the calculator
        // already special-cases Standalone inside its Compute.
        RedundancyRole.Primary => RedundancyRole.Primary,
        RedundancyRole.Secondary => RedundancyRole.Secondary,
        _ => RedundancyRole.Standalone,
    };

    private ServiceLevelSnapshot Emit(byte value, RedundancyTopology? topology)
    {
        var snap = new ServiceLevelSnapshot(
            Value: value,
            Band: ServiceLevelCalculator.Classify(value),
            Topology: topology);

        if (value != _lastByte)
        {
            _lastByte = value;
            OnStateChanged?.Invoke(snap);
        }
        return snap;
    }

    private void MaybeFireServerUriArray(RedundancyTopology topology)
    {
        var current = topology.ServerUriArray();
        if (_lastServerUriArray is null || !current.SequenceEqual(_lastServerUriArray, StringComparer.Ordinal))
        {
            _lastServerUriArray = current;
            OnServerUriArrayChanged?.Invoke(current);
        }
    }
}

/// <summary>Per-tick output of <see cref="RedundancyStatePublisher.ComputeAndPublish"/>.</summary>
public sealed record ServiceLevelSnapshot(
    byte Value,
    ServiceLevelBand Band,
    RedundancyTopology? Topology);
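The edge-triggered publish the remarks describe (recompute every tick, fire only when the byte moves) is the core of `Emit`. A self-contained reduction of just that mechanism — names here are illustrative, not the shipped publisher:

```csharp
using System;
using System.Collections.Generic;

// Minimal stand-in for the edge-triggered publish: fire only when the value changes.
public sealed class EdgeTriggeredPublisher
{
    private byte _last = 255; // same starting value as the diff's _lastByte
    public event Action<byte>? OnStateChanged;

    public byte ComputeAndPublish(byte value)
    {
        if (value != _last)
        {
            _last = value;          // remember the edge
            OnStateChanged?.Invoke(value);
        }
        return value;               // every call still returns the current value
    }
}

public static class Demo
{
    public static void Main()
    {
        var publisher = new EdgeTriggeredPublisher();
        var fired = new List<byte>();
        publisher.OnStateChanged += fired.Add;

        // Six ticks, three distinct runs of values — only the three edges fire.
        foreach (byte v in new byte[] { 200, 200, 200, 10, 10, 200 })
            publisher.ComputeAndPublish(v);

        Console.WriteLine(string.Join(",", fired)); // 200,10,200
    }
}
```

Subscribers such as the `ServiceLevel` variable node therefore see one callback per state transition rather than one per tick, regardless of tick cadence.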
@@ -0,0 +1,55 @@
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;

namespace ZB.MOM.WW.OtOpcUa.Server.Redundancy;

/// <summary>
/// Snapshot of the cluster topology the <see cref="RedundancyCoordinator"/> holds. Read
/// once at startup + refreshed on publish-generation notification. Immutable — every
/// refresh produces a new instance so observers can compare identity-equality to detect
/// topology change.
/// </summary>
/// <remarks>
/// Per Phase 6.3 Stream A.1. Invariants enforced by the loader (see
/// <see cref="ClusterTopologyLoader"/>): at most one Primary per cluster for
/// WarmActive/Hot redundancy modes; every node has a unique ApplicationUri (OPC UA
/// Part 4 requirement — clients pin trust here); at most 2 nodes total per cluster
/// (decision #83).
/// </remarks>
public sealed record RedundancyTopology(
    string ClusterId,
    string SelfNodeId,
    RedundancyRole SelfRole,
    RedundancyMode Mode,
    IReadOnlyList<RedundancyPeer> Peers,
    string SelfApplicationUri)
{
    /// <summary>Peer count — 0 for a standalone (single-node) cluster, 1 for v2 two-node clusters.</summary>
    public int PeerCount => Peers.Count;

    /// <summary>
    /// ServerUriArray shape per OPC UA Part 4 §6.6.2.2 — self first, peers in stable
    /// deterministic order (lexicographic by NodeId), self's ApplicationUri always at index 0.
    /// </summary>
    public IReadOnlyList<string> ServerUriArray() =>
        new[] { SelfApplicationUri }
            .Concat(Peers.OrderBy(p => p.NodeId, StringComparer.OrdinalIgnoreCase).Select(p => p.ApplicationUri))
            .ToList();
}

/// <summary>One peer in the cluster (every node other than self).</summary>
/// <param name="NodeId">Peer's stable logical NodeId (e.g. <c>"LINE3-OPCUA-B"</c>).</param>
/// <param name="Role">Peer's declared redundancy role from the shared config DB.</param>
/// <param name="Host">Peer's hostname / IP — drives the health-probe target.</param>
/// <param name="OpcUaPort">Peer's OPC UA endpoint port.</param>
/// <param name="DashboardPort">Peer's dashboard / health-endpoint port.</param>
/// <param name="ApplicationUri">Peer's declared ApplicationUri (carried in <see cref="RedundancyTopology.ServerUriArray"/>).</param>
public sealed record RedundancyPeer(
    string NodeId,
    RedundancyRole Role,
    string Host,
    int OpcUaPort,
    int DashboardPort,
    string ApplicationUri);

/// <summary>Thrown when the loader detects a topology-invariant violation at startup or refresh.</summary>
public sealed class InvalidTopologyException(string message) : Exception(message);
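The `ServerUriArray` ordering rule (self's ApplicationUri at index 0, then peers sorted case-insensitively by NodeId) is worth seeing in isolation, since clients key their redundancy failover off stable array positions. A standalone re-statement of just that shape — the helper below is illustrative, not the shipped record member:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ServerUriArrayDemo
{
    // Illustrative re-statement of the ServerUriArray shape: self at index 0,
    // peers appended in case-insensitive lexicographic NodeId order.
    public static IReadOnlyList<string> Build(
        string selfApplicationUri,
        IEnumerable<(string NodeId, string ApplicationUri)> peers) =>
        new[] { selfApplicationUri }
            .Concat(peers.OrderBy(p => p.NodeId, StringComparer.OrdinalIgnoreCase)
                         .Select(p => p.ApplicationUri))
            .ToList();

    public static void Main()
    {
        // Peer order in the input does not matter; NodeId ordering decides the layout.
        var uris = Build("urn:A", new[] { ("Z-NODE", "urn:Z"), ("B-NODE", "urn:B") });
        Console.WriteLine(string.Join(",", uris)); // urn:A,urn:B,urn:Z
    }
}
```

Because the sort key is the NodeId rather than the URI, renaming an ApplicationUri never reshuffles peer positions for connected clients.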
100
src/ZB.MOM.WW.OtOpcUa.Server/SealedBootstrap.cs
Normal file
@@ -0,0 +1,100 @@
using System.Text.Json;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.OtOpcUa.Configuration.LocalCache;

namespace ZB.MOM.WW.OtOpcUa.Server;

/// <summary>
/// Phase 6.1 Stream D consumption hook — bootstraps the node's current generation through
/// the <see cref="ResilientConfigReader"/> pipeline + writes every successful central-DB
/// read into the <see cref="GenerationSealedCache"/> so the next cache-miss path has a
/// sealed snapshot to fall back to.
/// </summary>
/// <remarks>
/// <para>Alongside the original <see cref="NodeBootstrap"/> (which uses the single-file
/// <see cref="ILocalConfigCache"/>). Program.cs can switch to this one once operators are
/// ready for the generation-sealed semantics. The original stays for backward compat
/// with the three integration tests that construct <see cref="NodeBootstrap"/> directly.</para>
///
/// <para>Closes release blocker #2 in <c>docs/v2/v2-release-readiness.md</c> — the
/// generation-sealed cache + resilient reader + stale-config flag ship as unit-tested
/// primitives in PR #81 but no production path consumed them until this wrapper.</para>
/// </remarks>
public sealed class SealedBootstrap
{
    private readonly NodeOptions _options;
    private readonly GenerationSealedCache _cache;
    private readonly ResilientConfigReader _reader;
    private readonly StaleConfigFlag _staleFlag;
    private readonly ILogger<SealedBootstrap> _logger;

    public SealedBootstrap(
        NodeOptions options,
        GenerationSealedCache cache,
        ResilientConfigReader reader,
        StaleConfigFlag staleFlag,
        ILogger<SealedBootstrap> logger)
    {
        _options = options;
        _cache = cache;
        _reader = reader;
        _staleFlag = staleFlag;
        _logger = logger;
    }

    /// <summary>
    /// Resolve the current generation for this node. Routes the central-DB fetch through
    /// <see cref="ResilientConfigReader"/> (timeout → retry → fallback-to-cache) + seals a
    /// fresh snapshot on every successful DB read so a future cache-miss has something to
    /// serve.
    /// </summary>
    public async Task<BootstrapResult> LoadCurrentGenerationAsync(CancellationToken ct)
    {
        return await _reader.ReadAsync(
            _options.ClusterId,
            centralFetch: async innerCt => await FetchFromCentralAsync(innerCt).ConfigureAwait(false),
            fromSnapshot: snap => BootstrapResult.FromCache(snap.GenerationId),
            ct).ConfigureAwait(false);
    }

    private async ValueTask<BootstrapResult> FetchFromCentralAsync(CancellationToken ct)
    {
        await using var conn = new SqlConnection(_options.ConfigDbConnectionString);
        await conn.OpenAsync(ct).ConfigureAwait(false);

        await using var cmd = conn.CreateCommand();
        cmd.CommandText = "EXEC dbo.sp_GetCurrentGenerationForCluster @NodeId=@n, @ClusterId=@c";
        cmd.Parameters.AddWithValue("@n", _options.NodeId);
        cmd.Parameters.AddWithValue("@c", _options.ClusterId);

        await using var reader = await cmd.ExecuteReaderAsync(ct).ConfigureAwait(false);
        if (!await reader.ReadAsync(ct).ConfigureAwait(false))
        {
            _logger.LogWarning("Cluster {Cluster} has no Published generation yet", _options.ClusterId);
            return BootstrapResult.EmptyFromDb();
        }

        var generationId = reader.GetInt64(0);
        _logger.LogInformation("Bootstrapped from central DB: generation {GenerationId}; sealing snapshot", generationId);

        // Seal a minimal snapshot with the generation pointer. A richer snapshot that carries
        // the full sp_GetGenerationContent payload lands when the bootstrap flow grows to
        // consume the content during offline operation (separate follow-up — see decision #148
        // and phase-6-1 Stream D.3). The pointer alone is enough for the fallback path to
        // surface the last-known-good generation id + flip UsingStaleConfig.
        await _cache.SealAsync(new GenerationSnapshot
        {
            ClusterId = _options.ClusterId,
            GenerationId = generationId,
            CachedAt = DateTime.UtcNow,
            PayloadJson = JsonSerializer.Serialize(new { generationId, source = "sp_GetCurrentGenerationForCluster" }),
        }, ct).ConfigureAwait(false);

        // StaleConfigFlag bookkeeping: ResilientConfigReader.MarkFresh on the returning call
        // path; we're on the fresh branch so we don't touch the flag here.
        _ = _staleFlag; // held so the field isn't flagged unused

        return BootstrapResult.FromDb(generationId);
    }
}
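The read-through-with-sealed-fallback shape this wrapper wires up (try the central source, seal every successful read, serve the last sealed value and flag staleness when the source is down) can be sketched without any SQL or repo types. Everything below is an illustrative stand-in, assuming only that behavior — it is not the `ResilientConfigReader` API:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SealedReadDemo
{
    private static long? _sealedGenerationId; // stands in for the sealed-cache pointer

    // Try the central fetch; on success, seal the result. On failure, serve the
    // last sealed value (if any) and flag the read as stale.
    public static async Task<(long GenerationId, bool Stale)> ReadAsync(
        Func<CancellationToken, Task<long>> centralFetch, CancellationToken ct)
    {
        try
        {
            var generationId = await centralFetch(ct);
            _sealedGenerationId = generationId; // seal on every successful read
            return (generationId, Stale: false);
        }
        catch (Exception) when (_sealedGenerationId is long cached)
        {
            return (cached, Stale: true);       // last-known-good + stale flag
        }
    }

    public static async Task Main()
    {
        var fresh = await ReadAsync(_ => Task.FromResult(42L), CancellationToken.None);
        Console.WriteLine($"{fresh.GenerationId} {fresh.Stale}"); // 42 False

        // Central DB unreachable: the sealed pointer keeps the node serving.
        var fallback = await ReadAsync(
            _ => Task.FromException<long>(new TimeoutException()), CancellationToken.None);
        Console.WriteLine($"{fallback.GenerationId} {fallback.Stale}"); // 42 True
    }
}
```

Note the exception filter: if nothing has ever been sealed, the failure propagates instead of silently serving nothing, matching the idea that a first boot with no cache cannot pretend to have config.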
47
src/ZB.MOM.WW.OtOpcUa.Server/Security/NodeScopeResolver.cs
Normal file
@@ -0,0 +1,47 @@
using ZB.MOM.WW.OtOpcUa.Core.Authorization;

namespace ZB.MOM.WW.OtOpcUa.Server.Security;

/// <summary>
/// Maps a driver-side full reference (e.g. <c>"TestMachine_001/Oven/SetPoint"</c>) to the
/// <see cref="NodeScope"/> the Phase 6.2 evaluator walks. Today a simplified resolver that
/// returns a cluster-scoped + tag-only scope — the deeper UnsArea / UnsLine / Equipment
/// path lookup from the live Configuration DB is a Stream C.12 follow-up.
/// </summary>
/// <remarks>
/// <para>The flat cluster-level scope is sufficient for v2 GA because Phase 6.2 ACL grants
/// at the Cluster scope cascade to every tag below (decision #129 — additive grants). The
/// finer hierarchy only matters when operators want per-area or per-equipment grants;
/// those still work for Cluster-level grants, and landing the finer resolution in a
/// follow-up doesn't regress the base security model.</para>
///
/// <para>Thread-safety: the resolver is stateless once constructed. Callers may cache a
/// single instance per DriverNodeManager without locks.</para>
/// </remarks>
public sealed class NodeScopeResolver
{
    private readonly string _clusterId;

    public NodeScopeResolver(string clusterId)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(clusterId);
        _clusterId = clusterId;
    }

    /// <summary>
    /// Resolve a node scope for the given driver-side <paramref name="fullReference"/>.
    /// Phase 1 shape: returns <c>ClusterId</c> + <c>TagId = fullReference</c> only;
    /// NamespaceId / UnsArea / UnsLine / Equipment stay null. A future resolver will
    /// join against the Configuration DB to populate the full path.
    /// </summary>
    public NodeScope Resolve(string fullReference)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(fullReference);
        return new NodeScope
        {
            ClusterId = _clusterId,
            TagId = fullReference,
            Kind = NodeHierarchyKind.Equipment,
        };
    }
}
@@ -67,4 +67,22 @@ public static class WriteAuthzPolicy
        SecurityClassification.ViewOnly => null, // IsAllowed short-circuits
        _ => null,
    };

    /// <summary>
    /// Maps a driver-reported <see cref="SecurityClassification"/> to the
    /// <see cref="Core.Abstractions.OpcUaOperation"/> the Phase 6.2 evaluator consults
    /// for the matching <see cref="Configuration.Enums.NodePermissions"/> bit.
    /// FreeAccess + ViewOnly fall back to WriteOperate — the evaluator never sees them
    /// because <see cref="IsAllowed"/> short-circuits first.
    /// </summary>
    public static Core.Abstractions.OpcUaOperation ToOpcUaOperation(SecurityClassification classification) =>
        classification switch
        {
            SecurityClassification.Operate => Core.Abstractions.OpcUaOperation.WriteOperate,
            SecurityClassification.SecuredWrite => Core.Abstractions.OpcUaOperation.WriteOperate,
            SecurityClassification.Tune => Core.Abstractions.OpcUaOperation.WriteTune,
            SecurityClassification.VerifiedWrite => Core.Abstractions.OpcUaOperation.WriteConfigure,
            SecurityClassification.Configure => Core.Abstractions.OpcUaOperation.WriteConfigure,
            _ => Core.Abstractions.OpcUaOperation.WriteOperate,
        };
}
@@ -0,0 +1,163 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
using ZB.MOM.WW.OtOpcUa.Server.Redundancy;

namespace ZB.MOM.WW.OtOpcUa.Server.Tests;

[Trait("Category", "Unit")]
public sealed class ClusterTopologyLoaderTests
{
    private static ServerCluster Cluster(RedundancyMode mode = RedundancyMode.Warm) => new()
    {
        ClusterId = "c1",
        Name = "Warsaw-West",
        Enterprise = "zb",
        Site = "warsaw-west",
        RedundancyMode = mode,
        CreatedBy = "test",
    };

    private static ClusterNode Node(string id, RedundancyRole role, string host, int port = 4840, string? appUri = null) => new()
    {
        NodeId = id,
        ClusterId = "c1",
        RedundancyRole = role,
        Host = host,
        OpcUaPort = port,
        ApplicationUri = appUri ?? $"urn:{host}:OtOpcUa",
        CreatedBy = "test",
    };

    [Fact]
    public void SingleNode_Standalone_Loads()
    {
        var cluster = Cluster(RedundancyMode.None);
        var nodes = new[] { Node("A", RedundancyRole.Standalone, "hostA") };

        var topology = ClusterTopologyLoader.Load("A", cluster, nodes);

        topology.SelfNodeId.ShouldBe("A");
        topology.SelfRole.ShouldBe(RedundancyRole.Standalone);
        topology.Peers.ShouldBeEmpty();
        topology.SelfApplicationUri.ShouldBe("urn:hostA:OtOpcUa");
    }

    [Fact]
    public void TwoNode_Cluster_LoadsSelfAndPeer()
    {
        var cluster = Cluster();
        var nodes = new[]
        {
            Node("A", RedundancyRole.Primary, "hostA"),
            Node("B", RedundancyRole.Secondary, "hostB"),
        };

        var topology = ClusterTopologyLoader.Load("A", cluster, nodes);

        topology.SelfNodeId.ShouldBe("A");
        topology.SelfRole.ShouldBe(RedundancyRole.Primary);
        topology.Peers.Count.ShouldBe(1);
        topology.Peers[0].NodeId.ShouldBe("B");
        topology.Peers[0].Role.ShouldBe(RedundancyRole.Secondary);
    }

    [Fact]
    public void ServerUriArray_Puts_Self_First_Peers_SortedLexicographically()
    {
        var cluster = Cluster();
        var nodes = new[]
        {
            Node("A", RedundancyRole.Primary, "hostA", appUri: "urn:A"),
            Node("B", RedundancyRole.Secondary, "hostB", appUri: "urn:B"),
        };

        var topology = ClusterTopologyLoader.Load("A", cluster, nodes);

        topology.ServerUriArray().ShouldBe(["urn:A", "urn:B"]);
    }

    [Fact]
    public void EmptyNodes_Throws()
    {
        Should.Throw<InvalidTopologyException>(
            () => ClusterTopologyLoader.Load("A", Cluster(), []));
    }

    [Fact]
    public void SelfNotInCluster_Throws()
    {
        var nodes = new[] { Node("B", RedundancyRole.Primary, "hostB") };

        Should.Throw<InvalidTopologyException>(
            () => ClusterTopologyLoader.Load("A-missing", Cluster(), nodes));
    }

    [Fact]
    public void ThreeNodeCluster_Rejected_Per_Decision83()
    {
        var nodes = new[]
        {
            Node("A", RedundancyRole.Primary, "hostA"),
            Node("B", RedundancyRole.Secondary, "hostB"),
            Node("C", RedundancyRole.Secondary, "hostC"),
        };

        var ex = Should.Throw<InvalidTopologyException>(
            () => ClusterTopologyLoader.Load("A", Cluster(), nodes));
        ex.Message.ShouldContain("decision #83");
    }

    [Fact]
    public void DuplicateApplicationUri_Rejected()
    {
        var nodes = new[]
        {
            Node("A", RedundancyRole.Primary, "hostA", appUri: "urn:shared"),
            Node("B", RedundancyRole.Secondary, "hostB", appUri: "urn:shared"),
        };

        var ex = Should.Throw<InvalidTopologyException>(
            () => ClusterTopologyLoader.Load("A", Cluster(), nodes));
        ex.Message.ShouldContain("ApplicationUri");
    }

    [Fact]
    public void TwoPrimaries_InWarmMode_Rejected()
    {
        var nodes = new[]
        {
            Node("A", RedundancyRole.Primary, "hostA"),
            Node("B", RedundancyRole.Primary, "hostB"),
        };

        var ex = Should.Throw<InvalidTopologyException>(
            () => ClusterTopologyLoader.Load("A", Cluster(RedundancyMode.Warm), nodes));
        ex.Message.ShouldContain("2 Primary");
    }

    [Fact]
    public void CrossCluster_Node_Rejected()
    {
        var foreign = Node("B", RedundancyRole.Secondary, "hostB");
        foreign.ClusterId = "c-other";

        var nodes = new[] { Node("A", RedundancyRole.Primary, "hostA"), foreign };

        Should.Throw<InvalidTopologyException>(
            () => ClusterTopologyLoader.Load("A", Cluster(), nodes));
    }

    [Fact]
    public void None_Mode_Allows_Any_Role_Mix()
    {
        // Standalone clusters don't enforce Primary-count; operator can pick anything.
        var cluster = Cluster(RedundancyMode.None);
        var nodes = new[] { Node("A", RedundancyRole.Primary, "hostA") };

        var topology = ClusterTopologyLoader.Load("A", cluster, nodes);

        topology.Mode.ShouldBe(RedundancyMode.None);
    }
}
@@ -0,0 +1,64 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Authorization;
using ZB.MOM.WW.OtOpcUa.Server.Security;

namespace ZB.MOM.WW.OtOpcUa.Server.Tests;

[Trait("Category", "Unit")]
public sealed class NodeScopeResolverTests
{
    [Fact]
    public void Resolve_PopulatesClusterAndTag()
    {
        var resolver = new NodeScopeResolver("c-warsaw");

        var scope = resolver.Resolve("TestMachine_001/Oven/SetPoint");

        scope.ClusterId.ShouldBe("c-warsaw");
        scope.TagId.ShouldBe("TestMachine_001/Oven/SetPoint");
        scope.Kind.ShouldBe(NodeHierarchyKind.Equipment);
    }

    [Fact]
    public void Resolve_Leaves_UnsPath_Null_For_Phase1()
    {
        var resolver = new NodeScopeResolver("c-1");

        var scope = resolver.Resolve("tag-1");

        // Phase 1 flat scope — finer resolution tracked as Stream C.12 follow-up.
        scope.NamespaceId.ShouldBeNull();
        scope.UnsAreaId.ShouldBeNull();
        scope.UnsLineId.ShouldBeNull();
        scope.EquipmentId.ShouldBeNull();
    }

    [Fact]
    public void Resolve_Throws_OnEmptyFullReference()
    {
        var resolver = new NodeScopeResolver("c-1");

        Should.Throw<ArgumentException>(() => resolver.Resolve(""));
        Should.Throw<ArgumentException>(() => resolver.Resolve(" "));
    }

    [Fact]
    public void Ctor_Throws_OnEmptyClusterId()
    {
        Should.Throw<ArgumentException>(() => new NodeScopeResolver(""));
    }

    [Fact]
    public void Resolver_IsStateless_AcrossCalls()
    {
        var resolver = new NodeScopeResolver("c");
        var s1 = resolver.Resolve("tag-a");
        var s2 = resolver.Resolve("tag-b");

        s1.TagId.ShouldBe("tag-a");
        s2.TagId.ShouldBe("tag-b");
        s1.ClusterId.ShouldBe("c");
        s2.ClusterId.ShouldBe("c");
    }
}
@@ -0,0 +1,213 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
using ZB.MOM.WW.OtOpcUa.Server.Redundancy;

namespace ZB.MOM.WW.OtOpcUa.Server.Tests;

[Trait("Category", "Unit")]
public sealed class RedundancyStatePublisherTests : IDisposable
{
    private readonly OtOpcUaConfigDbContext _db;
    private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;

    public RedundancyStatePublisherTests()
    {
        var options = new DbContextOptionsBuilder<OtOpcUaConfigDbContext>()
            .UseInMemoryDatabase($"redundancy-publisher-{Guid.NewGuid():N}")
            .Options;
        _db = new OtOpcUaConfigDbContext(options);
        _dbFactory = new DbContextFactory(options);
    }

    public void Dispose() => _db.Dispose();

    private sealed class DbContextFactory(DbContextOptions<OtOpcUaConfigDbContext> options)
        : IDbContextFactory<OtOpcUaConfigDbContext>
    {
        public OtOpcUaConfigDbContext CreateDbContext() => new(options);
    }

    private async Task<RedundancyCoordinator> SeedAndInitialize(string selfNodeId, params (string id, RedundancyRole role, string appUri)[] nodes)
    {
        var cluster = new ServerCluster
        {
            ClusterId = "c1",
            Name = "Warsaw-West",
            Enterprise = "zb",
            Site = "warsaw-west",
            RedundancyMode = nodes.Length == 1 ? RedundancyMode.None : RedundancyMode.Warm,
            CreatedBy = "test",
        };
        _db.ServerClusters.Add(cluster);
        foreach (var (id, role, appUri) in nodes)
        {
            _db.ClusterNodes.Add(new ClusterNode
            {
                NodeId = id,
                ClusterId = "c1",
                RedundancyRole = role,
                Host = id.ToLowerInvariant(),
                ApplicationUri = appUri,
                CreatedBy = "test",
            });
        }
        await _db.SaveChangesAsync();

        var coordinator = new RedundancyCoordinator(_dbFactory, NullLogger<RedundancyCoordinator>.Instance, selfNodeId, "c1");
        await coordinator.InitializeAsync(CancellationToken.None);
        return coordinator;
    }

    [Fact]
    public async Task BeforeInit_Publishes_NoData()
    {
        // Coordinator not initialized — current topology is null.
        var coordinator = new RedundancyCoordinator(_dbFactory, NullLogger<RedundancyCoordinator>.Instance, "A", "c1");
        var publisher = new RedundancyStatePublisher(
            coordinator, new ApplyLeaseRegistry(), new RecoveryStateManager(), new PeerReachabilityTracker());

        var snap = publisher.ComputeAndPublish();

        snap.Band.ShouldBe(ServiceLevelBand.NoData);
        snap.Value.ShouldBe((byte)1);
        await Task.Yield();
    }

    [Fact]
    public async Task AuthoritativePrimary_WhenHealthyAndPeerReachable()
    {
        var coordinator = await SeedAndInitialize("A",
            ("A", RedundancyRole.Primary, "urn:A"),
            ("B", RedundancyRole.Secondary, "urn:B"));
        var peers = new PeerReachabilityTracker();
        peers.Update("B", PeerReachability.FullyHealthy);

        var publisher = new RedundancyStatePublisher(
            coordinator, new ApplyLeaseRegistry(), new RecoveryStateManager(), peers);

        var snap = publisher.ComputeAndPublish();

        snap.Value.ShouldBe((byte)255);
        snap.Band.ShouldBe(ServiceLevelBand.AuthoritativePrimary);
    }

    [Fact]
    public async Task IsolatedPrimary_WhenPeerUnreachable_RetainsAuthority()
    {
        var coordinator = await SeedAndInitialize("A",
            ("A", RedundancyRole.Primary, "urn:A"),
            ("B", RedundancyRole.Secondary, "urn:B"));
        var peers = new PeerReachabilityTracker();
        peers.Update("B", PeerReachability.Unknown);

        var publisher = new RedundancyStatePublisher(
            coordinator, new ApplyLeaseRegistry(), new RecoveryStateManager(), peers);

        var snap = publisher.ComputeAndPublish();

        snap.Value.ShouldBe((byte)230);
    }

    [Fact]
    public async Task MidApply_WhenLeaseOpen_Dominates()
    {
        var coordinator = await SeedAndInitialize("A",
            ("A", RedundancyRole.Primary, "urn:A"),
            ("B", RedundancyRole.Secondary, "urn:B"));
        var leases = new ApplyLeaseRegistry();
        var peers = new PeerReachabilityTracker();
        peers.Update("B", PeerReachability.FullyHealthy);

        await using var lease = leases.BeginApplyLease(1, Guid.NewGuid());
        var publisher = new RedundancyStatePublisher(
            coordinator, leases, new RecoveryStateManager(), peers);

        var snap = publisher.ComputeAndPublish();

        snap.Value.ShouldBe((byte)200);
    }

    [Fact]
    public async Task SelfUnhealthy_Returns_NoData()
    {
        var coordinator = await SeedAndInitialize("A",
            ("A", RedundancyRole.Primary, "urn:A"),
            ("B", RedundancyRole.Secondary, "urn:B"));
        var peers = new PeerReachabilityTracker();
        peers.Update("B", PeerReachability.FullyHealthy);

        var publisher = new RedundancyStatePublisher(
            coordinator, new ApplyLeaseRegistry(), new RecoveryStateManager(), peers,
            selfHealthy: () => false);

        var snap = publisher.ComputeAndPublish();

        snap.Value.ShouldBe((byte)1);
    }

    [Fact]
    public async Task OnStateChanged_FiresOnly_OnValueChange()
    {
        var coordinator = await SeedAndInitialize("A",
            ("A", RedundancyRole.Primary, "urn:A"),
            ("B", RedundancyRole.Secondary, "urn:B"));
        var peers = new PeerReachabilityTracker();
        peers.Update("B", PeerReachability.FullyHealthy);

        var publisher = new RedundancyStatePublisher(
            coordinator, new ApplyLeaseRegistry(), new RecoveryStateManager(), peers);

        var emitCount = 0;
        byte? lastEmitted = null;
        publisher.OnStateChanged += snap => { emitCount++; lastEmitted = snap.Value; };

        publisher.ComputeAndPublish(); // first tick — computes 255, matching the seeded _lastByte; no change, no emit
        peers.Update("B", PeerReachability.Unknown);
        publisher.ComputeAndPublish(); // 255 → 230 transition — emits
        publisher.ComputeAndPublish(); // still 230 — no emit

        emitCount.ShouldBe(1);
        lastEmitted.ShouldBe((byte)230);
    }

    [Fact]
    public async Task OnServerUriArrayChanged_FiresOnce_PerTopology()
    {
        var coordinator = await SeedAndInitialize("A",
            ("A", RedundancyRole.Primary, "urn:A"),
            ("B", RedundancyRole.Secondary, "urn:B"));
        var peers = new PeerReachabilityTracker();
        peers.Update("B", PeerReachability.FullyHealthy);

        var publisher = new RedundancyStatePublisher(
            coordinator, new ApplyLeaseRegistry(), new RecoveryStateManager(), peers);

        var emits = new List<IReadOnlyList<string>>();
        publisher.OnServerUriArrayChanged += arr => emits.Add(arr);

        publisher.ComputeAndPublish();
        publisher.ComputeAndPublish();
        publisher.ComputeAndPublish();

        emits.Count.ShouldBe(1, "ServerUriArray event is edge-triggered on topology content change");
        emits[0].ShouldBe(["urn:A", "urn:B"]);
    }

    [Fact]
    public async Task Standalone_Cluster_IsAuthoritative_When_Healthy()
    {
        var coordinator = await SeedAndInitialize("A",
            ("A", RedundancyRole.Standalone, "urn:A"));
        var publisher = new RedundancyStatePublisher(
            coordinator, new ApplyLeaseRegistry(), new RecoveryStateManager(), new PeerReachabilityTracker());

        var snap = publisher.ComputeAndPublish();

        snap.Value.ShouldBe((byte)255);
    }
}
@@ -0,0 +1,133 @@
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Configuration.LocalCache;

namespace ZB.MOM.WW.OtOpcUa.Server.Tests;

/// <summary>
/// Integration-style tests for the Phase 6.1 Stream D consumption hook — they don't touch
/// SQL Server (the real SealedBootstrap does, via sp_GetCurrentGenerationForCluster), but
/// they exercise ResilientConfigReader + GenerationSealedCache + StaleConfigFlag end-to-end
/// by simulating central-DB outcomes through a direct ReadAsync call.
/// </summary>
[Trait("Category", "Integration")]
public sealed class SealedBootstrapIntegrationTests : IDisposable
{
    private readonly string _root = Path.Combine(Path.GetTempPath(), $"otopcua-sealed-bootstrap-{Guid.NewGuid():N}");

    public void Dispose()
    {
        try
        {
            if (!Directory.Exists(_root)) return;
            foreach (var f in Directory.EnumerateFiles(_root, "*", SearchOption.AllDirectories))
                File.SetAttributes(f, FileAttributes.Normal);
            Directory.Delete(_root, recursive: true);
        }
        catch { /* best-effort */ }
    }

    [Fact]
    public async Task CentralDbSuccess_SealsSnapshot_And_FlagFresh()
    {
        var cache = new GenerationSealedCache(_root);
        var flag = new StaleConfigFlag();
        var reader = new ResilientConfigReader(cache, flag, NullLogger<ResilientConfigReader>.Instance,
            timeout: TimeSpan.FromSeconds(10));

        // Simulate the SealedBootstrap fresh-path: central DB returns generation id 42; the
        // bootstrap seals it + ResilientConfigReader marks the flag fresh.
        var result = await reader.ReadAsync(
            "c-a",
            centralFetch: async _ =>
            {
                await cache.SealAsync(new GenerationSnapshot
                {
                    ClusterId = "c-a",
                    GenerationId = 42,
                    CachedAt = DateTime.UtcNow,
                    PayloadJson = "{\"gen\":42}",
                }, CancellationToken.None);
                return (long?)42;
            },
            fromSnapshot: snap => (long?)snap.GenerationId,
            CancellationToken.None);

        result.ShouldBe(42);
        flag.IsStale.ShouldBeFalse();
        cache.TryGetCurrentGenerationId("c-a").ShouldBe(42);
    }

    [Fact]
    public async Task CentralDbFails_FallsBackToSealedSnapshot_FlagStale()
    {
        var cache = new GenerationSealedCache(_root);
        var flag = new StaleConfigFlag();
        var reader = new ResilientConfigReader(cache, flag, NullLogger<ResilientConfigReader>.Instance,
            timeout: TimeSpan.FromSeconds(10), retryCount: 0);

        // Seed a prior sealed snapshot (simulating a previous successful boot).
        await cache.SealAsync(new GenerationSnapshot
        {
            ClusterId = "c-a", GenerationId = 37, CachedAt = DateTime.UtcNow,
            PayloadJson = "{\"gen\":37}",
        });

        // Now simulate central DB down → fallback.
        var result = await reader.ReadAsync(
            "c-a",
            centralFetch: _ => throw new InvalidOperationException("SQL dead"),
            fromSnapshot: snap => (long?)snap.GenerationId,
            CancellationToken.None);

        result.ShouldBe(37);
        flag.IsStale.ShouldBeTrue("cache fallback flips the /healthz flag");
    }

    [Fact]
    public async Task NoSnapshot_AndCentralDown_Throws_ClearError()
    {
        var cache = new GenerationSealedCache(_root);
        var flag = new StaleConfigFlag();
        var reader = new ResilientConfigReader(cache, flag, NullLogger<ResilientConfigReader>.Instance,
            timeout: TimeSpan.FromSeconds(10), retryCount: 0);

        await Should.ThrowAsync<GenerationCacheUnavailableException>(async () =>
        {
            await reader.ReadAsync<long?>(
                "c-a",
                centralFetch: _ => throw new InvalidOperationException("SQL dead"),
                fromSnapshot: snap => (long?)snap.GenerationId,
                CancellationToken.None);
        });
    }

    [Fact]
    public async Task SuccessfulBootstrap_AfterFailure_ClearsStaleFlag()
    {
        var cache = new GenerationSealedCache(_root);
        var flag = new StaleConfigFlag();
        var reader = new ResilientConfigReader(cache, flag, NullLogger<ResilientConfigReader>.Instance,
            timeout: TimeSpan.FromSeconds(10), retryCount: 0);

        await cache.SealAsync(new GenerationSnapshot
        {
            ClusterId = "c-a", GenerationId = 1, CachedAt = DateTime.UtcNow, PayloadJson = "{}",
        });

        // Fallback serves snapshot → flag goes stale.
        await reader.ReadAsync("c-a",
            centralFetch: _ => throw new InvalidOperationException("dead"),
            fromSnapshot: s => (long?)s.GenerationId,
            CancellationToken.None);
        flag.IsStale.ShouldBeTrue();

        // Subsequent successful bootstrap clears it.
        await reader.ReadAsync("c-a",
            centralFetch: _ => ValueTask.FromResult((long?)5),
            fromSnapshot: s => (long?)s.GenerationId,
            CancellationToken.None);
        flag.IsStale.ShouldBeFalse("next successful DB round-trip clears the flag");
    }
}
@@ -14,6 +14,7 @@
     <PackageReference Include="Shouldly" Version="4.3.0"/>
     <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0"/>
     <PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.0"/>
+    <PackageReference Include="Microsoft.EntityFrameworkCore.InMemory" Version="10.0.0"/>
     <PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Client" Version="1.5.374.126"/>
     <PackageReference Include="xunit.runner.visualstudio" Version="3.0.2">
       <PrivateAssets>all</PrivateAssets>