Files
wwtools/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/InFlightByKeyMapTests.cs
T
Joseph Doherty 7ead3581ab mbproxy: Wave 3 cleanups, docs, and test gaps from 2026-05-14 review
Closes the Wave 3 (cleanup) tier of codereviews/2026-05-14/RemediationPlan.md.
Tests: 378 pass / 0 fail (baseline 370 + 8 new W3 regression tests).

Code cleanups:
  * PlcMultiplexer: removed dead `elapsedMs` calculation (the actual EWMA
    conversion uses Stopwatch ticks two lines below).
  * UpstreamPipe.FillAsync: dropped the meaningless `firstRead && remaining
    == count ? false : false` ternary; both branches were `false`.
  * InFlightByKeyMap.TryAttachOrCreate (always returned `true`) renamed to
    `AttachOrCreate` and made `void`. Test sites updated to drop the dead
    `bool ok = ...; ok.ShouldBeTrue();` assertions.
  * BcdCodec.HasBadNibble promoted from private to internal; the duplicate
    copy in BcdPduPipeline removed and the call sites updated to
    `BcdCodec.HasBadNibble`.
  * PlcMultiplexer watchdog comment fixed: said "1-second floor", code uses
    100 ms. Now both agree.
  * StatusSnapshotBuilder: simplified the unreachable
    `RemoteEp?.ToString() ?? RemoteEp?.Address.ToString() ?? "?"` to
    `RemoteEp?.ToString() ?? "?"`.
  * Mbproxy.csproj: stale "deferred" Polly comment replaced with a real
    description of where Polly is used (BackendConnect + ListenerRecovery).

Doc updates:
  * README: added a callout about the unconventional 32-bit BCD wire format
    ("two base-10000 digits in CDAB", not standard binary CDAB Int32) so
    integrators using off-the-shelf clients learn about the silent-corruption
    hazard before configuring writes.
  * docs/design.md: clarified `cacheMissCount` and `coalescedMissCount`
    semantics — "miss" means "did not find a fresh entry / did not coalesce",
    NOT "produced a backend round-trip". Operators wanting actual backend
    traffic should compute `miss − coalescedHit − exception04`.
  * docs/Architecture/ResponseCache.md: documented the structural
    "skip invalidation while recovering" gating (no backend reader during
    recovery → no FC06/FC16 response → no invalidation).
  * docs/Operations/Configuration.md: noted that the Event Log sink is the
    custom EventLogBridge, not Serilog.Sinks.EventLog (W2.23 cached check).
  * docs/plan/README.md: added a Phase 12 row pointing at the remediation
    plan and linking out to codereviews/2026-05-14/.

Test additions (W3 high-value gaps):
  * BcdPduPipelineTests:
    - FC16_WriteStartsOnHighWord_Of32BitPair_PassesThroughRaw_WithPartialWarning
      (symmetric inverse of the existing low-side partial-overlap test).
    - FC03_Mixed_16Bit_32Bit_AndNonBcd_InOneRead_OnlyConfiguredSlotsRewritten
      (mixed-slot routing in a single FC03 read).
    - FC16_Response_PassesThroughUnchanged_RegardlessOfTagMap (FC16 response
      carries no register data; rewriter must pass through).
  * AdminEndpointTests:
    - NonGetMethod_AgainstAdminRoutes_Returns405 (Theory: POST/PUT/DELETE/
      PATCH against `/` and `/status.json` must return 405; guards against
      an accidental MapPost being added later).
  * HotReloadE2ETests:
    - E2E_TagListReload_OnCacheablePlc_EmitsCacheFlushedEvent (validates the
      W2.8 cache.flushed wiring end-to-end via the real FileSystemWatcher
      reload path).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-14 06:06:52 -04:00

257 lines
10 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using System.Net;
using System.Net.Sockets;
using Mbproxy.Proxy.Multiplexing;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;
namespace Mbproxy.Tests.Proxy.Multiplexing;
/// <summary>
/// Unit tests for the Phase-10 <see cref="InFlightByKeyMap"/>. Covers the atomic
/// attach-or-create primitive (load-bearing concurrency invariant), the per-entry
/// max-parties cap (load-shedding safety valve), and concurrent attach correctness.
/// </summary>
[Trait("Category", "Unit")]
public sealed class InFlightByKeyMapTests
{
private static UpstreamPipe MakePipe()
{
// The map only retains references to InterestedParty; it never reads pipe state.
// A connected loopback socket satisfies the constructor contract.
var listener = new TcpListener(IPAddress.Loopback, 0);
listener.Start();
var c = new TcpClient();
c.Connect(IPAddress.Loopback, ((IPEndPoint)listener.LocalEndpoint).Port);
var s = listener.AcceptSocket();
listener.Stop();
return new UpstreamPipe(s, "PLC1", NullLogger.Instance);
}
private static InFlightRequest MakeRequest(InterestedParty party, byte fc = 0x03,
ushort start = 100, ushort qty = 1, byte unit = 1)
{
// The factory uses a mutable List<InterestedParty> so the map can append on attach.
var list = new List<InterestedParty>(capacity: 1) { party };
return new InFlightRequest(
UnitId: unit,
Fc: fc,
StartAddress: start,
Qty: qty,
InterestedParties: list,
SentAtUtc: DateTimeOffset.UtcNow);
}
[Fact]
public async Task TryAttachOrCreate_NewKey_CallsFactory_ReturnsTrue_WasNewTrue()
{
var pipe = MakePipe();
try
{
var map = new InFlightByKeyMap();
var key = new CoalescingKey(1, 0x03, 100, 1);
var party = new InterestedParty(pipe, OriginalTxId: 0x1234);
int factoryCalls = 0;
map.AttachOrCreate(
key, party,
factory: () => { factoryCalls++; return MakeRequest(party); },
maxParties: 32,
out var req, out bool wasNew);
wasNew.ShouldBeTrue("a brand-new key must take the create branch");
factoryCalls.ShouldBe(1, "the factory must be called exactly once");
req.ShouldNotBeNull();
req.InterestedParties.Count.ShouldBe(1);
map.Count.ShouldBe(1);
}
finally
{
await pipe.DisposeAsync();
}
}
[Fact]
public async Task TryAttachOrCreate_ExistingKey_AppendsParty_ReturnsTrue_WasNewFalse()
{
var pipeA = MakePipe();
var pipeB = MakePipe();
try
{
var map = new InFlightByKeyMap();
var key = new CoalescingKey(1, 0x03, 100, 1);
var partyA = new InterestedParty(pipeA, OriginalTxId: 0x1111);
var partyB = new InterestedParty(pipeB, OriginalTxId: 0x2222);
int factoryCalls = 0;
map.AttachOrCreate(key, partyA,
factory: () => { factoryCalls++; return MakeRequest(partyA); },
maxParties: 32, out var first, out bool firstWasNew);
map.AttachOrCreate(key, partyB,
factory: () => { factoryCalls++; return MakeRequest(partyB); },
maxParties: 32, out var second, out bool secondWasNew);
firstWasNew.ShouldBeTrue();
secondWasNew.ShouldBeFalse("the second attach must coalesce onto the first");
factoryCalls.ShouldBe(1, "the factory must only fire on the create branch");
second.ShouldBeSameAs(first, "both attaches must return the same InFlightRequest reference");
second.InterestedParties.Count.ShouldBe(2, "the second party must be appended in place");
second.InterestedParties[0].OriginalTxId.ShouldBe((ushort)0x1111);
second.InterestedParties[1].OriginalTxId.ShouldBe((ushort)0x2222);
}
finally
{
await pipeA.DisposeAsync();
await pipeB.DisposeAsync();
}
}
[Fact]
public async Task TryAttachOrCreate_ExistingKey_AtMaxParties_CreatesFreshEntry_NotAppend()
{
var pipeA = MakePipe();
var pipeB = MakePipe();
var pipeC = MakePipe();
try
{
var map = new InFlightByKeyMap();
var key = new CoalescingKey(1, 0x03, 100, 1);
var partyA = new InterestedParty(pipeA, OriginalTxId: 0xAAAA);
var partyB = new InterestedParty(pipeB, OriginalTxId: 0xBBBB);
var partyC = new InterestedParty(pipeC, OriginalTxId: 0xCCCC);
// MaxParties = 2 — first attach creates, second appends, third overflows.
map.AttachOrCreate(key, partyA,
factory: () => MakeRequest(partyA), maxParties: 2,
out var first, out _);
map.AttachOrCreate(key, partyB,
factory: () => MakeRequest(partyB), maxParties: 2,
out var second, out _);
int factoryCalls = 0;
map.AttachOrCreate(key, partyC,
factory: () => { factoryCalls++; return MakeRequest(partyC); },
maxParties: 2,
out var third, out bool thirdWasNew);
thirdWasNew.ShouldBeTrue("the third attach must overflow into a fresh entry");
factoryCalls.ShouldBe(1, "the factory must fire to create the overflow entry");
third.ShouldNotBeSameAs(first, "the overflow must be a distinct InFlightRequest");
third.InterestedParties.Count.ShouldBe(1, "the overflow entry starts with only its triggering party");
first.InterestedParties.Count.ShouldBe(2, "the original entry stays capped at maxParties");
}
finally
{
await pipeA.DisposeAsync();
await pipeB.DisposeAsync();
await pipeC.DisposeAsync();
}
}
[Fact]
public async Task TryRemove_AfterAttach_AllPartiesPresent_InRetrievedEntry()
{
var pipeA = MakePipe();
var pipeB = MakePipe();
try
{
var map = new InFlightByKeyMap();
var key = new CoalescingKey(1, 0x03, 100, 1);
var partyA = new InterestedParty(pipeA, 1);
var partyB = new InterestedParty(pipeB, 2);
map.AttachOrCreate(key, partyA, () => MakeRequest(partyA), 32, out _, out _);
map.AttachOrCreate(key, partyB, () => MakeRequest(partyB), 32, out _, out _);
bool removed = map.TryRemove(key, out var req);
removed.ShouldBeTrue();
req.InterestedParties.Count.ShouldBe(2, "both attached parties must be present in the removed entry");
map.Count.ShouldBe(0);
}
finally
{
await pipeA.DisposeAsync();
await pipeB.DisposeAsync();
}
}
[Fact]
public void TryRemove_OfMissing_ReturnsFalse()
{
var map = new InFlightByKeyMap();
var key = new CoalescingKey(1, 0x03, 100, 1);
map.TryRemove(key, out _).ShouldBeFalse("removing a never-attached key must report false");
}
[Fact]
public async Task Concurrent_AttachOrCreate_From_Two_Threads_NoLostParties_AndNoDuplicateEntries()
{
// 16 tasks × 500 ops each, all racing on the same key. The map must keep exactly
// one entry per key (unlimited MaxParties → no overflow). Each successful attach
// must contribute exactly one party to whatever entry was created/joined.
//
// Each task reuses a single UpstreamPipe across its ops — the map only stores the
// InterestedParty reference; pipe state is irrelevant to the map's invariants.
// Spinning up 100 × 1000 = 100,000 loopback sockets exhausts the test machine's
// ephemeral port pool; we use one pipe per task instead.
const int Tasks = 16;
const int OpsPerTask = 500;
const int MaxParties = int.MaxValue;
var map = new InFlightByKeyMap();
var key = new CoalescingKey(1, 0x03, 100, 1);
var pipes = new List<UpstreamPipe>(Tasks);
for (int i = 0; i < Tasks; i++) pipes.Add(MakePipe());
long attaches = 0;
long creates = 0;
try
{
var work = new Task[Tasks];
var workCt = TestContext.Current.CancellationToken;
for (int t = 0; t < Tasks; t++)
{
var pipe = pipes[t];
work[t] = Task.Run(() =>
{
for (int i = 0; i < OpsPerTask; i++)
{
if (workCt.IsCancellationRequested) return;
var party = new InterestedParty(pipe, (ushort)i);
map.AttachOrCreate(
key, party,
factory: () => MakeRequest(party),
maxParties: MaxParties,
out _, out bool wasNew);
if (wasNew) Interlocked.Increment(ref creates);
else Interlocked.Increment(ref attaches);
}
}, workCt);
}
await Task.WhenAll(work);
(creates + attaches).ShouldBe((long)(Tasks * OpsPerTask), "every op must take exactly one branch");
creates.ShouldBe(1, "all ops share the same key with unlimited MaxParties — exactly one create");
// The retained entry must contain every attached party.
bool removed = map.TryRemove(key, out var entry);
removed.ShouldBeTrue();
entry.InterestedParties.Count.ShouldBe(Tasks * OpsPerTask,
"the entry's party list must hold every attached party — no lost parties under race");
map.Count.ShouldBe(0);
}
finally
{
foreach (var p in pipes)
try { await p.DisposeAsync(); } catch { }
}
}
}