A.3 (gateway dispatcher): WorkerAlarmRpcDispatcher routes alarm RPCs over the worker pipe
Replaces NotWiredAlarmRpcDispatcher in DI with a production
implementation that issues the new MxCommandKind.{AcknowledgeAlarm,
QueryActiveAlarms} commands across the IPC and unwraps the resulting
MxCommandReply into the public RPC types.
QueryActiveAlarms is fully wired: builds the QueryActiveAlarmsCommand
(forwarding alarm_filter_prefix), invokes it on the resolved
GatewaySession's worker client, and yields each ActiveAlarmSnapshot
from the QueryActiveAlarmsReplyPayload as the RPC stream. Worker
failures + missing sessions yield an empty stream — matches the
ConditionRefresh contract clients already speak to.
AcknowledgeAlarm is partially wired: the public RPC takes
AlarmFullReference (Provider!Group.Tag), but the worker's wnwrap
consumer acks by GUID. Strategy:
- If AlarmFullReference parses as a canonical GUID, forward it
directly through MxCommandKind.AcknowledgeAlarm. Native status
flows back via MxCommandReply.Hresult and the dedicated
AcknowledgeAlarmReplyPayload.NativeStatus.
- Otherwise, return InvalidRequest with a clear diagnostic naming the
follow-up — reference→GUID lookup needs a worker-side AlarmAckByName
command wrapping wwAlarmConsumerClass.AlarmAckByName.
DI: SessionServiceCollectionExtensions registers WorkerAlarmRpcDispatcher
as the default IAlarmRpcDispatcher; MxAccessGatewayService picks it up
via constructor injection. NotWiredAlarmRpcDispatcher is retained for
test fixtures that want the no-side-effect fake.
Tests: 7 new unit tests cover session-not-found short-circuit, GUID-vs-
reference branching, native-status propagation, worker MxaccessFailure
diagnostic propagation, and snapshot-stream yielding. Server test
suite total: 288/0 fail. Solution builds clean.
End-to-end alarms-over-gateway pipeline status:
consumer → sink → queue (A.2 + A.3 in-process slice)
worker IPC commands (A.3 worker slice)
gateway dispatcher (this slice)
Remaining for full E2E:
- Auto-issue SubscribeAlarms on session open (or add a public
SubscribeAlarms RPC). Without this trigger the consumer never
starts and Acknowledge/Query return "not subscribed".
- AlarmAckByName worker command for ack-by-reference.
- End-to-end live test against the dev rig.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -11,6 +11,7 @@ public static class SessionServiceCollectionExtensions
|
|||||||
services.AddSingleton<ISessionRegistry, SessionRegistry>();
|
services.AddSingleton<ISessionRegistry, SessionRegistry>();
|
||||||
services.AddSingleton<ISessionWorkerClientFactory, SessionWorkerClientFactory>();
|
services.AddSingleton<ISessionWorkerClientFactory, SessionWorkerClientFactory>();
|
||||||
services.AddSingleton<ISessionManager, SessionManager>();
|
services.AddSingleton<ISessionManager, SessionManager>();
|
||||||
|
services.AddSingleton<IAlarmRpcDispatcher, WorkerAlarmRpcDispatcher>();
|
||||||
services.AddHostedService<SessionLeaseMonitorHostedService>();
|
services.AddHostedService<SessionLeaseMonitorHostedService>();
|
||||||
services.AddHostedService<SessionShutdownHostedService>();
|
services.AddHostedService<SessionShutdownHostedService>();
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,172 @@
|
|||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Runtime.CompilerServices;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using Google.Protobuf.WellKnownTypes;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
using MxGateway.Server.Grpc;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Sessions;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Production <see cref="IAlarmRpcDispatcher"/> that routes the public
|
||||||
|
/// <c>AcknowledgeAlarm</c> + <c>QueryActiveAlarms</c> RPCs through the
|
||||||
|
/// worker pipe IPC. Replaces <see cref="NotWiredAlarmRpcDispatcher"/>
|
||||||
|
/// once the worker AlarmCommandHandler is wired in.
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// <para>
|
||||||
|
/// <c>QueryActiveAlarms</c> is fully wired: issues a
|
||||||
|
/// <see cref="QueryActiveAlarmsCommand"/> over the pipe and yields
|
||||||
|
/// each <see cref="ActiveAlarmSnapshot"/> from the
|
||||||
|
/// <see cref="QueryActiveAlarmsReplyPayload"/>.
|
||||||
|
/// </para>
|
||||||
|
/// <para>
|
||||||
|
/// <c>AcknowledgeAlarm</c> is partially wired: the public RPC's
|
||||||
|
/// <see cref="AcknowledgeAlarmRequest.AlarmFullReference"/> is a
|
||||||
|
/// <c>Provider!Group.Tag</c> string, but the worker's wnwrap consumer
|
||||||
|
/// acks by GUID. When the supplied reference parses as a GUID
|
||||||
|
/// directly, the dispatcher forwards it as-is. Otherwise it
|
||||||
|
/// returns an <c>Unimplemented</c> diagnostic. Resolving
|
||||||
|
/// reference→GUID requires an additional worker IPC command
|
||||||
|
/// (e.g. <c>AlarmAckByName</c> wrapping
|
||||||
|
/// <c>wwAlarmConsumerClass.AlarmAckByName</c>) and is tracked as
|
||||||
|
/// a follow-up.
|
||||||
|
/// </para>
|
||||||
|
/// </remarks>
|
||||||
|
public sealed class WorkerAlarmRpcDispatcher : IAlarmRpcDispatcher
|
||||||
|
{
|
||||||
|
private readonly ISessionRegistry sessionRegistry;
|
||||||
|
private readonly TimeProvider timeProvider;
|
||||||
|
|
||||||
|
public WorkerAlarmRpcDispatcher(ISessionRegistry sessionRegistry, TimeProvider? timeProvider = null)
|
||||||
|
{
|
||||||
|
this.sessionRegistry = sessionRegistry ?? throw new System.ArgumentNullException(nameof(sessionRegistry));
|
||||||
|
this.timeProvider = timeProvider ?? TimeProvider.System;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public async Task<AcknowledgeAlarmReply> AcknowledgeAsync(
|
||||||
|
AcknowledgeAlarmRequest request,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
if (request is null) throw new System.ArgumentNullException(nameof(request));
|
||||||
|
|
||||||
|
if (!sessionRegistry.TryGet(request.SessionId, out GatewaySession session))
|
||||||
|
{
|
||||||
|
return new AcknowledgeAlarmReply
|
||||||
|
{
|
||||||
|
SessionId = request.SessionId,
|
||||||
|
CorrelationId = request.ClientCorrelationId,
|
||||||
|
ProtocolStatus = MxAccessGrpcMapper.SessionNotFound(
|
||||||
|
$"Session '{request.SessionId}' not found."),
|
||||||
|
DiagnosticMessage = "AcknowledgeAlarm: session not found.",
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!System.Guid.TryParse(request.AlarmFullReference, out System.Guid guid))
|
||||||
|
{
|
||||||
|
// Reference→GUID lookup not yet implemented. Surface a clear
|
||||||
|
// diagnostic so client teams can plumb the reference parser
|
||||||
|
// when the worker AlarmAckByName command lands.
|
||||||
|
return new AcknowledgeAlarmReply
|
||||||
|
{
|
||||||
|
SessionId = request.SessionId,
|
||||||
|
CorrelationId = request.ClientCorrelationId,
|
||||||
|
ProtocolStatus = new ProtocolStatus
|
||||||
|
{
|
||||||
|
Code = ProtocolStatusCode.InvalidRequest,
|
||||||
|
Message = "AlarmFullReference must currently be a canonical GUID; reference→GUID lookup is pending the AlarmAckByName worker command.",
|
||||||
|
},
|
||||||
|
DiagnosticMessage = $"AcknowledgeAlarm received non-GUID reference '{request.AlarmFullReference}'.",
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
WorkerCommand workerCommand = new WorkerCommand
|
||||||
|
{
|
||||||
|
Command = new MxCommand
|
||||||
|
{
|
||||||
|
Kind = MxCommandKind.AcknowledgeAlarm,
|
||||||
|
AcknowledgeAlarmCommand = new AcknowledgeAlarmCommand
|
||||||
|
{
|
||||||
|
AlarmGuid = guid.ToString(),
|
||||||
|
Comment = request.Comment ?? string.Empty,
|
||||||
|
OperatorUser = request.OperatorUser ?? string.Empty,
|
||||||
|
// Operator node/domain/full-name are not on the public
|
||||||
|
// RPC surface today; pass empty strings so the worker
|
||||||
|
// honours the existing AcknowledgeAlarmCommand schema.
|
||||||
|
OperatorNode = string.Empty,
|
||||||
|
OperatorDomain = string.Empty,
|
||||||
|
OperatorFullName = string.Empty,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
EnqueueTimestamp = Timestamp.FromDateTimeOffset(timeProvider.GetUtcNow()),
|
||||||
|
};
|
||||||
|
|
||||||
|
WorkerCommandReply workerReply = await session.InvokeAsync(workerCommand, cancellationToken)
|
||||||
|
.ConfigureAwait(false);
|
||||||
|
|
||||||
|
MxCommandReply mxReply = workerReply.Reply ?? new MxCommandReply
|
||||||
|
{
|
||||||
|
ProtocolStatus = new ProtocolStatus
|
||||||
|
{
|
||||||
|
Code = ProtocolStatusCode.ProtocolViolation,
|
||||||
|
Message = "Worker reply did not include an MxCommandReply.",
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
AcknowledgeAlarmReply reply = new AcknowledgeAlarmReply
|
||||||
|
{
|
||||||
|
SessionId = request.SessionId,
|
||||||
|
CorrelationId = request.ClientCorrelationId,
|
||||||
|
ProtocolStatus = mxReply.ProtocolStatus ?? MxAccessGrpcMapper.Ok(),
|
||||||
|
DiagnosticMessage = mxReply.DiagnosticMessage ?? string.Empty,
|
||||||
|
};
|
||||||
|
if (mxReply.HasHresult)
|
||||||
|
{
|
||||||
|
reply.Hresult = mxReply.Hresult;
|
||||||
|
}
|
||||||
|
return reply;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public async IAsyncEnumerable<ActiveAlarmSnapshot> QueryActiveAlarmsAsync(
|
||||||
|
QueryActiveAlarmsRequest request,
|
||||||
|
[EnumeratorCancellation] CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
if (request is null) throw new System.ArgumentNullException(nameof(request));
|
||||||
|
|
||||||
|
if (!sessionRegistry.TryGet(request.SessionId, out GatewaySession session))
|
||||||
|
{
|
||||||
|
yield break;
|
||||||
|
}
|
||||||
|
|
||||||
|
WorkerCommand workerCommand = new WorkerCommand
|
||||||
|
{
|
||||||
|
Command = new MxCommand
|
||||||
|
{
|
||||||
|
Kind = MxCommandKind.QueryActiveAlarms,
|
||||||
|
QueryActiveAlarmsCommand = new QueryActiveAlarmsCommand
|
||||||
|
{
|
||||||
|
AlarmFilterPrefix = request.AlarmFilterPrefix ?? string.Empty,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
EnqueueTimestamp = Timestamp.FromDateTimeOffset(timeProvider.GetUtcNow()),
|
||||||
|
};
|
||||||
|
|
||||||
|
WorkerCommandReply workerReply = await session.InvokeAsync(workerCommand, cancellationToken)
|
||||||
|
.ConfigureAwait(false);
|
||||||
|
|
||||||
|
MxCommandReply? mxReply = workerReply.Reply;
|
||||||
|
if (mxReply?.ProtocolStatus?.Code != ProtocolStatusCode.Ok) yield break;
|
||||||
|
|
||||||
|
QueryActiveAlarmsReplyPayload? payload = mxReply.QueryActiveAlarms;
|
||||||
|
if (payload is null) yield break;
|
||||||
|
|
||||||
|
foreach (ActiveAlarmSnapshot snapshot in payload.Snapshots)
|
||||||
|
{
|
||||||
|
cancellationToken.ThrowIfCancellationRequested();
|
||||||
|
yield return snapshot;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,300 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Runtime.CompilerServices;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
using MxGateway.Server.Sessions;
|
||||||
|
using MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
namespace MxGateway.Tests.Gateway.Sessions;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Pins the production <see cref="WorkerAlarmRpcDispatcher"/>'s behaviour:
|
||||||
|
/// resolves the session by id, issues the matching MxCommand over the
|
||||||
|
/// worker pipe, and unwraps the reply into AcknowledgeAlarmReply or the
|
||||||
|
/// ActiveAlarmSnapshot stream.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class WorkerAlarmRpcDispatcherTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public async Task AcknowledgeAsync_returns_session_not_found_when_session_missing()
|
||||||
|
{
|
||||||
|
SessionRegistry registry = new SessionRegistry();
|
||||||
|
WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
|
||||||
|
|
||||||
|
AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync(
|
||||||
|
new AcknowledgeAlarmRequest
|
||||||
|
{
|
||||||
|
SessionId = "missing",
|
||||||
|
ClientCorrelationId = "c1",
|
||||||
|
AlarmFullReference = Guid.NewGuid().ToString(),
|
||||||
|
},
|
||||||
|
CancellationToken.None);
|
||||||
|
|
||||||
|
Assert.Equal(ProtocolStatusCode.SessionNotFound, reply.ProtocolStatus.Code);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task AcknowledgeAsync_returns_invalid_request_when_reference_is_not_a_guid()
|
||||||
|
{
|
||||||
|
SessionRegistry registry = new SessionRegistry();
|
||||||
|
FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient();
|
||||||
|
GatewaySession session = NewSession("s1");
|
||||||
|
session.AttachWorkerClient(worker);
|
||||||
|
session.MarkReady();
|
||||||
|
registry.TryAdd(session);
|
||||||
|
|
||||||
|
WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
|
||||||
|
|
||||||
|
AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync(
|
||||||
|
new AcknowledgeAlarmRequest
|
||||||
|
{
|
||||||
|
SessionId = "s1",
|
||||||
|
ClientCorrelationId = "c1",
|
||||||
|
AlarmFullReference = "Galaxy!Area.Tag", // not a GUID
|
||||||
|
Comment = "x",
|
||||||
|
OperatorUser = "u",
|
||||||
|
},
|
||||||
|
CancellationToken.None);
|
||||||
|
|
||||||
|
Assert.Equal(ProtocolStatusCode.InvalidRequest, reply.ProtocolStatus.Code);
|
||||||
|
Assert.Equal(0, worker.InvokeCount); // dispatcher short-circuits before the IPC.
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task AcknowledgeAsync_forwards_guid_and_returns_native_status()
|
||||||
|
{
|
||||||
|
SessionRegistry registry = new SessionRegistry();
|
||||||
|
Guid alarmGuid = Guid.NewGuid();
|
||||||
|
FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient
|
||||||
|
{
|
||||||
|
ReplyFactory = command =>
|
||||||
|
{
|
||||||
|
Assert.Equal(MxCommandKind.AcknowledgeAlarm, command.Command.Kind);
|
||||||
|
Assert.Equal(alarmGuid.ToString(), command.Command.AcknowledgeAlarmCommand.AlarmGuid);
|
||||||
|
Assert.Equal("ack", command.Command.AcknowledgeAlarmCommand.Comment);
|
||||||
|
Assert.Equal("alice", command.Command.AcknowledgeAlarmCommand.OperatorUser);
|
||||||
|
return new MxCommandReply
|
||||||
|
{
|
||||||
|
Kind = MxCommandKind.AcknowledgeAlarm,
|
||||||
|
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok, Message = "OK" },
|
||||||
|
Hresult = 0,
|
||||||
|
AcknowledgeAlarm = new AcknowledgeAlarmReplyPayload { NativeStatus = 0 },
|
||||||
|
};
|
||||||
|
},
|
||||||
|
};
|
||||||
|
GatewaySession session = NewSession("s1");
|
||||||
|
session.AttachWorkerClient(worker);
|
||||||
|
session.MarkReady();
|
||||||
|
registry.TryAdd(session);
|
||||||
|
|
||||||
|
WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
|
||||||
|
|
||||||
|
AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync(
|
||||||
|
new AcknowledgeAlarmRequest
|
||||||
|
{
|
||||||
|
SessionId = "s1",
|
||||||
|
ClientCorrelationId = "c1",
|
||||||
|
AlarmFullReference = alarmGuid.ToString(),
|
||||||
|
Comment = "ack",
|
||||||
|
OperatorUser = "alice",
|
||||||
|
},
|
||||||
|
CancellationToken.None);
|
||||||
|
|
||||||
|
Assert.Equal(ProtocolStatusCode.Ok, reply.ProtocolStatus.Code);
|
||||||
|
Assert.Equal(0, reply.Hresult);
|
||||||
|
Assert.Equal("s1", reply.SessionId);
|
||||||
|
Assert.Equal("c1", reply.CorrelationId);
|
||||||
|
Assert.Equal(1, worker.InvokeCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task AcknowledgeAsync_propagates_worker_diagnostic_on_failure()
|
||||||
|
{
|
||||||
|
SessionRegistry registry = new SessionRegistry();
|
||||||
|
FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient
|
||||||
|
{
|
||||||
|
ReplyFactory = _ => new MxCommandReply
|
||||||
|
{
|
||||||
|
Kind = MxCommandKind.AcknowledgeAlarm,
|
||||||
|
ProtocolStatus = new ProtocolStatus
|
||||||
|
{
|
||||||
|
Code = ProtocolStatusCode.MxaccessFailure,
|
||||||
|
Message = "AVEVA Acknowledge failed.",
|
||||||
|
},
|
||||||
|
Hresult = -123,
|
||||||
|
DiagnosticMessage = "AVEVA AlarmAckByGUID returned non-zero status -123.",
|
||||||
|
},
|
||||||
|
};
|
||||||
|
GatewaySession session = NewSession("s1");
|
||||||
|
session.AttachWorkerClient(worker);
|
||||||
|
session.MarkReady();
|
||||||
|
registry.TryAdd(session);
|
||||||
|
|
||||||
|
WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
|
||||||
|
|
||||||
|
AcknowledgeAlarmReply reply = await dispatcher.AcknowledgeAsync(
|
||||||
|
new AcknowledgeAlarmRequest
|
||||||
|
{
|
||||||
|
SessionId = "s1",
|
||||||
|
AlarmFullReference = Guid.NewGuid().ToString(),
|
||||||
|
},
|
||||||
|
CancellationToken.None);
|
||||||
|
|
||||||
|
Assert.Equal(ProtocolStatusCode.MxaccessFailure, reply.ProtocolStatus.Code);
|
||||||
|
Assert.Equal(-123, reply.Hresult);
|
||||||
|
Assert.Contains("-123", reply.DiagnosticMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task QueryActiveAlarmsAsync_yields_each_snapshot_from_payload()
|
||||||
|
{
|
||||||
|
SessionRegistry registry = new SessionRegistry();
|
||||||
|
FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient
|
||||||
|
{
|
||||||
|
ReplyFactory = command =>
|
||||||
|
{
|
||||||
|
Assert.Equal(MxCommandKind.QueryActiveAlarms, command.Command.Kind);
|
||||||
|
QueryActiveAlarmsReplyPayload payload = new QueryActiveAlarmsReplyPayload();
|
||||||
|
payload.Snapshots.Add(new ActiveAlarmSnapshot
|
||||||
|
{
|
||||||
|
AlarmFullReference = "Galaxy!A.T1",
|
||||||
|
CurrentState = AlarmConditionState.Active,
|
||||||
|
Severity = 500,
|
||||||
|
});
|
||||||
|
payload.Snapshots.Add(new ActiveAlarmSnapshot
|
||||||
|
{
|
||||||
|
AlarmFullReference = "Galaxy!A.T2",
|
||||||
|
CurrentState = AlarmConditionState.ActiveAcked,
|
||||||
|
Severity = 100,
|
||||||
|
});
|
||||||
|
return new MxCommandReply
|
||||||
|
{
|
||||||
|
Kind = MxCommandKind.QueryActiveAlarms,
|
||||||
|
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok, Message = "OK" },
|
||||||
|
QueryActiveAlarms = payload,
|
||||||
|
};
|
||||||
|
},
|
||||||
|
};
|
||||||
|
GatewaySession session = NewSession("s1");
|
||||||
|
session.AttachWorkerClient(worker);
|
||||||
|
session.MarkReady();
|
||||||
|
registry.TryAdd(session);
|
||||||
|
|
||||||
|
WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
|
||||||
|
|
||||||
|
List<ActiveAlarmSnapshot> collected = new List<ActiveAlarmSnapshot>();
|
||||||
|
await foreach (ActiveAlarmSnapshot snap in dispatcher.QueryActiveAlarmsAsync(
|
||||||
|
new QueryActiveAlarmsRequest { SessionId = "s1" },
|
||||||
|
CancellationToken.None))
|
||||||
|
{
|
||||||
|
collected.Add(snap);
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.Equal(2, collected.Count);
|
||||||
|
Assert.Equal("Galaxy!A.T1", collected[0].AlarmFullReference);
|
||||||
|
Assert.Equal("Galaxy!A.T2", collected[1].AlarmFullReference);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task QueryActiveAlarmsAsync_yields_empty_when_session_missing()
|
||||||
|
{
|
||||||
|
SessionRegistry registry = new SessionRegistry();
|
||||||
|
WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
|
||||||
|
|
||||||
|
List<ActiveAlarmSnapshot> collected = new List<ActiveAlarmSnapshot>();
|
||||||
|
await foreach (ActiveAlarmSnapshot snap in dispatcher.QueryActiveAlarmsAsync(
|
||||||
|
new QueryActiveAlarmsRequest { SessionId = "missing" },
|
||||||
|
CancellationToken.None))
|
||||||
|
{
|
||||||
|
collected.Add(snap);
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.Empty(collected);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task QueryActiveAlarmsAsync_yields_empty_on_worker_failure()
|
||||||
|
{
|
||||||
|
SessionRegistry registry = new SessionRegistry();
|
||||||
|
FakeAlarmWorkerClient worker = new FakeAlarmWorkerClient
|
||||||
|
{
|
||||||
|
ReplyFactory = _ => new MxCommandReply
|
||||||
|
{
|
||||||
|
Kind = MxCommandKind.QueryActiveAlarms,
|
||||||
|
ProtocolStatus = new ProtocolStatus
|
||||||
|
{
|
||||||
|
Code = ProtocolStatusCode.MxaccessFailure,
|
||||||
|
Message = "alarm consumer not subscribed",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
GatewaySession session = NewSession("s1");
|
||||||
|
session.AttachWorkerClient(worker);
|
||||||
|
session.MarkReady();
|
||||||
|
registry.TryAdd(session);
|
||||||
|
|
||||||
|
WorkerAlarmRpcDispatcher dispatcher = new WorkerAlarmRpcDispatcher(registry);
|
||||||
|
|
||||||
|
List<ActiveAlarmSnapshot> collected = new List<ActiveAlarmSnapshot>();
|
||||||
|
await foreach (ActiveAlarmSnapshot snap in dispatcher.QueryActiveAlarmsAsync(
|
||||||
|
new QueryActiveAlarmsRequest { SessionId = "s1" },
|
||||||
|
CancellationToken.None))
|
||||||
|
{
|
||||||
|
collected.Add(snap);
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.Empty(collected);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static GatewaySession NewSession(string sessionId)
|
||||||
|
{
|
||||||
|
return new GatewaySession(
|
||||||
|
sessionId,
|
||||||
|
"mxaccess",
|
||||||
|
$"mxaccess-gateway-1-{sessionId}",
|
||||||
|
"nonce",
|
||||||
|
"client-1",
|
||||||
|
"test-session",
|
||||||
|
"client-correlation-1",
|
||||||
|
commandTimeout: TimeSpan.FromSeconds(30),
|
||||||
|
startupTimeout: TimeSpan.FromSeconds(5),
|
||||||
|
shutdownTimeout: TimeSpan.FromSeconds(5),
|
||||||
|
leaseDuration: TimeSpan.FromMinutes(30),
|
||||||
|
openedAt: DateTimeOffset.UtcNow);
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class FakeAlarmWorkerClient : IWorkerClient
|
||||||
|
{
|
||||||
|
public string SessionId { get; } = "session-1";
|
||||||
|
public int? ProcessId { get; } = 1;
|
||||||
|
public WorkerClientState State { get; } = WorkerClientState.Ready;
|
||||||
|
public DateTimeOffset LastHeartbeatAt { get; } = DateTimeOffset.UtcNow;
|
||||||
|
|
||||||
|
public Func<WorkerCommand, MxCommandReply>? ReplyFactory { get; set; }
|
||||||
|
public int InvokeCount { get; private set; }
|
||||||
|
|
||||||
|
public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;
|
||||||
|
|
||||||
|
public Task<WorkerCommandReply> InvokeAsync(
|
||||||
|
WorkerCommand command, TimeSpan timeout, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
InvokeCount++;
|
||||||
|
MxCommandReply reply = ReplyFactory?.Invoke(command) ?? new MxCommandReply();
|
||||||
|
return Task.FromResult(new WorkerCommandReply { Reply = reply });
|
||||||
|
}
|
||||||
|
|
||||||
|
public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
|
||||||
|
[EnumeratorCancellation] CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
await Task.CompletedTask;
|
||||||
|
yield break;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task ShutdownAsync(TimeSpan timeout, CancellationToken cancellationToken) => Task.CompletedTask;
|
||||||
|
public void Kill(string reason) { }
|
||||||
|
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user