using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Google.Protobuf.WellKnownTypes;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Alarms;
using ZB.MOM.WW.MxGateway.Server.Configuration;
using ZB.MOM.WW.MxGateway.Server.Metrics;
using ZB.MOM.WW.MxGateway.Server.Sessions;

namespace ZB.MOM.WW.MxGateway.Tests.Alarms;

/// <summary>
/// Drives a single <see cref="GatewayAlarmMonitor"/> instance through the full
/// alarm-provider failover/failback lifecycle and asserts the live feed reflects
/// each stage. This complements the per-aspect tests in
/// <c>GatewayAlarmMonitorProviderModeTests</c> with one cohesive scenario:
/// subscribe (watch-list + forced mode) -> baseline alarmmgr status -> alarmmgr
/// transition -> failover to subtag (degraded) -> subtag transition -> failback
/// to alarmmgr (recovered).
/// </summary>
/// <remarks>
/// The minimal session-manager / watch-list-resolver harness here is replicated
/// (not shared) from the sibling <c>GatewayAlarmMonitorProviderModeTests</c>. The
/// sibling's harness is a private nested type, and the task forbids changing that
/// test's behaviour; replicating the few members this scenario needs keeps the
/// sibling completely untouched and this file self-contained, at the cost of a
/// small amount of duplication.
/// </remarks>
public sealed class AlarmFailoverEndToEndTests
{
    /// <summary>Upper bound applied to every asynchronous wait in these tests.</summary>
    private static readonly TimeSpan WaitTimeout = TimeSpan.FromSeconds(30);

    /// <summary>Delay between successive scans of the received-message list in <see cref="WaitForAsync"/>.</summary>
    private static readonly TimeSpan PollInterval = TimeSpan.FromMilliseconds(25);

    /// <summary>
    /// Walks one monitor through subscribe, baseline status, an alarmmgr transition,
    /// failover to subtag, a subtag transition and failback, asserting the feed
    /// message produced at each step.
    /// </summary>
    [Fact]
    public async Task ProviderFailoverAndFailback_FullLifecycle_ReflectedInFeed()
    {
        using GatewayMetrics metrics = new();
        FakeSessionManager sessions = new();

        // Watch-list-bearing, Auto config: a non-empty resolved watch-list with the
        // default ("Auto" -> Unspecified) forced mode, so step 1 can assert both the
        // resolved watch-list and the forced mode/failover the SubscribeAlarms carries.
        StubWatchListResolver resolver = new(
        [
            new AlarmSubtagTarget { AlarmFullReference = "Galaxy!Area.Tank01.Hi", ActiveSubtag = "Tank01.Hi.active" },
            new AlarmSubtagTarget { AlarmFullReference = "Galaxy!Area.Tank02.Lo", ActiveSubtag = "Tank02.Lo.active" },
        ]);
        AlarmsOptions options = new()
        {
            Enabled = true,
            SubscriptionExpression = @"\\NODE\Galaxy!Area",
            Fallback = new AlarmFallbackOptions
            {
                Mode = "Auto",
                ConsecutiveFailureThreshold = 3,
                FailbackProbeIntervalSeconds = 9,
                FailbackStableProbes = 2,
            },
        };
        using GatewayAlarmMonitor monitor = new(
            sessions,
            resolver,
            metrics,
            Microsoft.Extensions.Options.Options.Create(new GatewayOptions { Alarms = options }),
            NullLogger<GatewayAlarmMonitor>.Instance);

        using CancellationTokenSource cts = new();
        await monitor.StartAsync(cts.Token);
        await sessions.WaitForSubscribeAsync(WaitTimeout);

        // --- Step 1: SubscribeAlarms carries the resolved watch-list + forced mode/failover. ---
        SubscribeAlarmsCommand sent = Assert.IsType<SubscribeAlarmsCommand>(sessions.LastSubscribeCommand);
        Assert.Equal(AlarmProviderMode.Unspecified, sent.ForcedMode); // "Auto"
        Assert.Equal(3, sent.Failover.ConsecutiveFailureThreshold);
        Assert.Equal(9, sent.Failover.FailbackProbeIntervalSeconds);
        Assert.Equal(2, sent.Failover.FailbackStableProbes);
        Assert.NotEmpty(sent.WatchList);
        Assert.Equal(2, sent.WatchList.Count);
        Assert.Contains(sent.WatchList, t => t.AlarmFullReference == "Galaxy!Area.Tank01.Hi");

        // Live feed reader collecting every message in order.
        List<AlarmFeedMessage> received = [];
        TaskCompletionSource baselineReceived = new(TaskCreationOptions.RunContinuationsAsynchronously);
        using CancellationTokenSource streamCts = new();
        Task reader = Task.Run(async () =>
        {
            try
            {
                await foreach (AlarmFeedMessage message in monitor.StreamAsync(null, streamCts.Token))
                {
                    lock (received)
                    {
                        received.Add(message);
                        if (received.Count == 1)
                        {
                            // The first message is the baseline ProviderStatus.
                            baselineReceived.TrySetResult();
                        }
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when the test cancels the stream.
            }
        });

        // --- Step 2: first message is ProviderStatus{Alarmmgr, Degraded=false}. ---
        await baselineReceived.Task.WaitAsync(WaitTimeout);
        AlarmFeedMessage baseline;
        lock (received)
        {
            baseline = received[0];
        }

        Assert.Equal(AlarmFeedMessage.PayloadOneofCase.ProviderStatus, baseline.PayloadCase);
        Assert.Equal(AlarmProviderMode.Alarmmgr, baseline.ProviderStatus.Mode);
        Assert.False(baseline.ProviderStatus.Degraded);

        // --- Step 3: alarmmgr-style transition (Degraded=false, SourceProvider=Alarmmgr, Raise). ---
        sessions.EmitEvent(new MxEvent
        {
            OnAlarmTransition = new OnAlarmTransitionEvent
            {
                AlarmFullReference = "Galaxy!Area.Tank01.Hi",
                SourceObjectReference = "Tank01",
                AlarmTypeName = "AnalogLimitAlarm.Hi",
                TransitionKind = AlarmTransitionKind.Raise,
                Severity = 500,
                Degraded = false,
                SourceProvider = AlarmProviderMode.Alarmmgr,
                TransitionTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
            },
        });

        AlarmFeedMessage alarmmgrTransition = await WaitForAsync(
            received,
            m => m.PayloadCase == AlarmFeedMessage.PayloadOneofCase.Transition
                && m.Transition.AlarmFullReference == "Galaxy!Area.Tank01.Hi"
                && m.Transition.SourceProvider == AlarmProviderMode.Alarmmgr,
            WaitTimeout);
        Assert.Equal(AlarmTransitionKind.Raise, alarmmgrTransition.Transition.TransitionKind);
        Assert.False(alarmmgrTransition.Transition.Degraded);
        Assert.Equal(AlarmProviderMode.Alarmmgr, alarmmgrTransition.Transition.SourceProvider);

        // --- Step 4: failover to subtag -> ProviderStatus{Subtag, Degraded=true}. ---
        sessions.EmitEvent(new MxEvent
        {
            OnAlarmProviderModeChanged = new OnAlarmProviderModeChangedEvent
            {
                Mode = AlarmProviderMode.Subtag,
                Reason = "alarmmgr failed",
                Hresult = unchecked((int)0x80004005),
                At = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
            },
        });

        AlarmFeedMessage degraded = await WaitForAsync(
            received,
            m => m.PayloadCase == AlarmFeedMessage.PayloadOneofCase.ProviderStatus
                && m.ProviderStatus.Mode == AlarmProviderMode.Subtag,
            WaitTimeout);
        Assert.Equal(AlarmProviderMode.Subtag, degraded.ProviderStatus.Mode);
        Assert.True(degraded.ProviderStatus.Degraded);
        Assert.Equal("alarmmgr failed", degraded.ProviderStatus.Reason);

        // --- Step 5: subtag-style transition (Degraded=true, SourceProvider=Subtag, Raise on a different ref). ---
        sessions.EmitEvent(new MxEvent
        {
            OnAlarmTransition = new OnAlarmTransitionEvent
            {
                AlarmFullReference = "Galaxy!Area.Tank02.Lo",
                SourceObjectReference = "Tank02",
                AlarmTypeName = "AnalogLimitAlarm.Lo",
                TransitionKind = AlarmTransitionKind.Raise,
                Severity = 250,
                Degraded = true,
                SourceProvider = AlarmProviderMode.Subtag,
                TransitionTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
            },
        });

        AlarmFeedMessage subtagTransition = await WaitForAsync(
            received,
            m => m.PayloadCase == AlarmFeedMessage.PayloadOneofCase.Transition
                && m.Transition.AlarmFullReference == "Galaxy!Area.Tank02.Lo"
                && m.Transition.SourceProvider == AlarmProviderMode.Subtag,
            WaitTimeout);
        Assert.Equal(AlarmTransitionKind.Raise, subtagTransition.Transition.TransitionKind);
        Assert.True(subtagTransition.Transition.Degraded);
        Assert.Equal(AlarmProviderMode.Subtag, subtagTransition.Transition.SourceProvider);

        // --- Step 6: failback to alarmmgr -> ProviderStatus{Alarmmgr, Degraded=false} (recovery). ---
        sessions.EmitEvent(new MxEvent
        {
            OnAlarmProviderModeChanged = new OnAlarmProviderModeChangedEvent
            {
                Mode = AlarmProviderMode.Alarmmgr,
                Reason = "recovered",
                Hresult = 0,
                At = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
            },
        });

        // Match the recovery status specifically: an Alarmmgr ProviderStatus that
        // carries the "recovered" reason, distinguishing it from the baseline at [0].
        AlarmFeedMessage recovered = await WaitForAsync(
            received,
            m => m.PayloadCase == AlarmFeedMessage.PayloadOneofCase.ProviderStatus
                && m.ProviderStatus.Mode == AlarmProviderMode.Alarmmgr
                && m.ProviderStatus.Reason == "recovered",
            WaitTimeout);
        Assert.Equal(AlarmProviderMode.Alarmmgr, recovered.ProviderStatus.Mode);
        Assert.False(recovered.ProviderStatus.Degraded);
        Assert.Equal("recovered", recovered.ProviderStatus.Reason);

        await streamCts.CancelAsync();
        await reader;
        await cts.CancelAsync();
        await monitor.StopAsync(CancellationToken.None);
    }

    /// <summary>
    /// Emits a degraded (subtag) transition while one subscriber is connected, then
    /// opens a second stream and asserts the active-alarm entry delivered before
    /// SnapshotComplete still carries <c>Degraded</c> and <c>SourceProvider</c>.
    /// </summary>
    [Fact]
    public async Task DegradedTransition_CachedThenReplayed_CarriesDegradedAndSourceProviderToNewSubscriber()
    {
        using GatewayMetrics metrics = new();
        FakeSessionManager sessions = new();
        StubWatchListResolver resolver = new([]);
        AlarmsOptions options = new()
        {
            Enabled = true,
            SubscriptionExpression = @"\\NODE\Galaxy!Area",
            Fallback = new AlarmFallbackOptions
            {
                Mode = "Auto",
                ConsecutiveFailureThreshold = 3,
                FailbackProbeIntervalSeconds = 9,
                FailbackStableProbes = 2,
            },
        };
        using GatewayAlarmMonitor monitor = new(
            sessions,
            resolver,
            metrics,
            Microsoft.Extensions.Options.Options.Create(new GatewayOptions { Alarms = options }),
            NullLogger<GatewayAlarmMonitor>.Instance);

        using CancellationTokenSource cts = new();
        await monitor.StartAsync(cts.Token);
        await sessions.WaitForSubscribeAsync(WaitTimeout);

        // First subscriber: drive the feed past the baseline ProviderStatus so we
        // know the monitor's event loop is live before we emit the transition.
        List<AlarmFeedMessage> firstReader = [];
        TaskCompletionSource baselineReceived = new(TaskCreationOptions.RunContinuationsAsynchronously);
        using CancellationTokenSource firstStreamCts = new();
        Task reader = Task.Run(async () =>
        {
            try
            {
                await foreach (AlarmFeedMessage message in monitor.StreamAsync(null, firstStreamCts.Token))
                {
                    lock (firstReader)
                    {
                        firstReader.Add(message);
                        if (message.PayloadCase == AlarmFeedMessage.PayloadOneofCase.ProviderStatus)
                        {
                            baselineReceived.TrySetResult();
                        }
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when the test cancels the stream.
            }
        });

        await baselineReceived.Task.WaitAsync(WaitTimeout);

        // Apply a degraded (subtag) transition. This lands in the monitor's cache
        // via SnapshotFromTransition, which must preserve Degraded/SourceProvider.
        sessions.EmitEvent(new MxEvent
        {
            OnAlarmTransition = new OnAlarmTransitionEvent
            {
                AlarmFullReference = "Galaxy!Area.Tank02.Lo",
                SourceObjectReference = "Tank02",
                AlarmTypeName = "AnalogLimitAlarm.Lo",
                TransitionKind = AlarmTransitionKind.Raise,
                Severity = 250,
                Degraded = true,
                SourceProvider = AlarmProviderMode.Subtag,
                TransitionTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
            },
        });

        // Wait for the live transition to be observed by the first subscriber so we
        // know the cache has been updated before opening the new stream.
        await WaitForAsync(
            firstReader,
            m => m.PayloadCase == AlarmFeedMessage.PayloadOneofCase.Transition
                && m.Transition.AlarmFullReference == "Galaxy!Area.Tank02.Lo",
            WaitTimeout);

        // New subscriber: its initial cache snapshot must carry the degraded flags.
        // Bound the drain so a regression that never emits SnapshotComplete fails
        // with a clean TimeoutException (via cancellation) instead of hanging.
        using CancellationTokenSource newStreamCts = new();
        using CancellationTokenSource drainTimeoutCts = new();
        drainTimeoutCts.CancelAfter(WaitTimeout);
        using CancellationTokenSource linkedDrainCts =
            CancellationTokenSource.CreateLinkedTokenSource(newStreamCts.Token, drainTimeoutCts.Token);

        ActiveAlarmSnapshot? initialActiveAlarm = null;
        try
        {
            await foreach (AlarmFeedMessage message in monitor.StreamAsync(null, linkedDrainCts.Token))
            {
                if (message.PayloadCase == AlarmFeedMessage.PayloadOneofCase.ActiveAlarm
                    && message.ActiveAlarm.AlarmFullReference == "Galaxy!Area.Tank02.Lo")
                {
                    initialActiveAlarm = message.ActiveAlarm;
                }

                if (message.PayloadCase == AlarmFeedMessage.PayloadOneofCase.SnapshotComplete)
                {
                    break;
                }
            }
        }
        catch (OperationCanceledException) when (drainTimeoutCts.IsCancellationRequested)
        {
            throw new TimeoutException(
                "The new subscriber did not receive a SnapshotComplete message within the timeout.");
        }

        Assert.NotNull(initialActiveAlarm);
        Assert.True(initialActiveAlarm!.Degraded);
        Assert.Equal(AlarmProviderMode.Subtag, initialActiveAlarm.SourceProvider);

        await newStreamCts.CancelAsync();
        await firstStreamCts.CancelAsync();
        await reader;
        await cts.CancelAsync();
        await monitor.StopAsync(CancellationToken.None);
    }

    /// <summary>
    /// Polls <paramref name="received"/> until a message satisfying
    /// <paramref name="predicate"/> is present and returns the first such message.
    /// </summary>
    /// <param name="received">
    /// Message list filled by a reader task; it is scanned under <c>lock (received)</c>,
    /// the same monitor the readers in this file hold while adding to it.
    /// </param>
    /// <param name="predicate">Condition the awaited message must satisfy.</param>
    /// <param name="timeout">Maximum time to keep polling.</param>
    /// <returns>The first message in list order that matches the predicate.</returns>
    /// <exception cref="TimeoutException">No matching message appeared within <paramref name="timeout"/>.</exception>
    private static async Task<AlarmFeedMessage> WaitForAsync(
        List<AlarmFeedMessage> received,
        Func<AlarmFeedMessage, bool> predicate,
        TimeSpan timeout)
    {
        // Stopwatch is monotonic, so a wall-clock adjustment during the test cannot
        // shorten or stretch the deadline.
        Stopwatch elapsed = Stopwatch.StartNew();
        while (true)
        {
            lock (received)
            {
                AlarmFeedMessage? match = received.FirstOrDefault(predicate);
                if (match is not null)
                {
                    return match;
                }
            }

            // The deadline is tested after the scan, so the list is always checked at
            // least once and once more after the final delay has elapsed.
            if (elapsed.Elapsed >= timeout)
            {
                break;
            }

            await Task.Delay(PollInterval);
        }

        throw new TimeoutException("No matching AlarmFeedMessage was received in time.");
    }

    /// <summary><see cref="IAlarmWatchListResolver"/> that returns a fixed watch-list.</summary>
    private sealed class StubWatchListResolver(IReadOnlyList<AlarmSubtagTarget> targets) : IAlarmWatchListResolver
    {
        /// <inheritdoc />
        public Task<IReadOnlyList<AlarmSubtagTarget>> ResolveAsync(
            AlarmsOptions options,
            CancellationToken cancellationToken = default)
            => Task.FromResult(targets);
    }

    /// <summary>
    /// Minimal <see cref="ISessionManager"/> for driving the monitor: opens a
    /// constructed session, records the SubscribeAlarms command, replies OK to
    /// every command, and exposes a channel for pushing worker events.
    /// </summary>
    private sealed class FakeSessionManager : ISessionManager
    {
        private readonly Channel<WorkerEvent> _events = Channel.CreateUnbounded<WorkerEvent>();
        private readonly TaskCompletionSource _subscribed = new(TaskCreationOptions.RunContinuationsAsynchronously);

        /// <summary>The most recent SubscribeAlarms command the monitor sent.</summary>
        public SubscribeAlarmsCommand? LastSubscribeCommand { get; private set; }

        /// <summary>Pushes a worker event onto the monitor's event stream.</summary>
        /// <remarks>
        /// The <c>TryWrite</c> result is ignored, so an event emitted after
        /// <see cref="CloseSessionAsync"/> has completed the writer is dropped.
        /// </remarks>
        public void EmitEvent(MxEvent mxEvent)
            => _events.Writer.TryWrite(new WorkerEvent { Event = mxEvent });

        /// <summary>Completes once the monitor has issued its SubscribeAlarms command.</summary>
        public Task WaitForSubscribeAsync(TimeSpan timeout)
            => _subscribed.Task.WaitAsync(timeout);

        /// <inheritdoc />
        public Task<GatewaySession> OpenSessionAsync(
            SessionOpenRequest request,
            string? clientIdentity,
            CancellationToken cancellationToken)
        {
            GatewaySession session = new(
                Guid.NewGuid().ToString("N"),
                "Galaxy",
                "pipe-test",
                "nonce-test",
                clientIdentity,
                null,
                null,
                TimeSpan.FromSeconds(30),
                TimeSpan.FromSeconds(30),
                TimeSpan.FromSeconds(30),
                DateTimeOffset.UtcNow);
            return Task.FromResult(session);
        }

        /// <inheritdoc />
        public Task<WorkerCommandReply> InvokeAsync(
            string sessionId,
            WorkerCommand command,
            CancellationToken cancellationToken)
        {
            if (command.Command?.Kind == MxCommandKind.SubscribeAlarms)
            {
                // Record the command before signalling so a test awaiting
                // WaitForSubscribeAsync observes it.
                LastSubscribeCommand = command.Command.SubscribeAlarms;
                _subscribed.TrySetResult();
            }

            MxCommandReply reply = new()
            {
                ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
            };
            if (command.Command?.Kind == MxCommandKind.QueryActiveAlarms)
            {
                reply.QueryActiveAlarms = new QueryActiveAlarmsReplyPayload();
            }

            return Task.FromResult(new WorkerCommandReply { Reply = reply });
        }

        /// <inheritdoc />
        public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
            string sessionId,
            [EnumeratorCancellation] CancellationToken cancellationToken)
        {
            await foreach (WorkerEvent workerEvent in _events.Reader.ReadAllAsync(cancellationToken))
            {
                yield return workerEvent;
            }
        }

        /// <inheritdoc />
        public bool TryGetSession(string sessionId, [MaybeNullWhen(false)] out GatewaySession session)
        {
            session = null;
            return false;
        }

        /// <inheritdoc />
        public Task<SessionCloseResult> CloseSessionAsync(string sessionId, CancellationToken cancellationToken)
        {
            // Completing the writer ends the ReadEventsAsync enumeration.
            _events.Writer.TryComplete();
            return Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false));
        }

        /// <inheritdoc />
        public Task<SessionCloseResult> KillWorkerAsync(string sessionId, string reason, CancellationToken cancellationToken)
            => Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false));

        /// <inheritdoc />
        public Task<int> CloseExpiredLeasesAsync(DateTimeOffset now, CancellationToken cancellationToken)
            => Task.FromResult(0);

        /// <inheritdoc />
        public Task ShutdownAsync(CancellationToken cancellationToken)
            => Task.CompletedTask;
    }
}