Add gateway central alarm monitor and StreamAlarms feed
The gateway now monitors alarms continuously, independent of any client session, and fans the feed out to every client. GatewayAlarmMonitor is an always-on hosted service that owns one gateway-managed worker session dedicated to alarms: it subscribes the configured provider, caches the active-alarm set from the worker's transition events (reconciled periodically against the worker's authoritative snapshot), re-opens the session if the worker faults, and broadcasts to all subscribers. The new session-less StreamAlarms RPC opens with the current active-alarm snapshot, then streams live transitions; any number of clients fan out from the single monitor without opening a worker session. AcknowledgeAlarm is now session-less and routes through the monitor. The session-scoped QueryActiveAlarms RPC and the per-session alarm auto-subscribe hook are removed, along with the now-dead IAlarmRpcDispatcher trio; the dashboard Alarms tab reads the monitor's in-process cache directly. This intentionally reverses the v1 "no multi-subscriber fan-out" decision for the alarm subsystem. Contracts regenerated; gateway, dashboard and tests build clean, 94 alarm-affected tests pass, and the monitor is verified live. Language-client stubs are regenerated in a follow-up change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
File diff suppressed because it is too large
Load Diff
@@ -69,9 +69,9 @@ namespace MxGateway.Contracts.Proto {
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
static readonly grpc::Marshaller<global::MxGateway.Contracts.Proto.AcknowledgeAlarmReply> __Marshaller_mxaccess_gateway_v1_AcknowledgeAlarmReply = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::MxGateway.Contracts.Proto.AcknowledgeAlarmReply.Parser));
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
static readonly grpc::Marshaller<global::MxGateway.Contracts.Proto.QueryActiveAlarmsRequest> __Marshaller_mxaccess_gateway_v1_QueryActiveAlarmsRequest = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::MxGateway.Contracts.Proto.QueryActiveAlarmsRequest.Parser));
|
||||
static readonly grpc::Marshaller<global::MxGateway.Contracts.Proto.StreamAlarmsRequest> __Marshaller_mxaccess_gateway_v1_StreamAlarmsRequest = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::MxGateway.Contracts.Proto.StreamAlarmsRequest.Parser));
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
static readonly grpc::Marshaller<global::MxGateway.Contracts.Proto.ActiveAlarmSnapshot> __Marshaller_mxaccess_gateway_v1_ActiveAlarmSnapshot = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::MxGateway.Contracts.Proto.ActiveAlarmSnapshot.Parser));
|
||||
static readonly grpc::Marshaller<global::MxGateway.Contracts.Proto.AlarmFeedMessage> __Marshaller_mxaccess_gateway_v1_AlarmFeedMessage = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::MxGateway.Contracts.Proto.AlarmFeedMessage.Parser));
|
||||
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
static readonly grpc::Method<global::MxGateway.Contracts.Proto.OpenSessionRequest, global::MxGateway.Contracts.Proto.OpenSessionReply> __Method_OpenSession = new grpc::Method<global::MxGateway.Contracts.Proto.OpenSessionRequest, global::MxGateway.Contracts.Proto.OpenSessionReply>(
|
||||
@@ -114,12 +114,12 @@ namespace MxGateway.Contracts.Proto {
|
||||
__Marshaller_mxaccess_gateway_v1_AcknowledgeAlarmReply);
|
||||
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
static readonly grpc::Method<global::MxGateway.Contracts.Proto.QueryActiveAlarmsRequest, global::MxGateway.Contracts.Proto.ActiveAlarmSnapshot> __Method_QueryActiveAlarms = new grpc::Method<global::MxGateway.Contracts.Proto.QueryActiveAlarmsRequest, global::MxGateway.Contracts.Proto.ActiveAlarmSnapshot>(
|
||||
static readonly grpc::Method<global::MxGateway.Contracts.Proto.StreamAlarmsRequest, global::MxGateway.Contracts.Proto.AlarmFeedMessage> __Method_StreamAlarms = new grpc::Method<global::MxGateway.Contracts.Proto.StreamAlarmsRequest, global::MxGateway.Contracts.Proto.AlarmFeedMessage>(
|
||||
grpc::MethodType.ServerStreaming,
|
||||
__ServiceName,
|
||||
"QueryActiveAlarms",
|
||||
__Marshaller_mxaccess_gateway_v1_QueryActiveAlarmsRequest,
|
||||
__Marshaller_mxaccess_gateway_v1_ActiveAlarmSnapshot);
|
||||
"StreamAlarms",
|
||||
__Marshaller_mxaccess_gateway_v1_StreamAlarmsRequest,
|
||||
__Marshaller_mxaccess_gateway_v1_AlarmFeedMessage);
|
||||
|
||||
/// <summary>Service descriptor</summary>
|
||||
public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor
|
||||
@@ -161,8 +161,19 @@ namespace MxGateway.Contracts.Proto {
|
||||
throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, ""));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Session-less central alarm feed. The stream opens with the current
|
||||
/// active-alarm snapshot (one `active_alarm` per alarm), then a single
|
||||
/// `snapshot_complete`, then a `transition` for every subsequent change.
|
||||
/// Served by the gateway's always-on alarm monitor; any number of clients
|
||||
/// fan out from the single monitor without opening a worker session.
|
||||
/// </summary>
|
||||
/// <param name="request">The request received from the client.</param>
|
||||
/// <param name="responseStream">Used for sending responses back to the client.</param>
|
||||
/// <param name="context">The context of the server-side call handler being invoked.</param>
|
||||
/// <returns>A task indicating completion of the handler.</returns>
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
public virtual global::System.Threading.Tasks.Task QueryActiveAlarms(global::MxGateway.Contracts.Proto.QueryActiveAlarmsRequest request, grpc::IServerStreamWriter<global::MxGateway.Contracts.Proto.ActiveAlarmSnapshot> responseStream, grpc::ServerCallContext context)
|
||||
public virtual global::System.Threading.Tasks.Task StreamAlarms(global::MxGateway.Contracts.Proto.StreamAlarmsRequest request, grpc::IServerStreamWriter<global::MxGateway.Contracts.Proto.AlarmFeedMessage> responseStream, grpc::ServerCallContext context)
|
||||
{
|
||||
throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, ""));
|
||||
}
|
||||
@@ -286,15 +297,37 @@ namespace MxGateway.Contracts.Proto {
|
||||
{
|
||||
return CallInvoker.AsyncUnaryCall(__Method_AcknowledgeAlarm, null, options, request);
|
||||
}
|
||||
/// <summary>
|
||||
/// Session-less central alarm feed. The stream opens with the current
|
||||
/// active-alarm snapshot (one `active_alarm` per alarm), then a single
|
||||
/// `snapshot_complete`, then a `transition` for every subsequent change.
|
||||
/// Served by the gateway's always-on alarm monitor; any number of clients
|
||||
/// fan out from the single monitor without opening a worker session.
|
||||
/// </summary>
|
||||
/// <param name="request">The request to send to the server.</param>
|
||||
/// <param name="headers">The initial metadata to send with the call. This parameter is optional.</param>
|
||||
/// <param name="deadline">An optional deadline for the call. The call will be cancelled if deadline is hit.</param>
|
||||
/// <param name="cancellationToken">An optional token for canceling the call.</param>
|
||||
/// <returns>The call object.</returns>
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
public virtual grpc::AsyncServerStreamingCall<global::MxGateway.Contracts.Proto.ActiveAlarmSnapshot> QueryActiveAlarms(global::MxGateway.Contracts.Proto.QueryActiveAlarmsRequest request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
|
||||
public virtual grpc::AsyncServerStreamingCall<global::MxGateway.Contracts.Proto.AlarmFeedMessage> StreamAlarms(global::MxGateway.Contracts.Proto.StreamAlarmsRequest request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
|
||||
{
|
||||
return QueryActiveAlarms(request, new grpc::CallOptions(headers, deadline, cancellationToken));
|
||||
return StreamAlarms(request, new grpc::CallOptions(headers, deadline, cancellationToken));
|
||||
}
|
||||
/// <summary>
|
||||
/// Session-less central alarm feed. The stream opens with the current
|
||||
/// active-alarm snapshot (one `active_alarm` per alarm), then a single
|
||||
/// `snapshot_complete`, then a `transition` for every subsequent change.
|
||||
/// Served by the gateway's always-on alarm monitor; any number of clients
|
||||
/// fan out from the single monitor without opening a worker session.
|
||||
/// </summary>
|
||||
/// <param name="request">The request to send to the server.</param>
|
||||
/// <param name="options">The options for the call.</param>
|
||||
/// <returns>The call object.</returns>
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
public virtual grpc::AsyncServerStreamingCall<global::MxGateway.Contracts.Proto.ActiveAlarmSnapshot> QueryActiveAlarms(global::MxGateway.Contracts.Proto.QueryActiveAlarmsRequest request, grpc::CallOptions options)
|
||||
public virtual grpc::AsyncServerStreamingCall<global::MxGateway.Contracts.Proto.AlarmFeedMessage> StreamAlarms(global::MxGateway.Contracts.Proto.StreamAlarmsRequest request, grpc::CallOptions options)
|
||||
{
|
||||
return CallInvoker.AsyncServerStreamingCall(__Method_QueryActiveAlarms, null, options, request);
|
||||
return CallInvoker.AsyncServerStreamingCall(__Method_StreamAlarms, null, options, request);
|
||||
}
|
||||
/// <summary>Creates a new instance of client from given <c>ClientBaseConfiguration</c>.</summary>
|
||||
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
|
||||
@@ -315,7 +348,7 @@ namespace MxGateway.Contracts.Proto {
|
||||
.AddMethod(__Method_Invoke, serviceImpl.Invoke)
|
||||
.AddMethod(__Method_StreamEvents, serviceImpl.StreamEvents)
|
||||
.AddMethod(__Method_AcknowledgeAlarm, serviceImpl.AcknowledgeAlarm)
|
||||
.AddMethod(__Method_QueryActiveAlarms, serviceImpl.QueryActiveAlarms).Build();
|
||||
.AddMethod(__Method_StreamAlarms, serviceImpl.StreamAlarms).Build();
|
||||
}
|
||||
|
||||
/// <summary>Register service method with a service binder with or without implementation. Useful when customizing the service binding logic.
|
||||
@@ -330,7 +363,7 @@ namespace MxGateway.Contracts.Proto {
|
||||
serviceBinder.AddMethod(__Method_Invoke, serviceImpl == null ? null : new grpc::UnaryServerMethod<global::MxGateway.Contracts.Proto.MxCommandRequest, global::MxGateway.Contracts.Proto.MxCommandReply>(serviceImpl.Invoke));
|
||||
serviceBinder.AddMethod(__Method_StreamEvents, serviceImpl == null ? null : new grpc::ServerStreamingServerMethod<global::MxGateway.Contracts.Proto.StreamEventsRequest, global::MxGateway.Contracts.Proto.MxEvent>(serviceImpl.StreamEvents));
|
||||
serviceBinder.AddMethod(__Method_AcknowledgeAlarm, serviceImpl == null ? null : new grpc::UnaryServerMethod<global::MxGateway.Contracts.Proto.AcknowledgeAlarmRequest, global::MxGateway.Contracts.Proto.AcknowledgeAlarmReply>(serviceImpl.AcknowledgeAlarm));
|
||||
serviceBinder.AddMethod(__Method_QueryActiveAlarms, serviceImpl == null ? null : new grpc::ServerStreamingServerMethod<global::MxGateway.Contracts.Proto.QueryActiveAlarmsRequest, global::MxGateway.Contracts.Proto.ActiveAlarmSnapshot>(serviceImpl.QueryActiveAlarms));
|
||||
serviceBinder.AddMethod(__Method_StreamAlarms, serviceImpl == null ? null : new grpc::ServerStreamingServerMethod<global::MxGateway.Contracts.Proto.StreamAlarmsRequest, global::MxGateway.Contracts.Proto.AlarmFeedMessage>(serviceImpl.StreamAlarms));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -11,8 +11,7 @@ import "google/protobuf/timestamp.proto";
|
||||
// additively only. Never renumber or repurpose an existing field number or
|
||||
// enum value. When a field or enum value is removed, add a `reserved` range
|
||||
// (and `reserved` name) covering it in the same change so a future editor
|
||||
// cannot accidentally reuse the retired tag. There are no `reserved`
|
||||
// declarations today because no field or enum value has ever been removed.
|
||||
// cannot accidentally reuse the retired tag.
|
||||
|
||||
// Public client API for MXAccess sessions hosted by the gateway.
|
||||
service MxAccessGateway {
|
||||
@@ -21,7 +20,12 @@ service MxAccessGateway {
|
||||
rpc Invoke(MxCommandRequest) returns (MxCommandReply);
|
||||
rpc StreamEvents(StreamEventsRequest) returns (stream MxEvent);
|
||||
rpc AcknowledgeAlarm(AcknowledgeAlarmRequest) returns (AcknowledgeAlarmReply);
|
||||
rpc QueryActiveAlarms(QueryActiveAlarmsRequest) returns (stream ActiveAlarmSnapshot);
|
||||
// Session-less central alarm feed. The stream opens with the current
|
||||
// active-alarm snapshot (one `active_alarm` per alarm), then a single
|
||||
// `snapshot_complete`, then a `transition` for every subsequent change.
|
||||
// Served by the gateway's always-on alarm monitor; any number of clients
|
||||
// fan out from the single monitor without opening a worker session.
|
||||
rpc StreamAlarms(StreamAlarmsRequest) returns (stream AlarmFeedMessage);
|
||||
}
|
||||
|
||||
message OpenSessionRequest {
|
||||
@@ -785,7 +789,10 @@ enum AlarmConditionState {
|
||||
}
|
||||
|
||||
message AcknowledgeAlarmRequest {
|
||||
string session_id = 1;
|
||||
// Retired: acknowledgement is session-less — it routes to the gateway's
|
||||
// central alarm monitor, not a client worker session.
|
||||
reserved 1;
|
||||
reserved "session_id";
|
||||
string client_correlation_id = 2;
|
||||
// Fully-qualified alarm reference matching OnAlarmTransitionEvent.alarm_full_reference.
|
||||
string alarm_full_reference = 3;
|
||||
@@ -797,7 +804,9 @@ message AcknowledgeAlarmRequest {
|
||||
}
|
||||
|
||||
message AcknowledgeAlarmReply {
|
||||
string session_id = 1;
|
||||
// Retired: see AcknowledgeAlarmRequest — acknowledgement is session-less.
|
||||
reserved 1;
|
||||
reserved "session_id";
|
||||
string correlation_id = 2;
|
||||
ProtocolStatus protocol_status = 3;
|
||||
// Native ack return code echoed from the worker. The worker carries the
|
||||
@@ -816,12 +825,27 @@ message AcknowledgeAlarmReply {
|
||||
string diagnostic_message = 6;
|
||||
}
|
||||
|
||||
message QueryActiveAlarmsRequest {
|
||||
string session_id = 1;
|
||||
string client_correlation_id = 2;
|
||||
// Optional alarm-reference prefix used to scope a partial ConditionRefresh
|
||||
// (e.g. equipment sub-tree). Empty means full refresh.
|
||||
string alarm_filter_prefix = 3;
|
||||
// Request to attach to the gateway's central alarm feed (StreamAlarms).
|
||||
message StreamAlarmsRequest {
|
||||
string client_correlation_id = 1;
|
||||
// Optional alarm-reference prefix scoping the feed to an equipment
|
||||
// sub-tree. Empty streams every active alarm.
|
||||
string alarm_filter_prefix = 2;
|
||||
}
|
||||
|
||||
// One message on the StreamAlarms feed. The stream opens with one
|
||||
// `active_alarm` per currently-active alarm, then a single
|
||||
// `snapshot_complete`, then a `transition` for every subsequent change.
|
||||
message AlarmFeedMessage {
|
||||
oneof payload {
|
||||
// Part of the initial active-alarm snapshot (ConditionRefresh).
|
||||
ActiveAlarmSnapshot active_alarm = 1;
|
||||
// Sentinel: the initial snapshot is fully delivered and `transition`
|
||||
// messages follow. Always true when present.
|
||||
bool snapshot_complete = 2;
|
||||
// A live alarm state change (raise / acknowledge / clear).
|
||||
OnAlarmTransitionEvent transition = 3;
|
||||
}
|
||||
}
|
||||
|
||||
message MxStatusProxy {
|
||||
|
||||
@@ -1084,7 +1084,11 @@ public sealed class WorkerLiveMxAccessSmokeTests(ITestOutputHelper output)
|
||||
mapper,
|
||||
eventStreamService,
|
||||
_metrics,
|
||||
_loggerFactory.CreateLogger<MxAccessGatewayService>());
|
||||
_loggerFactory.CreateLogger<MxAccessGatewayService>(),
|
||||
new MxGateway.Server.Alarms.GatewayAlarmMonitor(
|
||||
sessionManager,
|
||||
options,
|
||||
_loggerFactory.CreateLogger<MxGateway.Server.Alarms.GatewayAlarmMonitor>()));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
namespace MxGateway.Server.Alarms;
|
||||
|
||||
/// <summary>Service-collection wiring for the gateway's central alarm monitor.</summary>
|
||||
public static class AlarmsServiceCollectionExtensions
|
||||
{
|
||||
/// <summary>
|
||||
/// Registers the always-on <see cref="GatewayAlarmMonitor"/> as both
|
||||
/// the <see cref="IGatewayAlarmService"/> singleton and a hosted
|
||||
/// service, so it starts with the gateway host and is shared by the
|
||||
/// gRPC alarm surface and the dashboard.
|
||||
/// </summary>
|
||||
/// <param name="services">Service collection to register services in.</param>
|
||||
/// <returns>The service collection for chaining.</returns>
|
||||
public static IServiceCollection AddGatewayAlarms(this IServiceCollection services)
|
||||
{
|
||||
services.AddSingleton<GatewayAlarmMonitor>();
|
||||
services.AddSingleton<IGatewayAlarmService>(provider => provider.GetRequiredService<GatewayAlarmMonitor>());
|
||||
services.AddHostedService(provider => provider.GetRequiredService<GatewayAlarmMonitor>());
|
||||
|
||||
return services;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,693 @@
|
||||
using System.Threading.Channels;
|
||||
using Microsoft.Extensions.Options;
|
||||
using MxGateway.Contracts.Proto;
|
||||
using MxGateway.Server.Configuration;
|
||||
using MxGateway.Server.Sessions;
|
||||
|
||||
namespace MxGateway.Server.Alarms;
|
||||
|
||||
/// <summary>
|
||||
/// The gateway's always-on alarm monitor and broker. It owns one
|
||||
/// gateway-managed worker session dedicated to alarms, keeps an in-process
|
||||
/// cache of the active-alarm set fed by that session's transition events
|
||||
/// (reconciled periodically against the worker's snapshot), and fans the
|
||||
/// feed out to any number of <see cref="StreamAsync"/> subscribers.
|
||||
/// The session is re-opened transparently if the worker faults.
|
||||
/// </summary>
|
||||
public sealed class GatewayAlarmMonitor : BackgroundService, IGatewayAlarmService
|
||||
{
|
||||
private const string MonitorClientName = "gateway-alarm-monitor";
|
||||
private const string BackendName = "Galaxy";
|
||||
private const int SubscriberQueueCapacity = 2048;
|
||||
private static readonly TimeSpan RestartBackoff = TimeSpan.FromSeconds(5);
|
||||
private static readonly TimeSpan StartupGrace = TimeSpan.FromSeconds(2);
|
||||
|
||||
private readonly ISessionManager _sessionManager;
|
||||
private readonly AlarmsOptions _options;
|
||||
private readonly ILogger<GatewayAlarmMonitor> _logger;
|
||||
|
||||
private readonly object _sync = new();
|
||||
private readonly Dictionary<string, ActiveAlarmSnapshot> _alarms = new(StringComparer.Ordinal);
|
||||
private readonly List<Subscriber> _subscribers = [];
|
||||
|
||||
private volatile GatewayAlarmMonitorState _state = GatewayAlarmMonitorState.Disabled;
|
||||
private volatile string? _lastError;
|
||||
private GatewaySession? _session;
|
||||
|
||||
/// <summary>Initializes the gateway alarm monitor.</summary>
|
||||
/// <param name="sessionManager">Gateway session manager.</param>
|
||||
/// <param name="options">Gateway options carrying the alarm configuration.</param>
|
||||
/// <param name="logger">Diagnostic logger.</param>
|
||||
public GatewayAlarmMonitor(
|
||||
ISessionManager sessionManager,
|
||||
IOptions<GatewayOptions> options,
|
||||
ILogger<GatewayAlarmMonitor> logger)
|
||||
{
|
||||
_sessionManager = sessionManager ?? throw new ArgumentNullException(nameof(sessionManager));
|
||||
_options = (options ?? throw new ArgumentNullException(nameof(options))).Value.Alarms;
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public GatewayAlarmMonitorState State => _state;
|
||||
|
||||
/// <inheritdoc />
|
||||
public string? LastError => _lastError;
|
||||
|
||||
/// <inheritdoc />
|
||||
public int? WorkerProcessId
|
||||
{
|
||||
get { lock (_sync) { return _session?.WorkerProcessId; } }
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public IReadOnlyList<ActiveAlarmSnapshot> CurrentAlarms
|
||||
{
|
||||
get
|
||||
{
|
||||
lock (_sync)
|
||||
{
|
||||
return _alarms.Values.Select(alarm => alarm.Clone()).ToArray();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
if (!_options.Enabled)
|
||||
{
|
||||
_state = GatewayAlarmMonitorState.Disabled;
|
||||
_logger.LogInformation("Gateway alarm monitor disabled (MxGateway:Alarms:Enabled is false).");
|
||||
return;
|
||||
}
|
||||
|
||||
string subscription = ResolveSubscription();
|
||||
if (string.IsNullOrWhiteSpace(subscription))
|
||||
{
|
||||
_state = GatewayAlarmMonitorState.Faulted;
|
||||
_lastError = "MxGateway:Alarms is enabled but no SubscriptionExpression / DefaultArea is configured.";
|
||||
_logger.LogError("{Diagnostic}", _lastError);
|
||||
return;
|
||||
}
|
||||
|
||||
// Brief grace so worker-process launching and startup orphan cleanup
|
||||
// settle before the monitor opens its own session.
|
||||
try
|
||||
{
|
||||
await Task.Delay(StartupGrace, stoppingToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
await RunMonitorAsync(subscription, stoppingToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
break;
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
_state = GatewayAlarmMonitorState.Faulted;
|
||||
_lastError = exception.Message;
|
||||
_logger.LogWarning(
|
||||
exception,
|
||||
"Gateway alarm monitor lifecycle faulted; restarting in {Backoff}.",
|
||||
RestartBackoff);
|
||||
try
|
||||
{
|
||||
await Task.Delay(RestartBackoff, stoppingToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_state = GatewayAlarmMonitorState.Disabled;
|
||||
}
|
||||
|
||||
// One monitoring lifecycle: open a session, subscribe alarms, reconcile,
|
||||
// then consume transition events until the session ends or is cancelled.
|
||||
private async Task RunMonitorAsync(string subscription, CancellationToken stoppingToken)
|
||||
{
|
||||
_state = GatewayAlarmMonitorState.Starting;
|
||||
GatewaySession session = await _sessionManager.OpenSessionAsync(
|
||||
new SessionOpenRequest(BackendName, MonitorClientName, Guid.NewGuid().ToString("N"), CommandTimeout: null),
|
||||
MonitorClientName,
|
||||
stoppingToken)
|
||||
.ConfigureAwait(false);
|
||||
lock (_sync) { _session = session; }
|
||||
|
||||
try
|
||||
{
|
||||
await SubscribeAlarmsAsync(session.SessionId, subscription, stoppingToken).ConfigureAwait(false);
|
||||
await ReconcileAsync(session.SessionId, stoppingToken).ConfigureAwait(false);
|
||||
|
||||
_state = GatewayAlarmMonitorState.Monitoring;
|
||||
_lastError = null;
|
||||
_logger.LogInformation(
|
||||
"Gateway alarm monitor active on {Subscription} (session {SessionId}, worker pid {WorkerPid}).",
|
||||
subscription,
|
||||
session.SessionId,
|
||||
session.WorkerProcessId);
|
||||
|
||||
using CancellationTokenSource linked = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
|
||||
Task reconcileLoop = ReconcileLoopAsync(session.SessionId, linked.Token);
|
||||
try
|
||||
{
|
||||
await foreach (WorkerEvent workerEvent in _sessionManager
|
||||
.ReadEventsAsync(session.SessionId, linked.Token)
|
||||
.ConfigureAwait(false))
|
||||
{
|
||||
MxEvent? mxEvent = workerEvent.Event;
|
||||
if (mxEvent is { BodyCase: MxEvent.BodyOneofCase.OnAlarmTransition }
|
||||
&& mxEvent.OnAlarmTransition is not null)
|
||||
{
|
||||
ApplyTransition(mxEvent.OnAlarmTransition);
|
||||
}
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
await linked.CancelAsync().ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
await reconcileLoop.ConfigureAwait(false);
|
||||
}
|
||||
catch
|
||||
{
|
||||
// Reconcile-loop teardown errors are not actionable here.
|
||||
}
|
||||
}
|
||||
|
||||
// The event stream ended without cancellation — the worker session
|
||||
// closed or faulted. Surface it so the supervisor loop restarts.
|
||||
throw new InvalidOperationException("Alarm monitor worker event stream ended.");
|
||||
}
|
||||
finally
|
||||
{
|
||||
lock (_sync) { _session = null; }
|
||||
ClearCache();
|
||||
try
|
||||
{
|
||||
await _sessionManager.CloseSessionAsync(session.SessionId, CancellationToken.None).ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
_logger.LogDebug(exception, "Closing alarm monitor session {SessionId} failed.", session.SessionId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task SubscribeAlarmsAsync(string sessionId, string subscription, CancellationToken cancellationToken)
|
||||
{
|
||||
WorkerCommandReply reply = await _sessionManager.InvokeAsync(
|
||||
sessionId,
|
||||
new WorkerCommand
|
||||
{
|
||||
Command = new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.SubscribeAlarms,
|
||||
SubscribeAlarms = new SubscribeAlarmsCommand { SubscriptionExpression = subscription },
|
||||
},
|
||||
},
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
ProtocolStatusCode? code = reply.Reply?.ProtocolStatus?.Code;
|
||||
if (code != ProtocolStatusCode.Ok)
|
||||
{
|
||||
string diagnostic = reply.Reply?.DiagnosticMessage
|
||||
?? reply.Reply?.ProtocolStatus?.Message
|
||||
?? $"status {code}";
|
||||
throw new InvalidOperationException($"Worker rejected SubscribeAlarms: {diagnostic}");
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ReconcileLoopAsync(string sessionId, CancellationToken cancellationToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
int seconds = Math.Max(5, _options.ReconcileIntervalSeconds);
|
||||
using PeriodicTimer timer = new(TimeSpan.FromSeconds(seconds));
|
||||
while (await timer.WaitForNextTickAsync(cancellationToken).ConfigureAwait(false))
|
||||
{
|
||||
try
|
||||
{
|
||||
await ReconcileAsync(sessionId, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
throw;
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
_logger.LogDebug(exception, "Alarm reconcile pass failed; keeping the current cache.");
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ReconcileAsync(string sessionId, CancellationToken cancellationToken)
|
||||
{
|
||||
WorkerCommandReply reply = await _sessionManager.InvokeAsync(
|
||||
sessionId,
|
||||
new WorkerCommand
|
||||
{
|
||||
Command = new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.QueryActiveAlarms,
|
||||
QueryActiveAlarmsCommand = new QueryActiveAlarmsCommand { AlarmFilterPrefix = string.Empty },
|
||||
},
|
||||
},
|
||||
cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
if (reply.Reply?.ProtocolStatus?.Code != ProtocolStatusCode.Ok)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
QueryActiveAlarmsReplyPayload? payload = reply.Reply.QueryActiveAlarms;
|
||||
if (payload is not null)
|
||||
{
|
||||
ApplyReconcile(payload.Snapshots);
|
||||
}
|
||||
}
|
||||
|
||||
// Applies a live transition to the cache and broadcasts it to subscribers.
|
||||
private void ApplyTransition(OnAlarmTransitionEvent transition)
|
||||
{
|
||||
string reference = transition.AlarmFullReference ?? string.Empty;
|
||||
if (reference.Length == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
lock (_sync)
|
||||
{
|
||||
if (transition.TransitionKind == AlarmTransitionKind.Clear)
|
||||
{
|
||||
_alarms.Remove(reference);
|
||||
}
|
||||
else
|
||||
{
|
||||
_alarms[reference] = SnapshotFromTransition(transition);
|
||||
}
|
||||
|
||||
Broadcast(new AlarmFeedMessage { Transition = transition }, reference);
|
||||
}
|
||||
}
|
||||
|
||||
// Replaces the cache with the worker's authoritative snapshot, broadcasting
|
||||
// a synthetic transition for any alarm the live stream missed.
|
||||
private void ApplyReconcile(IEnumerable<ActiveAlarmSnapshot> snapshots)
|
||||
{
|
||||
Dictionary<string, ActiveAlarmSnapshot> next = new(StringComparer.Ordinal);
|
||||
foreach (ActiveAlarmSnapshot snapshot in snapshots)
|
||||
{
|
||||
if (!string.IsNullOrEmpty(snapshot.AlarmFullReference))
|
||||
{
|
||||
next[snapshot.AlarmFullReference] = snapshot;
|
||||
}
|
||||
}
|
||||
|
||||
lock (_sync)
|
||||
{
|
||||
foreach (KeyValuePair<string, ActiveAlarmSnapshot> existing in _alarms)
|
||||
{
|
||||
if (!next.ContainsKey(existing.Key))
|
||||
{
|
||||
Broadcast(
|
||||
new AlarmFeedMessage { Transition = TransitionFromSnapshot(existing.Value, AlarmTransitionKind.Clear) },
|
||||
existing.Key);
|
||||
}
|
||||
}
|
||||
|
||||
foreach (KeyValuePair<string, ActiveAlarmSnapshot> incoming in next)
|
||||
{
|
||||
if (!_alarms.ContainsKey(incoming.Key))
|
||||
{
|
||||
Broadcast(
|
||||
new AlarmFeedMessage { Transition = TransitionFromSnapshot(incoming.Value, AlarmTransitionKind.Raise) },
|
||||
incoming.Key);
|
||||
}
|
||||
}
|
||||
|
||||
_alarms.Clear();
|
||||
foreach (KeyValuePair<string, ActiveAlarmSnapshot> incoming in next)
|
||||
{
|
||||
_alarms[incoming.Key] = incoming.Value;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Caller holds _sync. Pushes a feed message to every matching subscriber;
|
||||
// a subscriber that has fallen behind is completed with an error and dropped.
|
||||
private void Broadcast(AlarmFeedMessage message, string reference)
|
||||
{
|
||||
for (int index = _subscribers.Count - 1; index >= 0; index--)
|
||||
{
|
||||
Subscriber subscriber = _subscribers[index];
|
||||
if (!subscriber.Matches(reference))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!subscriber.Channel.Writer.TryWrite(message))
|
||||
{
|
||||
subscriber.Channel.Writer.TryComplete(new InvalidOperationException(
|
||||
"Alarm feed subscriber fell behind and was dropped; reconnect to re-snapshot."));
|
||||
_subscribers.RemoveAt(index);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void ClearCache()
|
||||
{
|
||||
lock (_sync)
|
||||
{
|
||||
_alarms.Clear();
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async IAsyncEnumerable<AlarmFeedMessage> StreamAsync(
|
||||
string? alarmFilterPrefix,
|
||||
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
|
||||
{
|
||||
string prefix = alarmFilterPrefix ?? string.Empty;
|
||||
Channel<AlarmFeedMessage> channel = Channel.CreateBounded<AlarmFeedMessage>(
|
||||
new BoundedChannelOptions(SubscriberQueueCapacity)
|
||||
{
|
||||
FullMode = BoundedChannelFullMode.Wait,
|
||||
SingleReader = true,
|
||||
SingleWriter = false,
|
||||
});
|
||||
Subscriber subscriber = new(channel, prefix);
|
||||
|
||||
ActiveAlarmSnapshot[] snapshot;
|
||||
lock (_sync)
|
||||
{
|
||||
// Register before snapshotting under the same lock so no transition
|
||||
// can slip between the snapshot and the live stream.
|
||||
_subscribers.Add(subscriber);
|
||||
snapshot = _alarms.Values
|
||||
.Where(alarm => prefix.Length == 0
|
||||
|| alarm.AlarmFullReference.StartsWith(prefix, StringComparison.Ordinal))
|
||||
.Select(alarm => alarm.Clone())
|
||||
.ToArray();
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
foreach (ActiveAlarmSnapshot alarm in snapshot)
|
||||
{
|
||||
yield return new AlarmFeedMessage { ActiveAlarm = alarm };
|
||||
}
|
||||
|
||||
yield return new AlarmFeedMessage { SnapshotComplete = true };
|
||||
|
||||
await foreach (AlarmFeedMessage message in channel.Reader
|
||||
.ReadAllAsync(cancellationToken)
|
||||
.ConfigureAwait(false))
|
||||
{
|
||||
yield return message;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
lock (_sync) { _subscribers.Remove(subscriber); }
|
||||
channel.Writer.TryComplete();
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task<AcknowledgeAlarmReply> AcknowledgeAsync(
|
||||
AcknowledgeAlarmRequest request,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(request);
|
||||
|
||||
string? sessionId;
|
||||
lock (_sync) { sessionId = _session?.SessionId; }
|
||||
if (sessionId is null || _state != GatewayAlarmMonitorState.Monitoring)
|
||||
{
|
||||
return new AcknowledgeAlarmReply
|
||||
{
|
||||
CorrelationId = request.ClientCorrelationId,
|
||||
ProtocolStatus = new ProtocolStatus
|
||||
{
|
||||
Code = ProtocolStatusCode.WorkerUnavailable,
|
||||
Message = "Gateway alarm monitor is not currently active.",
|
||||
},
|
||||
DiagnosticMessage = _lastError ?? "Alarm monitor is not running.",
|
||||
};
|
||||
}
|
||||
|
||||
MxCommand? command = BuildAcknowledgeCommand(request, out string? parseError);
|
||||
if (command is null)
|
||||
{
|
||||
return new AcknowledgeAlarmReply
|
||||
{
|
||||
CorrelationId = request.ClientCorrelationId,
|
||||
ProtocolStatus = new ProtocolStatus
|
||||
{
|
||||
Code = ProtocolStatusCode.InvalidRequest,
|
||||
Message = parseError ?? "Invalid acknowledge request.",
|
||||
},
|
||||
DiagnosticMessage = parseError ?? "Invalid acknowledge request.",
|
||||
};
|
||||
}
|
||||
|
||||
WorkerCommandReply workerReply = await _sessionManager
|
||||
.InvokeAsync(sessionId, new WorkerCommand { Command = command }, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
MxCommandReply mxReply = workerReply.Reply ?? new MxCommandReply
|
||||
{
|
||||
ProtocolStatus = new ProtocolStatus
|
||||
{
|
||||
Code = ProtocolStatusCode.ProtocolViolation,
|
||||
Message = "Worker reply did not include an MxCommandReply.",
|
||||
},
|
||||
};
|
||||
|
||||
AcknowledgeAlarmReply reply = new()
|
||||
{
|
||||
CorrelationId = request.ClientCorrelationId,
|
||||
ProtocolStatus = mxReply.ProtocolStatus ?? new ProtocolStatus { Code = ProtocolStatusCode.Ok },
|
||||
DiagnosticMessage = mxReply.DiagnosticMessage ?? string.Empty,
|
||||
};
|
||||
if (mxReply.HasHresult)
|
||||
{
|
||||
reply.Hresult = mxReply.Hresult;
|
||||
}
|
||||
|
||||
return reply;
|
||||
}
|
||||
|
||||
private string ResolveSubscription()
|
||||
{
|
||||
if (!string.IsNullOrWhiteSpace(_options.SubscriptionExpression))
|
||||
{
|
||||
return _options.SubscriptionExpression;
|
||||
}
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(_options.DefaultArea))
|
||||
{
|
||||
return $@"\\{Environment.MachineName}\Galaxy!{_options.DefaultArea}";
|
||||
}
|
||||
|
||||
return string.Empty;
|
||||
}
|
||||
|
||||
private static MxCommand? BuildAcknowledgeCommand(AcknowledgeAlarmRequest request, out string? parseError)
|
||||
{
|
||||
parseError = null;
|
||||
if (string.IsNullOrWhiteSpace(request.AlarmFullReference))
|
||||
{
|
||||
parseError = "alarm_full_reference is required.";
|
||||
return null;
|
||||
}
|
||||
|
||||
string comment = request.Comment ?? string.Empty;
|
||||
string operatorUser = request.OperatorUser ?? string.Empty;
|
||||
|
||||
if (Guid.TryParse(request.AlarmFullReference, out Guid guid))
|
||||
{
|
||||
return new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.AcknowledgeAlarm,
|
||||
AcknowledgeAlarmCommand = new AcknowledgeAlarmCommand
|
||||
{
|
||||
AlarmGuid = guid.ToString(),
|
||||
Comment = comment,
|
||||
OperatorUser = operatorUser,
|
||||
OperatorNode = string.Empty,
|
||||
OperatorDomain = string.Empty,
|
||||
OperatorFullName = string.Empty,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
if (TryParseAlarmReference(request.AlarmFullReference, out string provider, out string group, out string alarm))
|
||||
{
|
||||
return new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.AcknowledgeAlarmByName,
|
||||
AcknowledgeAlarmByNameCommand = new AcknowledgeAlarmByNameCommand
|
||||
{
|
||||
AlarmName = alarm,
|
||||
ProviderName = provider,
|
||||
GroupName = group,
|
||||
Comment = comment,
|
||||
OperatorUser = operatorUser,
|
||||
OperatorNode = string.Empty,
|
||||
OperatorDomain = string.Empty,
|
||||
OperatorFullName = string.Empty,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
parseError = "alarm_full_reference must be a canonical GUID or 'Provider!Group.Tag' format.";
|
||||
return null;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Parses an alarm reference of the form <c>Provider!Group.Tag</c>: the
|
||||
/// first <c>!</c> splits provider from <c>Group.Tag</c>; the first
|
||||
/// <c>.</c> after the <c>!</c> splits group from tag.
|
||||
/// </summary>
|
||||
/// <param name="reference">The full alarm reference.</param>
|
||||
/// <param name="providerName">The parsed provider.</param>
|
||||
/// <param name="groupName">The parsed group/area.</param>
|
||||
/// <param name="alarmName">The parsed tag/alarm name.</param>
|
||||
/// <returns>true on a well-formed reference; otherwise false.</returns>
|
||||
public static bool TryParseAlarmReference(
|
||||
string? reference,
|
||||
out string providerName,
|
||||
out string groupName,
|
||||
out string alarmName)
|
||||
{
|
||||
providerName = string.Empty;
|
||||
groupName = string.Empty;
|
||||
alarmName = string.Empty;
|
||||
if (string.IsNullOrWhiteSpace(reference))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
int bang = reference!.IndexOf('!', StringComparison.Ordinal);
|
||||
if (bang <= 0 || bang == reference.Length - 1)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
string left = reference[..bang];
|
||||
string right = reference[(bang + 1)..];
|
||||
int dot = right.IndexOf('.', StringComparison.Ordinal);
|
||||
if (dot <= 0 || dot == right.Length - 1)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
providerName = left;
|
||||
groupName = right[..dot];
|
||||
alarmName = right[(dot + 1)..];
|
||||
return true;
|
||||
}
|
||||
|
||||
private static ActiveAlarmSnapshot SnapshotFromTransition(OnAlarmTransitionEvent transition)
|
||||
{
|
||||
ActiveAlarmSnapshot snapshot = new()
|
||||
{
|
||||
AlarmFullReference = transition.AlarmFullReference,
|
||||
SourceObjectReference = transition.SourceObjectReference,
|
||||
AlarmTypeName = transition.AlarmTypeName,
|
||||
Severity = transition.Severity,
|
||||
CurrentState = transition.TransitionKind == AlarmTransitionKind.Acknowledge
|
||||
? AlarmConditionState.ActiveAcked
|
||||
: AlarmConditionState.Active,
|
||||
Category = transition.Category,
|
||||
Description = transition.Description,
|
||||
OperatorUser = transition.OperatorUser,
|
||||
OperatorComment = transition.OperatorComment,
|
||||
};
|
||||
if (transition.OriginalRaiseTimestamp is not null)
|
||||
{
|
||||
snapshot.OriginalRaiseTimestamp = transition.OriginalRaiseTimestamp;
|
||||
}
|
||||
if (transition.TransitionTimestamp is not null)
|
||||
{
|
||||
snapshot.LastTransitionTimestamp = transition.TransitionTimestamp;
|
||||
}
|
||||
if (transition.CurrentValue is not null)
|
||||
{
|
||||
snapshot.CurrentValue = transition.CurrentValue;
|
||||
}
|
||||
if (transition.LimitValue is not null)
|
||||
{
|
||||
snapshot.LimitValue = transition.LimitValue;
|
||||
}
|
||||
|
||||
return snapshot;
|
||||
}
|
||||
|
||||
private static OnAlarmTransitionEvent TransitionFromSnapshot(
|
||||
ActiveAlarmSnapshot snapshot,
|
||||
AlarmTransitionKind kind)
|
||||
{
|
||||
OnAlarmTransitionEvent transition = new()
|
||||
{
|
||||
AlarmFullReference = snapshot.AlarmFullReference,
|
||||
SourceObjectReference = snapshot.SourceObjectReference,
|
||||
AlarmTypeName = snapshot.AlarmTypeName,
|
||||
TransitionKind = kind,
|
||||
Severity = snapshot.Severity,
|
||||
Category = snapshot.Category,
|
||||
Description = snapshot.Description,
|
||||
OperatorUser = snapshot.OperatorUser,
|
||||
OperatorComment = snapshot.OperatorComment,
|
||||
};
|
||||
if (snapshot.OriginalRaiseTimestamp is not null)
|
||||
{
|
||||
transition.OriginalRaiseTimestamp = snapshot.OriginalRaiseTimestamp;
|
||||
}
|
||||
if (snapshot.LastTransitionTimestamp is not null)
|
||||
{
|
||||
transition.TransitionTimestamp = snapshot.LastTransitionTimestamp;
|
||||
}
|
||||
if (snapshot.CurrentValue is not null)
|
||||
{
|
||||
transition.CurrentValue = snapshot.CurrentValue;
|
||||
}
|
||||
if (snapshot.LimitValue is not null)
|
||||
{
|
||||
transition.LimitValue = snapshot.LimitValue;
|
||||
}
|
||||
|
||||
return transition;
|
||||
}
|
||||
|
||||
private sealed class Subscriber(Channel<AlarmFeedMessage> channel, string prefix)
|
||||
{
|
||||
public Channel<AlarmFeedMessage> Channel { get; } = channel;
|
||||
|
||||
public bool Matches(string reference)
|
||||
{
|
||||
return prefix.Length == 0 || reference.StartsWith(prefix, StringComparison.Ordinal);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,63 @@
|
||||
using MxGateway.Contracts.Proto;
|
||||
|
||||
namespace MxGateway.Server.Alarms;
|
||||
|
||||
/// <summary>Lifecycle state of the gateway's central alarm monitor.</summary>
|
||||
public enum GatewayAlarmMonitorState
|
||||
{
|
||||
/// <summary>Alarm monitoring is switched off (<c>MxGateway:Alarms:Enabled</c> is false).</summary>
|
||||
Disabled,
|
||||
|
||||
/// <summary>The monitor is opening or re-opening its worker session.</summary>
|
||||
Starting,
|
||||
|
||||
/// <summary>The monitor is connected and tracking the active-alarm set.</summary>
|
||||
Monitoring,
|
||||
|
||||
/// <summary>The monitor's last lifecycle attempt failed; a restart is pending.</summary>
|
||||
Faulted,
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// The gateway's always-on alarm broker. A single gateway-owned worker
|
||||
/// session monitors the AVEVA alarm provider; this service caches the
|
||||
/// current active-alarm set and fans it out to any number of clients —
|
||||
/// no client needs to open its own worker session to see alarms.
|
||||
/// </summary>
|
||||
public interface IGatewayAlarmService
|
||||
{
|
||||
/// <summary>Current monitor lifecycle state.</summary>
|
||||
GatewayAlarmMonitorState State { get; }
|
||||
|
||||
/// <summary>Diagnostic message from the most recent fault, or null.</summary>
|
||||
string? LastError { get; }
|
||||
|
||||
/// <summary>Process id of the worker backing the monitor, when one is attached.</summary>
|
||||
int? WorkerProcessId { get; }
|
||||
|
||||
/// <summary>A point-in-time copy of the current active-alarm set.</summary>
|
||||
IReadOnlyList<ActiveAlarmSnapshot> CurrentAlarms { get; }
|
||||
|
||||
/// <summary>
|
||||
/// Attaches to the central alarm feed. The returned stream yields one
|
||||
/// <see cref="AlarmFeedMessage"/> per currently-active alarm, then a
|
||||
/// single <c>snapshot_complete</c> sentinel, then a <c>transition</c>
|
||||
/// for every subsequent change.
|
||||
/// </summary>
|
||||
/// <param name="alarmFilterPrefix">Optional alarm-reference prefix scoping the feed.</param>
|
||||
/// <param name="cancellationToken">Token that ends the subscription.</param>
|
||||
IAsyncEnumerable<AlarmFeedMessage> StreamAsync(
|
||||
string? alarmFilterPrefix,
|
||||
CancellationToken cancellationToken);
|
||||
|
||||
/// <summary>
|
||||
/// Acknowledges an alarm through the monitor's worker session. Never
|
||||
/// throws — transport and monitor-state failures surface in the
|
||||
/// reply's <see cref="AcknowledgeAlarmReply.ProtocolStatus"/>.
|
||||
/// </summary>
|
||||
/// <param name="request">The acknowledge request.</param>
|
||||
/// <param name="cancellationToken">Token to cancel the call.</param>
|
||||
Task<AcknowledgeAlarmReply> AcknowledgeAsync(
|
||||
AcknowledgeAlarmRequest request,
|
||||
CancellationToken cancellationToken);
|
||||
}
|
||||
@@ -1,32 +1,32 @@
|
||||
namespace MxGateway.Server.Configuration;
|
||||
|
||||
/// <summary>
|
||||
/// Per-gateway alarm-subsystem configuration. Drives the auto-subscribe
|
||||
/// hook in <see cref="Sessions.SessionManager"/>: when
|
||||
/// <see cref="Enabled"/> is true and a session reaches Ready, the
|
||||
/// manager issues a <c>SubscribeAlarmsCommand</c> to the worker with
|
||||
/// the configured <see cref="SubscriptionExpression"/>.
|
||||
/// Configuration for the gateway's always-on central alarm monitor
|
||||
/// (<see cref="Alarms.GatewayAlarmMonitor"/>). When <see cref="Enabled"/>
|
||||
/// is true the gateway opens one gateway-owned worker session dedicated to
|
||||
/// alarms, caches the active-alarm set, and fans it out to every client
|
||||
/// through the <c>StreamAlarms</c> RPC — no client opens its own session
|
||||
/// to see alarms.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Defaults preserve current behaviour (alarms disabled). Operators
|
||||
/// opt in by setting <c>MxGateway:Alarms:Enabled = true</c> and
|
||||
/// supplying a canonical
|
||||
/// <c>\\<machine>\Galaxy!<area></c> subscription
|
||||
/// expression. The literal "Galaxy" provider is correct regardless of
|
||||
/// the configured Galaxy database name (the wnwrap consumer doesn't
|
||||
/// accept the database name as the provider).
|
||||
/// Defaults preserve current behaviour (alarm monitoring disabled).
|
||||
/// Operators opt in by setting <c>MxGateway:Alarms:Enabled = true</c> and
|
||||
/// supplying a canonical <c>\\<machine>\Galaxy!<area></c>
|
||||
/// subscription expression. The literal "Galaxy" provider is correct
|
||||
/// regardless of the configured Galaxy database name (the wnwrap consumer
|
||||
/// does not accept the database name as the provider).
|
||||
/// </remarks>
|
||||
public sealed class AlarmsOptions
|
||||
{
|
||||
/// <summary>Gate the auto-subscribe hook on session open. Default false.</summary>
|
||||
/// <summary>Gate the gateway's always-on central alarm monitor. Default false.</summary>
|
||||
public bool Enabled { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// AVEVA alarm-subscription expression. When empty and
|
||||
/// <see cref="Enabled"/> is true, the gateway falls back to
|
||||
/// <c>\\$(MachineName)\Galaxy!$(DefaultArea)</c> if
|
||||
/// <see cref="DefaultArea"/> is set; otherwise the session open
|
||||
/// fails with a configuration diagnostic.
|
||||
/// AVEVA alarm-subscription expression the monitor subscribes on
|
||||
/// startup. When empty and <see cref="Enabled"/> is true, the gateway
|
||||
/// falls back to <c>\\$(MachineName)\Galaxy!$(DefaultArea)</c> if
|
||||
/// <see cref="DefaultArea"/> is set; otherwise the monitor faults with
|
||||
/// a configuration diagnostic.
|
||||
/// </summary>
|
||||
public string SubscriptionExpression { get; init; } = string.Empty;
|
||||
|
||||
@@ -39,10 +39,10 @@ public sealed class AlarmsOptions
|
||||
public string DefaultArea { get; init; } = string.Empty;
|
||||
|
||||
/// <summary>
|
||||
/// If true, an auto-subscribe failure faults the session. If false
|
||||
/// (default), the failure is logged and the session remains Ready —
|
||||
/// alarm-side commands return "not subscribed" but data subscriptions
|
||||
/// work normally.
|
||||
/// How often the monitor reconciles its in-process alarm cache against
|
||||
/// the worker's authoritative active-alarm snapshot, catching any
|
||||
/// transitions the live poll-and-diff feed missed. Default 30 seconds;
|
||||
/// the monitor floors it at 5 seconds.
|
||||
/// </summary>
|
||||
public bool RequireSubscribeOnOpen { get; init; }
|
||||
public int ReconcileIntervalSeconds { get; init; } = 30;
|
||||
}
|
||||
|
||||
@@ -236,11 +236,10 @@ public sealed class GatewayOptionsValidator : IValidateOptions<GatewayOptions>
|
||||
return;
|
||||
}
|
||||
|
||||
// When the alarm auto-subscribe hook is enabled, the gateway needs either a
|
||||
// canonical SubscriptionExpression or a DefaultArea to compose one from. Both
|
||||
// empty is the configuration mistake SessionManager.TryAutoSubscribeAlarmsAsync
|
||||
// currently surfaces per-session — pulling it up to startup validation makes
|
||||
// the misconfiguration fail-fast at boot, in line with every other section.
|
||||
// When the central alarm monitor is enabled, it needs either a canonical
|
||||
// SubscriptionExpression or a DefaultArea to compose one from. Validating
|
||||
// it at startup makes the misconfiguration fail-fast at boot, in line
|
||||
// with every other section.
|
||||
if (string.IsNullOrWhiteSpace(options.SubscriptionExpression)
|
||||
&& string.IsNullOrWhiteSpace(options.DefaultArea))
|
||||
{
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
using MxGateway.Contracts.Proto;
|
||||
using MxGateway.Server.Alarms;
|
||||
using MxGateway.Server.Sessions;
|
||||
|
||||
namespace MxGateway.Server.Dashboard;
|
||||
@@ -17,7 +18,7 @@ public sealed class DashboardLiveDataService : IDashboardLiveDataService, IAsync
|
||||
private static readonly TimeSpan ReadTimeout = TimeSpan.FromSeconds(5);
|
||||
|
||||
private readonly ISessionManager _sessionManager;
|
||||
private readonly IAlarmRpcDispatcher _alarmDispatcher;
|
||||
private readonly IGatewayAlarmService _alarmService;
|
||||
private readonly ILogger<DashboardLiveDataService> _logger;
|
||||
private readonly SemaphoreSlim _gate = new(1, 1);
|
||||
private readonly HashSet<string> _subscribed = new(StringComparer.OrdinalIgnoreCase);
|
||||
@@ -28,15 +29,15 @@ public sealed class DashboardLiveDataService : IDashboardLiveDataService, IAsync
|
||||
|
||||
/// <summary>Initializes the live-data service.</summary>
|
||||
/// <param name="sessionManager">Gateway session manager.</param>
|
||||
/// <param name="alarmDispatcher">Active-alarm query dispatcher.</param>
|
||||
/// <param name="alarmService">Gateway central alarm service.</param>
|
||||
/// <param name="logger">Diagnostic logger.</param>
|
||||
public DashboardLiveDataService(
|
||||
ISessionManager sessionManager,
|
||||
IAlarmRpcDispatcher alarmDispatcher,
|
||||
IGatewayAlarmService alarmService,
|
||||
ILogger<DashboardLiveDataService> logger)
|
||||
{
|
||||
_sessionManager = sessionManager ?? throw new ArgumentNullException(nameof(sessionManager));
|
||||
_alarmDispatcher = alarmDispatcher ?? throw new ArgumentNullException(nameof(alarmDispatcher));
|
||||
_alarmService = alarmService ?? throw new ArgumentNullException(nameof(alarmService));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
|
||||
@@ -90,39 +91,20 @@ public sealed class DashboardLiveDataService : IDashboardLiveDataService, IAsync
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task<DashboardAlarmQueryResult> QueryAlarmsAsync(CancellationToken cancellationToken)
|
||||
public Task<DashboardAlarmQueryResult> QueryAlarmsAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
(GatewaySession session, _) = await EnsureReadyAsync(cancellationToken).ConfigureAwait(false);
|
||||
// Alarms come from the gateway's always-on central monitor; the
|
||||
// dashboard reads its in-process cache directly — no session needed.
|
||||
DashboardActiveAlarm[] alarms = _alarmService.CurrentAlarms
|
||||
.Select(DashboardActiveAlarm.FromSnapshot)
|
||||
.ToArray();
|
||||
|
||||
QueryActiveAlarmsRequest request = new()
|
||||
{
|
||||
SessionId = session.SessionId,
|
||||
ClientCorrelationId = Guid.NewGuid().ToString("N"),
|
||||
};
|
||||
string? error = _alarmService.State is GatewayAlarmMonitorState.Monitoring
|
||||
or GatewayAlarmMonitorState.Disabled
|
||||
? null
|
||||
: _alarmService.LastError ?? $"Alarm monitor is {_alarmService.State}.";
|
||||
|
||||
List<DashboardActiveAlarm> alarms = [];
|
||||
await foreach (ActiveAlarmSnapshot snapshot in _alarmDispatcher
|
||||
.QueryActiveAlarmsAsync(request, cancellationToken)
|
||||
.ConfigureAwait(false))
|
||||
{
|
||||
alarms.Add(DashboardActiveAlarm.FromSnapshot(snapshot));
|
||||
}
|
||||
|
||||
return new DashboardAlarmQueryResult(alarms, null, session.WorkerProcessId);
|
||||
}
|
||||
catch (Exception exception) when (exception is not OperationCanceledException)
|
||||
{
|
||||
InvalidateSession();
|
||||
_logger.LogWarning(exception, "Dashboard alarm query failed; the dashboard session will be re-opened.");
|
||||
return new DashboardAlarmQueryResult([], exception.Message, null);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_gate.Release();
|
||||
}
|
||||
return Task.FromResult(new DashboardAlarmQueryResult(alarms, error, _alarmService.WorkerProcessId));
|
||||
}
|
||||
|
||||
// Returns a Ready session + its Register server handle, opening a fresh
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
using Microsoft.AspNetCore.Hosting.StaticWebAssets;
|
||||
using MxGateway.Contracts;
|
||||
using MxGateway.Server.Alarms;
|
||||
using MxGateway.Server.Configuration;
|
||||
using MxGateway.Server.Dashboard;
|
||||
using MxGateway.Server.Diagnostics;
|
||||
@@ -64,6 +65,7 @@ public static class GatewayApplication
|
||||
builder.Services.AddSingleton<IEventStreamService, EventStreamService>();
|
||||
builder.Services.AddWorkerProcessLauncher();
|
||||
builder.Services.AddGatewaySessions();
|
||||
builder.Services.AddGatewayAlarms();
|
||||
builder.Services.AddGatewayDashboard();
|
||||
builder.Services.AddGalaxyRepository();
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ using Grpc.Core;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using MxGateway.Contracts;
|
||||
using MxGateway.Contracts.Proto;
|
||||
using MxGateway.Server.Alarms;
|
||||
using MxGateway.Server.Metrics;
|
||||
using MxGateway.Server.Security.Authentication;
|
||||
using MxGateway.Server.Security.Authorization;
|
||||
@@ -21,9 +22,8 @@ public sealed class MxAccessGatewayService(
|
||||
IEventStreamService eventStreamService,
|
||||
GatewayMetrics metrics,
|
||||
ILogger<MxAccessGatewayService> logger,
|
||||
IAlarmRpcDispatcher? alarmRpcDispatcher = null) : MxAccessGateway.MxAccessGatewayBase
|
||||
IGatewayAlarmService alarmService) : MxAccessGateway.MxAccessGatewayBase
|
||||
{
|
||||
private readonly IAlarmRpcDispatcher alarmRpcDispatcher = alarmRpcDispatcher ?? new NotWiredAlarmRpcDispatcher();
|
||||
/// <inheritdoc />
|
||||
public override async Task<OpenSessionReply> OpenSession(
|
||||
OpenSessionRequest request,
|
||||
@@ -163,14 +163,13 @@ public sealed class MxAccessGatewayService(
|
||||
|
||||
/// <inheritdoc />
|
||||
/// <remarks>
|
||||
/// Surfaces the public AcknowledgeAlarm RPC. The gateway validates the request,
|
||||
/// resolves the session, and delegates to the registered
|
||||
/// <see cref="IAlarmRpcDispatcher"/>. DI binds the production
|
||||
/// <see cref="MxGateway.Server.Sessions.WorkerAlarmRpcDispatcher"/>, which routes
|
||||
/// the ack through the worker pipe IPC: an <c>alarm_full_reference</c> that parses
|
||||
/// as a canonical GUID forwards to <c>AcknowledgeAlarmCommand</c>; a
|
||||
/// <c>Provider!Group.Tag</c> reference forwards to <c>AcknowledgeAlarmByNameCommand</c>;
|
||||
/// anything else returns an <c>InvalidRequest</c> diagnostic.
|
||||
/// Surfaces the public AcknowledgeAlarm RPC. Acknowledgement is
|
||||
/// session-less: the gateway routes it through the always-on
|
||||
/// <see cref="IGatewayAlarmService"/> monitor session. An
|
||||
/// <c>alarm_full_reference</c> that parses as a canonical GUID forwards
|
||||
/// to <c>AcknowledgeAlarmCommand</c>; a <c>Provider!Group.Tag</c>
|
||||
/// reference forwards to <c>AcknowledgeAlarmByNameCommand</c>; anything
|
||||
/// else returns an <c>InvalidRequest</c> diagnostic in the reply.
|
||||
/// </remarks>
|
||||
public override async Task<AcknowledgeAlarmReply> AcknowledgeAlarm(
|
||||
AcknowledgeAlarmRequest request,
|
||||
@@ -179,25 +178,12 @@ public sealed class MxAccessGatewayService(
|
||||
try
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(request);
|
||||
if (string.IsNullOrEmpty(request.SessionId))
|
||||
{
|
||||
throw new RpcException(new Status(StatusCode.InvalidArgument, "session_id is required."));
|
||||
}
|
||||
if (string.IsNullOrEmpty(request.AlarmFullReference))
|
||||
{
|
||||
throw new RpcException(new Status(StatusCode.InvalidArgument, "alarm_full_reference is required."));
|
||||
}
|
||||
|
||||
// Validate the session exists. Throws SessionManagerException → mapped to
|
||||
// gRPC NotFound by the caller's MapException.
|
||||
_ = ResolveSession(request.SessionId);
|
||||
|
||||
// Delegate to the registered alarm dispatcher. DI binds the production
|
||||
// WorkerAlarmRpcDispatcher, which routes the ack over the worker IPC by
|
||||
// GUID (AcknowledgeAlarmCommand) or by Provider!Group.Tag reference
|
||||
// (AcknowledgeAlarmByNameCommand). NotWiredAlarmRpcDispatcher is only the
|
||||
// null fallback used when no dispatcher is registered.
|
||||
return await alarmRpcDispatcher.AcknowledgeAsync(request, context.CancellationToken)
|
||||
return await alarmService.AcknowledgeAsync(request, context.CancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception exception) when (exception is not RpcException)
|
||||
@@ -208,38 +194,27 @@ public sealed class MxAccessGatewayService(
|
||||
|
||||
/// <inheritdoc />
|
||||
/// <remarks>
|
||||
/// Surfaces the public QueryActiveAlarms RPC. The gateway validates the request,
|
||||
/// resolves the session, and delegates to the registered
|
||||
/// <see cref="IAlarmRpcDispatcher"/>. DI binds the production
|
||||
/// <see cref="MxGateway.Server.Sessions.WorkerAlarmRpcDispatcher"/>, which issues a
|
||||
/// <c>QueryActiveAlarmsCommand</c> over the worker pipe IPC and streams each
|
||||
/// <c>ActiveAlarmSnapshot</c> from the worker reply.
|
||||
/// Surfaces the public StreamAlarms RPC — the session-less central
|
||||
/// alarm feed. The stream opens with one <c>active_alarm</c> per
|
||||
/// currently-active alarm, then a single <c>snapshot_complete</c>, then
|
||||
/// a <c>transition</c> for every subsequent change. Served by the
|
||||
/// gateway's always-on <see cref="IGatewayAlarmService"/> monitor; any
|
||||
/// number of clients fan out from the single monitor.
|
||||
/// </remarks>
|
||||
public override async Task QueryActiveAlarms(
|
||||
QueryActiveAlarmsRequest request,
|
||||
IServerStreamWriter<ActiveAlarmSnapshot> responseStream,
|
||||
public override async Task StreamAlarms(
|
||||
StreamAlarmsRequest request,
|
||||
IServerStreamWriter<AlarmFeedMessage> responseStream,
|
||||
ServerCallContext context)
|
||||
{
|
||||
try
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(request);
|
||||
if (string.IsNullOrEmpty(request.SessionId))
|
||||
{
|
||||
throw new RpcException(new Status(StatusCode.InvalidArgument, "session_id is required."));
|
||||
}
|
||||
_ = ResolveSession(request.SessionId);
|
||||
|
||||
// Delegate to the registered alarm dispatcher. DI binds the production
|
||||
// WorkerAlarmRpcDispatcher, which issues a QueryActiveAlarmsCommand over the
|
||||
// worker IPC and streams each ActiveAlarmSnapshot from the worker reply.
|
||||
// NotWiredAlarmRpcDispatcher is only the null fallback used when no
|
||||
// dispatcher is registered.
|
||||
await foreach (ActiveAlarmSnapshot snapshot in alarmRpcDispatcher
|
||||
.QueryActiveAlarmsAsync(request, context.CancellationToken)
|
||||
await foreach (AlarmFeedMessage message in alarmService
|
||||
.StreamAsync(request.AlarmFilterPrefix, context.CancellationToken)
|
||||
.WithCancellation(context.CancellationToken)
|
||||
.ConfigureAwait(false))
|
||||
{
|
||||
await responseStream.WriteAsync(snapshot).ConfigureAwait(false);
|
||||
await responseStream.WriteAsync(message).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
catch (Exception exception) when (exception is not RpcException)
|
||||
|
||||
@@ -19,7 +19,7 @@ public sealed class GatewayGrpcScopeResolver
|
||||
StreamEventsRequest => GatewayScopes.EventsRead,
|
||||
MxCommandRequest commandRequest => ResolveCommandScope(commandRequest.Command?.Kind ?? MxCommandKind.Unspecified),
|
||||
AcknowledgeAlarmRequest => GatewayScopes.InvokeWrite,
|
||||
QueryActiveAlarmsRequest => GatewayScopes.EventsRead,
|
||||
StreamAlarmsRequest => GatewayScopes.EventsRead,
|
||||
TestConnectionRequest or
|
||||
GetLastDeployTimeRequest or
|
||||
DiscoverHierarchyRequest or
|
||||
|
||||
@@ -1,41 +0,0 @@
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using MxGateway.Contracts.Proto;
|
||||
|
||||
namespace MxGateway.Server.Sessions;
|
||||
|
||||
/// <summary>
|
||||
/// Gateway-side dispatcher seam for the alarm-RPC surface. Bridges the
|
||||
/// public <c>AcknowledgeAlarm</c> + <c>QueryActiveAlarms</c> gRPC handlers
|
||||
/// to the worker process that hosts <c>IMxAccessAlarmConsumer</c>.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// DI binds the production <see cref="WorkerAlarmRpcDispatcher"/> by
|
||||
/// default; it routes calls through the existing worker-pipe IPC.
|
||||
/// <c>NotWiredAlarmRpcDispatcher</c> is only the null fallback used
|
||||
/// when no dispatcher is registered (DI omission / standalone tests).
|
||||
/// Other tests inject a fake to exercise the gateway handler shape
|
||||
/// without spinning up a worker process.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// The dispatcher is session-scoped: every call resolves the
|
||||
/// session and forwards to that session's worker. The handler
|
||||
/// constructs the <see cref="AcknowledgeAlarmReply"/> /
|
||||
/// <see cref="ActiveAlarmSnapshot"/> stream from the dispatcher's
|
||||
/// output without further translation.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public interface IAlarmRpcDispatcher
|
||||
{
|
||||
/// <summary>Forward an Acknowledge to the worker that owns the session.</summary>
|
||||
Task<AcknowledgeAlarmReply> AcknowledgeAsync(
|
||||
AcknowledgeAlarmRequest request,
|
||||
CancellationToken cancellationToken);
|
||||
|
||||
/// <summary>Walk active alarms on the worker that owns the session.</summary>
|
||||
IAsyncEnumerable<ActiveAlarmSnapshot> QueryActiveAlarmsAsync(
|
||||
QueryActiveAlarmsRequest request,
|
||||
CancellationToken cancellationToken);
|
||||
}
|
||||
@@ -1,51 +0,0 @@
|
||||
using System.Collections.Generic;
|
||||
using System.Runtime.CompilerServices;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using MxGateway.Contracts.Proto;
|
||||
using MxGateway.Server.Grpc;
|
||||
|
||||
namespace MxGateway.Server.Sessions;
|
||||
|
||||
/// <summary>
|
||||
/// Null fallback <see cref="IAlarmRpcDispatcher"/> used when no dispatcher
|
||||
/// is registered in the DI container (DI omission or standalone tests).
|
||||
/// Acknowledges with a structured "alarm dispatcher not registered"
|
||||
/// diagnostic and yields an empty active-alarm stream.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Production wires <see cref="WorkerAlarmRpcDispatcher"/> as the
|
||||
/// default <see cref="IAlarmRpcDispatcher"/> via
|
||||
/// <c>SessionServiceCollectionExtensions.AddGatewaySessions</c>, so
|
||||
/// clients that hit this fallback are running against an
|
||||
/// intentionally minimal service composition rather than the full
|
||||
/// gateway.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class NotWiredAlarmRpcDispatcher : IAlarmRpcDispatcher
|
||||
{
|
||||
/// <inheritdoc />
|
||||
public Task<AcknowledgeAlarmReply> AcknowledgeAsync(
|
||||
AcknowledgeAlarmRequest request,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
return Task.FromResult(new AcknowledgeAlarmReply
|
||||
{
|
||||
SessionId = request.SessionId,
|
||||
CorrelationId = request.ClientCorrelationId,
|
||||
ProtocolStatus = MxAccessGrpcMapper.Ok("AcknowledgeAlarm accepted; alarm dispatcher is not registered."),
|
||||
DiagnosticMessage = "Alarm dispatcher is not registered.",
|
||||
});
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
#pragma warning disable CS1998 // Async method lacks 'await' operators — empty stream is intentional.
|
||||
public async IAsyncEnumerable<ActiveAlarmSnapshot> QueryActiveAlarmsAsync(
|
||||
QueryActiveAlarmsRequest request,
|
||||
[EnumeratorCancellation] CancellationToken cancellationToken)
|
||||
{
|
||||
yield break;
|
||||
}
|
||||
#pragma warning restore CS1998
|
||||
}
|
||||
@@ -90,8 +90,6 @@ public sealed class SessionManager : ISessionManager
|
||||
_metrics.SessionOpened();
|
||||
sessionOpenedRecorded = true;
|
||||
|
||||
await TryAutoSubscribeAlarmsAsync(session, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
return session;
|
||||
}
|
||||
catch (Exception exception)
|
||||
@@ -410,100 +408,4 @@ public sealed class SessionManager : ISessionManager
|
||||
return Convert.ToBase64String(bytes);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// If <c>Alarms.Enabled</c> is configured, issue a
|
||||
/// <c>SubscribeAlarmsCommand</c> on the freshly-Ready session so the
|
||||
/// worker's wnwrap consumer starts polling. Failure handling is
|
||||
/// governed by <c>Alarms.RequireSubscribeOnOpen</c>:
|
||||
/// <list type="bullet">
|
||||
/// <item><description><c>true</c> — propagate the failure to fault the session.</description></item>
|
||||
/// <item><description><c>false</c> (default) — log a warning and let the session continue serving data subscriptions.</description></item>
|
||||
/// </list>
|
||||
/// </summary>
|
||||
private async Task TryAutoSubscribeAlarmsAsync(
|
||||
GatewaySession session,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
AlarmsOptions alarms = _options.Alarms;
|
||||
if (!alarms.Enabled) return;
|
||||
|
||||
string subscription = ResolveAlarmSubscription(alarms);
|
||||
if (string.IsNullOrWhiteSpace(subscription))
|
||||
{
|
||||
const string diagnostic =
|
||||
"Alarms.Enabled is true but no SubscriptionExpression / DefaultArea is configured.";
|
||||
if (alarms.RequireSubscribeOnOpen)
|
||||
{
|
||||
throw new SessionManagerException(
|
||||
SessionManagerErrorCode.OpenFailed, diagnostic);
|
||||
}
|
||||
_logger.LogWarning(
|
||||
"Auto-subscribe skipped for session {SessionId}: {Diagnostic}",
|
||||
session.SessionId, diagnostic);
|
||||
return;
|
||||
}
|
||||
|
||||
WorkerCommand command = new WorkerCommand
|
||||
{
|
||||
Command = new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.SubscribeAlarms,
|
||||
SubscribeAlarms = new SubscribeAlarmsCommand
|
||||
{
|
||||
SubscriptionExpression = subscription,
|
||||
},
|
||||
},
|
||||
EnqueueTimestamp = Timestamp.FromDateTimeOffset(_timeProvider.GetUtcNow()),
|
||||
};
|
||||
|
||||
try
|
||||
{
|
||||
WorkerCommandReply reply = await session.InvokeAsync(command, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
ProtocolStatusCode? code = reply.Reply?.ProtocolStatus?.Code;
|
||||
if (code != ProtocolStatusCode.Ok)
|
||||
{
|
||||
string diagnostic = reply.Reply?.DiagnosticMessage
|
||||
?? reply.Reply?.ProtocolStatus?.Message
|
||||
?? "Worker rejected SubscribeAlarms.";
|
||||
if (alarms.RequireSubscribeOnOpen)
|
||||
{
|
||||
throw new SessionManagerException(
|
||||
SessionManagerErrorCode.OpenFailed,
|
||||
$"Auto-subscribe failed for session {session.SessionId}: {diagnostic}");
|
||||
}
|
||||
_logger.LogWarning(
|
||||
"Auto-subscribe failed for session {SessionId} (status {StatusCode}): {Diagnostic}",
|
||||
session.SessionId, code, diagnostic);
|
||||
return;
|
||||
}
|
||||
_logger.LogInformation(
|
||||
"Alarm auto-subscribe succeeded for session {SessionId} on {Subscription}.",
|
||||
session.SessionId, subscription);
|
||||
}
|
||||
catch (SessionManagerException)
|
||||
{
|
||||
throw;
|
||||
}
|
||||
catch (Exception ex) when (!alarms.RequireSubscribeOnOpen)
|
||||
{
|
||||
_logger.LogWarning(
|
||||
ex,
|
||||
"Auto-subscribe threw for session {SessionId} on {Subscription}; alarm path remains inactive.",
|
||||
session.SessionId, subscription);
|
||||
}
|
||||
}
|
||||
|
||||
private static string ResolveAlarmSubscription(AlarmsOptions alarms)
|
||||
{
|
||||
if (!string.IsNullOrWhiteSpace(alarms.SubscriptionExpression))
|
||||
{
|
||||
return alarms.SubscriptionExpression;
|
||||
}
|
||||
if (!string.IsNullOrWhiteSpace(alarms.DefaultArea))
|
||||
{
|
||||
return $@"\\{Environment.MachineName}\Galaxy!{alarms.DefaultArea}";
|
||||
}
|
||||
return string.Empty;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,7 +11,6 @@ public static class SessionServiceCollectionExtensions
|
||||
services.AddSingleton<ISessionRegistry, SessionRegistry>();
|
||||
services.AddSingleton<ISessionWorkerClientFactory, SessionWorkerClientFactory>();
|
||||
services.AddSingleton<ISessionManager, SessionManager>();
|
||||
services.AddSingleton<IAlarmRpcDispatcher, WorkerAlarmRpcDispatcher>();
|
||||
services.AddHostedService<SessionLeaseMonitorHostedService>();
|
||||
services.AddHostedService<SessionShutdownHostedService>();
|
||||
|
||||
|
||||
@@ -1,229 +0,0 @@
|
||||
using System.Runtime.CompilerServices;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using MxGateway.Contracts.Proto;
|
||||
using MxGateway.Server.Grpc;
|
||||
|
||||
namespace MxGateway.Server.Sessions;
|
||||
|
||||
/// <summary>
|
||||
/// Production <see cref="IAlarmRpcDispatcher"/> that routes the public
|
||||
/// <c>AcknowledgeAlarm</c> + <c>QueryActiveAlarms</c> RPCs through the
|
||||
/// worker pipe IPC. DI binds this dispatcher; <see cref="NotWiredAlarmRpcDispatcher"/>
|
||||
/// is only the null fallback used when no dispatcher is registered.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// <c>QueryActiveAlarms</c> issues a
|
||||
/// <see cref="QueryActiveAlarmsCommand"/> over the pipe and yields
|
||||
/// each <see cref="ActiveAlarmSnapshot"/> from the
|
||||
/// <see cref="QueryActiveAlarmsReplyPayload"/>.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// <c>AcknowledgeAlarm</c> accepts either form of
|
||||
/// <see cref="AcknowledgeAlarmRequest.AlarmFullReference"/>: a canonical
|
||||
/// GUID forwards as an <see cref="AcknowledgeAlarmCommand"/>; a
|
||||
/// <c>Provider!Group.Tag</c> reference is parsed by
|
||||
/// <see cref="TryParseAlarmReference"/> and forwarded as an
|
||||
/// <see cref="AcknowledgeAlarmByNameCommand"/>. Any other reference
|
||||
/// returns an <c>InvalidRequest</c> diagnostic.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class WorkerAlarmRpcDispatcher(
|
||||
ISessionRegistry sessionRegistry,
|
||||
TimeProvider? timeProvider = null) : IAlarmRpcDispatcher
|
||||
{
|
||||
private readonly ISessionRegistry sessionRegistry = sessionRegistry
|
||||
?? throw new ArgumentNullException(nameof(sessionRegistry));
|
||||
private readonly TimeProvider timeProvider = timeProvider ?? TimeProvider.System;
|
||||
|
||||
/// <summary>
|
||||
/// Parse a full alarm reference of the form <c>Provider!Group.Tag</c>
|
||||
/// into its components. Convention: the first <c>!</c> separates
|
||||
/// provider from <c>Group.Tag</c>; the first <c>.</c> after the
|
||||
/// <c>!</c> separates group from tag (the tag itself may contain
|
||||
/// more dots — e.g. <c>TestMachine_001.TestAlarm001</c>).
|
||||
/// </summary>
|
||||
/// <returns>true on a well-formed reference; false otherwise.</returns>
|
||||
public static bool TryParseAlarmReference(
|
||||
string? reference,
|
||||
out string providerName,
|
||||
out string groupName,
|
||||
out string alarmName)
|
||||
{
|
||||
providerName = string.Empty;
|
||||
groupName = string.Empty;
|
||||
alarmName = string.Empty;
|
||||
if (string.IsNullOrWhiteSpace(reference)) return false;
|
||||
|
||||
int bang = reference!.IndexOf('!');
|
||||
if (bang <= 0 || bang == reference.Length - 1) return false;
|
||||
|
||||
string left = reference[..bang];
|
||||
string right = reference[(bang + 1)..];
|
||||
int dot = right.IndexOf('.');
|
||||
if (dot <= 0 || dot == right.Length - 1) return false;
|
||||
|
||||
providerName = left;
|
||||
groupName = right[..dot];
|
||||
alarmName = right[(dot + 1)..];
|
||||
return true;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task<AcknowledgeAlarmReply> AcknowledgeAsync(
|
||||
AcknowledgeAlarmRequest request,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(request);
|
||||
|
||||
if (!sessionRegistry.TryGet(request.SessionId, out GatewaySession? session) || session is null)
|
||||
{
|
||||
return new AcknowledgeAlarmReply
|
||||
{
|
||||
SessionId = request.SessionId,
|
||||
CorrelationId = request.ClientCorrelationId,
|
||||
ProtocolStatus = MxAccessGrpcMapper.SessionNotFound(
|
||||
$"Session '{request.SessionId}' not found."),
|
||||
DiagnosticMessage = "AcknowledgeAlarm: session not found.",
|
||||
};
|
||||
}
|
||||
|
||||
WorkerCommand workerCommand;
|
||||
if (Guid.TryParse(request.AlarmFullReference, out Guid guid))
|
||||
{
|
||||
workerCommand = new WorkerCommand
|
||||
{
|
||||
Command = new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.AcknowledgeAlarm,
|
||||
AcknowledgeAlarmCommand = new AcknowledgeAlarmCommand
|
||||
{
|
||||
AlarmGuid = guid.ToString(),
|
||||
Comment = request.Comment ?? string.Empty,
|
||||
OperatorUser = request.OperatorUser ?? string.Empty,
|
||||
// Operator node/domain/full-name are not on the public
|
||||
// RPC surface today; pass empty strings so the worker
|
||||
// honours the existing AcknowledgeAlarmCommand schema.
|
||||
OperatorNode = string.Empty,
|
||||
OperatorDomain = string.Empty,
|
||||
OperatorFullName = string.Empty,
|
||||
},
|
||||
},
|
||||
EnqueueTimestamp = Timestamp.FromDateTimeOffset(timeProvider.GetUtcNow()),
|
||||
};
|
||||
}
|
||||
else if (TryParseAlarmReference(
|
||||
request.AlarmFullReference,
|
||||
out string providerName,
|
||||
out string groupName,
|
||||
out string alarmName))
|
||||
{
|
||||
workerCommand = new WorkerCommand
|
||||
{
|
||||
Command = new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.AcknowledgeAlarmByName,
|
||||
AcknowledgeAlarmByNameCommand = new AcknowledgeAlarmByNameCommand
|
||||
{
|
||||
AlarmName = alarmName,
|
||||
ProviderName = providerName,
|
||||
GroupName = groupName,
|
||||
Comment = request.Comment ?? string.Empty,
|
||||
OperatorUser = request.OperatorUser ?? string.Empty,
|
||||
OperatorNode = string.Empty,
|
||||
OperatorDomain = string.Empty,
|
||||
OperatorFullName = string.Empty,
|
||||
},
|
||||
},
|
||||
EnqueueTimestamp = Timestamp.FromDateTimeOffset(timeProvider.GetUtcNow()),
|
||||
};
|
||||
}
|
||||
else
|
||||
{
|
||||
return new AcknowledgeAlarmReply
|
||||
{
|
||||
SessionId = request.SessionId,
|
||||
CorrelationId = request.ClientCorrelationId,
|
||||
ProtocolStatus = new ProtocolStatus
|
||||
{
|
||||
Code = ProtocolStatusCode.InvalidRequest,
|
||||
Message = "AlarmFullReference must be a canonical GUID or 'Provider!Group.Tag' format.",
|
||||
},
|
||||
DiagnosticMessage = $"AcknowledgeAlarm received unrecognized reference '{request.AlarmFullReference}'.",
|
||||
};
|
||||
}
|
||||
|
||||
WorkerCommandReply workerReply = await session.InvokeAsync(workerCommand, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
MxCommandReply mxReply = workerReply.Reply ?? new MxCommandReply
|
||||
{
|
||||
ProtocolStatus = new ProtocolStatus
|
||||
{
|
||||
Code = ProtocolStatusCode.ProtocolViolation,
|
||||
Message = "Worker reply did not include an MxCommandReply.",
|
||||
},
|
||||
};
|
||||
|
||||
AcknowledgeAlarmReply reply = new AcknowledgeAlarmReply
|
||||
{
|
||||
SessionId = request.SessionId,
|
||||
CorrelationId = request.ClientCorrelationId,
|
||||
ProtocolStatus = mxReply.ProtocolStatus ?? MxAccessGrpcMapper.Ok(),
|
||||
DiagnosticMessage = mxReply.DiagnosticMessage ?? string.Empty,
|
||||
};
|
||||
if (mxReply.HasHresult)
|
||||
{
|
||||
reply.Hresult = mxReply.Hresult;
|
||||
}
|
||||
return reply;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async IAsyncEnumerable<ActiveAlarmSnapshot> QueryActiveAlarmsAsync(
|
||||
QueryActiveAlarmsRequest request,
|
||||
[EnumeratorCancellation] CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(request);
|
||||
|
||||
if (!sessionRegistry.TryGet(request.SessionId, out GatewaySession? session) || session is null)
|
||||
{
|
||||
// Server-019: align with AcknowledgeAsync's missing-session handling and
|
||||
// surface a SessionNotFound error rather than yielding an empty stream.
|
||||
// QueryActiveAlarms is server-streaming, so a thrown exception is the
|
||||
// cleaner fit than an in-band ProtocolStatus; MxAccessGatewayService maps
|
||||
// SessionManagerException(SessionNotFound) to gRPC NotFound.
|
||||
throw new SessionManagerException(
|
||||
SessionManagerErrorCode.SessionNotFound,
|
||||
$"Session '{request.SessionId}' not found.");
|
||||
}
|
||||
|
||||
WorkerCommand workerCommand = new WorkerCommand
|
||||
{
|
||||
Command = new MxCommand
|
||||
{
|
||||
Kind = MxCommandKind.QueryActiveAlarms,
|
||||
QueryActiveAlarmsCommand = new QueryActiveAlarmsCommand
|
||||
{
|
||||
AlarmFilterPrefix = request.AlarmFilterPrefix ?? string.Empty,
|
||||
},
|
||||
},
|
||||
EnqueueTimestamp = Timestamp.FromDateTimeOffset(timeProvider.GetUtcNow()),
|
||||
};
|
||||
|
||||
WorkerCommandReply workerReply = await session.InvokeAsync(workerCommand, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
MxCommandReply? mxReply = workerReply.Reply;
|
||||
if (mxReply?.ProtocolStatus?.Code != ProtocolStatusCode.Ok) yield break;
|
||||
|
||||
QueryActiveAlarmsReplyPayload? payload = mxReply.QueryActiveAlarms;
|
||||
if (payload is null) yield break;
|
||||
|
||||
foreach (ActiveAlarmSnapshot snapshot in payload.Snapshots)
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
yield return snapshot;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -71,7 +71,7 @@
|
||||
"Enabled": true,
|
||||
"SubscriptionExpression": "\\\\DESKTOP-6JL3KKO\\Galaxy!DEV",
|
||||
"DefaultArea": "",
|
||||
"RequireSubscribeOnOpen": false
|
||||
"ReconcileIntervalSeconds": 30
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,7 +21,7 @@ public sealed class ProtobufContractRoundTripTests
|
||||
Assert.Contains(service.Methods, method => method.Name == "Invoke");
|
||||
Assert.Contains(service.Methods, method => method.Name == "StreamEvents");
|
||||
Assert.Contains(service.Methods, method => method.Name == "AcknowledgeAlarm");
|
||||
Assert.Contains(service.Methods, method => method.Name == "QueryActiveAlarms");
|
||||
Assert.Contains(service.Methods, method => method.Name == "StreamAlarms");
|
||||
}
|
||||
|
||||
/// <summary>Verifies that worker envelope descriptor contains required correlation fields.</summary>
|
||||
@@ -306,7 +306,6 @@ public sealed class ProtobufContractRoundTripTests
|
||||
{
|
||||
var original = new AcknowledgeAlarmRequest
|
||||
{
|
||||
SessionId = "session-1",
|
||||
ClientCorrelationId = "client-correlation-7",
|
||||
AlarmFullReference = "Tank01.Level.HiHi",
|
||||
Comment = "shift handover",
|
||||
@@ -324,7 +323,6 @@ public sealed class ProtobufContractRoundTripTests
|
||||
{
|
||||
var original = new AcknowledgeAlarmReply
|
||||
{
|
||||
SessionId = "session-1",
|
||||
CorrelationId = "gateway-correlation-7",
|
||||
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
|
||||
Hresult = 0,
|
||||
@@ -420,25 +418,23 @@ public sealed class ProtobufContractRoundTripTests
|
||||
Assert.Equal(AlarmConditionState.ActiveAcked, parsed.CurrentState);
|
||||
}
|
||||
|
||||
/// <summary>Verifies that QueryActiveAlarmsRequest round-trips empty filter prefix.</summary>
|
||||
/// <summary>Verifies that StreamAlarmsRequest round-trips with and without a filter prefix.</summary>
|
||||
[Fact]
|
||||
public void QueryActiveAlarmsRequest_RoundTripsWithAndWithoutFilter()
|
||||
public void StreamAlarmsRequest_RoundTripsWithAndWithoutFilter()
|
||||
{
|
||||
var withoutFilter = new QueryActiveAlarmsRequest
|
||||
var withoutFilter = new StreamAlarmsRequest
|
||||
{
|
||||
SessionId = "session-1",
|
||||
ClientCorrelationId = "client-correlation-8",
|
||||
};
|
||||
|
||||
var withFilter = new QueryActiveAlarmsRequest
|
||||
var withFilter = new StreamAlarmsRequest
|
||||
{
|
||||
SessionId = "session-1",
|
||||
ClientCorrelationId = "client-correlation-9",
|
||||
AlarmFilterPrefix = "Tank01.",
|
||||
};
|
||||
|
||||
Assert.Equal(withoutFilter, QueryActiveAlarmsRequest.Parser.ParseFrom(withoutFilter.ToByteArray()));
|
||||
Assert.Equal(withFilter, QueryActiveAlarmsRequest.Parser.ParseFrom(withFilter.ToByteArray()));
|
||||
Assert.Equal(withoutFilter, StreamAlarmsRequest.Parser.ParseFrom(withoutFilter.ToByteArray()));
|
||||
Assert.Equal(withFilter, StreamAlarmsRequest.Parser.ParseFrom(withFilter.ToByteArray()));
|
||||
}
|
||||
|
||||
/// <summary>Verifies that an MxValue carrying a raw_value bytes payload round-trips.</summary>
|
||||
|
||||
@@ -188,7 +188,8 @@ public sealed class GatewayEndToEndFakeWorkerSmokeTests
|
||||
mapper,
|
||||
eventStreamService,
|
||||
_metrics,
|
||||
NullLogger<MxAccessGatewayService>.Instance);
|
||||
NullLogger<MxAccessGatewayService>.Instance,
|
||||
new FakeGatewayAlarmService());
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
||||
@@ -612,7 +612,8 @@ public sealed class MxAccessGatewayServiceConstraintTests
|
||||
new MxAccessGrpcMapper(),
|
||||
new FakeEventStreamService(sessionManager),
|
||||
new GatewayMetrics(),
|
||||
NullLogger<MxAccessGatewayService>.Instance);
|
||||
NullLogger<MxAccessGatewayService>.Instance,
|
||||
new FakeGatewayAlarmService());
|
||||
}
|
||||
|
||||
private static FakeSessionManager CreateSessionManagerWithSeed()
|
||||
|
||||
@@ -92,54 +92,6 @@ public sealed class MxAccessGatewayServiceTests
|
||||
Assert.Equal(1, sessionManager.InvokeCount);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that AcknowledgeAlarm maps a genuinely missing session to NotFound via
|
||||
/// the service's own <c>ResolveSession</c> lookup rather than an injected exception.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task AcknowledgeAlarm_WhenSessionMissing_ThrowsNotFound()
|
||||
{
|
||||
FakeSessionManager sessionManager = new() { ResolveOnlySeededSessions = true };
|
||||
MxAccessGatewayService service = CreateService(sessionManager);
|
||||
|
||||
RpcException exception = await Assert.ThrowsAsync<RpcException>(
|
||||
async () => await service.AcknowledgeAlarm(
|
||||
new AcknowledgeAlarmRequest
|
||||
{
|
||||
SessionId = "session-missing",
|
||||
AlarmFullReference = "Tank01.Level.HiHi",
|
||||
OperatorUser = "alice",
|
||||
},
|
||||
new TestServerCallContext()));
|
||||
|
||||
Assert.Equal(StatusCode.NotFound, exception.StatusCode);
|
||||
Assert.Contains("session-missing", exception.Status.Detail, StringComparison.Ordinal);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that QueryActiveAlarms maps a genuinely missing session to NotFound via
|
||||
/// the service's own <c>ResolveSession</c> lookup rather than an injected exception.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task QueryActiveAlarms_WhenSessionMissing_ThrowsNotFound()
|
||||
{
|
||||
FakeSessionManager sessionManager = new() { ResolveOnlySeededSessions = true };
|
||||
MxAccessGatewayService service = CreateService(sessionManager);
|
||||
|
||||
RpcException exception = await Assert.ThrowsAsync<RpcException>(
|
||||
async () => await service.QueryActiveAlarms(
|
||||
new QueryActiveAlarmsRequest
|
||||
{
|
||||
SessionId = "session-missing",
|
||||
AlarmFilterPrefix = "Tank01.",
|
||||
},
|
||||
new RecordingServerStreamWriter<ActiveAlarmSnapshot>(),
|
||||
new TestServerCallContext()));
|
||||
|
||||
Assert.Equal(StatusCode.NotFound, exception.StatusCode);
|
||||
Assert.Contains("session-missing", exception.Status.Detail, StringComparison.Ordinal);
|
||||
}
|
||||
|
||||
/// <summary>Verifies that Invoke throws InvalidArgument and does not invoke the session manager when payload is mismatched.</summary>
|
||||
[Fact]
|
||||
public async Task Invoke_WithMismatchedPayload_ThrowsInvalidArgumentAndDoesNotCallSessionManager()
|
||||
@@ -301,32 +253,13 @@ public sealed class MxAccessGatewayServiceTests
|
||||
Assert.Equal(StatusCode.InvalidArgument, exception.StatusCode);
|
||||
}
|
||||
|
||||
// ===== PR A.4 — AcknowledgeAlarm + QueryActiveAlarms handler contract =====
|
||||
// ===== AcknowledgeAlarm + StreamAlarms handler contract =====
|
||||
//
|
||||
// Worker-side dispatch (translating AcknowledgeAlarm to MxAccess Acknowledge,
|
||||
// walking the active-alarm collection for QueryActiveAlarms) is gated on PR
|
||||
// A.2's dev-rig validation. These tests pin the public surface so the worker
|
||||
// wiring lands without changing observable behaviour for clients.
|
||||
// AcknowledgeAlarm validates alarm_full_reference then delegates to the
|
||||
// session-less IGatewayAlarmService; StreamAlarms forwards the central
|
||||
// alarm feed. CreateService injects FakeGatewayAlarmService.
|
||||
|
||||
/// <summary>Verifies AcknowledgeAlarm rejects empty session_id.</summary>
|
||||
[Fact]
|
||||
public async Task AcknowledgeAlarm_WithMissingSessionId_ThrowsInvalidArgument()
|
||||
{
|
||||
MxAccessGatewayService service = CreateService(new FakeSessionManager());
|
||||
|
||||
RpcException exception = await Assert.ThrowsAsync<RpcException>(
|
||||
async () => await service.AcknowledgeAlarm(
|
||||
new AcknowledgeAlarmRequest
|
||||
{
|
||||
AlarmFullReference = "Tank01.Level.HiHi",
|
||||
OperatorUser = "alice",
|
||||
},
|
||||
new TestServerCallContext()));
|
||||
|
||||
Assert.Equal(StatusCode.InvalidArgument, exception.StatusCode);
|
||||
}
|
||||
|
||||
/// <summary>Verifies AcknowledgeAlarm rejects empty alarm_full_reference.</summary>
|
||||
/// <summary>Verifies AcknowledgeAlarm rejects an empty alarm_full_reference.</summary>
|
||||
[Fact]
|
||||
public async Task AcknowledgeAlarm_WithMissingAlarmReference_ThrowsInvalidArgument()
|
||||
{
|
||||
@@ -334,71 +267,47 @@ public sealed class MxAccessGatewayServiceTests
|
||||
|
||||
RpcException exception = await Assert.ThrowsAsync<RpcException>(
|
||||
async () => await service.AcknowledgeAlarm(
|
||||
new AcknowledgeAlarmRequest
|
||||
{
|
||||
SessionId = "session-1",
|
||||
OperatorUser = "alice",
|
||||
},
|
||||
new AcknowledgeAlarmRequest { OperatorUser = "alice" },
|
||||
new TestServerCallContext()));
|
||||
|
||||
Assert.Equal(StatusCode.InvalidArgument, exception.StatusCode);
|
||||
}
|
||||
|
||||
/// <summary>Verifies AcknowledgeAlarm returns OK with a "dispatcher not registered" diagnostic when DI omits the dispatcher.</summary>
|
||||
/// <summary>Verifies AcknowledgeAlarm delegates a valid request to the alarm service.</summary>
|
||||
[Fact]
|
||||
public async Task AcknowledgeAlarm_WithValidRequest_ReturnsOkWithNotRegisteredDiagnostic()
|
||||
public async Task AcknowledgeAlarm_WithValidRequest_DelegatesToAlarmService()
|
||||
{
|
||||
MxAccessGatewayService service = CreateService(new FakeSessionManager());
|
||||
|
||||
AcknowledgeAlarmReply reply = await service.AcknowledgeAlarm(
|
||||
new AcknowledgeAlarmRequest
|
||||
{
|
||||
SessionId = "session-1",
|
||||
ClientCorrelationId = "corr-1",
|
||||
AlarmFullReference = "Tank01.Level.HiHi",
|
||||
AlarmFullReference = "Galaxy!Area.Tank01.Level.HiHi",
|
||||
Comment = "investigating",
|
||||
OperatorUser = "alice",
|
||||
},
|
||||
new TestServerCallContext());
|
||||
|
||||
Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code);
|
||||
Assert.Equal("session-1", reply.SessionId);
|
||||
Assert.Equal("corr-1", reply.CorrelationId);
|
||||
Assert.Contains("not registered", reply.DiagnosticMessage, StringComparison.OrdinalIgnoreCase);
|
||||
}
|
||||
|
||||
/// <summary>Verifies QueryActiveAlarms rejects empty session_id.</summary>
|
||||
/// <summary>Verifies StreamAlarms forwards the central alarm feed, ending with snapshot_complete.</summary>
|
||||
[Fact]
|
||||
public async Task QueryActiveAlarms_WithMissingSessionId_ThrowsInvalidArgument()
|
||||
public async Task StreamAlarms_ForwardsTheCentralAlarmFeed()
|
||||
{
|
||||
MxAccessGatewayService service = CreateService(new FakeSessionManager());
|
||||
RecordingServerStreamWriter<AlarmFeedMessage> sink = new();
|
||||
|
||||
RpcException exception = await Assert.ThrowsAsync<RpcException>(
|
||||
async () => await service.QueryActiveAlarms(
|
||||
new QueryActiveAlarmsRequest(),
|
||||
new RecordingServerStreamWriter<ActiveAlarmSnapshot>(),
|
||||
new TestServerCallContext()));
|
||||
|
||||
Assert.Equal(StatusCode.InvalidArgument, exception.StatusCode);
|
||||
}
|
||||
|
||||
/// <summary>Verifies QueryActiveAlarms streams zero snapshots until PR A.2 wires the worker walk.</summary>
|
||||
[Fact]
|
||||
public async Task QueryActiveAlarms_WithValidRequest_StreamsZeroSnapshots()
|
||||
{
|
||||
MxAccessGatewayService service = CreateService(new FakeSessionManager());
|
||||
RecordingServerStreamWriter<ActiveAlarmSnapshot> sink = new();
|
||||
|
||||
await service.QueryActiveAlarms(
|
||||
new QueryActiveAlarmsRequest
|
||||
{
|
||||
SessionId = "session-1",
|
||||
AlarmFilterPrefix = "Tank01.",
|
||||
},
|
||||
await service.StreamAlarms(
|
||||
new StreamAlarmsRequest(),
|
||||
sink,
|
||||
new TestServerCallContext());
|
||||
|
||||
Assert.Empty(sink.Messages);
|
||||
Assert.Contains(
|
||||
sink.Messages,
|
||||
message => message.PayloadCase == AlarmFeedMessage.PayloadOneofCase.SnapshotComplete);
|
||||
}
|
||||
|
||||
/// <summary>Verifies OpenSession advertises the alarm RPC capability strings.</summary>
|
||||
@@ -433,7 +342,8 @@ public sealed class MxAccessGatewayServiceTests
|
||||
new MxAccessGrpcMapper(),
|
||||
new FakeEventStreamService(sessionManager),
|
||||
metrics ?? new GatewayMetrics(),
|
||||
NullLogger<MxAccessGatewayService>.Instance);
|
||||
NullLogger<MxAccessGatewayService>.Instance,
|
||||
new FakeGatewayAlarmService());
|
||||
}
|
||||
|
||||
private static ApiKeyIdentity CreateIdentity()
|
||||
|
||||
@@ -1,52 +0,0 @@
|
||||
using MxGateway.Contracts.Proto;
|
||||
using MxGateway.Server.Sessions;
|
||||
|
||||
namespace MxGateway.Tests.Gateway.Sessions;
|
||||
|
||||
/// <summary>
|
||||
/// Pins the null-fallback dispatcher's behaviour: AcknowledgeAsync
|
||||
/// returns OK with a "dispatcher not registered" diagnostic and
|
||||
/// QueryActiveAlarmsAsync yields an empty stream. Production binds
|
||||
/// <c>WorkerAlarmRpcDispatcher</c> in DI; this fallback is only used
|
||||
/// when no dispatcher is registered (DI omission / standalone tests).
|
||||
/// </summary>
|
||||
public sealed class NotWiredAlarmRpcDispatcherTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task AcknowledgeAsync_WhenNotWired_ReturnsOkWithNotRegisteredDiagnostic()
|
||||
{
|
||||
IAlarmRpcDispatcher dispatcher = new NotWiredAlarmRpcDispatcher();
|
||||
|
||||
AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync(
|
||||
new AcknowledgeAlarmRequest
|
||||
{
|
||||
SessionId = "session-1",
|
||||
ClientCorrelationId = "corr-1",
|
||||
AlarmFullReference = "Tank01.Level.HiHi",
|
||||
Comment = "investigating",
|
||||
OperatorUser = "alice",
|
||||
},
|
||||
CancellationToken.None);
|
||||
|
||||
Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code);
|
||||
Assert.Equal("session-1", reply.SessionId);
|
||||
Assert.Equal("corr-1", reply.CorrelationId);
|
||||
Assert.Contains("not registered", reply.DiagnosticMessage, StringComparison.OrdinalIgnoreCase);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task QueryActiveAlarmsAsync_WhenNotWired_YieldsNoSnapshots()
|
||||
{
|
||||
IAlarmRpcDispatcher dispatcher = new NotWiredAlarmRpcDispatcher();
|
||||
|
||||
int count = 0;
|
||||
await foreach (ActiveAlarmSnapshot _ in dispatcher.QueryActiveAlarmsAsync(
|
||||
new QueryActiveAlarmsRequest { SessionId = "session-1" },
|
||||
CancellationToken.None))
|
||||
{
|
||||
count++;
|
||||
}
|
||||
|
||||
Assert.Equal(0, count);
|
||||
}
|
||||
}
|
||||
@@ -1,305 +0,0 @@
|
||||
using System.Runtime.CompilerServices;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using Microsoft.Extensions.Options;
|
||||
using MxGateway.Contracts.Proto;
|
||||
using MxGateway.Server.Configuration;
|
||||
using MxGateway.Server.Metrics;
|
||||
using MxGateway.Server.Sessions;
|
||||
using MxGateway.Server.Workers;
|
||||
|
||||
namespace MxGateway.Tests.Gateway.Sessions;
|
||||
|
||||
/// <summary>
|
||||
/// Pins the alarm auto-subscribe hook on session open. Runs in
|
||||
/// its own file because the cases are orthogonal to
|
||||
/// <see cref="SessionManagerTests"/> (alarms-disabled vs.
|
||||
/// alarms-enabled lanes), and the fake worker client below verifies
|
||||
/// the issued <c>SubscribeAlarms</c> command shape directly.
|
||||
/// </summary>
|
||||
public sealed class SessionManagerAlarmAutoSubscribeTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task OpenSessionAsync_DoesNotAutoSubscribe_WhenAlarmsDisabled()
|
||||
{
|
||||
AlarmAutoSubscribeWorkerClient worker = new();
|
||||
SessionManager manager = NewManager(worker, alarms: new AlarmsOptions { Enabled = false });
|
||||
|
||||
await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||
|
||||
Assert.Equal(0, worker.SubscribeAlarmsInvokeCount);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task OpenSessionAsync_AutoSubscribes_WhenEnabledWithExpression()
|
||||
{
|
||||
AlarmAutoSubscribeWorkerClient worker = new();
|
||||
SessionManager manager = NewManager(worker, alarms: new AlarmsOptions
|
||||
{
|
||||
Enabled = true,
|
||||
SubscriptionExpression = @"\\HOST\Galaxy!Area1",
|
||||
});
|
||||
|
||||
GatewaySession session = await manager.OpenSessionAsync(
|
||||
CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||
|
||||
Assert.Equal(SessionState.Ready, session.State);
|
||||
Assert.Equal(1, worker.SubscribeAlarmsInvokeCount);
|
||||
Assert.Equal(@"\\HOST\Galaxy!Area1",
|
||||
worker.LastSubscribeAlarmsCommand!.SubscriptionExpression);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task OpenSessionAsync_FallsBackToDefaultArea_WhenExpressionEmpty()
|
||||
{
|
||||
AlarmAutoSubscribeWorkerClient worker = new();
|
||||
SessionManager manager = NewManager(worker, alarms: new AlarmsOptions
|
||||
{
|
||||
Enabled = true,
|
||||
DefaultArea = "DEV",
|
||||
});
|
||||
|
||||
await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||
|
||||
Assert.Equal(1, worker.SubscribeAlarmsInvokeCount);
|
||||
Assert.Contains(@"\Galaxy!DEV",
|
||||
worker.LastSubscribeAlarmsCommand!.SubscriptionExpression);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task OpenSessionAsync_Succeeds_WhenAutoSubscribeFailsWithRequireOff()
|
||||
{
|
||||
// Worker rejects the SubscribeAlarms command. With RequireSubscribeOnOpen=false
|
||||
// (the default), the session still opens — alarm-side commands later return
|
||||
// "not subscribed", but data subscriptions work.
|
||||
AlarmAutoSubscribeWorkerClient worker = new()
|
||||
{
|
||||
SubscribeAlarmsReplyFactory = _ => new MxCommandReply
|
||||
{
|
||||
Kind = MxCommandKind.SubscribeAlarms,
|
||||
ProtocolStatus = new ProtocolStatus
|
||||
{
|
||||
Code = ProtocolStatusCode.MxaccessFailure,
|
||||
Message = "wnwrap subscribe failed",
|
||||
},
|
||||
DiagnosticMessage = "alarm provider unavailable",
|
||||
},
|
||||
};
|
||||
SessionManager manager = NewManager(worker, alarms: new AlarmsOptions
|
||||
{
|
||||
Enabled = true,
|
||||
SubscriptionExpression = @"\\HOST\Galaxy!Area1",
|
||||
RequireSubscribeOnOpen = false,
|
||||
});
|
||||
|
||||
GatewaySession session = await manager.OpenSessionAsync(
|
||||
CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||
|
||||
Assert.Equal(SessionState.Ready, session.State);
|
||||
Assert.Equal(1, worker.SubscribeAlarmsInvokeCount);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task OpenSessionAsync_Throws_WhenAutoSubscribeFailsWithRequireOn()
|
||||
{
|
||||
AlarmAutoSubscribeWorkerClient worker = new()
|
||||
{
|
||||
SubscribeAlarmsReplyFactory = _ => new MxCommandReply
|
||||
{
|
||||
Kind = MxCommandKind.SubscribeAlarms,
|
||||
ProtocolStatus = new ProtocolStatus
|
||||
{
|
||||
Code = ProtocolStatusCode.MxaccessFailure,
|
||||
Message = "wnwrap subscribe failed",
|
||||
},
|
||||
},
|
||||
};
|
||||
SessionManager manager = NewManager(worker, alarms: new AlarmsOptions
|
||||
{
|
||||
Enabled = true,
|
||||
SubscriptionExpression = @"\\HOST\Galaxy!Area1",
|
||||
RequireSubscribeOnOpen = true,
|
||||
});
|
||||
|
||||
await Assert.ThrowsAsync<SessionManagerException>(
|
||||
async () => await manager.OpenSessionAsync(
|
||||
CreateOpenRequest(), "client-1", CancellationToken.None));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Server-006 regression: when auto-subscribe throws after
|
||||
/// <c>SessionOpened()</c> incremented the open-session gauge, the failed
|
||||
/// open must not leave <c>mxgateway.sessions.open</c> over-counted.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task OpenSessionAsync_DoesNotLeakOpenSessionGauge_WhenAutoSubscribeFailsWithRequireOn()
|
||||
{
|
||||
AlarmAutoSubscribeWorkerClient worker = new()
|
||||
{
|
||||
SubscribeAlarmsReplyFactory = _ => new MxCommandReply
|
||||
{
|
||||
Kind = MxCommandKind.SubscribeAlarms,
|
||||
ProtocolStatus = new ProtocolStatus
|
||||
{
|
||||
Code = ProtocolStatusCode.MxaccessFailure,
|
||||
Message = "wnwrap subscribe failed",
|
||||
},
|
||||
},
|
||||
};
|
||||
using GatewayMetrics metrics = new();
|
||||
SessionManager manager = NewManager(
|
||||
worker,
|
||||
alarms: new AlarmsOptions
|
||||
{
|
||||
Enabled = true,
|
||||
SubscriptionExpression = @"\\HOST\Galaxy!Area1",
|
||||
RequireSubscribeOnOpen = true,
|
||||
},
|
||||
metrics: metrics);
|
||||
|
||||
await Assert.ThrowsAsync<SessionManagerException>(
|
||||
async () => await manager.OpenSessionAsync(
|
||||
CreateOpenRequest(), "client-1", CancellationToken.None));
|
||||
|
||||
Assert.Equal(0, metrics.GetSnapshot().OpenSessions);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task OpenSessionAsync_Throws_WhenEnabledButNoExpressionAndRequireOn()
|
||||
{
|
||||
AlarmAutoSubscribeWorkerClient worker = new();
|
||||
SessionManager manager = NewManager(worker, alarms: new AlarmsOptions
|
||||
{
|
||||
Enabled = true,
|
||||
// No SubscriptionExpression and no DefaultArea.
|
||||
RequireSubscribeOnOpen = true,
|
||||
});
|
||||
|
||||
await Assert.ThrowsAsync<SessionManagerException>(
|
||||
async () => await manager.OpenSessionAsync(
|
||||
CreateOpenRequest(), "client-1", CancellationToken.None));
|
||||
Assert.Equal(0, worker.SubscribeAlarmsInvokeCount);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task OpenSessionAsync_Succeeds_WhenEnabledButNoExpressionAndRequireOff()
|
||||
{
|
||||
AlarmAutoSubscribeWorkerClient worker = new();
|
||||
SessionManager manager = NewManager(worker, alarms: new AlarmsOptions
|
||||
{
|
||||
Enabled = true,
|
||||
// No SubscriptionExpression and no DefaultArea — default require=false.
|
||||
});
|
||||
|
||||
GatewaySession session = await manager.OpenSessionAsync(
|
||||
CreateOpenRequest(), "client-1", CancellationToken.None);
|
||||
|
||||
Assert.Equal(SessionState.Ready, session.State);
|
||||
Assert.Equal(0, worker.SubscribeAlarmsInvokeCount);
|
||||
}
|
||||
|
||||
private static SessionManager NewManager(
|
||||
AlarmAutoSubscribeWorkerClient worker,
|
||||
AlarmsOptions alarms,
|
||||
GatewayMetrics? metrics = null)
|
||||
{
|
||||
FakeSessionWorkerClientFactory factory = new(worker);
|
||||
GatewayOptions options = new GatewayOptions
|
||||
{
|
||||
Sessions = new SessionOptions
|
||||
{
|
||||
DefaultCommandTimeoutSeconds = 30,
|
||||
MaxSessions = 64,
|
||||
DefaultLeaseSeconds = 1800,
|
||||
},
|
||||
Worker = new WorkerOptions
|
||||
{
|
||||
StartupTimeoutSeconds = 30,
|
||||
ShutdownTimeoutSeconds = 10,
|
||||
},
|
||||
Alarms = alarms,
|
||||
};
|
||||
return new SessionManager(
|
||||
new SessionRegistry(),
|
||||
factory,
|
||||
Options.Create(options),
|
||||
metrics ?? new GatewayMetrics());
|
||||
}
|
||||
|
||||
private static SessionOpenRequest CreateOpenRequest()
|
||||
{
|
||||
return new SessionOpenRequest(
|
||||
RequestedBackend: null,
|
||||
ClientSessionName: "test-session",
|
||||
ClientCorrelationId: "client-correlation-1",
|
||||
CommandTimeout: Duration.FromTimeSpan(TimeSpan.FromSeconds(5)));
|
||||
}
|
||||
|
||||
private sealed class FakeSessionWorkerClientFactory(IWorkerClient client) : ISessionWorkerClientFactory
|
||||
{
|
||||
public Task<IWorkerClient> CreateAsync(
|
||||
GatewaySession session,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
return Task.FromResult(client);
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class AlarmAutoSubscribeWorkerClient : IWorkerClient
|
||||
{
|
||||
public string SessionId { get; } = "session-1";
|
||||
public int? ProcessId { get; } = 1234;
|
||||
public WorkerClientState State { get; set; } = WorkerClientState.Ready;
|
||||
public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.UtcNow;
|
||||
|
||||
public int SubscribeAlarmsInvokeCount { get; private set; }
|
||||
public SubscribeAlarmsCommand? LastSubscribeAlarmsCommand { get; private set; }
|
||||
public Func<WorkerCommand, MxCommandReply>? SubscribeAlarmsReplyFactory { get; init; }
|
||||
|
||||
public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;
|
||||
|
||||
public Task<WorkerCommandReply> InvokeAsync(
|
||||
WorkerCommand command, TimeSpan timeout, CancellationToken cancellationToken)
|
||||
{
|
||||
if (command.Command?.Kind == MxCommandKind.SubscribeAlarms)
|
||||
{
|
||||
SubscribeAlarmsInvokeCount++;
|
||||
LastSubscribeAlarmsCommand = command.Command.SubscribeAlarms;
|
||||
MxCommandReply reply = SubscribeAlarmsReplyFactory?.Invoke(command)
|
||||
?? new MxCommandReply
|
||||
{
|
||||
Kind = MxCommandKind.SubscribeAlarms,
|
||||
ProtocolStatus = new ProtocolStatus
|
||||
{
|
||||
Code = ProtocolStatusCode.Ok,
|
||||
Message = "OK",
|
||||
},
|
||||
};
|
||||
return Task.FromResult(new WorkerCommandReply { Reply = reply });
|
||||
}
|
||||
return Task.FromResult(new WorkerCommandReply
|
||||
{
|
||||
Reply = new MxCommandReply
|
||||
{
|
||||
Kind = command.Command?.Kind ?? MxCommandKind.Unspecified,
|
||||
ProtocolStatus = new ProtocolStatus
|
||||
{
|
||||
Code = ProtocolStatusCode.Ok,
|
||||
Message = "OK",
|
||||
},
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
|
||||
[EnumeratorCancellation] CancellationToken cancellationToken)
|
||||
{
|
||||
await Task.CompletedTask;
|
||||
yield break;
|
||||
}
|
||||
|
||||
public Task ShutdownAsync(TimeSpan timeout, CancellationToken cancellationToken)
|
||||
=> Task.CompletedTask;
|
||||
public void Kill(string reason) { }
|
||||
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
|
||||
}
|
||||
}
|
||||
@@ -1,393 +0,0 @@
|
||||
using System.Runtime.CompilerServices;
|
||||
using MxGateway.Contracts.Proto;
|
||||
using MxGateway.Server.Sessions;
|
||||
using MxGateway.Server.Workers;
|
||||
|
||||
namespace MxGateway.Tests.Gateway.Sessions;
|
||||
|
||||
/// <summary>
|
||||
/// Pins the production <see cref="WorkerAlarmRpcDispatcher"/>'s behaviour:
|
||||
/// resolves the session by id, issues the matching MxCommand over the
|
||||
/// worker pipe, and unwraps the reply into AcknowledgeAlarmReply or the
|
||||
/// ActiveAlarmSnapshot stream.
|
||||
/// </summary>
|
||||
public sealed class WorkerAlarmRpcDispatcherTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task AcknowledgeAsync_WhenSessionMissing_ReturnsSessionNotFound()
|
||||
{
|
||||
SessionRegistry registry = new();
|
||||
WorkerAlarmRpcDispatcher dispatcher = new(registry);
|
||||
|
||||
AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync(
|
||||
new AcknowledgeAlarmRequest
|
||||
{
|
||||
SessionId = "missing",
|
||||
ClientCorrelationId = "c1",
|
||||
AlarmFullReference = Guid.NewGuid().ToString(),
|
||||
},
|
||||
CancellationToken.None);
|
||||
|
||||
Assert.Equal(ProtocolStatusCode.SessionNotFound, reply.ProtocolStatus.Code);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task AcknowledgeAsync_WithGuidReference_ForwardsGuidAndReturnsNativeStatus()
|
||||
{
|
||||
SessionRegistry registry = new();
|
||||
Guid alarmGuid = Guid.NewGuid();
|
||||
FakeAlarmWorkerClient worker = new()
|
||||
{
|
||||
ReplyFactory = command =>
|
||||
{
|
||||
Assert.Equal(MxCommandKind.AcknowledgeAlarm, command.Command.Kind);
|
||||
Assert.Equal(alarmGuid.ToString(), command.Command.AcknowledgeAlarmCommand.AlarmGuid);
|
||||
Assert.Equal("ack", command.Command.AcknowledgeAlarmCommand.Comment);
|
||||
Assert.Equal("alice", command.Command.AcknowledgeAlarmCommand.OperatorUser);
|
||||
return new MxCommandReply
|
||||
{
|
||||
Kind = MxCommandKind.AcknowledgeAlarm,
|
||||
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok, Message = "OK" },
|
||||
Hresult = 0,
|
||||
AcknowledgeAlarm = new AcknowledgeAlarmReplyPayload { NativeStatus = 0 },
|
||||
};
|
||||
},
|
||||
};
|
||||
GatewaySession session = NewSession("s1");
|
||||
session.AttachWorkerClient(worker);
|
||||
session.MarkReady();
|
||||
registry.TryAdd(session);
|
||||
|
||||
WorkerAlarmRpcDispatcher dispatcher = new(registry);
|
||||
|
||||
AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync(
|
||||
new AcknowledgeAlarmRequest
|
||||
{
|
||||
SessionId = "s1",
|
||||
ClientCorrelationId = "c1",
|
||||
AlarmFullReference = alarmGuid.ToString(),
|
||||
Comment = "ack",
|
||||
OperatorUser = "alice",
|
||||
},
|
||||
CancellationToken.None);
|
||||
|
||||
Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code);
|
||||
Assert.Equal(0, reply.Hresult);
|
||||
Assert.Equal("s1", reply.SessionId);
|
||||
Assert.Equal("c1", reply.CorrelationId);
|
||||
Assert.Equal(1, worker.InvokeCount);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task AcknowledgeAsync_WhenWorkerFails_PropagatesWorkerDiagnostic()
|
||||
{
|
||||
SessionRegistry registry = new();
|
||||
FakeAlarmWorkerClient worker = new()
|
||||
{
|
||||
ReplyFactory = _ => new MxCommandReply
|
||||
{
|
||||
Kind = MxCommandKind.AcknowledgeAlarm,
|
||||
ProtocolStatus = new ProtocolStatus
|
||||
{
|
||||
Code = ProtocolStatusCode.MxaccessFailure,
|
||||
Message = "AVEVA Acknowledge failed.",
|
||||
},
|
||||
Hresult = -123,
|
||||
DiagnosticMessage = "AVEVA AlarmAckByGUID returned non-zero status -123.",
|
||||
},
|
||||
};
|
||||
GatewaySession session = NewSession("s1");
|
||||
session.AttachWorkerClient(worker);
|
||||
session.MarkReady();
|
||||
registry.TryAdd(session);
|
||||
|
||||
WorkerAlarmRpcDispatcher dispatcher = new(registry);
|
||||
|
||||
AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync(
|
||||
new AcknowledgeAlarmRequest
|
||||
{
|
||||
SessionId = "s1",
|
||||
AlarmFullReference = Guid.NewGuid().ToString(),
|
||||
},
|
||||
CancellationToken.None);
|
||||
|
||||
Assert.Equal(ProtocolStatusCode.MxaccessFailure, reply.ProtocolStatus.Code);
|
||||
Assert.Equal(-123, reply.Hresult);
|
||||
Assert.Contains("-123", reply.DiagnosticMessage);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData("Galaxy!TestArea.TestMachine_001.TestAlarm001", "Galaxy", "TestArea", "TestMachine_001.TestAlarm001")]
|
||||
[InlineData("Galaxy!Area.Tag", "Galaxy", "Area", "Tag")]
|
||||
[InlineData("Provider!Group.Tag.With.Dots", "Provider", "Group", "Tag.With.Dots")]
|
||||
public void TryParseAlarmReference_WithProviderGroupTag_DecomposesParts(
|
||||
string reference, string expectedProvider, string expectedGroup, string expectedName)
|
||||
{
|
||||
Assert.True(WorkerAlarmRpcDispatcher.TryParseAlarmReference(
|
||||
reference, out string provider, out string group, out string name));
|
||||
Assert.Equal(expectedProvider, provider);
|
||||
Assert.Equal(expectedGroup, group);
|
||||
Assert.Equal(expectedName, name);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData("")]
|
||||
[InlineData(" ")]
|
||||
[InlineData(null)]
|
||||
[InlineData("no-bang-here")]
|
||||
[InlineData("!Group.Tag")] // empty provider
|
||||
[InlineData("Galaxy!")] // bang at end
|
||||
[InlineData("Galaxy!Group")] // missing dot
|
||||
[InlineData("Galaxy!.Tag")] // empty group
|
||||
[InlineData("Galaxy!Group.")] // empty tag
|
||||
public void TryParseAlarmReference_WithMalformedReference_ReturnsFalse(string? reference)
|
||||
{
|
||||
Assert.False(WorkerAlarmRpcDispatcher.TryParseAlarmReference(
|
||||
reference, out _, out _, out _));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task AcknowledgeAsync_WithProviderGroupTagReference_RoutesViaAckByName()
|
||||
{
|
||||
SessionRegistry registry = new();
|
||||
AcknowledgeAlarmByNameCommand? observed = null;
|
||||
FakeAlarmWorkerClient worker = new()
|
||||
{
|
||||
ReplyFactory = command =>
|
||||
{
|
||||
Assert.Equal(MxCommandKind.AcknowledgeAlarmByName, command.Command.Kind);
|
||||
observed = command.Command.AcknowledgeAlarmByNameCommand;
|
||||
return new MxCommandReply
|
||||
{
|
||||
Kind = MxCommandKind.AcknowledgeAlarmByName,
|
||||
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok, Message = "OK" },
|
||||
Hresult = 0,
|
||||
AcknowledgeAlarm = new AcknowledgeAlarmReplyPayload { NativeStatus = 0 },
|
||||
};
|
||||
},
|
||||
};
|
||||
GatewaySession session = NewSession("s1");
|
||||
session.AttachWorkerClient(worker);
|
||||
session.MarkReady();
|
||||
registry.TryAdd(session);
|
||||
|
||||
WorkerAlarmRpcDispatcher dispatcher = new(registry);
|
||||
|
||||
AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync(
|
||||
new AcknowledgeAlarmRequest
|
||||
{
|
||||
SessionId = "s1",
|
||||
ClientCorrelationId = "c1",
|
||||
AlarmFullReference = "Galaxy!TestArea.TestMachine_001.TestAlarm001",
|
||||
Comment = "ack-by-name",
|
||||
OperatorUser = "bob",
|
||||
},
|
||||
CancellationToken.None);
|
||||
|
||||
Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code);
|
||||
Assert.NotNull(observed);
|
||||
Assert.Equal("TestMachine_001.TestAlarm001", observed!.AlarmName);
|
||||
Assert.Equal("Galaxy", observed.ProviderName);
|
||||
Assert.Equal("TestArea", observed.GroupName);
|
||||
Assert.Equal("bob", observed.OperatorUser);
|
||||
Assert.Equal("ack-by-name", observed.Comment);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task AcknowledgeAsync_WithUnparseableReference_ReturnsInvalidRequest()
|
||||
{
|
||||
SessionRegistry registry = new();
|
||||
FakeAlarmWorkerClient worker = new();
|
||||
GatewaySession session = NewSession("s1");
|
||||
session.AttachWorkerClient(worker);
|
||||
session.MarkReady();
|
||||
registry.TryAdd(session);
|
||||
|
||||
WorkerAlarmRpcDispatcher dispatcher = new(registry);
|
||||
|
||||
AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync(
|
||||
new AcknowledgeAlarmRequest
|
||||
{
|
||||
SessionId = "s1",
|
||||
AlarmFullReference = "no-bang-no-dot",
|
||||
},
|
||||
CancellationToken.None);
|
||||
|
||||
Assert.Equal(ProtocolStatusCode.InvalidRequest, reply.ProtocolStatus.Code);
|
||||
Assert.Equal(0, worker.InvokeCount);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task QueryActiveAlarmsAsync_WithPayloadSnapshots_YieldsEachSnapshot()
|
||||
{
|
||||
SessionRegistry registry = new();
|
||||
FakeAlarmWorkerClient worker = new()
|
||||
{
|
||||
ReplyFactory = command =>
|
||||
{
|
||||
Assert.Equal(MxCommandKind.QueryActiveAlarms, command.Command.Kind);
|
||||
QueryActiveAlarmsReplyPayload payload = new QueryActiveAlarmsReplyPayload();
|
||||
payload.Snapshots.Add(new ActiveAlarmSnapshot
|
||||
{
|
||||
AlarmFullReference = "Galaxy!A.T1",
|
||||
CurrentState = AlarmConditionState.Active,
|
||||
Severity = 500,
|
||||
});
|
||||
payload.Snapshots.Add(new ActiveAlarmSnapshot
|
||||
{
|
||||
AlarmFullReference = "Galaxy!A.T2",
|
||||
CurrentState = AlarmConditionState.ActiveAcked,
|
||||
Severity = 100,
|
||||
});
|
||||
return new MxCommandReply
|
||||
{
|
||||
Kind = MxCommandKind.QueryActiveAlarms,
|
||||
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok, Message = "OK" },
|
||||
QueryActiveAlarms = payload,
|
||||
};
|
||||
},
|
||||
};
|
||||
GatewaySession session = NewSession("s1");
|
||||
session.AttachWorkerClient(worker);
|
||||
session.MarkReady();
|
||||
registry.TryAdd(session);
|
||||
|
||||
WorkerAlarmRpcDispatcher dispatcher = new(registry);
|
||||
|
||||
List<ActiveAlarmSnapshot> collected = new();
|
||||
await foreach (ActiveAlarmSnapshot snap in dispatcher.QueryActiveAlarmsAsync(
|
||||
new QueryActiveAlarmsRequest { SessionId = "s1" },
|
||||
CancellationToken.None))
|
||||
{
|
||||
collected.Add(snap);
|
||||
}
|
||||
|
||||
Assert.Equal(2, collected.Count);
|
||||
Assert.Equal("Galaxy!A.T1", collected[0].AlarmFullReference);
|
||||
Assert.Equal("Galaxy!A.T2", collected[1].AlarmFullReference);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Server-019 regression: <c>QueryActiveAlarmsAsync</c> used to silently
|
||||
/// <c>yield break</c> when the session id was not in the registry, while the
|
||||
/// peer <c>AcknowledgeAsync</c> returned <c>SessionNotFound</c>. Both methods
|
||||
/// now signal a missing session — <c>QueryActiveAlarms</c> throws a
|
||||
/// <see cref="SessionManagerException"/> with
|
||||
/// <see cref="SessionManagerErrorCode.SessionNotFound"/> (the gateway gRPC
|
||||
/// layer maps it to gRPC <c>NotFound</c>), aligning the dispatcher's
|
||||
/// missing-session contract across the two RPCs.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task QueryActiveAlarmsAsync_WhenSessionMissing_ThrowsSessionNotFound()
|
||||
{
|
||||
SessionRegistry registry = new();
|
||||
WorkerAlarmRpcDispatcher dispatcher = new(registry);
|
||||
|
||||
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(async () =>
|
||||
{
|
||||
await foreach (ActiveAlarmSnapshot _ in dispatcher.QueryActiveAlarmsAsync(
|
||||
new QueryActiveAlarmsRequest { SessionId = "missing" },
|
||||
CancellationToken.None))
|
||||
{
|
||||
// No yield expected — the throw happens before the first iteration.
|
||||
}
|
||||
});
|
||||
|
||||
Assert.Equal(SessionManagerErrorCode.SessionNotFound, exception.ErrorCode);
|
||||
|
||||
// Peer-method parity: AcknowledgeAsync still signals SessionNotFound (as an
|
||||
// in-band ProtocolStatus, since it's a unary RPC). The two methods now agree
|
||||
// that a missing session is an error, not an empty success.
|
||||
AcknowledgeAlarmReply ackReply = await dispatcher.AcknowledgeAsync(
|
||||
new AcknowledgeAlarmRequest
|
||||
{
|
||||
SessionId = "missing",
|
||||
AlarmFullReference = Guid.NewGuid().ToString(),
|
||||
},
|
||||
CancellationToken.None);
|
||||
Assert.Equal(ProtocolStatusCode.SessionNotFound, ackReply.ProtocolStatus.Code);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task QueryActiveAlarmsAsync_WhenWorkerFails_YieldsEmpty()
|
||||
{
|
||||
SessionRegistry registry = new();
|
||||
FakeAlarmWorkerClient worker = new()
|
||||
{
|
||||
ReplyFactory = _ => new MxCommandReply
|
||||
{
|
||||
Kind = MxCommandKind.QueryActiveAlarms,
|
||||
ProtocolStatus = new ProtocolStatus
|
||||
{
|
||||
Code = ProtocolStatusCode.MxaccessFailure,
|
||||
Message = "alarm consumer not subscribed",
|
||||
},
|
||||
},
|
||||
};
|
||||
GatewaySession session = NewSession("s1");
|
||||
session.AttachWorkerClient(worker);
|
||||
session.MarkReady();
|
||||
registry.TryAdd(session);
|
||||
|
||||
WorkerAlarmRpcDispatcher dispatcher = new(registry);
|
||||
|
||||
List<ActiveAlarmSnapshot> collected = new();
|
||||
await foreach (ActiveAlarmSnapshot snap in dispatcher.QueryActiveAlarmsAsync(
|
||||
new QueryActiveAlarmsRequest { SessionId = "s1" },
|
||||
CancellationToken.None))
|
||||
{
|
||||
collected.Add(snap);
|
||||
}
|
||||
|
||||
Assert.Empty(collected);
|
||||
}
|
||||
|
||||
private static GatewaySession NewSession(string sessionId)
|
||||
{
|
||||
return new(
|
||||
sessionId,
|
||||
"mxaccess",
|
||||
$"mxaccess-gateway-1-{sessionId}",
|
||||
"nonce",
|
||||
"client-1",
|
||||
"test-session",
|
||||
"client-correlation-1",
|
||||
commandTimeout: TimeSpan.FromSeconds(30),
|
||||
startupTimeout: TimeSpan.FromSeconds(5),
|
||||
shutdownTimeout: TimeSpan.FromSeconds(5),
|
||||
leaseDuration: TimeSpan.FromMinutes(30),
|
||||
openedAt: DateTimeOffset.UtcNow);
|
||||
}
|
||||
|
||||
private sealed class FakeAlarmWorkerClient : IWorkerClient
|
||||
{
|
||||
public string SessionId { get; } = "session-1";
|
||||
public int? ProcessId { get; } = 1;
|
||||
public WorkerClientState State { get; } = WorkerClientState.Ready;
|
||||
public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.UtcNow;
|
||||
|
||||
public Func<WorkerCommand, MxCommandReply>? ReplyFactory { get; set; }
|
||||
public int InvokeCount { get; private set; }
|
||||
|
||||
public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;
|
||||
|
||||
public Task<WorkerCommandReply> InvokeAsync(
|
||||
WorkerCommand command, TimeSpan timeout, CancellationToken cancellationToken)
|
||||
{
|
||||
InvokeCount++;
|
||||
MxCommandReply reply = ReplyFactory?.Invoke(command) ?? new MxCommandReply();
|
||||
return Task.FromResult(new WorkerCommandReply { Reply = reply });
|
||||
}
|
||||
|
||||
public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
|
||||
[EnumeratorCancellation] CancellationToken cancellationToken)
|
||||
{
|
||||
await Task.CompletedTask;
|
||||
yield break;
|
||||
}
|
||||
|
||||
public Task ShutdownAsync(TimeSpan timeout, CancellationToken cancellationToken) => Task.CompletedTask;
|
||||
public void Kill(string reason) { }
|
||||
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
|
||||
}
|
||||
}
|
||||
+6
-5
@@ -266,7 +266,7 @@ public sealed class GatewayGrpcAuthorizationInterceptorTests
|
||||
|
||||
RpcException exception = await Assert.ThrowsAsync<RpcException>(
|
||||
() => interceptor.UnaryServerHandler(
|
||||
new AcknowledgeAlarmRequest { SessionId = "session-1", AlarmFullReference = "ref" },
|
||||
new AcknowledgeAlarmRequest { AlarmFullReference = "ref" },
|
||||
ContextWithAuthorization("Bearer mxgw_operator01_secret"),
|
||||
(_, _) => Task.FromResult(new AcknowledgeAlarmReply())));
|
||||
|
||||
@@ -284,7 +284,7 @@ public sealed class GatewayGrpcAuthorizationInterceptorTests
|
||||
bool handlerRan = false;
|
||||
|
||||
AcknowledgeAlarmReply reply = await interceptor.UnaryServerHandler(
|
||||
new AcknowledgeAlarmRequest { SessionId = "session-1", AlarmFullReference = "ref" },
|
||||
new AcknowledgeAlarmRequest { AlarmFullReference = "ref" },
|
||||
ContextWithAuthorization("Bearer mxgw_operator01_secret"),
|
||||
(_, _) =>
|
||||
{
|
||||
@@ -310,7 +310,7 @@ public sealed class GatewayGrpcAuthorizationInterceptorTests
|
||||
|
||||
RpcException exception = await Assert.ThrowsAsync<RpcException>(
|
||||
() => interceptor.ServerStreamingServerHandler(
|
||||
new QueryActiveAlarmsRequest { SessionId = "session-1" },
|
||||
new StreamAlarmsRequest(),
|
||||
new RecordingServerStreamWriter<ActiveAlarmSnapshot>(),
|
||||
ContextWithAuthorization("Bearer mxgw_operator01_secret"),
|
||||
(_, _, _) => Task.CompletedTask));
|
||||
@@ -329,7 +329,7 @@ public sealed class GatewayGrpcAuthorizationInterceptorTests
|
||||
RecordingServerStreamWriter<ActiveAlarmSnapshot> streamWriter = new();
|
||||
|
||||
await interceptor.ServerStreamingServerHandler(
|
||||
new QueryActiveAlarmsRequest { SessionId = "session-1" },
|
||||
new StreamAlarmsRequest(),
|
||||
streamWriter,
|
||||
ContextWithAuthorization("Bearer mxgw_operator01_secret"),
|
||||
async (_, writer, _) =>
|
||||
@@ -352,7 +352,8 @@ public sealed class GatewayGrpcAuthorizationInterceptorTests
|
||||
new MxAccessGrpcMapper(),
|
||||
new NoOpEventStreamService(),
|
||||
new GatewayMetrics(),
|
||||
NullLogger<MxAccessGatewayService>.Instance);
|
||||
NullLogger<MxAccessGatewayService>.Instance,
|
||||
new FakeGatewayAlarmService());
|
||||
}
|
||||
|
||||
private static GatewayGrpcAuthorizationInterceptor CreateInterceptor(
|
||||
|
||||
@@ -14,7 +14,7 @@ public sealed class GatewayGrpcScopeResolverTests
|
||||
[InlineData(typeof(CloseSessionRequest), GatewayScopes.SessionClose)]
|
||||
[InlineData(typeof(StreamEventsRequest), GatewayScopes.EventsRead)]
|
||||
[InlineData(typeof(AcknowledgeAlarmRequest), GatewayScopes.InvokeWrite)]
|
||||
[InlineData(typeof(QueryActiveAlarmsRequest), GatewayScopes.EventsRead)]
|
||||
[InlineData(typeof(StreamAlarmsRequest), GatewayScopes.EventsRead)]
|
||||
[InlineData(typeof(TestConnectionRequest), GatewayScopes.MetadataRead)]
|
||||
[InlineData(typeof(GetLastDeployTimeRequest), GatewayScopes.MetadataRead)]
|
||||
[InlineData(typeof(DiscoverHierarchyRequest), GatewayScopes.MetadataRead)]
|
||||
|
||||
@@ -0,0 +1,54 @@
|
||||
using System.Runtime.CompilerServices;
|
||||
using MxGateway.Contracts.Proto;
|
||||
using MxGateway.Server.Alarms;
|
||||
|
||||
namespace MxGateway.Tests.TestSupport;
|
||||
|
||||
/// <summary>
|
||||
/// <see cref="IGatewayAlarmService"/> test double — serves a scripted
|
||||
/// active-alarm set and acknowledges every request with an OK status,
|
||||
/// so gRPC service tests can exercise the alarm handlers without the
|
||||
/// real gateway alarm monitor or a worker.
|
||||
/// </summary>
|
||||
public sealed class FakeGatewayAlarmService : IGatewayAlarmService
|
||||
{
|
||||
/// <inheritdoc />
|
||||
public GatewayAlarmMonitorState State { get; set; } = GatewayAlarmMonitorState.Monitoring;
|
||||
|
||||
/// <inheritdoc />
|
||||
public string? LastError { get; set; }
|
||||
|
||||
/// <inheritdoc />
|
||||
public int? WorkerProcessId { get; set; }
|
||||
|
||||
/// <inheritdoc />
|
||||
public IReadOnlyList<ActiveAlarmSnapshot> CurrentAlarms { get; set; } = [];
|
||||
|
||||
/// <inheritdoc />
|
||||
public async IAsyncEnumerable<AlarmFeedMessage> StreamAsync(
|
||||
string? alarmFilterPrefix,
|
||||
[EnumeratorCancellation] CancellationToken cancellationToken)
|
||||
{
|
||||
foreach (ActiveAlarmSnapshot alarm in CurrentAlarms)
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
yield return new AlarmFeedMessage { ActiveAlarm = alarm };
|
||||
}
|
||||
|
||||
yield return new AlarmFeedMessage { SnapshotComplete = true };
|
||||
await Task.CompletedTask.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public Task<AcknowledgeAlarmReply> AcknowledgeAsync(
|
||||
AcknowledgeAlarmRequest request,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
return Task.FromResult(new AcknowledgeAlarmReply
|
||||
{
|
||||
CorrelationId = request.ClientCorrelationId,
|
||||
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
|
||||
DiagnosticMessage = string.Empty,
|
||||
});
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user