Files
mxaccessgw/clients/dotnet/MxGateway.Client/GrpcMxGatewayClientTransport.cs
Joseph Doherty ac12c150c3 Point the .NET client at the StreamAlarms alarm feed
Replace the client's QueryActiveAlarmsAsync with StreamAlarmsAsync —
a session-less subscription to the gateway's central alarm feed that
yields the active-alarm snapshot followed by live transitions.
AcknowledgeAlarm is session-less (AcknowledgeAlarmRequest no longer
carries a session id). Updates the transport interface, the gRPC
transport, the test fake, and the alarm tests; the .NET client
solution builds and its alarm tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 16:30:50 -04:00

178 lines
5.4 KiB
C#

using Grpc.Core;
using MxGateway.Contracts.Proto;
namespace MxGateway.Client;
/// <summary>
/// gRPC implementation of IMxGatewayClientTransport.
/// </summary>
internal sealed class GrpcMxGatewayClientTransport(
MxGatewayClientOptions options,
MxAccessGateway.MxAccessGatewayClient rawClient) : IMxGatewayClientTransport
{
/// <summary>
/// Gets the gateway client options.
/// </summary>
public MxGatewayClientOptions Options { get; } = options;
/// <summary>
/// Gets the underlying gRPC client.
/// </summary>
public MxAccessGateway.MxAccessGatewayClient RawClient { get; } = rawClient;
/// <inheritdoc />
MxAccessGateway.MxAccessGatewayClient? IMxGatewayClientTransport.RawClient => RawClient;
/// <inheritdoc />
public async Task<OpenSessionReply> OpenSessionAsync(
OpenSessionRequest request,
CallOptions callOptions)
{
try
{
return await RawClient.OpenSessionAsync(request, callOptions)
.ResponseAsync
.ConfigureAwait(false);
}
catch (RpcException exception)
{
throw RpcExceptionMapper.Map(exception, callOptions.CancellationToken);
}
}
/// <inheritdoc />
public async Task<CloseSessionReply> CloseSessionAsync(
CloseSessionRequest request,
CallOptions callOptions)
{
try
{
return await RawClient.CloseSessionAsync(request, callOptions)
.ResponseAsync
.ConfigureAwait(false);
}
catch (RpcException exception)
{
throw RpcExceptionMapper.Map(exception, callOptions.CancellationToken);
}
}
/// <inheritdoc />
public async Task<MxCommandReply> InvokeAsync(
MxCommandRequest request,
CallOptions callOptions)
{
try
{
return await RawClient.InvokeAsync(request, callOptions)
.ResponseAsync
.ConfigureAwait(false);
}
catch (RpcException exception)
{
throw RpcExceptionMapper.Map(exception, callOptions.CancellationToken);
}
}
/// <inheritdoc />
public async IAsyncEnumerable<MxEvent> StreamEventsAsync(
StreamEventsRequest request,
CallOptions callOptions,
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
{
CancellationToken effectiveCancellationToken = cancellationToken.CanBeCanceled
? cancellationToken
: callOptions.CancellationToken;
using AsyncServerStreamingCall<MxEvent> call = RawClient.StreamEvents(request, callOptions);
IAsyncStreamReader<MxEvent> responseStream = call.ResponseStream;
while (true)
{
MxEvent? gatewayEvent;
try
{
if (!await responseStream.MoveNext(effectiveCancellationToken).ConfigureAwait(false))
{
break;
}
gatewayEvent = responseStream.Current;
}
catch (RpcException exception)
{
throw RpcExceptionMapper.Map(exception, effectiveCancellationToken);
}
yield return gatewayEvent;
}
}
/// <inheritdoc />
IAsyncEnumerable<MxEvent> IMxGatewayClientTransport.StreamEventsAsync(
StreamEventsRequest request,
CallOptions callOptions)
{
return StreamEventsAsync(request, callOptions);
}
/// <inheritdoc />
public async Task<AcknowledgeAlarmReply> AcknowledgeAlarmAsync(
AcknowledgeAlarmRequest request,
CallOptions callOptions)
{
try
{
return await RawClient.AcknowledgeAlarmAsync(request, callOptions)
.ResponseAsync
.ConfigureAwait(false);
}
catch (RpcException exception)
{
throw RpcExceptionMapper.Map(exception, callOptions.CancellationToken);
}
}
/// <inheritdoc />
public async IAsyncEnumerable<AlarmFeedMessage> StreamAlarmsAsync(
StreamAlarmsRequest request,
CallOptions callOptions,
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
{
CancellationToken effectiveCancellationToken = cancellationToken.CanBeCanceled
? cancellationToken
: callOptions.CancellationToken;
using AsyncServerStreamingCall<AlarmFeedMessage> call = RawClient.StreamAlarms(request, callOptions);
IAsyncStreamReader<AlarmFeedMessage> responseStream = call.ResponseStream;
while (true)
{
AlarmFeedMessage? message;
try
{
if (!await responseStream.MoveNext(effectiveCancellationToken).ConfigureAwait(false))
{
break;
}
message = responseStream.Current;
}
catch (RpcException exception)
{
throw RpcExceptionMapper.Map(exception, effectiveCancellationToken);
}
yield return message;
}
}
/// <inheritdoc />
IAsyncEnumerable<AlarmFeedMessage> IMxGatewayClientTransport.StreamAlarmsAsync(
StreamAlarmsRequest request,
CallOptions callOptions)
{
return StreamAlarmsAsync(request, callOptions);
}
}