using System.Diagnostics.Metrics;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using MxGateway.Contracts.Proto;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;

namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;

/// <summary>
/// Long-running consumer of <see cref="IGalaxySubscriber.StreamEventsAsync"/>. Translates
/// each <see cref="MxEvent"/> with family <see cref="MxEventFamily.OnDataChange"/> into
/// <see cref="DataChangeEventArgs"/> and dispatches one event per registered driver
/// subscription that includes the changed item handle (fan-out via
/// <see cref="SubscriptionRegistry.ResolveSubscribers"/>).
/// </summary>
/// <remarks>
/// <para>
/// One pump per connected gateway session. Reconnect lives in the reconnect
/// supervisor; on transport failure here we log and invoke the optional
/// <c>onStreamFault</c> callback so the owner (<c>GalaxyDriver</c>) can forward the
/// fault to the supervisor and the supervisor can drive reopen → replay.
/// </para>
/// <para>
/// PR 6.2 — the network-read loop and the listener-fanout loop are decoupled by a
/// bounded <see cref="Channel{T}"/>. When a listener is slow enough to fill the
/// channel, new events are dropped (newest-dropped semantics: producer's
/// <c>TryWrite</c> fails) rather than back-pressuring the gw stream. Three counters
/// on the <c>ZB.MOM.WW.OtOpcUa.Driver.Galaxy</c> meter expose received / dispatched
/// / dropped totals so ops sees pressure before it manifests as user-visible loss.
/// </para>
/// <para>
/// A failure while dispatching a single event (registry lookup, value decoding,
/// handle creation) is logged and that event is skipped; it does not terminate the
/// dispatch loop.
/// </para>
/// </remarks>
internal sealed class EventPump : IAsyncDisposable
{
    public const string MeterName = "ZB.MOM.WW.OtOpcUa.Driver.Galaxy";
    private const int DefaultChannelCapacity = 50_000;

    // Single static meter so a host-level MeterListener catches all pump instances.
    private static readonly Meter Meter = new(MeterName);

    private static readonly Counter<long> EventsReceived = Meter.CreateCounter<long>(
        "galaxy.events.received",
        unit: "{event}",
        description: "MxEvents read from the gateway StreamEvents stream.");

    private static readonly Counter<long> EventsDispatched = Meter.CreateCounter<long>(
        "galaxy.events.dispatched",
        unit: "{event}",
        description: "MxEvents passed through the bounded channel and into OnDataChange.");

    private static readonly Counter<long> EventsDropped = Meter.CreateCounter<long>(
        "galaxy.events.dropped",
        unit: "{event}",
        description: "MxEvents dropped because the bounded channel was full (newest-dropped).");

    private readonly IGalaxySubscriber _subscriber;
    private readonly SubscriptionRegistry _registry;
    private readonly ILogger _logger;

    // NOTE(review): the subscription-id type is assumed to be long — confirm against
    // SubscriptionRegistry.ResolveSubscribers and GalaxySubscriptionHandle.
    private readonly Func<long, ISubscriptionHandle> _handleFactory;
    private readonly Action<Exception>? _onStreamFault;
    private readonly Channel<MxEvent> _channel;

    // Tag attached to every counter measurement so totals can be split per client.
    private readonly KeyValuePair<string, object?> _clientTag;
    private readonly CancellationTokenSource _cts = new();
    private Task? _loop;
    private Task? _dispatchLoop;
    private bool _disposed;

    /// <summary>
    /// Raised once per (subscription, full reference) pair resolved for an incoming
    /// data-change event. Handlers run on the dispatch loop's task.
    /// </summary>
    public event EventHandler<DataChangeEventArgs>? OnDataChange;

    /// <summary>Creates a pump; call <see cref="Start"/> to begin consuming.</summary>
    /// <param name="subscriber">Source of the gateway event stream.</param>
    /// <param name="registry">Maps an item handle to the subscriptions that include it.</param>
    /// <param name="logger">Optional logger; defaults to a no-op logger.</param>
    /// <param name="handleFactory">
    /// Optional factory turning a subscription id into the handle exposed on
    /// <see cref="DataChangeEventArgs"/>; defaults to creating a <c>GalaxySubscriptionHandle</c>.
    /// </param>
    /// <param name="channelCapacity">Capacity of the bounded hand-off channel; must be &gt;= 1.</param>
    /// <param name="clientName">Value of the <c>galaxy.client</c> metric tag; empty when null.</param>
    /// <param name="onStreamFault">Optional callback invoked when the event stream ends with an exception.</param>
    /// <exception cref="ArgumentNullException"><paramref name="subscriber"/> or <paramref name="registry"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="channelCapacity"/> is less than 1.</exception>
    public EventPump(
        IGalaxySubscriber subscriber,
        SubscriptionRegistry registry,
        ILogger? logger = null,
        Func<long, ISubscriptionHandle>? handleFactory = null,
        int channelCapacity = DefaultChannelCapacity,
        string? clientName = null,
        Action<Exception>? onStreamFault = null)
    {
        _subscriber = subscriber ?? throw new ArgumentNullException(nameof(subscriber));
        _registry = registry ?? throw new ArgumentNullException(nameof(registry));
        _logger = logger ?? NullLogger.Instance;
        _handleFactory = handleFactory ?? (id => new GalaxySubscriptionHandle(id));
        _onStreamFault = onStreamFault;

        if (channelCapacity < 1)
        {
            throw new ArgumentOutOfRangeException(
                nameof(channelCapacity),
                "channelCapacity must be >= 1; recommended 50_000 for 50k-tag deployments.");
        }

        _channel = Channel.CreateBounded<MxEvent>(new BoundedChannelOptions(channelCapacity)
        {
            // Newest-dropped semantics: we use FullMode.Wait but never call the
            // awaitable WriteAsync — only the synchronous TryWrite below in
            // RunAsync. With Wait + TryWrite, a full channel makes TryWrite return
            // false immediately, which we account for via the EventsDropped counter.
            // We deliberately do NOT use BoundedChannelFullMode.DropWrite — that
            // would silently discard the new event inside Channel<T> without
            // surfacing the drop on a counter (Driver.Galaxy-005: keep the comment
            // and the FullMode value consistent).
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = true,
            SingleWriter = true,
        });

        _clientTag = new KeyValuePair<string, object?>("galaxy.client", clientName ?? "");
    }

    /// <summary>
    /// Start consuming the event stream on a background task. Idempotent — every call
    /// after the first is a no-op (the loops are never restarted, even after the
    /// stream has ended).
    /// </summary>
    /// <exception cref="ObjectDisposedException">The pump has been disposed.</exception>
    public void Start()
    {
        ObjectDisposedException.ThrowIf(_disposed, this);
        if (_loop is not null)
        {
            return;
        }

        _loop = Task.Run(() => RunAsync(_cts.Token));
        _dispatchLoop = Task.Run(() => DispatchLoopAsync(_cts.Token));
    }

    /// <summary>
    /// Producer loop: reads the gateway stream and hands each event to the bounded
    /// channel, counting (not awaiting) when the channel is full.
    /// </summary>
    private async Task RunAsync(CancellationToken ct)
    {
        try
        {
            await foreach (var ev in _subscriber.StreamEventsAsync(ct).WithCancellation(ct).ConfigureAwait(false))
            {
                if (ct.IsCancellationRequested)
                {
                    break;
                }

                EventsReceived.Add(1, _clientTag);

                // Newest-dropped: TryWrite fast-paths the common case (channel has room).
                // When full we count the drop and continue reading the gw stream so
                // back-pressure doesn't propagate upstream.
                if (!_channel.Writer.TryWrite(ev))
                {
                    EventsDropped.Add(1, _clientTag);
                }
            }

            // NOTE(review): a stream that completes without throwing is not reported
            // through onStreamFault — confirm whether the gateway can end it gracefully.
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Clean shutdown — no log.
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "Galaxy EventPump loop ended with an exception — notifying reconnect supervisor.");

            // The gw StreamEvents stream faulted. Signal the reconnect supervisor so it
            // drives reopen → replay. Without this the stream silently dies and a
            // transient gateway drop permanently stops data-change notifications.
            if (_onStreamFault is not null)
            {
                try
                {
                    _onStreamFault(ex);
                }
                catch (Exception cbEx)
                {
                    _logger.LogWarning(cbEx, "Galaxy EventPump stream-fault callback threw — supervisor may not have been notified.");
                }
            }
        }
        finally
        {
            // Tell the dispatch loop the producer is done so it drains and exits.
            _channel.Writer.TryComplete();
        }
    }

    /// <summary>
    /// Consumer loop: drains the channel and fans each event out to listeners. A
    /// failure on one event is logged and skipped so it cannot stop the loop.
    /// </summary>
    private async Task DispatchLoopAsync(CancellationToken ct)
    {
        try
        {
            await foreach (var ev in _channel.Reader.ReadAllAsync(ct).ConfigureAwait(false))
            {
                try
                {
                    Dispatch(ev);
                }
                catch (Exception ex)
                {
                    // Without this guard a single bad event (decode failure, registry or
                    // handle-factory exception) would end the loop while RunAsync keeps
                    // filling the channel, turning every later event into a silent drop.
                    _logger.LogWarning(ex, "Galaxy EventPump failed to dispatch an event (family {Family}, item handle {ItemHandle}) — skipping it and continuing.", ev.Family, ev.ItemHandle);
                    continue;
                }

                EventsDispatched.Add(1, _clientTag);
            }
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Clean shutdown.
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "Galaxy EventPump dispatch loop ended with an exception — events past this point will be lost until restart.");
        }
    }

    /// <summary>Routes an event by family; only data-change events are forwarded.</summary>
    private void Dispatch(MxEvent ev)
    {
        switch (ev.Family)
        {
            case MxEventFamily.OnDataChange:
                DispatchDataChange(ev);
                break;

            default:
                // OnAlarmTransition is no longer carried on the per-session event stream
                // — alarms come from the gateway's session-less StreamAlarms feed
                // (GatewayGalaxyAlarmFeed). OnWriteComplete / OperationComplete /
                // OnBufferedDataChange are filtered out: write callers get their reply
                // via the InvokeAsync round-trip, not via the event stream.
                return;
        }
    }

    /// <summary>
    /// Raises <see cref="OnDataChange"/> once per subscription that includes the
    /// event's item handle, sharing one snapshot across all of them.
    /// </summary>
    private void DispatchDataChange(MxEvent ev)
    {
        var subscribers = _registry.ResolveSubscribers(ev.ItemHandle);
        if (subscribers.Count == 0)
        {
            return; // stale event after unsubscribe — drop quietly
        }

        var snapshot = ToSnapshot(ev);
        foreach (var (subscriptionId, fullReference) in subscribers)
        {
            var handle = _handleFactory(subscriptionId);
            try
            {
                OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, fullReference, snapshot));
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "Galaxy OnDataChange handler threw for {FullRef} subscription {SubscriptionId} — continuing fan-out.", fullReference, subscriptionId);
            }
        }
    }

    /// <summary>
    /// Builds the value/status/timestamp snapshot for an event. The status comes from
    /// the first entry of <c>Statuses</c> when present, otherwise from the low byte of
    /// <c>Quality</c>; the server timestamp is the current UTC time.
    /// </summary>
    private DataValueSnapshot ToSnapshot(MxEvent ev)
    {
        var value = MxValueDecoder.Decode(ev.Value);
        var statusCode = ev.Statuses.Count > 0
            ? StatusCodeMap.FromMxStatus(ev.Statuses[0], _logger)
            : StatusCodeMap.FromQualityByte((byte)(ev.Quality & 0xFF), _logger);
        DateTime? sourceTimestamp = ev.SourceTimestamp is { } ts ? ts.ToDateTime() : null;

        return new DataValueSnapshot(
            Value: value,
            StatusCode: statusCode,
            SourceTimestampUtc: sourceTimestamp,
            ServerTimestampUtc: DateTime.UtcNow);
    }

    /// <summary>
    /// Cancels both loops, completes the channel and waits for the loops to finish.
    /// Calls after the first are no-ops; exceptions from the loops are swallowed
    /// because they have already been logged by the loops themselves.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;
        _cts.Cancel();
        _channel.Writer.TryComplete();

        if (_loop is not null)
        {
            try
            {
                await _loop.ConfigureAwait(false);
            }
            catch
            {
                /* shutdown */
            }
        }

        if (_dispatchLoop is not null)
        {
            try
            {
                await _dispatchLoop.ConfigureAwait(false);
            }
            catch
            {
                /* shutdown */
            }
        }

        _cts.Dispose();
    }
}