Files
lmxopcua/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/Runtime/EventPump.cs
Joseph Doherty 27a8d05b7c feat(driver-galaxy): consume the gateway's session-less alarm model
The mxaccessgw updated alarms to a session-less central monitor:
AcknowledgeAlarm dropped SessionId and alarm transitions now come from
the session-less StreamAlarms feed instead of the per-session worker
StreamEvents stream. The GalaxyDriver no longer compiled against the
updated client.

- GatewayGalaxyAlarmAcknowledger: session-less rewrite — no GalaxyMxSession;
  outcome read from ProtocolStatus (throw) and Hresult (warn).
- New IGalaxyAlarmFeed seam + GatewayGalaxyAlarmFeed: background consumer
  of StreamAlarms that decodes the active-alarm snapshot plus live
  transitions into GalaxyAlarmTransition and reopens the stream on
  transport faults.
- EventPump: drop the dead per-session OnAlarmTransition path; the
  per-session stream no longer carries alarms.
- GalaxyDriver: bridge the feed onto IAlarmSource.OnAlarmEvent; the feed
  starts on SubscribeAlarmsAsync, independent of data subscriptions.
- Tests: replace EventPumpAlarmTests with GatewayGalaxyAlarmFeedTests;
  move the driver alarm-source tests onto the IGalaxyAlarmFeed seam.

Browse needed no change — GatewayGalaxyHierarchySource consumes the
unchanged DiscoverHierarchy contract.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 03:59:36 -04:00

231 lines
9.2 KiB
C#

using System.Diagnostics.Metrics;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using MxGateway.Contracts.Proto;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
/// <summary>
/// Long-running consumer of <see cref="IGalaxySubscriber.StreamEventsAsync"/>. Translates
/// each <see cref="MxEvent"/> with family <see cref="MxEventFamily.OnDataChange"/> into
/// <see cref="DataChangeEventArgs"/> and dispatches one event per registered driver
/// subscription that includes the changed item handle (fan-out via
/// <see cref="SubscriptionRegistry.ResolveSubscribers"/>).
/// </summary>
/// <remarks>
/// <para>
/// One pump per connected <see cref="GalaxyMxSession"/>. Reconnect lives in PR 4.5's
/// supervisor; on transport failure the read loop logs a warning, completes the
/// channel so the dispatch loop can drain what is already queued, and ends. The
/// exception is not rethrown from the background task.
/// </para>
/// <para>
/// PR 6.2 — the network-read loop and the listener-fanout loop are decoupled by a
/// bounded <see cref="Channel{T}"/>. When a listener is slow enough to fill the
/// channel, new events are dropped (newest-dropped semantics: producer's
/// <c>TryWrite</c> fails) rather than back-pressuring the gw stream. Three counters
/// on the <c>ZB.MOM.WW.OtOpcUa.Driver.Galaxy</c> meter expose received / dispatched
/// / dropped totals so ops sees pressure before it manifests as user-visible loss.
/// </para>
/// <para>
/// Dispatch is isolated per event: an event whose translation or fan-out throws is
/// logged and skipped, so a single bad event cannot stop delivery of later events.
/// </para>
/// </remarks>
internal sealed class EventPump : IAsyncDisposable
{
    /// <summary>Name of the meter that owns the pump's counters.</summary>
    public const string MeterName = "ZB.MOM.WW.OtOpcUa.Driver.Galaxy";

    private const int DefaultChannelCapacity = 50_000;

    // Single static meter so a host-level MeterListener catches all pump instances.
    private static readonly Meter Meter = new(MeterName);

    private static readonly Counter<long> EventsReceived =
        Meter.CreateCounter<long>("galaxy.events.received", unit: "{event}",
            description: "MxEvents read from the gateway StreamEvents stream.");

    private static readonly Counter<long> EventsDispatched =
        Meter.CreateCounter<long>("galaxy.events.dispatched", unit: "{event}",
            description: "MxEvents passed through the bounded channel and into OnDataChange.");

    private static readonly Counter<long> EventsDropped =
        Meter.CreateCounter<long>("galaxy.events.dropped", unit: "{event}",
            description: "MxEvents dropped because the bounded channel was full (newest-dropped).");

    private readonly IGalaxySubscriber _subscriber;
    private readonly SubscriptionRegistry _registry;
    private readonly ILogger _logger;
    private readonly Func<long, ISubscriptionHandle> _handleFactory;
    private readonly Channel<MxEvent> _channel;

    // Tag attached to every counter measurement recorded by this pump instance.
    private readonly KeyValuePair<string, object?> _clientTag;

    private readonly CancellationTokenSource _cts = new();
    private Task? _loop;
    private Task? _dispatchLoop;
    private bool _disposed;

    /// <summary>
    /// Raised once per (subscription, changed item) pair for every
    /// <see cref="MxEventFamily.OnDataChange"/> event that resolves to at least one
    /// subscriber. Invoked from the dispatch loop's background task; exceptions thrown
    /// by handlers are logged and do not stop the fan-out.
    /// </summary>
    public event EventHandler<DataChangeEventArgs>? OnDataChange;

    /// <summary>
    /// Creates a pump over <paramref name="subscriber"/>. Nothing is consumed until
    /// <see cref="Start"/> is called.
    /// </summary>
    /// <param name="subscriber">Source of the event stream.</param>
    /// <param name="registry">Resolves an item handle to the subscriptions that include it.</param>
    /// <param name="logger">Optional logger; a no-op logger is used when <c>null</c>.</param>
    /// <param name="handleFactory">
    /// Optional factory turning a subscription id into the handle placed on the event
    /// args; defaults to creating a <c>GalaxySubscriptionHandle</c>.
    /// </param>
    /// <param name="channelCapacity">Capacity of the bounded channel between the two loops; must be at least 1.</param>
    /// <param name="clientName">
    /// Value of the <c>galaxy.client</c> counter tag; <c>&lt;unknown&gt;</c> when <c>null</c>.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="subscriber"/> or <paramref name="registry"/> is <c>null</c>.
    /// </exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="channelCapacity"/> is less than 1.</exception>
    public EventPump(
        IGalaxySubscriber subscriber,
        SubscriptionRegistry registry,
        ILogger? logger = null,
        Func<long, ISubscriptionHandle>? handleFactory = null,
        int channelCapacity = DefaultChannelCapacity,
        string? clientName = null)
    {
        _subscriber = subscriber ?? throw new ArgumentNullException(nameof(subscriber));
        _registry = registry ?? throw new ArgumentNullException(nameof(registry));
        _logger = logger ?? NullLogger.Instance;
        _handleFactory = handleFactory ?? (id => new GalaxySubscriptionHandle(id));

        if (channelCapacity < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(channelCapacity),
                "channelCapacity must be >= 1; recommended 50_000 for 50k-tag deployments.");
        }

        _channel = Channel.CreateBounded<MxEvent>(new BoundedChannelOptions(channelCapacity)
        {
            // Newest-dropped policy: when full, the producer's TryWrite returns false
            // and we account for the drop. We do this manually rather than relying on
            // BoundedChannelFullMode.DropWrite so we can count drops without polling.
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = true,
            SingleWriter = true,
        });
        _clientTag = new KeyValuePair<string, object?>("galaxy.client", clientName ?? "<unknown>");
    }

    /// <summary>
    /// Start consuming the event stream on background tasks. Only the first call has
    /// an effect; every later call is a no-op, including after the loops have ended
    /// (the channel is completed when the read loop exits, so a pump is single-use).
    /// </summary>
    /// <exception cref="ObjectDisposedException">The pump has been disposed.</exception>
    public void Start()
    {
        ObjectDisposedException.ThrowIf(_disposed, this);
        if (_loop is not null)
        {
            return;
        }

        _loop = Task.Run(() => RunAsync(_cts.Token));
        _dispatchLoop = Task.Run(() => DispatchLoopAsync(_cts.Token));
    }

    /// <summary>
    /// Read loop: pulls events from the subscriber stream and queues them on the
    /// bounded channel, counting received and dropped events. Always completes the
    /// channel writer on exit.
    /// </summary>
    private async Task RunAsync(CancellationToken ct)
    {
        try
        {
            await foreach (var ev in _subscriber.StreamEventsAsync(ct).WithCancellation(ct).ConfigureAwait(false))
            {
                if (ct.IsCancellationRequested)
                {
                    break;
                }

                EventsReceived.Add(1, _clientTag);

                // Newest-dropped: TryWrite fast-paths the common case (channel has room).
                // When full we count the drop and continue reading the gw stream so
                // back-pressure doesn't propagate upstream.
                if (!_channel.Writer.TryWrite(ev))
                {
                    EventsDropped.Add(1, _clientTag);
                }
            }
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Clean shutdown — no log.
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex,
                "Galaxy EventPump loop ended with an exception — reconnect supervisor (PR 4.5) handles restart.");
        }
        finally
        {
            // Tell the dispatch loop the producer is done so it drains and exits.
            _channel.Writer.TryComplete();
        }
    }

    /// <summary>
    /// Dispatch loop: reads queued events from the channel and fans them out. A
    /// failure while dispatching one event is logged and that event is skipped; the
    /// loop keeps running.
    /// </summary>
    private async Task DispatchLoopAsync(CancellationToken ct)
    {
        try
        {
            await foreach (var ev in _channel.Reader.ReadAllAsync(ct).ConfigureAwait(false))
            {
                try
                {
                    Dispatch(ev);
                }
                catch (Exception ex)
                {
                    // Handler exceptions are already contained in DispatchDataChange; this
                    // guards the remaining steps (subscriber lookup, value/status decoding,
                    // handle creation). Without it one bad event would end this loop while
                    // the read loop kept filling the channel and dropping everything after.
                    _logger.LogWarning(ex,
                        "Galaxy EventPump failed to dispatch a {Family} event for item handle {ItemHandle} — skipping it and continuing.",
                        ev.Family, ev.ItemHandle);
                    continue;
                }

                EventsDispatched.Add(1, _clientTag);
            }
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Clean shutdown.
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex,
                "Galaxy EventPump dispatch loop ended with an exception — events past this point will be lost until restart.");
        }
    }

    /// <summary>Routes an event by family; only data changes are forwarded.</summary>
    private void Dispatch(MxEvent ev)
    {
        switch (ev.Family)
        {
            case MxEventFamily.OnDataChange:
                DispatchDataChange(ev);
                break;
            default:
                // OnAlarmTransition is no longer carried on the per-session event stream
                // — alarms come from the gateway's session-less StreamAlarms feed
                // (GatewayGalaxyAlarmFeed). OnWriteComplete / OperationComplete /
                // OnBufferedDataChange are filtered out: write callers get their reply
                // via the InvokeAsync round-trip, not via the event stream.
                return;
        }
    }

    /// <summary>
    /// Raises <see cref="OnDataChange"/> once for every subscription the registry
    /// resolves for the event's item handle, sharing a single decoded snapshot.
    /// </summary>
    private void DispatchDataChange(MxEvent ev)
    {
        var subscribers = _registry.ResolveSubscribers(ev.ItemHandle);
        if (subscribers.Count == 0)
        {
            return; // stale event after unsubscribe — drop quietly
        }

        var snapshot = ToSnapshot(ev);
        foreach (var (subscriptionId, fullReference) in subscribers)
        {
            var handle = _handleFactory(subscriptionId);
            try
            {
                OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, fullReference, snapshot));
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex,
                    "Galaxy OnDataChange handler threw for {FullRef} subscription {SubscriptionId} — continuing fan-out.",
                    fullReference, subscriptionId);
            }
        }
    }

    /// <summary>
    /// Builds the snapshot for an event: decoded value, a status code taken from the
    /// first status entry when one is present (otherwise derived from the low byte of
    /// the quality field), the source timestamp when set, and the current UTC time as
    /// the server timestamp.
    /// </summary>
    private DataValueSnapshot ToSnapshot(MxEvent ev)
    {
        var value = MxValueDecoder.Decode(ev.Value);
        var statusCode = ev.Statuses.Count > 0
            ? StatusCodeMap.FromMxStatus(ev.Statuses[0], _logger)
            : StatusCodeMap.FromQualityByte((byte)(ev.Quality & 0xFF), _logger);
        DateTime? sourceTimestamp = ev.SourceTimestamp is { } ts ? ts.ToDateTime() : null;

        return new DataValueSnapshot(
            Value: value,
            StatusCode: statusCode,
            SourceTimestampUtc: sourceTimestamp,
            ServerTimestampUtc: DateTime.UtcNow);
    }

    /// <summary>
    /// Cancels both loops, completes the channel, waits for the loops to finish and
    /// releases the cancellation source. Calls after the first return immediately.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;
        _cts.Cancel();
        _channel.Writer.TryComplete();

        if (_loop is not null)
        {
            try { await _loop.ConfigureAwait(false); } catch { /* shutdown */ }
        }

        if (_dispatchLoop is not null)
        {
            try { await _dispatchLoop.ConfigureAwait(false); } catch { /* shutdown */ }
        }

        _cts.Dispose();
    }
}