560b327ee1
v2-ci / build (push) Failing after 33s
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.IntegrationTests) (push) Has been skipped
Imports the freshly-rebuilt ZB.MOM.WW.MxGateway.Client + ZB.MOM.WW.MxGateway.Contracts nupkgs (0.1.0) from /tmp/mxgw-dist. Replaces the vendored libs/ DLLs and the pre-restructure MxGateway.* namespaces across the runtime Galaxy driver, Galaxy.Browser, and their tests. Key changes: - nuget-packages/ added as a local feed via NuGet.config; .gitignore exempts it from the *.nupkg rule so the packages are tracked - Directory.Packages.props pins both packages at 0.1.0 - 4 csprojs swap <Reference HintPath="libs/...dll"/> for <PackageReference/> - 36 .cs files renamed `using MxGateway.*` -> `using ZB.MOM.WW.MxGateway.*` - libs/ removed (vendored DLLs + README.md) GalaxyBrowseSession rewritten around the new lazy API: - RootAsync calls GalaxyRepositoryClient.BrowseAsync (returns LazyBrowseNodes) and caches them by TagName instead of bulk-fetching the whole hierarchy - ExpandAsync looks up the cached LazyBrowseNode and calls its ExpandAsync, giving true one-wire-call-per-click instead of in-memory parent/child scan - _byGobjectId + _hasChildrenSet dropped (LazyBrowseNode carries HasChildrenHint) - AttributesAsync unchanged (already uses DiscoverHierarchyAsync MaxDepth=0) Tests: Galaxy.Tests 245/245, Galaxy.Browser.Tests 10/10, AdminUI.Tests 66/66. Pre-existing 12 solution errors unchanged (test sinks + Cli XML comments).
265 lines
12 KiB
C#
265 lines
12 KiB
C#
using System.Diagnostics.Metrics;
|
|
using System.Threading.Channels;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using ZB.MOM.WW.MxGateway.Contracts.Proto;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
|
|
|
|
/// <summary>
/// Long-running consumer of <see cref="IGalaxySubscriber.StreamEventsAsync"/>. Translates
/// each <see cref="MxEvent"/> with family <see cref="MxEventFamily.OnDataChange"/> into
/// <see cref="DataChangeEventArgs"/> and dispatches one event per registered driver
/// subscription that includes the changed item handle (fan-out via
/// <see cref="SubscriptionRegistry.ResolveSubscribers"/>).
/// </summary>
/// <remarks>
/// <para>
/// One pump per connected <see cref="GalaxyMxSession"/>. Reconnect lives in the
/// <see cref="ReconnectSupervisor"/>; on transport failure here we log and invoke
/// the optional <c>onStreamFault</c> callback so the owner (GalaxyDriver) can
/// forward the fault to <see cref="ReconnectSupervisor.ReportTransportFailure"/>
/// and the supervisor can drive reopen → replay.
/// </para>
/// <para>
/// PR 6.2 — the network-read loop and the listener-fanout loop are decoupled by a
/// bounded <see cref="Channel{T}"/>. When a listener is slow enough to fill the
/// channel, new events are dropped (newest-dropped semantics: producer's
/// <c>TryWrite</c> fails) rather than back-pressuring the gw stream. Three counters
/// on the <c>ZB.MOM.WW.OtOpcUa.Driver.Galaxy</c> meter expose received / dispatched
/// / dropped totals so ops sees pressure before it manifests as user-visible loss.
/// </para>
/// <para>
/// Failure isolation: an exception raised while dispatching a single event is logged
/// and that event is skipped; the dispatch loop keeps running. An exception that ends
/// the read loop after disposal has requested cancellation is treated as shutdown and
/// is not reported through <c>onStreamFault</c>.
/// </para>
/// </remarks>
internal sealed class EventPump : IAsyncDisposable
{
    public const string MeterName = "ZB.MOM.WW.OtOpcUa.Driver.Galaxy";
    private const int DefaultChannelCapacity = 50_000;

    // Single static meter so a host-level MeterListener catches all pump instances.
    private static readonly Meter Meter = new(MeterName);
    private static readonly Counter<long> EventsReceived =
        Meter.CreateCounter<long>("galaxy.events.received", unit: "{event}",
            description: "MxEvents read from the gateway StreamEvents stream.");
    private static readonly Counter<long> EventsDispatched =
        Meter.CreateCounter<long>("galaxy.events.dispatched", unit: "{event}",
            description: "MxEvents passed through the bounded channel and into OnDataChange.");
    private static readonly Counter<long> EventsDropped =
        Meter.CreateCounter<long>("galaxy.events.dropped", unit: "{event}",
            description: "MxEvents dropped because the bounded channel was full (newest-dropped).");

    private readonly IGalaxySubscriber _subscriber;
    private readonly SubscriptionRegistry _registry;
    private readonly ILogger _logger;
    private readonly Func<long, ISubscriptionHandle> _handleFactory;
    private readonly Action<Exception>? _onStreamFault;
    private readonly Channel<MxEvent> _channel;
    private readonly KeyValuePair<string, object?> _clientTag;
    private readonly CancellationTokenSource _cts = new();

    private Task? _loop;
    private Task? _dispatchLoop;
    private bool _disposed;

    /// <summary>Occurs when a data change event is received from the Galaxy subscriber.</summary>
    public event EventHandler<DataChangeEventArgs>? OnDataChange;

    /// <summary>Initializes a new instance of the EventPump class.</summary>
    /// <param name="subscriber">The Galaxy subscriber to consume events from.</param>
    /// <param name="registry">The subscription registry for resolving subscribers.</param>
    /// <param name="logger">The logger instance; if null, uses NullLogger.</param>
    /// <param name="handleFactory">The factory for creating subscription handles; if null, uses GalaxySubscriptionHandle.</param>
    /// <param name="channelCapacity">The bounded channel capacity for buffering events.</param>
    /// <param name="clientName">The client name for metric tagging; if null, uses "&lt;unknown&gt;".</param>
    /// <param name="onStreamFault">Optional callback invoked when the stream faults.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="subscriber"/> or <paramref name="registry"/> is null.
    /// </exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="channelCapacity"/> is less than 1.
    /// </exception>
    public EventPump(
        IGalaxySubscriber subscriber,
        SubscriptionRegistry registry,
        ILogger? logger = null,
        Func<long, ISubscriptionHandle>? handleFactory = null,
        int channelCapacity = DefaultChannelCapacity,
        string? clientName = null,
        Action<Exception>? onStreamFault = null)
    {
        _subscriber = subscriber ?? throw new ArgumentNullException(nameof(subscriber));
        _registry = registry ?? throw new ArgumentNullException(nameof(registry));
        _logger = logger ?? NullLogger.Instance;
        _handleFactory = handleFactory ?? (id => new GalaxySubscriptionHandle(id));
        _onStreamFault = onStreamFault;

        if (channelCapacity < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(channelCapacity),
                "channelCapacity must be >= 1; recommended 50_000 for 50k-tag deployments.");
        }

        _channel = Channel.CreateBounded<MxEvent>(new BoundedChannelOptions(channelCapacity)
        {
            // Newest-dropped semantics: we use FullMode.Wait but never call the
            // awaitable WriteAsync — only the synchronous TryWrite below in
            // RunAsync. With Wait + TryWrite, a full channel makes TryWrite return
            // false immediately, which we account for via the EventsDropped counter.
            // We deliberately do NOT use BoundedChannelFullMode.DropWrite — that
            // would silently discard the new event inside Channel<T> without
            // surfacing the drop on a counter (Driver.Galaxy-005: keep the comment
            // and the FullMode value consistent).
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = true,
            SingleWriter = true,
        });
        _clientTag = new KeyValuePair<string, object?>("galaxy.client", clientName ?? "<unknown>");
    }

    /// <summary>
    /// Start consuming the event stream on a background task. Idempotent — second
    /// calls are no-ops while the loop is running.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The pump has already been disposed.</exception>
    public void Start()
    {
        ObjectDisposedException.ThrowIf(_disposed, this);
        if (_loop is not null) return;

        // Two tasks: the reader fills the bounded channel, the dispatcher drains it.
        _loop = Task.Run(() => RunAsync(_cts.Token));
        _dispatchLoop = Task.Run(() => DispatchLoopAsync(_cts.Token));
    }

    /// <summary>
    /// Read loop: pulls events from the subscriber stream and enqueues them on the
    /// bounded channel, counting received and dropped events. Completes the channel
    /// writer on exit so the dispatch loop can finish.
    /// </summary>
    /// <param name="ct">Token cancelled by <see cref="DisposeAsync"/>.</param>
    private async Task RunAsync(CancellationToken ct)
    {
        try
        {
            await foreach (var ev in _subscriber.StreamEventsAsync(ct).WithCancellation(ct).ConfigureAwait(false))
            {
                if (ct.IsCancellationRequested) break;
                EventsReceived.Add(1, _clientTag);

                // Newest-dropped: TryWrite fast-paths the common case (channel has room).
                // When full we count the drop and continue reading the gw stream so
                // back-pressure doesn't propagate upstream.
                if (!_channel.Writer.TryWrite(ev))
                {
                    EventsDropped.Add(1, _clientTag);
                }
            }
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Clean shutdown — no log.
        }
        catch (Exception ex) when (ct.IsCancellationRequested)
        {
            // Cancellation was requested (only DisposeAsync cancels this token), so the
            // stream ending with a non-OperationCanceledException is still shutdown,
            // not a transport fault. Reporting it would ask the supervisor to
            // reconnect a pump that is being torn down.
            _logger.LogDebug(ex,
                "Galaxy EventPump loop ended with an exception during shutdown — not reporting a stream fault.");
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex,
                "Galaxy EventPump loop ended with an exception — notifying reconnect supervisor.");

            // The gw StreamEvents stream faulted. Signal the reconnect supervisor so it
            // drives reopen → replay. Without this the stream silently dies and a
            // transient gateway drop permanently stops data-change notifications.
            if (_onStreamFault is not null)
            {
                try { _onStreamFault(ex); }
                catch (Exception cbEx)
                {
                    _logger.LogWarning(cbEx,
                        "Galaxy EventPump stream-fault callback threw — supervisor may not have been notified.");
                }
            }
        }
        finally
        {
            // Tell the dispatch loop the producer is done so it drains and exits.
            _channel.Writer.TryComplete();
        }
    }

    /// <summary>
    /// Dispatch loop: drains the bounded channel and fans each event out to listeners.
    /// A failure while dispatching one event is logged and skipped so the loop keeps
    /// draining; only successfully dispatched events are counted as dispatched.
    /// </summary>
    /// <param name="ct">Token cancelled by <see cref="DisposeAsync"/>.</param>
    private async Task DispatchLoopAsync(CancellationToken ct)
    {
        try
        {
            await foreach (var ev in _channel.Reader.ReadAllAsync(ct).ConfigureAwait(false))
            {
                try
                {
                    Dispatch(ev);
                }
                catch (Exception ex)
                {
                    // Snapshot conversion and handle creation run outside the
                    // per-subscriber guard in DispatchDataChange. Without this catch a
                    // single bad event would end the loop, the channel would fill, and
                    // every later event would be dropped.
                    _logger.LogWarning(ex,
                        "Galaxy EventPump failed to dispatch an event for item handle {ItemHandle} — skipping it and continuing.",
                        ev.ItemHandle);
                    continue;
                }

                EventsDispatched.Add(1, _clientTag);
            }
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Clean shutdown.
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex,
                "Galaxy EventPump dispatch loop ended with an exception — events past this point will be lost until restart.");
        }
    }

    /// <summary>Routes an event by family; only data-change events are forwarded.</summary>
    /// <param name="ev">The event read from the channel.</param>
    private void Dispatch(MxEvent ev)
    {
        switch (ev.Family)
        {
            case MxEventFamily.OnDataChange:
                DispatchDataChange(ev);
                break;
            default:
                // OnAlarmTransition is no longer carried on the per-session event stream
                // — alarms come from the gateway's session-less StreamAlarms feed
                // (GatewayGalaxyAlarmFeed). OnWriteComplete / OperationComplete /
                // OnBufferedDataChange are filtered out: write callers get their reply
                // via the InvokeAsync round-trip, not via the event stream.
                return;
        }
    }

    /// <summary>
    /// Raises <see cref="OnDataChange"/> once per subscription registered for the
    /// event's item handle. A throwing handler is logged and does not stop the
    /// fan-out to the remaining subscriptions.
    /// </summary>
    /// <param name="ev">The data-change event to fan out.</param>
    private void DispatchDataChange(MxEvent ev)
    {
        var subscribers = _registry.ResolveSubscribers(ev.ItemHandle);
        if (subscribers.Count == 0) return; // stale event after unsubscribe — drop quietly

        // Build the snapshot once and share it across all subscribers.
        var snapshot = ToSnapshot(ev);
        foreach (var (subscriptionId, fullReference) in subscribers)
        {
            var handle = _handleFactory(subscriptionId);
            try
            {
                OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, fullReference, snapshot));
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex,
                    "Galaxy OnDataChange handler threw for {FullRef} subscription {SubscriptionId} — continuing fan-out.",
                    fullReference, subscriptionId);
            }
        }
    }

    /// <summary>
    /// Converts an event into a <see cref="DataValueSnapshot"/>: decoded value, a status
    /// code taken from the first entry of <c>Statuses</c> when present (otherwise from
    /// the low byte of <c>Quality</c>), the optional source timestamp, and the current
    /// UTC time as the server timestamp.
    /// </summary>
    /// <param name="ev">The event to convert.</param>
    /// <returns>The snapshot handed to listeners.</returns>
    private DataValueSnapshot ToSnapshot(MxEvent ev)
    {
        var value = MxValueDecoder.Decode(ev.Value);
        var statusCode = ev.Statuses.Count > 0
            ? StatusCodeMap.FromMxStatus(ev.Statuses[0], _logger)
            : StatusCodeMap.FromQualityByte((byte)(ev.Quality & 0xFF), _logger);

        DateTime? sourceTimestamp = ev.SourceTimestamp is { } ts ? ts.ToDateTime() : null;
        return new DataValueSnapshot(
            Value: value,
            StatusCode: statusCode,
            SourceTimestampUtc: sourceTimestamp,
            ServerTimestampUtc: DateTime.UtcNow);
    }

    /// <summary>Disposes the event pump and cancels all running tasks.</summary>
    /// <returns>A value task representing the asynchronous disposal operation.</returns>
    public async ValueTask DisposeAsync()
    {
        if (_disposed) return;
        _disposed = true;

        _cts.Cancel();
        // Complete the writer here as well so the dispatch loop can exit even when
        // Start was never called or the read loop has not reached its finally block.
        _channel.Writer.TryComplete();

        if (_loop is not null)
        {
            // Both loops catch and log their own failures; this guard only keeps
            // disposal from throwing.
            try { await _loop.ConfigureAwait(false); } catch { /* shutdown */ }
        }
        if (_dispatchLoop is not null)
        {
            try { await _dispatchLoop.ConfigureAwait(false); } catch { /* shutdown */ }
        }
        _cts.Dispose();
    }
}
|