using System.Diagnostics.Metrics;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
///
/// Production over the gateway's session-less
/// StreamAlarms RPC. The stream opens with one
/// per currently-active alarm (the ConditionRefresh snapshot), then a
/// snapshot_complete sentinel, then a live
/// for every subsequent raise / acknowledge / clear. Each message is decoded into a
/// (severity already bucketed via
/// ) and surfaced on .
///
///
///
/// The feed is independent of any worker session — the gateway's always-on central
/// alarm monitor owns the AVEVA subscription. The driver previously decoded alarm
/// transitions off the per-session StreamEvents stream ();
/// that path was retired when the gateway moved to the session-less alarm model.
///
///
/// The stream is supplied as a factory delegate (production passes
/// MxGatewayClient.StreamAlarmsAsync) so tests can drive synthetic feeds.
/// Streaming RPCs are not covered by the client's unary retry pipeline, so the feed
/// owns its reconnect: on any non-cancellation stream fault it logs, waits
/// reconnectDelay, and re-opens. The gateway re-sends the active-alarm
/// snapshot on every re-open, so the OPC UA condition layer sees current state
/// after a reconnect.
///
///
internal sealed class GatewayGalaxyAlarmFeed : IGalaxyAlarmFeed
{
///
/// Opens a StreamAlarms feed. Matches the method group
/// MxGatewayClient.StreamAlarmsAsync.
///
/// The stream request parameters.
/// A cancellation token.
internal delegate IAsyncEnumerable AlarmStreamFactory(
StreamAlarmsRequest request, CancellationToken cancellationToken);
private static readonly TimeSpan DefaultReconnectDelay = TimeSpan.FromSeconds(5);
// Shares the driver meter name so a host-level MeterListener catches feed counters
// alongside the EventPump's. Distinct Meter instance — same name is intentional.
private static readonly Meter Meter = new(EventPump.MeterName);
private static readonly Counter AlarmTransitionsReceived =
Meter.CreateCounter("galaxy.alarm_feed.transitions.received", unit: "{event}",
description: "Alarm feed messages decoded and forwarded to driver-level handlers.");
private static readonly Counter AlarmTransitionsDecodingFailures =
Meter.CreateCounter("galaxy.alarm_feed.transitions.decoding_failures", unit: "{event}",
description: "Alarm feed messages dropped for a missing body or unspecified transition kind.");
private static readonly Counter AlarmFeedReconnects =
Meter.CreateCounter("galaxy.alarm_feed.reconnects", unit: "{reconnect}",
description: "Times the alarm feed re-opened its StreamAlarms stream after a transport fault.");
private readonly AlarmStreamFactory _streamFactory;
private readonly ILogger _logger;
private readonly string _alarmFilterPrefix;
private readonly TimeSpan _reconnectDelay;
private readonly KeyValuePair _clientTag;
private readonly CancellationTokenSource _cts = new();
private Task? _loop;
private bool _disposed;
/// Occurs when an alarm transition (raise, acknowledge, clear) is received from the Galaxy feed.
public event EventHandler? OnAlarmTransition;
/// Initializes a new instance of the class.
/// A factory delegate that opens the alarm stream.
/// An optional logger for diagnostic output.
/// An optional client name for tagging log entries.
/// An optional prefix to filter alarms in the stream.
/// An optional delay before reconnecting after a stream fault.
public GatewayGalaxyAlarmFeed(
AlarmStreamFactory streamFactory,
ILogger? logger = null,
string? clientName = null,
string? alarmFilterPrefix = null,
TimeSpan? reconnectDelay = null)
{
_streamFactory = streamFactory ?? throw new ArgumentNullException(nameof(streamFactory));
_logger = logger ?? NullLogger.Instance;
_alarmFilterPrefix = alarmFilterPrefix ?? string.Empty;
_reconnectDelay = reconnectDelay ?? DefaultReconnectDelay;
_clientTag = new KeyValuePair("galaxy.client", clientName ?? "");
}
/// Starts the alarm feed by opening the stream and processing messages in a background task.
public void Start()
{
ObjectDisposedException.ThrowIf(_disposed, this);
if (_loop is not null) return;
_loop = Task.Run(() => RunAsync(_cts.Token));
}
private async Task RunAsync(CancellationToken ct)
{
var firstAttempt = true;
while (!ct.IsCancellationRequested)
{
if (!firstAttempt)
{
AlarmFeedReconnects.Add(1, _clientTag);
}
firstAttempt = false;
try
{
var request = new StreamAlarmsRequest
{
ClientCorrelationId = Guid.NewGuid().ToString("N"),
AlarmFilterPrefix = _alarmFilterPrefix,
};
await foreach (var message in _streamFactory(request, ct)
.WithCancellation(ct).ConfigureAwait(false))
{
if (ct.IsCancellationRequested) break;
Dispatch(message);
}
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
return; // clean shutdown
}
catch (Exception ex)
{
_logger.LogWarning(ex,
"Galaxy alarm feed stream faulted — reopening in {DelaySeconds}s.",
_reconnectDelay.TotalSeconds);
}
try
{
await Task.Delay(_reconnectDelay, ct).ConfigureAwait(false);
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
return;
}
}
}
private void Dispatch(AlarmFeedMessage message)
{
switch (message.PayloadCase)
{
case AlarmFeedMessage.PayloadOneofCase.ActiveAlarm:
DispatchSnapshotEntry(message.ActiveAlarm);
break;
case AlarmFeedMessage.PayloadOneofCase.Transition:
DispatchTransition(message.Transition);
break;
case AlarmFeedMessage.PayloadOneofCase.SnapshotComplete:
_logger.LogDebug("Galaxy alarm feed active-alarm snapshot complete.");
break;
default:
// Empty oneof — worker / gateway version skew. Count and drop.
AlarmTransitionsDecodingFailures.Add(1, _clientTag);
break;
}
}
///
/// Decode one entry of the initial active-alarm snapshot. Each currently-active
/// alarm is surfaced as a transition so the OPC UA Part 9 condition layer sees
/// the alarm's present state on (re)connect: an unacknowledged active alarm as
/// a , an acknowledged one as a
/// .
///
private void DispatchSnapshotEntry(ActiveAlarmSnapshot snapshot)
{
var kind = snapshot.CurrentState switch
{
AlarmConditionState.Active => GalaxyAlarmTransitionKind.Raise,
AlarmConditionState.ActiveAcked => GalaxyAlarmTransitionKind.Acknowledge,
AlarmConditionState.Inactive => GalaxyAlarmTransitionKind.Clear,
_ => GalaxyAlarmTransitionKind.Unspecified,
};
if (kind == GalaxyAlarmTransitionKind.Unspecified)
{
AlarmTransitionsDecodingFailures.Add(1, _clientTag);
_logger.LogDebug(
"Galaxy alarm feed snapshot entry for {AlarmRef} has unspecified condition state; ignoring.",
snapshot.AlarmFullReference);
return;
}
var (bucket, opcUaSeverity) = MxAccessSeverityMapper.Map(snapshot.Severity);
Raise(new GalaxyAlarmTransition(
AlarmFullReference: snapshot.AlarmFullReference,
SourceObjectReference: snapshot.SourceObjectReference,
AlarmTypeName: snapshot.AlarmTypeName,
TransitionKind: kind,
SeverityBucket: bucket,
OpcUaSeverity: opcUaSeverity,
RawMxAccessSeverity: snapshot.Severity,
OriginalRaiseTimestampUtc: snapshot.OriginalRaiseTimestamp?.ToDateTime(),
TransitionTimestampUtc: snapshot.LastTransitionTimestamp?.ToDateTime() ?? DateTime.UtcNow,
OperatorUser: snapshot.OperatorUser,
OperatorComment: snapshot.OperatorComment,
Category: snapshot.Category,
Description: snapshot.Description));
}
private void DispatchTransition(OnAlarmTransitionEvent body)
{
if (body.TransitionKind == AlarmTransitionKind.Unspecified)
{
AlarmTransitionsDecodingFailures.Add(1, _clientTag);
_logger.LogDebug(
"Galaxy alarm feed transition for {AlarmRef} has unspecified transition kind; ignoring.",
body.AlarmFullReference);
return;
}
var (bucket, opcUaSeverity) = MxAccessSeverityMapper.Map(body.Severity);
Raise(new GalaxyAlarmTransition(
AlarmFullReference: body.AlarmFullReference,
SourceObjectReference: body.SourceObjectReference,
AlarmTypeName: body.AlarmTypeName,
TransitionKind: MapTransitionKind(body.TransitionKind),
SeverityBucket: bucket,
OpcUaSeverity: opcUaSeverity,
RawMxAccessSeverity: body.Severity,
OriginalRaiseTimestampUtc: body.OriginalRaiseTimestamp?.ToDateTime(),
TransitionTimestampUtc: body.TransitionTimestamp?.ToDateTime() ?? DateTime.UtcNow,
OperatorUser: body.OperatorUser,
OperatorComment: body.OperatorComment,
Category: body.Category,
Description: body.Description));
}
private void Raise(GalaxyAlarmTransition transition)
{
AlarmTransitionsReceived.Add(1, _clientTag);
try
{
OnAlarmTransition?.Invoke(this, transition);
}
catch (Exception ex)
{
_logger.LogWarning(ex,
"Galaxy alarm feed OnAlarmTransition handler threw for {AlarmRef} — continuing.",
transition.AlarmFullReference);
}
}
private static GalaxyAlarmTransitionKind MapTransitionKind(AlarmTransitionKind kind) => kind switch
{
AlarmTransitionKind.Raise => GalaxyAlarmTransitionKind.Raise,
AlarmTransitionKind.Acknowledge => GalaxyAlarmTransitionKind.Acknowledge,
AlarmTransitionKind.Clear => GalaxyAlarmTransitionKind.Clear,
AlarmTransitionKind.Retrigger => GalaxyAlarmTransitionKind.Retrigger,
_ => GalaxyAlarmTransitionKind.Unspecified,
};
/// Releases the alarm feed resources and stops the background stream task.
public async ValueTask DisposeAsync()
{
if (_disposed) return;
_disposed = true;
_cts.Cancel();
if (_loop is not null)
{
try { await _loop.ConfigureAwait(false); } catch { /* shutdown */ }
}
_cts.Dispose();
}
}