using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Text.Json;
using Mbproxy;
using Mbproxy.Configuration;
using Mbproxy.Proxy;
using Mbproxy.Proxy.Supervision;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Serilog;
using Serilog.Core;
using Serilog.Events;
using Shouldly;
using Xunit;

namespace Mbproxy.Tests.Configuration;

/// <summary>
/// End-to-end hot-reload tests. Each test:
/// <list type="number">
///   <item>Writes a temp <c>appsettings.json</c> file.</item>
///   <item>Builds a real host that reads it with <c>reloadOnChange: true</c>.</item>
///   <item>Mutates the file and waits for the reconciler to apply the change.</item>
///   <item>Asserts the running state reflects the new config.</item>
/// </list>
/// </summary>
/// <remarks>
/// These tests do NOT require the pymodbus simulator: they only probe loopback listeners,
/// reload counters, captured log events and the options snapshot.
/// </remarks>
[Trait("Category", "E2E")]
public sealed class HotReloadE2ETests : IAsyncLifetime
{
    // ── Helpers ───────────────────────────────────────────────────────────────────────────

    /// <summary>Serializer settings shared by every config writer (indented output).</summary>
    private static readonly JsonSerializerOptions s_indentedJson = new() { WriteIndented = true };

    /// <summary>
    /// Asks the OS for an ephemeral loopback port by binding a listener to port 0,
    /// reads the assigned port, and releases the listener again.
    /// </summary>
    /// <returns>A port number that was free at the time of the call.</returns>
    private static int PickFreePort()
    {
        var probe = new TcpListener(IPAddress.Loopback, 0);
        probe.Start();
        try
        {
            return ((IPEndPoint)probe.LocalEndpoint).Port;
        }
        finally
        {
            // Always release the socket, even if reading the endpoint throws.
            probe.Stop();
        }
    }

    /// <summary>
    /// Serializes <paramref name="document"/> as indented JSON and swaps it into
    /// <paramref name="path"/> via a sibling <c>.tmp</c> file.
    /// </summary>
    /// <param name="path">Destination config file; overwritten if it exists.</param>
    /// <param name="document">Object graph to serialize (its runtime type is used).</param>
    private static void ReplaceConfigFile(string path, object document)
    {
        // Write to a temp path then rename-replace, which is the exact pattern that causes
        // FileSystemWatcher to fire 2-3 times and exercises the debounce.
        string tmp = path + ".tmp";
        File.WriteAllText(tmp, JsonSerializer.Serialize(document, s_indentedJson));
        File.Move(tmp, path, overwrite: true);
    }

    /// <summary>
    /// Writes a minimal appsettings.json with the given PLC entries and optional global
    /// BCD tags. Uses a JSON file rather than the raw config API so that the JSON
    /// configuration provider picks up the change the same way it would in production.
    /// </summary>
    /// <param name="path">Config file to (re)write.</param>
    /// <param name="plcs">PLC name / listen-port pairs; every PLC targets 127.0.0.1:502.</param>
    /// <param name="globalBcdTags">Optional global BCD tags as address / width pairs; <c>null</c> means none.</param>
    /// <param name="adminPort">Value written to <c>Mbproxy:AdminPort</c>.</param>
    private static void WriteConfig(
        string path,
        IEnumerable<(string name, int listenPort)> plcs,
        IEnumerable<(int addr, int width)>? globalBcdTags = null,
        int adminPort = 8080)
    {
        var plcArr = plcs
            .Select(p => new
            {
                Name = p.name,
                ListenPort = p.listenPort,
                Host = "127.0.0.1",
                Port = 502,
            })
            .ToArray();

        var globalArr = (globalBcdTags ?? [])
            .Select(t => new { Address = t.addr, Width = t.width })
            .ToArray();

        var doc = new
        {
            Mbproxy = new
            {
                AdminPort = adminPort,
                BcdTags = new { Global = globalArr },
                Plcs = plcArr,
                Connection = new { BackendConnectTimeoutMs = 500, BackendRequestTimeoutMs = 500 },
            },
        };

        ReplaceConfigFile(path, doc);
    }

    /// <summary>
    /// Waits up to <paramref name="timeout"/> for <paramref name="predicate"/> to become true,
    /// polling every 50 ms, then asserts the predicate with <paramref name="failMessage"/>.
    /// </summary>
    /// <param name="predicate">Condition to poll.</param>
    /// <param name="timeout">Maximum time to keep polling.</param>
    /// <param name="failMessage">Assertion message used when the condition never holds.</param>
    private static async Task WaitForAsync(Func<bool> predicate, TimeSpan timeout, string failMessage)
    {
        using var cts = new CancellationTokenSource(timeout);
        try
        {
            while (!predicate() && !cts.IsCancellationRequested)
            {
                await Task.Delay(50, cts.Token).ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException) when (cts.IsCancellationRequested)
        {
            // The timeout elapsed mid-delay. Swallow the cancellation so the assertion
            // below reports failMessage instead of an opaque TaskCanceledException.
        }

        predicate().ShouldBeTrue(failMessage);
    }

    /// <summary>
    /// Builds a host whose only configuration source is <paramref name="configPath"/>,
    /// loaded with <c>reloadOnChange: true</c>.
    /// </summary>
    /// <param name="configPath">JSON config file the host reads and watches.</param>
    /// <param name="logSink">
    /// Optional sink that receives Information-and-above events; when <c>null</c> the
    /// logger has no sinks and a Fatal minimum level.
    /// </param>
    /// <returns>The built, not yet started, host.</returns>
    private IHost BuildHost(string configPath, ILogEventSink? logSink = null)
    {
        var logger = logSink is not null
            ? new LoggerConfiguration()
                .MinimumLevel.Information()
                .WriteTo.Sink(logSink)
                .CreateLogger()
            : new LoggerConfiguration().MinimumLevel.Fatal().CreateLogger();

        var builder = Host.CreateApplicationBuilder();

        // Wire the JSON file with reloadOnChange: true (the production pattern).
        builder.Configuration.Sources.Clear();
        builder.Configuration.AddJsonFile(configPath, optional: false, reloadOnChange: true);

        builder.Services.AddSerilog(logger, dispose: false);
        builder.AddMbproxyOptions();

        // NOTE(review): the generic type arguments of the next two registrations are not
        // present in the text under review and cannot be inferred from it — restore them
        // from version control before building.
        builder.Services.AddSingleton();
        builder.Services.AddHostedService();

        return builder.Build();
    }

    // Temp config file path, unique per test run to avoid collisions.
    private string _configPath = "";

    /// <summary>Chooses a fresh, GUID-named config path under the temp directory.</summary>
    public ValueTask InitializeAsync()
    {
        _configPath = Path.Combine(Path.GetTempPath(), $"mbproxy_test_{Guid.NewGuid():N}.json");
        return ValueTask.CompletedTask;
    }

    /// <summary>Deletes the temp config file; failures are deliberately ignored.</summary>
    public ValueTask DisposeAsync()
    {
        try
        {
            File.Delete(_configPath);
        }
        catch
        {
            /* best effort */
        }

        return ValueTask.CompletedTask;
    }

    // ── E2E 1: Add a PLC at runtime → new listener binds ─────────────────────────────────

    [Fact(Timeout = 5_000)]
    public async Task E2E_AddPlcAtRuntime_NewListenerBinds_AndIsReachable()
    {
        int portA = PickFreePort();
        int portB = PickFreePort();

        // Start the host with only PLC-A.
        WriteConfig(_configPath, [("PLC-A", portA)]);

        using var host = BuildHost(_configPath);
        using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        await host.StartAsync(startCts.Token);

        // Wait for PLC-A to bind.
        await WaitForAsync(
            () => CanConnect(portA),
            TimeSpan.FromSeconds(5),
            "PLC-A listener should be reachable after startup");

        // Add PLC-B by rewriting the config file.
        WriteConfig(_configPath, [("PLC-A", portA), ("PLC-B", portB)]);

        // Wait up to 3 s for the new listener to appear.
        await WaitForAsync(
            () => CanConnect(portB),
            TimeSpan.FromSeconds(3),
            "PLC-B listener should bind within 3 s of config reload");

        using var stopCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        await host.StopAsync(stopCts.Token);
    }

    // ── E2E 2: Remove a PLC at runtime → port closes ─────────────────────────────────────

    // Timeout 10 s: this test does 5 s startup-wait + 3 s reload-wait + cleanup. The
    // hot-reload propagation window needs the headroom; tightening to 5 s causes flakes.
    [Fact(Timeout = 10_000)]
    public async Task E2E_RemovePlcAtRuntime_ClosesUpstreamConnections()
    {
        int portA = PickFreePort();
        int portB = PickFreePort();

        // Start with both PLCs.
        WriteConfig(_configPath, [("PLC-A", portA), ("PLC-B", portB)]);

        using var host = BuildHost(_configPath);
        using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        await host.StartAsync(startCts.Token);

        // Wait for both listeners.
        await WaitForAsync(
            () => CanConnect(portA) && CanConnect(portB),
            TimeSpan.FromSeconds(5),
            "Both PLC-A and PLC-B should bind at startup");

        // Remove PLC-B.
        WriteConfig(_configPath, [("PLC-A", portA)]);

        // Wait up to 3 s for PLC-B's port to close.
        await WaitForAsync(
            () => !CanConnect(portB),
            TimeSpan.FromSeconds(3),
            "PLC-B port should stop accepting connections after removal");

        // PLC-A must still work.
        CanConnect(portA).ShouldBeTrue("PLC-A listener must remain bound");

        using var stopCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        await host.StopAsync(stopCts.Token);
    }

    // ── E2E 3: Global BCD tag list change → reseat without restart ────────────────────────

    [Fact(Timeout = 5_000)]
    public async Task E2E_ChangeGlobalBcdTagList_RewriteReflectsImmediately()
    {
        // This test verifies that after a global tag list change, the supervisor for
        // the PLC is reseated (new context) without being restarted.
        // We check by reading the reconciler's applied count.
        int portA = PickFreePort();

        WriteConfig(_configPath, [("PLC-A", portA)], globalBcdTags: []);

        var sink = new HotReloadCapturingSink();
        using var host = BuildHost(_configPath, logSink: sink);
        using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        await host.StartAsync(startCts.Token);

        await WaitForAsync(
            () => CanConnect(portA),
            TimeSpan.FromSeconds(5),
            "PLC-A should bind at startup");

        // NOTE(review): the generic type argument of GetRequiredService is not present in
        // the text under review — restore the counters service type before building.
        var counters = host.Services.GetRequiredService();
        int beforeCount = counters.ReloadAppliedCount;

        // Add a global BCD tag → should trigger a reseat (not a restart).
        WriteConfig(_configPath, [("PLC-A", portA)], globalBcdTags: [(1072, 16)]);

        // Wait for the reconciler to apply.
        await WaitForAsync(
            () => counters.ReloadAppliedCount > beforeCount,
            TimeSpan.FromSeconds(3),
            "ReloadAppliedCount should increment after config change");

        // Verify the reload.applied event was logged. Polling (rather than a fixed sleep)
        // covers any lag between the counter bump and the event reaching the sink; the
        // wait itself asserts on timeout.
        await WaitForAsync(
            () => sink.Events.Any(e => e.MessageTemplate.Text.Contains("Config reload applied")),
            TimeSpan.FromSeconds(2),
            "mbproxy.config.reload.applied must be logged");

        // PLC-A must still be bound (reseat does not restart).
        CanConnect(portA).ShouldBeTrue("PLC-A must remain bound after reseat");

        using var stopCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        await host.StopAsync(stopCts.Token);
    }

    // ── E2E 4: Invalid reload → does not mutate running state ────────────────────────────

    [Fact(Timeout = 5_000)]
    public async Task E2E_InvalidReload_DoesNotMutateRunningState()
    {
        int portA = PickFreePort();
        int portB = PickFreePort();

        WriteConfig(_configPath, [("PLC-A", portA)]);

        var sink = new HotReloadCapturingSink();
        using var host = BuildHost(_configPath, logSink: sink);
        using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        await host.StartAsync(startCts.Token);

        await WaitForAsync(
            () => CanConnect(portA),
            TimeSpan.FromSeconds(5),
            "PLC-A should bind at startup");

        // NOTE(review): the generic type argument of GetRequiredService is not present in
        // the text under review — restore the counters service type before building.
        var counters = host.Services.GetRequiredService();

        // Write a BROKEN config: both PLCs on the same port → duplicate ListenPort error.
        WriteConfig(_configPath, [("PLC-A", portA), ("PLC-B", portA)]);

        // Wait for the rejected event.
        await WaitForAsync(
            () => counters.ReloadRejectedCount >= 1,
            TimeSpan.FromSeconds(3),
            "ReloadRejectedCount should increment for invalid config");

        // Verify the reload.rejected event was logged at Error level; the wait itself
        // asserts on timeout.
        await WaitForAsync(
            () => sink.Events.Any(e =>
                e.Level == LogEventLevel.Error &&
                e.MessageTemplate.Text.Contains("Config reload rejected")),
            TimeSpan.FromSeconds(2),
            "mbproxy.config.reload.rejected must be logged");

        // Host must still be running with old config.
        CanConnect(portA).ShouldBeTrue("PLC-A must remain bound after rejected reload");

        // PLC-B must NOT have been added (rejected = no partial apply).
        CanConnect(portB).ShouldBeFalse("PLC-B must not have been added after rejection");

        // Applied count must not have changed.
        counters.ReloadAppliedCount.ShouldBe(0, "No reload should have been applied");

        using var stopCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        await host.StopAsync(stopCts.Token);
    }

    // ── cache flush on tag-list reload ──────────────────────────────────────────────────

    /// <summary>
    /// Verifies that a tag-list reload for a PLC with a cacheable tag emits
    /// <c>mbproxy.cache.flushed</c>. The cache count is 0 (no real backend to populate
    /// it), but the event must still fire — it's the operator's signal that the in-memory
    /// cache state was reset by a config reload.
    /// </summary>
    [Fact(Timeout = 8_000)]
    public async Task E2E_TagListReload_OnCacheablePlc_EmitsCacheFlushedEvent()
    {
        int port = PickFreePort();
        int adminPort = PickFreePort();

        WriteConfigWithCacheableTag(_configPath, port, adminPort, address: 1024, cacheTtlMs: 60_000);

        var sink = new HotReloadCapturingSink();
        using var host = BuildHost(_configPath, logSink: sink);
        using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        await host.StartAsync(startCts.Token);

        await WaitForAsync(
            () => CanConnect(port),
            TimeSpan.FromSeconds(5),
            "listener should be reachable after startup");

        // Mutate the tag list (different address, still cacheable) — this is a Reseat,
        // not an Add/Remove, so ReplaceContextAsync runs and the cache flush fires.
        WriteConfigWithCacheableTag(_configPath, port, adminPort, address: 1080, cacheTtlMs: 60_000);

        // First confirm the reconciler actually applied the reload at all — gives a clearer
        // failure mode than a bare timeout if Reseat isn't firing.
        await WaitForAsync(
            () => sink.Events.Any(e => e.MessageTemplate.Text.Contains("Config reload applied")),
            TimeSpan.FromSeconds(5),
            "Config reload applied must fire first; verifies reconciler picked up the change");

        await WaitForAsync(
            () => sink.Events.Any(e => e.MessageTemplate.Text.Contains("Cache flushed")),
            TimeSpan.FromSeconds(2),
            "expected mbproxy.cache.flushed after tag-list reload on a cacheable PLC");

        using var stopCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        await host.StopAsync(stopCts.Token);
    }

    // ── ReadCoalescing.Enabled hot-reload flip ──────────────────────────────────────────

    /// <summary>
    /// Verifies that flipping <c>Mbproxy.Resilience.ReadCoalescing.Enabled</c> at
    /// runtime via hot-reload propagates to the live options snapshot. The accessor is
    /// wired through to add/restart supervisors; the multiplexer reads it per-PDU.
    /// Proving the IOptionsMonitor sees the new value is sufficient — the per-PDU read
    /// path is unit-tested at the multiplexer level.
    /// </summary>
    [Fact(Timeout = 8_000)]
    public async Task E2E_ReadCoalescingEnabled_FlipAtRuntime_PropagatesToOptionsMonitor()
    {
        int port = PickFreePort();
        int adminPort = PickFreePort();

        WriteConfigWithCoalescing(_configPath, port, adminPort, enabled: true);

        using var host = BuildHost(_configPath);
        using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        await host.StartAsync(startCts.Token);

        await WaitForAsync(
            () => CanConnect(port),
            TimeSpan.FromSeconds(5),
            "listener should be reachable after startup");

        // NOTE(review): the generic type arguments of this call are truncated in the text
        // under review (presumably an IOptionsMonitor of the proxy options type — confirm)
        // and must be restored before building.
        var monitor = host.Services
            .GetRequiredService>();

        monitor.CurrentValue.Resilience.ReadCoalescing.Enabled.ShouldBeTrue(
            "initial config sets Enabled=true");

        // Flip to false and re-save.
        WriteConfigWithCoalescing(_configPath, port, adminPort, enabled: false);

        await WaitForAsync(
            () => monitor.CurrentValue.Resilience.ReadCoalescing.Enabled == false,
            TimeSpan.FromSeconds(5),
            "IOptionsMonitor.CurrentValue must reflect Enabled=false after hot-reload");

        using var stopCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        await host.StopAsync(stopCts.Token);
    }

    /// <summary>
    /// Writes a single-PLC config ("PLC-A") with no global BCD tags and an explicit
    /// <c>Resilience.ReadCoalescing</c> section.
    /// </summary>
    /// <param name="path">Config file to (re)write.</param>
    /// <param name="listenPort">Listen port for PLC-A.</param>
    /// <param name="adminPort">Value written to <c>Mbproxy:AdminPort</c>.</param>
    /// <param name="enabled">Value written to <c>ReadCoalescing.Enabled</c>.</param>
    private static void WriteConfigWithCoalescing(
        string path, int listenPort, int adminPort, bool enabled)
    {
        var doc = new
        {
            Mbproxy = new
            {
                AdminPort = adminPort,
                // Element type is irrelevant: an empty array serializes to [].
                BcdTags = new { Global = Array.Empty<object>() },
                Plcs = new[]
                {
                    new { Name = "PLC-A", ListenPort = listenPort, Host = "127.0.0.1", Port = 502 },
                },
                Connection = new { BackendConnectTimeoutMs = 500, BackendRequestTimeoutMs = 500 },
                Resilience = new
                {
                    ReadCoalescing = new { Enabled = enabled, MaxParties = 32 },
                },
            },
        };

        ReplaceConfigFile(path, doc);
    }

    /// <summary>
    /// Writes a single-PLC config ("PLC-A") with one global BCD tag of width 16 that
    /// carries a <c>CacheTtlMs</c> value.
    /// </summary>
    /// <param name="path">Config file to (re)write.</param>
    /// <param name="listenPort">Listen port for PLC-A.</param>
    /// <param name="adminPort">Value written to <c>Mbproxy:AdminPort</c>.</param>
    /// <param name="address">Address of the global tag.</param>
    /// <param name="cacheTtlMs">Value written to the tag's <c>CacheTtlMs</c>.</param>
    private static void WriteConfigWithCacheableTag(
        string path, int listenPort, int adminPort, int address, int cacheTtlMs)
    {
        var doc = new
        {
            Mbproxy = new
            {
                AdminPort = adminPort,
                BcdTags = new
                {
                    Global = new[]
                    {
                        new { Address = address, Width = 16, CacheTtlMs = cacheTtlMs },
                    },
                },
                Plcs = new[]
                {
                    new { Name = "PLC-A", ListenPort = listenPort, Host = "127.0.0.1", Port = 502 },
                },
                Connection = new { BackendConnectTimeoutMs = 500, BackendRequestTimeoutMs = 500 },
            },
        };

        ReplaceConfigFile(path, doc);
    }

    // ── Helpers ───────────────────────────────────────────────────────────────────────────

    /// <summary>
    /// Attempts a blocking TCP connect to 127.0.0.1 on <paramref name="port"/>.
    /// </summary>
    /// <returns><c>true</c> if the connect succeeded; <c>false</c> if it failed with a socket error.</returns>
    private static bool CanConnect(int port)
    {
        try
        {
            using var client = new TcpClient();
            client.Connect("127.0.0.1", port);
            return true;
        }
        catch (SocketException)
        {
            return false;
        }
    }
}

/// <summary>Serilog <see cref="ILogEventSink"/> that stores events for assertion (hot-reload tests).</summary>
internal sealed class HotReloadCapturingSink : ILogEventSink
{
    private readonly ConcurrentQueue<LogEvent> _events = new();

    /// <summary>Events captured so far, in the order they were emitted.</summary>
    public IEnumerable<LogEvent> Events => _events;

    /// <summary>Appends <paramref name="logEvent"/> to the captured queue.</summary>
    public void Emit(LogEvent logEvent) => _events.Enqueue(logEvent);
}