diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs index 42b44d4..7bf8035 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/GalaxyDriver.cs @@ -511,7 +511,9 @@ public sealed class GalaxyDriver lock (_pumpLock) { if (_eventPump is not null) return _eventPump; - _eventPump = new EventPump(_subscriber!, _subscriptions, _logger); + _eventPump = new EventPump( + _subscriber!, _subscriptions, _logger, + clientName: _options.MxAccess.ClientName); _eventPump.OnDataChange += OnPumpDataChange; _eventPump.Start(); return _eventPump; diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/EventPump.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/EventPump.cs index c2a5c48..ee6d8c0 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/EventPump.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/EventPump.cs @@ -1,3 +1,5 @@ +using System.Diagnostics.Metrics; +using System.Threading.Channels; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using MxGateway.Contracts.Proto; @@ -13,19 +15,47 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; /// ). /// /// +/// /// One pump per connected . Reconnect lives in PR 4.5's /// supervisor; on transport failure here we log + propagate so the supervisor can /// decide whether to restart. +/// +/// +/// PR 6.2 — the network-read loop and the listener-fanout loop are decoupled by a +/// bounded . When a listener is slow enough to fill the +/// channel, new events are dropped (newest-dropped semantics: producer's +/// TryWrite fails) rather than back-pressuring the gw stream. Three counters +/// on the ZB.MOM.WW.OtOpcUa.Driver.Galaxy meter expose received / dispatched +/// / dropped totals so ops sees pressure before it manifests as user-visible loss. +/// /// internal sealed class EventPump : IAsyncDisposable { + public const string MeterName = "ZB.MOM.WW.OtOpcUa.Driver.Galaxy"; + private const int DefaultChannelCapacity = 50_000; + + // Single static meter so a host-level MeterListener catches all pump instances. + private static readonly Meter Meter = new(MeterName); + private static readonly Counter EventsReceived = + Meter.CreateCounter("galaxy.events.received", unit: "{event}", + description: "MxEvents read from the gateway StreamEvents stream."); + private static readonly Counter EventsDispatched = + Meter.CreateCounter("galaxy.events.dispatched", unit: "{event}", + description: "MxEvents passed through the bounded channel and into OnDataChange."); + private static readonly Counter EventsDropped = + Meter.CreateCounter("galaxy.events.dropped", unit: "{event}", + description: "MxEvents dropped because the bounded channel was full (newest-dropped)."); + private readonly IGalaxySubscriber _subscriber; private readonly SubscriptionRegistry _registry; private readonly ILogger _logger; private readonly Func _handleFactory; + private readonly Channel _channel; + private readonly KeyValuePair _clientTag; private readonly CancellationTokenSource _cts = new(); private Task? _loop; + private Task? _dispatchLoop; private bool _disposed; public event EventHandler? OnDataChange; @@ -34,12 +64,30 @@ internal sealed class EventPump : IAsyncDisposable IGalaxySubscriber subscriber, SubscriptionRegistry registry, ILogger? logger = null, - Func? handleFactory = null) + Func? handleFactory = null, + int channelCapacity = DefaultChannelCapacity, + string? clientName = null) { _subscriber = subscriber ?? throw new ArgumentNullException(nameof(subscriber)); _registry = registry ?? throw new ArgumentNullException(nameof(registry)); _logger = logger ?? NullLogger.Instance; _handleFactory = handleFactory ?? (id => new GalaxySubscriptionHandle(id)); + + if (channelCapacity < 1) + { + throw new ArgumentOutOfRangeException(nameof(channelCapacity), + "channelCapacity must be >= 1; recommended 50_000 for 50k-tag deployments."); + } + _channel = Channel.CreateBounded(new BoundedChannelOptions(channelCapacity) + { + // Newest-dropped policy: when full, the producer's TryWrite returns false + // and we account for the drop. We do this manually rather than relying on + // BoundedChannelFullMode.DropWrite so we can count drops without polling. + FullMode = BoundedChannelFullMode.Wait, + SingleReader = true, + SingleWriter = true, + }); + _clientTag = new KeyValuePair("galaxy.client", clientName ?? ""); } /// @@ -51,6 +99,7 @@ internal sealed class EventPump : IAsyncDisposable ObjectDisposedException.ThrowIf(_disposed, this); if (_loop is not null) return; _loop = Task.Run(() => RunAsync(_cts.Token)); + _dispatchLoop = Task.Run(() => DispatchLoopAsync(_cts.Token)); } private async Task RunAsync(CancellationToken ct) @@ -60,7 +109,15 @@ internal sealed class EventPump : IAsyncDisposable await foreach (var ev in _subscriber.StreamEventsAsync(ct).WithCancellation(ct).ConfigureAwait(false)) { if (ct.IsCancellationRequested) break; - Dispatch(ev); + EventsReceived.Add(1, _clientTag); + + // Newest-dropped: TryWrite fast-paths the common case (channel has room). + // When full we count the drop and continue reading the gw stream so + // back-pressure doesn't propagate upstream. + if (!_channel.Writer.TryWrite(ev)) + { + EventsDropped.Add(1, _clientTag); + } } } catch (OperationCanceledException) when (ct.IsCancellationRequested) @@ -72,6 +129,32 @@ internal sealed class EventPump : IAsyncDisposable _logger.LogWarning(ex, "Galaxy EventPump loop ended with an exception — reconnect supervisor (PR 4.5) handles restart."); } + finally + { + // Tell the dispatch loop the producer is done so it drains and exits. + _channel.Writer.TryComplete(); + } + } + + private async Task DispatchLoopAsync(CancellationToken ct) + { + try + { + await foreach (var ev in _channel.Reader.ReadAllAsync(ct).ConfigureAwait(false)) + { + Dispatch(ev); + EventsDispatched.Add(1, _clientTag); + } + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) + { + // Clean shutdown. + } + catch (Exception ex) + { + _logger.LogWarning(ex, + "Galaxy EventPump dispatch loop ended with an exception — events past this point will be lost until restart."); + } } private void Dispatch(MxEvent ev) @@ -121,10 +204,15 @@ internal sealed class EventPump : IAsyncDisposable if (_disposed) return; _disposed = true; _cts.Cancel(); + _channel.Writer.TryComplete(); if (_loop is not null) { try { await _loop.ConfigureAwait(false); } catch { /* shutdown */ } } + if (_dispatchLoop is not null) + { + try { await _dispatchLoop.ConfigureAwait(false); } catch { /* shutdown */ } + } _cts.Dispose(); } } diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/EventPumpBoundedChannelTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/EventPumpBoundedChannelTests.cs new file mode 100644 index 0000000..0f7af24 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/EventPumpBoundedChannelTests.cs @@ -0,0 +1,179 @@ +using System.Diagnostics.Metrics; +using System.Threading.Channels; +using Google.Protobuf.WellKnownTypes; +using MxGateway.Contracts.Proto; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Runtime; + +/// +/// PR 6.2 — pins the EventPump's bounded-channel + drop-newest behavior. We +/// hold the dispatch loop with a slow handler so the channel fills, then verify +/// the producer keeps reading from the gw stream and increments the +/// galaxy.events.dropped counter rather than blocking. +/// +public sealed class EventPumpBoundedChannelTests +{ + [Fact] + public async Task Drops_newest_when_channel_fills_and_records_metric() + { + var counters = StartMeterCapture(); + try + { + var subscriber = new ManualSubscriber(); + var registry = new SubscriptionRegistry(); + registry.Register(1, [new TagBinding("Tag.A", ItemHandle: 7)]); + + // Tiny channel + slow handler ⇒ producer hits FullMode.Wait → TryWrite false + // for every overflow event. + var dispatchGate = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + await using var pump = new EventPump( + subscriber, registry, channelCapacity: 2, clientName: "PumpTest"); + pump.OnDataChange += async (_, _) => + { + // Block the dispatch loop until we've shoved enough events through to + // overflow the bounded channel. Consume the gate exactly once. + await dispatchGate.Task.ConfigureAwait(false); + }; + pump.Start(); + + const int totalEvents = 10; + for (var i = 0; i < totalEvents; i++) + { + await subscriber.EmitAsync(itemHandle: 7, value: i); + } + // Give the producer a beat to run TryWrite for every event. + await Task.Delay(150); + + // Capacity 2 + 1 in-flight in the dispatcher = 3 may have been accepted; the + // remainder should have hit the dropped counter. Don't pin exact counts — + // the scheduler can interleave; pin the invariants instead. + counters.Received.ShouldBeGreaterThanOrEqualTo(totalEvents); + counters.Dropped.ShouldBeGreaterThan(0, + "with capacity=2 and a held dispatcher we must drop at least one of 10 events"); + (counters.Received).ShouldBe(counters.Dispatched + counters.Dropped + counters.InFlight, + "received = dispatched + dropped + (events still queued)"); + + // Release the dispatcher so DisposeAsync can drain cleanly. + dispatchGate.TrySetResult(); + } + finally + { + counters.Dispose(); + } + } + + [Fact] + public async Task Throws_when_channelCapacity_is_invalid() + { + var subscriber = new ManualSubscriber(); + var registry = new SubscriptionRegistry(); + Should.Throw(() => + new EventPump(subscriber, registry, channelCapacity: 0)); + Should.Throw(() => + new EventPump(subscriber, registry, channelCapacity: -1)); + await Task.CompletedTask; + } + + [Fact] + public async Task Tags_metrics_with_client_name_for_multi_driver_hosts() + { + var captured = new List<(string Instrument, KeyValuePair[] Tags)>(); + using var listener = new MeterListener(); + listener.InstrumentPublished = (instr, l) => + { + if (instr.Meter.Name == EventPump.MeterName) l.EnableMeasurementEvents(instr); + }; + listener.SetMeasurementEventCallback((instr, _, tags, _) => + { + captured.Add((instr.Name, tags.ToArray())); + }); + listener.Start(); + + var subscriber = new ManualSubscriber(); + var registry = new SubscriptionRegistry(); + registry.Register(1, [new TagBinding("Tag.A", ItemHandle: 7)]); + + await using (var pump = new EventPump(subscriber, registry, channelCapacity: 4, clientName: "Driver-X")) + { + pump.Start(); + await subscriber.EmitAsync(7, 42.0); + await Task.Delay(100); + listener.RecordObservableInstruments(); + } + + // The static Meter is shared across all EventPump instances in the test + // assembly; xUnit may run other pump tests in parallel and their + // measurements land on the same listener. Filter to our pump's tag value. + var ours = captured + .Where(c => c.Tags.Any(t => t.Key == "galaxy.client" + && string.Equals((string?)t.Value, "Driver-X", StringComparison.Ordinal))) + .ToList(); + + ours.ShouldNotBeEmpty( + "at least one measurement from this test's pump must carry galaxy.client=Driver-X"); + ours.ShouldContain(c => c.Instrument == "galaxy.events.received"); + } + + private static CounterCapture StartMeterCapture() + { + var capture = new CounterCapture(); + var listener = new MeterListener(); + listener.InstrumentPublished = (instr, l) => + { + if (instr.Meter.Name == EventPump.MeterName) l.EnableMeasurementEvents(instr); + }; + listener.SetMeasurementEventCallback((instr, value, _, _) => + { + switch (instr.Name) + { + case "galaxy.events.received": Interlocked.Add(ref capture._received, value); break; + case "galaxy.events.dispatched": Interlocked.Add(ref capture._dispatched, value); break; + case "galaxy.events.dropped": Interlocked.Add(ref capture._dropped, value); break; + } + }); + listener.Start(); + capture.Listener = listener; + return capture; + } + + private sealed class CounterCapture : IDisposable + { + public MeterListener? Listener; + internal long _received, _dispatched, _dropped; + public long Received => Interlocked.Read(ref _received); + public long Dispatched => Interlocked.Read(ref _dispatched); + public long Dropped => Interlocked.Read(ref _dropped); + public long InFlight => Math.Max(0, Received - Dispatched - Dropped); + public void Dispose() => Listener?.Dispose(); + } + + private sealed class ManualSubscriber : IGalaxySubscriber + { + private readonly Channel _stream = + Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true }); + + public Task> SubscribeBulkAsync( + IReadOnlyList fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken) + => Task.FromResult>([]); + + public Task UnsubscribeBulkAsync(IReadOnlyList itemHandles, CancellationToken cancellationToken) + => Task.CompletedTask; + + public IAsyncEnumerable StreamEventsAsync(CancellationToken cancellationToken) + => _stream.Reader.ReadAllAsync(cancellationToken); + + public ValueTask EmitAsync(int itemHandle, double value) => + _stream.Writer.WriteAsync(new MxEvent + { + Family = MxEventFamily.OnDataChange, + ItemHandle = itemHandle, + Value = new MxValue { DoubleValue = value }, + Quality = 192, + SourceTimestamp = Timestamp.FromDateTime(DateTime.UtcNow), + }); + } +}