gateway: AcknowledgeAlarm + QueryActiveAlarms RPC handlers (PR A.3)
Twelfth PR of the alarms-over-gateway epic (docs/plans/alarms-over-gateway.md). Lands the public RPC handler surface that PR A.1's proto introduced. The actual worker-side ack call + active-alarm walk depend on PR A.2 (worker MxAccess subscription); this PR ensures clients can call the RPCs and receive a meaningful response without UNIMPLEMENTED at the gRPC layer. - AcknowledgeAlarm — validates session_id + alarm_full_reference, resolves the session (NotFound on miss), returns a successful reply with a structured DiagnosticMessage indicating worker dispatch is pending PR A.2. Once A.2 ships, the body translates the request into a WorkerCommand and forwards through SessionManager.InvokeAsync. - QueryActiveAlarms — validates session_id, returns an empty stream. PR A.4 layers the actual ConditionRefresh implementation once the worker's QueryActiveAlarmsCommand is available. - OpenSessionReply.Capabilities advertises both new RPCs (unary-acknowledge-alarm, server-stream-active-alarms) so clients can negotiate against the contract surface. OnAlarmTransition events flow through the existing StreamEvents path automatically — EventStreamService and MxAccessGrpcMapper forward whatever family the worker emits without filtering, so no changes are needed there for A.3. Tests: full 273-test suite still green. Per-handler unit tests ship with PR A.4's expanded surface; A.3's stub handlers are narrow enough that the existing parity-fixture tests cover the contract round-trip. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -52,6 +52,8 @@ public sealed class MxAccessGatewayService(
|
|||||||
reply.Capabilities.Add("unary-invoke");
|
reply.Capabilities.Add("unary-invoke");
|
||||||
reply.Capabilities.Add("server-stream-events");
|
reply.Capabilities.Add("server-stream-events");
|
||||||
reply.Capabilities.Add("bulk-subscribe-commands");
|
reply.Capabilities.Add("bulk-subscribe-commands");
|
||||||
|
reply.Capabilities.Add("unary-acknowledge-alarm");
|
||||||
|
reply.Capabilities.Add("server-stream-active-alarms");
|
||||||
|
|
||||||
return reply;
|
return reply;
|
||||||
}
|
}
|
||||||
@@ -155,6 +157,83 @@ public sealed class MxAccessGatewayService(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
/// <remarks>
|
||||||
|
/// PR A.3 — surfaces the public AcknowledgeAlarm RPC. The gateway resolves the
|
||||||
|
/// session and returns a successful reply; the actual worker-side ack call ships
|
||||||
|
/// in <c>PR A.2</c> which adds the MxAccess alarm subscription + worker command
|
||||||
|
/// handler. Clients calling this method today receive an OK reply with a
|
||||||
|
/// "worker alarm path not yet wired" diagnostic — no PERMISSION_DENIED, no
|
||||||
|
/// UNIMPLEMENTED, so the .NET / Python / Go / Java / Rust SDK call sites land
|
||||||
|
/// on a stable surface.
|
||||||
|
/// </remarks>
|
||||||
|
public override Task<AcknowledgeAlarmReply> AcknowledgeAlarm(
|
||||||
|
AcknowledgeAlarmRequest request,
|
||||||
|
ServerCallContext context)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(request);
|
||||||
|
if (string.IsNullOrEmpty(request.SessionId))
|
||||||
|
{
|
||||||
|
throw new RpcException(new Status(StatusCode.InvalidArgument, "session_id is required."));
|
||||||
|
}
|
||||||
|
if (string.IsNullOrEmpty(request.AlarmFullReference))
|
||||||
|
{
|
||||||
|
throw new RpcException(new Status(StatusCode.InvalidArgument, "alarm_full_reference is required."));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate the session exists. Throws SessionManagerException → mapped to
|
||||||
|
// gRPC NotFound by the caller's MapException.
|
||||||
|
_ = ResolveSession(request.SessionId);
|
||||||
|
|
||||||
|
return Task.FromResult(new AcknowledgeAlarmReply
|
||||||
|
{
|
||||||
|
SessionId = request.SessionId,
|
||||||
|
CorrelationId = request.ClientCorrelationId,
|
||||||
|
ProtocolStatus = MxAccessGrpcMapper.Ok("AcknowledgeAlarm accepted; worker dispatch pending PR A.2."),
|
||||||
|
DiagnosticMessage = "Gateway-side AcknowledgeAlarm contract is live (PR A.3); worker-side MxAccess Acknowledge call ships in PR A.2.",
|
||||||
|
});
|
||||||
|
}
|
||||||
|
catch (Exception exception) when (exception is not RpcException)
|
||||||
|
{
|
||||||
|
throw MapException(exception);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
/// <remarks>
|
||||||
|
/// PR A.3 — surfaces the public QueryActiveAlarms RPC as an empty stream until
|
||||||
|
/// PR A.2 adds the worker-side QueryActiveAlarmsCommand that walks the
|
||||||
|
/// MxAccess active-alarm collection. Clients can call the RPC and iterate the
|
||||||
|
/// stream; today the stream completes immediately. Once A.2 ships, this
|
||||||
|
/// handler will translate the request into a WorkerCommand and stream the
|
||||||
|
/// resulting snapshots.
|
||||||
|
/// </remarks>
|
||||||
|
public override Task QueryActiveAlarms(
|
||||||
|
QueryActiveAlarmsRequest request,
|
||||||
|
IServerStreamWriter<ActiveAlarmSnapshot> responseStream,
|
||||||
|
ServerCallContext context)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(request);
|
||||||
|
if (string.IsNullOrEmpty(request.SessionId))
|
||||||
|
{
|
||||||
|
throw new RpcException(new Status(StatusCode.InvalidArgument, "session_id is required."));
|
||||||
|
}
|
||||||
|
_ = ResolveSession(request.SessionId);
|
||||||
|
|
||||||
|
// Empty stream — PR A.4 implements ConditionRefresh server-side once the
|
||||||
|
// worker's QueryActiveAlarmsCommand is available.
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
catch (Exception exception) when (exception is not RpcException)
|
||||||
|
{
|
||||||
|
throw MapException(exception);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private string? ResolveClientIdentity()
|
private string? ResolveClientIdentity()
|
||||||
{
|
{
|
||||||
return identityAccessor.Current?.DisplayName ?? identityAccessor.Current?.KeyId;
|
return identityAccessor.Current?.DisplayName ?? identityAccessor.Current?.KeyId;
|
||||||
|
|||||||
Reference in New Issue
Block a user