using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.Metrics;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Google.Protobuf.WellKnownTypes;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Alarms;
using ZB.MOM.WW.MxGateway.Server.Configuration;
using ZB.MOM.WW.MxGateway.Server.Metrics;
using ZB.MOM.WW.MxGateway.Server.Sessions;

namespace ZB.MOM.WW.MxGateway.Tests.Alarms;

/// <summary>
/// Drives <see cref="GatewayAlarmMonitor"/> with a fake session manager to
/// verify it reflects the worker's <c>OnAlarmProviderModeChanged</c> event into
/// the alarm feed and the switch metric, and that a new subscriber receives the
/// provider status as its first message. Also covers the watch-list / forced-mode
/// wiring of the <c>SubscribeAlarms</c> command and the Mode→enum mapping.
/// </summary>
public sealed class GatewayAlarmMonitorProviderModeTests
{
    /// <summary>Name of the counter instrument observed by the switch-metric test.</summary>
    private const string ProviderSwitchCounterName = "mxgateway.alarms.provider_switches";

    /// <summary>Subscription expression shared by every monitor built in this class.</summary>
    private const string SubscriptionExpression = @"\\NODE\Galaxy!Area";

    /// <summary>Upper bound applied to every asynchronous wait in these tests.</summary>
    private static readonly TimeSpan WaitTimeout = TimeSpan.FromSeconds(15);

    /// <summary>Delay between two polls in <see cref="WaitForAsync"/> and <see cref="WaitUntilAsync"/>.</summary>
    private static readonly TimeSpan PollInterval = TimeSpan.FromMilliseconds(25);

    /// <summary>
    /// Emits a provider-mode-changed worker event and checks that a live feed reader
    /// receives a degraded subtag provider status and that the switch counter is
    /// incremented exactly once.
    /// </summary>
    [Fact]
    public async Task ProviderModeChange_BroadcastsDegradedStatus_AndIncrementsSwitchMetric()
    {
        using GatewayMetrics metrics = new();
        long switchCount = 0;

        // Only the provider-switch counter published by THIS metrics instance is observed.
        bool IsProviderSwitchCounter(Instrument instrument) =>
            ReferenceEquals(instrument.Meter, metrics.Meter)
            && instrument.Name == ProviderSwitchCounterName;

        using MeterListener listener = new();
        listener.InstrumentPublished = (instrument, meterListener) =>
        {
            if (IsProviderSwitchCounter(instrument))
            {
                meterListener.EnableMeasurementEvents(instrument);
            }
        };
        listener.SetMeasurementEventCallback<long>(
            (instrument, measurement, _, _) =>
            {
                if (IsProviderSwitchCounter(instrument))
                {
                    Interlocked.Add(ref switchCount, measurement);
                }
            });
        listener.Start();

        FakeSessionManager sessions = new();
        using GatewayAlarmMonitor monitor = CreateMonitor(sessions, metrics);
        using CancellationTokenSource cts = new();
        await monitor.StartAsync(cts.Token);
        await sessions.WaitForSubscribeAsync(WaitTimeout);

        // Subscribe a live feed reader. Gate emitting the mode-change event until the
        // reader has consumed its baseline ProviderStatus message, avoiding a race where
        // the event arrives before the subscriber is registered and draining its snapshot.
        List<AlarmFeedMessage> received = [];
        TaskCompletionSource baselineReceived = new(TaskCreationOptions.RunContinuationsAsynchronously);
        using CancellationTokenSource streamCts = new();
        Task reader = Task.Run(async () =>
        {
            try
            {
                await foreach (AlarmFeedMessage message in monitor.StreamAsync(null, streamCts.Token))
                {
                    lock (received)
                    {
                        received.Add(message);

                        // Signal once the first message (baseline ProviderStatus) has arrived.
                        if (received.Count == 1)
                        {
                            baselineReceived.TrySetResult();
                        }
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when the test cancels the stream.
            }
            catch (Exception ex)
            {
                // Surface a stream failure to the baseline wait immediately instead of
                // letting the test report an unrelated timeout.
                baselineReceived.TrySetException(ex);
                throw;
            }
        });

        // Wait for the baseline ProviderStatus to arrive before emitting the mode change,
        // so the subscriber is registered and the event is not dropped.
        await baselineReceived.Task.WaitAsync(WaitTimeout);

        // Emit the worker event that flips the provider into subtag mode.
        sessions.EmitEvent(new MxEvent
        {
            OnAlarmProviderModeChanged = new OnAlarmProviderModeChangedEvent
            {
                Mode = AlarmProviderMode.Subtag,
                Reason = "alarmmgr failed",
                Hresult = unchecked((int)0x80004005),
                At = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
            },
        });

        AlarmFeedMessage degraded = await WaitForAsync(
            received,
            m => m.PayloadCase == AlarmFeedMessage.PayloadOneofCase.ProviderStatus
                && m.ProviderStatus.Mode == AlarmProviderMode.Subtag,
            WaitTimeout);

        Assert.Equal(AlarmProviderMode.Subtag, degraded.ProviderStatus.Mode);
        Assert.True(degraded.ProviderStatus.Degraded);
        Assert.Equal("alarmmgr failed", degraded.ProviderStatus.Reason);

        await WaitUntilAsync(() => Interlocked.Read(ref switchCount) >= 1, WaitTimeout);
        Assert.Equal(1, Interlocked.Read(ref switchCount));

        await streamCts.CancelAsync();
        await reader;
        await cts.CancelAsync();
        await monitor.StopAsync(CancellationToken.None);
    }

    /// <summary>
    /// Checks that the first message delivered to a new feed subscriber is the
    /// provider status, and that before any provider-mode event it reports the
    /// alarm-manager mode without the degraded flag.
    /// </summary>
    [Fact]
    public async Task NewSubscriber_ReceivesProviderStatusAsFirstMessage()
    {
        using GatewayMetrics metrics = new();
        FakeSessionManager sessions = new();
        using GatewayAlarmMonitor monitor = CreateMonitor(sessions, metrics);
        using CancellationTokenSource cts = new();
        await monitor.StartAsync(cts.Token);
        await sessions.WaitForSubscribeAsync(WaitTimeout);

        using CancellationTokenSource streamCts = new();

        // The reader hands the first message back through the task result, so no
        // variable is shared across threads and a stream failure is rethrown by the
        // await below rather than being reported as a timeout.
        Task<AlarmFeedMessage?> reader = Task.Run<AlarmFeedMessage?>(async () =>
        {
            await foreach (AlarmFeedMessage message in monitor.StreamAsync(null, streamCts.Token))
            {
                return message;
            }

            // The stream ended without yielding anything.
            return null;
        });

        AlarmFeedMessage? first = await reader.WaitAsync(WaitTimeout);

        Assert.NotNull(first);
        Assert.Equal(AlarmFeedMessage.PayloadOneofCase.ProviderStatus, first!.PayloadCase);

        // Baseline before any provider-mode event: alarm-manager, not degraded.
        Assert.Equal(AlarmProviderMode.Alarmmgr, first.ProviderStatus.Mode);
        Assert.False(first.ProviderStatus.Degraded);

        await streamCts.CancelAsync();
        await cts.CancelAsync();
        await monitor.StopAsync(CancellationToken.None);
    }

    /// <summary>
    /// Checks that the SubscribeAlarms command sent by the monitor carries the forced
    /// mode, the failover settings and the resolved watch-list taken from configuration.
    /// </summary>
    [Fact]
    public async Task SubscribeAlarms_SendsForcedModeAndWatchList_FromConfiguration()
    {
        using GatewayMetrics metrics = new();
        FakeSessionManager sessions = new();
        StubWatchListResolver resolver = new(
        [
            new AlarmSubtagTarget
            {
                AlarmFullReference = "Galaxy!Area.Tank01.Hi",
                ActiveSubtag = "Tank01.Hi.active",
            },
        ]);
        AlarmsOptions options = new()
        {
            Enabled = true,
            SubscriptionExpression = SubscriptionExpression,
            Fallback = new AlarmFallbackOptions
            {
                Mode = "ForceSubtag",
                ConsecutiveFailureThreshold = 7,
                FailbackProbeIntervalSeconds = 11,
                FailbackStableProbes = 4,
            },
        };

        using GatewayAlarmMonitor monitor = CreateMonitor(sessions, metrics, options, resolver);
        using CancellationTokenSource cts = new();
        await monitor.StartAsync(cts.Token);
        await sessions.WaitForSubscribeAsync(WaitTimeout);

        SubscribeAlarmsCommand sent = Assert.IsType<SubscribeAlarmsCommand>(sessions.LastSubscribeCommand);
        Assert.Equal(AlarmProviderMode.Subtag, sent.ForcedMode);
        Assert.Equal(7, sent.Failover.ConsecutiveFailureThreshold);
        Assert.Equal(11, sent.Failover.FailbackProbeIntervalSeconds);
        Assert.Equal(4, sent.Failover.FailbackStableProbes);
        AlarmSubtagTarget target = Assert.Single(sent.WatchList);
        Assert.Equal("Galaxy!Area.Tank01.Hi", target.AlarmFullReference);

        await cts.CancelAsync();
        await monitor.StopAsync(CancellationToken.None);
    }

    /// <summary>
    /// Checks how the configured fallback mode string is translated into the forced
    /// provider mode of the SubscribeAlarms command.
    /// </summary>
    /// <param name="mode">Value assigned to <c>AlarmFallbackOptions.Mode</c>.</param>
    /// <param name="expected">Forced mode expected on the command the monitor sends.</param>
    [Theory]
    [InlineData("ForceAlarmManager", AlarmProviderMode.Alarmmgr)]
    [InlineData("forcealarmmanager", AlarmProviderMode.Alarmmgr)]
    [InlineData("ForceSubtag", AlarmProviderMode.Subtag)]
    [InlineData("forcesubtag", AlarmProviderMode.Subtag)]
    [InlineData("Auto", AlarmProviderMode.Unspecified)]
    [InlineData("", AlarmProviderMode.Unspecified)]
    [InlineData("nonsense", AlarmProviderMode.Unspecified)]
    public async Task ModeString_MapsToForcedProviderMode(string mode, AlarmProviderMode expected)
    {
        using GatewayMetrics metrics = new();
        FakeSessionManager sessions = new();
        AlarmsOptions options = new()
        {
            Enabled = true,
            SubscriptionExpression = SubscriptionExpression,
            Fallback = new AlarmFallbackOptions { Mode = mode },
        };

        using GatewayAlarmMonitor monitor = CreateMonitor(sessions, metrics, options);
        using CancellationTokenSource cts = new();
        await monitor.StartAsync(cts.Token);
        await sessions.WaitForSubscribeAsync(WaitTimeout);

        Assert.Equal(expected, sessions.LastSubscribeCommand!.ForcedMode);

        // Auto + empty watch-list preserves historical alarmmgr-only behaviour.
        if (expected == AlarmProviderMode.Unspecified)
        {
            Assert.Empty(sessions.LastSubscribeCommand!.WatchList);
        }

        await cts.CancelAsync();
        await monitor.StopAsync(CancellationToken.None);
    }

    /// <summary>
    /// Builds a monitor wired to the fake session manager and a null logger.
    /// </summary>
    /// <param name="sessions">Fake session manager the monitor talks to.</param>
    /// <param name="metrics">Metrics instance handed to the monitor.</param>
    /// <param name="options">
    /// Alarm options to use; when omitted, alarms are enabled with the shared
    /// subscription expression and no other setting is assigned.
    /// </param>
    /// <param name="resolver">
    /// Watch-list resolver to use; when omitted, a stub returning an empty watch-list.
    /// </param>
    /// <returns>A monitor that has not been started yet.</returns>
    private static GatewayAlarmMonitor CreateMonitor(
        FakeSessionManager sessions,
        GatewayMetrics metrics,
        AlarmsOptions? options = null,
        IAlarmWatchListResolver? resolver = null)
    {
        options ??= new AlarmsOptions
        {
            Enabled = true,
            SubscriptionExpression = SubscriptionExpression,
        };

        return new GatewayAlarmMonitor(
            sessions,
            resolver ?? new StubWatchListResolver([]),
            metrics,
            Microsoft.Extensions.Options.Options.Create(new GatewayOptions { Alarms = options }),
            NullLogger<GatewayAlarmMonitor>.Instance);
    }

    /// <summary>
    /// Polls <paramref name="received"/> until one of its messages satisfies
    /// <paramref name="predicate"/>.
    /// </summary>
    /// <param name="received">List filled by the reader task; it is locked for each scan.</param>
    /// <param name="predicate">Condition the wanted message must satisfy.</param>
    /// <param name="timeout">Maximum time to keep polling.</param>
    /// <returns>The first message in the list that satisfies the predicate.</returns>
    /// <exception cref="TimeoutException">No message matched before the timeout elapsed.</exception>
    private static async Task<AlarmFeedMessage> WaitForAsync(
        List<AlarmFeedMessage> received,
        Func<AlarmFeedMessage, bool> predicate,
        TimeSpan timeout)
    {
        // A tick-count deadline is not affected by wall-clock adjustments during the wait.
        long deadline = Environment.TickCount64 + (long)timeout.TotalMilliseconds;
        while (Environment.TickCount64 < deadline)
        {
            lock (received)
            {
                AlarmFeedMessage? match = received.FirstOrDefault(predicate);
                if (match is not null)
                {
                    return match;
                }
            }

            await Task.Delay(PollInterval);
        }

        throw new TimeoutException("No matching AlarmFeedMessage was received in time.");
    }

    /// <summary>
    /// Polls <paramref name="condition"/> until it returns <see langword="true"/>.
    /// </summary>
    /// <param name="condition">Condition evaluated on every poll.</param>
    /// <param name="timeout">Maximum time to keep polling.</param>
    /// <exception cref="TimeoutException">The condition stayed false until the timeout elapsed.</exception>
    private static async Task WaitUntilAsync(Func<bool> condition, TimeSpan timeout)
    {
        // A tick-count deadline is not affected by wall-clock adjustments during the wait.
        long deadline = Environment.TickCount64 + (long)timeout.TotalMilliseconds;
        while (Environment.TickCount64 < deadline)
        {
            if (condition())
            {
                return;
            }

            await Task.Delay(PollInterval);
        }

        throw new TimeoutException("Condition was not met in time.");
    }

    /// <summary><see cref="IAlarmWatchListResolver"/> that returns a fixed watch-list.</summary>
    private sealed class StubWatchListResolver(IReadOnlyList<AlarmSubtagTarget> targets) : IAlarmWatchListResolver
    {
        /// <inheritdoc />
        public Task<IReadOnlyList<AlarmSubtagTarget>> ResolveAsync(
            AlarmsOptions options,
            CancellationToken cancellationToken = default) => Task.FromResult(targets);
    }

    /// <summary>
    /// Minimal <see cref="ISessionManager"/> for driving the monitor: opens a
    /// constructed session, records the <c>SubscribeAlarms</c> command, replies OK to
    /// every command, and exposes a channel for pushing worker events.
    /// </summary>
    private sealed class FakeSessionManager : ISessionManager
    {
        private readonly Channel<WorkerEvent> _events = Channel.CreateUnbounded<WorkerEvent>();
        private readonly TaskCompletionSource _subscribed = new(TaskCreationOptions.RunContinuationsAsynchronously);

        /// <summary>The most recent SubscribeAlarms command the monitor sent.</summary>
        public SubscribeAlarmsCommand? LastSubscribeCommand { get; private set; }

        /// <summary>Pushes a worker event onto the monitor's event stream.</summary>
        /// <param name="mxEvent">Event to wrap in a <see cref="WorkerEvent"/> and enqueue.</param>
        /// <exception cref="InvalidOperationException">
        /// The event channel rejected the write, which happens once
        /// <see cref="CloseSessionAsync"/> has completed it.
        /// </exception>
        public void EmitEvent(MxEvent mxEvent)
        {
            // Fail loudly rather than drop the event: a silently lost event would only
            // show up later as an unexplained timeout in the calling test.
            if (!_events.Writer.TryWrite(new WorkerEvent { Event = mxEvent }))
            {
                throw new InvalidOperationException(
                    "The worker event channel is completed; the event was not delivered.");
            }
        }

        /// <summary>Completes once the monitor has issued its SubscribeAlarms command.</summary>
        /// <param name="timeout">Maximum time to wait for the command.</param>
        public Task WaitForSubscribeAsync(TimeSpan timeout) => _subscribed.Task.WaitAsync(timeout);

        /// <inheritdoc />
        public Task<GatewaySession> OpenSessionAsync(
            SessionOpenRequest request,
            string? clientIdentity,
            CancellationToken cancellationToken)
        {
            GatewaySession session = new(
                Guid.NewGuid().ToString("N"),
                "Galaxy",
                "pipe-test",
                "nonce-test",
                clientIdentity,
                null,
                null,
                TimeSpan.FromSeconds(30),
                TimeSpan.FromSeconds(30),
                TimeSpan.FromSeconds(30),
                DateTimeOffset.UtcNow);
            return Task.FromResult(session);
        }

        /// <inheritdoc />
        public Task<WorkerCommandReply> InvokeAsync(
            string sessionId,
            WorkerCommand command,
            CancellationToken cancellationToken)
        {
            MxCommandKind? kind = command.Command?.Kind;

            if (kind == MxCommandKind.SubscribeAlarms)
            {
                // Record the command before releasing waiters so they observe it.
                LastSubscribeCommand = command.Command!.SubscribeAlarms;
                _subscribed.TrySetResult();
            }

            MxCommandReply reply = new()
            {
                ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
            };

            if (kind == MxCommandKind.QueryActiveAlarms)
            {
                // Active-alarm queries get an empty payload in addition to the OK status.
                reply.QueryActiveAlarms = new QueryActiveAlarmsReplyPayload();
            }

            return Task.FromResult(new WorkerCommandReply { Reply = reply });
        }

        /// <inheritdoc />
        public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
            string sessionId,
            [EnumeratorCancellation] CancellationToken cancellationToken)
        {
            await foreach (WorkerEvent workerEvent in _events.Reader.ReadAllAsync(cancellationToken))
            {
                yield return workerEvent;
            }
        }

        /// <inheritdoc />
        public bool TryGetSession(string sessionId, [MaybeNullWhen(false)] out GatewaySession session)
        {
            // This fake keeps no session registry, so every lookup misses.
            session = null;
            return false;
        }

        /// <inheritdoc />
        public Task<SessionCloseResult> CloseSessionAsync(string sessionId, CancellationToken cancellationToken)
        {
            // Completing the writer ends the ReadEventsAsync enumeration.
            _events.Writer.TryComplete();
            return Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false));
        }

        /// <inheritdoc />
        public Task<SessionCloseResult> KillWorkerAsync(string sessionId, string reason, CancellationToken cancellationToken) =>
            Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false));

        /// <inheritdoc />
        public Task<int> CloseExpiredLeasesAsync(DateTimeOffset now, CancellationToken cancellationToken) =>
            Task.FromResult(0);

        /// <inheritdoc />
        public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask;
    }
}