Files
lmxopcua/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/EventPumpBoundedChannelTests.cs
Joseph Doherty 392b219233 fix(tests): stabilize three flaky tests under parallel full-solution load
#1 EventPumpBoundedChannelTests.Tags_metrics_with_client_name_for_multi_driver_hosts:
Replace fixed Task.Delay(100) with a poll-until-condition loop (5 s
timeout, 25 ms poll) so the test waits until the galaxy.events.received
measurement for galaxy.client=Driver-X actually lands in the listener.
Also adds lock(captured) in the MeterListener callback and at all reads,
since Counter.Add() fires the callback on the RunAsync background thread.

#2 VirtualTagEngineTests.Upstream_change_triggers_cascade_through_two_levels:
After waiting for B=15.0, also await WaitForConditionAsync for C=30.0
before asserting C. The cascade runs B then C sequentially under the
_evalGate semaphore; the prior code could read C while its evaluation
had not yet acquired the gate.

#3 ThreeUserInteropMatrixTests.Admin_Resolves_All_Five_Groups_From_LDAP:
Wrap the AuthenticateAsync call in a 15 s linked CancellationTokenSource
with one retry so transient GLAuth latency spikes under parallel test
load do not cause a CancellationToken expiry before the LDAP bind/search
complete.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 05:59:00 -04:00

205 lines
8.7 KiB
C#

using System.Diagnostics.Metrics;
using System.Threading.Channels;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Runtime;
/// <summary>
/// PR 6.2 — pins the EventPump's bounded-channel + drop-newest behavior. We
/// hold the dispatch loop with a slow handler so the channel fills, then verify
/// the producer keeps reading from the gw stream and increments the
/// <c>galaxy.events.dropped</c> counter rather than blocking.
/// </summary>
public sealed class EventPumpBoundedChannelTests
{
/// <summary>
/// Emits ten events into a capacity-2 pump whose data-change handler is parked on a
/// gate, then checks that the received and dropped counters reflect the overflow.
/// </summary>
[Fact]
public async Task Drops_newest_when_channel_fills_and_records_metric()
{
    var counters = StartMeterCapture();
    try
    {
        var subscriber = new ManualSubscriber();
        var registry = new SubscriptionRegistry();
        registry.Register(1, [new TagBinding("Tag.A", ItemHandle: 7)]);

        // Tiny channel + slow handler ⇒ producer hits FullMode.Wait → TryWrite false
        // for every overflow event.
        var dispatchGate = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        await using var pump = new EventPump(
            subscriber, registry, channelCapacity: 2, clientName: "PumpTest");
        pump.OnDataChange += async (_, _) =>
        {
            // Hold the handler until the gate is released so the bounded channel
            // can overflow while events keep arriving.
            await dispatchGate.Task.ConfigureAwait(false);
        };

        try
        {
            pump.Start();

            const int totalEvents = 10;
            for (var i = 0; i < totalEvents; i++)
            {
                await subscriber.EmitAsync(itemHandle: 7, value: i);
            }

            // Poll (5 s timeout, 25 ms interval) until the producer has counted every
            // event and at least one drop, instead of relying on a fixed delay that
            // races when the box is busy. On timeout the assertions below report it.
            var deadline = DateTime.UtcNow.AddSeconds(5);
            while (DateTime.UtcNow < deadline
                && (counters.Received < totalEvents || counters.Dropped == 0))
            {
                await Task.Delay(25);
            }

            // Snapshot each counter once. Received is read last so a measurement that
            // lands between reads can only make it larger than the other two, never
            // smaller, which keeps the invariant check below free of torn reads.
            var dispatched = counters.Dispatched;
            var dropped = counters.Dropped;
            var received = counters.Received;
            var inFlight = Math.Max(0, received - dispatched - dropped);

            // Capacity 2 + 1 in-flight in the dispatcher = 3 may have been accepted; the
            // remainder should have hit the dropped counter. Don't pin exact counts —
            // the scheduler can interleave; pin the invariants instead.
            received.ShouldBeGreaterThanOrEqualTo(totalEvents);
            dropped.ShouldBeGreaterThan(0,
                "with capacity=2 and a held dispatcher we must drop at least one of 10 events");
            received.ShouldBe(dispatched + dropped + inFlight,
                "received = dispatched + dropped + (events still queued)");
        }
        finally
        {
            // Release the handler even when an assertion failed: this runs before the
            // pump's `await using` disposal, so DisposeAsync is not left waiting on a
            // handler that would otherwise never complete.
            dispatchGate.TrySetResult();
        }
    }
    finally
    {
        counters.Dispose();
    }
}
/// <summary>
/// Constructing an <c>EventPump</c> with a zero or negative channel capacity must
/// throw <see cref="ArgumentOutOfRangeException"/>.
/// </summary>
[Fact]
public async Task Throws_when_channelCapacity_is_invalid()
{
    var subscriber = new ManualSubscriber();
    var registry = new SubscriptionRegistry();

    // Same order as before: zero first, then a negative capacity.
    int[] rejectedCapacities = [0, -1];
    foreach (var capacity in rejectedCapacities)
    {
        Should.Throw<ArgumentOutOfRangeException>(() =>
            new EventPump(subscriber, registry, channelCapacity: capacity));
    }

    await Task.CompletedTask;
}
/// <summary>
/// Starts a pump constructed with <c>clientName: "Driver-X"</c>, emits one event, and
/// checks that a <c>galaxy.events.received</c> measurement tagged
/// <c>galaxy.client=Driver-X</c> reaches a <see cref="MeterListener"/>.
/// </summary>
[Fact]
public async Task Tags_metrics_with_client_name_for_multi_driver_hosts()
{
    var captured = new List<(string Instrument, KeyValuePair<string, object?>[] Tags)>();

    // Single definition of "this measurement came from our pump". The constant
    // pattern on Value is type-safe: a non-string tag value simply does not match,
    // where a hard (string?) cast would throw InvalidCastException.
    static bool IsFromDriverX((string Instrument, KeyValuePair<string, object?>[] Tags) measurement) =>
        measurement.Tags.Any(t => t is { Key: "galaxy.client", Value: "Driver-X" });

    using var listener = new MeterListener();
    listener.InstrumentPublished = (instr, l) =>
    {
        if (instr.Meter.Name == EventPump.MeterName) l.EnableMeasurementEvents(instr);
    };
    // The callback fires on the thread that calls Counter.Add() — that is the
    // RunAsync background Task. Use lock(captured) everywhere to avoid torn reads.
    listener.SetMeasurementEventCallback<long>((instr, _, tags, _) =>
    {
        lock (captured) { captured.Add((instr.Name, tags.ToArray())); }
    });
    listener.Start();

    var subscriber = new ManualSubscriber();
    var registry = new SubscriptionRegistry();
    registry.Register(1, [new TagBinding("Tag.A", ItemHandle: 7)]);

    await using (var pump = new EventPump(subscriber, registry, channelCapacity: 4, clientName: "Driver-X"))
    {
        pump.Start();
        await subscriber.EmitAsync(7, 42.0);

        // Poll (5 s timeout, 25 ms interval) until at least one galaxy.events.received
        // measurement tagged galaxy.client=Driver-X lands in the listener, rather than
        // using a fixed delay that races under parallel test load on a busy box.
        // A timeout is not reported here; the assertions after dispose surface it.
        var deadline = DateTime.UtcNow.AddSeconds(5);
        while (DateTime.UtcNow < deadline)
        {
            listener.RecordObservableInstruments();

            bool receivedSeen;
            lock (captured)
            {
                receivedSeen = captured.Any(c =>
                    c.Instrument == "galaxy.events.received" && IsFromDriverX(c));
            }

            if (receivedSeen)
            {
                break;
            }

            await Task.Delay(25);
        }
    }

    // The static Meter is shared across all EventPump instances in the test
    // assembly; xUnit may run other pump tests in parallel and their
    // measurements land on the same listener. Filter to our pump's tag value.
    List<(string Instrument, KeyValuePair<string, object?>[] Tags)> ours;
    lock (captured)
    {
        ours = captured.Where(IsFromDriverX).ToList();
    }

    ours.ShouldNotBeEmpty(
        "at least one measurement from this test's pump must carry galaxy.client=Driver-X");
    ours.ShouldContain(c => c.Instrument == "galaxy.events.received");
}
/// <summary>
/// Creates and starts a <see cref="MeterListener"/> that subscribes to every
/// instrument of the meter named <c>EventPump.MeterName</c> and accumulates the
/// received / dispatched / dropped counter values into a new <c>CounterCapture</c>.
/// The returned capture holds the listener; disposing the capture disposes it.
/// </summary>
private static CounterCapture StartMeterCapture()
{
    var totals = new CounterCapture();
    var meterListener = new MeterListener
    {
        InstrumentPublished = (instrument, self) =>
        {
            // Instruments belonging to any other meter are ignored.
            if (instrument.Meter.Name != EventPump.MeterName)
            {
                return;
            }

            self.EnableMeasurementEvents(instrument);
        },
    };

    meterListener.SetMeasurementEventCallback<long>((instrument, delta, _, _) =>
    {
        // Interlocked.Add because measurements may be reported from a thread other
        // than the one reading the totals.
        var name = instrument.Name;
        if (name == "galaxy.events.received")
        {
            Interlocked.Add(ref totals._received, delta);
        }
        else if (name == "galaxy.events.dispatched")
        {
            Interlocked.Add(ref totals._dispatched, delta);
        }
        else if (name == "galaxy.events.dropped")
        {
            Interlocked.Add(ref totals._dropped, delta);
        }
    });

    meterListener.Start();
    totals.Listener = meterListener;
    return totals;
}
/// <summary>
/// Holds running totals for the three pump counters together with the
/// <see cref="MeterListener"/> that feeds them. Totals are read with
/// <see cref="Interlocked.Read(ref long)"/>.
/// </summary>
private sealed class CounterCapture : IDisposable
{
    /// <summary>Listener disposed by <see cref="Dispose"/>; may be unset.</summary>
    public MeterListener? Listener;

    // Raw totals; the owner updates these through Interlocked.Add.
    internal long _received;
    internal long _dispatched;
    internal long _dropped;

    /// <summary>Current value of the received total.</summary>
    public long Received
    {
        get { return Interlocked.Read(ref _received); }
    }

    /// <summary>Current value of the dispatched total.</summary>
    public long Dispatched
    {
        get { return Interlocked.Read(ref _dispatched); }
    }

    /// <summary>Current value of the dropped total.</summary>
    public long Dropped
    {
        get { return Interlocked.Read(ref _dropped); }
    }

    /// <summary>
    /// Received minus dispatched minus dropped, clamped so it never goes below zero.
    /// </summary>
    public long InFlight
    {
        get
        {
            var pending = Received - Dispatched - Dropped;
            return pending > 0 ? pending : 0;
        }
    }

    /// <summary>Disposes the listener if one was assigned.</summary>
    public void Dispose()
    {
        var owned = Listener;
        if (owned is not null)
        {
            owned.Dispose();
        }
    }
}
/// <summary>
/// Test double for <c>IGalaxySubscriber</c> backed by an unbounded in-memory channel:
/// <see cref="EmitAsync"/> writes an event and <see cref="StreamEventsAsync"/> exposes
/// the channel's reader. Subscribe and unsubscribe calls are no-ops.
/// </summary>
private sealed class ManualSubscriber : IGalaxySubscriber
{
    // Created with SingleReader = true, i.e. one consumer at a time is expected.
    private readonly Channel<MxEvent> _events = Channel.CreateUnbounded<MxEvent>(
        new UnboundedChannelOptions { SingleReader = true });

    /// <summary>Returns a completed task carrying an empty result list.</summary>
    public Task<IReadOnlyList<SubscribeResult>> SubscribeBulkAsync(
        IReadOnlyList<string> fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken)
    {
        IReadOnlyList<SubscribeResult> noResults = [];
        return Task.FromResult(noResults);
    }

    /// <summary>Does nothing and returns a completed task.</summary>
    public Task UnsubscribeBulkAsync(IReadOnlyList<int> itemHandles, CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    /// <summary>Streams every event written through <see cref="EmitAsync"/>.</summary>
    public IAsyncEnumerable<MxEvent> StreamEventsAsync(CancellationToken cancellationToken)
    {
        return _events.Reader.ReadAllAsync(cancellationToken);
    }

    /// <summary>
    /// Queues an <c>OnDataChange</c> event for <paramref name="itemHandle"/> carrying
    /// <paramref name="value"/> as a double, stamped with the current UTC time.
    /// </summary>
    public ValueTask EmitAsync(int itemHandle, double value)
    {
        var dataChange = new MxEvent
        {
            Family = MxEventFamily.OnDataChange,
            ItemHandle = itemHandle,
            Value = new MxValue { DoubleValue = value },
            // NOTE(review): 192 is presumably the "good" quality code — confirm.
            Quality = 192,
            SourceTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
        };
        return _events.Writer.WriteAsync(dataChange);
    }
}
}