server(alarms): monitor resolves watch-list, sends ForcedMode/failover, reflects provider mode into feed + metrics
This commit is contained in:
@@ -13,6 +13,7 @@ public static class AlarmsServiceCollectionExtensions
|
||||
/// <returns>The service collection for chaining.</returns>
|
||||
public static IServiceCollection AddGatewayAlarms(this IServiceCollection services)
|
||||
{
|
||||
services.AddSingleton<IAlarmWatchListResolver, AlarmWatchListResolver>();
|
||||
services.AddSingleton<GatewayAlarmMonitor>();
|
||||
services.AddSingleton<IGatewayAlarmService>(provider => provider.GetRequiredService<GatewayAlarmMonitor>());
|
||||
services.AddHostedService(provider => provider.GetRequiredService<GatewayAlarmMonitor>());
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
using System.Threading.Channels;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using Microsoft.Extensions.Options;
|
||||
using ZB.MOM.WW.MxGateway.Contracts.Proto;
|
||||
using ZB.MOM.WW.MxGateway.Server.Configuration;
|
||||
using ZB.MOM.WW.MxGateway.Server.Metrics;
|
||||
using ZB.MOM.WW.MxGateway.Server.Sessions;
|
||||
|
||||
namespace ZB.MOM.WW.MxGateway.Server.Alarms;
|
||||
@@ -23,6 +25,8 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic
|
||||
private static readonly TimeSpan StartupGrace = TimeSpan.FromSeconds(2);
|
||||
|
||||
private readonly ISessionManager _sessionManager;
|
||||
private readonly IAlarmWatchListResolver _watchListResolver;
|
||||
private readonly GatewayMetrics _metrics;
|
||||
private readonly AlarmsOptions _options;
|
||||
private readonly ILogger<GatewayAlarmMonitor> _logger;
|
||||
|
||||
@@ -30,20 +34,34 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic
|
||||
private readonly Dictionary<string, ActiveAlarmSnapshot> _alarms = new(StringComparer.Ordinal);
|
||||
private readonly List<Subscriber> _subscribers = [];
|
||||
|
||||
// Current provider status (mode + degraded + reason + since), guarded by _sync.
|
||||
// Initialized to the alarm-manager, not-degraded baseline so a late joiner sees
|
||||
// a sensible status even before any OnAlarmProviderModeChanged event arrives.
|
||||
private AlarmProviderMode _providerMode = AlarmProviderMode.Alarmmgr;
|
||||
private bool _providerDegraded;
|
||||
private string _providerReason = string.Empty;
|
||||
private DateTimeOffset _providerSince = DateTimeOffset.UtcNow;
|
||||
|
||||
private volatile GatewayAlarmMonitorState _state = GatewayAlarmMonitorState.Disabled;
|
||||
private volatile string? _lastError;
|
||||
private GatewaySession? _session;
|
||||
|
||||
/// <summary>Initializes the gateway alarm monitor.</summary>
|
||||
/// <param name="sessionManager">Gateway session manager.</param>
|
||||
/// <param name="watchListResolver">Resolver for the subtag-fallback watch-list.</param>
|
||||
/// <param name="metrics">Gateway metrics sink.</param>
|
||||
/// <param name="options">Gateway options carrying the alarm configuration.</param>
|
||||
/// <param name="logger">Diagnostic logger.</param>
|
||||
public GatewayAlarmMonitor(
|
||||
ISessionManager sessionManager,
|
||||
IAlarmWatchListResolver watchListResolver,
|
||||
GatewayMetrics metrics,
|
||||
IOptions<GatewayOptions> options,
|
||||
ILogger<GatewayAlarmMonitor> logger)
|
||||
{
|
||||
_sessionManager = sessionManager ?? throw new ArgumentNullException(nameof(sessionManager));
|
||||
_watchListResolver = watchListResolver ?? throw new ArgumentNullException(nameof(watchListResolver));
|
||||
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
|
||||
_options = (options ?? throw new ArgumentNullException(nameof(options))).Value.Alarms;
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
@@ -139,6 +157,16 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic
|
||||
private async Task RunMonitorAsync(string subscription, CancellationToken stoppingToken)
|
||||
{
|
||||
_state = GatewayAlarmMonitorState.Starting;
|
||||
lock (_sync)
|
||||
{
|
||||
// Re-baseline the provider status for this lifecycle so a restarted
|
||||
// monitor advertises alarm-manager/not-degraded until told otherwise.
|
||||
_providerMode = AlarmProviderMode.Alarmmgr;
|
||||
_providerDegraded = false;
|
||||
_providerReason = string.Empty;
|
||||
_providerSince = DateTimeOffset.UtcNow;
|
||||
}
|
||||
|
||||
GatewaySession session = await _sessionManager.OpenSessionAsync(
|
||||
new SessionOpenRequest(BackendName, MonitorClientName, Guid.NewGuid().ToString("N"), CommandTimeout: null),
|
||||
MonitorClientName,
|
||||
@@ -173,6 +201,15 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic
|
||||
{
|
||||
ApplyTransition(mxEvent.OnAlarmTransition);
|
||||
}
|
||||
else if (mxEvent is { BodyCase: MxEvent.BodyOneofCase.OnAlarmProviderModeChanged }
|
||||
&& mxEvent.OnAlarmProviderModeChanged is not null)
|
||||
{
|
||||
await ApplyProviderModeChangeAsync(
|
||||
session.SessionId,
|
||||
mxEvent.OnAlarmProviderModeChanged,
|
||||
linked.Token)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
finally
|
||||
@@ -209,6 +246,29 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic
|
||||
|
||||
private async Task SubscribeAlarmsAsync(string sessionId, string subscription, CancellationToken cancellationToken)
|
||||
{
|
||||
IReadOnlyList<AlarmSubtagTarget> watchList = await _watchListResolver
|
||||
.ResolveAsync(_options, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
AlarmProviderMode forcedMode = MapForcedMode(_options.Fallback.Mode);
|
||||
|
||||
// When the forced mode is Unspecified (the "Auto" case) and the resolved
|
||||
// watch-list is empty — the common alarmmgr-only deployment — the command
|
||||
// is identical-in-effect to the historical SubscribeAlarms (wnwrap only):
|
||||
// the worker builds the wnwrap consumer and no subtag watch-list.
|
||||
SubscribeAlarmsCommand command = new()
|
||||
{
|
||||
SubscriptionExpression = subscription,
|
||||
ForcedMode = forcedMode,
|
||||
Failover = new AlarmFailoverConfig
|
||||
{
|
||||
ConsecutiveFailureThreshold = _options.Fallback.ConsecutiveFailureThreshold,
|
||||
FailbackProbeIntervalSeconds = _options.Fallback.FailbackProbeIntervalSeconds,
|
||||
FailbackStableProbes = _options.Fallback.FailbackStableProbes,
|
||||
},
|
||||
};
|
||||
command.WatchList.AddRange(watchList);
|
||||
|
||||
WorkerCommandReply reply = await _sessionManager.InvokeAsync(
|
||||
sessionId,
|
||||
new WorkerCommand
|
||||
@@ -216,7 +276,7 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic
|
||||
Command = new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.SubscribeAlarms,
|
||||
SubscribeAlarms = new SubscribeAlarmsCommand { SubscriptionExpression = subscription },
|
||||
SubscribeAlarms = command,
|
||||
},
|
||||
},
|
||||
cancellationToken)
|
||||
@@ -310,6 +370,94 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic
|
||||
}
|
||||
}
|
||||
|
||||
// Handles the worker's provider-mode-change event: updates the stored provider
|
||||
// status, broadcasts it to every subscriber (provider status is global, not
|
||||
// alarm-scoped), records the switch metric, and forces a cache reconcile so the
|
||||
// active-alarm set reflects whatever the new mode reports.
|
||||
private async Task ApplyProviderModeChangeAsync(
|
||||
string sessionId,
|
||||
OnAlarmProviderModeChangedEvent change,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
AlarmProviderMode toMode = change.Mode;
|
||||
string reason = change.Reason ?? string.Empty;
|
||||
|
||||
AlarmProviderStatus status;
|
||||
int fromModeInt;
|
||||
lock (_sync)
|
||||
{
|
||||
fromModeInt = ModeToInt(_providerMode);
|
||||
_providerMode = toMode;
|
||||
_providerDegraded = toMode == AlarmProviderMode.Subtag;
|
||||
_providerReason = reason;
|
||||
_providerSince = DateTimeOffset.UtcNow;
|
||||
status = BuildProviderStatus();
|
||||
BroadcastToAll(new AlarmFeedMessage { ProviderStatus = status });
|
||||
}
|
||||
|
||||
_metrics.AlarmProviderSwitched(fromModeInt, ModeToInt(toMode), reason);
|
||||
|
||||
_logger.LogInformation(
|
||||
"Alarm provider mode changed to {Mode} (degraded={Degraded}): {Reason}",
|
||||
toMode,
|
||||
status.Degraded,
|
||||
reason);
|
||||
|
||||
try
|
||||
{
|
||||
await ReconcileAsync(sessionId, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
throw;
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
_logger.LogDebug(
|
||||
exception,
|
||||
"Reconcile after alarm provider mode change failed; keeping the current cache.");
|
||||
}
|
||||
}
|
||||
|
||||
// Caller holds _sync. Builds an AlarmProviderStatus snapshot of the current state.
|
||||
private AlarmProviderStatus BuildProviderStatus()
|
||||
{
|
||||
return new AlarmProviderStatus
|
||||
{
|
||||
Mode = _providerMode,
|
||||
Degraded = _providerDegraded,
|
||||
Reason = _providerReason,
|
||||
Since = Timestamp.FromDateTimeOffset(_providerSince),
|
||||
};
|
||||
}
|
||||
|
||||
// Maps the configured fallback mode string to the forced provider mode the
|
||||
// worker honours. Case-insensitive; anything other than the two force values
|
||||
// (including the default "Auto") yields Unspecified ("let the worker decide").
|
||||
private static AlarmProviderMode MapForcedMode(string? mode)
|
||||
{
|
||||
if (string.Equals(mode, "ForceAlarmManager", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
return AlarmProviderMode.Alarmmgr;
|
||||
}
|
||||
|
||||
if (string.Equals(mode, "ForceSubtag", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
return AlarmProviderMode.Subtag;
|
||||
}
|
||||
|
||||
return AlarmProviderMode.Unspecified;
|
||||
}
|
||||
|
||||
// Maps the provider-mode enum to the integer the metric expects
|
||||
// (alarmmgr=1, subtag=2, unknown/unspecified=0).
|
||||
private static int ModeToInt(AlarmProviderMode mode) => mode switch
|
||||
{
|
||||
AlarmProviderMode.Alarmmgr => 1,
|
||||
AlarmProviderMode.Subtag => 2,
|
||||
_ => 0,
|
||||
};
|
||||
|
||||
// Replaces the cache with the worker's authoritative snapshot, broadcasting
|
||||
// a synthetic transition for any alarm the live stream missed.
|
||||
private void ApplyReconcile(IEnumerable<ActiveAlarmSnapshot> snapshots)
|
||||
@@ -374,6 +522,23 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic
|
||||
}
|
||||
}
|
||||
|
||||
// Caller holds _sync. Pushes a feed message to every subscriber regardless of
|
||||
// its alarm-filter prefix. Used for provider-status messages, which are global
|
||||
// rather than scoped to a single alarm reference.
|
||||
private void BroadcastToAll(AlarmFeedMessage message)
|
||||
{
|
||||
for (int index = _subscribers.Count - 1; index >= 0; index--)
|
||||
{
|
||||
Subscriber subscriber = _subscribers[index];
|
||||
if (!subscriber.Channel.Writer.TryWrite(message))
|
||||
{
|
||||
subscriber.Channel.Writer.TryComplete(new InvalidOperationException(
|
||||
"Alarm feed subscriber fell behind and was dropped; reconnect to re-snapshot."));
|
||||
_subscribers.RemoveAt(index);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void ClearCache()
|
||||
{
|
||||
lock (_sync)
|
||||
@@ -398,11 +563,14 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic
|
||||
Subscriber subscriber = new(channel, prefix);
|
||||
|
||||
ActiveAlarmSnapshot[] snapshot;
|
||||
AlarmProviderStatus providerStatus;
|
||||
lock (_sync)
|
||||
{
|
||||
// Register before snapshotting under the same lock so no transition
|
||||
// can slip between the snapshot and the live stream.
|
||||
// Register before snapshotting under the same lock so neither a
|
||||
// transition nor a provider-mode change can slip between the snapshot
|
||||
// and the live stream.
|
||||
_subscribers.Add(subscriber);
|
||||
providerStatus = BuildProviderStatus();
|
||||
snapshot = _alarms.Values
|
||||
.Where(alarm => prefix.Length == 0
|
||||
|| alarm.AlarmFullReference.StartsWith(prefix, StringComparison.Ordinal))
|
||||
@@ -412,6 +580,10 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic
|
||||
|
||||
try
|
||||
{
|
||||
// Emit the current provider status first so a late joiner immediately
|
||||
// learns the mode (and whether the feed is degraded) before any alarms.
|
||||
yield return new AlarmFeedMessage { ProviderStatus = providerStatus };
|
||||
|
||||
foreach (ActiveAlarmSnapshot alarm in snapshot)
|
||||
{
|
||||
yield return new AlarmFeedMessage { ActiveAlarm = alarm };
|
||||
|
||||
@@ -0,0 +1,398 @@
|
||||
using System.Diagnostics.CodeAnalysis;
|
||||
using System.Diagnostics.Metrics;
|
||||
using System.Runtime.CompilerServices;
|
||||
using System.Threading.Channels;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Microsoft.Extensions.Options;
|
||||
using ZB.MOM.WW.MxGateway.Contracts.Proto;
|
||||
using ZB.MOM.WW.MxGateway.Server.Alarms;
|
||||
using ZB.MOM.WW.MxGateway.Server.Configuration;
|
||||
using ZB.MOM.WW.MxGateway.Server.Metrics;
|
||||
using ZB.MOM.WW.MxGateway.Server.Sessions;
|
||||
|
||||
namespace ZB.MOM.WW.MxGateway.Tests.Alarms;
|
||||
|
||||
/// <summary>
|
||||
/// Drives <see cref="GatewayAlarmMonitor"/> with a fake session manager to
|
||||
/// verify it reflects the worker's <c>OnAlarmProviderModeChanged</c> event into
|
||||
/// the alarm feed and the switch metric, and that a new subscriber receives the
|
||||
/// provider status as its first message. Also covers the watch-list / forced-mode
|
||||
/// wiring of the <c>SubscribeAlarms</c> command and the Mode→enum mapping.
|
||||
/// </summary>
|
||||
public sealed class GatewayAlarmMonitorProviderModeTests
|
||||
{
|
||||
private static readonly TimeSpan WaitTimeout = TimeSpan.FromSeconds(15);
|
||||
|
||||
[Fact]
|
||||
public async Task ProviderModeChange_BroadcastsDegradedStatus_AndIncrementsSwitchMetric()
|
||||
{
|
||||
using GatewayMetrics metrics = new();
|
||||
long switchCount = 0;
|
||||
using MeterListener listener = new();
|
||||
listener.InstrumentPublished = (instrument, meterListener) =>
|
||||
{
|
||||
if (ReferenceEquals(instrument.Meter, metrics.Meter)
|
||||
&& instrument.Name == "mxgateway.alarms.provider_switches")
|
||||
{
|
||||
meterListener.EnableMeasurementEvents(instrument);
|
||||
}
|
||||
};
|
||||
listener.SetMeasurementEventCallback<long>(
|
||||
(instrument, measurement, _, _) =>
|
||||
{
|
||||
if (ReferenceEquals(instrument.Meter, metrics.Meter)
|
||||
&& instrument.Name == "mxgateway.alarms.provider_switches")
|
||||
{
|
||||
Interlocked.Add(ref switchCount, measurement);
|
||||
}
|
||||
});
|
||||
listener.Start();
|
||||
|
||||
FakeSessionManager sessions = new();
|
||||
using GatewayAlarmMonitor monitor = CreateMonitor(sessions, metrics);
|
||||
|
||||
using CancellationTokenSource cts = new();
|
||||
await monitor.StartAsync(cts.Token);
|
||||
await sessions.WaitForSubscribeAsync(WaitTimeout);
|
||||
|
||||
// Subscribe a live feed reader, drain its first (provider status) message.
|
||||
List<AlarmFeedMessage> received = [];
|
||||
using CancellationTokenSource streamCts = new();
|
||||
Task reader = Task.Run(async () =>
|
||||
{
|
||||
try
|
||||
{
|
||||
await foreach (AlarmFeedMessage message in monitor.StreamAsync(null, streamCts.Token))
|
||||
{
|
||||
lock (received) { received.Add(message); }
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// Expected when the test cancels the stream.
|
||||
}
|
||||
});
|
||||
|
||||
// Emit the worker event that flips the provider into subtag mode.
|
||||
sessions.EmitEvent(new MxEvent
|
||||
{
|
||||
Family = MxEventFamily.OnAlarmProviderModeChanged,
|
||||
OnAlarmProviderModeChanged = new OnAlarmProviderModeChangedEvent
|
||||
{
|
||||
Mode = AlarmProviderMode.Subtag,
|
||||
Reason = "alarmmgr failed",
|
||||
Hresult = unchecked((int)0x80004005),
|
||||
At = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
|
||||
},
|
||||
});
|
||||
|
||||
AlarmFeedMessage degraded = await WaitForAsync(
|
||||
received,
|
||||
m => m.PayloadCase == AlarmFeedMessage.PayloadOneofCase.ProviderStatus
|
||||
&& m.ProviderStatus.Mode == AlarmProviderMode.Subtag,
|
||||
WaitTimeout);
|
||||
|
||||
Assert.Equal(AlarmProviderMode.Subtag, degraded.ProviderStatus.Mode);
|
||||
Assert.True(degraded.ProviderStatus.Degraded);
|
||||
Assert.Equal("alarmmgr failed", degraded.ProviderStatus.Reason);
|
||||
|
||||
await WaitUntilAsync(() => Interlocked.Read(ref switchCount) >= 1, WaitTimeout);
|
||||
Assert.Equal(1, Interlocked.Read(ref switchCount));
|
||||
|
||||
await streamCts.CancelAsync();
|
||||
await reader;
|
||||
await cts.CancelAsync();
|
||||
await monitor.StopAsync(CancellationToken.None);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task NewSubscriber_ReceivesProviderStatusAsFirstMessage()
|
||||
{
|
||||
using GatewayMetrics metrics = new();
|
||||
FakeSessionManager sessions = new();
|
||||
using GatewayAlarmMonitor monitor = CreateMonitor(sessions, metrics);
|
||||
|
||||
using CancellationTokenSource cts = new();
|
||||
await monitor.StartAsync(cts.Token);
|
||||
await sessions.WaitForSubscribeAsync(WaitTimeout);
|
||||
|
||||
using CancellationTokenSource streamCts = new();
|
||||
AlarmFeedMessage? first = null;
|
||||
Task reader = Task.Run(async () =>
|
||||
{
|
||||
await foreach (AlarmFeedMessage message in monitor.StreamAsync(null, streamCts.Token))
|
||||
{
|
||||
first = message;
|
||||
break;
|
||||
}
|
||||
});
|
||||
|
||||
await WaitUntilAsync(() => first is not null, WaitTimeout);
|
||||
|
||||
Assert.NotNull(first);
|
||||
Assert.Equal(AlarmFeedMessage.PayloadOneofCase.ProviderStatus, first!.PayloadCase);
|
||||
// Baseline before any provider-mode event: alarm-manager, not degraded.
|
||||
Assert.Equal(AlarmProviderMode.Alarmmgr, first.ProviderStatus.Mode);
|
||||
Assert.False(first.ProviderStatus.Degraded);
|
||||
|
||||
await streamCts.CancelAsync();
|
||||
await reader;
|
||||
await cts.CancelAsync();
|
||||
await monitor.StopAsync(CancellationToken.None);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task SubscribeAlarms_SendsForcedModeAndWatchList_FromConfiguration()
|
||||
{
|
||||
using GatewayMetrics metrics = new();
|
||||
FakeSessionManager sessions = new();
|
||||
StubWatchListResolver resolver = new(
|
||||
[
|
||||
new AlarmSubtagTarget { AlarmFullReference = "Galaxy!Area.Tank01.Hi", ActiveSubtag = "Tank01.Hi.active" },
|
||||
]);
|
||||
|
||||
AlarmsOptions options = new()
|
||||
{
|
||||
Enabled = true,
|
||||
SubscriptionExpression = @"\\NODE\Galaxy!Area",
|
||||
Fallback = new AlarmFallbackOptions
|
||||
{
|
||||
Mode = "ForceSubtag",
|
||||
ConsecutiveFailureThreshold = 7,
|
||||
FailbackProbeIntervalSeconds = 11,
|
||||
FailbackStableProbes = 4,
|
||||
},
|
||||
};
|
||||
|
||||
using GatewayAlarmMonitor monitor = new(
|
||||
sessions,
|
||||
resolver,
|
||||
metrics,
|
||||
Microsoft.Extensions.Options.Options.Create(new GatewayOptions { Alarms = options }),
|
||||
NullLogger<GatewayAlarmMonitor>.Instance);
|
||||
|
||||
using CancellationTokenSource cts = new();
|
||||
await monitor.StartAsync(cts.Token);
|
||||
await sessions.WaitForSubscribeAsync(WaitTimeout);
|
||||
|
||||
SubscribeAlarmsCommand sent = Assert.IsType<SubscribeAlarmsCommand>(sessions.LastSubscribeCommand);
|
||||
Assert.Equal(AlarmProviderMode.Subtag, sent.ForcedMode);
|
||||
Assert.Equal(7, sent.Failover.ConsecutiveFailureThreshold);
|
||||
Assert.Equal(11, sent.Failover.FailbackProbeIntervalSeconds);
|
||||
Assert.Equal(4, sent.Failover.FailbackStableProbes);
|
||||
AlarmSubtagTarget target = Assert.Single(sent.WatchList);
|
||||
Assert.Equal("Galaxy!Area.Tank01.Hi", target.AlarmFullReference);
|
||||
|
||||
await cts.CancelAsync();
|
||||
await monitor.StopAsync(CancellationToken.None);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData("ForceAlarmManager", AlarmProviderMode.Alarmmgr)]
|
||||
[InlineData("forcealarmmanager", AlarmProviderMode.Alarmmgr)]
|
||||
[InlineData("ForceSubtag", AlarmProviderMode.Subtag)]
|
||||
[InlineData("forcesubtag", AlarmProviderMode.Subtag)]
|
||||
[InlineData("Auto", AlarmProviderMode.Unspecified)]
|
||||
[InlineData("", AlarmProviderMode.Unspecified)]
|
||||
[InlineData("nonsense", AlarmProviderMode.Unspecified)]
|
||||
public async Task ModeString_MapsToForcedProviderMode(string mode, AlarmProviderMode expected)
|
||||
{
|
||||
using GatewayMetrics metrics = new();
|
||||
FakeSessionManager sessions = new();
|
||||
|
||||
AlarmsOptions options = new()
|
||||
{
|
||||
Enabled = true,
|
||||
SubscriptionExpression = @"\\NODE\Galaxy!Area",
|
||||
Fallback = new AlarmFallbackOptions { Mode = mode },
|
||||
};
|
||||
|
||||
using GatewayAlarmMonitor monitor = new(
|
||||
sessions,
|
||||
new StubWatchListResolver([]),
|
||||
metrics,
|
||||
Microsoft.Extensions.Options.Options.Create(new GatewayOptions { Alarms = options }),
|
||||
NullLogger<GatewayAlarmMonitor>.Instance);
|
||||
|
||||
using CancellationTokenSource cts = new();
|
||||
await monitor.StartAsync(cts.Token);
|
||||
await sessions.WaitForSubscribeAsync(WaitTimeout);
|
||||
|
||||
Assert.Equal(expected, sessions.LastSubscribeCommand!.ForcedMode);
|
||||
// Auto + empty watch-list preserves historical alarmmgr-only behaviour.
|
||||
if (expected == AlarmProviderMode.Unspecified)
|
||||
{
|
||||
Assert.Empty(sessions.LastSubscribeCommand!.WatchList);
|
||||
}
|
||||
|
||||
await cts.CancelAsync();
|
||||
await monitor.StopAsync(CancellationToken.None);
|
||||
}
|
||||
|
||||
private static GatewayAlarmMonitor CreateMonitor(FakeSessionManager sessions, GatewayMetrics metrics)
|
||||
{
|
||||
AlarmsOptions options = new()
|
||||
{
|
||||
Enabled = true,
|
||||
SubscriptionExpression = @"\\NODE\Galaxy!Area",
|
||||
};
|
||||
return new GatewayAlarmMonitor(
|
||||
sessions,
|
||||
new StubWatchListResolver([]),
|
||||
metrics,
|
||||
Microsoft.Extensions.Options.Options.Create(new GatewayOptions { Alarms = options }),
|
||||
NullLogger<GatewayAlarmMonitor>.Instance);
|
||||
}
|
||||
|
||||
private static async Task<AlarmFeedMessage> WaitForAsync(
|
||||
List<AlarmFeedMessage> received,
|
||||
Func<AlarmFeedMessage, bool> predicate,
|
||||
TimeSpan timeout)
|
||||
{
|
||||
DateTime deadline = DateTime.UtcNow + timeout;
|
||||
while (DateTime.UtcNow < deadline)
|
||||
{
|
||||
lock (received)
|
||||
{
|
||||
AlarmFeedMessage? match = received.FirstOrDefault(predicate);
|
||||
if (match is not null)
|
||||
{
|
||||
return match;
|
||||
}
|
||||
}
|
||||
|
||||
await Task.Delay(25);
|
||||
}
|
||||
|
||||
throw new TimeoutException("No matching AlarmFeedMessage was received in time.");
|
||||
}
|
||||
|
||||
private static async Task WaitUntilAsync(Func<bool> condition, TimeSpan timeout)
|
||||
{
|
||||
DateTime deadline = DateTime.UtcNow + timeout;
|
||||
while (DateTime.UtcNow < deadline)
|
||||
{
|
||||
if (condition())
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
await Task.Delay(25);
|
||||
}
|
||||
|
||||
throw new TimeoutException("Condition was not met in time.");
|
||||
}
|
||||
|
||||
/// <summary><see cref="IAlarmWatchListResolver"/> that returns a fixed watch-list.</summary>
|
||||
private sealed class StubWatchListResolver(IReadOnlyList<AlarmSubtagTarget> targets) : IAlarmWatchListResolver
|
||||
{
|
||||
/// <inheritdoc />
|
||||
public Task<IReadOnlyList<AlarmSubtagTarget>> ResolveAsync(
|
||||
AlarmsOptions options,
|
||||
CancellationToken cancellationToken = default) => Task.FromResult(targets);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Minimal <see cref="ISessionManager"/> for driving the monitor: opens a
|
||||
/// constructed session, records the SubscribeAlarms command, replies OK to
|
||||
/// every command, and exposes a channel for pushing worker events.
|
||||
/// </summary>
|
||||
private sealed class FakeSessionManager : ISessionManager
|
||||
{
|
||||
private readonly Channel<WorkerEvent> _events = Channel.CreateUnbounded<WorkerEvent>();
|
||||
private readonly TaskCompletionSource _subscribed =
|
||||
new(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
|
||||
/// <summary>The most recent SubscribeAlarms command the monitor sent.</summary>
|
||||
public SubscribeAlarmsCommand? LastSubscribeCommand { get; private set; }
|
||||
|
||||
/// <summary>Pushes a worker event onto the monitor's event stream.</summary>
|
||||
public void EmitEvent(MxEvent mxEvent) =>
|
||||
_events.Writer.TryWrite(new WorkerEvent { Event = mxEvent });
|
||||
|
||||
/// <summary>Completes once the monitor has issued its SubscribeAlarms command.</summary>
|
||||
public Task WaitForSubscribeAsync(TimeSpan timeout) => _subscribed.Task.WaitAsync(timeout);
|
||||
|
||||
/// <inheritdoc />
|
||||
public Task<GatewaySession> OpenSessionAsync(
|
||||
SessionOpenRequest request,
|
||||
string? clientIdentity,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
GatewaySession session = new(
|
||||
Guid.NewGuid().ToString("N"),
|
||||
"Galaxy",
|
||||
"pipe-test",
|
||||
"nonce-test",
|
||||
clientIdentity,
|
||||
null,
|
||||
null,
|
||||
TimeSpan.FromSeconds(30),
|
||||
TimeSpan.FromSeconds(30),
|
||||
TimeSpan.FromSeconds(30),
|
||||
DateTimeOffset.UtcNow);
|
||||
return Task.FromResult(session);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public Task<WorkerCommandReply> InvokeAsync(
|
||||
string sessionId,
|
||||
WorkerCommand command,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
if (command.Command?.Kind == MxCommandKind.SubscribeAlarms)
|
||||
{
|
||||
LastSubscribeCommand = command.Command.SubscribeAlarms;
|
||||
_subscribed.TrySetResult();
|
||||
}
|
||||
|
||||
MxCommandReply reply = new()
|
||||
{
|
||||
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
|
||||
};
|
||||
if (command.Command?.Kind == MxCommandKind.QueryActiveAlarms)
|
||||
{
|
||||
reply.QueryActiveAlarms = new QueryActiveAlarmsReplyPayload();
|
||||
}
|
||||
|
||||
return Task.FromResult(new WorkerCommandReply { Reply = reply });
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
|
||||
string sessionId,
|
||||
[EnumeratorCancellation] CancellationToken cancellationToken)
|
||||
{
|
||||
await foreach (WorkerEvent workerEvent in _events.Reader.ReadAllAsync(cancellationToken))
|
||||
{
|
||||
yield return workerEvent;
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public bool TryGetSession(string sessionId, [MaybeNullWhen(false)] out GatewaySession session)
|
||||
{
|
||||
session = null;
|
||||
return false;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public Task<SessionCloseResult> CloseSessionAsync(string sessionId, CancellationToken cancellationToken)
|
||||
{
|
||||
_events.Writer.TryComplete();
|
||||
return Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false));
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public Task<SessionCloseResult> KillWorkerAsync(string sessionId, string reason, CancellationToken cancellationToken) =>
|
||||
Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false));
|
||||
|
||||
/// <inheritdoc />
|
||||
public Task<int> CloseExpiredLeasesAsync(DateTimeOffset now, CancellationToken cancellationToken) =>
|
||||
Task.FromResult(0);
|
||||
|
||||
/// <inheritdoc />
|
||||
public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user