// File: mxaccessgw/src/ZB.MOM.WW.MxGateway.Tests/Alarms/GatewayAlarmMonitorProviderModeTests.cs (C#; viewer listing: 792 lines, 30 KiB)

using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.Metrics;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Google.Protobuf.WellKnownTypes;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Alarms;
using ZB.MOM.WW.MxGateway.Server.Configuration;
using ZB.MOM.WW.MxGateway.Server.Metrics;
using ZB.MOM.WW.MxGateway.Server.Sessions;
namespace ZB.MOM.WW.MxGateway.Tests.Alarms;
/// <summary>
/// Drives <see cref="GatewayAlarmMonitor"/> with a fake session manager to
/// verify it reflects the worker's <c>OnAlarmProviderModeChanged</c> event into
/// the alarm feed and the switch metric, and that a new subscriber receives the
/// provider status as its first message. Also covers the watch-list / forced-mode
/// wiring of the <c>SubscribeAlarms</c> command and the Mode→enum mapping.
/// </summary>
public sealed class GatewayAlarmMonitorProviderModeTests
{
private static readonly TimeSpan WaitTimeout = TimeSpan.FromSeconds(15);
/// <summary>
/// A worker <c>OnAlarmProviderModeChanged</c> event reporting subtag mode must
/// reach a live feed subscriber as a degraded <c>ProviderStatus</c> message and
/// must bump the <c>mxgateway.alarms.provider_switches</c> counter exactly once.
/// </summary>
[Fact]
public async Task ProviderModeChange_BroadcastsDegradedStatus_AndIncrementsSwitchMetric()
{
    const string SwitchCounterName = "mxgateway.alarms.provider_switches";

    using GatewayMetrics metrics = new();
    long switches = 0;

    // Matches only the provider-switch counter published by this test's own meter.
    bool IsSwitchCounter(Instrument candidate) =>
        ReferenceEquals(candidate.Meter, metrics.Meter) && candidate.Name == SwitchCounterName;

    using MeterListener meterTap = new();
    meterTap.InstrumentPublished = (published, tap) =>
    {
        if (IsSwitchCounter(published))
        {
            tap.EnableMeasurementEvents(published);
        }
    };
    meterTap.SetMeasurementEventCallback<long>((source, delta, _, _) =>
    {
        if (IsSwitchCounter(source))
        {
            Interlocked.Add(ref switches, delta);
        }
    });
    meterTap.Start();

    FakeSessionManager sessions = new();
    using GatewayAlarmMonitor monitor = CreateMonitor(sessions, metrics);
    using CancellationTokenSource monitorCts = new();
    await monitor.StartAsync(monitorCts.Token);
    await sessions.WaitForSubscribeAsync(WaitTimeout);

    // Attach a live feed reader. The mode-change event is held back until the reader
    // has consumed its baseline ProviderStatus message, so the event cannot be emitted
    // before the subscriber is registered.
    List<AlarmFeedMessage> feed = [];
    TaskCompletionSource baselineSeen = new(TaskCreationOptions.RunContinuationsAsynchronously);
    using CancellationTokenSource feedCts = new();
    Task pump = Task.Run(async () =>
    {
        try
        {
            await foreach (AlarmFeedMessage item in monitor.StreamAsync(null, feedCts.Token))
            {
                lock (feed)
                {
                    feed.Add(item);

                    // The very first message is the baseline ProviderStatus.
                    if (feed.Count == 1)
                    {
                        baselineSeen.TrySetResult();
                    }
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Cancelling the stream at the end of the test lands here by design.
        }
    });

    await baselineSeen.Task.WaitAsync(WaitTimeout);

    // Worker event that flips the provider into subtag mode.
    OnAlarmProviderModeChangedEvent toSubtag = new()
    {
        Mode = AlarmProviderMode.Subtag,
        Reason = "alarmmgr failed",
        Hresult = unchecked((int)0x80004005),
        At = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
    };
    sessions.EmitEvent(new MxEvent { OnAlarmProviderModeChanged = toSubtag });

    AlarmFeedMessage degraded = await WaitForAsync(
        feed,
        candidate => candidate.PayloadCase == AlarmFeedMessage.PayloadOneofCase.ProviderStatus
            && candidate.ProviderStatus.Mode == AlarmProviderMode.Subtag,
        WaitTimeout);

    var status = degraded.ProviderStatus;
    Assert.Equal(AlarmProviderMode.Subtag, status.Mode);
    Assert.True(status.Degraded);
    Assert.Equal("alarmmgr failed", status.Reason);

    await WaitUntilAsync(() => Interlocked.Read(ref switches) >= 1, WaitTimeout);
    Assert.Equal(1, Interlocked.Read(ref switches));

    await feedCts.CancelAsync();
    await pump;
    await monitorCts.CancelAsync();
    await monitor.StopAsync(CancellationToken.None);
}
/// <summary>
/// Server-053: every <c>OnAlarmProviderModeChanged</c> event counts as a provider
/// switch, even when the reported mode equals the mode already in effect. The
/// worker decides when a mode change happened and the gateway does not filter
/// from==to transitions, so two identical subtag events must yield a
/// <c>provider_switches</c> total of two. Pinned here so the semantics cannot
/// drift unnoticed.
/// </summary>
[Fact]
public async Task ProviderModeChange_RepeatedSameMode_RecordsASwitchForEachEvent()
{
    const string SwitchCounterName = "mxgateway.alarms.provider_switches";

    using GatewayMetrics metrics = new();
    long switches = 0;

    // Matches only the provider-switch counter published by this test's own meter.
    bool IsSwitchCounter(Instrument candidate) =>
        ReferenceEquals(candidate.Meter, metrics.Meter) && candidate.Name == SwitchCounterName;

    // Builds a worker event that reports subtag mode with the supplied reason text.
    static MxEvent SubtagEvent(string reason) => new()
    {
        OnAlarmProviderModeChanged = new OnAlarmProviderModeChangedEvent
        {
            Mode = AlarmProviderMode.Subtag,
            Reason = reason,
            At = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
        },
    };

    using MeterListener meterTap = new();
    meterTap.InstrumentPublished = (published, tap) =>
    {
        if (IsSwitchCounter(published))
        {
            tap.EnableMeasurementEvents(published);
        }
    };
    meterTap.SetMeasurementEventCallback<long>((source, delta, _, _) =>
    {
        if (IsSwitchCounter(source))
        {
            Interlocked.Add(ref switches, delta);
        }
    });
    meterTap.Start();

    FakeSessionManager sessions = new();
    using GatewayAlarmMonitor monitor = CreateMonitor(sessions, metrics);
    using CancellationTokenSource monitorCts = new();
    await monitor.StartAsync(monitorCts.Token);
    await sessions.WaitForSubscribeAsync(WaitTimeout);

    // Keep a live subscriber attached and wait for its first message before
    // emitting any worker event.
    List<AlarmFeedMessage> feed = [];
    TaskCompletionSource baselineSeen = new(TaskCreationOptions.RunContinuationsAsynchronously);
    using CancellationTokenSource feedCts = new();
    Task pump = Task.Run(async () =>
    {
        try
        {
            await foreach (AlarmFeedMessage item in monitor.StreamAsync(null, feedCts.Token))
            {
                lock (feed)
                {
                    feed.Add(item);
                    if (feed.Count == 1)
                    {
                        baselineSeen.TrySetResult();
                    }
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Cancelling the stream at the end of the test lands here by design.
        }
    });

    await baselineSeen.Task.WaitAsync(WaitTimeout);

    // First subtag report.
    sessions.EmitEvent(SubtagEvent("alarmmgr failed"));
    await WaitUntilAsync(() => Interlocked.Read(ref switches) >= 1, WaitTimeout);

    // Second subtag report: the mode is unchanged, yet it is still a worker-reported switch.
    sessions.EmitEvent(SubtagEvent("still degraded"));
    await WaitUntilAsync(() => Interlocked.Read(ref switches) >= 2, WaitTimeout);

    Assert.Equal(2, Interlocked.Read(ref switches));

    await feedCts.CancelAsync();
    await pump;
    await monitorCts.CancelAsync();
    await monitor.StopAsync(CancellationToken.None);
}
/// <summary>
/// Tests-032: pins how the monitor derives the switch reason from the target mode
/// (<c>GatewayAlarmMonitor.ApplyProviderModeChangeAsync</c>). Moving from the
/// alarmmgr baseline to subtag must tag the
/// <c>mxgateway.alarms.provider_switches</c> measurement with
/// <c>reason=failover</c>; moving back to alarmmgr must tag it with
/// <c>reason=failback</c>. The count-only tests would keep passing if the two
/// arms were swapped or collapsed, so the <c>reason</c> tag itself is captured
/// and compared here.
/// </summary>
[Fact]
public async Task ProviderModeChange_FailoverThenFailback_RecordsCorrectReasonTags()
{
    const string SwitchCounterName = "mxgateway.alarms.provider_switches";

    using GatewayMetrics metrics = new();
    List<string> reasonTags = [];

    // Matches only the provider-switch counter published by this test's own meter.
    bool IsSwitchCounter(Instrument candidate) =>
        ReferenceEquals(candidate.Meter, metrics.Meter) && candidate.Name == SwitchCounterName;

    // Thread-safe snapshot of how many reason tags have been captured so far.
    int CapturedCount()
    {
        lock (reasonTags)
        {
            return reasonTags.Count;
        }
    }

    // Builds a worker event reporting the given mode and reason text.
    static MxEvent ModeEvent(AlarmProviderMode mode, string reason) => new()
    {
        OnAlarmProviderModeChanged = new OnAlarmProviderModeChangedEvent
        {
            Mode = mode,
            Reason = reason,
            At = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
        },
    };

    using MeterListener meterTap = new();
    meterTap.InstrumentPublished = (published, tap) =>
    {
        if (IsSwitchCounter(published))
        {
            tap.EnableMeasurementEvents(published);
        }
    };
    meterTap.SetMeasurementEventCallback<long>((source, _, tags, _) =>
    {
        if (IsSwitchCounter(source))
        {
            foreach (KeyValuePair<string, object?> tag in tags)
            {
                if (tag is { Key: "reason", Value: string reason })
                {
                    lock (reasonTags)
                    {
                        reasonTags.Add(reason);
                    }
                }
            }
        }
    });
    meterTap.Start();

    FakeSessionManager sessions = new();
    using GatewayAlarmMonitor monitor = CreateMonitor(sessions, metrics);
    using CancellationTokenSource monitorCts = new();
    await monitor.StartAsync(monitorCts.Token);
    await sessions.WaitForSubscribeAsync(WaitTimeout);

    // Attach a live subscriber and hold both mode-change events back until it has
    // received its baseline ProviderStatus message.
    List<AlarmFeedMessage> feed = [];
    TaskCompletionSource baselineSeen = new(TaskCreationOptions.RunContinuationsAsynchronously);
    using CancellationTokenSource feedCts = new();
    Task pump = Task.Run(async () =>
    {
        try
        {
            await foreach (AlarmFeedMessage item in monitor.StreamAsync(null, feedCts.Token))
            {
                lock (feed)
                {
                    feed.Add(item);
                    if (feed.Count == 1)
                    {
                        baselineSeen.TrySetResult();
                    }
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Cancelling the stream at the end of the test lands here by design.
        }
    });

    await baselineSeen.Task.WaitAsync(WaitTimeout);

    // alarmmgr (baseline) -> subtag: expected to be classified as a failover.
    sessions.EmitEvent(ModeEvent(AlarmProviderMode.Subtag, "alarmmgr failed"));
    await WaitUntilAsync(() => CapturedCount() >= 1, WaitTimeout);

    // subtag -> alarmmgr: expected to be classified as a failback.
    sessions.EmitEvent(ModeEvent(AlarmProviderMode.Alarmmgr, "alarmmgr recovered"));
    await WaitUntilAsync(() => CapturedCount() >= 2, WaitTimeout);

    lock (reasonTags)
    {
        Assert.Equal(new[] { "failover", "failback" }, reasonTags);
    }

    await feedCts.CancelAsync();
    await pump;
    await monitorCts.CancelAsync();
    await monitor.StopAsync(CancellationToken.None);
}
/// <summary>
/// A subscriber that attaches before any provider-mode event has been emitted must
/// receive a <c>ProviderStatus</c> message first, reporting the alarm-manager
/// baseline as not degraded.
/// </summary>
[Fact]
public async Task NewSubscriber_ReceivesProviderStatusAsFirstMessage()
{
    using GatewayMetrics metrics = new();
    FakeSessionManager sessions = new();
    using GatewayAlarmMonitor monitor = CreateMonitor(sessions, metrics);
    using CancellationTokenSource cts = new();
    await monitor.StartAsync(cts.Token);
    await sessions.WaitForSubscribeAsync(WaitTimeout);

    using CancellationTokenSource streamCts = new();

    // The reader hands the first message back through its task result instead of
    // writing to a shared local. That removes the unsynchronised cross-thread
    // write/poll and lets a stream fault surface as the real exception rather
    // than as a generic timeout.
    Task<AlarmFeedMessage?> reader = Task.Run<AlarmFeedMessage?>(async () =>
    {
        await foreach (AlarmFeedMessage message in monitor.StreamAsync(null, streamCts.Token))
        {
            return message;
        }

        // The stream completed without yielding anything.
        return null;
    });

    AlarmFeedMessage? first = await reader.WaitAsync(WaitTimeout);

    Assert.NotNull(first);
    Assert.Equal(AlarmFeedMessage.PayloadOneofCase.ProviderStatus, first!.PayloadCase);

    // Baseline before any provider-mode event: alarm-manager, not degraded.
    Assert.Equal(AlarmProviderMode.Alarmmgr, first.ProviderStatus.Mode);
    Assert.False(first.ProviderStatus.Degraded);

    await streamCts.CancelAsync();
    await cts.CancelAsync();
    await monitor.StopAsync(CancellationToken.None);
}
/// <summary>
/// With <c>Fallback.Mode = "ForceSubtag"</c> the first feed message must already
/// report subtag mode as degraded, the <c>mxgateway.alarms.provider_mode</c> gauge
/// must read 2, and the initial state must not be counted as a provider switch.
/// </summary>
[Fact]
public async Task ForceSubtagConfig_BaselinesProviderStatusToSubtagDegraded_WithoutSwitch()
{
    using GatewayMetrics metrics = new();
    long switchCount = 0;
    int gaugeValue = -1; // -1 = gauge not observed yet
    using MeterListener listener = new();
    listener.InstrumentPublished = (instrument, meterListener) =>
    {
        if (ReferenceEquals(instrument.Meter, metrics.Meter)
            && (instrument.Name == "mxgateway.alarms.provider_switches"
                || instrument.Name == "mxgateway.alarms.provider_mode"))
        {
            meterListener.EnableMeasurementEvents(instrument);
        }
    };
    listener.SetMeasurementEventCallback<long>(
        (instrument, measurement, _, _) =>
        {
            if (ReferenceEquals(instrument.Meter, metrics.Meter)
                && instrument.Name == "mxgateway.alarms.provider_switches")
            {
                Interlocked.Add(ref switchCount, measurement);
            }
        });
    listener.SetMeasurementEventCallback<int>(
        (instrument, measurement, _, _) =>
        {
            if (ReferenceEquals(instrument.Meter, metrics.Meter)
                && instrument.Name == "mxgateway.alarms.provider_mode")
            {
                Interlocked.Exchange(ref gaugeValue, measurement);
            }
        });
    listener.Start();

    FakeSessionManager sessions = new();
    using GatewayAlarmMonitor monitor = CreateMonitor(sessions, metrics, "ForceSubtag");
    using CancellationTokenSource cts = new();
    await monitor.StartAsync(cts.Token);
    await sessions.WaitForSubscribeAsync(WaitTimeout);

    using CancellationTokenSource streamCts = new();

    // The reader hands the first message back through its task result instead of
    // writing to a shared local, so a stream fault surfaces as the real exception
    // rather than as a generic timeout.
    Task<AlarmFeedMessage?> reader = Task.Run<AlarmFeedMessage?>(async () =>
    {
        await foreach (AlarmFeedMessage message in monitor.StreamAsync(null, streamCts.Token))
        {
            return message;
        }

        // The stream completed without yielding anything.
        return null;
    });

    AlarmFeedMessage? first = await reader.WaitAsync(WaitTimeout);

    Assert.NotNull(first);
    Assert.Equal(AlarmFeedMessage.PayloadOneofCase.ProviderStatus, first!.PayloadCase);
    Assert.Equal(AlarmProviderMode.Subtag, first.ProviderStatus.Mode);
    Assert.True(first.ProviderStatus.Degraded);

    // The observable gauge must read subtag (2) after start.
    listener.RecordObservableInstruments();
    Assert.Equal(2, Volatile.Read(ref gaugeValue));

    // The initial set must not record a provider switch.
    Assert.Equal(0, Interlocked.Read(ref switchCount));

    await streamCts.CancelAsync();
    await cts.CancelAsync();
    await monitor.StopAsync(CancellationToken.None);
}
/// <summary>
/// With <c>Fallback.Mode = "ForceAlarmManager"</c> the first feed message must
/// report alarm-manager mode as not degraded, the
/// <c>mxgateway.alarms.provider_mode</c> gauge must read 1, and the initial state
/// must not be counted as a provider switch.
/// </summary>
[Fact]
public async Task ForceAlarmManagerConfig_BaselinesProviderStatusToAlarmmgr_WithoutSwitch()
{
    using GatewayMetrics metrics = new();
    long switchCount = 0;
    int gaugeValue = -1; // -1 = gauge not observed yet
    using MeterListener listener = new();
    listener.InstrumentPublished = (instrument, meterListener) =>
    {
        if (ReferenceEquals(instrument.Meter, metrics.Meter)
            && (instrument.Name == "mxgateway.alarms.provider_switches"
                || instrument.Name == "mxgateway.alarms.provider_mode"))
        {
            meterListener.EnableMeasurementEvents(instrument);
        }
    };
    listener.SetMeasurementEventCallback<long>(
        (instrument, measurement, _, _) =>
        {
            if (ReferenceEquals(instrument.Meter, metrics.Meter)
                && instrument.Name == "mxgateway.alarms.provider_switches")
            {
                Interlocked.Add(ref switchCount, measurement);
            }
        });
    listener.SetMeasurementEventCallback<int>(
        (instrument, measurement, _, _) =>
        {
            if (ReferenceEquals(instrument.Meter, metrics.Meter)
                && instrument.Name == "mxgateway.alarms.provider_mode")
            {
                Interlocked.Exchange(ref gaugeValue, measurement);
            }
        });
    listener.Start();

    FakeSessionManager sessions = new();
    using GatewayAlarmMonitor monitor = CreateMonitor(sessions, metrics, "ForceAlarmManager");
    using CancellationTokenSource cts = new();
    await monitor.StartAsync(cts.Token);
    await sessions.WaitForSubscribeAsync(WaitTimeout);

    using CancellationTokenSource streamCts = new();

    // The reader hands the first message back through its task result instead of
    // writing to a shared local, so a stream fault surfaces as the real exception
    // rather than as a generic timeout.
    Task<AlarmFeedMessage?> reader = Task.Run<AlarmFeedMessage?>(async () =>
    {
        await foreach (AlarmFeedMessage message in monitor.StreamAsync(null, streamCts.Token))
        {
            return message;
        }

        // The stream completed without yielding anything.
        return null;
    });

    AlarmFeedMessage? first = await reader.WaitAsync(WaitTimeout);

    Assert.NotNull(first);
    Assert.Equal(AlarmFeedMessage.PayloadOneofCase.ProviderStatus, first!.PayloadCase);
    Assert.Equal(AlarmProviderMode.Alarmmgr, first.ProviderStatus.Mode);
    Assert.False(first.ProviderStatus.Degraded);

    // The observable gauge must read 1 for the forced alarm-manager baseline.
    listener.RecordObservableInstruments();
    Assert.Equal(1, Volatile.Read(ref gaugeValue));

    // The initial set must not record a provider switch.
    Assert.Equal(0, Interlocked.Read(ref switchCount));

    await streamCts.CancelAsync();
    await cts.CancelAsync();
    await monitor.StopAsync(CancellationToken.None);
}
/// <summary>
/// The <c>SubscribeAlarms</c> command issued at start-up must carry the forced
/// mode and the three failover tuning values taken from the configured
/// <see cref="AlarmFallbackOptions"/>, plus the watch-list returned by the resolver.
/// </summary>
[Fact]
public async Task SubscribeAlarms_SendsForcedModeAndWatchList_FromConfiguration()
{
    using GatewayMetrics metrics = new();
    FakeSessionManager sessions = new();

    // Resolver that yields exactly one subtag target.
    AlarmSubtagTarget tankHi = new()
    {
        AlarmFullReference = "Galaxy!Area.Tank01.Hi",
        ActiveSubtag = "Tank01.Hi.active",
    };
    StubWatchListResolver resolver = new([tankHi]);

    // Distinct values per knob so a mix-up between them would be detected.
    AlarmFallbackOptions fallback = new()
    {
        Mode = "ForceSubtag",
        ConsecutiveFailureThreshold = 7,
        FailbackProbeIntervalSeconds = 11,
        FailbackStableProbes = 4,
    };
    GatewayOptions gatewayOptions = new()
    {
        Alarms = new AlarmsOptions
        {
            Enabled = true,
            SubscriptionExpression = @"\\NODE\Galaxy!Area",
            Fallback = fallback,
        },
    };

    using GatewayAlarmMonitor monitor = new(
        sessions,
        resolver,
        metrics,
        Microsoft.Extensions.Options.Options.Create(gatewayOptions),
        NullLogger<GatewayAlarmMonitor>.Instance);
    using CancellationTokenSource monitorCts = new();
    await monitor.StartAsync(monitorCts.Token);
    await sessions.WaitForSubscribeAsync(WaitTimeout);

    SubscribeAlarmsCommand command = Assert.IsType<SubscribeAlarmsCommand>(sessions.LastSubscribeCommand);
    Assert.Equal(AlarmProviderMode.Subtag, command.ForcedMode);

    var failover = command.Failover;
    Assert.Equal(7, failover.ConsecutiveFailureThreshold);
    Assert.Equal(11, failover.FailbackProbeIntervalSeconds);
    Assert.Equal(4, failover.FailbackStableProbes);

    AlarmSubtagTarget onlyTarget = Assert.Single(command.WatchList);
    Assert.Equal("Galaxy!Area.Tank01.Hi", onlyTarget.AlarmFullReference);

    await monitorCts.CancelAsync();
    await monitor.StopAsync(CancellationToken.None);
}
/// <summary>
/// Pins the mapping from the configured <c>Fallback.Mode</c> string to the
/// <c>ForcedMode</c> sent in the <c>SubscribeAlarms</c> command: the two force
/// values are accepted in either casing shown below, and every other input
/// (including empty and unrecognised text) maps to <c>Unspecified</c>.
/// </summary>
/// <param name="mode">The configured mode string.</param>
/// <param name="expected">The forced provider mode the command must carry.</param>
[Theory]
[InlineData("ForceAlarmManager", AlarmProviderMode.Alarmmgr)]
[InlineData("forcealarmmanager", AlarmProviderMode.Alarmmgr)]
[InlineData("ForceSubtag", AlarmProviderMode.Subtag)]
[InlineData("forcesubtag", AlarmProviderMode.Subtag)]
[InlineData("Auto", AlarmProviderMode.Unspecified)]
[InlineData("", AlarmProviderMode.Unspecified)]
[InlineData("nonsense", AlarmProviderMode.Unspecified)]
public async Task ModeString_MapsToForcedProviderMode(string mode, AlarmProviderMode expected)
{
    using GatewayMetrics metrics = new();
    FakeSessionManager sessions = new();
    AlarmsOptions options = new()
    {
        Enabled = true,
        SubscriptionExpression = @"\\NODE\Galaxy!Area",
        Fallback = new AlarmFallbackOptions { Mode = mode },
    };
    using GatewayAlarmMonitor monitor = new(
        sessions,
        new StubWatchListResolver([]),
        metrics,
        Microsoft.Extensions.Options.Options.Create(new GatewayOptions { Alarms = options }),
        NullLogger<GatewayAlarmMonitor>.Instance);
    using CancellationTokenSource cts = new();
    await monitor.StartAsync(cts.Token);
    await sessions.WaitForSubscribeAsync(WaitTimeout);

    // Assert the command was captured instead of null-forgiving it, so a missing
    // command fails as an assertion rather than as a NullReferenceException.
    SubscribeAlarmsCommand sent = Assert.IsType<SubscribeAlarmsCommand>(sessions.LastSubscribeCommand);
    Assert.Equal(expected, sent.ForcedMode);

    // Auto + empty watch-list preserves historical alarmmgr-only behaviour.
    if (expected == AlarmProviderMode.Unspecified)
    {
        Assert.Empty(sent.WatchList);
    }

    await cts.CancelAsync();
    await monitor.StopAsync(CancellationToken.None);
}
/// <summary>
/// Builds a <see cref="GatewayAlarmMonitor"/> with alarms enabled, a fixed
/// subscription expression, an empty watch-list and no explicit fallback settings.
/// </summary>
/// <param name="sessions">The fake session manager the monitor talks to.</param>
/// <param name="metrics">The metrics instance the monitor records into.</param>
/// <returns>A monitor that has been constructed but not started.</returns>
private static GatewayAlarmMonitor CreateMonitor(FakeSessionManager sessions, GatewayMetrics metrics)
{
    GatewayOptions gatewayOptions = new()
    {
        Alarms = new AlarmsOptions
        {
            Enabled = true,
            SubscriptionExpression = @"\\NODE\Galaxy!Area",
        },
    };
    StubWatchListResolver emptyWatchList = new([]);
    IOptions<GatewayOptions> wrapped = Microsoft.Extensions.Options.Options.Create(gatewayOptions);

    return new GatewayAlarmMonitor(
        sessions,
        emptyWatchList,
        metrics,
        wrapped,
        NullLogger<GatewayAlarmMonitor>.Instance);
}
/// <summary>
/// Builds a <see cref="GatewayAlarmMonitor"/> with alarms enabled, a fixed
/// subscription expression, an empty watch-list and the given fallback mode string.
/// </summary>
/// <param name="sessions">The fake session manager the monitor talks to.</param>
/// <param name="metrics">The metrics instance the monitor records into.</param>
/// <param name="mode">Value assigned to <c>AlarmFallbackOptions.Mode</c>.</param>
/// <returns>A monitor that has been constructed but not started.</returns>
private static GatewayAlarmMonitor CreateMonitor(FakeSessionManager sessions, GatewayMetrics metrics, string mode)
{
    GatewayOptions gatewayOptions = new()
    {
        Alarms = new AlarmsOptions
        {
            Enabled = true,
            SubscriptionExpression = @"\\NODE\Galaxy!Area",
            Fallback = new AlarmFallbackOptions { Mode = mode },
        },
    };
    StubWatchListResolver emptyWatchList = new([]);
    IOptions<GatewayOptions> wrapped = Microsoft.Extensions.Options.Options.Create(gatewayOptions);

    return new GatewayAlarmMonitor(
        sessions,
        emptyWatchList,
        metrics,
        wrapped,
        NullLogger<GatewayAlarmMonitor>.Instance);
}
/// <summary>
/// Polls <paramref name="received"/> every 25 ms until it contains a message that
/// satisfies <paramref name="predicate"/>, and returns the first such message.
/// </summary>
/// <param name="received">
/// The list the feed reader appends to. It is inspected under <c>lock (received)</c>,
/// so writers must take the same lock.
/// </param>
/// <param name="predicate">Selects the message being waited for.</param>
/// <param name="timeout">How long to keep polling before giving up.</param>
/// <returns>The first message in list order that matches the predicate.</returns>
/// <exception cref="TimeoutException">No matching message appeared in time.</exception>
private static async Task<AlarmFeedMessage> WaitForAsync(
    List<AlarmFeedMessage> received,
    Func<AlarmFeedMessage, bool> predicate,
    TimeSpan timeout)
{
    // Monotonic clock: a wall-clock adjustment during the test cannot shorten or
    // stretch the wait.
    long deadline = Environment.TickCount64 + (long)timeout.TotalMilliseconds;

    while (true)
    {
        // Check before looking at the deadline, so the list is inspected at least
        // once and once more after the final delay.
        lock (received)
        {
            AlarmFeedMessage? match = received.FirstOrDefault(predicate);
            if (match is not null)
            {
                return match;
            }
        }

        if (Environment.TickCount64 >= deadline)
        {
            break;
        }

        await Task.Delay(25);
    }

    throw new TimeoutException("No matching AlarmFeedMessage was received in time.");
}
/// <summary>
/// Polls <paramref name="condition"/> every 25 ms until it returns <c>true</c>.
/// </summary>
/// <param name="condition">The condition to wait for; it is invoked repeatedly.</param>
/// <param name="timeout">How long to keep polling before giving up.</param>
/// <exception cref="TimeoutException">The condition did not become true in time.</exception>
private static async Task WaitUntilAsync(Func<bool> condition, TimeSpan timeout)
{
    // Monotonic clock: a wall-clock adjustment during the test cannot shorten or
    // stretch the wait.
    long deadline = Environment.TickCount64 + (long)timeout.TotalMilliseconds;

    while (true)
    {
        // Evaluate before looking at the deadline, so the condition is checked at
        // least once (even for a zero timeout) and once more after the final delay.
        if (condition())
        {
            return;
        }

        if (Environment.TickCount64 >= deadline)
        {
            break;
        }

        await Task.Delay(25);
    }

    throw new TimeoutException("Condition was not met in time.");
}
/// <summary>
/// <see cref="IAlarmWatchListResolver"/> test double that ignores its inputs and
/// always hands back the watch-list it was constructed with.
/// </summary>
private sealed class StubWatchListResolver : IAlarmWatchListResolver
{
    private readonly IReadOnlyList<AlarmSubtagTarget> _fixedTargets;

    /// <summary>Creates a resolver that always returns <paramref name="targets"/>.</summary>
    /// <param name="targets">The watch-list to return from every resolve call.</param>
    public StubWatchListResolver(IReadOnlyList<AlarmSubtagTarget> targets)
    {
        _fixedTargets = targets;
    }

    /// <inheritdoc />
    public Task<IReadOnlyList<AlarmSubtagTarget>> ResolveAsync(
        AlarmsOptions options,
        CancellationToken cancellationToken = default)
    {
        return Task.FromResult(_fixedTargets);
    }
}
/// <summary>
/// Minimal <see cref="ISessionManager"/> for driving the monitor: opens a
/// constructed session, records the SubscribeAlarms command, replies OK to
/// every command, and exposes a channel for pushing worker events.
/// </summary>
private sealed class FakeSessionManager : ISessionManager
{
    // Unbounded, so a write can only fail once the channel has been completed.
    private readonly Channel<WorkerEvent> _events = Channel.CreateUnbounded<WorkerEvent>();

    // Completed the first time a SubscribeAlarms command is invoked.
    private readonly TaskCompletionSource _subscribed =
        new(TaskCreationOptions.RunContinuationsAsynchronously);

    /// <summary>The most recent SubscribeAlarms command the monitor sent.</summary>
    public SubscribeAlarmsCommand? LastSubscribeCommand { get; private set; }

    /// <summary>Pushes a worker event onto the monitor's event stream.</summary>
    /// <param name="mxEvent">The event to wrap in a <see cref="WorkerEvent"/> and enqueue.</param>
    /// <exception cref="InvalidOperationException">
    /// The event channel has already been completed (for example by
    /// <see cref="CloseSessionAsync"/>), so the event could not be delivered.
    /// </exception>
    public void EmitEvent(MxEvent mxEvent)
    {
        // Fail loudly instead of dropping the event: a silently lost event would
        // otherwise only show up later as an unexplained wait timeout in the test.
        if (!_events.Writer.TryWrite(new WorkerEvent { Event = mxEvent }))
        {
            throw new InvalidOperationException(
                "Cannot emit a worker event: the fake session's event channel has already been completed.");
        }
    }

    /// <summary>Completes once the monitor has issued its SubscribeAlarms command.</summary>
    /// <param name="timeout">Maximum time to wait for the command.</param>
    public Task WaitForSubscribeAsync(TimeSpan timeout) => _subscribed.Task.WaitAsync(timeout);

    /// <inheritdoc />
    public Task<GatewaySession> OpenSessionAsync(
        SessionOpenRequest request,
        string? clientIdentity,
        CancellationToken cancellationToken)
    {
        // Fixed placeholder values; only the session id is unique per call.
        GatewaySession session = new(
            Guid.NewGuid().ToString("N"),
            "Galaxy",
            "pipe-test",
            "nonce-test",
            clientIdentity,
            null,
            null,
            TimeSpan.FromSeconds(30),
            TimeSpan.FromSeconds(30),
            TimeSpan.FromSeconds(30),
            DateTimeOffset.UtcNow);
        return Task.FromResult(session);
    }

    /// <inheritdoc />
    public Task<WorkerCommandReply> InvokeAsync(
        string sessionId,
        WorkerCommand command,
        CancellationToken cancellationToken)
    {
        // Capture the subscribe command and release anyone waiting on it.
        if (command.Command?.Kind == MxCommandKind.SubscribeAlarms)
        {
            LastSubscribeCommand = command.Command.SubscribeAlarms;
            _subscribed.TrySetResult();
        }

        // Every command is answered with an OK protocol status.
        MxCommandReply reply = new()
        {
            ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
        };

        // Active-alarm queries additionally get an empty reply payload.
        if (command.Command?.Kind == MxCommandKind.QueryActiveAlarms)
        {
            reply.QueryActiveAlarms = new QueryActiveAlarmsReplyPayload();
        }

        return Task.FromResult(new WorkerCommandReply { Reply = reply });
    }

    /// <inheritdoc />
    public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
        string sessionId,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        // Relays whatever EmitEvent enqueued; ends when the channel is completed.
        await foreach (WorkerEvent workerEvent in _events.Reader.ReadAllAsync(cancellationToken))
        {
            yield return workerEvent;
        }
    }

    /// <inheritdoc />
    public bool TryGetSession(string sessionId, [MaybeNullWhen(false)] out GatewaySession session)
    {
        // The fake keeps no session registry; lookups always miss.
        session = null;
        return false;
    }

    /// <inheritdoc />
    public Task<SessionCloseResult> CloseSessionAsync(string sessionId, CancellationToken cancellationToken)
    {
        // Completing the channel ends the ReadEventsAsync stream.
        _events.Writer.TryComplete();
        return Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false));
    }

    /// <inheritdoc />
    public Task<SessionCloseResult> KillWorkerAsync(string sessionId, string reason, CancellationToken cancellationToken) =>
        Task.FromResult(new SessionCloseResult(sessionId, SessionState.Closed, AlreadyClosed: false));

    /// <inheritdoc />
    public Task<int> CloseExpiredLeasesAsync(DateTimeOffset now, CancellationToken cancellationToken) =>
        Task.FromResult(0);

    /// <inheritdoc />
    public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
}