diff --git a/src/ZB.MOM.WW.MxGateway.Server/Alarms/AlarmsServiceCollectionExtensions.cs b/src/ZB.MOM.WW.MxGateway.Server/Alarms/AlarmsServiceCollectionExtensions.cs
index 26b7e83..8050e42 100644
--- a/src/ZB.MOM.WW.MxGateway.Server/Alarms/AlarmsServiceCollectionExtensions.cs
+++ b/src/ZB.MOM.WW.MxGateway.Server/Alarms/AlarmsServiceCollectionExtensions.cs
@@ -13,6 +13,7 @@ public static class AlarmsServiceCollectionExtensions
/// The service collection for chaining.
public static IServiceCollection AddGatewayAlarms(this IServiceCollection services)
{
+ services.AddSingleton();
services.AddSingleton();
services.AddSingleton(provider => provider.GetRequiredService());
services.AddHostedService(provider => provider.GetRequiredService());
diff --git a/src/ZB.MOM.WW.MxGateway.Server/Alarms/GatewayAlarmMonitor.cs b/src/ZB.MOM.WW.MxGateway.Server/Alarms/GatewayAlarmMonitor.cs
index 30461c9..c9ae21f 100644
--- a/src/ZB.MOM.WW.MxGateway.Server/Alarms/GatewayAlarmMonitor.cs
+++ b/src/ZB.MOM.WW.MxGateway.Server/Alarms/GatewayAlarmMonitor.cs
@@ -1,7 +1,9 @@
using System.Threading.Channels;
+using Google.Protobuf.WellKnownTypes;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Configuration;
+using ZB.MOM.WW.MxGateway.Server.Metrics;
using ZB.MOM.WW.MxGateway.Server.Sessions;
namespace ZB.MOM.WW.MxGateway.Server.Alarms;
@@ -23,6 +25,8 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic
private static readonly TimeSpan StartupGrace = TimeSpan.FromSeconds(2);
private readonly ISessionManager _sessionManager;
+ private readonly IAlarmWatchListResolver _watchListResolver;
+ private readonly GatewayMetrics _metrics;
private readonly AlarmsOptions _options;
private readonly ILogger _logger;
@@ -30,20 +34,34 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic
private readonly Dictionary _alarms = new(StringComparer.Ordinal);
private readonly List _subscribers = [];
+ // Current provider status (mode + degraded + reason + since), guarded by _sync.
+ // Initialized to the alarm-manager, not-degraded baseline so a late joiner sees
+ // a sensible status even before any OnAlarmProviderModeChanged event arrives.
+ private AlarmProviderMode _providerMode = AlarmProviderMode.Alarmmgr;
+ private bool _providerDegraded;
+ private string _providerReason = string.Empty;
+ private DateTimeOffset _providerSince = DateTimeOffset.UtcNow;
+
private volatile GatewayAlarmMonitorState _state = GatewayAlarmMonitorState.Disabled;
private volatile string? _lastError;
private GatewaySession? _session;
/// Initializes the gateway alarm monitor.
/// Gateway session manager.
+ /// Resolver for the subtag-fallback watch-list.
+ /// Gateway metrics sink.
/// Gateway options carrying the alarm configuration.
/// Diagnostic logger.
public GatewayAlarmMonitor(
ISessionManager sessionManager,
+ IAlarmWatchListResolver watchListResolver,
+ GatewayMetrics metrics,
IOptions options,
ILogger logger)
{
_sessionManager = sessionManager ?? throw new ArgumentNullException(nameof(sessionManager));
+ _watchListResolver = watchListResolver ?? throw new ArgumentNullException(nameof(watchListResolver));
+ _metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
_options = (options ?? throw new ArgumentNullException(nameof(options))).Value.Alarms;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
@@ -139,6 +157,16 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic
private async Task RunMonitorAsync(string subscription, CancellationToken stoppingToken)
{
_state = GatewayAlarmMonitorState.Starting;
+ lock (_sync)
+ {
+ // Re-baseline the provider status for this lifecycle so a restarted
+ // monitor advertises alarm-manager/not-degraded until told otherwise.
+ _providerMode = AlarmProviderMode.Alarmmgr;
+ _providerDegraded = false;
+ _providerReason = string.Empty;
+ _providerSince = DateTimeOffset.UtcNow;
+ }
+
GatewaySession session = await _sessionManager.OpenSessionAsync(
new SessionOpenRequest(BackendName, MonitorClientName, Guid.NewGuid().ToString("N"), CommandTimeout: null),
MonitorClientName,
@@ -173,6 +201,15 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic
{
ApplyTransition(mxEvent.OnAlarmTransition);
}
+ else if (mxEvent is { BodyCase: MxEvent.BodyOneofCase.OnAlarmProviderModeChanged }
+ && mxEvent.OnAlarmProviderModeChanged is not null)
+ {
+ await ApplyProviderModeChangeAsync(
+ session.SessionId,
+ mxEvent.OnAlarmProviderModeChanged,
+ linked.Token)
+ .ConfigureAwait(false);
+ }
}
}
finally
@@ -209,6 +246,29 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic
private async Task SubscribeAlarmsAsync(string sessionId, string subscription, CancellationToken cancellationToken)
{
+ IReadOnlyList watchList = await _watchListResolver
+ .ResolveAsync(_options, cancellationToken)
+ .ConfigureAwait(false);
+
+ AlarmProviderMode forcedMode = MapForcedMode(_options.Fallback.Mode);
+
+ // When the forced mode is Unspecified (the "Auto" case) and the resolved
+ // watch-list is empty — the common alarmmgr-only deployment — the command
+ // is identical-in-effect to the historical SubscribeAlarms (wnwrap only):
+ // the worker builds the wnwrap consumer and no subtag watch-list.
+ SubscribeAlarmsCommand command = new()
+ {
+ SubscriptionExpression = subscription,
+ ForcedMode = forcedMode,
+ Failover = new AlarmFailoverConfig
+ {
+ ConsecutiveFailureThreshold = _options.Fallback.ConsecutiveFailureThreshold,
+ FailbackProbeIntervalSeconds = _options.Fallback.FailbackProbeIntervalSeconds,
+ FailbackStableProbes = _options.Fallback.FailbackStableProbes,
+ },
+ };
+ command.WatchList.AddRange(watchList);
+
WorkerCommandReply reply = await _sessionManager.InvokeAsync(
sessionId,
new WorkerCommand
@@ -216,7 +276,7 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic
Command = new MxCommand
{
Kind = MxCommandKind.SubscribeAlarms,
- SubscribeAlarms = new SubscribeAlarmsCommand { SubscriptionExpression = subscription },
+ SubscribeAlarms = command,
},
},
cancellationToken)
@@ -310,6 +370,94 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic
}
}
+ // Handles the worker's provider-mode-change event: updates the stored provider
+ // status, broadcasts it to every subscriber (provider status is global, not
+ // alarm-scoped), records the switch metric, and forces a cache reconcile so the
+ // active-alarm set reflects whatever the new mode reports.
+ private async Task ApplyProviderModeChangeAsync(
+ string sessionId,
+ OnAlarmProviderModeChangedEvent change,
+ CancellationToken cancellationToken)
+ {
+ AlarmProviderMode toMode = change.Mode;
+ string reason = change.Reason ?? string.Empty;
+
+ AlarmProviderStatus status;
+ int fromModeInt;
+ lock (_sync)
+ {
+ fromModeInt = ModeToInt(_providerMode);
+ _providerMode = toMode;
+ _providerDegraded = toMode == AlarmProviderMode.Subtag;
+ _providerReason = reason;
+ _providerSince = DateTimeOffset.UtcNow;
+ status = BuildProviderStatus();
+ BroadcastToAll(new AlarmFeedMessage { ProviderStatus = status });
+ }
+
+ _metrics.AlarmProviderSwitched(fromModeInt, ModeToInt(toMode), reason);
+
+ _logger.LogInformation(
+ "Alarm provider mode changed to {Mode} (degraded={Degraded}): {Reason}",
+ toMode,
+ status.Degraded,
+ reason);
+
+ try
+ {
+ await ReconcileAsync(sessionId, cancellationToken).ConfigureAwait(false);
+ }
+ catch (OperationCanceledException)
+ {
+ throw;
+ }
+ catch (Exception exception)
+ {
+ _logger.LogDebug(
+ exception,
+ "Reconcile after alarm provider mode change failed; keeping the current cache.");
+ }
+ }
+
+ // Caller holds _sync. Builds an AlarmProviderStatus snapshot of the current state.
+ private AlarmProviderStatus BuildProviderStatus()
+ {
+ return new AlarmProviderStatus
+ {
+ Mode = _providerMode,
+ Degraded = _providerDegraded,
+ Reason = _providerReason,
+ Since = Timestamp.FromDateTimeOffset(_providerSince),
+ };
+ }
+
+ // Maps the configured fallback mode string to the forced provider mode the
+ // worker honours. Case-insensitive; anything other than the two force values
+ // (including the default "Auto") yields Unspecified ("let the worker decide").
+ private static AlarmProviderMode MapForcedMode(string? mode)
+ {
+ if (string.Equals(mode, "ForceAlarmManager", StringComparison.OrdinalIgnoreCase))
+ {
+ return AlarmProviderMode.Alarmmgr;
+ }
+
+ if (string.Equals(mode, "ForceSubtag", StringComparison.OrdinalIgnoreCase))
+ {
+ return AlarmProviderMode.Subtag;
+ }
+
+ return AlarmProviderMode.Unspecified;
+ }
+
+ // Maps the provider-mode enum to the integer the metric expects
+ // (alarmmgr=1, subtag=2, unknown/unspecified=0).
+ private static int ModeToInt(AlarmProviderMode mode) => mode switch
+ {
+ AlarmProviderMode.Alarmmgr => 1,
+ AlarmProviderMode.Subtag => 2,
+ _ => 0,
+ };
+
// Replaces the cache with the worker's authoritative snapshot, broadcasting
// a synthetic transition for any alarm the live stream missed.
private void ApplyReconcile(IEnumerable snapshots)
@@ -374,6 +522,23 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic
}
}
+ // Caller holds _sync. Pushes a feed message to every subscriber regardless of
+ // its alarm-filter prefix. Used for provider-status messages, which are global
+ // rather than scoped to a single alarm reference.
+ private void BroadcastToAll(AlarmFeedMessage message)
+ {
+ for (int index = _subscribers.Count - 1; index >= 0; index--)
+ {
+ Subscriber subscriber = _subscribers[index];
+ if (!subscriber.Channel.Writer.TryWrite(message))
+ {
+ subscriber.Channel.Writer.TryComplete(new InvalidOperationException(
+ "Alarm feed subscriber fell behind and was dropped; reconnect to re-snapshot."));
+ _subscribers.RemoveAt(index);
+ }
+ }
+ }
+
private void ClearCache()
{
lock (_sync)
@@ -398,11 +563,14 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic
Subscriber subscriber = new(channel, prefix);
ActiveAlarmSnapshot[] snapshot;
+ AlarmProviderStatus providerStatus;
lock (_sync)
{
- // Register before snapshotting under the same lock so no transition
- // can slip between the snapshot and the live stream.
+ // Register before snapshotting under the same lock so neither a
+ // transition nor a provider-mode change can slip between the snapshot
+ // and the live stream.
_subscribers.Add(subscriber);
+ providerStatus = BuildProviderStatus();
snapshot = _alarms.Values
.Where(alarm => prefix.Length == 0
|| alarm.AlarmFullReference.StartsWith(prefix, StringComparison.Ordinal))
@@ -412,6 +580,10 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic
try
{
+ // Emit the current provider status first so a late joiner immediately
+ // learns the mode (and whether the feed is degraded) before any alarms.
+ yield return new AlarmFeedMessage { ProviderStatus = providerStatus };
+
foreach (ActiveAlarmSnapshot alarm in snapshot)
{
yield return new AlarmFeedMessage { ActiveAlarm = alarm };
diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Alarms/GatewayAlarmMonitorProviderModeTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Alarms/GatewayAlarmMonitorProviderModeTests.cs
new file mode 100644
index 0000000..ca67e91
--- /dev/null
+++ b/src/ZB.MOM.WW.MxGateway.Tests/Alarms/GatewayAlarmMonitorProviderModeTests.cs
@@ -0,0 +1,398 @@
+using System.Diagnostics.CodeAnalysis;
+using System.Diagnostics.Metrics;
+using System.Runtime.CompilerServices;
+using System.Threading.Channels;
+using Google.Protobuf.WellKnownTypes;
+using Microsoft.Extensions.Logging.Abstractions;
+using Microsoft.Extensions.Options;
+using ZB.MOM.WW.MxGateway.Contracts.Proto;
+using ZB.MOM.WW.MxGateway.Server.Alarms;
+using ZB.MOM.WW.MxGateway.Server.Configuration;
+using ZB.MOM.WW.MxGateway.Server.Metrics;
+using ZB.MOM.WW.MxGateway.Server.Sessions;
+
+namespace ZB.MOM.WW.MxGateway.Tests.Alarms;
+
+///
+/// Drives with a fake session manager to
+/// verify it reflects the worker's OnAlarmProviderModeChanged event into
+/// the alarm feed and the switch metric, and that a new subscriber receives the
+/// provider status as its first message. Also covers the watch-list / forced-mode
+/// wiring of the SubscribeAlarms command and the Mode→enum mapping.
+///
+public sealed class GatewayAlarmMonitorProviderModeTests
+{
+ private static readonly TimeSpan WaitTimeout = TimeSpan.FromSeconds(15);
+
+ [Fact]
+ public async Task ProviderModeChange_BroadcastsDegradedStatus_AndIncrementsSwitchMetric()
+ {
+ using GatewayMetrics metrics = new();
+ long switchCount = 0;
+ using MeterListener listener = new();
+ listener.InstrumentPublished = (instrument, meterListener) =>
+ {
+ if (ReferenceEquals(instrument.Meter, metrics.Meter)
+ && instrument.Name == "mxgateway.alarms.provider_switches")
+ {
+ meterListener.EnableMeasurementEvents(instrument);
+ }
+ };
+ listener.SetMeasurementEventCallback(
+ (instrument, measurement, _, _) =>
+ {
+ if (ReferenceEquals(instrument.Meter, metrics.Meter)
+ && instrument.Name == "mxgateway.alarms.provider_switches")
+ {
+ Interlocked.Add(ref switchCount, measurement);
+ }
+ });
+ listener.Start();
+
+ FakeSessionManager sessions = new();
+ using GatewayAlarmMonitor monitor = CreateMonitor(sessions, metrics);
+
+ using CancellationTokenSource cts = new();
+ await monitor.StartAsync(cts.Token);
+ await sessions.WaitForSubscribeAsync(WaitTimeout);
+
+ // Subscribe a live feed reader, drain its first (provider status) message.
+ List received = [];
+ using CancellationTokenSource streamCts = new();
+ Task reader = Task.Run(async () =>
+ {
+ try
+ {
+ await foreach (AlarmFeedMessage message in monitor.StreamAsync(null, streamCts.Token))
+ {
+ lock (received) { received.Add(message); }
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ // Expected when the test cancels the stream.
+ }
+ });
+
+ // Emit the worker event that flips the provider into subtag mode.
+ sessions.EmitEvent(new MxEvent
+ {
+ Family = MxEventFamily.OnAlarmProviderModeChanged,
+ OnAlarmProviderModeChanged = new OnAlarmProviderModeChangedEvent
+ {
+ Mode = AlarmProviderMode.Subtag,
+ Reason = "alarmmgr failed",
+ Hresult = unchecked((int)0x80004005),
+ At = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
+ },
+ });
+
+ AlarmFeedMessage degraded = await WaitForAsync(
+ received,
+ m => m.PayloadCase == AlarmFeedMessage.PayloadOneofCase.ProviderStatus
+ && m.ProviderStatus.Mode == AlarmProviderMode.Subtag,
+ WaitTimeout);
+
+ Assert.Equal(AlarmProviderMode.Subtag, degraded.ProviderStatus.Mode);
+ Assert.True(degraded.ProviderStatus.Degraded);
+ Assert.Equal("alarmmgr failed", degraded.ProviderStatus.Reason);
+
+ await WaitUntilAsync(() => Interlocked.Read(ref switchCount) >= 1, WaitTimeout);
+ Assert.Equal(1, Interlocked.Read(ref switchCount));
+
+ await streamCts.CancelAsync();
+ await reader;
+ await cts.CancelAsync();
+ await monitor.StopAsync(CancellationToken.None);
+ }
+
+ [Fact]
+ public async Task NewSubscriber_ReceivesProviderStatusAsFirstMessage()
+ {
+ using GatewayMetrics metrics = new();
+ FakeSessionManager sessions = new();
+ using GatewayAlarmMonitor monitor = CreateMonitor(sessions, metrics);
+
+ using CancellationTokenSource cts = new();
+ await monitor.StartAsync(cts.Token);
+ await sessions.WaitForSubscribeAsync(WaitTimeout);
+
+ using CancellationTokenSource streamCts = new();
+ AlarmFeedMessage? first = null;
+ Task reader = Task.Run(async () =>
+ {
+ await foreach (AlarmFeedMessage message in monitor.StreamAsync(null, streamCts.Token))
+ {
+ first = message;
+ break;
+ }
+ });
+
+ await WaitUntilAsync(() => first is not null, WaitTimeout);
+
+ Assert.NotNull(first);
+ Assert.Equal(AlarmFeedMessage.PayloadOneofCase.ProviderStatus, first!.PayloadCase);
+ // Baseline before any provider-mode event: alarm-manager, not degraded.
+ Assert.Equal(AlarmProviderMode.Alarmmgr, first.ProviderStatus.Mode);
+ Assert.False(first.ProviderStatus.Degraded);
+
+ await streamCts.CancelAsync();
+ await reader;
+ await cts.CancelAsync();
+ await monitor.StopAsync(CancellationToken.None);
+ }
+
+ [Fact]
+ public async Task SubscribeAlarms_SendsForcedModeAndWatchList_FromConfiguration()
+ {
+ using GatewayMetrics metrics = new();
+ FakeSessionManager sessions = new();
+ StubWatchListResolver resolver = new(
+ [
+ new AlarmSubtagTarget { AlarmFullReference = "Galaxy!Area.Tank01.Hi", ActiveSubtag = "Tank01.Hi.active" },
+ ]);
+
+ AlarmsOptions options = new()
+ {
+ Enabled = true,
+ SubscriptionExpression = @"\\NODE\Galaxy!Area",
+ Fallback = new AlarmFallbackOptions
+ {
+ Mode = "ForceSubtag",
+ ConsecutiveFailureThreshold = 7,
+ FailbackProbeIntervalSeconds = 11,
+ FailbackStableProbes = 4,
+ },
+ };
+
+ using GatewayAlarmMonitor monitor = new(
+ sessions,
+ resolver,
+ metrics,
+ Microsoft.Extensions.Options.Options.Create(new GatewayOptions { Alarms = options }),
+ NullLogger.Instance);
+
+ using CancellationTokenSource cts = new();
+ await monitor.StartAsync(cts.Token);
+ await sessions.WaitForSubscribeAsync(WaitTimeout);
+
+ SubscribeAlarmsCommand sent = Assert.IsType(sessions.LastSubscribeCommand);
+ Assert.Equal(AlarmProviderMode.Subtag, sent.ForcedMode);
+ Assert.Equal(7, sent.Failover.ConsecutiveFailureThreshold);
+ Assert.Equal(11, sent.Failover.FailbackProbeIntervalSeconds);
+ Assert.Equal(4, sent.Failover.FailbackStableProbes);
+ AlarmSubtagTarget target = Assert.Single(sent.WatchList);
+ Assert.Equal("Galaxy!Area.Tank01.Hi", target.AlarmFullReference);
+
+ await cts.CancelAsync();
+ await monitor.StopAsync(CancellationToken.None);
+ }
+
+ [Theory]
+ [InlineData("ForceAlarmManager", AlarmProviderMode.Alarmmgr)]
+ [InlineData("forcealarmmanager", AlarmProviderMode.Alarmmgr)]
+ [InlineData("ForceSubtag", AlarmProviderMode.Subtag)]
+ [InlineData("forcesubtag", AlarmProviderMode.Subtag)]
+ [InlineData("Auto", AlarmProviderMode.Unspecified)]
+ [InlineData("", AlarmProviderMode.Unspecified)]
+ [InlineData("nonsense", AlarmProviderMode.Unspecified)]
+ public async Task ModeString_MapsToForcedProviderMode(string mode, AlarmProviderMode expected)
+ {
+ using GatewayMetrics metrics = new();
+ FakeSessionManager sessions = new();
+
+ AlarmsOptions options = new()
+ {
+ Enabled = true,
+ SubscriptionExpression = @"\\NODE\Galaxy!Area",
+ Fallback = new AlarmFallbackOptions { Mode = mode },
+ };
+
+ using GatewayAlarmMonitor monitor = new(
+ sessions,
+ new StubWatchListResolver([]),
+ metrics,
+ Microsoft.Extensions.Options.Options.Create(new GatewayOptions { Alarms = options }),
+ NullLogger.Instance);
+
+ using CancellationTokenSource cts = new();
+ await monitor.StartAsync(cts.Token);
+ await sessions.WaitForSubscribeAsync(WaitTimeout);
+
+ Assert.Equal(expected, sessions.LastSubscribeCommand!.ForcedMode);
+ // Auto + empty watch-list preserves historical alarmmgr-only behaviour.
+ if (expected == AlarmProviderMode.Unspecified)
+ {
+ Assert.Empty(sessions.LastSubscribeCommand!.WatchList);
+ }
+
+ await cts.CancelAsync();
+ await monitor.StopAsync(CancellationToken.None);
+ }
+
+ private static GatewayAlarmMonitor CreateMonitor(FakeSessionManager sessions, GatewayMetrics metrics)
+ {
+ AlarmsOptions options = new()
+ {
+ Enabled = true,
+ SubscriptionExpression = @"\\NODE\Galaxy!Area",
+ };
+ return new GatewayAlarmMonitor(
+ sessions,
+ new StubWatchListResolver([]),
+ metrics,
+ Microsoft.Extensions.Options.Options.Create(new GatewayOptions { Alarms = options }),
+ NullLogger.Instance);
+ }
+
+ private static async Task WaitForAsync(
+ List received,
+ Func predicate,
+ TimeSpan timeout)
+ {
+ DateTime deadline = DateTime.UtcNow + timeout;
+ while (DateTime.UtcNow < deadline)
+ {
+ lock (received)
+ {
+ AlarmFeedMessage? match = received.FirstOrDefault(predicate);
+ if (match is not null)
+ {
+ return match;
+ }
+ }
+
+ await Task.Delay(25);
+ }
+
+ throw new TimeoutException("No matching AlarmFeedMessage was received in time.");
+ }
+
+ private static async Task WaitUntilAsync(Func condition, TimeSpan timeout)
+ {
+ DateTime deadline = DateTime.UtcNow + timeout;
+ while (DateTime.UtcNow < deadline)
+ {
+ if (condition())
+ {
+ return;
+ }
+
+ await Task.Delay(25);
+ }
+
+ throw new TimeoutException("Condition was not met in time.");
+ }
+
+ /// that returns a fixed watch-list.
+ private sealed class StubWatchListResolver(IReadOnlyList targets) : IAlarmWatchListResolver
+ {
+ ///
+ public Task> ResolveAsync(
+ AlarmsOptions options,
+ CancellationToken cancellationToken = default) => Task.FromResult(targets);
+ }
+
+ ///
+ /// Minimal for driving the monitor: opens a
+ /// constructed session, records the SubscribeAlarms command, replies OK to
+ /// every command, and exposes a channel for pushing worker events.
+ ///
+ private sealed class FakeSessionManager : ISessionManager
+ {
+ private readonly Channel _events = Channel.CreateUnbounded();
+ private readonly TaskCompletionSource _subscribed =
+ new(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ /// The most recent SubscribeAlarms command the monitor sent.
+ public SubscribeAlarmsCommand? LastSubscribeCommand { get; private set; }
+
+ /// Pushes a worker event onto the monitor's event stream.
+ public void EmitEvent(MxEvent mxEvent) =>
+ _events.Writer.TryWrite(new WorkerEvent { Event = mxEvent });
+
+ /// Completes once the monitor has issued its SubscribeAlarms command.
+ public Task WaitForSubscribeAsync(TimeSpan timeout) => _subscribed.Task.WaitAsync(timeout);
+
+ ///
+ public Task OpenSessionAsync(
+ SessionOpenRequest request,
+ string? clientIdentity,
+ CancellationToken cancellationToken)
+ {
+ GatewaySession session = new(
+ Guid.NewGuid().ToString("N"),
+ "Galaxy",
+ "pipe-test",
+ "nonce-test",
+ clientIdentity,
+ null,
+ null,
+ TimeSpan.FromSeconds(30),
+ TimeSpan.FromSeconds(30),
+ TimeSpan.FromSeconds(30),
+ DateTimeOffset.UtcNow);
+ return Task.FromResult(session);
+ }
+
+ ///
+ public Task InvokeAsync(
+ string sessionId,
+ WorkerCommand command,
+ CancellationToken cancellationToken)
+ {
+ if (command.Command?.Kind == MxCommandKind.SubscribeAlarms)
+ {
+ LastSubscribeCommand = command.Command.SubscribeAlarms;
+ _subscribed.TrySetResult();
+ }
+
+ MxCommandReply reply = new()
+ {
+ ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
+ };
+ if (command.Command?.Kind == MxCommandKind.QueryActiveAlarms)
+ {
+ reply.QueryActiveAlarms = new QueryActiveAlarmsReplyPayload();
+ }
+
+ return Task.FromResult(new WorkerCommandReply { Reply = reply });
+ }
+
+ ///
+ public async IAsyncEnumerable ReadEventsAsync(
+ string sessionId,
+ [EnumeratorCancellation] CancellationToken cancellationToken)
+ {
+ await foreach (WorkerEvent workerEvent in _events.Reader.ReadAllAsync(cancellationToken))
+ {
+ yield return workerEvent;
+ }
+ }
+
+ ///
+ public bool TryGetSession(string sessionId, [MaybeNullWhen(false)] out GatewaySession session)
+ {
+ session = null;
+ return false;
+ }
+
+ ///
+ public Task CloseSessionAsync(string sessionId, CancellationToken cancellationToken)
+ {
+ _events.Writer.TryComplete();
+ return Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false));
+ }
+
+ ///
+ public Task KillWorkerAsync(string sessionId, string reason, CancellationToken cancellationToken) =>
+ Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false));
+
+ ///
+ public Task CloseExpiredLeasesAsync(DateTimeOffset now, CancellationToken cancellationToken) =>
+ Task.FromResult(0);
+
+ ///
+ public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask;
+ }
+}