Compare commits
4 Commits
phase-6-1-
...
phase-6-1-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cbcaf6593a | ||
| 8d81715079 | |||
|
|
854c3bcfec | ||
| ff4a74a81f |
@@ -0,0 +1,44 @@
|
|||||||
|
namespace ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||||
|
|
||||||
|
/// <summary>
/// One row of runtime resilience telemetry per <c>(DriverInstanceId, HostName)</c> pair, fed by
/// the CapabilityInvoker, MemoryTracking and MemoryRecycle layers. Kept apart from
/// <see cref="DriverHostStatus"/> (which owns per-host <i>connectivity</i> state) so that a host
/// which is Running but has tripped its breaker, or is approaching its memory ceiling, shows up
/// distinctly on Admin <c>/hosts</c>.
/// </summary>
/// <remarks>
/// See <c>docs/v2/implementation/phase-6-1-resilience-and-observability.md</c> §Stream E.1.
/// The Admin UI left-joins this table on DriverHostStatus for display. Rows are written by a
/// runtime HostedService that samples the tracker at a configurable interval (default 5 s);
/// the writes are non-critical, so a missed sample is tolerated.
/// </remarks>
public sealed class DriverInstanceResilienceStatus
{
    /// <summary>Driver instance the counters belong to; part of the composite key.</summary>
    public required string DriverInstanceId { get; set; }

    /// <summary>Host the counters belong to; part of the composite key.</summary>
    public required string HostName { get; set; }

    /// <summary>
    /// When the circuit breaker for this (instance, host) last opened, or <c>null</c> if it
    /// has never opened.
    /// </summary>
    public DateTime? LastCircuitBreakerOpenUtc { get; set; }

    /// <summary>Rolling number of consecutive Polly pipeline failures for this (instance, host).</summary>
    public int ConsecutiveFailures { get; set; }

    /// <summary>Current Polly bulkhead depth, i.e. in-flight calls, for this (instance, host).</summary>
    public int CurrentBulkheadDepth { get; set; }

    /// <summary>
    /// When the process was last recycled. Tier C only; stays <c>null</c> for in-process tiers.
    /// </summary>
    public DateTime? LastRecycleUtc { get; set; }

    /// <summary>
    /// Post-init memory baseline captured by <c>MemoryTracking</c> (median of the first
    /// BaselineWindow samples). Zero while the tracker is still warming up.
    /// </summary>
    public long BaselineFootprintBytes { get; set; }

    /// <summary>Latest footprint sample seen by the tracker (steady-state read).</summary>
    public long CurrentFootprintBytes { get; set; }

    /// <summary>Timestamp of the last write to this row; moves forward on every sampling tick.</summary>
    public DateTime LastSampledUtc { get; set; }
}
|
||||||
@@ -0,0 +1,170 @@
|
|||||||
|
using LiteDB;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Configuration.LocalCache;
|
||||||
|
|
||||||
|
/// <summary>
/// Generation-sealed LiteDB cache per <c>docs/v2/plan.md</c> decision #148 and Phase 6.1
/// Stream D.1. Each published generation writes one <b>read-only</b> LiteDB file under
/// <c>&lt;cache-root&gt;/&lt;clusterId&gt;/&lt;generationId&gt;.db</c>. A per-cluster
/// <c>CURRENT</c> text file holds the currently-active generation id; it is updated
/// atomically (temp file + <see cref="File.Replace(string, string, string?)"/>) only after
/// the sealed file is fully written.
/// </summary>
/// <remarks>
/// <para>Mixed-generation reads are impossible: any read opens the single file pointed to
/// by <c>CURRENT</c>, which is a coherent snapshot. Corruption of the CURRENT file or the
/// sealed file surfaces as <see cref="GenerationCacheUnavailableException"/> — the reader
/// fails closed rather than silently falling back to an older generation. Recovery path
/// is to re-fetch from the central DB (and the Phase 6.1 Stream C <c>UsingStaleConfig</c>
/// flag goes true until that succeeds).</para>
///
/// <para>This cache is the read-path fallback when the central DB is unreachable. The
/// write path (draft edits, publish) bypasses the cache and fails hard on DB outage per
/// Stream D.2 — inconsistent writes are worse than a temporary inability to edit.</para>
/// </remarks>
public sealed class GenerationSealedCache
{
    private const string CollectionName = "generation";
    private const string CurrentPointerFileName = "CURRENT";
    private readonly string _cacheRoot;

    /// <summary>Root directory for all clusters' sealed caches.</summary>
    public string CacheRoot => _cacheRoot;

    /// <summary>Creates the cache and makes sure <paramref name="cacheRoot"/> exists on disk.</summary>
    /// <param name="cacheRoot">Directory under which one sub-directory per cluster is kept.</param>
    /// <exception cref="ArgumentException"><paramref name="cacheRoot"/> is null, empty or whitespace.</exception>
    public GenerationSealedCache(string cacheRoot)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(cacheRoot);
        _cacheRoot = cacheRoot;
        Directory.CreateDirectory(_cacheRoot);
    }

    /// <summary>
    /// Seal a generation: write the snapshot to <c>&lt;cluster&gt;/&lt;generationId&gt;.db</c>,
    /// mark the file read-only, then atomically publish the <c>CURRENT</c> pointer. Existing
    /// sealed files for prior generations are preserved (prune separately).
    /// </summary>
    /// <param name="snapshot">Snapshot to persist; its cluster id selects the directory.</param>
    /// <param name="ct">Checked once before any file work starts.</param>
    /// <exception cref="ArgumentNullException"><paramref name="snapshot"/> is null.</exception>
    /// <exception cref="ArgumentException">The snapshot's cluster id is null, empty or whitespace.</exception>
    public async Task SealAsync(GenerationSnapshot snapshot, CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(snapshot);
        // A blank cluster id would seal straight into the cache root, where ReadCurrentAsync
        // (which rejects blank ids) could never find it again — reject it up front.
        ArgumentException.ThrowIfNullOrWhiteSpace(snapshot.ClusterId);
        ct.ThrowIfCancellationRequested();

        var clusterDir = Path.Combine(_cacheRoot, snapshot.ClusterId);
        Directory.CreateDirectory(clusterDir);
        var sealedPath = Path.Combine(clusterDir, $"{snapshot.GenerationId}.db");

        if (File.Exists(sealedPath))
        {
            // Already sealed — idempotent. Treat as no-op + update pointer in case an earlier
            // seal succeeded but the pointer update failed (crash recovery). The same crash
            // window can leave the file without its read-only bit, so re-apply that as well.
            MarkReadOnly(sealedPath);
            WritePointerAtomically(clusterDir, snapshot.GenerationId);
            return;
        }

        var tmpPath = sealedPath + ".tmp";
        try
        {
            // A previous attempt that died mid-write may have left a partial temp database
            // behind. Opening it would add this snapshot to whatever is already in there (or
            // trip over a torn page), so always start from a clean file. Delete is a no-op
            // when the file does not exist.
            File.Delete(tmpPath);

            using (var db = new LiteDatabase(new ConnectionString { Filename = tmpPath, Upgrade = false }))
            {
                var col = db.GetCollection<GenerationSnapshot>(CollectionName);
                col.Insert(snapshot);
            }

            // Only a fully written database is ever visible under the sealed name.
            File.Move(tmpPath, sealedPath);
            MarkReadOnly(sealedPath);
            WritePointerAtomically(clusterDir, snapshot.GenerationId);
        }
        catch
        {
            try { if (File.Exists(tmpPath)) File.Delete(tmpPath); } catch { /* best-effort */ }
            throw;
        }

        // All work above is synchronous; the await only keeps the async signature (and its
        // exceptions-through-the-Task behaviour) without a compiler warning.
        await Task.CompletedTask;
    }

    /// <summary>
    /// Read the current sealed snapshot for <paramref name="clusterId"/>. Throws
    /// <see cref="GenerationCacheUnavailableException"/> when the pointer is missing
    /// (first-boot-no-snapshot case) or when the sealed file is corrupt. Never silently
    /// falls back to a prior generation.
    /// </summary>
    /// <remarks>
    /// The read runs synchronously; failures are thrown directly from the call rather than
    /// being returned as a faulted task.
    /// </remarks>
    /// <param name="clusterId">Cluster whose <c>CURRENT</c> pointer is followed.</param>
    /// <param name="ct">Checked once before any file work starts.</param>
    /// <returns>A completed task holding the first snapshot row of the sealed file.</returns>
    /// <exception cref="ArgumentException"><paramref name="clusterId"/> is null, empty or whitespace.</exception>
    /// <exception cref="GenerationCacheUnavailableException">
    /// The pointer is missing or unparsable, the sealed file it names is missing, or the sealed
    /// file is empty or fails to open with one of the handled error types.
    /// </exception>
    public Task<GenerationSnapshot> ReadCurrentAsync(string clusterId, CancellationToken ct = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(clusterId);
        ct.ThrowIfCancellationRequested();

        var clusterDir = Path.Combine(_cacheRoot, clusterId);
        var pointerPath = Path.Combine(clusterDir, CurrentPointerFileName);
        if (!File.Exists(pointerPath))
            throw new GenerationCacheUnavailableException(
                $"No sealed generation for cluster '{clusterId}' at '{clusterDir}'. First-boot case: the central DB must be reachable at least once before cache fallback is possible.");

        long generationId;
        try
        {
            var text = File.ReadAllText(pointerPath).Trim();
            generationId = long.Parse(text, System.Globalization.CultureInfo.InvariantCulture);
        }
        catch (Exception ex)
        {
            throw new GenerationCacheUnavailableException(
                $"CURRENT pointer at '{pointerPath}' is corrupt or unreadable.", ex);
        }

        var sealedPath = Path.Combine(clusterDir, $"{generationId}.db");
        if (!File.Exists(sealedPath))
            throw new GenerationCacheUnavailableException(
                $"CURRENT points at generation {generationId} but '{sealedPath}' is missing — fails closed rather than serving an older generation.");

        try
        {
            using var db = new LiteDatabase(new ConnectionString { Filename = sealedPath, ReadOnly = true });
            var col = db.GetCollection<GenerationSnapshot>(CollectionName);
            var snapshot = col.FindAll().FirstOrDefault()
                ?? throw new GenerationCacheUnavailableException(
                    $"Sealed file '{sealedPath}' contains no snapshot row — file is corrupt.");
            return Task.FromResult(snapshot);
        }
        catch (GenerationCacheUnavailableException) { throw; }
        catch (Exception ex) when (ex is LiteException or InvalidDataException or IOException
            or NotSupportedException or FormatException)
        {
            throw new GenerationCacheUnavailableException(
                $"Sealed file '{sealedPath}' is corrupt or unreadable — fails closed rather than falling back to an older generation.", ex);
        }
    }

    /// <summary>
    /// Return the generation id the <c>CURRENT</c> pointer points at, or null if no pointer
    /// exists or its content cannot be read or parsed.
    /// </summary>
    /// <param name="clusterId">Cluster whose pointer file is inspected.</param>
    /// <exception cref="ArgumentException"><paramref name="clusterId"/> is null, empty or whitespace.</exception>
    public long? TryGetCurrentGenerationId(string clusterId)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(clusterId);
        var pointerPath = Path.Combine(_cacheRoot, clusterId, CurrentPointerFileName);
        if (!File.Exists(pointerPath)) return null;
        try
        {
            return long.Parse(File.ReadAllText(pointerPath).Trim(), System.Globalization.CultureInfo.InvariantCulture);
        }
        catch
        {
            // Try-style probe: an unreadable or garbled pointer is reported as "none".
            return null;
        }
    }

    /// <summary>Adds the read-only attribute to <paramref name="path"/>, keeping its other attributes.</summary>
    private static void MarkReadOnly(string path) =>
        File.SetAttributes(path, File.GetAttributes(path) | FileAttributes.ReadOnly);

    /// <summary>
    /// Writes <paramref name="generationId"/> to a temp file next to the pointer and then swaps
    /// it into place, so readers see either the old or the new pointer content.
    /// </summary>
    private static void WritePointerAtomically(string clusterDir, long generationId)
    {
        var pointerPath = Path.Combine(clusterDir, CurrentPointerFileName);
        var tmpPath = pointerPath + ".tmp";
        File.WriteAllText(tmpPath, generationId.ToString(System.Globalization.CultureInfo.InvariantCulture));
        // File.Replace requires an existing destination, hence the plain move on first publish.
        if (File.Exists(pointerPath))
            File.Replace(tmpPath, pointerPath, destinationBackupFileName: null);
        else
            File.Move(tmpPath, pointerPath);
    }
}
|
||||||
|
|
||||||
|
/// <summary>
/// Signals that the sealed generation cache cannot supply a snapshot — the caller must fail closed.
/// </summary>
public sealed class GenerationCacheUnavailableException : Exception
{
    /// <summary>Creates the exception with a description of why the cache is unavailable.</summary>
    /// <param name="message">Human-readable reason.</param>
    public GenerationCacheUnavailableException(string message)
        : base(message)
    {
    }

    /// <summary>
    /// Creates the exception and keeps the underlying failure as
    /// <see cref="Exception.InnerException"/>.
    /// </summary>
    /// <param name="message">Human-readable reason.</param>
    /// <param name="inner">The failure that made the cache unavailable.</param>
    public GenerationCacheUnavailableException(string message, Exception inner)
        : base(message, inner)
    {
    }
}
|
||||||
@@ -0,0 +1,90 @@
|
|||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using Polly;
|
||||||
|
using Polly.Retry;
|
||||||
|
using Polly.Timeout;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Configuration.LocalCache;
|
||||||
|
|
||||||
|
/// <summary>
/// Wraps a central-DB fetch function with Phase 6.1 Stream D.2 resilience:
/// <b>timeout 2 s → retry 3× jittered → fallback to sealed cache</b>. Maintains the
/// <see cref="StaleConfigFlag"/> — fresh on central-DB success, stale on cache fallback.
/// </summary>
/// <remarks>
/// <para>Read-path only per plan. The write path (draft save, publish) bypasses this
/// wrapper entirely and fails hard on DB outage so inconsistent writes never land.</para>
///
/// <para>Fallback is triggered by <b>any exception</b> the fetch raises (central-DB
/// unreachable, SqlException, timeout), including a cancellation exception that did not
/// come from the caller's own token. Only a cancellation requested by the caller skips the
/// fallback. If the sealed cache also fails (no pointer, corrupt file, etc.),
/// <see cref="GenerationCacheUnavailableException"/> surfaces — caller must fail the current
/// request (InitializeAsync for a driver, etc.).</para>
/// </remarks>
public sealed class ResilientConfigReader
{
    private readonly GenerationSealedCache _cache;
    private readonly StaleConfigFlag _staleFlag;
    private readonly ResiliencePipeline _pipeline;
    private readonly ILogger<ResilientConfigReader> _logger;

    /// <summary>Builds the reader and its timeout + retry pipeline.</summary>
    /// <param name="cache">Sealed cache consulted when the central fetch fails.</param>
    /// <param name="staleFlag">Flag set fresh on central success and stale on cache fallback.</param>
    /// <param name="logger">Receives a warning whenever the reader falls back to the cache.</param>
    /// <param name="timeout">Timeout for the pipeline; 2 seconds when omitted.</param>
    /// <param name="retryCount">Maximum retry attempts; zero or less disables the retry strategy.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="cache"/>, <paramref name="staleFlag"/> or <paramref name="logger"/> is null.
    /// </exception>
    public ResilientConfigReader(
        GenerationSealedCache cache,
        StaleConfigFlag staleFlag,
        ILogger<ResilientConfigReader> logger,
        TimeSpan? timeout = null,
        int retryCount = 3)
    {
        // Fail at construction instead of with a NullReferenceException in the middle of a
        // fallback, which is exactly when diagnostics matter most.
        ArgumentNullException.ThrowIfNull(cache);
        ArgumentNullException.ThrowIfNull(staleFlag);
        ArgumentNullException.ThrowIfNull(logger);

        _cache = cache;
        _staleFlag = staleFlag;
        _logger = logger;

        // NOTE(review): Polly runs strategies in registration order, first added outermost.
        // The timeout is added before the retry, so the budget bounds the whole retry
        // sequence rather than each attempt — confirm that is the intended reading of
        // "timeout 2 s → retry 3×".
        var builder = new ResiliencePipelineBuilder()
            .AddTimeout(new TimeoutStrategyOptions { Timeout = timeout ?? TimeSpan.FromSeconds(2) });

        if (retryCount > 0)
        {
            builder.AddRetry(new RetryStrategyOptions
            {
                MaxRetryAttempts = retryCount,
                BackoffType = DelayBackoffType.Exponential,
                UseJitter = true,
                Delay = TimeSpan.FromMilliseconds(100),
                MaxDelay = TimeSpan.FromSeconds(1),
                // Cancellation is never retried; everything else is.
                ShouldHandle = new PredicateBuilder().Handle<Exception>(ex => ex is not OperationCanceledException),
            });
        }

        _pipeline = builder.Build();
    }

    /// <summary>
    /// Execute <paramref name="centralFetch"/> through the resilience pipeline. On full failure
    /// (post-retry), reads the sealed cache for <paramref name="clusterId"/> and passes the
    /// snapshot to <paramref name="fromSnapshot"/> to extract the requested shape.
    /// </summary>
    /// <typeparam name="T">Shape the caller wants back.</typeparam>
    /// <param name="clusterId">Cluster whose sealed cache is the fallback source.</param>
    /// <param name="centralFetch">Fetch against the central DB; receives the pipeline's token.</param>
    /// <param name="fromSnapshot">Projects a cached snapshot into <typeparamref name="T"/>.</param>
    /// <param name="cancellationToken">Caller cancellation; when requested, no fallback is attempted.</param>
    /// <returns>The central result, or the projection of the cached snapshot.</returns>
    /// <exception cref="ArgumentException"><paramref name="clusterId"/> is null, empty or whitespace.</exception>
    /// <exception cref="ArgumentNullException">A delegate argument is null.</exception>
    /// <exception cref="OperationCanceledException">The caller cancelled the read.</exception>
    /// <exception cref="GenerationCacheUnavailableException">Both the central fetch and the sealed cache failed.</exception>
    public async ValueTask<T> ReadAsync<T>(
        string clusterId,
        Func<CancellationToken, ValueTask<T>> centralFetch,
        Func<GenerationSnapshot, T> fromSnapshot,
        CancellationToken cancellationToken)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(clusterId);
        ArgumentNullException.ThrowIfNull(centralFetch);
        ArgumentNullException.ThrowIfNull(fromSnapshot);

        try
        {
            var result = await _pipeline.ExecuteAsync(centralFetch, cancellationToken).ConfigureAwait(false);
            _staleFlag.MarkFresh();
            return result;
        }
        // Only a cancellation the caller asked for may bypass the fallback. A cancellation
        // exception raised by the fetch itself (for example a client-side timeout) is just
        // another central-DB failure and must fall back like any other exception.
        catch (Exception ex) when (ex is not OperationCanceledException || !cancellationToken.IsCancellationRequested)
        {
            _logger.LogWarning(ex, "Central-DB read failed after retries; falling back to sealed cache for cluster {ClusterId}", clusterId);
            // GenerationCacheUnavailableException surfaces intentionally — fails the caller's
            // operation. StaleConfigFlag stays unchanged; the flag only flips when we actually
            // served a cache snapshot, so the projection runs before the flag is touched.
            var snapshot = await _cache.ReadCurrentAsync(clusterId, cancellationToken).ConfigureAwait(false);
            var served = fromSnapshot(snapshot);
            _staleFlag.MarkStale();
            return served;
        }
    }
}
|
||||||
@@ -0,0 +1,20 @@
|
|||||||
|
namespace ZB.MOM.WW.OtOpcUa.Configuration.LocalCache;
|
||||||
|
|
||||||
|
/// <summary>
/// Thread-safe <c>UsingStaleConfig</c> signal per Phase 6.1 Stream D.3. Becomes true whenever
/// a read falls back to a sealed cache snapshot and returns to false on the next successful
/// central-DB round-trip. Surfaced on the <c>/healthz</c> body and on the Admin <c>/hosts</c> page.
/// </summary>
public sealed class StaleConfigFlag
{
    private const int FreshState = 0;
    private const int StaleState = 1;

    // Stored as an int and accessed through Volatile so every thread sees the latest write.
    private int _state;

    /// <summary>True when the last config read was served from the sealed cache, not the central DB.</summary>
    public bool IsStale
    {
        get { return Volatile.Read(ref _state) != FreshState; }
    }

    /// <summary>Flags the config as stale: a read fell back to the cache.</summary>
    public void MarkStale()
    {
        Volatile.Write(ref _state, StaleState);
    }

    /// <summary>Flags the config as fresh: a central-DB read succeeded.</summary>
    public void MarkFresh()
    {
        Volatile.Write(ref _state, FreshState);
    }
}
|
||||||
1287
src/ZB.MOM.WW.OtOpcUa.Configuration/Migrations/20260419124034_AddDriverInstanceResilienceStatus.Designer.cs
generated
Normal file
1287
src/ZB.MOM.WW.OtOpcUa.Configuration/Migrations/20260419124034_AddDriverInstanceResilienceStatus.Designer.cs
generated
Normal file
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,46 @@
|
|||||||
|
using System;
|
||||||
|
using Microsoft.EntityFrameworkCore.Migrations;
|
||||||
|
|
||||||
|
#nullable disable
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Configuration.Migrations
{
    /// <summary>
    /// Creates the <c>DriverInstanceResilienceStatus</c> table, keyed by
    /// <c>(DriverInstanceId, HostName)</c>, together with an index on <c>LastSampledUtc</c>.
    /// </summary>
    public partial class AddDriverInstanceResilienceStatus : Migration
    {
        // Shared literals for the operations below; the values are the exact table name and
        // column type the migration has always emitted.
        private const string StatusTable = "DriverInstanceResilienceStatus";
        private const string UtcColumnType = "datetime2(3)";

        /// <inheritdoc />
        protected override void Up(MigrationBuilder migrationBuilder)
        {
            migrationBuilder.CreateTable(
                name: StatusTable,
                columns: table => new
                {
                    DriverInstanceId = table.Column<string>(type: "nvarchar(64)", maxLength: 64, nullable: false),
                    HostName = table.Column<string>(type: "nvarchar(256)", maxLength: 256, nullable: false),
                    LastCircuitBreakerOpenUtc = table.Column<DateTime>(type: UtcColumnType, nullable: true),
                    ConsecutiveFailures = table.Column<int>(type: "int", nullable: false),
                    CurrentBulkheadDepth = table.Column<int>(type: "int", nullable: false),
                    LastRecycleUtc = table.Column<DateTime>(type: UtcColumnType, nullable: true),
                    BaselineFootprintBytes = table.Column<long>(type: "bigint", nullable: false),
                    CurrentFootprintBytes = table.Column<long>(type: "bigint", nullable: false),
                    LastSampledUtc = table.Column<DateTime>(type: UtcColumnType, nullable: false)
                },
                constraints: table =>
                {
                    // Composite primary key: one row per (driver instance, host).
                    table.PrimaryKey("PK_DriverInstanceResilienceStatus", x => new { x.DriverInstanceId, x.HostName });
                });

            migrationBuilder.CreateIndex(
                name: "IX_DriverResilience_LastSampled",
                table: StatusTable,
                column: "LastSampledUtc");
        }

        /// <inheritdoc />
        protected override void Down(MigrationBuilder migrationBuilder)
        {
            // Dropping the table is the full reverse of Up.
            migrationBuilder.DropTable(name: StatusTable);
        }
    }
}
|
||||||
@@ -434,6 +434,45 @@ namespace ZB.MOM.WW.OtOpcUa.Configuration.Migrations
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
modelBuilder.Entity("ZB.MOM.WW.OtOpcUa.Configuration.Entities.DriverInstanceResilienceStatus", b =>
|
||||||
|
{
|
||||||
|
b.Property<string>("DriverInstanceId")
|
||||||
|
.HasMaxLength(64)
|
||||||
|
.HasColumnType("nvarchar(64)");
|
||||||
|
|
||||||
|
b.Property<string>("HostName")
|
||||||
|
.HasMaxLength(256)
|
||||||
|
.HasColumnType("nvarchar(256)");
|
||||||
|
|
||||||
|
b.Property<long>("BaselineFootprintBytes")
|
||||||
|
.HasColumnType("bigint");
|
||||||
|
|
||||||
|
b.Property<int>("ConsecutiveFailures")
|
||||||
|
.HasColumnType("int");
|
||||||
|
|
||||||
|
b.Property<int>("CurrentBulkheadDepth")
|
||||||
|
.HasColumnType("int");
|
||||||
|
|
||||||
|
b.Property<long>("CurrentFootprintBytes")
|
||||||
|
.HasColumnType("bigint");
|
||||||
|
|
||||||
|
b.Property<DateTime?>("LastCircuitBreakerOpenUtc")
|
||||||
|
.HasColumnType("datetime2(3)");
|
||||||
|
|
||||||
|
b.Property<DateTime?>("LastRecycleUtc")
|
||||||
|
.HasColumnType("datetime2(3)");
|
||||||
|
|
||||||
|
b.Property<DateTime>("LastSampledUtc")
|
||||||
|
.HasColumnType("datetime2(3)");
|
||||||
|
|
||||||
|
b.HasKey("DriverInstanceId", "HostName");
|
||||||
|
|
||||||
|
b.HasIndex("LastSampledUtc")
|
||||||
|
.HasDatabaseName("IX_DriverResilience_LastSampled");
|
||||||
|
|
||||||
|
b.ToTable("DriverInstanceResilienceStatus", (string)null);
|
||||||
|
});
|
||||||
|
|
||||||
modelBuilder.Entity("ZB.MOM.WW.OtOpcUa.Configuration.Entities.Equipment", b =>
|
modelBuilder.Entity("ZB.MOM.WW.OtOpcUa.Configuration.Entities.Equipment", b =>
|
||||||
{
|
{
|
||||||
b.Property<Guid>("EquipmentRowId")
|
b.Property<Guid>("EquipmentRowId")
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ public sealed class OtOpcUaConfigDbContext(DbContextOptions<OtOpcUaConfigDbConte
|
|||||||
public DbSet<ConfigAuditLog> ConfigAuditLogs => Set<ConfigAuditLog>();
|
public DbSet<ConfigAuditLog> ConfigAuditLogs => Set<ConfigAuditLog>();
|
||||||
public DbSet<ExternalIdReservation> ExternalIdReservations => Set<ExternalIdReservation>();
|
public DbSet<ExternalIdReservation> ExternalIdReservations => Set<ExternalIdReservation>();
|
||||||
public DbSet<DriverHostStatus> DriverHostStatuses => Set<DriverHostStatus>();
|
public DbSet<DriverHostStatus> DriverHostStatuses => Set<DriverHostStatus>();
|
||||||
|
public DbSet<DriverInstanceResilienceStatus> DriverInstanceResilienceStatuses => Set<DriverInstanceResilienceStatus>();
|
||||||
|
|
||||||
protected override void OnModelCreating(ModelBuilder modelBuilder)
|
protected override void OnModelCreating(ModelBuilder modelBuilder)
|
||||||
{
|
{
|
||||||
@@ -49,6 +50,7 @@ public sealed class OtOpcUaConfigDbContext(DbContextOptions<OtOpcUaConfigDbConte
|
|||||||
ConfigureConfigAuditLog(modelBuilder);
|
ConfigureConfigAuditLog(modelBuilder);
|
||||||
ConfigureExternalIdReservation(modelBuilder);
|
ConfigureExternalIdReservation(modelBuilder);
|
||||||
ConfigureDriverHostStatus(modelBuilder);
|
ConfigureDriverHostStatus(modelBuilder);
|
||||||
|
ConfigureDriverInstanceResilienceStatus(modelBuilder);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void ConfigureServerCluster(ModelBuilder modelBuilder)
|
private static void ConfigureServerCluster(ModelBuilder modelBuilder)
|
||||||
@@ -512,4 +514,21 @@ public sealed class OtOpcUaConfigDbContext(DbContextOptions<OtOpcUaConfigDbConte
|
|||||||
e.HasIndex(x => x.LastSeenUtc).HasDatabaseName("IX_DriverHostStatus_LastSeen");
|
e.HasIndex(x => x.LastSeenUtc).HasDatabaseName("IX_DriverHostStatus_LastSeen");
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
/// Maps <see cref="DriverInstanceResilienceStatus"/>: table name, composite key on
/// <c>(DriverInstanceId, HostName)</c>, string lengths, millisecond-precision timestamp
/// columns and the index on <c>LastSampledUtc</c>.
/// </summary>
private static void ConfigureDriverInstanceResilienceStatus(ModelBuilder modelBuilder)
{
    const string utcColumnType = "datetime2(3)";

    var entity = modelBuilder.Entity<DriverInstanceResilienceStatus>();

    entity.ToTable("DriverInstanceResilienceStatus");
    entity.HasKey(x => new { x.DriverInstanceId, x.HostName });

    entity.Property(x => x.DriverInstanceId).HasMaxLength(64);
    entity.Property(x => x.HostName).HasMaxLength(256);

    entity.Property(x => x.LastCircuitBreakerOpenUtc).HasColumnType(utcColumnType);
    entity.Property(x => x.LastRecycleUtc).HasColumnType(utcColumnType);
    entity.Property(x => x.LastSampledUtc).HasColumnType(utcColumnType);

    // The Admin UI filters out stale samples by LastSampledUtc, the same way the
    // LastSeenUtc index on DriverHostStatus serves connectivity rows.
    entity.HasIndex(x => x.LastSampledUtc).HasDatabaseName("IX_DriverResilience_LastSampled");
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,7 +19,9 @@
|
|||||||
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
|
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
|
||||||
</PackageReference>
|
</PackageReference>
|
||||||
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="10.0.0"/>
|
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="10.0.0"/>
|
||||||
|
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.0"/>
|
||||||
<PackageReference Include="LiteDB" Version="5.0.21"/>
|
<PackageReference Include="LiteDB" Version="5.0.21"/>
|
||||||
|
<PackageReference Include="Polly.Core" Version="8.6.6"/>
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
|||||||
@@ -0,0 +1,104 @@
|
|||||||
|
using System.Collections.Concurrent;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Core.Resilience;
|
||||||
|
|
||||||
|
/// <summary>
/// Process-singleton tracker of live resilience counters per
/// <c>(DriverInstanceId, HostName)</c>. The CapabilityInvoker and the MemoryTracking layer
/// feed it; a HostedService reads it periodically and persists a snapshot to the
/// <c>DriverInstanceResilienceStatus</c> table for Admin <c>/hosts</c>.
/// </summary>
/// <remarks>
/// Per Phase 6.1 Stream E. There is no DB dependency here — the tracker is purely in-memory,
/// so tests can exercise it without EF Core or SQL Server. The HostedService that writes
/// snapshots lives in the Server project (Stream E.2); the SignalR push + Blazor page
/// refresh (E.3) lands in a follow-up visual-review PR.
/// </remarks>
public sealed class DriverResilienceStatusTracker
{
    private readonly ConcurrentDictionary<StatusKey, ResilienceStatusSnapshot> _status = new();

    /// <summary>Record a Polly pipeline failure for <paramref name="hostName"/>.</summary>
    public void RecordFailure(string driverInstanceId, string hostName, DateTime utcNow) =>
        Apply(driverInstanceId, hostName, current => current with
        {
            ConsecutiveFailures = current.ConsecutiveFailures + 1,
            LastSampledUtc = utcNow,
        });

    /// <summary>Reset the consecutive-failure count after a successful pipeline execution.</summary>
    public void RecordSuccess(string driverInstanceId, string hostName, DateTime utcNow) =>
        Apply(driverInstanceId, hostName, current => current with
        {
            ConsecutiveFailures = 0,
            LastSampledUtc = utcNow,
        });

    /// <summary>Record that the circuit breaker opened at <paramref name="utcNow"/>.</summary>
    public void RecordBreakerOpen(string driverInstanceId, string hostName, DateTime utcNow) =>
        Apply(driverInstanceId, hostName, current => current with
        {
            LastBreakerOpenUtc = utcNow,
            LastSampledUtc = utcNow,
        });

    /// <summary>Record a process recycle event (Tier C only).</summary>
    public void RecordRecycle(string driverInstanceId, string hostName, DateTime utcNow) =>
        Apply(driverInstanceId, hostName, current => current with
        {
            LastRecycleUtc = utcNow,
            LastSampledUtc = utcNow,
        });

    /// <summary>Store the MemoryTracking-supplied baseline and current footprint.</summary>
    public void RecordFootprint(string driverInstanceId, string hostName, long baselineBytes, long currentBytes, DateTime utcNow) =>
        Apply(driverInstanceId, hostName, current => current with
        {
            BaselineFootprintBytes = baselineBytes,
            CurrentFootprintBytes = currentBytes,
            LastSampledUtc = utcNow,
        });

    /// <summary>Snapshot of a specific (instance, host) pair; null if nothing has been recorded for it yet.</summary>
    public ResilienceStatusSnapshot? TryGet(string driverInstanceId, string hostName)
    {
        var key = new StatusKey(driverInstanceId, hostName);
        if (_status.TryGetValue(key, out var found))
        {
            return found;
        }

        return null;
    }

    /// <summary>Copy of every currently-tracked (instance, host, snapshot) triple. Safe under concurrent writes.</summary>
    public IReadOnlyList<(string DriverInstanceId, string HostName, ResilienceStatusSnapshot Snapshot)> Snapshot()
    {
        var copy = new List<(string DriverInstanceId, string HostName, ResilienceStatusSnapshot Snapshot)>();
        foreach (var entry in _status)
        {
            copy.Add((entry.Key.DriverInstanceId, entry.Key.HostName, entry.Value));
        }

        return copy;
    }

    /// <summary>
    /// Applies <paramref name="change"/> to the stored snapshot for the pair, starting from an
    /// all-defaults snapshot when the pair has not been seen before.
    /// </summary>
    private void Apply(
        string driverInstanceId,
        string hostName,
        Func<ResilienceStatusSnapshot, ResilienceStatusSnapshot> change)
    {
        _status.AddOrUpdate(
            new StatusKey(driverInstanceId, hostName),
            _ => change(new ResilienceStatusSnapshot()),
            (_, existing) => change(existing));
    }

    private readonly record struct StatusKey(string DriverInstanceId, string HostName);
}

/// <summary>Immutable view of the resilience counters for one <c>(DriverInstanceId, HostName)</c> pair.</summary>
public sealed record ResilienceStatusSnapshot
{
    /// <summary>Failures recorded since the last recorded success.</summary>
    public int ConsecutiveFailures { get; init; }

    /// <summary>Time of the last recorded breaker-open event; null if none was recorded.</summary>
    public DateTime? LastBreakerOpenUtc { get; init; }

    /// <summary>Time of the last recorded recycle event; null if none was recorded.</summary>
    public DateTime? LastRecycleUtc { get; init; }

    /// <summary>Baseline footprint from the last footprint report.</summary>
    public long BaselineFootprintBytes { get; init; }

    /// <summary>Current footprint from the last footprint report.</summary>
    public long CurrentFootprintBytes { get; init; }

    /// <summary>Timestamp passed with the most recent update of any kind.</summary>
    public DateTime LastSampledUtc { get; init; }
}
|
||||||
@@ -0,0 +1,157 @@
|
|||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Configuration.LocalCache;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Configuration.Tests;
|
||||||
|
|
||||||
|
/// <summary>
/// Exercises <see cref="GenerationSealedCache"/> against a throw-away directory: sealing and
/// reading snapshots back, pointer advancement across generations, the read-only attribute on
/// sealed files, and fail-closed reads when the sealed file or the pointer file is damaged.
/// </summary>
[Trait("Category", "Unit")]
public sealed class GenerationSealedCacheTests : IDisposable
{
    private const string ClusterA = "cluster-a";
    private const string ClusterB = "cluster-b";

    // Unique scratch directory per test-class instance so on-disk state is never shared.
    private readonly string _root = Path.Combine(Path.GetTempPath(), $"otopcua-sealed-{Guid.NewGuid():N}");

    /// <summary>Removes the scratch directory; any failure while cleaning up is ignored.</summary>
    public void Dispose()
    {
        try
        {
            if (Directory.Exists(_root))
            {
                // Sealed files carry the ReadOnly attribute; reset it so the recursive delete succeeds.
                var leftovers = Directory.EnumerateFiles(_root, "*", SearchOption.AllDirectories);
                foreach (var leftover in leftovers)
                {
                    File.SetAttributes(leftover, FileAttributes.Normal);
                }

                Directory.Delete(_root, recursive: true);
            }
        }
        catch
        {
            // best-effort cleanup
        }
    }

    /// <summary>Builds a snapshot stamped with the current UTC time.</summary>
    /// <param name="clusterId">Cluster the snapshot belongs to.</param>
    /// <param name="generationId">Generation number of the snapshot.</param>
    /// <param name="payload">JSON payload stored in the snapshot.</param>
    private static GenerationSnapshot MakeSnapshot(string clusterId, long generationId, string payload = "{\"sample\":true}")
    {
        return new GenerationSnapshot
        {
            ClusterId = clusterId,
            GenerationId = generationId,
            CachedAt = DateTime.UtcNow,
            PayloadJson = payload,
        };
    }

    /// <summary>Resolves a file name inside a cluster's directory under the scratch root.</summary>
    private string ClusterFile(string clusterId, string fileName) => Path.Combine(_root, clusterId, fileName);

    /// <summary>Asserts that reading the current generation of "cluster-a" throws the cache-unavailable exception.</summary>
    private static async Task AssertClusterAReadUnavailableAsync(GenerationSealedCache cache)
    {
        Func<Task> read = () => cache.ReadCurrentAsync(ClusterA);

        await Should.ThrowAsync<GenerationCacheUnavailableException>(read);
    }

    [Fact]
    public async Task FirstBoot_NoSnapshot_ReadThrows()
    {
        var sut = new GenerationSealedCache(_root);

        await AssertClusterAReadUnavailableAsync(sut);
    }

    [Fact]
    public async Task SealThenRead_RoundTrips()
    {
        const string payload = "{\"hello\":\"world\"}";
        var sut = new GenerationSealedCache(_root);

        await sut.SealAsync(MakeSnapshot(ClusterA, 42, payload));
        var roundTripped = await sut.ReadCurrentAsync(ClusterA);

        roundTripped.GenerationId.ShouldBe(42);
        roundTripped.ClusterId.ShouldBe(ClusterA);
        roundTripped.PayloadJson.ShouldBe(payload);
    }

    [Fact]
    public async Task SealedFile_IsReadOnly_OnDisk()
    {
        var sut = new GenerationSealedCache(_root);
        await sut.SealAsync(MakeSnapshot(ClusterA, 5));

        var generationFile = ClusterFile(ClusterA, "5.db");
        File.Exists(generationFile).ShouldBeTrue();

        var isReadOnly = (File.GetAttributes(generationFile) & FileAttributes.ReadOnly) == FileAttributes.ReadOnly;
        isReadOnly.ShouldBeTrue("sealed file must be read-only");
    }

    [Fact]
    public async Task SealingTwoGenerations_PointerAdvances_ToLatest()
    {
        var sut = new GenerationSealedCache(_root);
        foreach (var generation in new long[] { 1, 2 })
        {
            await sut.SealAsync(MakeSnapshot(ClusterA, generation));
        }

        sut.TryGetCurrentGenerationId(ClusterA).ShouldBe(2);

        var current = await sut.ReadCurrentAsync(ClusterA);
        current.GenerationId.ShouldBe(2);
    }

    [Fact]
    public async Task PriorGenerationFile_Survives_AfterNewSeal()
    {
        var sut = new GenerationSealedCache(_root);
        foreach (var generation in new long[] { 1, 2 })
        {
            await sut.SealAsync(MakeSnapshot(ClusterA, generation));
        }

        var priorExists = File.Exists(ClusterFile(ClusterA, "1.db"));
        priorExists.ShouldBeTrue("prior generations preserved for audit; pruning is separate");

        var latestExists = File.Exists(ClusterFile(ClusterA, "2.db"));
        latestExists.ShouldBeTrue();
    }

    [Fact]
    public async Task CorruptSealedFile_ReadFailsClosed()
    {
        var sut = new GenerationSealedCache(_root);
        await sut.SealAsync(MakeSnapshot(ClusterA, 7));

        // Simulate corruption: lift the read-only attribute and overwrite the sealed file with
        // three junk bytes, while the pointer file is left as written by the seal.
        var generationFile = ClusterFile(ClusterA, "7.db");
        File.SetAttributes(generationFile, FileAttributes.Normal);
        File.WriteAllBytes(generationFile, new byte[] { 0x00, 0x01, 0x02 });

        await AssertClusterAReadUnavailableAsync(sut);
    }

    [Fact]
    public async Task MissingSealedFile_ReadFailsClosed()
    {
        var sut = new GenerationSealedCache(_root);
        await sut.SealAsync(MakeSnapshot(ClusterA, 3));

        // Simulate corruption: the sealed file disappears but the pointer still references it.
        var generationFile = ClusterFile(ClusterA, "3.db");
        File.SetAttributes(generationFile, FileAttributes.Normal);
        File.Delete(generationFile);

        await AssertClusterAReadUnavailableAsync(sut);
    }

    [Fact]
    public async Task CorruptPointerFile_ReadFailsClosed()
    {
        var sut = new GenerationSealedCache(_root);
        await sut.SealAsync(MakeSnapshot(ClusterA, 9));

        // Replace the pointer content with text that is not a generation number.
        File.WriteAllText(ClusterFile(ClusterA, "CURRENT"), "not-a-number");

        await AssertClusterAReadUnavailableAsync(sut);
    }

    [Fact]
    public async Task SealSameGenerationTwice_IsIdempotent()
    {
        var sut = new GenerationSealedCache(_root);
        var firstSeal = MakeSnapshot(ClusterA, 11);
        var secondSeal = MakeSnapshot(ClusterA, 11, "{\"v\":2}");

        await sut.SealAsync(firstSeal);
        await sut.SealAsync(secondSeal);

        // The payload of the first seal must still be what is served.
        var current = await sut.ReadCurrentAsync(ClusterA);
        current.PayloadJson.ShouldBe("{\"sample\":true}", "sealed file is immutable; second seal no-ops");
    }

    [Fact]
    public async Task IndependentClusters_DoNotInterfere()
    {
        var sut = new GenerationSealedCache(_root);
        await sut.SealAsync(MakeSnapshot(ClusterA, 1));
        await sut.SealAsync(MakeSnapshot(ClusterB, 10));

        var currentA = await sut.ReadCurrentAsync(ClusterA);
        currentA.GenerationId.ShouldBe(1);

        var currentB = await sut.ReadCurrentAsync(ClusterB);
        currentB.GenerationId.ShouldBe(10);
    }
}
|
||||||
@@ -0,0 +1,154 @@
|
|||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Configuration.LocalCache;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Configuration.Tests;
|
||||||
|
|
||||||
|
/// <summary>
/// Exercises <see cref="ResilientConfigReader"/>: a successful central read clears the stale
/// flag, a failing central read falls back to the sealed cache and sets the stale flag, a
/// failing read with no cached snapshot surfaces the cache-unavailable exception, and
/// cancellation is propagated to the caller.
/// </summary>
[Trait("Category", "Unit")]
public sealed class ResilientConfigReaderTests : IDisposable
{
    // Unique scratch directory per test-class instance so on-disk state is never shared.
    private readonly string _root = Path.Combine(Path.GetTempPath(), $"otopcua-reader-{Guid.NewGuid():N}");

    /// <summary>Removes the scratch directory; any failure while cleaning up is ignored.</summary>
    public void Dispose()
    {
        try
        {
            if (!Directory.Exists(_root)) return;

            // Reset attributes first so files the cache left behind can be deleted.
            foreach (var f in Directory.EnumerateFiles(_root, "*", SearchOption.AllDirectories))
            {
                File.SetAttributes(f, FileAttributes.Normal);
            }

            Directory.Delete(_root, recursive: true);
        }
        catch { /* best-effort */ }
    }

    [Fact]
    public async Task CentralDbSucceeds_ReturnsValue_MarksFresh()
    {
        var cache = new GenerationSealedCache(_root);
        var flag = new StaleConfigFlag();
        flag.MarkStale(); // pre-existing stale state
        var reader = new ResilientConfigReader(cache, flag, NullLogger<ResilientConfigReader>.Instance);

        var result = await reader.ReadAsync(
            "cluster-a",
            _ => ValueTask.FromResult("fresh-from-db"),
            _ => "from-cache",
            CancellationToken.None);

        result.ShouldBe("fresh-from-db");
        flag.IsStale.ShouldBeFalse("successful central-DB read clears stale flag");
    }

    [Fact]
    public async Task CentralDbFails_ExhaustsRetries_FallsBackToCache_MarksStale()
    {
        var cache = new GenerationSealedCache(_root);
        await cache.SealAsync(new GenerationSnapshot
        {
            ClusterId = "cluster-a",
            GenerationId = 99,
            CachedAt = DateTime.UtcNow,
            PayloadJson = "{\"cached\":true}",
        });
        var flag = new StaleConfigFlag();
        var reader = new ResilientConfigReader(cache, flag, NullLogger<ResilientConfigReader>.Instance,
            timeout: TimeSpan.FromSeconds(10), retryCount: 2);
        var attempts = 0;

        // The explicit <string> type argument lets the throw-only lambda bind to the central
        // fetch delegate without an unreachable return statement (and without suppressing CS0162).
        var result = await reader.ReadAsync<string>(
            "cluster-a",
            _ =>
            {
                attempts++;
                throw new InvalidOperationException("SQL dead");
            },
            snap => snap.PayloadJson,
            CancellationToken.None);

        attempts.ShouldBe(3, "1 initial + 2 retries = 3 attempts");
        result.ShouldBe("{\"cached\":true}");
        flag.IsStale.ShouldBeTrue("cache fallback flips stale flag true");
    }

    [Fact]
    public async Task CentralDbFails_AndCacheAlsoUnavailable_Throws()
    {
        // Nothing is sealed into the cache, so there is no snapshot to fall back to.
        var cache = new GenerationSealedCache(_root);
        var flag = new StaleConfigFlag();
        var reader = new ResilientConfigReader(cache, flag, NullLogger<ResilientConfigReader>.Instance,
            timeout: TimeSpan.FromSeconds(10), retryCount: 0);

        await Should.ThrowAsync<GenerationCacheUnavailableException>(async () =>
        {
            await reader.ReadAsync<string>(
                "cluster-a",
                _ => throw new InvalidOperationException("SQL dead"),
                _ => "never",
                CancellationToken.None);
        });

        flag.IsStale.ShouldBeFalse("no snapshot ever served, so flag stays whatever it was");
    }

    [Fact]
    public async Task Cancellation_NotRetried()
    {
        var cache = new GenerationSealedCache(_root);
        var flag = new StaleConfigFlag();
        var reader = new ResilientConfigReader(cache, flag, NullLogger<ResilientConfigReader>.Instance,
            timeout: TimeSpan.FromSeconds(10), retryCount: 5);

        // Token is cancelled before the read starts.
        using var cts = new CancellationTokenSource();
        cts.Cancel();
        var attempts = 0;

        await Should.ThrowAsync<OperationCanceledException>(async () =>
        {
            await reader.ReadAsync<string>(
                "cluster-a",
                ct =>
                {
                    attempts++;
                    ct.ThrowIfCancellationRequested();
                    return ValueTask.FromResult("ok");
                },
                _ => "cache",
                cts.Token);
        });

        // With retryCount 5, more than one attempt would mean the cancellation was retried.
        attempts.ShouldBeLessThanOrEqualTo(1);
    }
}
|
||||||
|
|
||||||
|
/// <summary>
/// Exercises <see cref="StaleConfigFlag"/>: its initial state, toggling between stale and
/// fresh, and the final state after many parallel writes followed by a last write.
/// </summary>
[Trait("Category", "Unit")]
public sealed class StaleConfigFlagTests
{
    [Fact]
    public void Default_IsFresh()
    {
        var sut = new StaleConfigFlag();

        sut.IsStale.ShouldBeFalse();
    }

    [Fact]
    public void MarkStale_ThenFresh_Toggles()
    {
        var sut = new StaleConfigFlag();

        sut.MarkStale();
        sut.IsStale.ShouldBeTrue();

        sut.MarkFresh();
        sut.IsStale.ShouldBeFalse();
    }

    [Fact]
    public void ConcurrentWrites_Converge()
    {
        var sut = new StaleConfigFlag();

        // Even iterations mark stale, odd iterations mark fresh, all racing in parallel.
        Parallel.For(0, 1000, iteration =>
        {
            if (iteration % 2 != 0)
            {
                sut.MarkFresh();
            }
            else
            {
                sut.MarkStale();
            }
        });

        // The write issued after the parallel loop decides the observed state.
        sut.MarkFresh();
        sut.IsStale.ShouldBeFalse();
    }
}
|
||||||
@@ -29,6 +29,7 @@ public sealed class SchemaComplianceTests
|
|||||||
"DriverInstance", "Device", "Equipment", "Tag", "PollGroup",
|
"DriverInstance", "Device", "Equipment", "Tag", "PollGroup",
|
||||||
"NodeAcl", "ExternalIdReservation",
|
"NodeAcl", "ExternalIdReservation",
|
||||||
"DriverHostStatus",
|
"DriverHostStatus",
|
||||||
|
"DriverInstanceResilienceStatus",
|
||||||
};
|
};
|
||||||
|
|
||||||
var actual = QueryStrings(@"
|
var actual = QueryStrings(@"
|
||||||
|
|||||||
@@ -0,0 +1,110 @@
|
|||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience;
|
||||||
|
|
||||||
|
/// <summary>
/// Exercises <see cref="DriverResilienceStatusTracker"/>: failure counting and reset,
/// breaker-open and recycle timestamps, footprint capture, isolation between hosts,
/// whole-tracker snapshots, and failure counting under parallel writes.
/// </summary>
[Trait("Category", "Unit")]
public sealed class DriverResilienceStatusTrackerTests
{
    private const string Driver = "drv";
    private const string Host = "host";

    // Fixed UTC instant so the tests never read the wall clock.
    private static readonly DateTime Now = new(2026, 4, 19, 12, 0, 0, DateTimeKind.Utc);

    [Fact]
    public void TryGet_Returns_Null_Before_AnyWrite()
    {
        var sut = new DriverResilienceStatusTracker();

        var status = sut.TryGet(Driver, Host);

        status.ShouldBeNull();
    }

    [Fact]
    public void RecordFailure_Accumulates_ConsecutiveFailures()
    {
        var sut = new DriverResilienceStatusTracker();

        // Three failures, one second apart, starting at Now.
        for (var secondsAfter = 0; secondsAfter < 3; secondsAfter++)
        {
            sut.RecordFailure(Driver, Host, Now.AddSeconds(secondsAfter));
        }

        sut.TryGet(Driver, Host)!.ConsecutiveFailures.ShouldBe(3);
    }

    [Fact]
    public void RecordSuccess_Resets_ConsecutiveFailures()
    {
        var sut = new DriverResilienceStatusTracker();
        for (var secondsAfter = 0; secondsAfter < 2; secondsAfter++)
        {
            sut.RecordFailure(Driver, Host, Now.AddSeconds(secondsAfter));
        }

        sut.RecordSuccess(Driver, Host, Now.AddSeconds(2));

        sut.TryGet(Driver, Host)!.ConsecutiveFailures.ShouldBe(0);
    }

    [Fact]
    public void RecordBreakerOpen_Populates_LastBreakerOpenUtc()
    {
        var sut = new DriverResilienceStatusTracker();

        sut.RecordBreakerOpen(Driver, Host, Now);

        var status = sut.TryGet(Driver, Host)!;
        status.LastBreakerOpenUtc.ShouldBe(Now);
    }

    [Fact]
    public void RecordRecycle_Populates_LastRecycleUtc()
    {
        var sut = new DriverResilienceStatusTracker();

        sut.RecordRecycle(Driver, Host, Now);

        var status = sut.TryGet(Driver, Host)!;
        status.LastRecycleUtc.ShouldBe(Now);
    }

    [Fact]
    public void RecordFootprint_CapturesBaselineAndCurrent()
    {
        const int baseline = 100_000_000;
        const int current = 150_000_000;
        var sut = new DriverResilienceStatusTracker();

        sut.RecordFootprint(Driver, Host, baselineBytes: baseline, currentBytes: current, Now);

        var status = sut.TryGet(Driver, Host)!;
        status.BaselineFootprintBytes.ShouldBe(baseline);
        status.CurrentFootprintBytes.ShouldBe(current);
    }

    [Fact]
    public void DifferentHosts_AreIndependent()
    {
        const string hostA = "host-a";
        const string hostB = "host-b";
        var sut = new DriverResilienceStatusTracker();

        sut.RecordFailure(Driver, hostA, Now);
        sut.RecordFailure(Driver, hostB, Now);
        // Only host-a recovers; host-b must keep its failure count.
        sut.RecordSuccess(Driver, hostA, Now.AddSeconds(1));

        sut.TryGet(Driver, hostA)!.ConsecutiveFailures.ShouldBe(0);
        sut.TryGet(Driver, hostB)!.ConsecutiveFailures.ShouldBe(1);
    }

    [Fact]
    public void Snapshot_ReturnsAll_TrackedPairs()
    {
        var sut = new DriverResilienceStatusTracker();
        var pairs = new[]
        {
            (DriverId: "drv-1", HostName: "host-a"),
            (DriverId: "drv-1", HostName: "host-b"),
            (DriverId: "drv-2", HostName: "host-a"),
        };
        foreach (var pair in pairs)
        {
            sut.RecordFailure(pair.DriverId, pair.HostName, Now);
        }

        var all = sut.Snapshot();

        all.Count.ShouldBe(3);
    }

    [Fact]
    public void ConcurrentWrites_DoNotLose_Failures()
    {
        const int writes = 500;
        var sut = new DriverResilienceStatusTracker();

        Parallel.For(0, writes, _ => sut.RecordFailure(Driver, Host, Now));

        sut.TryGet(Driver, Host)!.ConsecutiveFailures.ShouldBe(writes);
    }
}
|
||||||
Reference in New Issue
Block a user