using System.Diagnostics.Metrics;
using System.Threading.Channels;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;

namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Runtime;

/// <summary>
/// PR 6.2 — pins the EventPump's bounded-channel + drop-newest behavior. We
/// hold the dispatch loop with a slow handler so the channel fills, then verify
/// the producer keeps reading from the gw stream and increments the
/// <c>galaxy.events.dropped</c> counter rather than blocking.
/// </summary>
public sealed class EventPumpBoundedChannelTests
{
    /// <summary>Upper bound for every polling wait in this class; assertions run afterwards either way.</summary>
    private static readonly TimeSpan PollTimeout = TimeSpan.FromSeconds(5);

    /// <summary>
    /// Emits more events than a capacity-2 channel can hold while the dispatcher is parked on a
    /// gate, then checks that the overflow was counted as dropped instead of stalling the producer.
    /// </summary>
    [Fact]
    public async Task Drops_newest_when_channel_fills_and_records_metric()
    {
        // Declared first so it is disposed last (after the pump), matching the original ordering.
        using var counters = StartMeterCapture();

        var subscriber = new ManualSubscriber();
        var registry = new SubscriptionRegistry();
        registry.Register(1, [new TagBinding("Tag.A", ItemHandle: 7)]);

        // Tiny channel + slow handler ⇒ producer hits FullMode.Wait → TryWrite false
        // for every overflow event.
        var dispatchGate = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        await using var pump = new EventPump(
            subscriber, registry, channelCapacity: 2, clientName: "PumpTest");

        pump.OnDataChange += async (_, _) =>
        {
            // Park the dispatch loop until the test releases the gate. Once released the
            // gate stays completed, so every later dispatch passes straight through.
            await dispatchGate.Task.ConfigureAwait(false);
        };

        try
        {
            pump.Start();

            const int totalEvents = 10;
            for (var i = 0; i < totalEvents; i++)
            {
                await subscriber.EmitAsync(itemHandle: 7, value: i);
            }

            // Wait (bounded) for the producer to have seen every event and recorded at least
            // one drop, instead of sleeping for a fixed interval and hoping it was enough.
            await WaitUntilAsync(
                () => counters.Received >= totalEvents && counters.Dropped > 0,
                PollTimeout);

            // Capacity 2 + 1 in-flight in the dispatcher = 3 may have been accepted; the
            // remainder should have hit the dropped counter. Don't pin exact counts —
            // the scheduler can interleave; pin the invariants instead.
            counters.Received.ShouldBeGreaterThanOrEqualTo(totalEvents);
            counters.Dropped.ShouldBeGreaterThan(0,
                "with capacity=2 and a held dispatcher we must drop at least one of 10 events");
            (counters.Received).ShouldBe(counters.Dispatched + counters.Dropped + counters.InFlight,
                "received = dispatched + dropped + (events still queued)");
        }
        finally
        {
            // Release the dispatcher so DisposeAsync can drain cleanly. This must also happen
            // when an assertion above throws: the pump is disposed on scope exit, and a handler
            // still parked on the gate could otherwise turn a test failure into a hang.
            dispatchGate.TrySetResult();
        }
    }

    /// <summary>Zero and negative capacities must be rejected by the constructor.</summary>
    [Fact]
    public void Throws_when_channelCapacity_is_invalid()
    {
        var subscriber = new ManualSubscriber();
        var registry = new SubscriptionRegistry();

        // NOTE(review): the exception type argument is assumed to be ArgumentOutOfRangeException —
        // confirm against the guard in EventPump's constructor.
        Should.Throw<ArgumentOutOfRangeException>(
            () => new EventPump(subscriber, registry, channelCapacity: 0));
        Should.Throw<ArgumentOutOfRangeException>(
            () => new EventPump(subscriber, registry, channelCapacity: -1));
    }

    /// <summary>
    /// Measurements emitted by a pump must carry a <c>galaxy.client</c> tag equal to the
    /// <c>clientName</c> it was constructed with.
    /// </summary>
    [Fact]
    public async Task Tags_metrics_with_client_name_for_multi_driver_hosts()
    {
        // The callback runs on whichever thread records the measurement, so every access to
        // the list goes through this lock; the test thread only ever reads snapshots.
        var sync = new object();
        var captured = new List<(string Instrument, KeyValuePair<string, object?>[] Tags)>();

        (string Instrument, KeyValuePair<string, object?>[] Tags)[] Snapshot()
        {
            lock (sync)
            {
                return captured.ToArray();
            }
        }

        static bool IsFromDriverX((string Instrument, KeyValuePair<string, object?>[] Tags) m) =>
            m.Tags.Any(t => t.Key == "galaxy.client"
                && string.Equals(t.Value as string, "Driver-X", StringComparison.Ordinal));

        using var listener = new MeterListener();
        listener.InstrumentPublished = (instr, l) =>
        {
            if (instr.Meter.Name == EventPump.MeterName)
            {
                l.EnableMeasurementEvents(instr);
            }
        };
        listener.SetMeasurementEventCallback<long>((instr, _, tags, _) =>
        {
            // Copy the span before storing it; it is only valid for the duration of the callback.
            var copy = tags.ToArray();
            lock (sync)
            {
                captured.Add((instr.Name, copy));
            }
        });
        listener.Start();

        var subscriber = new ManualSubscriber();
        var registry = new SubscriptionRegistry();
        registry.Register(1, [new TagBinding("Tag.A", ItemHandle: 7)]);

        await using (var pump = new EventPump(subscriber, registry, channelCapacity: 4, clientName: "Driver-X"))
        {
            pump.Start();
            await subscriber.EmitAsync(7, 42.0);

            // Bounded poll instead of a fixed sleep: continue as soon as our pump has
            // reported the event as received.
            await WaitUntilAsync(
                () => Snapshot().Any(m => IsFromDriverX(m) && m.Instrument == "galaxy.events.received"),
                PollTimeout);

            listener.RecordObservableInstruments();
        }

        // The static Meter is shared across all EventPump instances in the test
        // assembly; xUnit may run other pump tests in parallel and their
        // measurements land on the same listener. Filter to our pump's tag value.
        var ours = Snapshot().Where(IsFromDriverX).ToList();

        ours.ShouldNotBeEmpty(
            "at least one measurement from this test's pump must carry galaxy.client=Driver-X");
        ours.ShouldContain(c => c.Instrument == "galaxy.events.received");
    }

    /// <summary>
    /// Polls <paramref name="condition"/> every few milliseconds until it returns true or
    /// <paramref name="timeout"/> elapses. Never throws on timeout — the caller's assertions
    /// produce the failure message.
    /// </summary>
    private static async Task WaitUntilAsync(Func<bool> condition, TimeSpan timeout)
    {
        var deadline = Environment.TickCount64 + (long)timeout.TotalMilliseconds;
        while (!condition() && Environment.TickCount64 < deadline)
        {
            await Task.Delay(10);
        }
    }

    /// <summary>
    /// Starts a <see cref="MeterListener"/> on the EventPump meter and accumulates the
    /// received / dispatched / dropped counters into the returned capture. The caller owns
    /// the capture and must dispose it to stop listening.
    /// </summary>
    private static CounterCapture StartMeterCapture()
    {
        var capture = new CounterCapture();
        var listener = new MeterListener();
        listener.InstrumentPublished = (instr, l) =>
        {
            if (instr.Meter.Name == EventPump.MeterName)
            {
                l.EnableMeasurementEvents(instr);
            }
        };
        listener.SetMeasurementEventCallback<long>((instr, value, _, _) =>
        {
            switch (instr.Name)
            {
                case "galaxy.events.received":
                    Interlocked.Add(ref capture._received, value);
                    break;
                case "galaxy.events.dispatched":
                    Interlocked.Add(ref capture._dispatched, value);
                    break;
                case "galaxy.events.dropped":
                    Interlocked.Add(ref capture._dropped, value);
                    break;
            }
        });
        listener.Start();
        capture.Listener = listener;
        return capture;
    }

    /// <summary>Running totals of the three event counters, updated via Interlocked from the meter callback.</summary>
    private sealed class CounterCapture : IDisposable
    {
        public MeterListener? Listener;

        // Plain fields (not properties) because Interlocked needs to take them by ref.
        internal long _received, _dispatched, _dropped;

        public long Received => Interlocked.Read(ref _received);

        public long Dispatched => Interlocked.Read(ref _dispatched);

        public long Dropped => Interlocked.Read(ref _dropped);

        /// <summary>Events received but neither dispatched nor dropped yet; clamped at zero.</summary>
        public long InFlight => Math.Max(0, Received - Dispatched - Dropped);

        public void Dispose() => Listener?.Dispose();
    }

    /// <summary>
    /// Test double whose event stream is fed by hand through <see cref="EmitAsync"/>;
    /// subscribe/unsubscribe are no-ops.
    /// </summary>
    private sealed class ManualSubscriber : IGalaxySubscriber
    {
        private readonly Channel<MxEvent> _stream =
            Channel.CreateUnbounded<MxEvent>(new UnboundedChannelOptions { SingleReader = true });

        // NOTE(review): the element type of the returned list is assumed to be SubscribeResult —
        // confirm against the IGalaxySubscriber declaration.
        public Task<IReadOnlyList<SubscribeResult>> SubscribeBulkAsync(
            IReadOnlyList<string> fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken)
            => Task.FromResult<IReadOnlyList<SubscribeResult>>([]);

        public Task UnsubscribeBulkAsync(IReadOnlyList<int> itemHandles, CancellationToken cancellationToken)
            => Task.CompletedTask;

        public IAsyncEnumerable<MxEvent> StreamEventsAsync(CancellationToken cancellationToken)
            => _stream.Reader.ReadAllAsync(cancellationToken);

        /// <summary>Pushes one OnDataChange event carrying <paramref name="value"/> onto the stream.</summary>
        public ValueTask EmitAsync(int itemHandle, double value)
            => _stream.Writer.WriteAsync(new MxEvent
            {
                Family = MxEventFamily.OnDataChange,
                ItemHandle = itemHandle,
                Value = new MxValue { DoubleValue = value },
                Quality = 192,
                SourceTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
            });
    }
}