9251c564c1
Closes the latent + minor + test-discipline items left after Wave 4. Updates
the re-review doc with a final resolution table — every actionable finding
now marked Resolved or Accepted with rationale.
NM3 — _supervisorCts leaks on re-Start
StartAsync now disposes the previous CTS before reassigning. Idempotent:
a try/catch (ObjectDisposedException) covers the very-first-Start case
where the field-init CTS is still fresh.
NM4 — W2.15 TCS is single-shot
_firstAttemptCompleted is no longer readonly; StartAsync re-creates it
after the W2.16 guard so a re-Started supervisor's
WaitForInitialBindAttemptAsync doesn't observe the previous run's signal.
Nm6 — _admin GetService<> returns null silently
ProxyWorker.ExecuteAsync now logs a Warning when admin isn't registered.
Preserves the loud-failure intent from the original IHostedService
registration without forcing test hosts to wire admin.
Nm7 — AdminEndpointHost.DisposeAsync no double-dispose guard
Added a volatile bool _disposed flag with an early-return at the top of
DisposeAsync. Symmetry with PlcMultiplexer; protects against
ProxyWorker.StopAsync explicitly disposing then DI disposing the singleton
again on host shutdown.
T3 — RemoveInheritedAppsettings only fires on Build
AfterTargets="Build;Publish" + a second Delete against $(PublishDir)
so a `dotnet publish` against the test csproj doesn't ship the example
PLCs from the linked install template.
T4 — Stale TryAttachOrCreate_*_ReturnsTrue_* test method names
Renamed to AttachOrCreate_*_WasNew{True,False} after W3 dropped the bool
return.
Accepted (with rationale documented in ReReviewAfterRemediation.md):
Nm2 — CoalescedHit semantic is per-design
Nm4 — _lastBindError preservation on clean exit is intentional forensics
Nm5 — EventLogBridge has no injectable logger
Nm8 — Cosmetic log noise
T1 — Reflection on private fields documented as maintenance trap
Tests: 387 pass / 0 fail.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
257 lines
9.9 KiB
C#
257 lines
9.9 KiB
C#
using System.Net;
|
||
using System.Net.Sockets;
|
||
using Mbproxy.Proxy.Multiplexing;
|
||
using Microsoft.Extensions.Logging.Abstractions;
|
||
using Shouldly;
|
||
using Xunit;
|
||
|
||
namespace Mbproxy.Tests.Proxy.Multiplexing;
|
||
|
||
/// <summary>
|
||
/// Unit tests for the Phase-10 <see cref="InFlightByKeyMap"/>. Covers the atomic
|
||
/// attach-or-create primitive (load-bearing concurrency invariant), the per-entry
|
||
/// max-parties cap (load-shedding safety valve), and concurrent attach correctness.
|
||
/// </summary>
|
||
[Trait("Category", "Unit")]
|
||
public sealed class InFlightByKeyMapTests
|
||
{
|
||
/// <summary>
/// Builds an <see cref="UpstreamPipe"/> named "PLC1" around the accepted end of a
/// throwaway loopback TCP connection, logging to <see cref="NullLogger.Instance"/>.
/// </summary>
/// <remarks>
/// The map only retains references to <see cref="InterestedParty"/>; it never reads pipe
/// state. A connected loopback socket satisfies the constructor contract, so the client
/// end of the connection is only kept alive until the pipe has been constructed.
/// </remarks>
/// <returns>A new pipe; the caller is responsible for disposing it.</returns>
private static UpstreamPipe MakePipe()
{
    var listener = new TcpListener(IPAddress.Loopback, 0);
    listener.Start();
    try
    {
        // Disposed when this scope exits (i.e. after the pipe has been constructed) so the
        // client-side socket is released deterministically instead of waiting for finalization.
        using var client = new TcpClient();
        client.Connect(IPAddress.Loopback, ((IPEndPoint)listener.LocalEndpoint).Port);

        var accepted = listener.AcceptSocket();
        try
        {
            return new UpstreamPipe(accepted, "PLC1", NullLogger.Instance);
        }
        catch
        {
            // No pipe was produced, so nothing else would ever close the accepted socket.
            accepted.Dispose();
            throw;
        }
    }
    finally
    {
        // Release the listening port even when Connect/AcceptSocket fails.
        listener.Stop();
    }
}
|
||
|
||
/// <summary>
/// Creates an <see cref="InFlightRequest"/> stamped with the current UTC time whose party
/// list initially contains only <paramref name="party"/>.
/// </summary>
/// <param name="party">The party placed in the request's party list.</param>
/// <param name="fc">Value passed as the request's <c>Fc</c>.</param>
/// <param name="start">Value passed as the request's <c>StartAddress</c>.</param>
/// <param name="qty">Value passed as the request's <c>Qty</c>.</param>
/// <param name="unit">Value passed as the request's <c>UnitId</c>.</param>
/// <returns>The newly constructed request.</returns>
private static InFlightRequest MakeRequest(InterestedParty party, byte fc = 0x03,
    ushort start = 100, ushort qty = 1, byte unit = 1)
{
    // A concrete, growable List<InterestedParty> is handed over so the map can append
    // further parties to it on attach.
    var parties = new List<InterestedParty>(capacity: 1);
    parties.Add(party);

    var request = new InFlightRequest(
        UnitId: unit,
        Fc: fc,
        StartAddress: start,
        Qty: qty,
        InterestedParties: parties,
        SentAtUtc: DateTimeOffset.UtcNow);

    return request;
}
|
||
|
||
/// <summary>
/// Attaching under a key the map has not seen must invoke the factory exactly once,
/// report <c>wasNew = true</c>, and leave the map with one entry holding one party.
/// </summary>
[Fact]
public async Task AttachOrCreate_NewKey_CallsFactory_WasNewTrue()
{
    // Disposed automatically when the test method exits, pass or fail.
    await using var pipe = MakePipe();

    var sut = new InFlightByKeyMap();
    var coalescingKey = new CoalescingKey(1, 0x03, 100, 1);
    var soleParty = new InterestedParty(pipe, OriginalTxId: 0x1234);
    int factoryInvocations = 0;

    sut.AttachOrCreate(
        coalescingKey,
        soleParty,
        factory: () =>
        {
            factoryInvocations++;
            return MakeRequest(soleParty);
        },
        maxParties: 32,
        out var created,
        out bool wasNew);

    wasNew.ShouldBeTrue("a brand-new key must take the create branch");
    factoryInvocations.ShouldBe(1, "the factory must be called exactly once");
    created.ShouldNotBeNull();
    created.InterestedParties.Count.ShouldBe(1);
    sut.Count.ShouldBe(1);
}
|
||
|
||
/// <summary>
/// A second attach under the same key (below the party cap) must coalesce onto the
/// existing entry: the factory is not invoked again, the same request instance is
/// returned, and the new party is appended after the first one.
/// </summary>
[Fact]
public async Task AttachOrCreate_ExistingKey_AppendsParty_WasNewFalse()
{
    // Each pipe is registered for disposal as soon as it exists, so pipeA is still
    // released if creating pipeB throws, and pipeB is still released if disposing
    // pipeA throws.
    await using var pipeA = MakePipe();
    await using var pipeB = MakePipe();

    var map = new InFlightByKeyMap();
    var key = new CoalescingKey(1, 0x03, 100, 1);

    var partyA = new InterestedParty(pipeA, OriginalTxId: 0x1111);
    var partyB = new InterestedParty(pipeB, OriginalTxId: 0x2222);

    int factoryCalls = 0;

    map.AttachOrCreate(key, partyA,
        factory: () => { factoryCalls++; return MakeRequest(partyA); },
        maxParties: 32, out var first, out bool firstWasNew);

    map.AttachOrCreate(key, partyB,
        factory: () => { factoryCalls++; return MakeRequest(partyB); },
        maxParties: 32, out var second, out bool secondWasNew);

    firstWasNew.ShouldBeTrue();
    secondWasNew.ShouldBeFalse("the second attach must coalesce onto the first");
    factoryCalls.ShouldBe(1, "the factory must only fire on the create branch");
    second.ShouldBeSameAs(first, "both attaches must return the same InFlightRequest reference");
    second.InterestedParties.Count.ShouldBe(2, "the second party must be appended in place");
    second.InterestedParties[0].OriginalTxId.ShouldBe((ushort)0x1111);
    second.InterestedParties[1].OriginalTxId.ShouldBe((ushort)0x2222);
}
|
||
|
||
/// <summary>
/// Once an entry already holds <c>maxParties</c> parties, a further attach under the
/// same key must invoke the factory and yield a distinct entry that starts with only
/// the triggering party, leaving the original entry at the cap.
/// </summary>
[Fact]
public async Task AttachOrCreate_ExistingKey_AtMaxParties_CreatesFreshEntry_NotAppend()
{
    // Each pipe is registered for disposal as soon as it exists, so earlier pipes are
    // still released if a later MakePipe() throws or an earlier dispose fails.
    await using var pipeA = MakePipe();
    await using var pipeB = MakePipe();
    await using var pipeC = MakePipe();

    var map = new InFlightByKeyMap();
    var key = new CoalescingKey(1, 0x03, 100, 1);

    var partyA = new InterestedParty(pipeA, OriginalTxId: 0xAAAA);
    var partyB = new InterestedParty(pipeB, OriginalTxId: 0xBBBB);
    var partyC = new InterestedParty(pipeC, OriginalTxId: 0xCCCC);

    // maxParties = 2: the first attach creates, the second appends, the third overflows.
    map.AttachOrCreate(key, partyA,
        factory: () => MakeRequest(partyA), maxParties: 2,
        out var first, out _);
    map.AttachOrCreate(key, partyB,
        factory: () => MakeRequest(partyB), maxParties: 2,
        out _, out _);

    int factoryCalls = 0;
    map.AttachOrCreate(key, partyC,
        factory: () => { factoryCalls++; return MakeRequest(partyC); },
        maxParties: 2,
        out var third, out bool thirdWasNew);

    thirdWasNew.ShouldBeTrue("the third attach must overflow into a fresh entry");
    factoryCalls.ShouldBe(1, "the factory must fire to create the overflow entry");
    third.ShouldNotBeSameAs(first, "the overflow must be a distinct InFlightRequest");
    third.InterestedParties.Count.ShouldBe(1, "the overflow entry starts with only its triggering party");
    first.InterestedParties.Count.ShouldBe(2, "the original entry stays capped at maxParties");
}
|
||
|
||
/// <summary>
/// After two parties attach under one key, <c>TryRemove</c> must succeed, hand back an
/// entry containing both parties, and leave the map empty.
/// </summary>
[Fact]
public async Task TryRemove_AfterAttach_AllPartiesPresent_InRetrievedEntry()
{
    // Each pipe is registered for disposal as soon as it exists, so pipeA is still
    // released if creating pipeB throws, and pipeB is still released if disposing
    // pipeA throws.
    await using var pipeA = MakePipe();
    await using var pipeB = MakePipe();

    var map = new InFlightByKeyMap();
    var key = new CoalescingKey(1, 0x03, 100, 1);

    var partyA = new InterestedParty(pipeA, 1);
    var partyB = new InterestedParty(pipeB, 2);

    map.AttachOrCreate(key, partyA, () => MakeRequest(partyA), 32, out _, out _);
    map.AttachOrCreate(key, partyB, () => MakeRequest(partyB), 32, out _, out _);

    bool removed = map.TryRemove(key, out var req);

    removed.ShouldBeTrue();
    req.InterestedParties.Count.ShouldBe(2, "both attached parties must be present in the removed entry");
    map.Count.ShouldBe(0);
}
|
||
|
||
/// <summary>
/// <c>TryRemove</c> on a freshly constructed map must report <c>false</c> for a key that
/// was never attached.
/// </summary>
[Fact]
public void TryRemove_OfMissing_ReturnsFalse()
{
    var emptyMap = new InFlightByKeyMap();
    var neverAttached = new CoalescingKey(1, 0x03, 100, 1);

    bool removed = emptyMap.TryRemove(neverAttached, out _);

    removed.ShouldBeFalse("removing a never-attached key must report false");
}
|
||
|
||
/// <summary>
/// Races many tasks attaching under a single key and verifies that exactly one create
/// happens, that every other op coalesces, and that the surviving entry holds one party
/// per op.
/// </summary>
/// <remarks>
/// The method name says "Two_Threads" but the body schedules <c>Tasks</c> (16) workers;
/// the name is kept unchanged so existing test filters keep matching.
/// </remarks>
[Fact]
public async Task Concurrent_AttachOrCreate_From_Two_Threads_NoLostParties_AndNoDuplicateEntries()
{
    // 16 tasks × 500 ops each, all racing on the same key. The map must keep exactly
    // one entry per key (unlimited MaxParties → no overflow). Each successful attach
    // must contribute exactly one party to whatever entry was created/joined.
    //
    // Each task reuses a single UpstreamPipe across its ops — the map only stores the
    // InterestedParty reference; pipe state is irrelevant to the map's invariants.
    // A pipe per op would mean 16 × 500 = 8,000 loopback socket pairs, which puts
    // needless pressure on the test machine's ephemeral port pool; one pipe per task
    // is enough.
    const int Tasks = 16;
    const int OpsPerTask = 500;
    const int MaxParties = int.MaxValue;

    var map = new InFlightByKeyMap();
    var key = new CoalescingKey(1, 0x03, 100, 1);
    var pipes = new List<UpstreamPipe>(Tasks);

    long attaches = 0;
    long creates = 0;

    try
    {
        // Created inside the try so that pipes already built are still disposed
        // if a later MakePipe() call throws.
        for (int i = 0; i < Tasks; i++)
        {
            pipes.Add(MakePipe());
        }

        var work = new Task[Tasks];
        var workCt = TestContext.Current.CancellationToken;
        for (int t = 0; t < Tasks; t++)
        {
            var pipe = pipes[t];
            work[t] = Task.Run(() =>
            {
                for (int i = 0; i < OpsPerTask; i++)
                {
                    // Surface cancellation as cancellation; returning quietly would
                    // instead fail the count assertions below with a misleading message.
                    workCt.ThrowIfCancellationRequested();

                    var party = new InterestedParty(pipe, (ushort)i);
                    map.AttachOrCreate(
                        key, party,
                        factory: () => MakeRequest(party),
                        maxParties: MaxParties,
                        out _, out bool wasNew);

                    if (wasNew)
                    {
                        Interlocked.Increment(ref creates);
                    }
                    else
                    {
                        Interlocked.Increment(ref attaches);
                    }
                }
            }, workCt);
        }

        await Task.WhenAll(work);

        (creates + attaches).ShouldBe((long)(Tasks * OpsPerTask), "every op must take exactly one branch");
        creates.ShouldBe(1, "all ops share the same key with unlimited MaxParties — exactly one create");

        // The retained entry must contain every attached party.
        bool removed = map.TryRemove(key, out var entry);
        removed.ShouldBeTrue();
        entry.InterestedParties.Count.ShouldBe(Tasks * OpsPerTask,
            "the entry's party list must hold every attached party — no lost parties under race");
        map.Count.ShouldBe(0);
    }
    finally
    {
        foreach (var p in pipes)
        {
            // Best-effort cleanup: a failing dispose must neither mask the test outcome
            // nor prevent the remaining pipes from being released.
            try { await p.DisposeAsync(); } catch { }
        }
    }
}
|
||
}
|