test(comm): de-race StreamRelay null-map + SiteConnectionUp gauge under load (#288)

This commit is contained in:
Joseph Doherty
2026-06-19 05:00:13 -04:00
parent ab40534986
commit 84cb8c2abb
2 changed files with 21 additions and 5 deletions
@@ -386,15 +386,23 @@ public class SiteStreamGrpcServerTests : TestKit
// While the stream is up the gauge is one above whatever baseline other // While the stream is up the gauge is one above whatever baseline other
// (possibly parallel) tests left behind — read relative so the assertion // (possibly parallel) tests left behind — read relative so the assertion
// is robust to test interleaving on the process-wide static counter. // is robust to test interleaving on the process-wide static counter.
Assert.Equal(baseline + 1, ReadGauge()); //
// SiteConnectionOpened() runs AFTER the _activeStreams insertion that
// WaitForConditionAsync(ActiveStreamCount == 1) keys off (see
// SiteStreamGrpcServer.SubscribeInstance), so a one-shot ReadGauge() here
// can race the increment under full-suite CPU oversubscription. Poll the
// gauge with a generous timeout until it reaches baseline + 1.
AwaitAssert(() => Assert.Equal(baseline + 1, ReadGauge()), TimeSpan.FromSeconds(5));
cts.Cancel(); cts.Cancel();
await streamTask; await streamTask;
await WaitForConditionAsync(() => server.ActiveStreamCount == 0); await WaitForConditionAsync(() => server.ActiveStreamCount == 0);
// After the cancel path runs the finally, the gauge is balanced back to // After the cancel path runs the finally, the gauge is balanced back to
// the baseline — no leaked "up" count. // the baseline — no leaked "up" count. Poll here too: the gauge decrement
Assert.Equal(baseline, ReadGauge()); // and the stream-count drop are independent observations, so under load
// give the finally's SiteConnectionClosed() time to land.
AwaitAssert(() => Assert.Equal(baseline, ReadGauge()), TimeSpan.FromSeconds(5));
} }
[Fact] [Fact]
@@ -279,8 +279,16 @@ public class StreamRelayActorTests : TestKit
var ts = DateTimeOffset.UtcNow; var ts = DateTimeOffset.UtcNow;
actor.Tell(new AttributeValueChanged("Inst", "Path", "Name", null, "Good", ts)); actor.Tell(new AttributeValueChanged("Inst", "Path", "Name", null, "Good", ts));
Thread.Sleep(500); // Poll for the relayed event with a generous timeout instead of a single
Assert.True(channel.Reader.TryRead(out var evt)); // fixed Thread.Sleep + one-shot TryRead. Under full-suite CPU
// oversubscription the actor may not have processed the Tell within a
// fixed 500ms window, racing the read; AwaitAssert retries until the
// channel has the item (or the generous deadline elapses).
SiteStreamEvent? evt = null;
AwaitAssert(
() => Assert.True(channel.Reader.TryRead(out evt), "Expected a relayed proto event on the channel"),
TimeSpan.FromSeconds(5));
Assert.NotNull(evt);
Assert.Equal("", evt.AttributeChanged.Value); Assert.Equal("", evt.AttributeChanged.Value);
} }