From ec88532fe4a087978c645dc3ffe1ae34eae1be53 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 13 Jun 2026 10:53:55 -0400 Subject: [PATCH] alarms: propagate degraded/source_provider through snapshot + gateway cache paths (integration fix I1/I2) --- .../Alarms/GatewayAlarmMonitor.cs | 4 + .../Alarms/AlarmFailoverEndToEndTests.cs | 112 ++++++++++++++++++ .../MxAccess/AlarmDispatcherTests.cs | 54 +++++++++ .../MxAccess/AlarmDispatcher.cs | 2 + 4 files changed, 172 insertions(+) diff --git a/src/ZB.MOM.WW.MxGateway.Server/Alarms/GatewayAlarmMonitor.cs b/src/ZB.MOM.WW.MxGateway.Server/Alarms/GatewayAlarmMonitor.cs index d48e330..fdbfc89 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Alarms/GatewayAlarmMonitor.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Alarms/GatewayAlarmMonitor.cs @@ -804,6 +804,8 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic Description = transition.Description, OperatorUser = transition.OperatorUser, OperatorComment = transition.OperatorComment, + Degraded = transition.Degraded, + SourceProvider = transition.SourceProvider, }; if (transition.OriginalRaiseTimestamp is not null) { @@ -840,6 +842,8 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic Description = snapshot.Description, OperatorUser = snapshot.OperatorUser, OperatorComment = snapshot.OperatorComment, + Degraded = snapshot.Degraded, + SourceProvider = snapshot.SourceProvider, }; if (snapshot.OriginalRaiseTimestamp is not null) { diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Alarms/AlarmFailoverEndToEndTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Alarms/AlarmFailoverEndToEndTests.cs index 8a1eaca..a6f8c41 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Alarms/AlarmFailoverEndToEndTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Alarms/AlarmFailoverEndToEndTests.cs @@ -227,6 +227,118 @@ public sealed class AlarmFailoverEndToEndTests await monitor.StopAsync(CancellationToken.None); } + [Fact] + public async Task DegradedTransition_CachedThenReplayed_CarriesDegradedAndSourceProviderToNewSubscriber() + { + using GatewayMetrics metrics = new(); + FakeSessionManager sessions = new(); + StubWatchListResolver resolver = new([]); + + AlarmsOptions options = new() + { + Enabled = true, + SubscriptionExpression = @"\\NODE\Galaxy!Area", + Fallback = new AlarmFallbackOptions + { + Mode = "Auto", + ConsecutiveFailureThreshold = 3, + FailbackProbeIntervalSeconds = 9, + FailbackStableProbes = 2, + }, + }; + + using GatewayAlarmMonitor monitor = new( + sessions, + resolver, + metrics, + Microsoft.Extensions.Options.Options.Create(new GatewayOptions { Alarms = options }), + NullLogger.Instance); + + using CancellationTokenSource cts = new(); + await monitor.StartAsync(cts.Token); + await sessions.WaitForSubscribeAsync(WaitTimeout); + + // First subscriber: drive the feed past the baseline ProviderStatus so we + // know the monitor's event loop is live before we emit the transition. + List firstReader = []; + TaskCompletionSource baselineReceived = new(TaskCreationOptions.RunContinuationsAsynchronously); + using CancellationTokenSource firstStreamCts = new(); + Task reader = Task.Run(async () => + { + try + { + await foreach (AlarmFeedMessage message in monitor.StreamAsync(null, firstStreamCts.Token)) + { + lock (firstReader) + { + firstReader.Add(message); + if (message.PayloadCase == AlarmFeedMessage.PayloadOneofCase.ProviderStatus) + { + baselineReceived.TrySetResult(); + } + } + } + } + catch (OperationCanceledException) + { + // Expected when the test cancels the stream. + } + }); + await baselineReceived.Task.WaitAsync(WaitTimeout); + + // Apply a degraded (subtag) transition. This lands in the monitor's cache + // via SnapshotFromTransition, which must preserve Degraded/SourceProvider. + sessions.EmitEvent(new MxEvent + { + OnAlarmTransition = new OnAlarmTransitionEvent + { + AlarmFullReference = "Galaxy!Area.Tank02.Lo", + SourceObjectReference = "Tank02", + AlarmTypeName = "AnalogLimitAlarm.Lo", + TransitionKind = AlarmTransitionKind.Raise, + Severity = 250, + Degraded = true, + SourceProvider = AlarmProviderMode.Subtag, + TransitionTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), + }, + }); + + // Wait for the live transition to be observed by the first subscriber so we + // know the cache has been updated before opening the new stream. + await WaitForAsync( + firstReader, + m => m.PayloadCase == AlarmFeedMessage.PayloadOneofCase.Transition + && m.Transition.AlarmFullReference == "Galaxy!Area.Tank02.Lo", + WaitTimeout); + + // New subscriber: its initial cache snapshot must carry the degraded flags. + using CancellationTokenSource newStreamCts = new(); + ActiveAlarmSnapshot? initialActiveAlarm = null; + await foreach (AlarmFeedMessage message in monitor.StreamAsync(null, newStreamCts.Token)) + { + if (message.PayloadCase == AlarmFeedMessage.PayloadOneofCase.ActiveAlarm + && message.ActiveAlarm.AlarmFullReference == "Galaxy!Area.Tank02.Lo") + { + initialActiveAlarm = message.ActiveAlarm; + } + + if (message.PayloadCase == AlarmFeedMessage.PayloadOneofCase.SnapshotComplete) + { + break; + } + } + + Assert.NotNull(initialActiveAlarm); + Assert.True(initialActiveAlarm!.Degraded); + Assert.Equal(AlarmProviderMode.Subtag, initialActiveAlarm.SourceProvider); + + await newStreamCts.CancelAsync(); + await firstStreamCts.CancelAsync(); + await reader; + await cts.CancelAsync(); + await monitor.StopAsync(CancellationToken.None); + } + private static async Task WaitForAsync( List received, Func predicate, diff --git a/src/ZB.MOM.WW.MxGateway.Worker.Tests/MxAccess/AlarmDispatcherTests.cs b/src/ZB.MOM.WW.MxGateway.Worker.Tests/MxAccess/AlarmDispatcherTests.cs index 3a07064..c30ac6f 100644 --- a/src/ZB.MOM.WW.MxGateway.Worker.Tests/MxAccess/AlarmDispatcherTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Worker.Tests/MxAccess/AlarmDispatcherTests.cs @@ -279,6 +279,60 @@ public sealed class AlarmDispatcherTests Assert.Equal(AlarmConditionState.ActiveAcked, snapshots[1].CurrentState); } + /// + /// Verifies that the per-record subtag-fallback flag flows through the + /// snapshot path: a degraded record maps to an + /// with + /// set and , while a non-degraded + /// record stays on the alarmmgr parity contract. + /// + [Fact] + public void SnapshotActiveAlarms_PropagatesDegradedAndSourceProvider() + { + FakeAlarmConsumer consumer = new FakeAlarmConsumer(); + DateTime ts = new DateTime(2026, 5, 1, 17, 26, 14, 709, DateTimeKind.Utc); + consumer.SnapshotResult = new[] + { + new MxAlarmSnapshotRecord + { + AlarmGuid = Guid.NewGuid(), + ProviderName = "Galaxy", + Group = "TestArea", + TagName = "Tag1", + Type = "DSC", + Priority = 500, + State = MxAlarmStateKind.UnackAlm, + TransitionTimestampUtc = ts, + Degraded = true, + }, + new MxAlarmSnapshotRecord + { + AlarmGuid = Guid.NewGuid(), + ProviderName = "Galaxy", + Group = "TestArea", + TagName = "Tag2", + Type = "ANL", + Priority = 100, + State = MxAlarmStateKind.UnackAlm, + TransitionTimestampUtc = ts, + Degraded = false, + }, + }; + using AlarmDispatcher dispatcher = new AlarmDispatcher( + consumer, + new MxAccessAlarmEventSink(new MxAccessEventQueue(), new MxAccessEventMapper()), + SessionId); + + IReadOnlyList snapshots = dispatcher.SnapshotActiveAlarms(); + Assert.Equal(2, snapshots.Count); + + Assert.True(snapshots[0].Degraded); + Assert.Equal(AlarmProviderMode.Subtag, snapshots[0].SourceProvider); + + Assert.False(snapshots[1].Degraded); + Assert.Equal(AlarmProviderMode.Alarmmgr, snapshots[1].SourceProvider); + } + /// Verifies that dispose unsubscribes the handler and disposes the consumer. [Fact] public void Dispose_WhenSubscribed_UnsubscribesHandlerAndDisposesConsumer() diff --git a/src/ZB.MOM.WW.MxGateway.Worker/MxAccess/AlarmDispatcher.cs b/src/ZB.MOM.WW.MxGateway.Worker/MxAccess/AlarmDispatcher.cs index 8a71c9b..feda915 100644 --- a/src/ZB.MOM.WW.MxGateway.Worker/MxAccess/AlarmDispatcher.cs +++ b/src/ZB.MOM.WW.MxGateway.Worker/MxAccess/AlarmDispatcher.cs @@ -209,6 +209,8 @@ public sealed class AlarmDispatcher : IDisposable OperatorComment = record.AlarmComment, Category = record.Group, Description = string.Empty, + Degraded = record.Degraded, + SourceProvider = record.Degraded ? AlarmProviderMode.Subtag : AlarmProviderMode.Alarmmgr, }; if (record.TransitionTimestampUtc != DateTime.MinValue) {