Files
wwtools/mbproxy/tests/Mbproxy.Tests/Proxy/Multiplexing/UpstreamPipeTests.cs
T
Joseph Doherty 1a2856526a mbproxy: strip historical phase/wave/plan references from source comments
Comments described the *history* of how the code arrived (phase numbers,
wave IDs, review IDs, dated TODOs) instead of what it does today. That
scaffolding rotted as the codebase evolved. Cleaned 60 source files +
.gitignore; behaviour unchanged (387/387 tests still pass).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-14 13:04:30 -04:00

105 lines
4.0 KiB
C#

using System.Net;
using System.Net.Sockets;
using Mbproxy.Proxy.Multiplexing;
using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;
namespace Mbproxy.Tests.Proxy.Multiplexing;
/// <summary>
/// Unit tests for <see cref="UpstreamPipe"/>'s response-channel contract — particularly
/// the <see cref="UpstreamPipe.TrySendResponse"/> non-blocking enqueue, which exists
/// so the per-PLC backend reader cannot be stalled by one slow upstream client.
/// </summary>
[Trait("Category", "Unit")]
public sealed class UpstreamPipeTests
{
    // ── Helpers ───────────────────────────────────────────────────────────────

    /// <summary>
    /// Creates a connected TCP socket pair over the loopback interface by starting a
    /// temporary listener on an ephemeral port, connecting a client socket to it and
    /// accepting the matching server-side socket.
    /// </summary>
    /// <returns>
    /// The connected client socket and the accepted server socket. The caller owns both
    /// and is responsible for disposing them.
    /// </returns>
    private static async Task<(Socket clientSide, Socket serverSide)> AcceptedSocketPairAsync()
    {
        // Port 0 asks the OS for any free port, so parallel test runs cannot collide.
        var listener = new TcpListener(IPAddress.Loopback, 0);
        listener.Start();
        try
        {
            int port = ((IPEndPoint)listener.LocalEndpoint).Port;
            var clientSide = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            try
            {
                // Start the connect first, then accept: both halves complete against each other.
                Task connectTask = clientSide.ConnectAsync(IPAddress.Loopback, port);
                Socket serverSide = await listener.AcceptSocketAsync();
                try
                {
                    await connectTask;
                }
                catch
                {
                    // The accepted socket is not handed to the caller on failure; release it here.
                    serverSide.Dispose();
                    throw;
                }

                return (clientSide, serverSide);
            }
            catch
            {
                // On any failure the caller never receives the client socket, so it must not leak.
                clientSide.Dispose();
                throw;
            }
        }
        finally
        {
            // The listener is only needed to establish the pair.
            listener.Stop();
        }
    }

    /// <summary>
    /// Best-effort socket disposal for test teardown. Any exception is deliberately
    /// swallowed so that a cleanup problem can never replace the test's real outcome;
    /// the socket may already have been disposed through the pipe under test.
    /// </summary>
    /// <param name="socket">The socket to dispose.</param>
    private static void DisposeQuietly(Socket socket)
    {
        try
        {
            socket.Dispose();
        }
        catch
        {
            // Intentionally ignored: teardown must not mask the assertion result.
        }
    }

    // ── Tests ─────────────────────────────────────────────────────────────────

    /// <summary>
    /// When no write-loop is draining the response channel, repeated
    /// <see cref="UpstreamPipe.TrySendResponse"/> calls must succeed up to the channel's
    /// bounded capacity and return <c>false</c> on every subsequent call without blocking.
    /// This is the non-blocking contract the per-PLC backend reader relies on.
    /// </summary>
    [Fact]
    public async Task TrySendResponse_WhenChannelFull_ReturnsFalse_WithoutBlocking()
    {
        // Must be kept in sync with the pipe's bounded response-channel capacity
        // (`ResponseChannelCapacity`, 16 at the time of writing).
        const int ExpectedCapacity = 16;

        // Comfortably more attempts than the capacity so the "full" path is hit many times.
        const int Attempts = 100;

        var (client, server) = await AcceptedSocketPairAsync();
        try
        {
            // Construct the pipe but do NOT call RunWriteLoopAsync — the channel will not
            // be drained, so it fills after `ExpectedCapacity` writes.
            var pipe = new UpstreamPipe(server, "TEST", NullLogger.Instance);
            try
            {
                int successes = 0;
                int failures = 0;
                for (int i = 0; i < Attempts; i++)
                {
                    if (pipe.TrySendResponse(new byte[] { 0, 0 }))
                    {
                        successes++;
                    }
                    else
                    {
                        failures++;
                    }
                }

                successes.ShouldBe(ExpectedCapacity,
                    "the channel's bounded capacity is 16; first 16 writes must succeed");
                failures.ShouldBe(Attempts - ExpectedCapacity,
                    "after capacity is reached, every further TrySendResponse must return false (not block)");
            }
            finally
            {
                // Dispose the pipe even when an assertion above fails, so a red test
                // does not leave the pipe (and whatever it holds) alive.
                await pipe.DisposeAsync();
            }
        }
        finally
        {
            DisposeQuietly(client);
            DisposeQuietly(server);
        }
    }

    /// <summary>
    /// Once the pipe has been disposed, <see cref="UpstreamPipe.TrySendResponse"/>
    /// returns <c>false</c> regardless of channel state, never throws.
    /// </summary>
    [Fact]
    public async Task TrySendResponse_AfterDispose_ReturnsFalse()
    {
        var (client, server) = await AcceptedSocketPairAsync();
        try
        {
            var pipe = new UpstreamPipe(server, "TEST", NullLogger.Instance);

            // Disposal is the state under test here, so it happens before the send attempt.
            await pipe.DisposeAsync();

            bool ok = pipe.TrySendResponse(new byte[] { 0, 0 });

            ok.ShouldBeFalse("a disposed pipe must reject sends without throwing");
        }
        finally
        {
            DisposeQuietly(client);
            DisposeQuietly(server);
        }
    }
}