Files
wwtools/mbproxy/tests/Mbproxy.Tests/Configuration/ConfigReconcilerTests.cs
T
Joseph Doherty 2545237973 mbproxy: close remaining 5 W3 test gaps from 2026-05-14 review
Closes the 5 "easily addable" W3 test gaps left after the prior W3 commit;
the 5 race-hard gaps remain documented as known omissions per the plan.
Tests: 382 pass / 0 fail (baseline 378 + 4 net new methods — the supervisor
runtime-fault test replaces the existing placeholder).

  #11 BcdCodecTests.Encode16_IntMinValue_Throws_OutOfRange_NoArithmeticSurprise
      Locks the (uint)value > Max16 boundary check against int.MinValue. The
      cast becomes 0x80000000 which is well above 9999, so the throw fires
      cleanly. Prevents regression to a two-sided int comparison that would
      underflow.

  #15 BcdPduPipelineTests.FC03_Request_QtyAbove128_AtNonBcdAddress_PassesThroughUnchanged
      DL205/DL260 caps FC03/FC04 at qty=128 (DL260/dl205.md). The proxy must
      NOT truncate the qty field — passing through unchanged lets the PLC's
      own validator return exception 03 to the client (transparent contract
      for FCs/addresses the rewriter doesn't own).

  #4 SupervisorTests.Supervisor_RuntimeFault_OnRunningListener_RecoversAndRebinds
      Replaces the previous placeholder. Genuinely faults the running listener
      mid-life by stopping its underlying TcpListener via reflection (the
      single externally-observable hook to force the accept loop's
      AcceptAsync to throw ObjectDisposedException). Asserts the supervisor
      transitions to Recovering, re-binds via the Polly pipeline, and bumps
      RecoveryAttempts.

  #10 HotReloadE2ETests.E2E_ReadCoalescingEnabled_FlipAtRuntime_PropagatesToOptionsMonitor
      Validates that flipping Mbproxy.Resilience.ReadCoalescing.Enabled at
      runtime via hot-reload propagates through the live IOptionsMonitor.
      The W2.1 fix wires the accessor through to add/restart supervisors;
      the multiplexer reads it per-PDU (unit-tested separately). Proving
      IOptionsMonitor sees the new value is sufficient for the contract.

  #16 ConfigReconcilerTests.Apply_ManyConcurrentReloads_With_PlcChurn_NoCorruption
      Stress-tests the W2.3 ConcurrentDictionary fix. 16 concurrent applies
      cycle through 8 distinct PLC rosters, driving Add+Remove churn against
      the live supervisor dict. Without W2.3 the inner Task.WhenAll
      continuations would corrupt Dictionary<,> and crash with
      KeyNotFoundException / ArgumentException. Asserts every apply succeeds,
      no orphan supervisors remain, and the reload counter equals 16.

The 5 remaining gaps (#5 TxId saturation propagation, #6 coalescing factory
leak under saturation, #7 backend-reader head-of-line block, #8
watchdog↔response race, #9 cascade↔new-accept race) are timing races that are
hard to reproduce deterministically, so they remain open by design:
reproducing them reliably would require test seams in production code or
stress-style tests that flake on slow CI. The Wave-1 fixes are still verified
at the unit-contract level (UpstreamPipeTests.TrySendResponse_WhenChannelFull, etc.).

This closes everything actionable in codereviews/2026-05-14/RemediationPlan.md.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-14 06:17:03 -04:00

384 lines
15 KiB
C#

using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using Mbproxy;
using Mbproxy.Configuration;
using Mbproxy.Options;
using Mbproxy.Proxy;
using Mbproxy.Proxy.Supervision;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Polly;
using Xunit;
namespace Mbproxy.Tests.Configuration;
/// <summary>
/// Unit tests for <see cref="ConfigReconciler.ApplyAsync"/> using a fake
/// <see cref="IOptionsMonitor{T}"/> and real (but fast-recovery) supervisors.
/// Tests operate at the Apply level — no file I/O, no real config reload chain.
/// </summary>
[Trait("Category", "Unit")]
public sealed class ConfigReconcilerTests : IAsyncDisposable
{
// ── Helpers ───────────────────────────────────────────────────────────────────────────
/// <summary>
/// Obtains a loopback TCP port number by binding a throwaway listener to port 0,
/// reading back the port it was given, and stopping the listener again.
/// </summary>
/// <returns>The port the probe listener was bound to while it was started.</returns>
/// <remarks>
/// The probe listener is stopped before returning, so the port is no longer held
/// by this method when the caller receives it.
/// </remarks>
private static int PickFreePort()
{
    var probe = new TcpListener(IPAddress.Loopback, 0);
    probe.Start();

    var boundEndpoint = (IPEndPoint)probe.LocalEndpoint;
    int assignedPort = boundEndpoint.Port;

    probe.Stop();
    return assignedPort;
}
/// <summary>
/// Creates a <see cref="PlcOptions"/> with the given name, listen port and host,
/// and with <c>Port</c> set to 502.
/// </summary>
/// <param name="name">Value assigned to <c>Name</c>.</param>
/// <param name="listenPort">Value assigned to <c>ListenPort</c>.</param>
/// <param name="host">Value assigned to <c>Host</c>; defaults to the loopback address.</param>
/// <returns>The populated options instance.</returns>
private static PlcOptions MakePlc(string name, int listenPort, string host = "127.0.0.1")
{
    var plc = new PlcOptions
    {
        Name = name,
        ListenPort = listenPort,
        Host = host,
        Port = 502,
    };

    return plc;
}
/// <summary>
/// Creates an <see cref="MbproxyOptions"/> snapshot for the given PLC roster.
/// </summary>
/// <param name="plcs">PLC entries assigned to <c>Plcs</c>.</param>
/// <param name="global">
/// Tag list assigned to <c>BcdTags</c>; when <see langword="null"/> a new, default
/// <see cref="BcdTagListOptions"/> is used instead.
/// </param>
/// <returns>The populated options instance, with <c>AdminPort</c> set to 8080.</returns>
private static MbproxyOptions MakeOptions(PlcOptions[] plcs, BcdTagListOptions? global = null)
{
    BcdTagListOptions tagList = global ?? new BcdTagListOptions();

    return new MbproxyOptions
    {
        Plcs = plcs,
        BcdTags = tagList,
        AdminPort = 8080,
    };
}
/// <summary>
/// Builds a listener-recovery pipeline from a <see cref="RecoveryProfile"/> whose
/// backoff values are all 50 ms, so tests that depend on recovery stay fast.
/// </summary>
/// <returns>The pipeline produced by <c>PolicyFactory.BuildListenerRecovery</c>.</returns>
private static ResiliencePipeline FastRecovery()
    => PolicyFactory.BuildListenerRecovery(
        new RecoveryProfile
        {
            InitialBackoffMs = [50, 50],
            SteadyStateMs = 50,
        },
        NullLogger.Instance);
/// <summary>
/// Constructs (but does not start) a <see cref="PlcListenerSupervisor"/> for
/// <paramref name="plc"/>, wired with null loggers, default connection options,
/// a <c>NoopPduPipeline</c> and the fast recovery pipeline from <c>FastRecovery()</c>.
/// </summary>
/// <param name="plc">The PLC configuration the supervisor is created for.</param>
/// <returns>The new supervisor; the caller is responsible for starting and disposing it.</returns>
private PlcListenerSupervisor BuildSupervisor(PlcOptions plc)
{
    ILoggerFactory nullLoggers = NullLoggerFactory.Instance;

    // One logger per component the supervisor constructor asks for.
    var listenerLogger = nullLoggers.CreateLogger<PlcListener>();
    var multiplexerLogger = nullLoggers.CreateLogger<Mbproxy.Proxy.Multiplexing.PlcMultiplexer>();
    var upstreamPipeLogger = nullLoggers.CreateLogger($"Mbproxy.Proxy.UpstreamPipe.{plc.Name}");
    var recoveryPipeline = FastRecovery();
    var supervisorLogger = nullLoggers.CreateLogger<PlcListenerSupervisor>();

    return new PlcListenerSupervisor(
        plc,
        new ConnectionOptions(),
        new NoopPduPipeline(),
        listenerLogger,
        multiplexerLogger,
        upstreamPipeLogger,
        perPlcContext: null,
        recoveryPipeline,
        supervisorLogger,
        backendConnectPipeline: null);
}
/// <summary>
/// Constructs a <see cref="ConfigReconciler"/> over the supplied options monitor
/// using the null logger factory.
/// </summary>
/// <param name="monitor">Options monitor handed to the reconciler.</param>
/// <param name="counters">
/// Counters the reconciler should report into; when <see langword="null"/> a fresh
/// <see cref="ServiceCounters"/> instance is created.
/// </param>
/// <returns>The new reconciler; callers add it to <c>_reconcilers</c> for disposal.</returns>
private ConfigReconciler BuildReconciler(
    IOptionsMonitor<MbproxyOptions> monitor,
    ServiceCounters? counters = null)
{
    ServiceCounters effectiveCounters = counters ?? new ServiceCounters();
    return new ConfigReconciler(monitor, NullLoggerFactory.Instance, effectiveCounters);
}
// Reconcilers and supervisors registered by the individual tests; DisposeAsync
// walks both lists to tear everything down.
private readonly List<ConfigReconciler> _reconcilers = [];
private readonly List<PlcListenerSupervisor> _supervisors = [];

/// <summary>
/// Disposes every tracked reconciler, then stops and disposes every tracked
/// supervisor. All stop calls share a single 5-second cancellation deadline.
/// </summary>
public async ValueTask DisposeAsync()
{
    _reconcilers.ForEach(static reconciler => reconciler.Dispose());

    using var stopDeadline = new CancellationTokenSource(TimeSpan.FromSeconds(5));
    CancellationToken stopToken = stopDeadline.Token;

    foreach (PlcListenerSupervisor supervisor in _supervisors)
    {
        try
        {
            await supervisor.StopAsync(stopToken);
        }
        catch
        {
            /* best effort */
        }

        // Disposal is attempted even when the stop above failed.
        await supervisor.DisposeAsync();
    }
}
// ── Test 1: Happy path ────────────────────────────────────────────────────────────────
/// <summary>
/// Applying a config that adds PLC "B" next to the already-running PLC "A" must
/// succeed and leave both names present in the live supervisor dictionary.
/// </summary>
[Fact]
public async Task Apply_HappyPath_StartsAndStopsSupervisors_PerPlan()
{
    int portA = PickFreePort();
    int portB = PickFreePort();
    var plcA = MakePlc("A", portA);
    var initial = MakeOptions([plcA]);
    var next = MakeOptions([plcA, MakePlc("B", portB)]);

    // Build and start the initial supervisor for A.
    var supA = BuildSupervisor(plcA);
    _supervisors.Add(supA);
    await supA.StartAsync(CancellationToken.None);

    var supervisors = new ConcurrentDictionary<string, PlcListenerSupervisor>(StringComparer.Ordinal)
    {
        ["A"] = supA,
    };

    var monitor = new FakeOptionsMonitor(initial);
    var reconciler = BuildReconciler(monitor);
    _reconcilers.Add(reconciler);
    reconciler.Attach(supervisors, initial);

    try
    {
        // Apply a config that adds PLC-B.
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        bool applied = await reconciler.ApplyAsync(next, cts.Token);

        Assert.True(applied, "Apply should succeed for a valid config");

        // The supervisor dictionary must now contain both A and B.
        Assert.True(supervisors.ContainsKey("A"), "Supervisor A should still exist");
        Assert.True(supervisors.ContainsKey("B"), "Supervisor B should have been added");
    }
    finally
    {
        // Register whatever the reconciler put into the dictionary for cleanup even
        // when an assertion above failed; otherwise those supervisors would never
        // be stopped or disposed by DisposeAsync.
        foreach (PlcListenerSupervisor live in supervisors.Values)
        {
            if (!_supervisors.Contains(live))
            {
                _supervisors.Add(live);
            }
        }
    }
}
// ── Test 2: Validation fails → no mutation ────────────────────────────────────────────
/// <summary>
/// A candidate config in which PLC "B" reuses PLC "A"'s listen port must be rejected:
/// ApplyAsync returns false, the supervisor dictionary is left with its single
/// entry, and only the rejected counter is incremented.
/// </summary>
[Fact]
public async Task Apply_ValidationFails_NoMutationOccurs_AndLogsRejected()
{
    // Arrange: one running supervisor and a conflicting candidate config.
    int sharedPort = PickFreePort();
    PlcOptions plcA = MakePlc("A", sharedPort);
    MbproxyOptions current = MakeOptions([plcA]);
    MbproxyOptions conflicting = MakeOptions([plcA, MakePlc("B", sharedPort)]); // same listen port twice

    PlcListenerSupervisor supervisorA = BuildSupervisor(plcA);
    _supervisors.Add(supervisorA);
    await supervisorA.StartAsync(CancellationToken.None);

    var live = new ConcurrentDictionary<string, PlcListenerSupervisor>(StringComparer.Ordinal);
    live["A"] = supervisorA;

    var counters = new ServiceCounters();
    var monitor = new FakeOptionsMonitor(current);
    ConfigReconciler reconciler = BuildReconciler(monitor, counters);
    _reconcilers.Add(reconciler);
    reconciler.Attach(live, current);

    // Act
    using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
    bool applied = await reconciler.ApplyAsync(conflicting, timeout.Token);

    // Assert: rejected, and nothing was mutated.
    Assert.False(applied, "Apply should return false for invalid config");
    Assert.False(live.ContainsKey("B"), "B must not have been added after rejection");

    // Typed as a plain sequence so the generic Assert.Single overload is selected.
    IEnumerable<KeyValuePair<string, PlcListenerSupervisor>> entries = live;
    Assert.Single(entries);

    // Exactly one rejection recorded, no successful apply.
    Assert.Equal(1, counters.ReloadRejectedCount);
    Assert.Equal(0, counters.ReloadAppliedCount);
}
// ── Test 3: Reseat does NOT restart the supervisor ────────────────────────────────────
/// <summary>
/// Applying a config that only extends the global BCD tag list (same PLC roster)
/// must keep the very same supervisor instance in the dictionary and leave it in
/// the <c>Bound</c> state.
/// </summary>
[Fact]
public async Task Apply_ReseatTagMap_DoesNotRestartSupervisor()
{
    // Arrange: two configs that differ only in their global tag list.
    int listenPort = PickFreePort();
    PlcOptions plcA = MakePlc("A", listenPort);

    var tagsBefore = new BcdTagListOptions
    {
        Global = [new BcdTagOptions { Address = 1072, Width = 16 }],
    };
    var tagsAfter = new BcdTagListOptions
    {
        Global =
        [
            new BcdTagOptions { Address = 1072, Width = 16 },
            new BcdTagOptions { Address = 1080, Width = 16 },
        ],
    };

    MbproxyOptions current = MakeOptions([plcA], global: tagsBefore);
    MbproxyOptions updated = MakeOptions([plcA], global: tagsAfter);

    PlcListenerSupervisor supervisorA = BuildSupervisor(plcA);
    _supervisors.Add(supervisorA);
    await supervisorA.StartAsync(CancellationToken.None);

    // Do not proceed until the first bind attempt has finished and succeeded.
    using var bindTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
    await supervisorA.WaitForInitialBindAttemptAsync(bindTimeout.Token);
    Assert.Equal(SupervisorState.Bound, supervisorA.Snapshot().State);

    var live = new ConcurrentDictionary<string, PlcListenerSupervisor>(StringComparer.Ordinal);
    live["A"] = supervisorA;

    ConfigReconciler reconciler = BuildReconciler(new FakeOptionsMonitor(current));
    _reconcilers.Add(reconciler);
    reconciler.Attach(live, current);

    // Act
    using var applyTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(10));
    bool applied = await reconciler.ApplyAsync(updated, applyTimeout.Token);

    // Assert
    Assert.True(applied);

    // Same object reference means the supervisor was not replaced.
    Assert.Same(supervisorA, live["A"]);

    // Still Bound after the apply.
    Assert.Equal(SupervisorState.Bound, supervisorA.Snapshot().State);
}
// ── Test 4: Concurrent reloads are serialised ─────────────────────────────────────────
/// <summary>
/// Fires several <c>ApplyAsync</c> calls concurrently against an empty config and
/// verifies that every call completes successfully and that the applied counter
/// equals the number of calls.
/// </summary>
/// <remarks>
/// Whether two applies overlap inside the reconciler cannot be observed from the
/// outside without a hook in the reconciler itself, so this test does not try to
/// measure overlap. What it pins is that concurrent callers neither deadlock nor
/// throw, and that no apply is lost or double-counted.
/// </remarks>
[Fact]
public async Task Apply_ConcurrentReloads_Are_Serialised()
{
    const int applyCount = 5;

    // Start with an empty config (no PLCs) so Apply is fast but still real.
    var initial = MakeOptions([]);
    var monitor = new FakeOptionsMonitor(initial);
    var counters = new ServiceCounters();
    var reconciler = BuildReconciler(monitor, counters);
    _reconcilers.Add(reconciler);
    reconciler.Attach(
        new ConcurrentDictionary<string, PlcListenerSupervisor>(StringComparer.Ordinal),
        initial);

    var opts = MakeOptions([]);
    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(20));

    // Launch all applies from the thread pool; the tiny delay gives the tasks a
    // chance to pile up so their ApplyAsync calls are more likely to collide.
    var tasks = Enumerable.Range(0, applyCount).Select(_ => Task.Run(async () =>
    {
        await Task.Delay(5, cts.Token);
        return await reconciler.ApplyAsync(opts, cts.Token);
    }, cts.Token)).ToArray();

    bool[] results = await Task.WhenAll(tasks);

    // Every apply must have been accepted (empty config is always valid).
    Assert.All(results, r => Assert.True(r));

    // Each accepted apply is counted exactly once.
    Assert.Equal(applyCount, counters.ReloadAppliedCount);
}
/// <summary>
/// Phase 12 (W3 test gap #16) — stress-test the W2.3 ConcurrentDictionary fix and the
/// W2.1 coalescing-accessor wiring. Many concurrent Apply calls drive add/remove of
/// many distinct PLCs; without W2.3's ConcurrentDictionary the inner Task.WhenAll
/// continuations would corrupt the dictionary and crash with KeyNotFoundException or
/// ArgumentException. The test asserts: all applies complete successfully, the reload
/// counter is exactly the apply count, and every name left in the dictionary belongs
/// to one of the generated snapshots.
/// </summary>
/// <remarks>
/// Supervisors created by the reconciler are registered for cleanup in a
/// <c>finally</c> block so they are stopped and disposed even when an apply throws
/// or an assertion fails.
/// </remarks>
[Fact(Timeout = 30_000)]
public async Task Apply_ManyConcurrentReloads_With_PlcChurn_NoCorruption()
{
    const int snapshots = 8;
    const int plcsPerSnapshot = 4;
    const int applyCount = 16;

    // Empty initial — first Apply will Add all PLCs.
    var initial = MakeOptions([]);
    var monitor = new FakeOptionsMonitor(initial);
    var supervisors = new ConcurrentDictionary<string, PlcListenerSupervisor>(StringComparer.Ordinal);
    var counters = new ServiceCounters();
    var reconciler = BuildReconciler(monitor, counters);
    _reconcilers.Add(reconciler);
    reconciler.Attach(supervisors, initial);

    // Build 8 different option snapshots, each a different PLC roster.
    // Each Apply will trigger Add+Remove churn against the live supervisor dict —
    // exactly the path that W2.3's ConcurrentDictionary was needed for.
    var snaps = new MbproxyOptions[snapshots];
    var allPlcs = new List<PlcOptions>(snapshots * plcsPerSnapshot);
    for (int snapIndex = 0; snapIndex < snapshots; snapIndex++)
    {
        var roster = new PlcOptions[plcsPerSnapshot];
        for (int plcIndex = 0; plcIndex < plcsPerSnapshot; plcIndex++)
        {
            roster[plcIndex] = MakePlc($"PLC-{snapIndex}-{plcIndex}", PickFreePort());
            allPlcs.Add(roster[plcIndex]);
        }

        snaps[snapIndex] = MakeOptions(roster);
    }

    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(25));

    try
    {
        // Fire 16 concurrent applies cycling through the 8 snapshots so each is
        // submitted twice.
        var tasks = Enumerable.Range(0, applyCount)
            .Select(i => Task.Run(() => reconciler.ApplyAsync(snaps[i % snapshots], cts.Token), cts.Token))
            .ToArray();

        var results = await Task.WhenAll(tasks);

        Assert.All(results, r => Assert.True(r, "every Apply must succeed"));
        Assert.Equal(applyCount, counters.ReloadAppliedCount);

        // Which snapshot was applied last depends on scheduling, so only verify that
        // there are no foreign entries: every key must be a name generated above.
        var validNames = new HashSet<string>(allPlcs.Select(p => p.Name));
        foreach (var name in supervisors.Keys)
        {
            Assert.Contains(name, validNames);
        }
    }
    finally
    {
        // Track whatever is still in the dictionary for cleanup, regardless of
        // whether the assertions above passed.
        foreach (PlcListenerSupervisor live in supervisors.Values)
        {
            if (!_supervisors.Contains(live))
            {
                _supervisors.Add(live);
            }
        }
    }
}
}
/// <summary>
/// Minimal fake <see cref="IOptionsMonitor{T}"/>. It returns the value it was
/// constructed with until <see cref="TriggerChange"/> replaces it and notifies the
/// registered change listeners.
/// </summary>
internal sealed class FakeOptionsMonitor : IOptionsMonitor<MbproxyOptions>
{
    private MbproxyOptions _value;
    private readonly List<Action<MbproxyOptions, string?>> _callbacks = [];

    /// <summary>Creates the fake with <paramref name="value"/> as its current value.</summary>
    public FakeOptionsMonitor(MbproxyOptions value) => _value = value;

    /// <summary>The most recently supplied options value.</summary>
    public MbproxyOptions CurrentValue => _value;

    /// <summary>Returns the current value; <paramref name="name"/> is ignored.</summary>
    public MbproxyOptions Get(string? name) => _value;

    /// <summary>
    /// Registers <paramref name="listener"/> to be invoked by <see cref="TriggerChange"/>.
    /// </summary>
    /// <returns>A handle whose disposal removes the registration again.</returns>
    public IDisposable? OnChange(Action<MbproxyOptions, string?> listener)
    {
        _callbacks.Add(listener);
        return new DisposableAction(() => _callbacks.Remove(listener));
    }

    /// <summary>Simulates an appsettings file change notification.</summary>
    /// <param name="newValue">The value that becomes current and is passed to every listener.</param>
    public void TriggerChange(MbproxyOptions newValue)
    {
        _value = newValue;

        // Iterate over a copy: a listener may dispose its own registration (or add
        // a new one) while being notified, which would otherwise invalidate the
        // enumeration of the underlying list.
        foreach (var callback in _callbacks.ToArray())
        {
            callback(newValue, null);
        }
    }

    /// <summary>Runs the supplied action the first time it is disposed; later calls do nothing.</summary>
    private sealed class DisposableAction(Action action) : IDisposable
    {
        private Action? _pending = action;

        public void Dispose() => Interlocked.Exchange(ref _pending, null)?.Invoke();
    }
}