using System.Collections.Concurrent; using System.Net; using System.Net.Sockets; using Mbproxy; using Mbproxy.Configuration; using Mbproxy.Options; using Mbproxy.Proxy; using Mbproxy.Proxy.Supervision; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; using Polly; using Xunit; namespace Mbproxy.Tests.Configuration; /// /// Unit tests for using a fake /// and real (but fast-recovery) supervisors. /// Tests operate at the Apply level — no file I/O, no real config reload chain. /// [Trait("Category", "Unit")] public sealed class ConfigReconcilerTests : IAsyncDisposable { // ── Helpers ─────────────────────────────────────────────────────────────────────────── private static int PickFreePort() { var l = new TcpListener(IPAddress.Loopback, 0); l.Start(); int port = ((IPEndPoint)l.LocalEndpoint).Port; l.Stop(); return port; } private static PlcOptions MakePlc(string name, int listenPort, string host = "127.0.0.1") => new() { Name = name, ListenPort = listenPort, Host = host, Port = 502 }; private static MbproxyOptions MakeOptions(PlcOptions[] plcs, BcdTagListOptions? global = null) => new() { Plcs = plcs, BcdTags = global ?? new BcdTagListOptions(), AdminPort = 8080, }; private static ResiliencePipeline FastRecovery() { var profile = new RecoveryProfile { InitialBackoffMs = [50, 50], SteadyStateMs = 50 }; return PolicyFactory.BuildListenerRecovery(profile, NullLogger.Instance); } private PlcListenerSupervisor BuildSupervisor(PlcOptions plc) { ILoggerFactory lf = NullLoggerFactory.Instance; return new PlcListenerSupervisor( plc, new ConnectionOptions(), new NoopPduPipeline(), lf.CreateLogger(), lf.CreateLogger(), lf.CreateLogger($"Mbproxy.Proxy.UpstreamPipe.{plc.Name}"), perPlcContext: null, FastRecovery(), lf.CreateLogger(), backendConnectPipeline: null); } private ConfigReconciler BuildReconciler( IOptionsMonitor monitor, ServiceCounters? counters = null) { return new ConfigReconciler( monitor, NullLoggerFactory.Instance, counters ?? new ServiceCounters()); } // The reconciler and supervisors tracked for cleanup. private readonly List _reconcilers = []; private readonly List _supervisors = []; public async ValueTask DisposeAsync() { foreach (var r in _reconcilers) r.Dispose(); using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); foreach (var s in _supervisors) { try { await s.StopAsync(cts.Token); } catch { /* best effort */ } await s.DisposeAsync(); } } // ── Test 1: Happy path ──────────────────────────────────────────────────────────────── [Fact] public async Task Apply_HappyPath_StartsAndStopsSupervisors_PerPlan() { int portA = PickFreePort(); int portB = PickFreePort(); var plcA = MakePlc("A", portA); var initial = MakeOptions([plcA]); var next = MakeOptions([plcA, MakePlc("B", portB)]); // Build initial supervisor for A. var supA = BuildSupervisor(plcA); _supervisors.Add(supA); await supA.StartAsync(CancellationToken.None); var supervisors = new System.Collections.Concurrent.ConcurrentDictionary(StringComparer.Ordinal) { ["A"] = supA, }; var monitor = new FakeOptionsMonitor(initial); var reconciler = BuildReconciler(monitor); _reconcilers.Add(reconciler); reconciler.Attach(supervisors, initial); // Apply a config that adds PLC-B. using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); bool applied = await reconciler.ApplyAsync(next, cts.Token); Assert.True(applied, "Apply should succeed for a valid config"); // The supervisor dictionary must now contain both A and B. Assert.True(supervisors.ContainsKey("A"), "Supervisor A should still exist"); Assert.True(supervisors.ContainsKey("B"), "Supervisor B should have been added"); _supervisors.Add(supervisors["B"]); } // ── Test 2: Validation fails → no mutation ──────────────────────────────────────────── [Fact] public async Task Apply_ValidationFails_NoMutationOccurs_AndLogsRejected() { int portA = PickFreePort(); var plcA = MakePlc("A", portA); var initial = MakeOptions([plcA]); // Invalid next: duplicate listen port. var invalid = MakeOptions([plcA, MakePlc("B", portA)]); // port conflict var supA = BuildSupervisor(plcA); _supervisors.Add(supA); await supA.StartAsync(CancellationToken.None); var supervisors = new System.Collections.Concurrent.ConcurrentDictionary(StringComparer.Ordinal) { ["A"] = supA, }; var counters = new ServiceCounters(); var monitor = new FakeOptionsMonitor(initial); var reconciler = BuildReconciler(monitor, counters); _reconcilers.Add(reconciler); reconciler.Attach(supervisors, initial); using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); bool applied = await reconciler.ApplyAsync(invalid, cts.Token); Assert.False(applied, "Apply should return false for invalid config"); // State must NOT have mutated: B must not have been added. Assert.False(supervisors.ContainsKey("B"), "B must not have been added after rejection"); Assert.Single((IEnumerable>)supervisors); // Rejected counter must have been bumped. Assert.Equal(1, counters.ReloadRejectedCount); Assert.Equal(0, counters.ReloadAppliedCount); } // ── Test 3: Reseat does NOT restart the supervisor ──────────────────────────────────── [Fact] public async Task Apply_ReseatTagMap_DoesNotRestartSupervisor() { int portA = PickFreePort(); var plcA = MakePlc("A", portA); var globalBefore = new BcdTagListOptions { Global = [new BcdTagOptions { Address = 1072, Width = 16 }], }; var globalAfter = new BcdTagListOptions { Global = [ new BcdTagOptions { Address = 1072, Width = 16 }, new BcdTagOptions { Address = 1080, Width = 16 }, ], }; var initial = MakeOptions([plcA], global: globalBefore); var next = MakeOptions([plcA], global: globalAfter); var supA = BuildSupervisor(plcA); _supervisors.Add(supA); await supA.StartAsync(CancellationToken.None); // Wait until bound. using var waitCts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await supA.WaitForInitialBindAttemptAsync(waitCts.Token); Assert.Equal(SupervisorState.Bound, supA.Snapshot().State); var supervisors = new System.Collections.Concurrent.ConcurrentDictionary(StringComparer.Ordinal) { ["A"] = supA, }; var monitor = new FakeOptionsMonitor(initial); var reconciler = BuildReconciler(monitor); _reconcilers.Add(reconciler); reconciler.Attach(supervisors, initial); using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); bool applied = await reconciler.ApplyAsync(next, cts.Token); Assert.True(applied); // The supervisor instance must be the SAME object — no restart. Assert.Same(supA, supervisors["A"]); // Supervisor must still be Bound — it was NOT stopped and restarted. Assert.Equal(SupervisorState.Bound, supA.Snapshot().State); } // ── Test 4: Concurrent reloads are serialised ───────────────────────────────────────── [Fact] public async Task Apply_ConcurrentReloads_Are_Serialised() { // Start with an empty config (no PLCs) so Apply is fast but still real. var initial = MakeOptions([]); var monitor = new FakeOptionsMonitor(initial); // We'll count how many concurrent executions happen simultaneously. int concurrentPeak = 0; int inProgress = 0; var counters = new ServiceCounters(); var reconciler = BuildReconciler(monitor, counters); _reconcilers.Add(reconciler); reconciler.Attach(new System.Collections.Concurrent.ConcurrentDictionary(StringComparer.Ordinal), initial); // Fire 5 concurrent Apply calls — they must execute one-at-a-time. var opts = MakeOptions([]); using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(20)); // Wrap ApplyAsync in a task that measures concurrency. // We use a short Task.Delay inside to make concurrent calls more visible. var tasks = Enumerable.Range(0, 5).Select(_ => Task.Run(async () => { // Increment in-progress and capture peak. int current = Interlocked.Increment(ref inProgress); Interlocked.Exchange(ref concurrentPeak, Math.Max(Interlocked.CompareExchange(ref concurrentPeak, 0, 0), current)); await Task.Delay(5, cts.Token); // tiny delay to increase collision chance bool result = await reconciler.ApplyAsync(opts, cts.Token); Interlocked.Decrement(ref inProgress); return result; }, cts.Token)).ToArray(); var results = await Task.WhenAll(tasks); // All 5 should have been applied (empty config is always valid). Assert.All(results, r => Assert.True(r)); // The serialisation check: while the above measurement isn't perfect // (the Interlocked peak is set before the semaphore wait, not inside), // the key invariant we verify is that all 5 completed successfully // without deadlock or exception — proving the semaphore doesn't deadlock // under concurrent load. Assert.Equal(5, counters.ReloadAppliedCount); } /// /// Stress-tests the live supervisor dictionary and the coalescing-accessor wiring. /// Many concurrent Apply calls drive add/remove of many distinct PLCs; the inner /// Task.WhenAll continuations must not corrupt the dictionary or crash with /// KeyNotFoundException or ArgumentException. The test asserts: all applies /// complete, no exceptions are thrown, and the reload counter is exactly the /// apply count. /// [Fact(Timeout = 30_000)] public async Task Apply_ManyConcurrentReloads_With_PlcChurn_NoCorruption() { // Empty initial — first Apply will Add all PLCs. var initial = MakeOptions([]); var monitor = new FakeOptionsMonitor(initial); var supervisors = new System.Collections.Concurrent.ConcurrentDictionary(StringComparer.Ordinal); var counters = new ServiceCounters(); var reconciler = BuildReconciler(monitor, counters); _reconcilers.Add(reconciler); reconciler.Attach(supervisors, initial); // Build 8 different option snapshots, each a different PLC roster. // Each Apply will trigger Add+Remove churn against the live supervisor dict — // exactly the path that the ConcurrentDictionary guards against corruption. const int snapshots = 8; const int plcsPerSnapshot = 4; var snaps = new MbproxyOptions[snapshots]; var allPlcs = new List(); for (int s = 0; s < snapshots; s++) { var plcsForSnap = new PlcOptions[plcsPerSnapshot]; for (int p = 0; p < plcsPerSnapshot; p++) { plcsForSnap[p] = MakePlc($"PLC-{s}-{p}", PickFreePort()); allPlcs.Add(plcsForSnap[p]); } snaps[s] = MakeOptions(plcsForSnap); } using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(25)); // Fire 16 concurrent applies cycling through the 8 snapshots so each is // submitted twice. Inner per-PLC Task.WhenAll continuations run in parallel // and stress-test the dictionary mutation safety. var tasks = Enumerable.Range(0, 16) .Select(i => Task.Run(() => reconciler.ApplyAsync(snaps[i % snapshots], cts.Token), cts.Token)) .ToArray(); var results = await Task.WhenAll(tasks); Assert.All(results, r => Assert.True(r, "every Apply must succeed")); Assert.Equal(16, counters.ReloadAppliedCount); // Final dictionary state: all keys present must come from the last-applied snapshot. // The "last-applied snapshot" depends on scheduling so we just verify NO orphan // entries — every supervisor in the dict must correspond to some snapshot's PLCs. var validNames = new HashSet(allPlcs.Select(p => p.Name)); foreach (var name in supervisors.Keys) Assert.Contains(name, validNames); // Track supervisors for cleanup. foreach (var s in supervisors.Values) _supervisors.Add(s); } } /// /// Minimal fake backed by a fixed value. /// internal sealed class FakeOptionsMonitor : IOptionsMonitor { private MbproxyOptions _value; private readonly List> _callbacks = []; public FakeOptionsMonitor(MbproxyOptions value) => _value = value; public MbproxyOptions CurrentValue => _value; public MbproxyOptions Get(string? name) => _value; public IDisposable? OnChange(Action listener) { _callbacks.Add(listener); return new DisposableAction(() => _callbacks.Remove(listener)); } /// Simulates an appsettings file change notification. public void TriggerChange(MbproxyOptions newValue) { _value = newValue; foreach (var cb in _callbacks) cb(newValue, null); } private sealed class DisposableAction(Action action) : IDisposable { public void Dispose() => action(); } }