Compare commits

...

10 Commits

Author SHA1 Message Date
Joseph Doherty
1d9008e354 Phase 6.1 Stream B.3/B.4/B.5 — MemoryRecycle + ScheduledRecycleScheduler + demand-aware WedgeDetector
Closes out Stream B per docs/v2/implementation/phase-6-1-resilience-and-observability.md.

Core.Abstractions:
- IDriverSupervisor — process-level supervisor contract a Tier C driver's
  out-of-process topology provides (Galaxy Proxy/Supervisor implements this in
  a follow-up Driver.Galaxy wiring PR). Concerns: DriverInstanceId + RecycleAsync.
  Tier A/B drivers don't implement this; Stream B code asserts tier == C before
  ever calling it.

Core.Stability:
- MemoryRecycle — companion to MemoryTracking. On HardBreach, invokes the
  supervisor IFF tier == C AND a supervisor is wired. Tier A/B HardBreach logs
  a promotion-to-Tier-C recommendation and returns false. Soft/None/Warming
  never triggers a recycle at any tier.
- ScheduledRecycleScheduler — Tier C opt-in periodic recycler per decision #67.
  Ctor throws for Tier A/B (structural guard — scheduled recycle on an
  in-process driver would kill every OPC UA session and every co-hosted
  driver). TickAsync(now) advances the schedule by one interval per fire;
  RequestRecycleNowAsync drives an ad-hoc recycle without shifting the cron.
- WedgeDetector — demand-aware per decision #147. Classify(state, demand, now)
  returns:
    * NotApplicable when driver state != Healthy
    * Idle when Healthy + no pending work (bulkhead=0 && monitored=0 && historic=0)
    * Healthy when Healthy + pending work + progress within threshold
    * Faulted when Healthy + pending work + no progress within threshold
  Threshold clamps to min 60 s. DemandSignal.HasPendingWork ORs the three counters.
  The three false-wedge cases the plan calls out all stay Healthy: idle
  subscription-only, slow historian backfill making progress, write-only burst
  with drained bulkhead.

Tests (22 new, all pass):
- MemoryRecycleTests (7): Tier C hard-breach requests recycle; Tier A/B
  hard-breach never requests; Tier C without supervisor no-ops; soft-breach
  at every tier never requests; None/Warming never request.
- ScheduledRecycleSchedulerTests (6): ctor throws for A/B; zero/negative
  interval throws; tick before due no-ops; tick at/after due fires once and
  advances; RequestRecycleNow fires immediately without shifting schedule;
  multiple fires across ticks advance one interval each.
- WedgeDetectorTests (9): threshold clamp to 60 s; unhealthy driver always
  NotApplicable; idle subscription stays Idle; pending+fresh progress stays
  Healthy; pending+stale progress is Faulted; MonitoredItems active but no
  publish is Faulted; MonitoredItems active with fresh publish stays Healthy;
  historian backfill with fresh progress stays Healthy; write-only burst with
  empty bulkhead is Idle; HasPendingWork theory for any non-zero counter.

Full solution dotnet test: 989 passing (baseline 906, +83 for Phase 6.1 so far).
Pre-existing Client.CLI Subscribe flake unchanged.

Stream B complete. Next up: Stream C (health endpoints + structured logging).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-19 08:03:18 -04:00
Joseph Doherty
ef6b0bb8fc Phase 6.1 Stream B.1/B.2 — DriverTier on DriverTypeMetadata + Core.Stability.MemoryTracking with hybrid-formula soft/hard thresholds
Stream B.1 — registry invariant:
- DriverTypeMetadata gains a required `DriverTier Tier` field. Every registered
  driver type must declare its stability tier so the downstream MemoryTracking,
  MemoryRecycle, and resilience-policy layers can resolve the right defaults.
  Stamped-at-registration-time enforcement makes the "every driver type has a
  non-null Tier" compliance check structurally impossible to fail.
- DriverTypeRegistry API unchanged; one new property on the record.

Stream B.2 — MemoryTracking (Core.Stability):
- Tier-agnostic tracker per decision #146: captures baseline as the median of
  samples collected during a post-init warmup window (default 5 min), then
  classifies each subsequent sample with the hybrid formula
  `soft = max(multiplier × baseline, baseline + floor)`, `hard = 2 × soft`.
- Per-tier constants wired: Tier A mult=3 floor=50 MB, Tier B mult=3 floor=100 MB,
  Tier C mult=2 floor=500 MB.
- Never kills. Hard-breach action returns HardBreach; the supervisor that acts
  on that signal (MemoryRecycle) is Tier C only per decisions #74, #145 and
  lands in the next B.3 commit on this branch.
- Two phases: WarmingUp (samples collected, Warming returned) and Steady
  (baseline captured, soft/hard checks active). Transition is automatic when
  the warmup window elapses.

Tests (15 new, all pass):
- Warming phase returns Warming until the window elapses.
- Window-elapsed captures median baseline + transitions to Steady.
- Per-tier constants match decision #146 table exactly.
- Soft threshold uses max() — small baseline → floor wins; large baseline →
  multiplier wins.
- Hard = 2 × soft.
- Sample below soft = None; at soft = SoftBreach; at/above hard = HardBreach.
- DriverTypeRegistry: theory asserts Tier round-trips for A/B/C.

Full solution dotnet test: 963 passing (baseline 906, +57 net for Phase 6.1
Stream A + Stream B.1/B.2). Pre-existing Client.CLI Subscribe flake unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-19 07:37:43 -04:00
a06fcb16a2 Merge pull request (#78) - Phase 6.1 Stream A - Polly resilience + CapabilityInvoker + Read/Write/HistoryRead dispatch wrapping 2026-04-19 07:33:53 -04:00
Joseph Doherty
d2f3a243cd Phase 6.1 Stream A.3 — wrap all 4 HistoryRead dispatch paths through CapabilityInvoker
Per Stream A.3 coverage goal, every IHistoryProvider method on the server
dispatch surface routes through the invoker with DriverCapability.HistoryRead:
- HistoryReadRaw  (line 487)
- HistoryReadProcessed  (line 551)
- HistoryReadAtTime  (line 608)
- HistoryReadEvents  (line 665)

Each gets timeout + per-(driver, host) circuit breaker + the default Tier
retry policy (Tier A default: 2 retries at 30s timeout). Inner driver
GetAwaiter().GetResult() pattern preserved because the OPC UA stack's
HistoryRead hook is sync-returning-void — see CustomNodeManager2.

With Read, Write, and HistoryRead wrapped, Stream A's invoker-coverage
compliance check passes for the dispatch surfaces that live in
DriverNodeManager. Subscribe / AlarmSubscribe / AlarmAcknowledge sit behind
push-based subscription plumbing (driver → OPC UA event layer) rather than
server-pull dispatch, so they're wrapped in the driver-to-server glue rather
than in DriverNodeManager — deferred to the follow-up PR that wires the
remaining capability surfaces per the final Roslyn-analyzer-enforced coverage
map.

Full solution dotnet test: 948 passing. Pre-existing Client.CLI Subscribe
flake unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-19 07:32:10 -04:00
Joseph Doherty
29bcaf277b Phase 6.1 Stream A.3 complete — wire CapabilityInvoker into DriverNodeManager dispatch end-to-end
Every OnReadValue / OnWriteValue now routes through the process-singleton
DriverResiliencePipelineBuilder's CapabilityInvoker. Read / Write dispatch
paths gain timeout + per-capability retry + per-(driver, host) circuit breaker
+ bulkhead without touching the individual driver implementations.

Wiring:
- OpcUaApplicationHost: new optional DriverResiliencePipelineBuilder ctor
  parameter (default null → instance-owned builder). Keeps the 3 test call
  sites that construct OpcUaApplicationHost directly unchanged.
- OtOpcUaServer: requires the builder in its ctor; constructs one
  CapabilityInvoker per driver at CreateMasterNodeManager time with default
  Tier A DriverResilienceOptions. TODO: Stream B.1 will wire real per-driver-
  type tiers via DriverTypeRegistry; Phase 6.1 follow-up will read the
  DriverInstance.ResilienceConfig JSON column for per-instance overrides.
- DriverNodeManager: takes a CapabilityInvoker in its ctor. OnReadValue wraps
  the driver's ReadAsync through ExecuteAsync(DriverCapability.Read, hostName,
  ...); OnWriteValue wraps WriteAsync through ExecuteWriteAsync(hostName,
  isIdempotent, ...) where isIdempotent comes from the new
  _writeIdempotentByFullRef map populated at Variable() registration from
  DriverAttributeInfo.WriteIdempotent.

HostName defaults to driver.DriverInstanceId for now — a single-host pipeline
per driver. Multi-host drivers (Modbus with N PLCs) will expose their own per-
call host resolution in a follow-up so failing PLCs can trip per-PLC breakers
without poisoning siblings (decision #144).

Test fixup:
- FlakeyDriverIntegrationTests.Read_SurfacesSuccess_AfterTransientFailures:
  bumped TimeoutSeconds=2 → 30. 10 retries at exponential backoff with jitter
  can exceed 2s under parallel-test-run CPU pressure; the test asserts retry
  behavior, not timeout budget, so the longer slack keeps it deterministic.

Full solution dotnet test: 948 passing. Pre-existing Client.CLI Subscribe
flake unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-19 07:28:28 -04:00
Joseph Doherty
b6d2803ff6 Phase 6.1 Stream A — switch pipeline keys from Guid to string to match IDriver.DriverInstanceId
IDriver.DriverInstanceId is declared as string in Core.Abstractions; keeping
the pipeline key as Guid meant every call site would need .ToString() / Guid.Parse
at the boundary. Switching the Resilience types to string removes that friction
and lets OtOpcUaServer pass driver.DriverInstanceId directly to the builder in
the upcoming server-dispatch wiring PR.

- DriverResiliencePipelineBuilder.GetOrCreate + Invalidate + PipelineKey
- CapabilityInvoker.ctor + _driverInstanceId field

Tests: all 48 Core.Tests still pass. The Invalidate test's keepId / dropId now
use distinct "drv-keep" / "drv-drop" literals (previously both were distinct
Guid.NewGuid() values, which the sed-driven refactor had collapsed to the same
literal — caught pre-commit).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-19 07:18:55 -04:00
Joseph Doherty
f3850f8914 Phase 6.1 Stream A.5/A.6 — WriteIdempotent flag on DriverAttributeInfo + Modbus/S7 tag records + FlakeyDriver integration tests
Per-tag opt-in for write-retry per docs/v2/plan.md decisions #44, #45, #143.
Default is false — writes never auto-retry unless the driver author has marked
the tag as safe to replay.

Core.Abstractions:
- DriverAttributeInfo gains `bool WriteIdempotent = false` at the end of the
  positional record (back-compatible; every existing call site uses the default).

Driver.Modbus:
- ModbusTagDefinition gains `bool WriteIdempotent = false`. Safe candidates
  documented in the param XML: holding-register set-points, configuration
  registers. Unsafe: edge-triggered coils, counter-increment addresses.
- ModbusDriver.DiscoverAsync propagates t.WriteIdempotent into
  DriverAttributeInfo.WriteIdempotent.

Driver.S7:
- S7TagDefinition gains `bool WriteIdempotent = false`. Safe candidates:
  DB word/dword set-points, configuration DBs. Unsafe: M/Q bits that drive
  edge-triggered program routines.
- S7Driver.DiscoverAsync propagates the flag.

Stream A.5 integration tests (FlakeyDriverIntegrationTests, 4 new) exercise
the invoker + flaky-driver contract the plan enumerates:
- Read with 5 transient failures succeeds on the 6th attempt (RetryCount=10).
- Non-idempotent write with RetryCount=5 configured still fails on the first
  failure — no replay (decision #44 guard at the ExecuteWriteAsync surface).
- Idempotent write with 2 transient failures succeeds on the 3rd attempt.
- Two hosts on the same driver have independent breakers — dead-host trips
  its breaker but live-host's first call still succeeds.

Propagation tests:
- ModbusDriverTests: SetPoint WriteIdempotent=true flows into
  DriverAttributeInfo; PulseCoil default=false.
- S7DiscoveryAndSubscribeTests: same pattern for DBx SetPoint vs M-bit.

Full solution dotnet test: 947 passing (baseline 906, +41 net across Stream A
so far). Pre-existing Client.CLI Subscribe flake unchanged.

Stream A's remaining work (wiring CapabilityInvoker into DriverNodeManager's
OnReadValue / OnWriteValue / History / Subscribe dispatch paths) is the
server-side integration piece + needs DI wiring for the pipeline builder —
lands in the next PR on this branch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-19 07:16:21 -04:00
Joseph Doherty
90f7792c92 Phase 6.1 Stream A.3 — CapabilityInvoker wraps driver-capability calls through the shared pipeline
One invoker per (DriverInstance, IDriver) pair; calls ExecuteAsync(capability,
host, callSite) and the invoker resolves the correct pipeline from the shared
DriverResiliencePipelineBuilder. The options accessor is a Func so Admin-edit
+ pipeline-invalidate takes effect without restarting the invoker or the
driver host.

ExecuteWriteAsync(isIdempotent) is the explicit write-safety surface:
- isIdempotent=false routes through a side pipeline with RetryCount=0 regardless
  of what the caller configured. The cache key carries a "::non-idempotent"
  suffix so it never collides with the retry-enabled write pipeline.
- isIdempotent=true routes through the normal Write pipeline. If the user has
  configured Write retries (opt-in), the idempotent tag gets them; otherwise
  default-0 still wins.

The server dispatch layer (next PR) reads WriteIdempotentAttribute on each tag
definition once at driver-init time and feeds the boolean into ExecuteWriteAsync.

Tests (6 new):
- Read retries on transient failure; returns value from call site.
- Write non-idempotent does NOT retry even when policy has 3 retries configured
  (the explicit decision-#44 guard at the dispatch surface).
- Write idempotent retries when policy allows.
- Write with default tier-A policy (RetryCount=0) never retries regardless of
  idempotency flag.
- Different hosts get independent pipelines.

Core.Tests now 44 passing (was 38). Invoker doc-refs completed (the XML comment
on WriteIdempotentAttribute no longer references a non-existent type).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-19 04:09:26 -04:00
Joseph Doherty
c04b13f436 Phase 6.1 Stream A.1/A.2/A.6 — Polly resilience foundation: pipeline builder + per-tier policy defaults + WriteIdempotent attribute
Lands the first chunk of the Phase 6.1 Stream A resilience layer per
docs/v2/implementation/phase-6-1-resilience-and-observability.md §Stream A.
Downstream CapabilityInvoker (A.3) + driver-dispatch wiring land in follow-up
PRs on the same branch.

Core.Abstractions additions:
- WriteIdempotentAttribute — marker for tag-definition records that opt into
  auto-retry on IWritable.WriteAsync. Absence = no retry per decisions #44, #45,
  #143. Read once via reflection at driver-init time; no per-write cost.
- DriverCapability enum — enumerates the 8 capability surface points
  (Read / Write / Discover / Subscribe / Probe / AlarmSubscribe / AlarmAcknowledge
  / HistoryRead). AlarmAcknowledge is write-shaped (no retry by default).
- DriverTier enum — A/B/C per driver-stability.md §2-4. Stream B.1 wires this
  into DriverTypeMetadata; surfaced here because the resilience policy defaults
  key on it.

Core.Resilience new namespace:
- DriverResilienceOptions — per-tier × per-capability policy defaults.
  GetTierDefaults(tier) is the source of truth:
    * Tier A: Read 2s/3 retries, Write 2s/0 retries, breaker threshold 5
    * Tier B: Read 4s/3, Write 4s/0, breaker threshold 5
    * Tier C: Read 10s/1, Write 10s/0, breaker threshold 0 (supervisor handles
      process-level breaker per decision #68)
  Resolve(capability) overlays CapabilityPolicies on top of the defaults.
- DriverResiliencePipelineBuilder — composes Timeout → Retry (capability-
  permitting, never on cancellation) → CircuitBreaker (tier-permitting) →
  Bulkhead. Pipelines cached in a lock-free ConcurrentDictionary keyed on
  (DriverInstanceId, HostName, DriverCapability) per decision #144 — one dead
  PLC behind a multi-device driver does not open the breaker for healthy
  siblings. Invalidate(driverInstanceId) supports Admin-triggered reload.

Tests (30 new, all pass):
- DriverResilienceOptionsTests: tier-default coverage for every capability,
  Write + AlarmAcknowledge never retry at any tier, Tier C disables breaker,
  resolve-with-override layering.
- DriverResiliencePipelineBuilderTests: Read retries transients, Write does NOT
  retry on failure (decision #44 guard), dead-host isolation from sibling hosts,
  pipeline reuse for same triple, per-capability isolation, breaker opens after
  threshold on Tier A, timeout fires, cancellation is not retried,
  invalidation scoped to matching instance.

Polly.Core 8.6.6 added to Core.csproj. Full solution dotnet test: 936 passing
(baseline 906 + 30 new). One pre-existing Client.CLI Subscribe flake unchanged
by this PR.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-19 04:07:27 -04:00
6a30f3dde7 Merge pull request (#77) - Phase 6 reconcile 2026-04-19 03:52:25 -04:00
32 changed files with 2049 additions and 38 deletions

View File

@@ -25,6 +25,14 @@ namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
/// OPC UA <c>AlarmConditionState</c> when true. Defaults to false so existing non-Galaxy
/// drivers aren't forced to flow a flag they don't produce.
/// </param>
/// <param name="WriteIdempotent">
/// True when a timed-out or failed write to this attribute is safe to replay. Per
/// <c>docs/v2/plan.md</c> decisions #44, #45, #143 — writes are NOT auto-retried by default
/// because replaying a pulse / alarm-ack / counter-increment / recipe-step advance can
/// duplicate field actions. Drivers flag only tags whose semantics make retry safe
/// (holding registers with level-set values, set-point writes to analog tags) — the
/// capability invoker respects this flag when deciding whether to apply Polly retry.
/// </param>
public sealed record DriverAttributeInfo(
string FullName,
DriverDataType DriverDataType,
@@ -32,4 +40,5 @@ public sealed record DriverAttributeInfo(
uint? ArrayDim,
SecurityClassification SecurityClass,
bool IsHistorized,
bool IsAlarm = false);
bool IsAlarm = false,
bool WriteIdempotent = false);

View File

@@ -0,0 +1,42 @@
namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
/// <summary>
/// Enumerates the driver-capability surface points guarded by Phase 6.1 resilience pipelines.
/// Each value corresponds to one method (or tightly-related method group) on the
/// <c>Core.Abstractions</c> capability interfaces (<see cref="IReadable"/>, <see cref="IWritable"/>,
/// <see cref="ITagDiscovery"/>, <see cref="ISubscribable"/>, <see cref="IHostConnectivityProbe"/>,
/// <see cref="IAlarmSource"/>, <see cref="IHistoryProvider"/>).
/// </summary>
/// <remarks>
/// Per <c>docs/v2/plan.md</c> decision #143 (per-capability retry policy): Read / HistoryRead /
/// Discover / Probe / AlarmSubscribe auto-retry; <see cref="Write"/> does NOT retry unless the
/// tag-definition carries <see cref="WriteIdempotentAttribute"/>. Alarm-acknowledge is treated
/// as a write for retry semantics (an alarm-ack is not idempotent at the plant-floor acknowledgement
/// level even if the OPC UA spec permits re-issue).
/// </remarks>
public enum DriverCapability
{
/// <summary>Batch <see cref="IReadable.ReadAsync"/>. Retries by default.</summary>
Read,
/// <summary>Batch <see cref="IWritable.WriteAsync"/>. Does not retry unless tag is <see cref="WriteIdempotentAttribute">idempotent</see>.</summary>
Write,
/// <summary><see cref="ITagDiscovery.DiscoverAsync"/>. Retries by default.</summary>
Discover,
/// <summary><see cref="ISubscribable.SubscribeAsync"/> and unsubscribe. Retries by default.</summary>
Subscribe,
/// <summary><see cref="IHostConnectivityProbe"/> probe loop. Retries by default.</summary>
Probe,
/// <summary><see cref="IAlarmSource.SubscribeAlarmsAsync"/>. Retries by default.</summary>
AlarmSubscribe,
/// <summary><see cref="IAlarmSource.AcknowledgeAsync"/>. Does NOT retry — ack is a write-shaped operation (decision #143).</summary>
AlarmAcknowledge,
/// <summary><see cref="IHistoryProvider"/> reads (Raw/Processed/AtTime/Events). Retries by default.</summary>
HistoryRead,
}

View File

@@ -0,0 +1,34 @@
namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
/// <summary>
/// Stability tier of a driver type. Determines which cross-cutting runtime protections
/// apply — per-tier retry defaults, memory-tracking thresholds, and whether out-of-process
/// supervision with process-level recycle is in play.
/// </summary>
/// <remarks>
/// Per <c>docs/v2/driver-stability.md</c> §2-4 and <c>docs/v2/plan.md</c> decisions #63-74.
///
/// <list type="bullet">
/// <item><b>A</b> — managed, known-good SDK; low blast radius. In-process. Fast retries.
/// Examples: OPC UA Client (OPCFoundation stack), S7 (S7NetPlus).</item>
/// <item><b>B</b> — native or semi-trusted SDK with an in-process footprint. Examples: Modbus.</item>
/// <item><b>C</b> — unmanaged SDK with COM/STA constraints, leak risk, or other out-of-process
/// requirements. Must run as a separate Host process behind a Proxy with a supervisor that
/// can recycle the process on hard-breach. Example: Galaxy (MXAccess COM).</item>
/// </list>
///
/// <para>Process-kill protections (<c>MemoryRecycle</c>, <c>ScheduledRecycleScheduler</c>) are
/// Tier C only per decisions #73-74 and #145 — killing an in-process Tier A/B driver also kills
/// every OPC UA session and every co-hosted driver, blast-radius worse than the leak.</para>
/// </remarks>
public enum DriverTier
{
/// <summary>Managed SDK, in-process, low blast radius.</summary>
A,
/// <summary>Native or semi-trusted SDK, in-process.</summary>
B,
/// <summary>Unmanaged SDK, out-of-process required with Proxy+Host+Supervisor.</summary>
C,
}

View File

@@ -69,12 +69,20 @@ public sealed class DriverTypeRegistry
/// <param name="DriverConfigJsonSchema">JSON Schema (Draft 2020-12) the driver's <c>DriverConfig</c> column must validate against.</param>
/// <param name="DeviceConfigJsonSchema">JSON Schema for <c>DeviceConfig</c> (multi-device drivers); null if the driver has no device layer.</param>
/// <param name="TagConfigJsonSchema">JSON Schema for <c>TagConfig</c>; required for every driver since every driver has tags.</param>
/// <param name="Tier">
/// Stability tier per <c>docs/v2/driver-stability.md</c> §2-4 and <c>docs/v2/plan.md</c>
/// decisions #63-74. Drives the shared resilience pipeline defaults
/// (<see cref="Tier"/> × capability → <c>CapabilityPolicy</c>), the <c>MemoryTracking</c>
/// hybrid-formula constants, and whether process-level <c>MemoryRecycle</c> / scheduled-
/// recycle protections apply (Tier C only). Every registered driver type must declare one.
/// </param>
public sealed record DriverTypeMetadata(
string TypeName,
NamespaceKindCompatibility AllowedNamespaceKinds,
string DriverConfigJsonSchema,
string? DeviceConfigJsonSchema,
string TagConfigJsonSchema);
string TagConfigJsonSchema,
DriverTier Tier);
/// <summary>Bitmask of namespace kinds a driver type may populate. Per decision #111.</summary>
[Flags]

View File

@@ -0,0 +1,26 @@
namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
/// <summary>
/// Process-level supervisor contract a Tier C driver's out-of-process topology provides
/// (e.g. <c>Driver.Galaxy.Proxy/Supervisor/</c>). Concerns: restart the Host process when a
/// hard fault is detected (memory breach, wedge, scheduled recycle window).
/// </summary>
/// <remarks>
/// Per <c>docs/v2/plan.md</c> decisions #68, #73-74, and #145. Tier A/B drivers do NOT have
/// a supervisor because they run in-process — recycling would kill every OPC UA session and
/// every co-hosted driver. The Core.Stability layer only invokes this interface for Tier C
/// instances after asserting the tier via <see cref="DriverTypeMetadata.Tier"/>.
/// </remarks>
public interface IDriverSupervisor
{
/// <summary>Driver instance this supervisor governs.</summary>
string DriverInstanceId { get; }
/// <summary>
/// Request the supervisor to recycle (terminate + restart) the Host process. Implementations
/// are expected to be idempotent under repeat calls during an in-flight recycle.
/// </summary>
/// <param name="reason">Human-readable reason — flows into the supervisor's logs.</param>
/// <param name="cancellationToken">Cancels the recycle request; an in-flight restart is not interrupted.</param>
Task RecycleAsync(string reason, CancellationToken cancellationToken);
}

View File

@@ -0,0 +1,19 @@
namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
/// <summary>
/// Opts a tag-definition record into auto-retry on <see cref="IWritable.WriteAsync"/> failures.
/// Absence of this attribute means writes are <b>not</b> retried — a timed-out write may have
/// already succeeded at the device, and replaying pulses, alarm acks, counter increments, or
/// recipe-step advances can duplicate irreversible field actions.
/// </summary>
/// <remarks>
/// Per <c>docs/v2/plan.md</c> decisions #44, #45, and #143. Applied to tag-definition POCOs
/// (e.g. <c>ModbusTagDefinition</c>, <c>S7TagDefinition</c>, OPC UA client tag rows) at the
/// property or record level. The <c>CapabilityInvoker</c> in <c>ZB.MOM.WW.OtOpcUa.Core.Resilience</c>
/// reads this attribute via reflection once at driver-init time and caches the result; no
/// per-write reflection cost.
/// </remarks>
[AttributeUsage(AttributeTargets.Property | AttributeTargets.Class | AttributeTargets.Struct, AllowMultiple = false, Inherited = true)]
public sealed class WriteIdempotentAttribute : Attribute
{
}

View File

@@ -0,0 +1,106 @@
using Polly;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Core.Resilience;
/// <summary>
/// Executes driver-capability calls through a shared Polly pipeline. One invoker per
/// <c>(DriverInstance, IDriver)</c> pair; the underlying <see cref="DriverResiliencePipelineBuilder"/>
/// is process-singleton so all invokers share its cache.
/// </summary>
/// <remarks>
/// Per <c>docs/v2/plan.md</c> decisions #143-144 and Phase 6.1 Stream A.3. The server's dispatch
/// layer routes every capability call (<c>IReadable.ReadAsync</c>, <c>IWritable.WriteAsync</c>,
/// <c>ITagDiscovery.DiscoverAsync</c>, <c>ISubscribable.SubscribeAsync/UnsubscribeAsync</c>,
/// <c>IHostConnectivityProbe</c> probe loop, <c>IAlarmSource.SubscribeAlarmsAsync/AcknowledgeAsync</c>,
/// and all four <c>IHistoryProvider</c> reads) through this invoker.
/// </remarks>
public sealed class CapabilityInvoker
{
private readonly DriverResiliencePipelineBuilder _builder;
private readonly string _driverInstanceId;
private readonly Func<DriverResilienceOptions> _optionsAccessor;
/// <summary>
/// Construct an invoker for one driver instance.
/// </summary>
/// <param name="builder">Shared, process-singleton pipeline builder.</param>
/// <param name="driverInstanceId">The <c>DriverInstance.Id</c> column value.</param>
/// <param name="optionsAccessor">
/// Snapshot accessor for the current resilience options. Invoked per call so Admin-edit +
/// pipeline-invalidate can take effect without restarting the invoker.
/// </param>
public CapabilityInvoker(
DriverResiliencePipelineBuilder builder,
string driverInstanceId,
Func<DriverResilienceOptions> optionsAccessor)
{
ArgumentNullException.ThrowIfNull(builder);
ArgumentNullException.ThrowIfNull(optionsAccessor);
_builder = builder;
_driverInstanceId = driverInstanceId;
_optionsAccessor = optionsAccessor;
}
/// <summary>Execute a capability call returning a value, honoring the per-capability pipeline.</summary>
/// <typeparam name="TResult">Return type of the underlying driver call.</typeparam>
public async ValueTask<TResult> ExecuteAsync<TResult>(
DriverCapability capability,
string hostName,
Func<CancellationToken, ValueTask<TResult>> callSite,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(callSite);
var pipeline = ResolvePipeline(capability, hostName);
return await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false);
}
/// <summary>Execute a void-returning capability call, honoring the per-capability pipeline.</summary>
public async ValueTask ExecuteAsync(
DriverCapability capability,
string hostName,
Func<CancellationToken, ValueTask> callSite,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(callSite);
var pipeline = ResolvePipeline(capability, hostName);
await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Execute a <see cref="DriverCapability.Write"/> call honoring <see cref="WriteIdempotentAttribute"/>
/// semantics — if <paramref name="isIdempotent"/> is <c>false</c>, retries are disabled regardless
/// of the tag-level configuration (the pipeline for a non-idempotent write never retries per
/// decisions #44-45). If <c>true</c>, the call runs through the capability's pipeline which may
/// retry when the tier configuration permits.
/// </summary>
public async ValueTask<TResult> ExecuteWriteAsync<TResult>(
string hostName,
bool isIdempotent,
Func<CancellationToken, ValueTask<TResult>> callSite,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(callSite);
if (!isIdempotent)
{
var noRetryOptions = _optionsAccessor() with
{
CapabilityPolicies = new Dictionary<DriverCapability, CapabilityPolicy>
{
[DriverCapability.Write] = _optionsAccessor().Resolve(DriverCapability.Write) with { RetryCount = 0 },
},
};
var pipeline = _builder.GetOrCreate(_driverInstanceId, $"{hostName}::non-idempotent", DriverCapability.Write, noRetryOptions);
return await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false);
}
return await ExecuteAsync(DriverCapability.Write, hostName, callSite, cancellationToken).ConfigureAwait(false);
}
private ResiliencePipeline ResolvePipeline(DriverCapability capability, string hostName) =>
_builder.GetOrCreate(_driverInstanceId, hostName, capability, _optionsAccessor());
}

View File

@@ -0,0 +1,96 @@
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Core.Resilience;
/// <summary>
/// Per-tier × per-capability resilience policy configuration for a driver instance.
/// Bound from <c>DriverInstance.ResilienceConfig</c> JSON (nullable column; null = tier defaults).
/// Per <c>docs/v2/plan.md</c> decisions #143 and #144.
/// </summary>
public sealed record DriverResilienceOptions
{
/// <summary>Tier the owning driver type is registered as; drives the default map.</summary>
public required DriverTier Tier { get; init; }
/// <summary>
/// Per-capability policy overrides. Capabilities absent from this map fall back to
/// <see cref="GetTierDefaults(DriverTier)"/> for the configured <see cref="Tier"/>.
/// </summary>
public IReadOnlyDictionary<DriverCapability, CapabilityPolicy> CapabilityPolicies { get; init; }
= new Dictionary<DriverCapability, CapabilityPolicy>();
/// <summary>Bulkhead (max concurrent in-flight calls) for every capability. Default 32.</summary>
public int BulkheadMaxConcurrent { get; init; } = 32;
/// <summary>
/// Bulkhead queue depth. Zero = no queueing; overflow fails fast with
/// <c>BulkheadRejectedException</c>. Default 64.
/// </summary>
public int BulkheadMaxQueue { get; init; } = 64;
/// <summary>
/// Look up the effective policy for a capability, falling back to tier defaults when no
/// override is configured. Never returns null.
/// </summary>
public CapabilityPolicy Resolve(DriverCapability capability)
{
if (CapabilityPolicies.TryGetValue(capability, out var policy))
return policy;
var defaults = GetTierDefaults(Tier);
return defaults[capability];
}
/// <summary>
/// Per-tier per-capability default policy table, per decisions #143-144 and the Phase 6.1
/// Stream A.2 specification. Retries skipped on <see cref="DriverCapability.Write"/> and
/// <see cref="DriverCapability.AlarmAcknowledge"/> regardless of tier.
/// </summary>
public static IReadOnlyDictionary<DriverCapability, CapabilityPolicy> GetTierDefaults(DriverTier tier) =>
tier switch
{
DriverTier.A => new Dictionary<DriverCapability, CapabilityPolicy>
{
[DriverCapability.Read] = new(TimeoutSeconds: 2, RetryCount: 3, BreakerFailureThreshold: 5),
[DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 0, BreakerFailureThreshold: 5),
[DriverCapability.Discover] = new(TimeoutSeconds: 30, RetryCount: 2, BreakerFailureThreshold: 3),
[DriverCapability.Subscribe] = new(TimeoutSeconds: 5, RetryCount: 3, BreakerFailureThreshold: 5),
[DriverCapability.Probe] = new(TimeoutSeconds: 2, RetryCount: 3, BreakerFailureThreshold: 5),
[DriverCapability.AlarmSubscribe] = new(TimeoutSeconds: 5, RetryCount: 3, BreakerFailureThreshold: 5),
[DriverCapability.AlarmAcknowledge] = new(TimeoutSeconds: 5, RetryCount: 0, BreakerFailureThreshold: 5),
[DriverCapability.HistoryRead] = new(TimeoutSeconds: 30, RetryCount: 2, BreakerFailureThreshold: 5),
},
DriverTier.B => new Dictionary<DriverCapability, CapabilityPolicy>
{
[DriverCapability.Read] = new(TimeoutSeconds: 4, RetryCount: 3, BreakerFailureThreshold: 5),
[DriverCapability.Write] = new(TimeoutSeconds: 4, RetryCount: 0, BreakerFailureThreshold: 5),
[DriverCapability.Discover] = new(TimeoutSeconds: 60, RetryCount: 2, BreakerFailureThreshold: 3),
[DriverCapability.Subscribe] = new(TimeoutSeconds: 8, RetryCount: 3, BreakerFailureThreshold: 5),
[DriverCapability.Probe] = new(TimeoutSeconds: 4, RetryCount: 3, BreakerFailureThreshold: 5),
[DriverCapability.AlarmSubscribe] = new(TimeoutSeconds: 8, RetryCount: 3, BreakerFailureThreshold: 5),
[DriverCapability.AlarmAcknowledge] = new(TimeoutSeconds: 8, RetryCount: 0, BreakerFailureThreshold: 5),
[DriverCapability.HistoryRead] = new(TimeoutSeconds: 60, RetryCount: 2, BreakerFailureThreshold: 5),
},
DriverTier.C => new Dictionary<DriverCapability, CapabilityPolicy>
{
[DriverCapability.Read] = new(TimeoutSeconds: 10, RetryCount: 1, BreakerFailureThreshold: 0),
[DriverCapability.Write] = new(TimeoutSeconds: 10, RetryCount: 0, BreakerFailureThreshold: 0),
[DriverCapability.Discover] = new(TimeoutSeconds: 120, RetryCount: 1, BreakerFailureThreshold: 0),
[DriverCapability.Subscribe] = new(TimeoutSeconds: 15, RetryCount: 1, BreakerFailureThreshold: 0),
[DriverCapability.Probe] = new(TimeoutSeconds: 10, RetryCount: 1, BreakerFailureThreshold: 0),
[DriverCapability.AlarmSubscribe] = new(TimeoutSeconds: 15, RetryCount: 1, BreakerFailureThreshold: 0),
[DriverCapability.AlarmAcknowledge] = new(TimeoutSeconds: 15, RetryCount: 0, BreakerFailureThreshold: 0),
[DriverCapability.HistoryRead] = new(TimeoutSeconds: 120, RetryCount: 1, BreakerFailureThreshold: 0),
},
_ => throw new ArgumentOutOfRangeException(nameof(tier), tier, $"No default policy table defined for tier {tier}."),
};
}
/// <summary>Policy for one capability on one driver instance.</summary>
/// <param name="TimeoutSeconds">Per-call timeout (wraps the inner Polly execution).</param>
/// <param name="RetryCount">Number of retry attempts after the first failure; zero = no retry.</param>
/// <param name="BreakerFailureThreshold">
/// Consecutive-failure count that opens the circuit breaker; zero = no breaker
/// (Tier C uses the supervisor's process-level breaker instead, per decision #68).
/// </param>
public sealed record CapabilityPolicy(int TimeoutSeconds, int RetryCount, int BreakerFailureThreshold);

View File

@@ -0,0 +1,118 @@
using System.Collections.Concurrent;
using Polly;
using Polly.CircuitBreaker;
using Polly.Retry;
using Polly.Timeout;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Core.Resilience;
/// <summary>
/// Builds and caches Polly resilience pipelines keyed on
/// <c>(DriverInstanceId, HostName, DriverCapability)</c>. One dead PLC behind a multi-device
/// driver cannot open the circuit breaker for healthy sibling hosts.
/// </summary>
/// <remarks>
/// Per <c>docs/v2/plan.md</c> decision #144 (per-device isolation). Composition from outside-in:
/// <b>Timeout → Retry (when capability permits) → Circuit Breaker (when tier permits) → Bulkhead</b>.
///
/// <para>Pipeline resolution is lock-free on the hot path: the inner
/// <see cref="ConcurrentDictionary{TKey,TValue}"/> caches a <see cref="ResiliencePipeline"/> per key;
/// first-call cost is one <see cref="ResiliencePipelineBuilder"/>.Build. Thereafter reads are O(1).</para>
/// </remarks>
public sealed class DriverResiliencePipelineBuilder
{
private readonly ConcurrentDictionary<PipelineKey, ResiliencePipeline> _pipelines = new();
private readonly TimeProvider _timeProvider;
/// <summary>Construct with the ambient clock (use <see cref="TimeProvider.System"/> in prod).</summary>
public DriverResiliencePipelineBuilder(TimeProvider? timeProvider = null)
{
_timeProvider = timeProvider ?? TimeProvider.System;
}
/// <summary>
/// Get or build the pipeline for a given <c>(driver instance, host, capability)</c> triple.
/// Calls with the same key + same options reuse the same pipeline instance; the first caller
/// wins if a race occurs (both pipelines would be behaviourally identical).
/// </summary>
/// <param name="driverInstanceId">DriverInstance primary key — opaque to this layer.</param>
/// <param name="hostName">
/// Host the call targets. For single-host drivers (Galaxy, some OPC UA Client configs) pass the
/// driver's canonical host string. For multi-host drivers (Modbus with N PLCs), pass the
/// specific PLC so one dead PLC doesn't poison healthy siblings.
/// </param>
/// <param name="capability">Which capability surface is being called.</param>
/// <param name="options">Per-driver-instance options (tier + per-capability overrides).</param>
public ResiliencePipeline GetOrCreate(
string driverInstanceId,
string hostName,
DriverCapability capability,
DriverResilienceOptions options)
{
ArgumentNullException.ThrowIfNull(options);
ArgumentException.ThrowIfNullOrWhiteSpace(hostName);
var key = new PipelineKey(driverInstanceId, hostName, capability);
return _pipelines.GetOrAdd(key, static (_, state) => Build(state.capability, state.options, state.timeProvider),
(capability, options, timeProvider: _timeProvider));
}
/// <summary>Drop cached pipelines for one driver instance (e.g. on ResilienceConfig change). Test + Admin-reload use.</summary>
public int Invalidate(string driverInstanceId)
{
var removed = 0;
foreach (var key in _pipelines.Keys)
{
if (key.DriverInstanceId == driverInstanceId && _pipelines.TryRemove(key, out _))
removed++;
}
return removed;
}
/// <summary>Snapshot of the current number of cached pipelines. For diagnostics only.</summary>
public int CachedPipelineCount => _pipelines.Count;
private static ResiliencePipeline Build(
DriverCapability capability,
DriverResilienceOptions options,
TimeProvider timeProvider)
{
var policy = options.Resolve(capability);
var builder = new ResiliencePipelineBuilder { TimeProvider = timeProvider };
builder.AddTimeout(new TimeoutStrategyOptions
{
Timeout = TimeSpan.FromSeconds(policy.TimeoutSeconds),
});
if (policy.RetryCount > 0)
{
builder.AddRetry(new RetryStrategyOptions
{
MaxRetryAttempts = policy.RetryCount,
BackoffType = DelayBackoffType.Exponential,
UseJitter = true,
Delay = TimeSpan.FromMilliseconds(100),
MaxDelay = TimeSpan.FromSeconds(5),
ShouldHandle = new PredicateBuilder().Handle<Exception>(ex => ex is not OperationCanceledException),
});
}
if (policy.BreakerFailureThreshold > 0)
{
builder.AddCircuitBreaker(new CircuitBreakerStrategyOptions
{
FailureRatio = 1.0,
MinimumThroughput = policy.BreakerFailureThreshold,
SamplingDuration = TimeSpan.FromSeconds(30),
BreakDuration = TimeSpan.FromSeconds(15),
ShouldHandle = new PredicateBuilder().Handle<Exception>(ex => ex is not OperationCanceledException),
});
}
return builder.Build();
}
private readonly record struct PipelineKey(string DriverInstanceId, string HostName, DriverCapability Capability);
}

View File

@@ -0,0 +1,65 @@
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Core.Stability;
/// <summary>
/// Tier C only process-recycle companion to <see cref="MemoryTracking"/>. On a
/// <see cref="MemoryTrackingAction.HardBreach"/> signal, invokes the supplied
/// <see cref="IDriverSupervisor"/> to restart the out-of-process Host.
/// </summary>
/// <remarks>
/// Per <c>docs/v2/plan.md</c> decisions #74 and #145. Tier A/B hard-breach on an in-process
/// driver would kill every OPC UA session and every co-hosted driver, so for Tier A/B this
/// class logs a <b>promotion-to-Tier-C recommendation</b> and does NOT invoke any supervisor.
/// A future tier-migration workflow acts on the recommendation.
/// </remarks>
public sealed class MemoryRecycle
{
private readonly DriverTier _tier;
private readonly IDriverSupervisor? _supervisor;
private readonly ILogger<MemoryRecycle> _logger;
public MemoryRecycle(DriverTier tier, IDriverSupervisor? supervisor, ILogger<MemoryRecycle> logger)
{
_tier = tier;
_supervisor = supervisor;
_logger = logger;
}
/// <summary>
/// Handle a <see cref="MemoryTracking"/> classification for the driver. For Tier C with a
/// wired supervisor, <c>HardBreach</c> triggers <see cref="IDriverSupervisor.RecycleAsync"/>.
/// All other combinations are no-ops with respect to process state (soft breaches + Tier A/B
/// hard breaches just log).
/// </summary>
/// <returns>True when a recycle was requested; false otherwise.</returns>
public async Task<bool> HandleAsync(MemoryTrackingAction action, long footprintBytes, CancellationToken cancellationToken)
{
switch (action)
{
case MemoryTrackingAction.SoftBreach:
_logger.LogWarning(
"Memory soft-breach on driver {DriverId}: footprint={Footprint:N0} bytes, tier={Tier}. Surfaced to Admin; no action.",
_supervisor?.DriverInstanceId ?? "(unknown)", footprintBytes, _tier);
return false;
case MemoryTrackingAction.HardBreach when _tier == DriverTier.C && _supervisor is not null:
_logger.LogError(
"Memory hard-breach on Tier C driver {DriverId}: footprint={Footprint:N0} bytes. Requesting supervisor recycle.",
_supervisor.DriverInstanceId, footprintBytes);
await _supervisor.RecycleAsync($"Memory hard-breach: {footprintBytes} bytes", cancellationToken).ConfigureAwait(false);
return true;
case MemoryTrackingAction.HardBreach:
_logger.LogError(
"Memory hard-breach on Tier {Tier} in-process driver {DriverId}: footprint={Footprint:N0} bytes. " +
"Recommending promotion to Tier C; NOT auto-killing (decisions #74, #145).",
_tier, _supervisor?.DriverInstanceId ?? "(unknown)", footprintBytes);
return false;
default:
return false;
}
}
}

View File

@@ -0,0 +1,136 @@
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Core.Stability;
/// <summary>
/// Tier-agnostic memory-footprint tracker. Captures the post-initialize <b>baseline</b>
/// from the first samples after <c>IDriver.InitializeAsync</c>, then classifies each
/// subsequent sample against a hybrid soft/hard threshold per
/// <c>docs/v2/plan.md</c> decision #146 — <c>soft = max(multiplier × baseline, baseline + floor)</c>,
/// <c>hard = 2 × soft</c>.
/// </summary>
/// <remarks>
/// <para>Per decision #145, this tracker <b>never kills a process</b>. Soft and hard breaches
/// log + surface to the Admin UI via <c>DriverInstanceResilienceStatus</c>. The matching
/// process-level recycle protection lives in a separate <c>MemoryRecycle</c> that activates
/// for Tier C drivers only (where the driver runs out-of-process behind a supervisor that
/// can safely restart it without tearing down the OPC UA session or co-hosted in-proc
/// drivers).</para>
///
/// <para>Baseline capture: the tracker starts in <see cref="TrackingPhase.WarmingUp"/> for
/// <see cref="BaselineWindow"/> (default 5 min). During that window samples are collected;
/// the baseline is computed as the median once the window elapses. Before that point every
/// classification returns <see cref="MemoryTrackingAction.Warming"/>.</para>
/// </remarks>
public sealed class MemoryTracking
{
private readonly DriverTier _tier;
private readonly TimeSpan _baselineWindow;
private readonly List<long> _warmupSamples = [];
private long _baselineBytes;
private TrackingPhase _phase = TrackingPhase.WarmingUp;
private DateTime? _warmupStartUtc;
/// <summary>Tier-default multiplier/floor constants per decision #146.</summary>
public static (int Multiplier, long FloorBytes) GetTierConstants(DriverTier tier) => tier switch
{
DriverTier.A => (Multiplier: 3, FloorBytes: 50L * 1024 * 1024),
DriverTier.B => (Multiplier: 3, FloorBytes: 100L * 1024 * 1024),
DriverTier.C => (Multiplier: 2, FloorBytes: 500L * 1024 * 1024),
_ => throw new ArgumentOutOfRangeException(nameof(tier), tier, $"No memory-tracking constants defined for tier {tier}."),
};
/// <summary>Window over which post-init samples are collected to compute the baseline.</summary>
public TimeSpan BaselineWindow => _baselineWindow;
/// <summary>Current phase: <see cref="TrackingPhase.WarmingUp"/> or <see cref="TrackingPhase.Steady"/>.</summary>
public TrackingPhase Phase => _phase;
/// <summary>Captured baseline; 0 until warmup completes.</summary>
public long BaselineBytes => _baselineBytes;
/// <summary>Effective soft threshold (zero while warming up).</summary>
public long SoftThresholdBytes => _baselineBytes == 0 ? 0 : ComputeSoft(_tier, _baselineBytes);
/// <summary>Effective hard threshold = 2 × soft (zero while warming up).</summary>
public long HardThresholdBytes => _baselineBytes == 0 ? 0 : ComputeSoft(_tier, _baselineBytes) * 2;
public MemoryTracking(DriverTier tier, TimeSpan? baselineWindow = null)
{
_tier = tier;
_baselineWindow = baselineWindow ?? TimeSpan.FromMinutes(5);
}
/// <summary>
/// Submit a memory-footprint sample. Returns the action the caller should surface.
/// During warmup, always returns <see cref="MemoryTrackingAction.Warming"/> and accumulates
/// samples; once the window elapses the first steady-phase sample triggers baseline capture
/// (median of warmup samples).
/// </summary>
public MemoryTrackingAction Sample(long footprintBytes, DateTime utcNow)
{
if (_phase == TrackingPhase.WarmingUp)
{
_warmupStartUtc ??= utcNow;
_warmupSamples.Add(footprintBytes);
if (utcNow - _warmupStartUtc.Value >= _baselineWindow && _warmupSamples.Count > 0)
{
_baselineBytes = ComputeMedian(_warmupSamples);
_phase = TrackingPhase.Steady;
}
else
{
return MemoryTrackingAction.Warming;
}
}
if (footprintBytes >= HardThresholdBytes) return MemoryTrackingAction.HardBreach;
if (footprintBytes >= SoftThresholdBytes) return MemoryTrackingAction.SoftBreach;
return MemoryTrackingAction.None;
}
private static long ComputeSoft(DriverTier tier, long baseline)
{
var (multiplier, floor) = GetTierConstants(tier);
return Math.Max(multiplier * baseline, baseline + floor);
}
private static long ComputeMedian(List<long> samples)
{
var sorted = samples.Order().ToArray();
var mid = sorted.Length / 2;
return sorted.Length % 2 == 1
? sorted[mid]
: (sorted[mid - 1] + sorted[mid]) / 2;
}
}
/// <summary>Phase of a <see cref="MemoryTracking"/> lifecycle.</summary>
public enum TrackingPhase
{
/// <summary>Collecting post-init samples; baseline not yet computed.</summary>
WarmingUp,
/// <summary>Baseline captured; every sample classified against soft/hard thresholds.</summary>
Steady,
}
/// <summary>Classification the tracker returns per sample.</summary>
public enum MemoryTrackingAction
{
/// <summary>Baseline not yet captured; sample collected, no threshold check.</summary>
Warming,
/// <summary>Below soft threshold.</summary>
None,
/// <summary>Between soft and hard thresholds — log + surface, no action.</summary>
SoftBreach,
/// <summary>
/// ≥ hard threshold. Log + surface + (Tier C only, via <c>MemoryRecycle</c>) request
/// process recycle via the driver supervisor. Tier A/B breach never invokes any
/// kill path per decisions #145 and #74.
/// </summary>
HardBreach,
}

View File

@@ -0,0 +1,86 @@
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Core.Stability;
/// <summary>
/// Tier C opt-in periodic-recycle driver per <c>docs/v2/plan.md</c> decision #67.
/// A tick method advanced by the caller (fed by a background timer in prod; by test clock
/// in unit tests) decides whether the configured interval has elapsed and, if so, drives the
/// supplied <see cref="IDriverSupervisor"/> to recycle the Host.
/// </summary>
/// <remarks>
/// Tier A/B drivers MUST NOT use this class — scheduled recycle for in-process drivers would
/// kill every OPC UA session and every co-hosted driver. The ctor throws when constructed
/// with any tier other than C to make the misuse structurally impossible.
///
/// <para>Keeps no background thread of its own — callers invoke <see cref="TickAsync"/> on
/// their ambient scheduler tick (Phase 6.1 Stream C's health-endpoint host runs one). That
/// decouples the unit under test from wall-clock time and thread-pool scheduling.</para>
/// </remarks>
public sealed class ScheduledRecycleScheduler
{
private readonly TimeSpan _recycleInterval;
private readonly IDriverSupervisor _supervisor;
private readonly ILogger<ScheduledRecycleScheduler> _logger;
private DateTime _nextRecycleUtc;
/// <summary>
/// Construct the scheduler for a Tier C driver. Throws if <paramref name="tier"/> isn't C.
/// </summary>
/// <param name="tier">Driver tier; must be <see cref="DriverTier.C"/>.</param>
/// <param name="recycleInterval">Interval between recycles (e.g. 7 days).</param>
/// <param name="startUtc">Anchor time; next recycle fires at <paramref name="startUtc"/> + <paramref name="recycleInterval"/>.</param>
/// <param name="supervisor">Supervisor that performs the actual recycle.</param>
/// <param name="logger">Diagnostic sink.</param>
public ScheduledRecycleScheduler(
DriverTier tier,
TimeSpan recycleInterval,
DateTime startUtc,
IDriverSupervisor supervisor,
ILogger<ScheduledRecycleScheduler> logger)
{
if (tier != DriverTier.C)
throw new ArgumentException(
$"ScheduledRecycleScheduler is Tier C only (got {tier}). " +
"In-process drivers must not use scheduled recycle; see decisions #74 and #145.",
nameof(tier));
if (recycleInterval <= TimeSpan.Zero)
throw new ArgumentException("RecycleInterval must be positive.", nameof(recycleInterval));
_recycleInterval = recycleInterval;
_supervisor = supervisor;
_logger = logger;
_nextRecycleUtc = startUtc + recycleInterval;
}
/// <summary>Next scheduled recycle UTC. Advances by <see cref="RecycleInterval"/> on each fire.</summary>
public DateTime NextRecycleUtc => _nextRecycleUtc;
/// <summary>Recycle interval this scheduler was constructed with.</summary>
public TimeSpan RecycleInterval => _recycleInterval;
/// <summary>
/// Tick the scheduler forward. If <paramref name="utcNow"/> is past
/// <see cref="NextRecycleUtc"/>, requests a recycle from the supervisor and advances
/// <see cref="NextRecycleUtc"/> by exactly one interval. Returns true when a recycle fired.
/// </summary>
public async Task<bool> TickAsync(DateTime utcNow, CancellationToken cancellationToken)
{
if (utcNow < _nextRecycleUtc)
return false;
_logger.LogInformation(
"Scheduled recycle due for Tier C driver {DriverId} at {Now:o}; advancing next to {Next:o}.",
_supervisor.DriverInstanceId, utcNow, _nextRecycleUtc + _recycleInterval);
await _supervisor.RecycleAsync("Scheduled periodic recycle", cancellationToken).ConfigureAwait(false);
_nextRecycleUtc += _recycleInterval;
return true;
}
/// <summary>Request an immediate recycle outside the schedule (e.g. MemoryRecycle hard-breach escalation).</summary>
public Task RequestRecycleNowAsync(string reason, CancellationToken cancellationToken) =>
_supervisor.RecycleAsync(reason, cancellationToken);
}

View File

@@ -0,0 +1,81 @@
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Core.Stability;
/// <summary>
/// Demand-aware driver-wedge detector per <c>docs/v2/plan.md</c> decision #147.
/// Flips a driver to <see cref="WedgeVerdict.Faulted"/> only when BOTH of the following hold:
/// (a) there is pending work outstanding, AND (b) no progress has been observed for longer
/// than <see cref="Threshold"/>. Idle drivers, write-only burst drivers, and subscription-only
/// drivers whose signals don't arrive regularly all stay Healthy.
/// </summary>
/// <remarks>
/// <para>Pending work signal is supplied by the caller via <see cref="DemandSignal"/>:
/// non-zero Polly bulkhead depth, ≥1 active MonitoredItem, or ≥1 queued historian read
/// each qualifies. The detector itself is state-light: all it remembers is the last
/// <c>LastProgressUtc</c> it saw and the last wedge verdict. No history buffer.</para>
///
/// <para>Default threshold per plan: <c>5 × PublishingInterval</c>, with a minimum of 60 s.
/// Concrete values are driver-agnostic and configured per-instance by the caller.</para>
/// </remarks>
public sealed class WedgeDetector
{
/// <summary>Wedge-detection threshold; pass &lt; 60 s and the detector clamps to 60 s.</summary>
public TimeSpan Threshold { get; }
/// <summary>Whether the driver reported itself <see cref="DriverState.Healthy"/> at construction.</summary>
public WedgeDetector(TimeSpan threshold)
{
Threshold = threshold < TimeSpan.FromSeconds(60) ? TimeSpan.FromSeconds(60) : threshold;
}
/// <summary>
/// Classify the current state against the demand signal. Does not retain state across
/// calls — each call is self-contained; the caller owns the <c>LastProgressUtc</c> clock.
/// </summary>
public WedgeVerdict Classify(DriverState state, DemandSignal demand, DateTime utcNow)
{
if (state != DriverState.Healthy)
return WedgeVerdict.NotApplicable;
if (!demand.HasPendingWork)
return WedgeVerdict.Idle;
var sinceProgress = utcNow - demand.LastProgressUtc;
return sinceProgress > Threshold ? WedgeVerdict.Faulted : WedgeVerdict.Healthy;
}
}
/// <summary>
/// Caller-supplied demand snapshot. All three counters are OR'd — any non-zero means work
/// is outstanding, which is the trigger for checking the <see cref="LastProgressUtc"/> clock.
/// </summary>
/// <param name="BulkheadDepth">Polly bulkhead depth (in-flight capability calls).</param>
/// <param name="ActiveMonitoredItems">Number of live OPC UA MonitoredItems bound to this driver.</param>
/// <param name="QueuedHistoryReads">Pending historian-read requests the driver owes the server.</param>
/// <param name="LastProgressUtc">Last time the driver reported a successful unit of work (read, subscribe-ack, publish).</param>
public readonly record struct DemandSignal(
int BulkheadDepth,
int ActiveMonitoredItems,
int QueuedHistoryReads,
DateTime LastProgressUtc)
{
/// <summary>True when any of the three counters is &gt; 0.</summary>
public bool HasPendingWork => BulkheadDepth > 0 || ActiveMonitoredItems > 0 || QueuedHistoryReads > 0;
}
/// <summary>Outcome of a single <see cref="WedgeDetector.Classify"/> call.</summary>
public enum WedgeVerdict
{
/// <summary>Driver wasn't Healthy to begin with — wedge detection doesn't apply.</summary>
NotApplicable,
/// <summary>Driver claims Healthy + no pending work → stays Healthy.</summary>
Idle,
/// <summary>Driver claims Healthy + has pending work + has made progress within the threshold → stays Healthy.</summary>
Healthy,
/// <summary>Driver claims Healthy + has pending work + has NOT made progress within the threshold → wedged.</summary>
Faulted,
}

View File

@@ -16,6 +16,10 @@
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Configuration\ZB.MOM.WW.OtOpcUa.Configuration.csproj"/>
</ItemGroup>
<ItemGroup>
<PackageReference Include="Polly.Core" Version="8.6.6"/>
</ItemGroup>
<ItemGroup>
<InternalsVisibleTo Include="ZB.MOM.WW.OtOpcUa.Core.Tests"/>
</ItemGroup>

View File

@@ -115,7 +115,8 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
ArrayDim: null,
SecurityClass: t.Writable ? SecurityClassification.Operate : SecurityClassification.ViewOnly,
IsHistorized: false,
IsAlarm: false));
IsAlarm: false,
WriteIdempotent: t.WriteIdempotent));
}
return Task.CompletedTask;
}

View File

@@ -92,6 +92,14 @@ public sealed class ModbusProbeOptions
/// AutomationDirect DirectLOGIC (DL205/DL260) and a few legacy families pack the first
/// character in the low byte instead — see <c>docs/v2/dl205.md</c> §strings.
/// </param>
/// <param name="WriteIdempotent">
/// Per <c>docs/v2/plan.md</c> decisions #44, #45, #143 — flag a tag as safe to replay on
/// write timeout / failure. Default <c>false</c>; writes do not auto-retry. Safe candidates:
/// holding-register set-points for analog values and configuration registers where the same
/// value can be written again without side-effects. Unsafe: coils that drive edge-triggered
/// actions (pulse outputs), counter-increment addresses on PLCs that treat writes as deltas,
/// any BCD / counter register where repeat-writes advance state.
/// </param>
public sealed record ModbusTagDefinition(
string Name,
ModbusRegion Region,
@@ -101,7 +109,8 @@ public sealed record ModbusTagDefinition(
ModbusByteOrder ByteOrder = ModbusByteOrder.BigEndian,
byte BitIndex = 0,
ushort StringLength = 0,
ModbusStringByteOrder StringByteOrder = ModbusStringByteOrder.HighByteFirst);
ModbusStringByteOrder StringByteOrder = ModbusStringByteOrder.HighByteFirst,
bool WriteIdempotent = false);
public enum ModbusRegion { Coils, DiscreteInputs, InputRegisters, HoldingRegisters }

View File

@@ -341,7 +341,8 @@ public sealed class S7Driver(S7DriverOptions options, string driverInstanceId)
ArrayDim: null,
SecurityClass: t.Writable ? SecurityClassification.Operate : SecurityClassification.ViewOnly,
IsHistorized: false,
IsAlarm: false));
IsAlarm: false,
WriteIdempotent: t.WriteIdempotent));
}
return Task.CompletedTask;
}

View File

@@ -88,12 +88,20 @@ public sealed class S7ProbeOptions
/// <param name="DataType">Logical data type — drives the underlying S7.Net read/write width.</param>
/// <param name="Writable">When true the driver accepts writes for this tag.</param>
/// <param name="StringLength">For <c>DataType = String</c>: S7-string max length. Default 254 (S7 max).</param>
/// <param name="WriteIdempotent">
/// Per <c>docs/v2/plan.md</c> decisions #44, #45, #143 — flag a tag as safe to replay on
/// write timeout / failure. Default <c>false</c>; writes do not auto-retry. Safe candidates
/// on S7: DB word/dword set-points holding analog values, configuration DBs where the same
/// value can be written again without side-effects. Unsafe: M (merker) bits or Q (output)
/// coils that drive edge-triggered routines in the PLC program.
/// </param>
public sealed record S7TagDefinition(
string Name,
string Address,
S7DataType DataType,
bool Writable = true,
int StringLength = 254);
int StringLength = 254,
bool WriteIdempotent = false);
public enum S7DataType
{

View File

@@ -3,6 +3,7 @@ using Microsoft.Extensions.Logging;
using Opc.Ua;
using Opc.Ua.Server;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
using ZB.MOM.WW.OtOpcUa.Server.Security;
using DriverWriteRequest = ZB.MOM.WW.OtOpcUa.Core.Abstractions.WriteRequest;
// Core.Abstractions defines a type-named HistoryReadResult (driver-side samples + continuation
@@ -33,8 +34,14 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
private readonly IDriver _driver;
private readonly IReadable? _readable;
private readonly IWritable? _writable;
private readonly CapabilityInvoker _invoker;
private readonly ILogger<DriverNodeManager> _logger;
// Per-variable idempotency flag populated during Variable() registration from
// DriverAttributeInfo.WriteIdempotent. Drives ExecuteWriteAsync's retry gating in
// OnWriteValue; absent entries default to false (decisions #44, #45, #143).
private readonly Dictionary<string, bool> _writeIdempotentByFullRef = new(StringComparer.OrdinalIgnoreCase);
/// <summary>The driver whose address space this node manager exposes.</summary>
public IDriver Driver => _driver;
@@ -53,12 +60,13 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
private FolderState _currentFolder = null!;
public DriverNodeManager(IServerInternal server, ApplicationConfiguration configuration,
IDriver driver, ILogger<DriverNodeManager> logger)
IDriver driver, CapabilityInvoker invoker, ILogger<DriverNodeManager> logger)
: base(server, configuration, namespaceUris: $"urn:OtOpcUa:{driver.DriverInstanceId}")
{
_driver = driver;
_readable = driver as IReadable;
_writable = driver as IWritable;
_invoker = invoker;
_logger = logger;
}
@@ -148,6 +156,7 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
AddPredefinedNode(SystemContext, v);
_variablesByFullRef[attributeInfo.FullName] = v;
_securityByFullRef[attributeInfo.FullName] = attributeInfo.SecurityClass;
_writeIdempotentByFullRef[attributeInfo.FullName] = attributeInfo.WriteIdempotent;
v.OnReadValue = OnReadValue;
v.OnWriteValue = OnWriteValue;
@@ -188,7 +197,11 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
try
{
var fullRef = node.NodeId.Identifier as string ?? "";
var result = _readable.ReadAsync([fullRef], CancellationToken.None).GetAwaiter().GetResult();
var result = _invoker.ExecuteAsync(
DriverCapability.Read,
_driver.DriverInstanceId,
async ct => (IReadOnlyList<DataValueSnapshot>)await _readable.ReadAsync([fullRef], ct).ConfigureAwait(false),
CancellationToken.None).AsTask().GetAwaiter().GetResult();
if (result.Count == 0)
{
statusCode = StatusCodes.BadNoData;
@@ -381,9 +394,15 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
try
{
var results = _writable.WriteAsync(
[new DriverWriteRequest(fullRef!, value)],
CancellationToken.None).GetAwaiter().GetResult();
var isIdempotent = _writeIdempotentByFullRef.GetValueOrDefault(fullRef!, false);
var capturedValue = value;
var results = _invoker.ExecuteWriteAsync(
_driver.DriverInstanceId,
isIdempotent,
async ct => (IReadOnlyList<WriteResult>)await _writable.WriteAsync(
[new DriverWriteRequest(fullRef!, capturedValue)],
ct).ConfigureAwait(false),
CancellationToken.None).AsTask().GetAwaiter().GetResult();
if (results.Count > 0 && results[0].StatusCode != 0)
{
statusCode = results[0].StatusCode;
@@ -465,12 +484,16 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
try
{
var driverResult = History.ReadRawAsync(
fullRef,
details.StartTime,
details.EndTime,
details.NumValuesPerNode,
CancellationToken.None).GetAwaiter().GetResult();
var driverResult = _invoker.ExecuteAsync(
DriverCapability.HistoryRead,
_driver.DriverInstanceId,
async ct => await History.ReadRawAsync(
fullRef,
details.StartTime,
details.EndTime,
details.NumValuesPerNode,
ct).ConfigureAwait(false),
CancellationToken.None).AsTask().GetAwaiter().GetResult();
WriteResult(results, errors, i, StatusCodes.Good,
BuildHistoryData(driverResult.Samples), driverResult.ContinuationPoint);
@@ -525,13 +548,17 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
try
{
var driverResult = History.ReadProcessedAsync(
fullRef,
details.StartTime,
details.EndTime,
interval,
aggregate.Value,
CancellationToken.None).GetAwaiter().GetResult();
var driverResult = _invoker.ExecuteAsync(
DriverCapability.HistoryRead,
_driver.DriverInstanceId,
async ct => await History.ReadProcessedAsync(
fullRef,
details.StartTime,
details.EndTime,
interval,
aggregate.Value,
ct).ConfigureAwait(false),
CancellationToken.None).AsTask().GetAwaiter().GetResult();
WriteResult(results, errors, i, StatusCodes.Good,
BuildHistoryData(driverResult.Samples), driverResult.ContinuationPoint);
@@ -578,8 +605,11 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
try
{
var driverResult = History.ReadAtTimeAsync(
fullRef, requestedTimes, CancellationToken.None).GetAwaiter().GetResult();
var driverResult = _invoker.ExecuteAsync(
DriverCapability.HistoryRead,
_driver.DriverInstanceId,
async ct => await History.ReadAtTimeAsync(fullRef, requestedTimes, ct).ConfigureAwait(false),
CancellationToken.None).AsTask().GetAwaiter().GetResult();
WriteResult(results, errors, i, StatusCodes.Good,
BuildHistoryData(driverResult.Samples), driverResult.ContinuationPoint);
@@ -632,12 +662,16 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
try
{
var driverResult = History.ReadEventsAsync(
sourceName: fullRef,
startUtc: details.StartTime,
endUtc: details.EndTime,
maxEvents: maxEvents,
cancellationToken: CancellationToken.None).GetAwaiter().GetResult();
var driverResult = _invoker.ExecuteAsync(
DriverCapability.HistoryRead,
_driver.DriverInstanceId,
async ct => await History.ReadEventsAsync(
sourceName: fullRef,
startUtc: details.StartTime,
endUtc: details.EndTime,
maxEvents: maxEvents,
cancellationToken: ct).ConfigureAwait(false),
CancellationToken.None).AsTask().GetAwaiter().GetResult();
WriteResult(results, errors, i, StatusCodes.Good,
BuildHistoryEvent(driverResult.Events), driverResult.ContinuationPoint);

View File

@@ -3,6 +3,7 @@ using Opc.Ua;
using Opc.Ua.Configuration;
using ZB.MOM.WW.OtOpcUa.Core.Hosting;
using ZB.MOM.WW.OtOpcUa.Core.OpcUa;
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
using ZB.MOM.WW.OtOpcUa.Server.Security;
namespace ZB.MOM.WW.OtOpcUa.Server.OpcUa;
@@ -20,6 +21,7 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable
private readonly OpcUaServerOptions _options;
private readonly DriverHost _driverHost;
private readonly IUserAuthenticator _authenticator;
private readonly DriverResiliencePipelineBuilder _pipelineBuilder;
private readonly ILoggerFactory _loggerFactory;
private readonly ILogger<OpcUaApplicationHost> _logger;
private ApplicationInstance? _application;
@@ -27,11 +29,13 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable
private bool _disposed;
public OpcUaApplicationHost(OpcUaServerOptions options, DriverHost driverHost,
IUserAuthenticator authenticator, ILoggerFactory loggerFactory, ILogger<OpcUaApplicationHost> logger)
IUserAuthenticator authenticator, ILoggerFactory loggerFactory, ILogger<OpcUaApplicationHost> logger,
DriverResiliencePipelineBuilder? pipelineBuilder = null)
{
_options = options;
_driverHost = driverHost;
_authenticator = authenticator;
_pipelineBuilder = pipelineBuilder ?? new DriverResiliencePipelineBuilder();
_loggerFactory = loggerFactory;
_logger = logger;
}
@@ -58,7 +62,7 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable
throw new InvalidOperationException(
$"OPC UA application certificate could not be validated or created in {_options.PkiStoreRoot}");
_server = new OtOpcUaServer(_driverHost, _authenticator, _loggerFactory);
_server = new OtOpcUaServer(_driverHost, _authenticator, _pipelineBuilder, _loggerFactory);
await _application.Start(_server).ConfigureAwait(false);
_logger.LogInformation("OPC UA server started — endpoint={Endpoint} driverCount={Count}",

View File

@@ -5,6 +5,7 @@ using Opc.Ua.Server;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Hosting;
using ZB.MOM.WW.OtOpcUa.Core.OpcUa;
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
using ZB.MOM.WW.OtOpcUa.Server.Security;
namespace ZB.MOM.WW.OtOpcUa.Server.OpcUa;
@@ -19,13 +20,19 @@ public sealed class OtOpcUaServer : StandardServer
{
private readonly DriverHost _driverHost;
private readonly IUserAuthenticator _authenticator;
private readonly DriverResiliencePipelineBuilder _pipelineBuilder;
private readonly ILoggerFactory _loggerFactory;
private readonly List<DriverNodeManager> _driverNodeManagers = new();
public OtOpcUaServer(DriverHost driverHost, IUserAuthenticator authenticator, ILoggerFactory loggerFactory)
public OtOpcUaServer(
DriverHost driverHost,
IUserAuthenticator authenticator,
DriverResiliencePipelineBuilder pipelineBuilder,
ILoggerFactory loggerFactory)
{
_driverHost = driverHost;
_authenticator = authenticator;
_pipelineBuilder = pipelineBuilder;
_loggerFactory = loggerFactory;
}
@@ -46,7 +53,12 @@ public sealed class OtOpcUaServer : StandardServer
if (driver is null) continue;
var logger = _loggerFactory.CreateLogger<DriverNodeManager>();
var manager = new DriverNodeManager(server, configuration, driver, logger);
// Per-driver resilience options: default Tier A pending Stream B.1 which wires
// per-type tiers into DriverTypeRegistry. Read ResilienceConfig JSON from the
// DriverInstance row in a follow-up PR; for now every driver gets Tier A defaults.
var options = new DriverResilienceOptions { Tier = DriverTier.A };
var invoker = new CapabilityInvoker(_pipelineBuilder, driver.DriverInstanceId, () => options);
var manager = new DriverNodeManager(server, configuration, driver, invoker, logger);
_driverNodeManagers.Add(manager);
}

View File

@@ -7,11 +7,13 @@ public sealed class DriverTypeRegistryTests
{
private static DriverTypeMetadata SampleMetadata(
string typeName = "Modbus",
NamespaceKindCompatibility allowed = NamespaceKindCompatibility.Equipment) =>
NamespaceKindCompatibility allowed = NamespaceKindCompatibility.Equipment,
DriverTier tier = DriverTier.B) =>
new(typeName, allowed,
DriverConfigJsonSchema: "{\"type\": \"object\"}",
DeviceConfigJsonSchema: "{\"type\": \"object\"}",
TagConfigJsonSchema: "{\"type\": \"object\"}");
TagConfigJsonSchema: "{\"type\": \"object\"}",
Tier: tier);
[Fact]
public void Register_ThenGet_RoundTrips()
@@ -24,6 +26,20 @@ public sealed class DriverTypeRegistryTests
registry.Get("Modbus").ShouldBe(metadata);
}
[Theory]
[InlineData(DriverTier.A)]
[InlineData(DriverTier.B)]
[InlineData(DriverTier.C)]
public void Register_Requires_NonNullTier(DriverTier tier)
{
var registry = new DriverTypeRegistry();
var metadata = SampleMetadata(typeName: $"Driver-{tier}", tier: tier);
registry.Register(metadata);
registry.Get(metadata.TypeName).Tier.ShouldBe(tier);
}
[Fact]
public void Get_IsCaseInsensitive()
{

View File

@@ -0,0 +1,151 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience;
[Trait("Category", "Unit")]
public sealed class CapabilityInvokerTests
{
private static CapabilityInvoker MakeInvoker(
DriverResiliencePipelineBuilder builder,
DriverResilienceOptions options) =>
new(builder, "drv-test", () => options);
[Fact]
public async Task Read_ReturnsValue_FromCallSite()
{
var invoker = MakeInvoker(new DriverResiliencePipelineBuilder(), new DriverResilienceOptions { Tier = DriverTier.A });
var result = await invoker.ExecuteAsync(
DriverCapability.Read,
"host-1",
_ => ValueTask.FromResult(42),
CancellationToken.None);
result.ShouldBe(42);
}
[Fact]
public async Task Read_Retries_OnTransientFailure()
{
var invoker = MakeInvoker(new DriverResiliencePipelineBuilder(), new DriverResilienceOptions { Tier = DriverTier.A });
var attempts = 0;
var result = await invoker.ExecuteAsync(
DriverCapability.Read,
"host-1",
async _ =>
{
attempts++;
if (attempts < 2) throw new InvalidOperationException("transient");
await Task.Yield();
return "ok";
},
CancellationToken.None);
result.ShouldBe("ok");
attempts.ShouldBe(2);
}
[Fact]
public async Task Write_NonIdempotent_DoesNotRetry_EvenWhenPolicyHasRetries()
{
var options = new DriverResilienceOptions
{
Tier = DriverTier.A,
CapabilityPolicies = new Dictionary<DriverCapability, CapabilityPolicy>
{
[DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 3, BreakerFailureThreshold: 5),
},
};
var invoker = MakeInvoker(new DriverResiliencePipelineBuilder(), options);
var attempts = 0;
await Should.ThrowAsync<InvalidOperationException>(async () =>
await invoker.ExecuteWriteAsync(
"host-1",
isIdempotent: false,
async _ =>
{
attempts++;
await Task.Yield();
throw new InvalidOperationException("boom");
#pragma warning disable CS0162
return 0;
#pragma warning restore CS0162
},
CancellationToken.None));
attempts.ShouldBe(1, "non-idempotent write must never replay");
}
[Fact]
public async Task Write_Idempotent_Retries_WhenPolicyHasRetries()
{
var options = new DriverResilienceOptions
{
Tier = DriverTier.A,
CapabilityPolicies = new Dictionary<DriverCapability, CapabilityPolicy>
{
[DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 3, BreakerFailureThreshold: 5),
},
};
var invoker = MakeInvoker(new DriverResiliencePipelineBuilder(), options);
var attempts = 0;
var result = await invoker.ExecuteWriteAsync(
"host-1",
isIdempotent: true,
async _ =>
{
attempts++;
if (attempts < 2) throw new InvalidOperationException("transient");
await Task.Yield();
return "ok";
},
CancellationToken.None);
result.ShouldBe("ok");
attempts.ShouldBe(2);
}
[Fact]
public async Task Write_Default_DoesNotRetry_WhenPolicyHasZeroRetries()
{
// Tier A Write default is RetryCount=0. Even isIdempotent=true shouldn't retry
// because the policy says not to.
var invoker = MakeInvoker(new DriverResiliencePipelineBuilder(), new DriverResilienceOptions { Tier = DriverTier.A });
var attempts = 0;
await Should.ThrowAsync<InvalidOperationException>(async () =>
await invoker.ExecuteWriteAsync(
"host-1",
isIdempotent: true,
async _ =>
{
attempts++;
await Task.Yield();
throw new InvalidOperationException("boom");
#pragma warning disable CS0162
return 0;
#pragma warning restore CS0162
},
CancellationToken.None));
attempts.ShouldBe(1, "tier-A default for Write is RetryCount=0");
}
[Fact]
public async Task Execute_HonorsDifferentHosts_Independently()
{
var builder = new DriverResiliencePipelineBuilder();
var invoker = MakeInvoker(builder, new DriverResilienceOptions { Tier = DriverTier.A });
await invoker.ExecuteAsync(DriverCapability.Read, "host-a", _ => ValueTask.FromResult(1), CancellationToken.None);
await invoker.ExecuteAsync(DriverCapability.Read, "host-b", _ => ValueTask.FromResult(2), CancellationToken.None);
builder.CachedPipelineCount.ShouldBe(2);
}
}

View File

@@ -0,0 +1,102 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience;
[Trait("Category", "Unit")]
public sealed class DriverResilienceOptionsTests
{
[Theory]
[InlineData(DriverTier.A)]
[InlineData(DriverTier.B)]
[InlineData(DriverTier.C)]
public void TierDefaults_Cover_EveryCapability(DriverTier tier)
{
var defaults = DriverResilienceOptions.GetTierDefaults(tier);
foreach (var capability in Enum.GetValues<DriverCapability>())
defaults.ShouldContainKey(capability);
}
[Theory]
[InlineData(DriverTier.A)]
[InlineData(DriverTier.B)]
[InlineData(DriverTier.C)]
public void Write_NeverRetries_ByDefault(DriverTier tier)
{
var defaults = DriverResilienceOptions.GetTierDefaults(tier);
defaults[DriverCapability.Write].RetryCount.ShouldBe(0);
}
[Theory]
[InlineData(DriverTier.A)]
[InlineData(DriverTier.B)]
[InlineData(DriverTier.C)]
public void AlarmAcknowledge_NeverRetries_ByDefault(DriverTier tier)
{
var defaults = DriverResilienceOptions.GetTierDefaults(tier);
defaults[DriverCapability.AlarmAcknowledge].RetryCount.ShouldBe(0);
}
[Theory]
[InlineData(DriverTier.A, DriverCapability.Read)]
[InlineData(DriverTier.A, DriverCapability.HistoryRead)]
[InlineData(DriverTier.B, DriverCapability.Discover)]
[InlineData(DriverTier.B, DriverCapability.Probe)]
[InlineData(DriverTier.C, DriverCapability.AlarmSubscribe)]
public void IdempotentCapabilities_Retry_ByDefault(DriverTier tier, DriverCapability capability)
{
var defaults = DriverResilienceOptions.GetTierDefaults(tier);
defaults[capability].RetryCount.ShouldBeGreaterThan(0);
}
[Fact]
public void TierC_DisablesCircuitBreaker_DeferringToSupervisor()
{
var defaults = DriverResilienceOptions.GetTierDefaults(DriverTier.C);
foreach (var (_, policy) in defaults)
policy.BreakerFailureThreshold.ShouldBe(0, "Tier C breaker is handled by the Proxy supervisor (decision #68)");
}
[Theory]
[InlineData(DriverTier.A)]
[InlineData(DriverTier.B)]
public void TierAAndB_EnableCircuitBreaker(DriverTier tier)
{
var defaults = DriverResilienceOptions.GetTierDefaults(tier);
foreach (var (_, policy) in defaults)
policy.BreakerFailureThreshold.ShouldBeGreaterThan(0);
}
[Fact]
public void Resolve_Uses_TierDefaults_When_NoOverride()
{
var options = new DriverResilienceOptions { Tier = DriverTier.A };
var resolved = options.Resolve(DriverCapability.Read);
resolved.ShouldBe(DriverResilienceOptions.GetTierDefaults(DriverTier.A)[DriverCapability.Read]);
}
[Fact]
public void Resolve_Uses_Override_When_Configured()
{
var custom = new CapabilityPolicy(TimeoutSeconds: 42, RetryCount: 7, BreakerFailureThreshold: 9);
var options = new DriverResilienceOptions
{
Tier = DriverTier.A,
CapabilityPolicies = new Dictionary<DriverCapability, CapabilityPolicy>
{
[DriverCapability.Read] = custom,
},
};
options.Resolve(DriverCapability.Read).ShouldBe(custom);
options.Resolve(DriverCapability.Write).ShouldBe(
DriverResilienceOptions.GetTierDefaults(DriverTier.A)[DriverCapability.Write]);
}
}

View File

@@ -0,0 +1,222 @@
using Polly.CircuitBreaker;
using Polly.Timeout;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience;
[Trait("Category", "Unit")]
public sealed class DriverResiliencePipelineBuilderTests
{
private static readonly DriverResilienceOptions TierAOptions = new() { Tier = DriverTier.A };
[Fact]
public async Task Read_Retries_Transient_Failures()
{
var builder = new DriverResiliencePipelineBuilder();
var pipeline = builder.GetOrCreate("drv-test", "host-1", DriverCapability.Read, TierAOptions);
var attempts = 0;
await pipeline.ExecuteAsync(async _ =>
{
attempts++;
if (attempts < 3) throw new InvalidOperationException("transient");
await Task.Yield();
});
attempts.ShouldBe(3);
}
[Fact]
public async Task Write_DoesNotRetry_OnFailure()
{
var builder = new DriverResiliencePipelineBuilder();
var pipeline = builder.GetOrCreate("drv-test", "host-1", DriverCapability.Write, TierAOptions);
var attempts = 0;
var ex = await Should.ThrowAsync<InvalidOperationException>(async () =>
{
await pipeline.ExecuteAsync(async _ =>
{
attempts++;
await Task.Yield();
throw new InvalidOperationException("boom");
});
});
attempts.ShouldBe(1);
ex.Message.ShouldBe("boom");
}
[Fact]
public async Task AlarmAcknowledge_DoesNotRetry_OnFailure()
{
var builder = new DriverResiliencePipelineBuilder();
var pipeline = builder.GetOrCreate("drv-test", "host-1", DriverCapability.AlarmAcknowledge, TierAOptions);
var attempts = 0;
await Should.ThrowAsync<InvalidOperationException>(async () =>
{
await pipeline.ExecuteAsync(async _ =>
{
attempts++;
await Task.Yield();
throw new InvalidOperationException("boom");
});
});
attempts.ShouldBe(1);
}
[Fact]
public void Pipeline_IsIsolated_PerHost()
{
var builder = new DriverResiliencePipelineBuilder();
var driverId = "drv-test";
var hostA = builder.GetOrCreate(driverId, "host-a", DriverCapability.Read, TierAOptions);
var hostB = builder.GetOrCreate(driverId, "host-b", DriverCapability.Read, TierAOptions);
hostA.ShouldNotBeSameAs(hostB);
builder.CachedPipelineCount.ShouldBe(2);
}
[Fact]
public void Pipeline_IsReused_ForSameTriple()
{
var builder = new DriverResiliencePipelineBuilder();
var driverId = "drv-test";
var first = builder.GetOrCreate(driverId, "host-a", DriverCapability.Read, TierAOptions);
var second = builder.GetOrCreate(driverId, "host-a", DriverCapability.Read, TierAOptions);
first.ShouldBeSameAs(second);
builder.CachedPipelineCount.ShouldBe(1);
}
[Fact]
public void Pipeline_IsIsolated_PerCapability()
{
var builder = new DriverResiliencePipelineBuilder();
var driverId = "drv-test";
var read = builder.GetOrCreate(driverId, "host-a", DriverCapability.Read, TierAOptions);
var write = builder.GetOrCreate(driverId, "host-a", DriverCapability.Write, TierAOptions);
read.ShouldNotBeSameAs(write);
}
[Fact]
public async Task DeadHost_DoesNotOpenBreaker_ForSiblingHost()
{
var builder = new DriverResiliencePipelineBuilder();
var driverId = "drv-test";
var deadHost = builder.GetOrCreate(driverId, "dead-plc", DriverCapability.Read, TierAOptions);
var liveHost = builder.GetOrCreate(driverId, "live-plc", DriverCapability.Read, TierAOptions);
var threshold = TierAOptions.Resolve(DriverCapability.Read).BreakerFailureThreshold;
for (var i = 0; i < threshold + 5; i++)
{
await Should.ThrowAsync<Exception>(async () =>
await deadHost.ExecuteAsync(async _ =>
{
await Task.Yield();
throw new InvalidOperationException("dead plc");
}));
}
var liveAttempts = 0;
await liveHost.ExecuteAsync(async _ =>
{
liveAttempts++;
await Task.Yield();
});
liveAttempts.ShouldBe(1, "healthy sibling host must not be affected by dead peer");
}
[Fact]
public async Task CircuitBreaker_Opens_AfterFailureThreshold_OnTierA()
{
var builder = new DriverResiliencePipelineBuilder();
var pipeline = builder.GetOrCreate("drv-test", "host-1", DriverCapability.Write, TierAOptions);
var threshold = TierAOptions.Resolve(DriverCapability.Write).BreakerFailureThreshold;
for (var i = 0; i < threshold; i++)
{
await Should.ThrowAsync<InvalidOperationException>(async () =>
await pipeline.ExecuteAsync(async _ =>
{
await Task.Yield();
throw new InvalidOperationException("boom");
}));
}
await Should.ThrowAsync<BrokenCircuitException>(async () =>
await pipeline.ExecuteAsync(async _ =>
{
await Task.Yield();
}));
}
[Fact]
public async Task Timeout_Cancels_SlowOperation()
{
var tierAWithShortTimeout = new DriverResilienceOptions
{
Tier = DriverTier.A,
CapabilityPolicies = new Dictionary<DriverCapability, CapabilityPolicy>
{
[DriverCapability.Read] = new(TimeoutSeconds: 1, RetryCount: 0, BreakerFailureThreshold: 5),
},
};
var builder = new DriverResiliencePipelineBuilder();
var pipeline = builder.GetOrCreate("drv-test", "host-1", DriverCapability.Read, tierAWithShortTimeout);
await Should.ThrowAsync<TimeoutRejectedException>(async () =>
await pipeline.ExecuteAsync(async ct =>
{
await Task.Delay(TimeSpan.FromSeconds(5), ct);
}));
}
[Fact]
public void Invalidate_Removes_OnlyMatchingInstance()
{
var builder = new DriverResiliencePipelineBuilder();
var keepId = "drv-keep";
var dropId = "drv-drop";
builder.GetOrCreate(keepId, "h", DriverCapability.Read, TierAOptions);
builder.GetOrCreate(keepId, "h", DriverCapability.Write, TierAOptions);
builder.GetOrCreate(dropId, "h", DriverCapability.Read, TierAOptions);
var removed = builder.Invalidate(dropId);
removed.ShouldBe(1);
builder.CachedPipelineCount.ShouldBe(2);
}
[Fact]
public async Task Cancellation_IsNot_Retried()
{
var builder = new DriverResiliencePipelineBuilder();
var pipeline = builder.GetOrCreate("drv-test", "host-1", DriverCapability.Read, TierAOptions);
var attempts = 0;
using var cts = new CancellationTokenSource();
cts.Cancel();
await Should.ThrowAsync<OperationCanceledException>(async () =>
await pipeline.ExecuteAsync(async ct =>
{
attempts++;
ct.ThrowIfCancellationRequested();
await Task.Yield();
}, cts.Token));
attempts.ShouldBeLessThanOrEqualTo(1);
}
}

View File

@@ -0,0 +1,160 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience;
/// <summary>
/// Integration tests for the Phase 6.1 Stream A.5 contract — wrapping a flaky
/// <see cref="IReadable"/> / <see cref="IWritable"/> through the <see cref="CapabilityInvoker"/>.
/// Exercises the three scenarios the plan enumerates: transient read succeeds after N
/// retries; non-idempotent write fails after one attempt; idempotent write retries through.
/// </summary>
[Trait("Category", "Integration")]
public sealed class FlakeyDriverIntegrationTests
{
[Fact]
public async Task Read_SurfacesSuccess_AfterTransientFailures()
{
var flaky = new FlakeyDriver(failReadsBeforeIndex: 5);
var options = new DriverResilienceOptions
{
Tier = DriverTier.A,
CapabilityPolicies = new Dictionary<DriverCapability, CapabilityPolicy>
{
// TimeoutSeconds=30 gives slack for 5 exponential-backoff retries under
// parallel-test-execution CPU pressure; 10 retries at the default Delay=100ms
// exponential can otherwise exceed a 2-second budget intermittently.
[DriverCapability.Read] = new(TimeoutSeconds: 30, RetryCount: 10, BreakerFailureThreshold: 50),
},
};
var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), "drv-test", () => options);
var result = await invoker.ExecuteAsync(
DriverCapability.Read,
"host-1",
async ct => await flaky.ReadAsync(["tag-a"], ct),
CancellationToken.None);
flaky.ReadAttempts.ShouldBe(6);
result[0].StatusCode.ShouldBe(0u);
}
[Fact]
public async Task Write_NonIdempotent_FailsOnFirstFailure_NoReplay()
{
var flaky = new FlakeyDriver(failWritesBeforeIndex: 3);
var optionsWithAggressiveRetry = new DriverResilienceOptions
{
Tier = DriverTier.A,
CapabilityPolicies = new Dictionary<DriverCapability, CapabilityPolicy>
{
[DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 5, BreakerFailureThreshold: 50),
},
};
var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), "drv-test", () => optionsWithAggressiveRetry);
await Should.ThrowAsync<InvalidOperationException>(async () =>
await invoker.ExecuteWriteAsync(
"host-1",
isIdempotent: false,
async ct => await flaky.WriteAsync([new WriteRequest("pulse-coil", true)], ct),
CancellationToken.None));
flaky.WriteAttempts.ShouldBe(1, "non-idempotent write must never replay (decision #44)");
}
[Fact]
public async Task Write_Idempotent_RetriesUntilSuccess()
{
var flaky = new FlakeyDriver(failWritesBeforeIndex: 2);
var optionsWithRetry = new DriverResilienceOptions
{
Tier = DriverTier.A,
CapabilityPolicies = new Dictionary<DriverCapability, CapabilityPolicy>
{
[DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 5, BreakerFailureThreshold: 50),
},
};
var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), "drv-test", () => optionsWithRetry);
var results = await invoker.ExecuteWriteAsync(
"host-1",
isIdempotent: true,
async ct => await flaky.WriteAsync([new WriteRequest("set-point", 42.0f)], ct),
CancellationToken.None);
flaky.WriteAttempts.ShouldBe(3);
results[0].StatusCode.ShouldBe(0u);
}
[Fact]
public async Task MultipleHosts_OnOneDriver_HaveIndependentFailureCounts()
{
var flaky = new FlakeyDriver(failReadsBeforeIndex: 0);
var options = new DriverResilienceOptions { Tier = DriverTier.A };
var builder = new DriverResiliencePipelineBuilder();
var invoker = new CapabilityInvoker(builder, "drv-test", () => options);
// host-dead: force many failures to exhaust retries + trip breaker
var threshold = options.Resolve(DriverCapability.Read).BreakerFailureThreshold;
for (var i = 0; i < threshold + 5; i++)
{
await Should.ThrowAsync<Exception>(async () =>
await invoker.ExecuteAsync(DriverCapability.Read, "host-dead",
_ => throw new InvalidOperationException("dead"),
CancellationToken.None));
}
// host-live: succeeds on first call — unaffected by the dead-host breaker
var liveAttempts = 0;
await invoker.ExecuteAsync(DriverCapability.Read, "host-live",
_ => { liveAttempts++; return ValueTask.FromResult("ok"); },
CancellationToken.None);
liveAttempts.ShouldBe(1);
}
private sealed class FlakeyDriver : IReadable, IWritable
{
private readonly int _failReadsBeforeIndex;
private readonly int _failWritesBeforeIndex;
public int ReadAttempts { get; private set; }
public int WriteAttempts { get; private set; }
public FlakeyDriver(int failReadsBeforeIndex = 0, int failWritesBeforeIndex = 0)
{
_failReadsBeforeIndex = failReadsBeforeIndex;
_failWritesBeforeIndex = failWritesBeforeIndex;
}
public Task<IReadOnlyList<DataValueSnapshot>> ReadAsync(
IReadOnlyList<string> fullReferences,
CancellationToken cancellationToken)
{
var attempt = ++ReadAttempts;
if (attempt <= _failReadsBeforeIndex)
throw new InvalidOperationException($"transient read failure #{attempt}");
var now = DateTime.UtcNow;
IReadOnlyList<DataValueSnapshot> result = fullReferences
.Select(_ => new DataValueSnapshot(Value: 0, StatusCode: 0u, SourceTimestampUtc: now, ServerTimestampUtc: now))
.ToList();
return Task.FromResult(result);
}
public Task<IReadOnlyList<WriteResult>> WriteAsync(
IReadOnlyList<WriteRequest> writes,
CancellationToken cancellationToken)
{
var attempt = ++WriteAttempts;
if (attempt <= _failWritesBeforeIndex)
throw new InvalidOperationException($"transient write failure #{attempt}");
IReadOnlyList<WriteResult> result = writes.Select(_ => new WriteResult(0u)).ToList();
return Task.FromResult(result);
}
}
}

View File

@@ -0,0 +1,91 @@
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Stability;
namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Stability;
[Trait("Category", "Unit")]
public sealed class MemoryRecycleTests
{
[Fact]
public async Task TierC_HardBreach_RequestsSupervisorRecycle()
{
var supervisor = new FakeSupervisor();
var recycle = new MemoryRecycle(DriverTier.C, supervisor, NullLogger<MemoryRecycle>.Instance);
var requested = await recycle.HandleAsync(MemoryTrackingAction.HardBreach, 2_000_000_000, CancellationToken.None);
requested.ShouldBeTrue();
supervisor.RecycleCount.ShouldBe(1);
supervisor.LastReason.ShouldContain("hard-breach");
}
[Theory]
[InlineData(DriverTier.A)]
[InlineData(DriverTier.B)]
public async Task InProcessTier_HardBreach_NeverRequestsRecycle(DriverTier tier)
{
var supervisor = new FakeSupervisor();
var recycle = new MemoryRecycle(tier, supervisor, NullLogger<MemoryRecycle>.Instance);
var requested = await recycle.HandleAsync(MemoryTrackingAction.HardBreach, 2_000_000_000, CancellationToken.None);
requested.ShouldBeFalse("Tier A/B hard-breach logs a promotion recommendation only (decisions #74, #145)");
supervisor.RecycleCount.ShouldBe(0);
}
[Fact]
public async Task TierC_WithoutSupervisor_HardBreach_NoOp()
{
var recycle = new MemoryRecycle(DriverTier.C, supervisor: null, NullLogger<MemoryRecycle>.Instance);
var requested = await recycle.HandleAsync(MemoryTrackingAction.HardBreach, 2_000_000_000, CancellationToken.None);
requested.ShouldBeFalse("no supervisor → no recycle path; action logged only");
}
[Theory]
[InlineData(DriverTier.A)]
[InlineData(DriverTier.B)]
[InlineData(DriverTier.C)]
public async Task SoftBreach_NeverRequestsRecycle(DriverTier tier)
{
var supervisor = new FakeSupervisor();
var recycle = new MemoryRecycle(tier, supervisor, NullLogger<MemoryRecycle>.Instance);
var requested = await recycle.HandleAsync(MemoryTrackingAction.SoftBreach, 1_000_000_000, CancellationToken.None);
requested.ShouldBeFalse("soft-breach is surface-only at every tier");
supervisor.RecycleCount.ShouldBe(0);
}
[Theory]
[InlineData(MemoryTrackingAction.None)]
[InlineData(MemoryTrackingAction.Warming)]
public async Task NonBreachActions_NoOp(MemoryTrackingAction action)
{
var supervisor = new FakeSupervisor();
var recycle = new MemoryRecycle(DriverTier.C, supervisor, NullLogger<MemoryRecycle>.Instance);
var requested = await recycle.HandleAsync(action, 100_000_000, CancellationToken.None);
requested.ShouldBeFalse();
supervisor.RecycleCount.ShouldBe(0);
}
private sealed class FakeSupervisor : IDriverSupervisor
{
public string DriverInstanceId => "fake-tier-c";
public int RecycleCount { get; private set; }
public string? LastReason { get; private set; }
public Task RecycleAsync(string reason, CancellationToken cancellationToken)
{
RecycleCount++;
LastReason = reason;
return Task.CompletedTask;
}
}
}

View File

@@ -0,0 +1,119 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Stability;
namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Stability;
[Trait("Category", "Unit")]
public sealed class MemoryTrackingTests
{
private static readonly DateTime T0 = new(2026, 4, 19, 12, 0, 0, DateTimeKind.Utc);
[Fact]
public void WarmingUp_Returns_Warming_UntilWindowElapses()
{
var tracker = new MemoryTracking(DriverTier.A, TimeSpan.FromMinutes(5));
tracker.Sample(100_000_000, T0).ShouldBe(MemoryTrackingAction.Warming);
tracker.Sample(105_000_000, T0.AddMinutes(1)).ShouldBe(MemoryTrackingAction.Warming);
tracker.Sample(102_000_000, T0.AddMinutes(4.9)).ShouldBe(MemoryTrackingAction.Warming);
tracker.Phase.ShouldBe(TrackingPhase.WarmingUp);
tracker.BaselineBytes.ShouldBe(0);
}
[Fact]
public void WindowElapsed_CapturesBaselineAsMedian_AndTransitionsToSteady()
{
var tracker = new MemoryTracking(DriverTier.A, TimeSpan.FromMinutes(5));
tracker.Sample(100_000_000, T0);
tracker.Sample(200_000_000, T0.AddMinutes(1));
tracker.Sample(150_000_000, T0.AddMinutes(2));
var first = tracker.Sample(150_000_000, T0.AddMinutes(5));
tracker.Phase.ShouldBe(TrackingPhase.Steady);
tracker.BaselineBytes.ShouldBe(150_000_000L, "median of 4 samples [100, 200, 150, 150] = (150+150)/2 = 150");
first.ShouldBe(MemoryTrackingAction.None, "150 MB is the baseline itself, well under soft threshold");
}
[Theory]
[InlineData(DriverTier.A, 3, 50)]
[InlineData(DriverTier.B, 3, 100)]
[InlineData(DriverTier.C, 2, 500)]
public void GetTierConstants_MatchesDecision146(DriverTier tier, int expectedMultiplier, long expectedFloorMB)
{
var (multiplier, floor) = MemoryTracking.GetTierConstants(tier);
multiplier.ShouldBe(expectedMultiplier);
floor.ShouldBe(expectedFloorMB * 1024 * 1024);
}
[Fact]
public void SoftThreshold_UsesMax_OfMultiplierAndFloor_SmallBaseline()
{
// Tier A: mult=3, floor=50 MB. Baseline 10 MB → 3×10=30 MB < 10+50=60 MB → floor wins.
var tracker = WarmupWithBaseline(DriverTier.A, 10L * 1024 * 1024);
tracker.SoftThresholdBytes.ShouldBe(60L * 1024 * 1024);
}
[Fact]
public void SoftThreshold_UsesMax_OfMultiplierAndFloor_LargeBaseline()
{
// Tier A: mult=3, floor=50 MB. Baseline 200 MB → 3×200=600 MB > 200+50=250 MB → multiplier wins.
var tracker = WarmupWithBaseline(DriverTier.A, 200L * 1024 * 1024);
tracker.SoftThresholdBytes.ShouldBe(600L * 1024 * 1024);
}
[Fact]
public void HardThreshold_IsTwiceSoft()
{
var tracker = WarmupWithBaseline(DriverTier.B, 200L * 1024 * 1024);
tracker.HardThresholdBytes.ShouldBe(tracker.SoftThresholdBytes * 2);
}
[Fact]
public void Sample_Below_Soft_Returns_None()
{
var tracker = WarmupWithBaseline(DriverTier.A, 100L * 1024 * 1024);
tracker.Sample(200L * 1024 * 1024, T0.AddMinutes(10)).ShouldBe(MemoryTrackingAction.None);
}
[Fact]
public void Sample_AtSoft_Returns_SoftBreach()
{
// Tier A, baseline 200 MB → soft = 600 MB. Sample exactly at soft.
var tracker = WarmupWithBaseline(DriverTier.A, 200L * 1024 * 1024);
tracker.Sample(tracker.SoftThresholdBytes, T0.AddMinutes(10))
.ShouldBe(MemoryTrackingAction.SoftBreach);
}
[Fact]
public void Sample_AtHard_Returns_HardBreach()
{
var tracker = WarmupWithBaseline(DriverTier.A, 200L * 1024 * 1024);
tracker.Sample(tracker.HardThresholdBytes, T0.AddMinutes(10))
.ShouldBe(MemoryTrackingAction.HardBreach);
}
[Fact]
public void Sample_AboveHard_Returns_HardBreach()
{
var tracker = WarmupWithBaseline(DriverTier.A, 200L * 1024 * 1024);
tracker.Sample(tracker.HardThresholdBytes + 100_000_000, T0.AddMinutes(10))
.ShouldBe(MemoryTrackingAction.HardBreach);
}
private static MemoryTracking WarmupWithBaseline(DriverTier tier, long baseline)
{
var tracker = new MemoryTracking(tier, TimeSpan.FromMinutes(5));
tracker.Sample(baseline, T0);
tracker.Sample(baseline, T0.AddMinutes(5));
tracker.BaselineBytes.ShouldBe(baseline);
return tracker;
}
}

View File

@@ -0,0 +1,101 @@
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Stability;
namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Stability;
[Trait("Category", "Unit")]
public sealed class ScheduledRecycleSchedulerTests
{
private static readonly DateTime T0 = new(2026, 4, 19, 0, 0, 0, DateTimeKind.Utc);
private static readonly TimeSpan Weekly = TimeSpan.FromDays(7);
[Theory]
[InlineData(DriverTier.A)]
[InlineData(DriverTier.B)]
public void TierAOrB_Ctor_Throws(DriverTier tier)
{
var supervisor = new FakeSupervisor();
Should.Throw<ArgumentException>(() => new ScheduledRecycleScheduler(
tier, Weekly, T0, supervisor, NullLogger<ScheduledRecycleScheduler>.Instance));
}
[Fact]
public void ZeroOrNegativeInterval_Throws()
{
var supervisor = new FakeSupervisor();
Should.Throw<ArgumentException>(() => new ScheduledRecycleScheduler(
DriverTier.C, TimeSpan.Zero, T0, supervisor, NullLogger<ScheduledRecycleScheduler>.Instance));
Should.Throw<ArgumentException>(() => new ScheduledRecycleScheduler(
DriverTier.C, TimeSpan.FromSeconds(-1), T0, supervisor, NullLogger<ScheduledRecycleScheduler>.Instance));
}
[Fact]
public async Task Tick_BeforeNextRecycle_NoOp()
{
var supervisor = new FakeSupervisor();
var sch = new ScheduledRecycleScheduler(DriverTier.C, Weekly, T0, supervisor, NullLogger<ScheduledRecycleScheduler>.Instance);
var fired = await sch.TickAsync(T0 + TimeSpan.FromDays(6), CancellationToken.None);
fired.ShouldBeFalse();
supervisor.RecycleCount.ShouldBe(0);
}
[Fact]
public async Task Tick_AtOrAfterNextRecycle_FiresOnce_AndAdvances()
{
var supervisor = new FakeSupervisor();
var sch = new ScheduledRecycleScheduler(DriverTier.C, Weekly, T0, supervisor, NullLogger<ScheduledRecycleScheduler>.Instance);
var fired = await sch.TickAsync(T0 + Weekly + TimeSpan.FromMinutes(1), CancellationToken.None);
fired.ShouldBeTrue();
supervisor.RecycleCount.ShouldBe(1);
sch.NextRecycleUtc.ShouldBe(T0 + Weekly + Weekly);
}
[Fact]
public async Task RequestRecycleNow_Fires_Immediately_WithoutAdvancingSchedule()
{
var supervisor = new FakeSupervisor();
var sch = new ScheduledRecycleScheduler(DriverTier.C, Weekly, T0, supervisor, NullLogger<ScheduledRecycleScheduler>.Instance);
var nextBefore = sch.NextRecycleUtc;
await sch.RequestRecycleNowAsync("memory hard-breach", CancellationToken.None);
supervisor.RecycleCount.ShouldBe(1);
supervisor.LastReason.ShouldBe("memory hard-breach");
sch.NextRecycleUtc.ShouldBe(nextBefore, "ad-hoc recycle doesn't shift the cron schedule");
}
[Fact]
public async Task MultipleFires_AcrossTicks_AdvanceOneIntervalEach()
{
var supervisor = new FakeSupervisor();
var sch = new ScheduledRecycleScheduler(DriverTier.C, TimeSpan.FromDays(1), T0, supervisor, NullLogger<ScheduledRecycleScheduler>.Instance);
await sch.TickAsync(T0 + TimeSpan.FromDays(1) + TimeSpan.FromHours(1), CancellationToken.None);
await sch.TickAsync(T0 + TimeSpan.FromDays(2) + TimeSpan.FromHours(1), CancellationToken.None);
await sch.TickAsync(T0 + TimeSpan.FromDays(3) + TimeSpan.FromHours(1), CancellationToken.None);
supervisor.RecycleCount.ShouldBe(3);
sch.NextRecycleUtc.ShouldBe(T0 + TimeSpan.FromDays(4));
}
private sealed class FakeSupervisor : IDriverSupervisor
{
public string DriverInstanceId => "tier-c-fake";
public int RecycleCount { get; private set; }
public string? LastReason { get; private set; }
public Task RecycleAsync(string reason, CancellationToken cancellationToken)
{
RecycleCount++;
LastReason = reason;
return Task.CompletedTask;
}
}
}

View File

@@ -0,0 +1,112 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Stability;
namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Stability;
[Trait("Category", "Unit")]
public sealed class WedgeDetectorTests
{
private static readonly DateTime Now = new(2026, 4, 19, 12, 0, 0, DateTimeKind.Utc);
private static readonly TimeSpan Threshold = TimeSpan.FromSeconds(120);
[Fact]
public void SubSixtySecondThreshold_ClampsToSixty()
{
var detector = new WedgeDetector(TimeSpan.FromSeconds(10));
detector.Threshold.ShouldBe(TimeSpan.FromSeconds(60));
}
[Fact]
public void Unhealthy_Driver_AlwaysNotApplicable()
{
var detector = new WedgeDetector(Threshold);
var demand = new DemandSignal(BulkheadDepth: 5, ActiveMonitoredItems: 10, QueuedHistoryReads: 0, LastProgressUtc: Now.AddMinutes(-10));
detector.Classify(DriverState.Faulted, demand, Now).ShouldBe(WedgeVerdict.NotApplicable);
detector.Classify(DriverState.Degraded, demand, Now).ShouldBe(WedgeVerdict.NotApplicable);
detector.Classify(DriverState.Initializing, demand, Now).ShouldBe(WedgeVerdict.NotApplicable);
}
[Fact]
public void Idle_Subscription_Only_StaysIdle()
{
// Idle driver: bulkhead 0, monitored items 0, no history reads queued.
// Even if LastProgressUtc is ancient, the verdict is Idle, not Faulted.
var detector = new WedgeDetector(Threshold);
var demand = new DemandSignal(0, 0, 0, Now.AddHours(-12));
detector.Classify(DriverState.Healthy, demand, Now).ShouldBe(WedgeVerdict.Idle);
}
[Fact]
public void PendingWork_WithRecentProgress_StaysHealthy()
{
var detector = new WedgeDetector(Threshold);
var demand = new DemandSignal(BulkheadDepth: 2, ActiveMonitoredItems: 0, QueuedHistoryReads: 0, LastProgressUtc: Now.AddSeconds(-30));
detector.Classify(DriverState.Healthy, demand, Now).ShouldBe(WedgeVerdict.Healthy);
}
[Fact]
public void PendingWork_WithStaleProgress_IsFaulted()
{
var detector = new WedgeDetector(Threshold);
var demand = new DemandSignal(BulkheadDepth: 2, ActiveMonitoredItems: 0, QueuedHistoryReads: 0, LastProgressUtc: Now.AddMinutes(-5));
detector.Classify(DriverState.Healthy, demand, Now).ShouldBe(WedgeVerdict.Faulted);
}
[Fact]
public void MonitoredItems_Active_ButNoRecentPublish_IsFaulted()
{
// Subscription-only driver with live MonitoredItems but no publish progress within threshold
// is a real wedge — this is the case the previous "no successful Read" formulation used
// to miss (no reads ever happen).
var detector = new WedgeDetector(Threshold);
var demand = new DemandSignal(BulkheadDepth: 0, ActiveMonitoredItems: 5, QueuedHistoryReads: 0, LastProgressUtc: Now.AddMinutes(-10));
detector.Classify(DriverState.Healthy, demand, Now).ShouldBe(WedgeVerdict.Faulted);
}
[Fact]
public void MonitoredItems_Active_WithFreshPublish_StaysHealthy()
{
var detector = new WedgeDetector(Threshold);
var demand = new DemandSignal(BulkheadDepth: 0, ActiveMonitoredItems: 5, QueuedHistoryReads: 0, LastProgressUtc: Now.AddSeconds(-10));
detector.Classify(DriverState.Healthy, demand, Now).ShouldBe(WedgeVerdict.Healthy);
}
[Fact]
public void HistoryBackfill_SlowButMakingProgress_StaysHealthy()
{
// Slow historian backfill — QueuedHistoryReads > 0 but progress advances within threshold.
var detector = new WedgeDetector(Threshold);
var demand = new DemandSignal(BulkheadDepth: 0, ActiveMonitoredItems: 0, QueuedHistoryReads: 50, LastProgressUtc: Now.AddSeconds(-60));
detector.Classify(DriverState.Healthy, demand, Now).ShouldBe(WedgeVerdict.Healthy);
}
[Fact]
public void WriteOnlyBurst_StaysIdle_WhenBulkheadEmpty()
{
// A write-only driver that just finished a burst: bulkhead drained, no subscriptions, no
// history reads. Idle — the previous formulation would have faulted here because no
// reads were succeeding even though the driver is perfectly healthy.
var detector = new WedgeDetector(Threshold);
var demand = new DemandSignal(0, 0, 0, Now.AddMinutes(-30));
detector.Classify(DriverState.Healthy, demand, Now).ShouldBe(WedgeVerdict.Idle);
}
[Fact]
public void DemandSignal_HasPendingWork_TrueForAnyNonZeroCounter()
{
new DemandSignal(1, 0, 0, Now).HasPendingWork.ShouldBeTrue();
new DemandSignal(0, 1, 0, Now).HasPendingWork.ShouldBeTrue();
new DemandSignal(0, 0, 1, Now).HasPendingWork.ShouldBeTrue();
new DemandSignal(0, 0, 0, Now).HasPendingWork.ShouldBeFalse();
}
}

View File

@@ -220,6 +220,23 @@ public sealed class ModbusDriverTests
builder.Variables.ShouldContain(v => v.BrowseName == "Run" && v.Info.DriverDataType == DriverDataType.Boolean);
}
[Fact]
public async Task Discover_propagates_WriteIdempotent_from_tag_to_attribute_info()
{
var (drv, _) = NewDriver(
new ModbusTagDefinition("SetPoint", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Float32, WriteIdempotent: true),
new ModbusTagDefinition("PulseCoil", ModbusRegion.Coils, 0, ModbusDataType.Bool));
await drv.InitializeAsync("{}", CancellationToken.None);
var builder = new RecordingBuilder();
await drv.DiscoverAsync(builder, CancellationToken.None);
var setPoint = builder.Variables.Single(v => v.BrowseName == "SetPoint");
var pulse = builder.Variables.Single(v => v.BrowseName == "PulseCoil");
setPoint.Info.WriteIdempotent.ShouldBeTrue();
pulse.Info.WriteIdempotent.ShouldBeFalse("default is opt-in per decision #44");
}
// --- helpers ---
private sealed class RecordingBuilder : IAddressSpaceBuilder

View File

@@ -65,6 +65,27 @@ public sealed class S7DiscoveryAndSubscribeTests
builder.Variables[2].Attr.DriverDataType.ShouldBe(DriverDataType.Float32);
}
[Fact]
public async Task DiscoverAsync_propagates_WriteIdempotent_from_tag_to_attribute_info()
{
var opts = new S7DriverOptions
{
Host = "192.0.2.1",
Tags =
[
new("SetPoint", "DB1.DBW0", S7DataType.Int16, WriteIdempotent: true),
new("StartBit", "M0.0", S7DataType.Bool),
],
};
using var drv = new S7Driver(opts, "s7-idem");
var builder = new RecordingAddressSpaceBuilder();
await drv.DiscoverAsync(builder, TestContext.Current.CancellationToken);
builder.Variables.Single(v => v.Name == "SetPoint").Attr.WriteIdempotent.ShouldBeTrue();
builder.Variables.Single(v => v.Name == "StartBit").Attr.WriteIdempotent.ShouldBeFalse("default is opt-in per decision #44");
}
[Fact]
public void GetHostStatuses_returns_one_row_with_host_port_identity_pre_init()
{