920 lines
35 KiB
C#
920 lines
35 KiB
C#
using System.Threading.Channels;
|
|
using Google.Protobuf.WellKnownTypes;
|
|
using Microsoft.Extensions.Options;
|
|
using ZB.MOM.WW.MxGateway.Contracts.Proto;
|
|
using ZB.MOM.WW.MxGateway.Server.Configuration;
|
|
using ZB.MOM.WW.MxGateway.Server.Metrics;
|
|
using ZB.MOM.WW.MxGateway.Server.Sessions;
|
|
|
|
namespace ZB.MOM.WW.MxGateway.Server.Alarms;
|
|
|
|
/// <summary>
/// The gateway's always-on alarm monitor and broker. It keeps one
/// gateway-managed worker session dedicated to alarms, maintains an in-process
/// cache of the active-alarm set from that session's transition events
/// (periodically reconciled against the worker's own snapshot), and fans the
/// resulting feed out to any number of <see cref="StreamAsync"/> subscribers.
/// If the worker faults, the session is re-opened transparently.
/// </summary>
public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmService
{
// Identity the monitor presents when opening its worker session.
private const string MonitorClientName = "gateway-alarm-monitor";
private const string BackendName = "Galaxy";

// Bounded per-subscriber queue length; a subscriber whose queue is full when a
// message is broadcast is dropped and has to reconnect.
private const int SubscriberQueueCapacity = 2048;

// Pause between a faulted lifecycle and the next attempt.
private static readonly TimeSpan RestartBackoff = TimeSpan.FromSeconds(5);

// Delay before the first lifecycle so worker launching and orphan cleanup settle.
private static readonly TimeSpan StartupGrace = TimeSpan.FromSeconds(2);

private readonly ISessionManager _sessionManager;
private readonly IAlarmWatchListResolver _watchListResolver;
private readonly GatewayMetrics _metrics;
private readonly AlarmsOptions _options;
private readonly ILogger<GatewayAlarmMonitor> _logger;

// Single gate for _alarms, _subscribers, the provider-status fields and _session.
private readonly object _sync = new();

// Active-alarm cache keyed by full alarm reference (ordinal comparison).
private readonly Dictionary<string, ActiveAlarmSnapshot> _alarms = new(StringComparer.Ordinal);

// Live StreamAsync consumers.
private readonly List<Subscriber> _subscribers = new();

// Provider status (mode, degraded flag, reason, since), guarded by _sync.
// It starts at the alarm-manager, not-degraded baseline, so a subscriber that
// joins before any OnAlarmProviderModeChanged event still sees a sensible status.
private AlarmProviderMode _providerMode = AlarmProviderMode.Alarmmgr;
private bool _providerDegraded;
private string _providerReason = string.Empty;
private DateTimeOffset _providerSince = DateTimeOffset.UtcNow;

// Lifecycle status; read without _sync by State, LastError and AcknowledgeAsync.
private volatile GatewayAlarmMonitorState _state = GatewayAlarmMonitorState.Disabled;
private volatile string? _lastError;

// Worker session of the current lifecycle (null between lifecycles), guarded by _sync.
private GatewaySession? _session;

/// <summary>Creates the gateway alarm monitor.</summary>
/// <param name="sessionManager">Gateway session manager used to open and drive the monitor's worker session.</param>
/// <param name="watchListResolver">Resolver for the subtag-fallback watch-list.</param>
/// <param name="metrics">Gateway metrics sink.</param>
/// <param name="options">Gateway options; only the alarm section is retained.</param>
/// <param name="logger">Diagnostic logger.</param>
/// <exception cref="ArgumentNullException">Any argument is <see langword="null"/>.</exception>
public GatewayAlarmMonitor(
    ISessionManager sessionManager,
    IAlarmWatchListResolver watchListResolver,
    GatewayMetrics metrics,
    IOptions<GatewayOptions> options,
    ILogger<GatewayAlarmMonitor> logger)
{
    ArgumentNullException.ThrowIfNull(sessionManager);
    ArgumentNullException.ThrowIfNull(watchListResolver);
    ArgumentNullException.ThrowIfNull(metrics);
    ArgumentNullException.ThrowIfNull(options);

    _sessionManager = sessionManager;
    _watchListResolver = watchListResolver;
    _metrics = metrics;
    _options = options.Value.Alarms;

    // Checked last, so options.Value is read before the logger is validated.
    ArgumentNullException.ThrowIfNull(logger);
    _logger = logger;
}

/// <inheritdoc />
public GatewayAlarmMonitorState State
{
    get => _state;
}

/// <inheritdoc />
public string? LastError
{
    get => _lastError;
}

/// <inheritdoc />
public int? WorkerProcessId
{
    get
    {
        lock (_sync)
        {
            GatewaySession? current = _session;
            return current?.WorkerProcessId;
        }
    }
}

/// <inheritdoc />
/// <remarks>Returns deep clones taken under the cache lock, so callers may mutate them freely.</remarks>
public IReadOnlyList<ActiveAlarmSnapshot> CurrentAlarms
{
    get
    {
        lock (_sync)
        {
            ActiveAlarmSnapshot[] copies = new ActiveAlarmSnapshot[_alarms.Count];
            int slot = 0;
            foreach (ActiveAlarmSnapshot alarm in _alarms.Values)
            {
                copies[slot++] = alarm.Clone();
            }

            return copies;
        }
    }
}

/// <inheritdoc />
/// <remarks>
/// Supervisor loop. When alarms are enabled and a subscription can be resolved,
/// it waits a short startup grace, then runs monitoring lifecycles back to back,
/// pausing for <see cref="RestartBackoff"/> after any lifecycle that fails.
/// The state ends as Disabled once <paramref name="stoppingToken"/> stops the loop.
/// </remarks>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    if (!_options.Enabled)
    {
        _state = GatewayAlarmMonitorState.Disabled;
        _logger.LogInformation("Gateway alarm monitor disabled (MxGateway:Alarms:Enabled is false).");
        return;
    }

    string subscription = ResolveSubscription();
    if (string.IsNullOrWhiteSpace(subscription))
    {
        _state = GatewayAlarmMonitorState.Faulted;
        _lastError = "MxGateway:Alarms is enabled but no SubscriptionExpression / DefaultArea is configured.";
        _logger.LogError("{Diagnostic}", _lastError);
        return;
    }

    // Give worker-process launching and startup orphan cleanup a moment to
    // settle before the monitor opens a session of its own.
    if (!await DelayUnlessStoppedAsync(StartupGrace).ConfigureAwait(false))
    {
        return;
    }

    bool keepRunning = true;
    while (keepRunning && !stoppingToken.IsCancellationRequested)
    {
        try
        {
            await RunMonitorAsync(subscription, stoppingToken).ConfigureAwait(false);
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            keepRunning = false;
        }
        catch (Exception exception)
        {
            _state = GatewayAlarmMonitorState.Faulted;
            _lastError = exception.Message;
            _logger.LogWarning(
                exception,
                "Gateway alarm monitor lifecycle faulted; restarting in {Backoff}.",
                RestartBackoff);
            keepRunning = await DelayUnlessStoppedAsync(RestartBackoff).ConfigureAwait(false);
        }
    }

    _state = GatewayAlarmMonitorState.Disabled;

    // Waits for the given delay. Returns false instead of throwing when
    // stoppingToken cancels the wait.
    async Task<bool> DelayUnlessStoppedAsync(TimeSpan delay)
    {
        try
        {
            await Task.Delay(delay, stoppingToken).ConfigureAwait(false);
            return true;
        }
        catch (OperationCanceledException)
        {
            return false;
        }
    }
}

// One monitoring lifecycle: open a session, subscribe alarms, reconcile, then
// consume transition events until the session ends or is cancelled.
// It never returns normally. If the event stream ends while stoppingToken is
// cancelled, it throws OperationCanceledException so the supervisor in
// ExecuteAsync exits quietly. If the stream ends on its own (worker session
// closed or faulted), it throws InvalidOperationException so the supervisor
// restarts it. On exit the session is closed and the cache is cleared.
private async Task RunMonitorAsync(string subscription, CancellationToken stoppingToken)
{
    _state = GatewayAlarmMonitorState.Starting;

    // Derive the lifecycle baseline from the configured forced mode. This way a
    // ForceSubtag / ForceAlarmManager start advertises the correct mode even
    // though no OnAlarmProviderModeChanged event is raised in those modes (only
    // Auto/failover produces that event). ForceSubtag starts degraded.
    // ForceAlarmManager and Unspecified (Auto) both start on the alarm-manager
    // primary; Auto only degrades to subtag on failure.
    (AlarmProviderMode initialMode, bool initialDegraded, string initialReason) =
        MapForcedMode(_options.Fallback.Mode) == AlarmProviderMode.Subtag
            ? (AlarmProviderMode.Subtag, true, "Forced subtag mode (configuration)")
            : (AlarmProviderMode.Alarmmgr, false, string.Empty);

    lock (_sync)
    {
        // Re-baseline the provider status for this lifecycle so a restarted
        // monitor advertises the configured mode until told otherwise.
        _providerMode = initialMode;
        _providerDegraded = initialDegraded;
        _providerReason = initialReason;
        _providerSince = DateTimeOffset.UtcNow;
    }

    // Align the observable gauge with the lifecycle baseline without recording a switch.
    _metrics.SetAlarmProviderMode(ModeToInt(initialMode));

    GatewaySession session = await _sessionManager.OpenSessionAsync(
            new SessionOpenRequest(BackendName, MonitorClientName, Guid.NewGuid().ToString("N"), CommandTimeout: null),
            MonitorClientName,
            stoppingToken)
        .ConfigureAwait(false);
    lock (_sync)
    {
        _session = session;
    }

    try
    {
        await SubscribeAlarmsAsync(session.SessionId, subscription, stoppingToken).ConfigureAwait(false);
        await ReconcileAsync(session.SessionId, stoppingToken).ConfigureAwait(false);

        _state = GatewayAlarmMonitorState.Monitoring;
        _lastError = null;
        _logger.LogInformation(
            "Gateway alarm monitor active on {Subscription} (session {SessionId}, worker pid {WorkerPid}).",
            subscription,
            session.SessionId,
            session.WorkerProcessId);

        using CancellationTokenSource linked = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
        Task reconcileLoop = ReconcileLoopAsync(session.SessionId, linked.Token);
        try
        {
            await foreach (WorkerEvent workerEvent in _sessionManager
                .ReadEventsAsync(session.SessionId, linked.Token)
                .ConfigureAwait(false))
            {
                MxEvent? mxEvent = workerEvent.Event;
                if (mxEvent is { BodyCase: MxEvent.BodyOneofCase.OnAlarmTransition }
                    && mxEvent.OnAlarmTransition is not null)
                {
                    ApplyTransition(mxEvent.OnAlarmTransition);
                }
                else if (mxEvent is { BodyCase: MxEvent.BodyOneofCase.OnAlarmProviderModeChanged }
                    && mxEvent.OnAlarmProviderModeChanged is not null)
                {
                    await ApplyProviderModeChangeAsync(
                            session.SessionId,
                            mxEvent.OnAlarmProviderModeChanged,
                            linked.Token)
                        .ConfigureAwait(false);
                }
            }
        }
        finally
        {
            // Stop the periodic reconcile loop and wait for it to wind down.
            await linked.CancelAsync().ConfigureAwait(false);
            try
            {
                await reconcileLoop.ConfigureAwait(false);
            }
            catch (Exception exception)
            {
                // Not actionable during teardown; recorded for diagnostics only.
                _logger.LogDebug(
                    exception,
                    "Alarm reconcile loop failed during teardown of session {SessionId}.",
                    session.SessionId);
            }
        }

        // If the stream ended because the host is stopping, report that as
        // cancellation. Otherwise the supervisor would log a spurious fault.
        stoppingToken.ThrowIfCancellationRequested();

        // The event stream ended without cancellation: the worker session closed
        // or faulted. Surface it so the supervisor loop restarts.
        throw new InvalidOperationException("Alarm monitor worker event stream ended.");
    }
    finally
    {
        lock (_sync)
        {
            _session = null;
        }

        ClearCache();
        try
        {
            await _sessionManager.CloseSessionAsync(session.SessionId, CancellationToken.None).ConfigureAwait(false);
        }
        catch (Exception exception)
        {
            _logger.LogDebug(exception, "Closing alarm monitor session {SessionId} failed.", session.SessionId);
        }
    }
}

// Sends SubscribeAlarms for the given subscription on the monitor's session.
// The command carries the configured forced provider mode, the failover tuning
// and the resolved subtag watch-list. Throws InvalidOperationException, with
// the worker's diagnostic, if the reply status is not Ok.
private async Task SubscribeAlarmsAsync(string sessionId, string subscription, CancellationToken cancellationToken)
{
    IReadOnlyList<AlarmSubtagTarget> watchList = await _watchListResolver
        .ResolveAsync(_options, cancellationToken)
        .ConfigureAwait(false);

    AlarmProviderMode forcedMode = MapForcedMode(_options.Fallback.Mode);

    _logger.LogInformation(
        "Alarm subscribe: forcedMode={ForcedMode} configMode={ConfigMode} watchList={WatchListCount}.",
        forcedMode, _options.Fallback.Mode, watchList.Count);

    // When the forced mode is Unspecified (the "Auto" case) and the resolved
    // watch-list is empty (the common alarmmgr-only deployment), the command
    // is identical in effect to the historical SubscribeAlarms (wnwrap only):
    // the worker builds the wnwrap consumer and no subtag watch-list.
    SubscribeAlarmsCommand command = new()
    {
        SubscriptionExpression = subscription,
        ForcedMode = forcedMode,
        Failover = new AlarmFailoverConfig
        {
            ConsecutiveFailureThreshold = _options.Fallback.ConsecutiveFailureThreshold,
            FailbackProbeIntervalSeconds = _options.Fallback.FailbackProbeIntervalSeconds,
            FailbackStableProbes = _options.Fallback.FailbackStableProbes,
        },
    };
    command.WatchList.AddRange(watchList);

    WorkerCommandReply reply = await _sessionManager.InvokeAsync(
            sessionId,
            new WorkerCommand
            {
                Command = new MxCommand
                {
                    Kind = MxCommandKind.SubscribeAlarms,
                    SubscribeAlarms = command,
                },
            },
            cancellationToken)
        .ConfigureAwait(false);

    MxCommandReply? mxReply = reply.Reply;
    ProtocolStatusCode? code = mxReply?.ProtocolStatus?.Code;
    if (code != ProtocolStatusCode.Ok)
    {
        // Protobuf string fields default to "" rather than null, so a `??` chain
        // would stop at an empty DiagnosticMessage. Pick the first non-empty one.
        string? diagnosticMessage = mxReply?.DiagnosticMessage;
        string? statusMessage = mxReply?.ProtocolStatus?.Message;
        string diagnostic = !string.IsNullOrEmpty(diagnosticMessage)
            ? diagnosticMessage
            : !string.IsNullOrEmpty(statusMessage)
                ? statusMessage
                : $"status {code}";
        throw new InvalidOperationException($"Worker rejected SubscribeAlarms: {diagnostic}");
    }
}

// Background loop that runs alongside the event stream. It re-runs
// ReconcileAsync every ReconcileIntervalSeconds (clamped to at least 5 s) until
// cancellationToken is cancelled. A failed pass is logged at Debug and the
// current cache is kept. This includes an OperationCanceledException that was
// NOT caused by cancellationToken (such as one raised by a timeout inside the
// call), so such a failure no longer ends the loop for the rest of the lifecycle.
private async Task ReconcileLoopAsync(string sessionId, CancellationToken cancellationToken)
{
    try
    {
        int seconds = Math.Max(5, _options.ReconcileIntervalSeconds);
        using PeriodicTimer timer = new(TimeSpan.FromSeconds(seconds));
        while (await timer.WaitForNextTickAsync(cancellationToken).ConfigureAwait(false))
        {
            try
            {
                await ReconcileAsync(sessionId, cancellationToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Genuine shutdown of the loop: let the outer handler end it.
                throw;
            }
            catch (Exception exception)
            {
                _logger.LogDebug(exception, "Alarm reconcile pass failed; keeping the current cache.");
            }
        }
    }
    catch (OperationCanceledException)
    {
        // Expected when cancellationToken is cancelled at lifecycle teardown.
    }
}

// Queries the worker for its authoritative active-alarm set and applies it to
// the cache via ApplyReconcile. A non-Ok reply leaves the cache untouched and
// is logged at Debug (matching other reconcile failures) instead of being
// silently ignored. Transport exceptions from InvokeAsync propagate to the caller.
private async Task ReconcileAsync(string sessionId, CancellationToken cancellationToken)
{
    WorkerCommandReply reply = await _sessionManager.InvokeAsync(
            sessionId,
            new WorkerCommand
            {
                Command = new MxCommand
                {
                    Kind = MxCommandKind.QueryActiveAlarms,
                    QueryActiveAlarmsCommand = new QueryActiveAlarmsCommand { AlarmFilterPrefix = string.Empty },
                },
            },
            cancellationToken)
        .ConfigureAwait(false);

    MxCommandReply? mxReply = reply.Reply;
    if (mxReply is not { ProtocolStatus.Code: ProtocolStatusCode.Ok })
    {
        _logger.LogDebug(
            "Alarm reconcile for session {SessionId} was not accepted by the worker (status {Status}): {Diagnostic}; keeping the current cache.",
            sessionId,
            mxReply?.ProtocolStatus?.Code,
            mxReply?.DiagnosticMessage);
        return;
    }

    QueryActiveAlarmsReplyPayload? payload = mxReply.QueryActiveAlarms;
    if (payload is not null)
    {
        ApplyReconcile(payload.Snapshots);
    }
}

// Folds one live transition into the cache and forwards it to matching
// subscribers. A Clear removes the cache entry; any other kind upserts a
// snapshot built from the transition. Transitions without a reference are ignored.
private void ApplyTransition(OnAlarmTransitionEvent transition)
{
    string reference = transition.AlarmFullReference ?? string.Empty;
    if (string.IsNullOrEmpty(reference))
    {
        return;
    }

    bool isClear = transition.TransitionKind == AlarmTransitionKind.Clear;

    lock (_sync)
    {
        if (isClear)
        {
            _alarms.Remove(reference);
        }
        else
        {
            _alarms[reference] = SnapshotFromTransition(transition);
        }

        AlarmFeedMessage forwarded = new() { Transition = transition };
        Broadcast(forwarded, reference);
    }
}

// Handles the worker's provider-mode-change event. It updates the stored
// provider status and broadcasts it to every subscriber (provider status is
// global, not alarm-scoped). It then records the switch metric and forces a
// cache reconcile so the active-alarm set reflects whatever the new mode
// reports. A failed reconcile is logged and the current cache is kept; this
// includes an OperationCanceledException not caused by cancellationToken, such
// as a timeout. Only cancellation of cancellationToken propagates.
private async Task ApplyProviderModeChangeAsync(
    string sessionId,
    OnAlarmProviderModeChangedEvent change,
    CancellationToken cancellationToken)
{
    AlarmProviderMode toMode = change.Mode;
    string reason = change.Reason ?? string.Empty;

    AlarmProviderStatus status;
    int fromModeInt;
    lock (_sync)
    {
        fromModeInt = ModeToInt(_providerMode);
        _providerMode = toMode;
        _providerDegraded = toMode == AlarmProviderMode.Subtag;
        _providerReason = reason;
        _providerSince = DateTimeOffset.UtcNow;

        status = BuildProviderStatus();
        BroadcastToAll(new AlarmFeedMessage { ProviderStatus = status });
    }

    AlarmProviderSwitchReason switchReason = toMode switch
    {
        AlarmProviderMode.Subtag => AlarmProviderSwitchReason.Failover,
        AlarmProviderMode.Alarmmgr => AlarmProviderSwitchReason.Failback,
        _ => AlarmProviderSwitchReason.Unknown,
    };
    _metrics.AlarmProviderSwitched(fromModeInt, ModeToInt(toMode), switchReason);

    _logger.LogInformation(
        "Alarm provider mode changed to {Mode} (degraded={Degraded}): {Reason}",
        toMode,
        status.Degraded,
        reason);

    try
    {
        // Awaited OUTSIDE _sync: C# does not allow `await` inside a `lock`
        // block, and ReconcileAsync re-acquires _sync itself when it applies the
        // snapshot. Subscribers therefore see the ProviderStatus push (above)
        // slightly before the cache is re-seeded; this brief inconsistency is
        // accepted.
        await ReconcileAsync(sessionId, cancellationToken).ConfigureAwait(false);
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        throw;
    }
    catch (Exception exception)
    {
        _logger.LogDebug(
            exception,
            "Reconcile after alarm provider mode change failed; keeping the current cache.");
    }
}

// Wire snapshot of the current provider status: mode, degraded flag, reason,
// and when that status took effect. The caller must hold _sync.
private AlarmProviderStatus BuildProviderStatus() => new()
{
    Mode = _providerMode,
    Degraded = _providerDegraded,
    Reason = _providerReason,
    Since = Timestamp.FromDateTimeOffset(_providerSince),
};

// Translates the configured fallback mode string into the forced provider
// mode sent to the worker. The comparison is case-insensitive.
// "ForceAlarmManager" maps to Alarmmgr and "ForceSubtag" maps to Subtag.
// Anything else, including null and the default "Auto", maps to Unspecified,
// meaning the worker decides.
private static AlarmProviderMode MapForcedMode(string? mode)
{
    bool IsMode(string expected) => string.Equals(mode, expected, StringComparison.OrdinalIgnoreCase);

    return IsMode("ForceAlarmManager") ? AlarmProviderMode.Alarmmgr
        : IsMode("ForceSubtag") ? AlarmProviderMode.Subtag
        : AlarmProviderMode.Unspecified;
}

// Integer encoding of a provider mode for the metrics API:
// alarmmgr = 1, subtag = 2, anything else (unknown/unspecified) = 0.
private static int ModeToInt(AlarmProviderMode mode)
{
    switch (mode)
    {
        case AlarmProviderMode.Alarmmgr:
            return 1;
        case AlarmProviderMode.Subtag:
            return 2;
        default:
            return 0;
    }
}

// Reconciles the cache against the worker's authoritative snapshot. Entries
// without a reference are skipped, and a later duplicate reference wins. Under
// _sync it first broadcasts a synthetic Clear for each cached alarm missing
// from the snapshot, then a synthetic Raise for each snapshot alarm missing
// from the cache, and finally replaces the cache contents with the snapshot.
private void ApplyReconcile(IEnumerable<ActiveAlarmSnapshot> snapshots)
{
    Dictionary<string, ActiveAlarmSnapshot> authoritative = new(StringComparer.Ordinal);
    foreach (ActiveAlarmSnapshot candidate in snapshots)
    {
        string candidateKey = candidate.AlarmFullReference;
        if (string.IsNullOrEmpty(candidateKey))
        {
            continue;
        }

        authoritative[candidateKey] = candidate;
    }

    lock (_sync)
    {
        // Alarms the live stream never cleared.
        foreach ((string key, ActiveAlarmSnapshot cached) in _alarms)
        {
            if (authoritative.ContainsKey(key))
            {
                continue;
            }

            OnAlarmTransitionEvent cleared = TransitionFromSnapshot(cached, AlarmTransitionKind.Clear);
            Broadcast(new AlarmFeedMessage { Transition = cleared }, key);
        }

        // Alarms the live stream never raised.
        foreach ((string key, ActiveAlarmSnapshot fresh) in authoritative)
        {
            if (_alarms.ContainsKey(key))
            {
                continue;
            }

            OnAlarmTransitionEvent raised = TransitionFromSnapshot(fresh, AlarmTransitionKind.Raise);
            Broadcast(new AlarmFeedMessage { Transition = raised }, key);
        }

        _alarms.Clear();
        foreach ((string key, ActiveAlarmSnapshot fresh) in authoritative)
        {
            _alarms[key] = fresh;
        }
    }
}

// Caller holds _sync. Pushes a feed message to every subscriber whose
// alarm-filter prefix matches reference. A subscriber that has fallen behind
// is completed with an error and dropped.
private void Broadcast(AlarmFeedMessage message, string reference)
{
    // Walk backwards so a dropped subscriber can be removed in place.
    for (int index = _subscribers.Count - 1; index >= 0; index--)
    {
        if (_subscribers[index].Matches(reference))
        {
            DeliverOrDrop(index, message);
        }
    }
}

// Caller holds _sync. Pushes a feed message to every subscriber regardless of
// its alarm-filter prefix. Used for provider-status messages, which are global
// rather than scoped to a single alarm reference.
private void BroadcastToAll(AlarmFeedMessage message)
{
    // Walk backwards so a dropped subscriber can be removed in place.
    for (int index = _subscribers.Count - 1; index >= 0; index--)
    {
        DeliverOrDrop(index, message);
    }
}

// Caller holds _sync. Offers message to the subscriber at index without
// blocking. If the write is refused (e.g. its bounded queue is full), the
// subscriber's channel is completed with an error telling the client to
// reconnect, and the subscriber is removed from _subscribers.
private void DeliverOrDrop(int index, AlarmFeedMessage message)
{
    Subscriber subscriber = _subscribers[index];
    if (subscriber.Channel.Writer.TryWrite(message))
    {
        return;
    }

    subscriber.Channel.Writer.TryComplete(new InvalidOperationException(
        "Alarm feed subscriber fell behind and was dropped; reconnect to re-snapshot."));
    _subscribers.RemoveAt(index);
}

// Empties the alarm cache at the end of a monitoring lifecycle. Subscribers
// stay connected across lifecycles, so every dropped alarm is also broadcast
// to matching subscribers as a synthetic Clear (the same form ApplyReconcile
// uses). Without this, an alarm that cleared while the worker was down would
// never be cleared on the subscriber side: the next lifecycle's reconcile
// diffs against the now-empty cache and can only emit Raise transitions.
private void ClearCache()
{
    lock (_sync)
    {
        // Broadcast only touches _subscribers, so enumerating _alarms here is safe.
        foreach (KeyValuePair<string, ActiveAlarmSnapshot> existing in _alarms)
        {
            Broadcast(
                new AlarmFeedMessage { Transition = TransitionFromSnapshot(existing.Value, AlarmTransitionKind.Clear) },
                existing.Key);
        }

        _alarms.Clear();
    }
}

/// <inheritdoc />
/// <remarks>
/// The stream yields, in order:
/// <list type="number">
/// <item><description>the current provider status;</description></item>
/// <item><description>a clone of every cached alarm whose reference starts with the prefix (an empty prefix matches all);</description></item>
/// <item><description>a <c>SnapshotComplete</c> marker;</description></item>
/// <item><description>live feed messages until cancelled, or until the subscriber is dropped for falling behind.</description></item>
/// </list>
/// </remarks>
public async IAsyncEnumerable<AlarmFeedMessage> StreamAsync(
    string? alarmFilterPrefix,
    [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
    string prefix = alarmFilterPrefix ?? string.Empty;
    BoundedChannelOptions queueOptions = new(SubscriberQueueCapacity)
    {
        FullMode = BoundedChannelFullMode.Wait,
        SingleReader = true,
        SingleWriter = false,
    };
    Channel<AlarmFeedMessage> queue = Channel.CreateBounded<AlarmFeedMessage>(queueOptions);
    Subscriber registration = new(queue, prefix);

    AlarmProviderStatus initialStatus;
    List<ActiveAlarmSnapshot> initialAlarms = new();
    lock (_sync)
    {
        // Register and snapshot under one lock. No transition or provider-mode
        // change can then fall between the snapshot and the live queue.
        _subscribers.Add(registration);
        initialStatus = BuildProviderStatus();
        foreach (ActiveAlarmSnapshot cached in _alarms.Values)
        {
            if (registration.Matches(cached.AlarmFullReference))
            {
                initialAlarms.Add(cached.Clone());
            }
        }
    }

    try
    {
        // Provider status goes first, so a late joiner learns the mode (and
        // whether the feed is degraded) before any alarm.
        yield return new AlarmFeedMessage { ProviderStatus = initialStatus };

        foreach (ActiveAlarmSnapshot alarm in initialAlarms)
        {
            yield return new AlarmFeedMessage { ActiveAlarm = alarm };
        }

        yield return new AlarmFeedMessage { SnapshotComplete = true };

        await foreach (AlarmFeedMessage live in queue.Reader
            .ReadAllAsync(cancellationToken)
            .ConfigureAwait(false))
        {
            yield return live;
        }
    }
    finally
    {
        lock (_sync)
        {
            _subscribers.Remove(registration);
        }

        queue.Writer.TryComplete();
    }
}

/// <inheritdoc />
/// <remarks>
/// The request is answered with <c>WorkerUnavailable</c> unless a lifecycle is
/// currently Monitoring with a live session. It is answered with
/// <c>InvalidRequest</c> when the reference is neither a GUID nor
/// <c>Provider!Group.Tag</c>. Otherwise the acknowledge is forwarded to the
/// monitor's worker session, and the worker's status, diagnostic and HRESULT
/// are copied into the reply.
/// </remarks>
public async Task<AcknowledgeAlarmReply> AcknowledgeAsync(
    AcknowledgeAlarmRequest request,
    CancellationToken cancellationToken)
{
    ArgumentNullException.ThrowIfNull(request);

    string? sessionId;
    lock (_sync)
    {
        sessionId = _session?.SessionId;
    }

    if (sessionId is null || _state != GatewayAlarmMonitorState.Monitoring)
    {
        return Rejected(
            ProtocolStatusCode.WorkerUnavailable,
            "Gateway alarm monitor is not currently active.",
            _lastError ?? "Alarm monitor is not running.");
    }

    MxCommand? command = BuildAcknowledgeCommand(request, out string? parseError);
    if (command is null)
    {
        string problem = parseError ?? "Invalid acknowledge request.";
        return Rejected(ProtocolStatusCode.InvalidRequest, problem, problem);
    }

    WorkerCommandReply workerReply = await _sessionManager
        .InvokeAsync(sessionId, new WorkerCommand { Command = command }, cancellationToken)
        .ConfigureAwait(false);

    // A worker reply without an MxCommandReply is reported as a protocol violation.
    MxCommandReply mxReply = workerReply.Reply
        ?? new MxCommandReply
        {
            ProtocolStatus = new ProtocolStatus
            {
                Code = ProtocolStatusCode.ProtocolViolation,
                Message = "Worker reply did not include an MxCommandReply.",
            },
        };

    AcknowledgeAlarmReply reply = new()
    {
        CorrelationId = request.ClientCorrelationId,
        // NOTE(review): a reply that carries no ProtocolStatus is reported as Ok.
        ProtocolStatus = mxReply.ProtocolStatus ?? new ProtocolStatus { Code = ProtocolStatusCode.Ok },
        DiagnosticMessage = mxReply.DiagnosticMessage ?? string.Empty,
    };

    if (mxReply.HasHresult)
    {
        reply.Hresult = mxReply.Hresult;
    }

    return reply;

    // Builds a reply that refuses the request without contacting the worker.
    AcknowledgeAlarmReply Rejected(ProtocolStatusCode code, string message, string diagnostic) => new()
    {
        CorrelationId = request.ClientCorrelationId,
        ProtocolStatus = new ProtocolStatus
        {
            Code = code,
            Message = message,
        },
        DiagnosticMessage = diagnostic,
    };
}

// Chooses the alarm subscription expression. An explicit
// SubscriptionExpression wins. Otherwise one is built from DefaultArea against
// this machine's Galaxy provider. Otherwise the result is empty, which
// ExecuteAsync treats as a configuration error.
private string ResolveSubscription()
{
    string? explicitExpression = _options.SubscriptionExpression;
    if (!string.IsNullOrWhiteSpace(explicitExpression))
    {
        return explicitExpression;
    }

    string? area = _options.DefaultArea;
    return string.IsNullOrWhiteSpace(area)
        ? string.Empty
        : $@"\\{Environment.MachineName}\Galaxy!{area}";
}

// Translates an acknowledge request into a worker command:
// - a reference that parses as a GUID becomes AcknowledgeAlarm (by GUID);
// - a Provider!Group.Tag reference becomes AcknowledgeAlarmByName;
// - anything else, including a blank reference, yields null with parseError set.
// Operator node, domain and full name are always sent empty.
private static MxCommand? BuildAcknowledgeCommand(AcknowledgeAlarmRequest request, out string? parseError)
{
    parseError = null;

    string reference = request.AlarmFullReference;
    if (string.IsNullOrWhiteSpace(reference))
    {
        parseError = "alarm_full_reference is required.";
        return null;
    }

    string comment = request.Comment ?? string.Empty;
    string operatorUser = request.OperatorUser ?? string.Empty;

    if (Guid.TryParse(reference, out Guid alarmGuid))
    {
        return new MxCommand
        {
            Kind = MxCommandKind.AcknowledgeAlarm,
            AcknowledgeAlarmCommand = new AcknowledgeAlarmCommand
            {
                AlarmGuid = alarmGuid.ToString(),
                Comment = comment,
                OperatorUser = operatorUser,
                OperatorNode = string.Empty,
                OperatorDomain = string.Empty,
                OperatorFullName = string.Empty,
            },
        };
    }

    if (!TryParseAlarmReference(reference, out string provider, out string group, out string alarm))
    {
        parseError = "alarm_full_reference must be a canonical GUID or 'Provider!Group.Tag' format.";
        return null;
    }

    return new MxCommand
    {
        Kind = MxCommandKind.AcknowledgeAlarmByName,
        AcknowledgeAlarmByNameCommand = new AcknowledgeAlarmByNameCommand
        {
            AlarmName = alarm,
            ProviderName = provider,
            GroupName = group,
            Comment = comment,
            OperatorUser = operatorUser,
            OperatorNode = string.Empty,
            OperatorDomain = string.Empty,
            OperatorFullName = string.Empty,
        },
    };
}

/// <summary>
/// Splits an alarm reference of the form <c>Provider!Group.Tag</c>. The first
/// <c>!</c> separates the provider from <c>Group.Tag</c>, and the first
/// <c>.</c> after it separates the group from the tag (so the tag may itself
/// contain dots). All three parts must be non-empty.
/// </summary>
/// <param name="reference">The full alarm reference; may be <see langword="null"/>.</param>
/// <param name="providerName">The provider part, or empty on failure.</param>
/// <param name="groupName">The group/area part, or empty on failure.</param>
/// <param name="alarmName">The tag/alarm part, or empty on failure.</param>
/// <returns><see langword="true"/> for a well-formed reference; otherwise <see langword="false"/>.</returns>
public static bool TryParseAlarmReference(
    string? reference,
    out string providerName,
    out string groupName,
    out string alarmName)
{
    providerName = groupName = alarmName = string.Empty;

    if (string.IsNullOrWhiteSpace(reference))
    {
        return false;
    }

    // The provider must be non-empty and something must follow the '!'.
    int bang = reference.IndexOf('!');
    if (bang < 1 || bang >= reference.Length - 1)
    {
        return false;
    }

    // Within the remainder, the group and the tag must both be non-empty.
    string tail = reference.Substring(bang + 1);
    int dot = tail.IndexOf('.');
    if (dot < 1 || dot >= tail.Length - 1)
    {
        return false;
    }

    providerName = reference.Substring(0, bang);
    groupName = tail.Substring(0, dot);
    alarmName = tail.Substring(dot + 1);
    return true;
}

// Builds the cache entry for a transition. Descriptive fields are copied from
// the transition. The state is ActiveAcked for an Acknowledge transition and
// Active for any other kind. The optional timestamp and value sub-messages are
// copied only when the transition carries them.
private static ActiveAlarmSnapshot SnapshotFromTransition(OnAlarmTransitionEvent transition)
{
    bool acknowledged = transition.TransitionKind == AlarmTransitionKind.Acknowledge;

    ActiveAlarmSnapshot result = new()
    {
        AlarmFullReference = transition.AlarmFullReference,
        SourceObjectReference = transition.SourceObjectReference,
        AlarmTypeName = transition.AlarmTypeName,
        Severity = transition.Severity,
        CurrentState = acknowledged ? AlarmConditionState.ActiveAcked : AlarmConditionState.Active,
        Category = transition.Category,
        Description = transition.Description,
        OperatorUser = transition.OperatorUser,
        OperatorComment = transition.OperatorComment,
        Degraded = transition.Degraded,
        SourceProvider = transition.SourceProvider,
    };

    if (transition.OriginalRaiseTimestamp is { } raisedAt)
    {
        result.OriginalRaiseTimestamp = raisedAt;
    }

    if (transition.TransitionTimestamp is { } transitionedAt)
    {
        result.LastTransitionTimestamp = transitionedAt;
    }

    if (transition.CurrentValue is { } currentValue)
    {
        result.CurrentValue = currentValue;
    }

    if (transition.LimitValue is { } limitValue)
    {
        result.LimitValue = limitValue;
    }

    return result;
}

// Synthesizes a transition of the given kind from a cached snapshot. It is
// used to broadcast Raise/Clear events the live stream did not deliver.
// Descriptive fields are copied as-is, LastTransitionTimestamp becomes the
// transition timestamp, and optional sub-messages are copied only when present.
private static OnAlarmTransitionEvent TransitionFromSnapshot(
    ActiveAlarmSnapshot snapshot,
    AlarmTransitionKind kind)
{
    OnAlarmTransitionEvent synthesized = new()
    {
        AlarmFullReference = snapshot.AlarmFullReference,
        SourceObjectReference = snapshot.SourceObjectReference,
        AlarmTypeName = snapshot.AlarmTypeName,
        TransitionKind = kind,
        Severity = snapshot.Severity,
        Category = snapshot.Category,
        Description = snapshot.Description,
        OperatorUser = snapshot.OperatorUser,
        OperatorComment = snapshot.OperatorComment,
        Degraded = snapshot.Degraded,
        SourceProvider = snapshot.SourceProvider,
    };

    if (snapshot.OriginalRaiseTimestamp is { } raisedAt)
    {
        synthesized.OriginalRaiseTimestamp = raisedAt;
    }

    if (snapshot.LastTransitionTimestamp is { } lastTransitionAt)
    {
        synthesized.TransitionTimestamp = lastTransitionAt;
    }

    if (snapshot.CurrentValue is { } currentValue)
    {
        synthesized.CurrentValue = currentValue;
    }

    if (snapshot.LimitValue is { } limitValue)
    {
        synthesized.LimitValue = limitValue;
    }

    return synthesized;
}

// One StreamAsync consumer: its bounded feed queue plus the alarm-reference
// prefix it filters on (an empty prefix matches every alarm).
private sealed class Subscriber
{
    private readonly string _prefix;

    /// <summary>Creates a subscriber over the given queue and filter prefix.</summary>
    /// <param name="channel">Queue that broadcasts are written into.</param>
    /// <param name="prefix">Ordinal alarm-reference prefix; empty matches all.</param>
    public Subscriber(Channel<AlarmFeedMessage> channel, string prefix)
    {
        Channel = channel;
        _prefix = prefix;
    }

    /// <summary>Gets the channel for publishing alarm messages to this subscriber.</summary>
    public Channel<AlarmFeedMessage> Channel { get; }

    /// <summary>Determines whether the alarm reference matches this subscriber's filter.</summary>
    /// <param name="reference">The alarm reference to match.</param>
    /// <returns><see langword="true"/> when the prefix is empty or <paramref name="reference"/> starts with it (ordinal).</returns>
    public bool Matches(string reference) =>
        _prefix.Length == 0 || reference.StartsWith(_prefix, StringComparison.Ordinal);
}
}
|