7853e94f4b
Second PR of the alarms-over-gateway epic (docs/plans/alarms-over-gateway.md). Depends on PR A.1 in mxaccessgw (merged) which added the OnAlarmTransitionEvent body + family. No runtime impact yet — the gateway doesn't emit the new family until A.3 ships; this PR just stops dropping it on the floor. EventPump.Dispatch becomes a switch on MxEventFamily. The new DispatchAlarmTransition decodes the proto event, runs the raw severity through MxAccessSeverityMapper (the same four-bucket ladder v1 used — 250/500/750/1000 boundaries per docs/v1/AlarmTracking.md), and fires an internal OnAlarmTransition event with a GalaxyAlarmTransition record carrying the full payload. Body absent or transition-kind unspecified → counted via galaxy.alarm_transitions.decoding_failures and dropped. Gateway version skew or worker malformed event therefore degrades to "fall back to the sub-attribute path" rather than crashing the pump. GalaxyDriver consumes the internal event in PR B.2 (next), wrapping it onto IAlarmSource.OnAlarmEvent. The richer fields (operator user + comment, original raise time, category) become visible on the OPC UA Part 9 condition once AlarmEventArgs gets extended in E.7. Tests: - MxAccessSeverityMapperTests — full bucket ladder + clamp behaviour for negative + out-of-range inputs. - EventPumpAlarmTests — raise/ack/clear sequence dispatches in order with operator metadata + original-raise preserved; unspecified kind drops; missing body drops; mixed data-change + alarm streams dispatch independently; OnWriteComplete / OperationComplete filtered out. Full Driver.Galaxy.Tests suite: 196 passed (was 191 — 5 new tests). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
314 lines
13 KiB
C#
314 lines
13 KiB
C#
using System.Diagnostics.Metrics;
|
|
using System.Threading.Channels;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using MxGateway.Contracts.Proto;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
|
|
|
|
/// <summary>
|
|
/// Long-running consumer of <see cref="IGalaxySubscriber.StreamEventsAsync"/>. Translates
|
|
/// each <see cref="MxEvent"/> with family <see cref="MxEventFamily.OnDataChange"/> into
|
|
/// <see cref="DataChangeEventArgs"/> and dispatches one event per registered driver
|
|
/// subscription that includes the changed item handle (fan-out via
|
|
/// <see cref="SubscriptionRegistry.ResolveSubscribers"/>).
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// <para>
|
|
/// One pump per connected <see cref="GalaxyMxSession"/>. Reconnect lives in PR 4.5's
|
|
/// supervisor; on transport failure here we log + propagate so the supervisor can
|
|
/// decide whether to restart.
|
|
/// </para>
|
|
/// <para>
|
|
/// PR 6.2 — the network-read loop and the listener-fanout loop are decoupled by a
|
|
/// bounded <see cref="Channel{T}"/>. When a listener is slow enough to fill the
|
|
/// channel, new events are dropped (newest-dropped semantics: producer's
|
|
/// <c>TryWrite</c> fails) rather than back-pressuring the gw stream. Three counters
|
|
/// on the <c>ZB.MOM.WW.OtOpcUa.Driver.Galaxy</c> meter expose received / dispatched
|
|
/// / dropped totals so ops sees pressure before it manifests as user-visible loss.
|
|
/// </para>
|
|
/// </remarks>
|
|
internal sealed class EventPump : IAsyncDisposable
|
|
{
|
|
public const string MeterName = "ZB.MOM.WW.OtOpcUa.Driver.Galaxy";
|
|
private const int DefaultChannelCapacity = 50_000;
|
|
|
|
// Single static meter so a host-level MeterListener catches all pump instances.
|
|
private static readonly Meter Meter = new(MeterName);
|
|
private static readonly Counter<long> EventsReceived =
|
|
Meter.CreateCounter<long>("galaxy.events.received", unit: "{event}",
|
|
description: "MxEvents read from the gateway StreamEvents stream.");
|
|
private static readonly Counter<long> EventsDispatched =
|
|
Meter.CreateCounter<long>("galaxy.events.dispatched", unit: "{event}",
|
|
description: "MxEvents passed through the bounded channel and into OnDataChange.");
|
|
private static readonly Counter<long> EventsDropped =
|
|
Meter.CreateCounter<long>("galaxy.events.dropped", unit: "{event}",
|
|
description: "MxEvents dropped because the bounded channel was full (newest-dropped).");
|
|
private static readonly Counter<long> AlarmTransitionsReceived =
|
|
Meter.CreateCounter<long>("galaxy.alarm_transitions.received", unit: "{event}",
|
|
description: "OnAlarmTransition events decoded and forwarded to driver-level handlers.");
|
|
private static readonly Counter<long> AlarmTransitionsDecodingFailures =
|
|
Meter.CreateCounter<long>("galaxy.alarm_transitions.decoding_failures", unit: "{event}",
|
|
description: "OnAlarmTransition events that arrived without a populated body or with an unspecified transition kind.");
|
|
|
|
private readonly IGalaxySubscriber _subscriber;
|
|
private readonly SubscriptionRegistry _registry;
|
|
private readonly ILogger _logger;
|
|
private readonly Func<long, ISubscriptionHandle> _handleFactory;
|
|
private readonly Channel<MxEvent> _channel;
|
|
private readonly KeyValuePair<string, object?> _clientTag;
|
|
private readonly CancellationTokenSource _cts = new();
|
|
|
|
private Task? _loop;
|
|
private Task? _dispatchLoop;
|
|
private bool _disposed;
|
|
|
|
public event EventHandler<DataChangeEventArgs>? OnDataChange;
|
|
|
|
/// <summary>
|
|
/// Fires for every <see cref="MxEventFamily.OnAlarmTransition"/> event the
|
|
/// gateway forwards. Decoded into a <see cref="GalaxyAlarmTransition"/> with
|
|
/// the OPC UA severity bucket already mapped via
|
|
/// <see cref="MxAccessSeverityMapper"/>. The driver wraps this onto
|
|
/// <c>IAlarmSource.OnAlarmEvent</c> in PR B.2.
|
|
/// </summary>
|
|
internal event EventHandler<GalaxyAlarmTransition>? OnAlarmTransition;
|
|
|
|
public EventPump(
|
|
IGalaxySubscriber subscriber,
|
|
SubscriptionRegistry registry,
|
|
ILogger? logger = null,
|
|
Func<long, ISubscriptionHandle>? handleFactory = null,
|
|
int channelCapacity = DefaultChannelCapacity,
|
|
string? clientName = null)
|
|
{
|
|
_subscriber = subscriber ?? throw new ArgumentNullException(nameof(subscriber));
|
|
_registry = registry ?? throw new ArgumentNullException(nameof(registry));
|
|
_logger = logger ?? NullLogger.Instance;
|
|
_handleFactory = handleFactory ?? (id => new GalaxySubscriptionHandle(id));
|
|
|
|
if (channelCapacity < 1)
|
|
{
|
|
throw new ArgumentOutOfRangeException(nameof(channelCapacity),
|
|
"channelCapacity must be >= 1; recommended 50_000 for 50k-tag deployments.");
|
|
}
|
|
_channel = Channel.CreateBounded<MxEvent>(new BoundedChannelOptions(channelCapacity)
|
|
{
|
|
// Newest-dropped policy: when full, the producer's TryWrite returns false
|
|
// and we account for the drop. We do this manually rather than relying on
|
|
// BoundedChannelFullMode.DropWrite so we can count drops without polling.
|
|
FullMode = BoundedChannelFullMode.Wait,
|
|
SingleReader = true,
|
|
SingleWriter = true,
|
|
});
|
|
_clientTag = new KeyValuePair<string, object?>("galaxy.client", clientName ?? "<unknown>");
|
|
}
|
|
|
|
/// <summary>
|
|
/// Start consuming the event stream on a background task. Idempotent — second
|
|
/// calls are no-ops while the loop is running.
|
|
/// </summary>
|
|
public void Start()
|
|
{
|
|
ObjectDisposedException.ThrowIf(_disposed, this);
|
|
if (_loop is not null) return;
|
|
_loop = Task.Run(() => RunAsync(_cts.Token));
|
|
_dispatchLoop = Task.Run(() => DispatchLoopAsync(_cts.Token));
|
|
}
|
|
|
|
private async Task RunAsync(CancellationToken ct)
|
|
{
|
|
try
|
|
{
|
|
await foreach (var ev in _subscriber.StreamEventsAsync(ct).WithCancellation(ct).ConfigureAwait(false))
|
|
{
|
|
if (ct.IsCancellationRequested) break;
|
|
EventsReceived.Add(1, _clientTag);
|
|
|
|
// Newest-dropped: TryWrite fast-paths the common case (channel has room).
|
|
// When full we count the drop and continue reading the gw stream so
|
|
// back-pressure doesn't propagate upstream.
|
|
if (!_channel.Writer.TryWrite(ev))
|
|
{
|
|
EventsDropped.Add(1, _clientTag);
|
|
}
|
|
}
|
|
}
|
|
catch (OperationCanceledException) when (ct.IsCancellationRequested)
|
|
{
|
|
// Clean shutdown — no log.
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogWarning(ex,
|
|
"Galaxy EventPump loop ended with an exception — reconnect supervisor (PR 4.5) handles restart.");
|
|
}
|
|
finally
|
|
{
|
|
// Tell the dispatch loop the producer is done so it drains and exits.
|
|
_channel.Writer.TryComplete();
|
|
}
|
|
}
|
|
|
|
private async Task DispatchLoopAsync(CancellationToken ct)
|
|
{
|
|
try
|
|
{
|
|
await foreach (var ev in _channel.Reader.ReadAllAsync(ct).ConfigureAwait(false))
|
|
{
|
|
Dispatch(ev);
|
|
EventsDispatched.Add(1, _clientTag);
|
|
}
|
|
}
|
|
catch (OperationCanceledException) when (ct.IsCancellationRequested)
|
|
{
|
|
// Clean shutdown.
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogWarning(ex,
|
|
"Galaxy EventPump dispatch loop ended with an exception — events past this point will be lost until restart.");
|
|
}
|
|
}
|
|
|
|
private void Dispatch(MxEvent ev)
|
|
{
|
|
switch (ev.Family)
|
|
{
|
|
case MxEventFamily.OnDataChange:
|
|
DispatchDataChange(ev);
|
|
break;
|
|
case MxEventFamily.OnAlarmTransition:
|
|
DispatchAlarmTransition(ev);
|
|
break;
|
|
default:
|
|
// OnWriteComplete / OperationComplete / OnBufferedDataChange are filtered
|
|
// out — write callers get their reply via the InvokeAsync round-trip, not
|
|
// via the event stream.
|
|
return;
|
|
}
|
|
}
|
|
|
|
private void DispatchDataChange(MxEvent ev)
|
|
{
|
|
var subscribers = _registry.ResolveSubscribers(ev.ItemHandle);
|
|
if (subscribers.Count == 0) return; // stale event after unsubscribe — drop quietly
|
|
|
|
var snapshot = ToSnapshot(ev);
|
|
foreach (var (subscriptionId, fullReference) in subscribers)
|
|
{
|
|
var handle = _handleFactory(subscriptionId);
|
|
try
|
|
{
|
|
OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, fullReference, snapshot));
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogWarning(ex,
|
|
"Galaxy OnDataChange handler threw for {FullRef} subscription {SubscriptionId} — continuing fan-out.",
|
|
fullReference, subscriptionId);
|
|
}
|
|
}
|
|
}
|
|
|
|
private void DispatchAlarmTransition(MxEvent ev)
|
|
{
|
|
// Body absent (e.g. malformed gateway event or worker version skew) — count and
|
|
// drop. The Part 9 sub-attribute fallback path keeps an alarm functional even
|
|
// when the rich payload disappears.
|
|
if (ev.OnAlarmTransition is not { } body)
|
|
{
|
|
AlarmTransitionsDecodingFailures.Add(1, _clientTag);
|
|
_logger.LogDebug(
|
|
"Galaxy OnAlarmTransition event arrived without a populated body (sequence={Sequence}); ignoring.",
|
|
ev.WorkerSequence);
|
|
return;
|
|
}
|
|
if (body.TransitionKind == AlarmTransitionKind.Unspecified)
|
|
{
|
|
AlarmTransitionsDecodingFailures.Add(1, _clientTag);
|
|
_logger.LogDebug(
|
|
"Galaxy OnAlarmTransition for {AlarmRef} has unspecified transition kind; ignoring.",
|
|
body.AlarmFullReference);
|
|
return;
|
|
}
|
|
|
|
var (bucket, opcUaSeverity) = MxAccessSeverityMapper.Map(body.Severity);
|
|
var transitionTimestamp = body.TransitionTimestamp is { } tts
|
|
? tts.ToDateTime()
|
|
: DateTime.UtcNow;
|
|
DateTime? originalRaiseTimestamp = body.OriginalRaiseTimestamp is { } orts
|
|
? orts.ToDateTime()
|
|
: null;
|
|
|
|
var transition = new GalaxyAlarmTransition(
|
|
AlarmFullReference: body.AlarmFullReference,
|
|
SourceObjectReference: body.SourceObjectReference,
|
|
AlarmTypeName: body.AlarmTypeName,
|
|
TransitionKind: MapTransitionKind(body.TransitionKind),
|
|
SeverityBucket: bucket,
|
|
OpcUaSeverity: opcUaSeverity,
|
|
RawMxAccessSeverity: body.Severity,
|
|
OriginalRaiseTimestampUtc: originalRaiseTimestamp,
|
|
TransitionTimestampUtc: transitionTimestamp,
|
|
OperatorUser: body.OperatorUser,
|
|
OperatorComment: body.OperatorComment,
|
|
Category: body.Category,
|
|
Description: body.Description);
|
|
|
|
AlarmTransitionsReceived.Add(1, _clientTag);
|
|
try
|
|
{
|
|
OnAlarmTransition?.Invoke(this, transition);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogWarning(ex,
|
|
"Galaxy OnAlarmTransition handler threw for {AlarmRef} — continuing.",
|
|
transition.AlarmFullReference);
|
|
}
|
|
}
|
|
|
|
private static GalaxyAlarmTransitionKind MapTransitionKind(AlarmTransitionKind kind) => kind switch
|
|
{
|
|
AlarmTransitionKind.Raise => GalaxyAlarmTransitionKind.Raise,
|
|
AlarmTransitionKind.Acknowledge => GalaxyAlarmTransitionKind.Acknowledge,
|
|
AlarmTransitionKind.Clear => GalaxyAlarmTransitionKind.Clear,
|
|
AlarmTransitionKind.Retrigger => GalaxyAlarmTransitionKind.Retrigger,
|
|
_ => GalaxyAlarmTransitionKind.Unspecified,
|
|
};
|
|
|
|
private DataValueSnapshot ToSnapshot(MxEvent ev)
|
|
{
|
|
var value = MxValueDecoder.Decode(ev.Value);
|
|
var statusCode = ev.Statuses.Count > 0
|
|
? StatusCodeMap.FromMxStatus(ev.Statuses[0], _logger)
|
|
: StatusCodeMap.FromQualityByte((byte)(ev.Quality & 0xFF), _logger);
|
|
|
|
DateTime? sourceTimestamp = ev.SourceTimestamp is { } ts ? ts.ToDateTime() : null;
|
|
return new DataValueSnapshot(
|
|
Value: value,
|
|
StatusCode: statusCode,
|
|
SourceTimestampUtc: sourceTimestamp,
|
|
ServerTimestampUtc: DateTime.UtcNow);
|
|
}
|
|
|
|
public async ValueTask DisposeAsync()
|
|
{
|
|
if (_disposed) return;
|
|
_disposed = true;
|
|
_cts.Cancel();
|
|
_channel.Writer.TryComplete();
|
|
if (_loop is not null)
|
|
{
|
|
try { await _loop.ConfigureAwait(false); } catch { /* shutdown */ }
|
|
}
|
|
if (_dispatchLoop is not null)
|
|
{
|
|
try { await _dispatchLoop.ConfigureAwait(false); } catch { /* shutdown */ }
|
|
}
|
|
_cts.Dispose();
|
|
}
|
|
}
|