Files
Joseph Doherty 1a2856526a mbproxy: strip historical phase/wave/plan references from source comments
Comments described the *history* of how the code arrived (phase numbers,
wave IDs, review IDs, dated TODOs) instead of what it does today. That
scaffolding rotted as the codebase evolved. Cleaned 60 source files +
.gitignore; behaviour unchanged (387/387 tests still pass).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-14 13:04:30 -04:00

470 lines
20 KiB
C#

using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Text.Json;
using Mbproxy;
using Mbproxy.Configuration;
using Mbproxy.Proxy;
using Mbproxy.Proxy.Supervision;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Serilog;
using Serilog.Core;
using Serilog.Events;
using Shouldly;
using Xunit;
namespace Mbproxy.Tests.Configuration;
/// <summary>
/// End-to-end hot-reload tests. Each test:
/// <list type="number">
/// <item>Writes a temp appsettings.json file.</item>
/// <item>Builds a real host that reads it with <c>reloadOnChange: true</c>.</item>
/// <item>Mutates the file and waits for the reconciler to apply the change.</item>
/// <item>Asserts the running state reflects the new config.</item>
/// </list>
///
/// These tests do NOT require the pymodbus simulator because they use
/// <see cref="NoopPduPipeline"/> and loopback-only sockets.
/// </summary>
[Trait("Category", "E2E")]
public sealed class HotReloadE2ETests : IAsyncLifetime
{
    // Shared serializer options for every config write; cached because building a
    // JsonSerializerOptions instance per call is wasteful (CA1869).
    private static readonly JsonSerializerOptions s_indentedJson = new() { WriteIndented = true };

    // ── Helpers ───────────────────────────────────────────────────────────────────────────

    /// <summary>Returns one currently-free loopback TCP port.</summary>
    private static int PickFreePort() => PickFreePorts(1)[0];

    /// <summary>
    /// Returns <paramref name="count"/> distinct, currently-free loopback TCP ports.
    /// </summary>
    /// <remarks>
    /// All probe listeners stay bound until every port has been chosen. Picking ports one
    /// at a time (bind, read, stop, repeat) lets the OS hand back a port it just released,
    /// so two "different" PLC ports could collide. Listeners are stopped in
    /// <c>finally</c> so a failed bind does not leak sockets.
    /// </remarks>
    private static int[] PickFreePorts(int count)
    {
        var listeners = new List<TcpListener>(count);
        try
        {
            for (int i = 0; i < count; i++)
            {
                var listener = new TcpListener(IPAddress.Loopback, 0);
                // Track before Start() so the finally block also stops a listener whose Start() threw.
                listeners.Add(listener);
                listener.Start();
            }

            return listeners.Select(l => ((IPEndPoint)l.LocalEndpoint).Port).ToArray();
        }
        finally
        {
            foreach (var listener in listeners)
            {
                listener.Stop();
            }
        }
    }

    /// <summary>
    /// Writes a minimal appsettings.json with the given PLC entries and optional global
    /// BCD tags. Uses JSON rather than the raw config API so that
    /// <c>Microsoft.Extensions.Configuration.Json</c> / <see cref="FileSystemWatcher"/>
    /// pick up the change exactly as they would in production.
    /// </summary>
    /// <param name="path">Destination config file.</param>
    /// <param name="plcs">PLC name / listen-port pairs; every PLC targets 127.0.0.1:502.</param>
    /// <param name="globalBcdTags">Optional global BCD tags (address, width); <c>null</c> means none.</param>
    /// <param name="adminPort">Value written to <c>Mbproxy:AdminPort</c>.</param>
    private static void WriteConfig(
        string path,
        IEnumerable<(string name, int listenPort)> plcs,
        IEnumerable<(int addr, int width)>? globalBcdTags = null,
        int adminPort = 8080)
    {
        var plcArr = plcs.Select(p => new
        {
            Name = p.name,
            ListenPort = p.listenPort,
            Host = "127.0.0.1",
            Port = 502,
        }).ToArray();

        var globalArr = (globalBcdTags ?? []).Select(t => new { Address = t.addr, Width = t.width }).ToArray();

        var doc = new
        {
            Mbproxy = new
            {
                AdminPort = adminPort,
                BcdTags = new { Global = globalArr },
                Plcs = plcArr,
                Connection = new { BackendConnectTimeoutMs = 500, BackendRequestTimeoutMs = 500 },
            },
        };

        WriteJsonViaTempFile(path, doc);
    }

    /// <summary>
    /// Serializes <paramref name="doc"/> (using its runtime type) as indented JSON into
    /// <c>path + ".tmp"</c>, then renames that file over <paramref name="path"/>.
    /// </summary>
    /// <remarks>
    /// Write-to-temp-then-rename is the pattern that makes <see cref="FileSystemWatcher"/>
    /// fire 2-3 times for one logical change, so every config write exercises the
    /// reload debounce.
    /// </remarks>
    private static void WriteJsonViaTempFile(string path, object doc)
    {
        string tmp = path + ".tmp";
        File.WriteAllText(tmp, JsonSerializer.Serialize(doc, doc.GetType(), s_indentedJson));
        File.Move(tmp, path, overwrite: true);
    }

    /// <summary>
    /// Polls <paramref name="predicate"/> every 50 ms until it returns true or
    /// <paramref name="timeout"/> elapses, then asserts it with <paramref name="failMessage"/>.
    /// </summary>
    /// <remarks>
    /// The delay deliberately takes no cancellation token. A token-bound delay throws
    /// <see cref="TaskCanceledException"/> when the deadline fires mid-delay, which would
    /// replace <paramref name="failMessage"/> with an uninformative cancellation error.
    /// The loop condition enforces the deadline instead, overshooting by at most one poll.
    /// </remarks>
    private static async Task WaitForAsync(Func<bool> predicate, TimeSpan timeout, string failMessage)
    {
        using var deadline = new CancellationTokenSource(timeout);
        while (!predicate() && !deadline.IsCancellationRequested)
        {
            await Task.Delay(50).ConfigureAwait(false);
        }

        predicate().ShouldBeTrue(failMessage);
    }

    /// <summary>
    /// Builds (but does not start) a host whose only configuration source is the JSON file
    /// at <paramref name="configPath"/>, loaded with <c>reloadOnChange: true</c>.
    /// </summary>
    /// <param name="configPath">JSON config file to load and watch.</param>
    /// <param name="logSink">
    /// When non-null, Information-and-above events are routed to this sink. When null,
    /// the minimum level is Fatal and no sink is configured.
    /// </param>
    private static IHost BuildHost(string configPath, ILogEventSink? logSink = null)
    {
        var logger = logSink is not null
            ? new LoggerConfiguration()
                .MinimumLevel.Information()
                .WriteTo.Sink(logSink)
                .CreateLogger()
            : new LoggerConfiguration().MinimumLevel.Fatal().CreateLogger();

        var builder = Host.CreateApplicationBuilder();

        // Wire the JSON file with reloadOnChange: true (the production pattern), and drop the
        // default sources so only this file feeds configuration.
        builder.Configuration.Sources.Clear();
        builder.Configuration.AddJsonFile(configPath, optional: false, reloadOnChange: true);

        builder.Services.AddSerilog(logger, dispose: false);
        builder.AddMbproxyOptions();
        builder.Services.AddSingleton<IPduPipeline, NoopPduPipeline>();
        builder.Services.AddHostedService<ProxyWorker>();

        return builder.Build();
    }

    // Temp config file path, unique per test run to avoid collisions.
    private string _configPath = "";

    public ValueTask InitializeAsync()
    {
        _configPath = Path.Combine(Path.GetTempPath(), $"mbproxy_test_{Guid.NewGuid():N}.json");
        return ValueTask.CompletedTask;
    }

    public ValueTask DisposeAsync()
    {
        // Also remove the staging file in case a write was interrupted between
        // WriteAllText and Move. Each delete is attempted independently.
        TryDelete(_configPath);
        TryDelete(_configPath + ".tmp");
        return ValueTask.CompletedTask;

        static void TryDelete(string file)
        {
            try { File.Delete(file); } catch { /* best effort: teardown must not fail the test */ }
        }
    }

    // ── E2E 1: Add a PLC at runtime → new listener binds ─────────────────────────────────
    [Fact(Timeout = 5_000)]
    public async Task E2E_AddPlcAtRuntime_NewListenerBinds_AndIsReachable()
    {
        int[] ports = PickFreePorts(2);
        int portA = ports[0];
        int portB = ports[1];

        // Start the host with only PLC-A.
        WriteConfig(_configPath, [("PLC-A", portA)]);
        using var host = BuildHost(_configPath);
        using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        await host.StartAsync(startCts.Token);

        // Wait for PLC-A to bind.
        await WaitForAsync(
            () => CanConnect(portA),
            TimeSpan.FromSeconds(5),
            "PLC-A listener should be reachable after startup");

        // Add PLC-B by rewriting the config file.
        WriteConfig(_configPath, [("PLC-A", portA), ("PLC-B", portB)]);

        // Wait up to 3 s for the new listener to appear.
        await WaitForAsync(
            () => CanConnect(portB),
            TimeSpan.FromSeconds(3),
            "PLC-B listener should bind within 3 s of config reload");

        using var stopCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        await host.StopAsync(stopCts.Token);
    }

    // ── E2E 2: Remove a PLC at runtime → port closes ─────────────────────────────────────
    // Timeout 10 s: this test does 5 s startup-wait + 3 s reload-wait + cleanup. The
    // hot-reload propagation window needs the headroom; tightening to 5 s causes flakes.
    [Fact(Timeout = 10_000)]
    public async Task E2E_RemovePlcAtRuntime_ClosesUpstreamConnections()
    {
        int[] ports = PickFreePorts(2);
        int portA = ports[0];
        int portB = ports[1];

        // Start with both PLCs.
        WriteConfig(_configPath, [("PLC-A", portA), ("PLC-B", portB)]);
        using var host = BuildHost(_configPath);
        using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        await host.StartAsync(startCts.Token);

        // Wait for both listeners.
        await WaitForAsync(
            () => CanConnect(portA) && CanConnect(portB),
            TimeSpan.FromSeconds(5),
            "Both PLC-A and PLC-B should bind at startup");

        // Remove PLC-B.
        WriteConfig(_configPath, [("PLC-A", portA)]);

        // Wait up to 3 s for PLC-B's port to close.
        await WaitForAsync(
            () => !CanConnect(portB),
            TimeSpan.FromSeconds(3),
            "PLC-B port should stop accepting connections after removal");

        // PLC-A must still work.
        CanConnect(portA).ShouldBeTrue("PLC-A listener must remain bound");

        using var stopCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        await host.StopAsync(stopCts.Token);
    }

    // ── E2E 3: Global BCD tag list change → reseat without restart ────────────────────────
    [Fact(Timeout = 5_000)]
    public async Task E2E_ChangeGlobalBcdTagList_RewriteReflectsImmediately()
    {
        // This test verifies that after a global tag list change, the supervisor for
        // the PLC is reseated (new context) without being restarted.
        // We check by reading the reconciler's applied count.
        int portA = PickFreePort();

        WriteConfig(_configPath, [("PLC-A", portA)], globalBcdTags: []);

        var sink = new HotReloadCapturingSink();
        using var host = BuildHost(_configPath, logSink: sink);
        using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        await host.StartAsync(startCts.Token);

        await WaitForAsync(
            () => CanConnect(portA),
            TimeSpan.FromSeconds(5),
            "PLC-A should bind at startup");

        var counters = host.Services.GetRequiredService<ServiceCounters>();
        int beforeCount = counters.ReloadAppliedCount;

        // Add a global BCD tag → should trigger a reseat (not a restart).
        WriteConfig(_configPath, [("PLC-A", portA)], globalBcdTags: [(1072, 16)]);

        // Wait for the reconciler to apply.
        await WaitForAsync(
            () => counters.ReloadAppliedCount > beforeCount,
            TimeSpan.FromSeconds(3),
            "ReloadAppliedCount should increment after config change");

        // Poll for the log event instead of sleeping a fixed interval: it may not be in
        // the sink yet at the instant the counter moves.
        await WaitForAsync(
            () => sink.Events.Any(e => e.MessageTemplate.Text.Contains("Config reload applied")),
            TimeSpan.FromSeconds(2),
            "mbproxy.config.reload.applied must be logged");

        // PLC-A must still be bound (reseat does not restart).
        CanConnect(portA).ShouldBeTrue("PLC-A must remain bound after reseat");

        using var stopCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        await host.StopAsync(stopCts.Token);
    }

    // ── E2E 4: Invalid reload → does not mutate running state ────────────────────────────
    [Fact(Timeout = 5_000)]
    public async Task E2E_InvalidReload_DoesNotMutateRunningState()
    {
        int[] ports = PickFreePorts(2);
        int portA = ports[0];
        int portB = ports[1];

        WriteConfig(_configPath, [("PLC-A", portA)]);

        var sink = new HotReloadCapturingSink();
        using var host = BuildHost(_configPath, logSink: sink);
        using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        await host.StartAsync(startCts.Token);

        await WaitForAsync(
            () => CanConnect(portA),
            TimeSpan.FromSeconds(5),
            "PLC-A should bind at startup");

        // Baseline the counters so the assertions below measure only this reload.
        var counters = host.Services.GetRequiredService<ServiceCounters>();
        int appliedBefore = counters.ReloadAppliedCount;
        int rejectedBefore = counters.ReloadRejectedCount;

        // Write a BROKEN config: both PLCs on the same port → duplicate ListenPort error.
        WriteConfig(_configPath, [("PLC-A", portA), ("PLC-B", portA)]);

        // Wait for the rejected event.
        await WaitForAsync(
            () => counters.ReloadRejectedCount > rejectedBefore,
            TimeSpan.FromSeconds(3),
            "ReloadRejectedCount should increment for invalid config");

        // Wait for the log event to propagate into the capturing sink.
        await WaitForAsync(
            () => sink.Events.Any(e =>
                e.Level == LogEventLevel.Error &&
                e.MessageTemplate.Text.Contains("Config reload rejected")),
            TimeSpan.FromSeconds(2),
            "mbproxy.config.reload.rejected must be logged");

        // Host must still be running with old config.
        CanConnect(portA).ShouldBeTrue("PLC-A must remain bound after rejected reload");

        // PLC-B must NOT have been added (rejected = no partial apply).
        CanConnect(portB).ShouldBeFalse("PLC-B must not have been added after rejection");

        // Applied count must not have changed.
        counters.ReloadAppliedCount.ShouldBe(appliedBefore, "No reload should have been applied");

        using var stopCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        await host.StopAsync(stopCts.Token);
    }

    // ── cache flush on tag-list reload ──────────────────────────────────────────────────

    /// <summary>
    /// Verifies that a tag-list reload for a PLC with a cacheable tag emits
    /// <c>mbproxy.cache.flushed</c>. The cache count is 0 (no real backend to populate
    /// it), but the event must still fire — it's the operator's signal that the in-memory
    /// cache state was reset by a config reload.
    /// </summary>
    [Fact(Timeout = 8_000)]
    public async Task E2E_TagListReload_OnCacheablePlc_EmitsCacheFlushedEvent()
    {
        int[] ports = PickFreePorts(2);
        int port = ports[0];
        int adminPort = ports[1];

        WriteConfigWithCacheableTag(_configPath, port, adminPort, address: 1024, cacheTtlMs: 60_000);

        var sink = new HotReloadCapturingSink();
        using var host = BuildHost(_configPath, logSink: sink);
        using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        await host.StartAsync(startCts.Token);

        await WaitForAsync(() => CanConnect(port), TimeSpan.FromSeconds(5),
            "listener should be reachable after startup");

        // Mutate the tag list (different address, still cacheable) — this is a Reseat,
        // not an Add/Remove, so ReplaceContextAsync runs and the cache flush fires.
        WriteConfigWithCacheableTag(_configPath, port, adminPort, address: 1080, cacheTtlMs: 60_000);

        // First confirm the reconciler actually applied the reload at all — gives a clearer
        // failure mode than a bare timeout if Reseat isn't firing.
        await WaitForAsync(
            () => sink.Events.Any(e => e.MessageTemplate.Text.Contains("Config reload applied")),
            TimeSpan.FromSeconds(5),
            "Config reload applied must fire first; verifies reconciler picked up the change");

        await WaitForAsync(
            () => sink.Events.Any(e => e.MessageTemplate.Text.Contains("Cache flushed")),
            TimeSpan.FromSeconds(2),
            "expected mbproxy.cache.flushed after tag-list reload on a cacheable PLC");

        using var stopCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        await host.StopAsync(stopCts.Token);
    }

    // ── ReadCoalescing.Enabled hot-reload flip ──────────────────────────────────────────

    /// <summary>
    /// Verifies that flipping <c>Mbproxy.Resilience.ReadCoalescing.Enabled</c> at
    /// runtime via hot-reload propagates to the live
    /// <see cref="Microsoft.Extensions.Options.IOptionsMonitor{TOptions}"/>
    /// snapshot. The accessor is wired through to add/restart supervisors; the
    /// multiplexer reads it per-PDU. Proving the IOptionsMonitor sees the new value
    /// is sufficient — the per-PDU read path is unit-tested at the multiplexer level.
    /// </summary>
    [Fact(Timeout = 8_000)]
    public async Task E2E_ReadCoalescingEnabled_FlipAtRuntime_PropagatesToOptionsMonitor()
    {
        int[] ports = PickFreePorts(2);
        int port = ports[0];
        int adminPort = ports[1];

        WriteConfigWithCoalescing(_configPath, port, adminPort, enabled: true);

        using var host = BuildHost(_configPath);
        using var startCts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        await host.StartAsync(startCts.Token);

        await WaitForAsync(() => CanConnect(port), TimeSpan.FromSeconds(5),
            "listener should be reachable after startup");

        var monitor = host.Services
            .GetRequiredService<Microsoft.Extensions.Options.IOptionsMonitor<Mbproxy.Options.MbproxyOptions>>();
        monitor.CurrentValue.Resilience.ReadCoalescing.Enabled.ShouldBeTrue(
            "initial config sets Enabled=true");

        // Flip to false and re-save.
        WriteConfigWithCoalescing(_configPath, port, adminPort, enabled: false);

        await WaitForAsync(
            () => monitor.CurrentValue.Resilience.ReadCoalescing.Enabled == false,
            TimeSpan.FromSeconds(5),
            "IOptionsMonitor.CurrentValue must reflect Enabled=false after hot-reload");

        using var stopCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        await host.StopAsync(stopCts.Token);
    }

    /// <summary>
    /// Writes a single-PLC config (PLC-A on <paramref name="listenPort"/>, no global BCD tags)
    /// with <c>Resilience.ReadCoalescing.Enabled</c> set to <paramref name="enabled"/>.
    /// </summary>
    private static void WriteConfigWithCoalescing(
        string path, int listenPort, int adminPort, bool enabled)
    {
        var doc = new
        {
            Mbproxy = new
            {
                AdminPort = adminPort,
                BcdTags = new { Global = Array.Empty<object>() },
                Plcs = new[] { new { Name = "PLC-A", ListenPort = listenPort, Host = "127.0.0.1", Port = 502 } },
                Connection = new { BackendConnectTimeoutMs = 500, BackendRequestTimeoutMs = 500 },
                Resilience = new
                {
                    ReadCoalescing = new { Enabled = enabled, MaxParties = 32 },
                },
            },
        };

        WriteJsonViaTempFile(path, doc);
    }

    /// <summary>
    /// Writes a single-PLC config with one 16-bit global BCD tag at <paramref name="address"/>
    /// carrying <c>CacheTtlMs = </c><paramref name="cacheTtlMs"/>.
    /// </summary>
    private static void WriteConfigWithCacheableTag(
        string path, int listenPort, int adminPort, int address, int cacheTtlMs)
    {
        var doc = new
        {
            Mbproxy = new
            {
                AdminPort = adminPort,
                BcdTags = new { Global = new[] { new { Address = address, Width = 16, CacheTtlMs = cacheTtlMs } } },
                Plcs = new[] { new { Name = "PLC-A", ListenPort = listenPort, Host = "127.0.0.1", Port = 502 } },
                Connection = new { BackendConnectTimeoutMs = 500, BackendRequestTimeoutMs = 500 },
            },
        };

        WriteJsonViaTempFile(path, doc);
    }

    // ── Helpers ───────────────────────────────────────────────────────────────────────────

    /// <summary>
    /// Returns true when a TCP connection to 127.0.0.1:<paramref name="port"/> succeeds.
    /// The probe connection is closed immediately. A socket-level failure (e.g. refused) returns false.
    /// </summary>
    private static bool CanConnect(int port)
    {
        try
        {
            using var c = new TcpClient();
            c.Connect("127.0.0.1", port);
            return true;
        }
        catch (SocketException)
        {
            return false;
        }
    }
}
/// <summary>
/// In-memory Serilog <see cref="ILogEventSink"/> for the hot-reload tests: every emitted
/// event is appended to a concurrent queue so assertions can inspect it afterwards.
/// </summary>
internal sealed class HotReloadCapturingSink : ILogEventSink
{
    // Concurrent queue so events may be enqueued from whichever thread logs them
    // while the test thread enumerates.
    private readonly ConcurrentQueue<LogEvent> _captured = new();

    /// <summary>All events captured so far, in emission order.</summary>
    public IEnumerable<LogEvent> Events => _captured;

    /// <summary>Records <paramref name="logEvent"/> for later inspection.</summary>
    public void Emit(LogEvent logEvent)
    {
        _captured.Enqueue(logEvent);
    }
}