Point the .NET client at the StreamAlarms alarm feed

Replace the client's QueryActiveAlarmsAsync with StreamAlarmsAsync —
a session-less subscription to the gateway's central alarm feed that
yields the active-alarm snapshot followed by live transitions.
AcknowledgeAlarm is session-less (AcknowledgeAlarmRequest no longer
carries a session id). Updates the transport interface, the gRPC
transport, the test fake, and the alarm tests; the .NET client
solution builds and its alarm tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-21 16:30:50 -04:00
parent 40ca4b6908
commit ac12c150c3
5 changed files with 58 additions and 63 deletions
@@ -47,9 +47,9 @@ internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMx
public List<(AcknowledgeAlarmRequest Request, CallOptions CallOptions)> AcknowledgeAlarmCalls { get; } = []; public List<(AcknowledgeAlarmRequest Request, CallOptions CallOptions)> AcknowledgeAlarmCalls { get; } = [];
/// <summary> /// <summary>
/// Gets the list of captured QueryActiveAlarmsAsync calls. /// Gets the list of captured StreamAlarmsAsync calls.
/// </summary> /// </summary>
public List<(QueryActiveAlarmsRequest Request, CallOptions CallOptions)> QueryActiveAlarmsCalls { get; } = []; public List<(StreamAlarmsRequest Request, CallOptions CallOptions)> StreamAlarmsCalls { get; } = [];
/// <summary> /// <summary>
/// Gets the queue of exceptions to throw from AcknowledgeAlarmAsync. /// Gets the queue of exceptions to throw from AcknowledgeAlarmAsync.
@@ -223,7 +223,6 @@ internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMx
? _acknowledgeReplies.Dequeue() ? _acknowledgeReplies.Dequeue()
: new AcknowledgeAlarmReply : new AcknowledgeAlarmReply
{ {
SessionId = request.SessionId,
CorrelationId = request.ClientCorrelationId, CorrelationId = request.ClientCorrelationId,
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
Status = new MxStatusProxy { Success = 1, Category = MxStatusCategory.Ok }, Status = new MxStatusProxy { Success = 1, Category = MxStatusCategory.Ok },
@@ -231,20 +230,23 @@ internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMx
} }
/// <summary> /// <summary>
/// Records the query call and yields each enqueued snapshot. /// Records the call and yields each enqueued snapshot as an active-alarm
/// feed message, then a snapshot-complete sentinel.
/// </summary> /// </summary>
public async IAsyncEnumerable<ActiveAlarmSnapshot> QueryActiveAlarmsAsync( public async IAsyncEnumerable<AlarmFeedMessage> StreamAlarmsAsync(
QueryActiveAlarmsRequest request, StreamAlarmsRequest request,
CallOptions callOptions) CallOptions callOptions)
{ {
QueryActiveAlarmsCalls.Add((request, callOptions)); StreamAlarmsCalls.Add((request, callOptions));
foreach (ActiveAlarmSnapshot snapshot in _activeAlarmSnapshots) foreach (ActiveAlarmSnapshot snapshot in _activeAlarmSnapshots)
{ {
callOptions.CancellationToken.ThrowIfCancellationRequested(); callOptions.CancellationToken.ThrowIfCancellationRequested();
await Task.Yield(); await Task.Yield();
yield return snapshot; yield return new AlarmFeedMessage { ActiveAlarm = snapshot };
} }
yield return new AlarmFeedMessage { SnapshotComplete = true };
} }
/// <summary>Enqueues an acknowledge reply.</summary> /// <summary>Enqueues an acknowledge reply.</summary>
@@ -253,7 +255,7 @@ internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMx
_acknowledgeReplies.Enqueue(reply); _acknowledgeReplies.Enqueue(reply);
} }
/// <summary>Enqueues a snapshot to be yielded from QueryActiveAlarmsAsync.</summary> /// <summary>Enqueues a snapshot yielded from StreamAlarmsAsync as an active-alarm message.</summary>
public void AddActiveAlarmSnapshot(ActiveAlarmSnapshot snapshot) public void AddActiveAlarmSnapshot(ActiveAlarmSnapshot snapshot)
{ {
_activeAlarmSnapshots.Add(snapshot); _activeAlarmSnapshots.Add(snapshot);
@@ -5,9 +5,9 @@ using MxGateway.Contracts.Proto;
namespace MxGateway.Client.Tests; namespace MxGateway.Client.Tests;
/// <summary> /// <summary>
/// PR E.2 — pins the .NET SDK surface for the new alarm RPCs: /// Pins the .NET SDK surface for the alarm RPCs:
/// <see cref="MxGatewayClient.AcknowledgeAlarmAsync"/> and /// <see cref="MxGatewayClient.AcknowledgeAlarmAsync"/> and
/// <see cref="MxGatewayClient.QueryActiveAlarmsAsync"/>. /// <see cref="MxGatewayClient.StreamAlarmsAsync"/>.
/// </summary> /// </summary>
public sealed class MxGatewayClientAlarmsTests public sealed class MxGatewayClientAlarmsTests
{ {
@@ -17,7 +17,6 @@ public sealed class MxGatewayClientAlarmsTests
FakeGatewayTransport transport = CreateTransport(); FakeGatewayTransport transport = CreateTransport();
transport.AddAcknowledgeReply(new AcknowledgeAlarmReply transport.AddAcknowledgeReply(new AcknowledgeAlarmReply
{ {
SessionId = "session-fixture",
CorrelationId = "corr-1", CorrelationId = "corr-1",
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
Status = new MxStatusProxy Status = new MxStatusProxy
@@ -31,7 +30,6 @@ public sealed class MxGatewayClientAlarmsTests
AcknowledgeAlarmReply reply = await client.AcknowledgeAlarmAsync(new AcknowledgeAlarmRequest AcknowledgeAlarmReply reply = await client.AcknowledgeAlarmAsync(new AcknowledgeAlarmRequest
{ {
SessionId = "session-fixture",
ClientCorrelationId = "corr-1", ClientCorrelationId = "corr-1",
AlarmFullReference = "Tank01.Level.HiHi", AlarmFullReference = "Tank01.Level.HiHi",
Comment = "investigating", Comment = "investigating",
@@ -64,7 +62,6 @@ public sealed class MxGatewayClientAlarmsTests
client.AcknowledgeAlarmAsync( client.AcknowledgeAlarmAsync(
new AcknowledgeAlarmRequest new AcknowledgeAlarmRequest
{ {
SessionId = "session-fixture",
AlarmFullReference = "Tank01.Level.HiHi", AlarmFullReference = "Tank01.Level.HiHi",
Comment = string.Empty, Comment = string.Empty,
OperatorUser = "alice", OperatorUser = "alice",
@@ -87,7 +84,6 @@ public sealed class MxGatewayClientAlarmsTests
var ex = await Assert.ThrowsAsync<RpcException>( var ex = await Assert.ThrowsAsync<RpcException>(
() => client.AcknowledgeAlarmAsync(new AcknowledgeAlarmRequest () => client.AcknowledgeAlarmAsync(new AcknowledgeAlarmRequest
{ {
SessionId = "session-fixture",
AlarmFullReference = "Tank01.Level.HiHi", AlarmFullReference = "Tank01.Level.HiHi",
Comment = string.Empty, Comment = string.Empty,
OperatorUser = "alice", OperatorUser = "alice",
@@ -113,7 +109,6 @@ public sealed class MxGatewayClientAlarmsTests
var ex = await Assert.ThrowsAsync<MxGatewayAuthenticationException>( var ex = await Assert.ThrowsAsync<MxGatewayAuthenticationException>(
() => client.AcknowledgeAlarmAsync(new AcknowledgeAlarmRequest () => client.AcknowledgeAlarmAsync(new AcknowledgeAlarmRequest
{ {
SessionId = "session-fixture",
AlarmFullReference = "Tank01.Level.HiHi", AlarmFullReference = "Tank01.Level.HiHi",
Comment = string.Empty, Comment = string.Empty,
OperatorUser = "alice", OperatorUser = "alice",
@@ -122,50 +117,47 @@ public sealed class MxGatewayClientAlarmsTests
} }
[Fact] [Fact]
public async Task QueryActiveAlarmsAsync_StreamsEnqueuedSnapshots() public async Task StreamAlarmsAsync_StreamsSnapshotThenSnapshotComplete()
{ {
FakeGatewayTransport transport = CreateTransport(); FakeGatewayTransport transport = CreateTransport();
transport.AddActiveAlarmSnapshot(MakeSnapshot("Tank01.Level.HiHi", AlarmConditionState.Active)); transport.AddActiveAlarmSnapshot(MakeSnapshot("Tank01.Level.HiHi", AlarmConditionState.Active));
transport.AddActiveAlarmSnapshot(MakeSnapshot("Tank02.Level.HiHi", AlarmConditionState.ActiveAcked)); transport.AddActiveAlarmSnapshot(MakeSnapshot("Tank02.Level.HiHi", AlarmConditionState.ActiveAcked));
await using MxGatewayClient client = CreateClient(transport); await using MxGatewayClient client = CreateClient(transport);
List<ActiveAlarmSnapshot> snapshots = []; List<AlarmFeedMessage> messages = [];
await foreach (ActiveAlarmSnapshot snapshot in client.QueryActiveAlarmsAsync(new QueryActiveAlarmsRequest await foreach (AlarmFeedMessage message in client.StreamAlarmsAsync(new StreamAlarmsRequest()))
{ {
SessionId = "session-fixture", messages.Add(message);
}))
{
snapshots.Add(snapshot);
} }
Assert.Equal(2, snapshots.Count); Assert.Equal(3, messages.Count);
Assert.Equal("Tank01.Level.HiHi", snapshots[0].AlarmFullReference); Assert.Equal("Tank01.Level.HiHi", messages[0].ActiveAlarm.AlarmFullReference);
Assert.Equal(AlarmConditionState.Active, snapshots[0].CurrentState); Assert.Equal(AlarmConditionState.Active, messages[0].ActiveAlarm.CurrentState);
Assert.Equal(AlarmConditionState.ActiveAcked, snapshots[1].CurrentState); Assert.Equal(AlarmConditionState.ActiveAcked, messages[1].ActiveAlarm.CurrentState);
Assert.Single(transport.QueryActiveAlarmsCalls); Assert.True(messages[2].SnapshotComplete);
Assert.Single(transport.StreamAlarmsCalls);
} }
[Fact] [Fact]
public async Task QueryActiveAlarmsAsync_PassesFilterPrefix() public async Task StreamAlarmsAsync_PassesFilterPrefix()
{ {
FakeGatewayTransport transport = CreateTransport(); FakeGatewayTransport transport = CreateTransport();
await using MxGatewayClient client = CreateClient(transport); await using MxGatewayClient client = CreateClient(transport);
await foreach (ActiveAlarmSnapshot _ in client.QueryActiveAlarmsAsync(new QueryActiveAlarmsRequest await foreach (AlarmFeedMessage _ in client.StreamAlarmsAsync(new StreamAlarmsRequest
{ {
SessionId = "session-fixture",
AlarmFilterPrefix = "Tank01.", AlarmFilterPrefix = "Tank01.",
})) }))
{ {
// no snapshots enqueued; just verifying the request passes through // only the snapshot-complete sentinel; verifying the request passes through
} }
var call = Assert.Single(transport.QueryActiveAlarmsCalls); var call = Assert.Single(transport.StreamAlarmsCalls);
Assert.Equal("Tank01.", call.Request.AlarmFilterPrefix); Assert.Equal("Tank01.", call.Request.AlarmFilterPrefix);
} }
[Fact] [Fact]
public async Task QueryActiveAlarmsAsync_HonorsCancellationDuringEnumeration() public async Task StreamAlarmsAsync_HonorsCancellationDuringEnumeration()
{ {
FakeGatewayTransport transport = CreateTransport(); FakeGatewayTransport transport = CreateTransport();
transport.AddActiveAlarmSnapshot(MakeSnapshot("Tank01.Level.HiHi", AlarmConditionState.Active)); transport.AddActiveAlarmSnapshot(MakeSnapshot("Tank01.Level.HiHi", AlarmConditionState.Active));
@@ -175,8 +167,8 @@ public sealed class MxGatewayClientAlarmsTests
using CancellationTokenSource cancellation = new(); using CancellationTokenSource cancellation = new();
await Assert.ThrowsAsync<OperationCanceledException>(async () => await Assert.ThrowsAsync<OperationCanceledException>(async () =>
{ {
await foreach (ActiveAlarmSnapshot _ in client.QueryActiveAlarmsAsync( await foreach (AlarmFeedMessage _ in client.StreamAlarmsAsync(
new QueryActiveAlarmsRequest { SessionId = "session-fixture" }, new StreamAlarmsRequest(),
cancellation.Token)) cancellation.Token))
{ {
cancellation.Cancel(); cancellation.Cancel();
@@ -134,8 +134,8 @@ internal sealed class GrpcMxGatewayClientTransport(
} }
/// <inheritdoc /> /// <inheritdoc />
public async IAsyncEnumerable<ActiveAlarmSnapshot> QueryActiveAlarmsAsync( public async IAsyncEnumerable<AlarmFeedMessage> StreamAlarmsAsync(
QueryActiveAlarmsRequest request, StreamAlarmsRequest request,
CallOptions callOptions, CallOptions callOptions,
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default) [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
{ {
@@ -143,12 +143,12 @@ internal sealed class GrpcMxGatewayClientTransport(
? cancellationToken ? cancellationToken
: callOptions.CancellationToken; : callOptions.CancellationToken;
using AsyncServerStreamingCall<ActiveAlarmSnapshot> call = RawClient.QueryActiveAlarms(request, callOptions); using AsyncServerStreamingCall<AlarmFeedMessage> call = RawClient.StreamAlarms(request, callOptions);
IAsyncStreamReader<ActiveAlarmSnapshot> responseStream = call.ResponseStream; IAsyncStreamReader<AlarmFeedMessage> responseStream = call.ResponseStream;
while (true) while (true)
{ {
ActiveAlarmSnapshot? snapshot; AlarmFeedMessage? message;
try try
{ {
if (!await responseStream.MoveNext(effectiveCancellationToken).ConfigureAwait(false)) if (!await responseStream.MoveNext(effectiveCancellationToken).ConfigureAwait(false))
@@ -156,22 +156,22 @@ internal sealed class GrpcMxGatewayClientTransport(
break; break;
} }
snapshot = responseStream.Current; message = responseStream.Current;
} }
catch (RpcException exception) catch (RpcException exception)
{ {
throw RpcExceptionMapper.Map(exception, effectiveCancellationToken); throw RpcExceptionMapper.Map(exception, effectiveCancellationToken);
} }
yield return snapshot; yield return message;
} }
} }
/// <inheritdoc /> /// <inheritdoc />
IAsyncEnumerable<ActiveAlarmSnapshot> IMxGatewayClientTransport.QueryActiveAlarmsAsync( IAsyncEnumerable<AlarmFeedMessage> IMxGatewayClientTransport.StreamAlarmsAsync(
QueryActiveAlarmsRequest request, StreamAlarmsRequest request,
CallOptions callOptions) CallOptions callOptions)
{ {
return QueryActiveAlarmsAsync(request, callOptions); return StreamAlarmsAsync(request, callOptions);
} }
} }
@@ -66,13 +66,13 @@ internal interface IMxGatewayClientTransport
CallOptions callOptions); CallOptions callOptions);
/// <summary> /// <summary>
/// Streams a snapshot of all alarms currently in Active or ActiveAcked state — the /// Attaches to the gateway's central alarm feed — the current active-alarm
/// ConditionRefresh equivalent for the gateway. /// snapshot followed by live transitions.
/// </summary> /// </summary>
/// <param name="request">The query request, optionally scoped by alarm-reference prefix.</param> /// <param name="request">The stream request, optionally scoped by alarm-reference prefix.</param>
/// <param name="callOptions">gRPC call options.</param> /// <param name="callOptions">gRPC call options.</param>
/// <returns>An async enumerable of active-alarm snapshots.</returns> /// <returns>An async enumerable of alarm feed messages.</returns>
IAsyncEnumerable<ActiveAlarmSnapshot> QueryActiveAlarmsAsync( IAsyncEnumerable<AlarmFeedMessage> StreamAlarmsAsync(
QueryActiveAlarmsRequest request, StreamAlarmsRequest request,
CallOptions callOptions); CallOptions callOptions);
} }
@@ -205,24 +205,25 @@ public sealed class MxGatewayClient : IAsyncDisposable
} }
/// <summary> /// <summary>
/// Streams a snapshot of all alarms currently Active or ActiveAcked — the gateway's /// Attaches to the gateway's central alarm feed. The stream opens with one
/// ConditionRefresh equivalent. Used after reconnect to seed the local Part 9 state /// <see cref="AlarmFeedMessage"/> per currently-active alarm (the
/// machine, or to reconcile alarms that may have been missed during a transport /// ConditionRefresh snapshot), then a single <c>snapshot_complete</c>, then a
/// blip. Optionally scoped by alarm-reference prefix /// <c>transition</c> for every subsequent raise / acknowledge / clear. Served
/// (<see cref="QueryActiveAlarmsRequest.AlarmFilterPrefix"/>) so a partial refresh /// by the gateway's always-on alarm monitor — no worker session is opened, so
/// can target an equipment sub-tree. /// any number of clients may attach. Optionally scoped by alarm-reference
/// prefix (<see cref="StreamAlarmsRequest.AlarmFilterPrefix"/>).
/// </summary> /// </summary>
/// <param name="request">The query request, optionally scoped by alarm-reference prefix.</param> /// <param name="request">The stream request, optionally scoped by alarm-reference prefix.</param>
/// <param name="cancellationToken">Cancellation token for the stream.</param> /// <param name="cancellationToken">Cancellation token for the stream.</param>
/// <returns>An async enumerable of active-alarm snapshots.</returns> /// <returns>An async enumerable of alarm feed messages.</returns>
public IAsyncEnumerable<ActiveAlarmSnapshot> QueryActiveAlarmsAsync( public IAsyncEnumerable<AlarmFeedMessage> StreamAlarmsAsync(
QueryActiveAlarmsRequest request, StreamAlarmsRequest request,
CancellationToken cancellationToken = default) CancellationToken cancellationToken = default)
{ {
ArgumentNullException.ThrowIfNull(request); ArgumentNullException.ThrowIfNull(request);
ThrowIfDisposed(); ThrowIfDisposed();
return _transport.QueryActiveAlarmsAsync(request, CreateStreamCallOptions(cancellationToken)); return _transport.StreamAlarmsAsync(request, CreateStreamCallOptions(cancellationToken));
} }
/// <summary> /// <summary>