alarms: propagate degraded/source_provider through snapshot + gateway cache paths (integration fix I1/I2)

This commit is contained in:
Joseph Doherty
2026-06-13 10:53:55 -04:00
parent 2f30f0c7c0
commit ec88532fe4
4 changed files with 172 additions and 0 deletions
@@ -227,6 +227,118 @@ public sealed class AlarmFailoverEndToEndTests
await monitor.StopAsync(CancellationToken.None);
}
[Fact]
public async Task DegradedTransition_CachedThenReplayed_CarriesDegradedAndSourceProviderToNewSubscriber()
{
using GatewayMetrics metrics = new();
FakeSessionManager sessions = new();
StubWatchListResolver resolver = new([]);
AlarmsOptions options = new()
{
Enabled = true,
SubscriptionExpression = @"\\NODE\Galaxy!Area",
Fallback = new AlarmFallbackOptions
{
Mode = "Auto",
ConsecutiveFailureThreshold = 3,
FailbackProbeIntervalSeconds = 9,
FailbackStableProbes = 2,
},
};
using GatewayAlarmMonitor monitor = new(
sessions,
resolver,
metrics,
Microsoft.Extensions.Options.Options.Create(new GatewayOptions { Alarms = options }),
NullLogger<GatewayAlarmMonitor>.Instance);
using CancellationTokenSource cts = new();
await monitor.StartAsync(cts.Token);
await sessions.WaitForSubscribeAsync(WaitTimeout);
// First subscriber: drive the feed past the baseline ProviderStatus so we
// know the monitor's event loop is live before we emit the transition.
List<AlarmFeedMessage> firstReader = [];
TaskCompletionSource baselineReceived = new(TaskCreationOptions.RunContinuationsAsynchronously);
using CancellationTokenSource firstStreamCts = new();
Task reader = Task.Run(async () =>
{
try
{
await foreach (AlarmFeedMessage message in monitor.StreamAsync(null, firstStreamCts.Token))
{
lock (firstReader)
{
firstReader.Add(message);
if (message.PayloadCase == AlarmFeedMessage.PayloadOneofCase.ProviderStatus)
{
baselineReceived.TrySetResult();
}
}
}
}
catch (OperationCanceledException)
{
// Expected when the test cancels the stream.
}
});
await baselineReceived.Task.WaitAsync(WaitTimeout);
// Apply a degraded (subtag) transition. This lands in the monitor's cache
// via SnapshotFromTransition, which must preserve Degraded/SourceProvider.
sessions.EmitEvent(new MxEvent
{
OnAlarmTransition = new OnAlarmTransitionEvent
{
AlarmFullReference = "Galaxy!Area.Tank02.Lo",
SourceObjectReference = "Tank02",
AlarmTypeName = "AnalogLimitAlarm.Lo",
TransitionKind = AlarmTransitionKind.Raise,
Severity = 250,
Degraded = true,
SourceProvider = AlarmProviderMode.Subtag,
TransitionTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
},
});
// Wait for the live transition to be observed by the first subscriber so we
// know the cache has been updated before opening the new stream.
await WaitForAsync(
firstReader,
m => m.PayloadCase == AlarmFeedMessage.PayloadOneofCase.Transition
&& m.Transition.AlarmFullReference == "Galaxy!Area.Tank02.Lo",
WaitTimeout);
// New subscriber: its initial cache snapshot must carry the degraded flags.
using CancellationTokenSource newStreamCts = new();
ActiveAlarmSnapshot? initialActiveAlarm = null;
await foreach (AlarmFeedMessage message in monitor.StreamAsync(null, newStreamCts.Token))
{
if (message.PayloadCase == AlarmFeedMessage.PayloadOneofCase.ActiveAlarm
&& message.ActiveAlarm.AlarmFullReference == "Galaxy!Area.Tank02.Lo")
{
initialActiveAlarm = message.ActiveAlarm;
}
if (message.PayloadCase == AlarmFeedMessage.PayloadOneofCase.SnapshotComplete)
{
break;
}
}
Assert.NotNull(initialActiveAlarm);
Assert.True(initialActiveAlarm!.Degraded);
Assert.Equal(AlarmProviderMode.Subtag, initialActiveAlarm.SourceProvider);
await newStreamCts.CancelAsync();
await firstStreamCts.CancelAsync();
await reader;
await cts.CancelAsync();
await monitor.StopAsync(CancellationToken.None);
}
private static async Task<AlarmFeedMessage> WaitForAsync(
List<AlarmFeedMessage> received,
Func<AlarmFeedMessage, bool> predicate,