fix(tests): stabilize three flaky tests under parallel full-solution load
#1 EventPumpBoundedChannelTests.Tags_metrics_with_client_name_for_multi_driver_hosts:
Replace fixed Task.Delay(100) with a poll-until-condition loop (5 s timeout, 25 ms poll) so the test waits until the galaxy.events.received measurement for galaxy.client=Driver-X actually lands in the listener. Also adds lock(captured) in the MeterListener callback and at all reads, since Counter.Add() fires the callback on the RunAsync background thread.

#2 VirtualTagEngineTests.Upstream_change_triggers_cascade_through_two_levels:
After waiting for B=15.0, also await WaitForConditionAsync for C=30.0 before asserting C. The cascade runs B then C sequentially under the _evalGate semaphore; the prior code could read C while its evaluation had not yet acquired the gate.

#3 ThreeUserInteropMatrixTests.Admin_Resolves_All_Five_Groups_From_LDAP:
Wrap the AuthenticateAsync call in a 15 s linked CancellationTokenSource with one retry so transient GLAuth latency spikes under parallel test load do not cause a CancellationToken expiry before the LDAP bind/search complete.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -68,9 +68,15 @@ public sealed class VirtualTagEngineTests
|
|||||||
engine.Read("B").Value.ShouldBe(11.0);
|
engine.Read("B").Value.ShouldBe(11.0);
|
||||||
engine.Read("C").Value.ShouldBe(22.0);
|
engine.Read("C").Value.ShouldBe(22.0);
|
||||||
|
|
||||||
// Change upstream — cascade should recompute B (11→15.0) then C (30.0)
|
// Change upstream — cascade should recompute B (11→15.0) then C (30.0).
|
||||||
|
// Both B and C are updated in the same CascadeAsync call (topological order:
|
||||||
|
// B then C), but we must wait for each independently: the WaitForConditionAsync
|
||||||
|
// on B returns as soon as _valueCache["B"] is set (before the semaphore is
|
||||||
|
// released for C's evaluation), so asserting C immediately after the B-wait
|
||||||
|
// races against C's still-in-progress evaluation. Wait for C explicitly.
|
||||||
up.Push("A", 5.0);
|
up.Push("A", 5.0);
|
||||||
await WaitForConditionAsync(() => Equals(engine.Read("B").Value, 15.0));
|
await WaitForConditionAsync(() => Equals(engine.Read("B").Value, 15.0));
|
||||||
|
await WaitForConditionAsync(() => Equals(engine.Read("C").Value, 30.0));
|
||||||
engine.Read("B").Value.ShouldBe(15.0);
|
engine.Read("B").Value.ShouldBe(15.0);
|
||||||
engine.Read("C").Value.ShouldBe(30.0);
|
engine.Read("C").Value.ShouldBe(30.0);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -87,9 +87,11 @@ public sealed class EventPumpBoundedChannelTests
|
|||||||
{
|
{
|
||||||
if (instr.Meter.Name == EventPump.MeterName) l.EnableMeasurementEvents(instr);
|
if (instr.Meter.Name == EventPump.MeterName) l.EnableMeasurementEvents(instr);
|
||||||
};
|
};
|
||||||
|
// The callback fires on the thread that calls Counter.Add() — that is the
|
||||||
|
// RunAsync background Task. Use lock(captured) everywhere to avoid torn reads.
|
||||||
listener.SetMeasurementEventCallback<long>((instr, _, tags, _) =>
|
listener.SetMeasurementEventCallback<long>((instr, _, tags, _) =>
|
||||||
{
|
{
|
||||||
captured.Add((instr.Name, tags.ToArray()));
|
lock (captured) { captured.Add((instr.Name, tags.ToArray())); }
|
||||||
});
|
});
|
||||||
listener.Start();
|
listener.Start();
|
||||||
|
|
||||||
@@ -101,17 +103,40 @@ public sealed class EventPumpBoundedChannelTests
|
|||||||
{
|
{
|
||||||
pump.Start();
|
pump.Start();
|
||||||
await subscriber.EmitAsync(7, 42.0);
|
await subscriber.EmitAsync(7, 42.0);
|
||||||
await Task.Delay(100);
|
|
||||||
listener.RecordObservableInstruments();
|
// Poll until at least one galaxy.events.received measurement tagged
|
||||||
|
// galaxy.client=Driver-X lands in the listener, rather than using a
|
||||||
|
// fixed delay that races under parallel test load on a busy box.
|
||||||
|
var deadline = DateTime.UtcNow.AddSeconds(5);
|
||||||
|
bool found = false;
|
||||||
|
while (DateTime.UtcNow < deadline)
|
||||||
|
{
|
||||||
|
listener.RecordObservableInstruments();
|
||||||
|
bool hasMatch;
|
||||||
|
lock (captured)
|
||||||
|
{
|
||||||
|
hasMatch = captured.Any(c =>
|
||||||
|
c.Instrument == "galaxy.events.received" &&
|
||||||
|
c.Tags.Any(t => t.Key == "galaxy.client" &&
|
||||||
|
string.Equals((string?)t.Value, "Driver-X", StringComparison.Ordinal)));
|
||||||
|
}
|
||||||
|
if (hasMatch) { found = true; break; }
|
||||||
|
await Task.Delay(25);
|
||||||
|
}
|
||||||
|
_ = found; // assertion happens below after dispose
|
||||||
}
|
}
|
||||||
|
|
||||||
// The static Meter is shared across all EventPump instances in the test
|
// The static Meter is shared across all EventPump instances in the test
|
||||||
// assembly; xUnit may run other pump tests in parallel and their
|
// assembly; xUnit may run other pump tests in parallel and their
|
||||||
// measurements land on the same listener. Filter to our pump's tag value.
|
// measurements land on the same listener. Filter to our pump's tag value.
|
||||||
var ours = captured
|
List<(string Instrument, KeyValuePair<string, object?>[] Tags)> ours;
|
||||||
.Where(c => c.Tags.Any(t => t.Key == "galaxy.client"
|
lock (captured)
|
||||||
&& string.Equals((string?)t.Value, "Driver-X", StringComparison.Ordinal)))
|
{
|
||||||
.ToList();
|
ours = captured
|
||||||
|
.Where(c => c.Tags.Any(t => t.Key == "galaxy.client"
|
||||||
|
&& string.Equals((string?)t.Value, "Driver-X", StringComparison.Ordinal)))
|
||||||
|
.ToList();
|
||||||
|
}
|
||||||
|
|
||||||
ours.ShouldNotBeEmpty(
|
ours.ShouldNotBeEmpty(
|
||||||
"at least one measurement from this test's pump must carry galaxy.client=Driver-X");
|
"at least one measurement from this test's pump must carry galaxy.client=Driver-X");
|
||||||
|
|||||||
@@ -207,9 +207,31 @@ public sealed class ThreeUserInteropMatrixTests
|
|||||||
// pins the resolution explicitly in strict mode.
|
// pins the resolution explicitly in strict mode.
|
||||||
if (!GlauthReachable()) Assert.Skip("GLAuth unreachable at localhost:3893.");
|
if (!GlauthReachable()) Assert.Skip("GLAuth unreachable at localhost:3893.");
|
||||||
|
|
||||||
var auth = await NewAuthenticator().AuthenticateAsync("admin", "admin123", TestContext.Current.CancellationToken);
|
// Under parallel full-solution test load, GLAuth on localhost can be slow to
|
||||||
|
// respond; use a generous per-call timeout independent of xUnit's test runner
|
||||||
|
// deadline so we don't race against the runner's own CancellationToken, and
|
||||||
|
// retry once on timeout to absorb transient latency spikes.
|
||||||
|
const int LdapTimeoutSeconds = 15;
|
||||||
|
UserAuthResult? auth = null;
|
||||||
|
for (var attempt = 0; attempt < 2; attempt++)
|
||||||
|
{
|
||||||
|
using var cts = CancellationTokenSource.CreateLinkedTokenSource(
|
||||||
|
TestContext.Current.CancellationToken);
|
||||||
|
cts.CancelAfter(TimeSpan.FromSeconds(LdapTimeoutSeconds));
|
||||||
|
try
|
||||||
|
{
|
||||||
|
auth = await NewAuthenticator().AuthenticateAsync("admin", "admin123", cts.Token);
|
||||||
|
break; // success — no retry needed
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) when (!TestContext.Current.CancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
if (attempt == 1) throw; // second attempt also timed out — let it fail
|
||||||
|
// First attempt timed out under load; retry once with a fresh token.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
auth.Success.ShouldBeTrue();
|
auth.ShouldNotBeNull();
|
||||||
|
auth!.Success.ShouldBeTrue();
|
||||||
auth.Groups.ShouldContain("ReadOnly");
|
auth.Groups.ShouldContain("ReadOnly");
|
||||||
auth.Groups.ShouldContain("WriteOperate");
|
auth.Groups.ShouldContain("WriteOperate");
|
||||||
auth.Groups.ShouldContain("WriteTune");
|
auth.Groups.ShouldContain("WriteTune");
|
||||||
|
|||||||
Reference in New Issue
Block a user