From ebf1d95f72a1da6f6e9422f1b2ca43cce2e11d0c Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 13 Jun 2026 10:20:03 -0400 Subject: [PATCH] server(alarms): monitor resolves watch-list, sends ForcedMode/failover, reflects provider mode into feed + metrics --- .../AlarmsServiceCollectionExtensions.cs | 1 + .../Alarms/GatewayAlarmMonitor.cs | 178 +++++++- .../GatewayAlarmMonitorProviderModeTests.cs | 398 ++++++++++++++++++ 3 files changed, 574 insertions(+), 3 deletions(-) create mode 100644 src/ZB.MOM.WW.MxGateway.Tests/Alarms/GatewayAlarmMonitorProviderModeTests.cs diff --git a/src/ZB.MOM.WW.MxGateway.Server/Alarms/AlarmsServiceCollectionExtensions.cs b/src/ZB.MOM.WW.MxGateway.Server/Alarms/AlarmsServiceCollectionExtensions.cs index 26b7e83..8050e42 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Alarms/AlarmsServiceCollectionExtensions.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Alarms/AlarmsServiceCollectionExtensions.cs @@ -13,6 +13,7 @@ public static class AlarmsServiceCollectionExtensions /// The service collection for chaining. public static IServiceCollection AddGatewayAlarms(this IServiceCollection services) { + services.AddSingleton(); services.AddSingleton(); services.AddSingleton(provider => provider.GetRequiredService()); services.AddHostedService(provider => provider.GetRequiredService()); diff --git a/src/ZB.MOM.WW.MxGateway.Server/Alarms/GatewayAlarmMonitor.cs b/src/ZB.MOM.WW.MxGateway.Server/Alarms/GatewayAlarmMonitor.cs index 30461c9..c9ae21f 100644 --- a/src/ZB.MOM.WW.MxGateway.Server/Alarms/GatewayAlarmMonitor.cs +++ b/src/ZB.MOM.WW.MxGateway.Server/Alarms/GatewayAlarmMonitor.cs @@ -1,7 +1,9 @@ using System.Threading.Channels; +using Google.Protobuf.WellKnownTypes; using Microsoft.Extensions.Options; using ZB.MOM.WW.MxGateway.Contracts.Proto; using ZB.MOM.WW.MxGateway.Server.Configuration; +using ZB.MOM.WW.MxGateway.Server.Metrics; using ZB.MOM.WW.MxGateway.Server.Sessions; namespace ZB.MOM.WW.MxGateway.Server.Alarms; @@ -23,6 +25,8 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic private static readonly TimeSpan StartupGrace = TimeSpan.FromSeconds(2); private readonly ISessionManager _sessionManager; + private readonly IAlarmWatchListResolver _watchListResolver; + private readonly GatewayMetrics _metrics; private readonly AlarmsOptions _options; private readonly ILogger _logger; @@ -30,20 +34,34 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic private readonly Dictionary _alarms = new(StringComparer.Ordinal); private readonly List _subscribers = []; + // Current provider status (mode + degraded + reason + since), guarded by _sync. + // Initialized to the alarm-manager, not-degraded baseline so a late joiner sees + // a sensible status even before any OnAlarmProviderModeChanged event arrives. + private AlarmProviderMode _providerMode = AlarmProviderMode.Alarmmgr; + private bool _providerDegraded; + private string _providerReason = string.Empty; + private DateTimeOffset _providerSince = DateTimeOffset.UtcNow; + private volatile GatewayAlarmMonitorState _state = GatewayAlarmMonitorState.Disabled; private volatile string? _lastError; private GatewaySession? _session; /// Initializes the gateway alarm monitor. /// Gateway session manager. + /// Resolver for the subtag-fallback watch-list. + /// Gateway metrics sink. /// Gateway options carrying the alarm configuration. /// Diagnostic logger. public GatewayAlarmMonitor( ISessionManager sessionManager, + IAlarmWatchListResolver watchListResolver, + GatewayMetrics metrics, IOptions options, ILogger logger) { _sessionManager = sessionManager ?? throw new ArgumentNullException(nameof(sessionManager)); + _watchListResolver = watchListResolver ?? throw new ArgumentNullException(nameof(watchListResolver)); + _metrics = metrics ?? throw new ArgumentNullException(nameof(metrics)); _options = (options ?? throw new ArgumentNullException(nameof(options))).Value.Alarms; _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } @@ -139,6 +157,16 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic private async Task RunMonitorAsync(string subscription, CancellationToken stoppingToken) { _state = GatewayAlarmMonitorState.Starting; + lock (_sync) + { + // Re-baseline the provider status for this lifecycle so a restarted + // monitor advertises alarm-manager/not-degraded until told otherwise. + _providerMode = AlarmProviderMode.Alarmmgr; + _providerDegraded = false; + _providerReason = string.Empty; + _providerSince = DateTimeOffset.UtcNow; + } + GatewaySession session = await _sessionManager.OpenSessionAsync( new SessionOpenRequest(BackendName, MonitorClientName, Guid.NewGuid().ToString("N"), CommandTimeout: null), MonitorClientName, @@ -173,6 +201,15 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic { ApplyTransition(mxEvent.OnAlarmTransition); } + else if (mxEvent is { BodyCase: MxEvent.BodyOneofCase.OnAlarmProviderModeChanged } + && mxEvent.OnAlarmProviderModeChanged is not null) + { + await ApplyProviderModeChangeAsync( + session.SessionId, + mxEvent.OnAlarmProviderModeChanged, + linked.Token) + .ConfigureAwait(false); + } } } finally @@ -209,6 +246,29 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic private async Task SubscribeAlarmsAsync(string sessionId, string subscription, CancellationToken cancellationToken) { + IReadOnlyList watchList = await _watchListResolver + .ResolveAsync(_options, cancellationToken) + .ConfigureAwait(false); + + AlarmProviderMode forcedMode = MapForcedMode(_options.Fallback.Mode); + + // When the forced mode is Unspecified (the "Auto" case) and the resolved + // watch-list is empty — the common alarmmgr-only deployment — the command + // is identical-in-effect to the historical SubscribeAlarms (wnwrap only): + // the worker builds the wnwrap consumer and no subtag watch-list. + SubscribeAlarmsCommand command = new() + { + SubscriptionExpression = subscription, + ForcedMode = forcedMode, + Failover = new AlarmFailoverConfig + { + ConsecutiveFailureThreshold = _options.Fallback.ConsecutiveFailureThreshold, + FailbackProbeIntervalSeconds = _options.Fallback.FailbackProbeIntervalSeconds, + FailbackStableProbes = _options.Fallback.FailbackStableProbes, + }, + }; + command.WatchList.AddRange(watchList); + WorkerCommandReply reply = await _sessionManager.InvokeAsync( sessionId, new WorkerCommand @@ -216,7 +276,7 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic Command = new MxCommand { Kind = MxCommandKind.SubscribeAlarms, - SubscribeAlarms = new SubscribeAlarmsCommand { SubscriptionExpression = subscription }, + SubscribeAlarms = command, }, }, cancellationToken) @@ -310,6 +370,94 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic } } + // Handles the worker's provider-mode-change event: updates the stored provider + // status, broadcasts it to every subscriber (provider status is global, not + // alarm-scoped), records the switch metric, and forces a cache reconcile so the + // active-alarm set reflects whatever the new mode reports. + private async Task ApplyProviderModeChangeAsync( + string sessionId, + OnAlarmProviderModeChangedEvent change, + CancellationToken cancellationToken) + { + AlarmProviderMode toMode = change.Mode; + string reason = change.Reason ?? string.Empty; + + AlarmProviderStatus status; + int fromModeInt; + lock (_sync) + { + fromModeInt = ModeToInt(_providerMode); + _providerMode = toMode; + _providerDegraded = toMode == AlarmProviderMode.Subtag; + _providerReason = reason; + _providerSince = DateTimeOffset.UtcNow; + status = BuildProviderStatus(); + BroadcastToAll(new AlarmFeedMessage { ProviderStatus = status }); + } + + _metrics.AlarmProviderSwitched(fromModeInt, ModeToInt(toMode), reason); + + _logger.LogInformation( + "Alarm provider mode changed to {Mode} (degraded={Degraded}): {Reason}", + toMode, + status.Degraded, + reason); + + try + { + await ReconcileAsync(sessionId, cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + throw; + } + catch (Exception exception) + { + _logger.LogDebug( + exception, + "Reconcile after alarm provider mode change failed; keeping the current cache."); + } + } + + // Caller holds _sync. Builds an AlarmProviderStatus snapshot of the current state. + private AlarmProviderStatus BuildProviderStatus() + { + return new AlarmProviderStatus + { + Mode = _providerMode, + Degraded = _providerDegraded, + Reason = _providerReason, + Since = Timestamp.FromDateTimeOffset(_providerSince), + }; + } + + // Maps the configured fallback mode string to the forced provider mode the + // worker honours. Case-insensitive; anything other than the two force values + // (including the default "Auto") yields Unspecified ("let the worker decide"). + private static AlarmProviderMode MapForcedMode(string? mode) + { + if (string.Equals(mode, "ForceAlarmManager", StringComparison.OrdinalIgnoreCase)) + { + return AlarmProviderMode.Alarmmgr; + } + + if (string.Equals(mode, "ForceSubtag", StringComparison.OrdinalIgnoreCase)) + { + return AlarmProviderMode.Subtag; + } + + return AlarmProviderMode.Unspecified; + } + + // Maps the provider-mode enum to the integer the metric expects + // (alarmmgr=1, subtag=2, unknown/unspecified=0). + private static int ModeToInt(AlarmProviderMode mode) => mode switch + { + AlarmProviderMode.Alarmmgr => 1, + AlarmProviderMode.Subtag => 2, + _ => 0, + }; + // Replaces the cache with the worker's authoritative snapshot, broadcasting // a synthetic transition for any alarm the live stream missed. private void ApplyReconcile(IEnumerable snapshots) @@ -374,6 +522,23 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic } } + // Caller holds _sync. Pushes a feed message to every subscriber regardless of + // its alarm-filter prefix. Used for provider-status messages, which are global + // rather than scoped to a single alarm reference. + private void BroadcastToAll(AlarmFeedMessage message) + { + for (int index = _subscribers.Count - 1; index >= 0; index--) + { + Subscriber subscriber = _subscribers[index]; + if (!subscriber.Channel.Writer.TryWrite(message)) + { + subscriber.Channel.Writer.TryComplete(new InvalidOperationException( + "Alarm feed subscriber fell behind and was dropped; reconnect to re-snapshot.")); + _subscribers.RemoveAt(index); + } + } + } + private void ClearCache() { lock (_sync) @@ -398,11 +563,14 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic Subscriber subscriber = new(channel, prefix); ActiveAlarmSnapshot[] snapshot; + AlarmProviderStatus providerStatus; lock (_sync) { - // Register before snapshotting under the same lock so no transition - // can slip between the snapshot and the live stream. + // Register before snapshotting under the same lock so neither a + // transition nor a provider-mode change can slip between the snapshot + // and the live stream. _subscribers.Add(subscriber); + providerStatus = BuildProviderStatus(); snapshot = _alarms.Values .Where(alarm => prefix.Length == 0 || alarm.AlarmFullReference.StartsWith(prefix, StringComparison.Ordinal)) @@ -412,6 +580,10 @@ public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmServic try { + // Emit the current provider status first so a late joiner immediately + // learns the mode (and whether the feed is degraded) before any alarms. + yield return new AlarmFeedMessage { ProviderStatus = providerStatus }; + foreach (ActiveAlarmSnapshot alarm in snapshot) { yield return new AlarmFeedMessage { ActiveAlarm = alarm }; diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Alarms/GatewayAlarmMonitorProviderModeTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Alarms/GatewayAlarmMonitorProviderModeTests.cs new file mode 100644 index 0000000..ca67e91 --- /dev/null +++ b/src/ZB.MOM.WW.MxGateway.Tests/Alarms/GatewayAlarmMonitorProviderModeTests.cs @@ -0,0 +1,398 @@ +using System.Diagnostics.CodeAnalysis; +using System.Diagnostics.Metrics; +using System.Runtime.CompilerServices; +using System.Threading.Channels; +using Google.Protobuf.WellKnownTypes; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using ZB.MOM.WW.MxGateway.Contracts.Proto; +using ZB.MOM.WW.MxGateway.Server.Alarms; +using ZB.MOM.WW.MxGateway.Server.Configuration; +using ZB.MOM.WW.MxGateway.Server.Metrics; +using ZB.MOM.WW.MxGateway.Server.Sessions; + +namespace ZB.MOM.WW.MxGateway.Tests.Alarms; + +/// +/// Drives with a fake session manager to +/// verify it reflects the worker's OnAlarmProviderModeChanged event into +/// the alarm feed and the switch metric, and that a new subscriber receives the +/// provider status as its first message. Also covers the watch-list / forced-mode +/// wiring of the SubscribeAlarms command and the Mode→enum mapping. +/// +public sealed class GatewayAlarmMonitorProviderModeTests +{ + private static readonly TimeSpan WaitTimeout = TimeSpan.FromSeconds(15); + + [Fact] + public async Task ProviderModeChange_BroadcastsDegradedStatus_AndIncrementsSwitchMetric() + { + using GatewayMetrics metrics = new(); + long switchCount = 0; + using MeterListener listener = new(); + listener.InstrumentPublished = (instrument, meterListener) => + { + if (ReferenceEquals(instrument.Meter, metrics.Meter) + && instrument.Name == "mxgateway.alarms.provider_switches") + { + meterListener.EnableMeasurementEvents(instrument); + } + }; + listener.SetMeasurementEventCallback( + (instrument, measurement, _, _) => + { + if (ReferenceEquals(instrument.Meter, metrics.Meter) + && instrument.Name == "mxgateway.alarms.provider_switches") + { + Interlocked.Add(ref switchCount, measurement); + } + }); + listener.Start(); + + FakeSessionManager sessions = new(); + using GatewayAlarmMonitor monitor = CreateMonitor(sessions, metrics); + + using CancellationTokenSource cts = new(); + await monitor.StartAsync(cts.Token); + await sessions.WaitForSubscribeAsync(WaitTimeout); + + // Subscribe a live feed reader, drain its first (provider status) message. + List received = []; + using CancellationTokenSource streamCts = new(); + Task reader = Task.Run(async () => + { + try + { + await foreach (AlarmFeedMessage message in monitor.StreamAsync(null, streamCts.Token)) + { + lock (received) { received.Add(message); } + } + } + catch (OperationCanceledException) + { + // Expected when the test cancels the stream. + } + }); + + // Emit the worker event that flips the provider into subtag mode. + sessions.EmitEvent(new MxEvent + { + Family = MxEventFamily.OnAlarmProviderModeChanged, + OnAlarmProviderModeChanged = new OnAlarmProviderModeChangedEvent + { + Mode = AlarmProviderMode.Subtag, + Reason = "alarmmgr failed", + Hresult = unchecked((int)0x80004005), + At = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), + }, + }); + + AlarmFeedMessage degraded = await WaitForAsync( + received, + m => m.PayloadCase == AlarmFeedMessage.PayloadOneofCase.ProviderStatus + && m.ProviderStatus.Mode == AlarmProviderMode.Subtag, + WaitTimeout); + + Assert.Equal(AlarmProviderMode.Subtag, degraded.ProviderStatus.Mode); + Assert.True(degraded.ProviderStatus.Degraded); + Assert.Equal("alarmmgr failed", degraded.ProviderStatus.Reason); + + await WaitUntilAsync(() => Interlocked.Read(ref switchCount) >= 1, WaitTimeout); + Assert.Equal(1, Interlocked.Read(ref switchCount)); + + await streamCts.CancelAsync(); + await reader; + await cts.CancelAsync(); + await monitor.StopAsync(CancellationToken.None); + } + + [Fact] + public async Task NewSubscriber_ReceivesProviderStatusAsFirstMessage() + { + using GatewayMetrics metrics = new(); + FakeSessionManager sessions = new(); + using GatewayAlarmMonitor monitor = CreateMonitor(sessions, metrics); + + using CancellationTokenSource cts = new(); + await monitor.StartAsync(cts.Token); + await sessions.WaitForSubscribeAsync(WaitTimeout); + + using CancellationTokenSource streamCts = new(); + AlarmFeedMessage? first = null; + Task reader = Task.Run(async () => + { + await foreach (AlarmFeedMessage message in monitor.StreamAsync(null, streamCts.Token)) + { + first = message; + break; + } + }); + + await WaitUntilAsync(() => first is not null, WaitTimeout); + + Assert.NotNull(first); + Assert.Equal(AlarmFeedMessage.PayloadOneofCase.ProviderStatus, first!.PayloadCase); + // Baseline before any provider-mode event: alarm-manager, not degraded. + Assert.Equal(AlarmProviderMode.Alarmmgr, first.ProviderStatus.Mode); + Assert.False(first.ProviderStatus.Degraded); + + await streamCts.CancelAsync(); + await reader; + await cts.CancelAsync(); + await monitor.StopAsync(CancellationToken.None); + } + + [Fact] + public async Task SubscribeAlarms_SendsForcedModeAndWatchList_FromConfiguration() + { + using GatewayMetrics metrics = new(); + FakeSessionManager sessions = new(); + StubWatchListResolver resolver = new( + [ + new AlarmSubtagTarget { AlarmFullReference = "Galaxy!Area.Tank01.Hi", ActiveSubtag = "Tank01.Hi.active" }, + ]); + + AlarmsOptions options = new() + { + Enabled = true, + SubscriptionExpression = @"\\NODE\Galaxy!Area", + Fallback = new AlarmFallbackOptions + { + Mode = "ForceSubtag", + ConsecutiveFailureThreshold = 7, + FailbackProbeIntervalSeconds = 11, + FailbackStableProbes = 4, + }, + }; + + using GatewayAlarmMonitor monitor = new( + sessions, + resolver, + metrics, + Microsoft.Extensions.Options.Options.Create(new GatewayOptions { Alarms = options }), + NullLogger.Instance); + + using CancellationTokenSource cts = new(); + await monitor.StartAsync(cts.Token); + await sessions.WaitForSubscribeAsync(WaitTimeout); + + SubscribeAlarmsCommand sent = Assert.IsType(sessions.LastSubscribeCommand); + Assert.Equal(AlarmProviderMode.Subtag, sent.ForcedMode); + Assert.Equal(7, sent.Failover.ConsecutiveFailureThreshold); + Assert.Equal(11, sent.Failover.FailbackProbeIntervalSeconds); + Assert.Equal(4, sent.Failover.FailbackStableProbes); + AlarmSubtagTarget target = Assert.Single(sent.WatchList); + Assert.Equal("Galaxy!Area.Tank01.Hi", target.AlarmFullReference); + + await cts.CancelAsync(); + await monitor.StopAsync(CancellationToken.None); + } + + [Theory] + [InlineData("ForceAlarmManager", AlarmProviderMode.Alarmmgr)] + [InlineData("forcealarmmanager", AlarmProviderMode.Alarmmgr)] + [InlineData("ForceSubtag", AlarmProviderMode.Subtag)] + [InlineData("forcesubtag", AlarmProviderMode.Subtag)] + [InlineData("Auto", AlarmProviderMode.Unspecified)] + [InlineData("", AlarmProviderMode.Unspecified)] + [InlineData("nonsense", AlarmProviderMode.Unspecified)] + public async Task ModeString_MapsToForcedProviderMode(string mode, AlarmProviderMode expected) + { + using GatewayMetrics metrics = new(); + FakeSessionManager sessions = new(); + + AlarmsOptions options = new() + { + Enabled = true, + SubscriptionExpression = @"\\NODE\Galaxy!Area", + Fallback = new AlarmFallbackOptions { Mode = mode }, + }; + + using GatewayAlarmMonitor monitor = new( + sessions, + new StubWatchListResolver([]), + metrics, + Microsoft.Extensions.Options.Options.Create(new GatewayOptions { Alarms = options }), + NullLogger.Instance); + + using CancellationTokenSource cts = new(); + await monitor.StartAsync(cts.Token); + await sessions.WaitForSubscribeAsync(WaitTimeout); + + Assert.Equal(expected, sessions.LastSubscribeCommand!.ForcedMode); + // Auto + empty watch-list preserves historical alarmmgr-only behaviour. + if (expected == AlarmProviderMode.Unspecified) + { + Assert.Empty(sessions.LastSubscribeCommand!.WatchList); + } + + await cts.CancelAsync(); + await monitor.StopAsync(CancellationToken.None); + } + + private static GatewayAlarmMonitor CreateMonitor(FakeSessionManager sessions, GatewayMetrics metrics) + { + AlarmsOptions options = new() + { + Enabled = true, + SubscriptionExpression = @"\\NODE\Galaxy!Area", + }; + return new GatewayAlarmMonitor( + sessions, + new StubWatchListResolver([]), + metrics, + Microsoft.Extensions.Options.Options.Create(new GatewayOptions { Alarms = options }), + NullLogger.Instance); + } + + private static async Task WaitForAsync( + List received, + Func predicate, + TimeSpan timeout) + { + DateTime deadline = DateTime.UtcNow + timeout; + while (DateTime.UtcNow < deadline) + { + lock (received) + { + AlarmFeedMessage? match = received.FirstOrDefault(predicate); + if (match is not null) + { + return match; + } + } + + await Task.Delay(25); + } + + throw new TimeoutException("No matching AlarmFeedMessage was received in time."); + } + + private static async Task WaitUntilAsync(Func condition, TimeSpan timeout) + { + DateTime deadline = DateTime.UtcNow + timeout; + while (DateTime.UtcNow < deadline) + { + if (condition()) + { + return; + } + + await Task.Delay(25); + } + + throw new TimeoutException("Condition was not met in time."); + } + + /// that returns a fixed watch-list. + private sealed class StubWatchListResolver(IReadOnlyList targets) : IAlarmWatchListResolver + { + /// + public Task> ResolveAsync( + AlarmsOptions options, + CancellationToken cancellationToken = default) => Task.FromResult(targets); + } + + /// + /// Minimal for driving the monitor: opens a + /// constructed session, records the SubscribeAlarms command, replies OK to + /// every command, and exposes a channel for pushing worker events. + /// + private sealed class FakeSessionManager : ISessionManager + { + private readonly Channel _events = Channel.CreateUnbounded(); + private readonly TaskCompletionSource _subscribed = + new(TaskCreationOptions.RunContinuationsAsynchronously); + + /// The most recent SubscribeAlarms command the monitor sent. + public SubscribeAlarmsCommand? LastSubscribeCommand { get; private set; } + + /// Pushes a worker event onto the monitor's event stream. + public void EmitEvent(MxEvent mxEvent) => + _events.Writer.TryWrite(new WorkerEvent { Event = mxEvent }); + + /// Completes once the monitor has issued its SubscribeAlarms command. + public Task WaitForSubscribeAsync(TimeSpan timeout) => _subscribed.Task.WaitAsync(timeout); + + /// + public Task OpenSessionAsync( + SessionOpenRequest request, + string? clientIdentity, + CancellationToken cancellationToken) + { + GatewaySession session = new( + Guid.NewGuid().ToString("N"), + "Galaxy", + "pipe-test", + "nonce-test", + clientIdentity, + null, + null, + TimeSpan.FromSeconds(30), + TimeSpan.FromSeconds(30), + TimeSpan.FromSeconds(30), + DateTimeOffset.UtcNow); + return Task.FromResult(session); + } + + /// + public Task InvokeAsync( + string sessionId, + WorkerCommand command, + CancellationToken cancellationToken) + { + if (command.Command?.Kind == MxCommandKind.SubscribeAlarms) + { + LastSubscribeCommand = command.Command.SubscribeAlarms; + _subscribed.TrySetResult(); + } + + MxCommandReply reply = new() + { + ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, + }; + if (command.Command?.Kind == MxCommandKind.QueryActiveAlarms) + { + reply.QueryActiveAlarms = new QueryActiveAlarmsReplyPayload(); + } + + return Task.FromResult(new WorkerCommandReply { Reply = reply }); + } + + /// + public async IAsyncEnumerable ReadEventsAsync( + string sessionId, + [EnumeratorCancellation] CancellationToken cancellationToken) + { + await foreach (WorkerEvent workerEvent in _events.Reader.ReadAllAsync(cancellationToken)) + { + yield return workerEvent; + } + } + + /// + public bool TryGetSession(string sessionId, [MaybeNullWhen(false)] out GatewaySession session) + { + session = null; + return false; + } + + /// + public Task CloseSessionAsync(string sessionId, CancellationToken cancellationToken) + { + _events.Writer.TryComplete(); + return Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false)); + } + + /// + public Task KillWorkerAsync(string sessionId, string reason, CancellationToken cancellationToken) => + Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false)); + + /// + public Task CloseExpiredLeasesAsync(DateTimeOffset now, CancellationToken cancellationToken) => + Task.FromResult(0); + + /// + public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask; + } +}