PR 6.2 — Bounded EventPump channel + drop-newest metrics

Decouples the gw stream-read loop from the listener-fanout loop with a
bounded Channel<MxEvent> (default capacity 50_000) sitting between them.
When a slow listener fills the channel, the producer's TryWrite returns
false and we count the drop rather than back-pressuring the gw stream.

Three counters on the ZB.MOM.WW.OtOpcUa.Driver.Galaxy meter expose the
pressure curve before it manifests as user-visible loss:

- galaxy.events.received  — MxEvents read from StreamEvents
- galaxy.events.dispatched — MxEvents that made it through to OnDataChange
- galaxy.events.dropped   — MxEvents discarded because the channel was full

Each measurement carries a galaxy.client tag so multi-driver hosts can
split by source. The driver wires _options.MxAccess.ClientName into the
new EventPump constructor parameter.

Tests: drop-newest under pressure, capacity validation, and per-pump
measurement filtering (xUnit can run other pump tests in parallel and
their measurements land on the same listener — the test filters to its
own client name).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-04-29 16:50:39 -04:00
parent 619207e7f5
commit 7b21c3b428
3 changed files with 272 additions and 3 deletions

View File

@@ -511,7 +511,9 @@ public sealed class GalaxyDriver
lock (_pumpLock)
{
if (_eventPump is not null) return _eventPump;
_eventPump = new EventPump(_subscriber!, _subscriptions, _logger);
_eventPump = new EventPump(
_subscriber!, _subscriptions, _logger,
clientName: _options.MxAccess.ClientName);
_eventPump.OnDataChange += OnPumpDataChange;
_eventPump.Start();
return _eventPump;

View File

@@ -1,3 +1,5 @@
using System.Diagnostics.Metrics;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using MxGateway.Contracts.Proto;
@@ -13,19 +15,47 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
/// <see cref="SubscriptionRegistry.ResolveSubscribers"/>).
/// </summary>
/// <remarks>
/// <para>
/// One pump per connected <see cref="GalaxyMxSession"/>. Reconnect lives in PR 4.5's
/// supervisor; on transport failure here we log + propagate so the supervisor can
/// decide whether to restart.
/// </para>
/// <para>
/// PR 6.2 — the network-read loop and the listener-fanout loop are decoupled by a
/// bounded <see cref="Channel{T}"/>. When a listener is slow enough to fill the
/// channel, new events are dropped (newest-dropped semantics: producer's
/// <c>TryWrite</c> fails) rather than back-pressuring the gw stream. Three counters
/// on the <c>ZB.MOM.WW.OtOpcUa.Driver.Galaxy</c> meter expose received / dispatched
/// / dropped totals so ops sees pressure before it manifests as user-visible loss.
/// </para>
/// </remarks>
internal sealed class EventPump : IAsyncDisposable
{
public const string MeterName = "ZB.MOM.WW.OtOpcUa.Driver.Galaxy";
private const int DefaultChannelCapacity = 50_000;
// Single static meter so a host-level MeterListener catches all pump instances.
private static readonly Meter Meter = new(MeterName);
private static readonly Counter<long> EventsReceived =
Meter.CreateCounter<long>("galaxy.events.received", unit: "{event}",
description: "MxEvents read from the gateway StreamEvents stream.");
private static readonly Counter<long> EventsDispatched =
Meter.CreateCounter<long>("galaxy.events.dispatched", unit: "{event}",
description: "MxEvents passed through the bounded channel and into OnDataChange.");
private static readonly Counter<long> EventsDropped =
Meter.CreateCounter<long>("galaxy.events.dropped", unit: "{event}",
description: "MxEvents dropped because the bounded channel was full (newest-dropped).");
private readonly IGalaxySubscriber _subscriber;
private readonly SubscriptionRegistry _registry;
private readonly ILogger _logger;
private readonly Func<long, ISubscriptionHandle> _handleFactory;
private readonly Channel<MxEvent> _channel;
private readonly KeyValuePair<string, object?> _clientTag;
private readonly CancellationTokenSource _cts = new();
private Task? _loop;
private Task? _dispatchLoop;
private bool _disposed;
public event EventHandler<DataChangeEventArgs>? OnDataChange;
@@ -34,12 +64,30 @@ internal sealed class EventPump : IAsyncDisposable
IGalaxySubscriber subscriber,
SubscriptionRegistry registry,
ILogger? logger = null,
Func<long, ISubscriptionHandle>? handleFactory = null)
Func<long, ISubscriptionHandle>? handleFactory = null,
int channelCapacity = DefaultChannelCapacity,
string? clientName = null)
{
_subscriber = subscriber ?? throw new ArgumentNullException(nameof(subscriber));
_registry = registry ?? throw new ArgumentNullException(nameof(registry));
_logger = logger ?? NullLogger.Instance;
_handleFactory = handleFactory ?? (id => new GalaxySubscriptionHandle(id));
if (channelCapacity < 1)
{
throw new ArgumentOutOfRangeException(nameof(channelCapacity),
"channelCapacity must be >= 1; recommended 50_000 for 50k-tag deployments.");
}
_channel = Channel.CreateBounded<MxEvent>(new BoundedChannelOptions(channelCapacity)
{
// Newest-dropped policy: when full, the producer's TryWrite returns false
// and we account for the drop. We do this manually rather than relying on
// BoundedChannelFullMode.DropWrite so we can count drops without polling.
FullMode = BoundedChannelFullMode.Wait,
SingleReader = true,
SingleWriter = true,
});
_clientTag = new KeyValuePair<string, object?>("galaxy.client", clientName ?? "<unknown>");
}
/// <summary>
@@ -51,6 +99,7 @@ internal sealed class EventPump : IAsyncDisposable
ObjectDisposedException.ThrowIf(_disposed, this);
if (_loop is not null) return;
_loop = Task.Run(() => RunAsync(_cts.Token));
_dispatchLoop = Task.Run(() => DispatchLoopAsync(_cts.Token));
}
private async Task RunAsync(CancellationToken ct)
@@ -60,7 +109,15 @@ internal sealed class EventPump : IAsyncDisposable
await foreach (var ev in _subscriber.StreamEventsAsync(ct).WithCancellation(ct).ConfigureAwait(false))
{
if (ct.IsCancellationRequested) break;
Dispatch(ev);
EventsReceived.Add(1, _clientTag);
// Newest-dropped: TryWrite fast-paths the common case (channel has room).
// When full we count the drop and continue reading the gw stream so
// back-pressure doesn't propagate upstream.
if (!_channel.Writer.TryWrite(ev))
{
EventsDropped.Add(1, _clientTag);
}
}
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
@@ -72,6 +129,32 @@ internal sealed class EventPump : IAsyncDisposable
_logger.LogWarning(ex,
"Galaxy EventPump loop ended with an exception — reconnect supervisor (PR 4.5) handles restart.");
}
finally
{
// Tell the dispatch loop the producer is done so it drains and exits.
_channel.Writer.TryComplete();
}
}
private async Task DispatchLoopAsync(CancellationToken ct)
{
try
{
await foreach (var ev in _channel.Reader.ReadAllAsync(ct).ConfigureAwait(false))
{
Dispatch(ev);
EventsDispatched.Add(1, _clientTag);
}
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
// Clean shutdown.
}
catch (Exception ex)
{
_logger.LogWarning(ex,
"Galaxy EventPump dispatch loop ended with an exception — events past this point will be lost until restart.");
}
}
private void Dispatch(MxEvent ev)
@@ -121,10 +204,15 @@ internal sealed class EventPump : IAsyncDisposable
if (_disposed) return;
_disposed = true;
_cts.Cancel();
_channel.Writer.TryComplete();
if (_loop is not null)
{
try { await _loop.ConfigureAwait(false); } catch { /* shutdown */ }
}
if (_dispatchLoop is not null)
{
try { await _dispatchLoop.ConfigureAwait(false); } catch { /* shutdown */ }
}
_cts.Dispose();
}
}