diff --git a/clients/dotnet/MxGateway.Client.Tests/FakeGatewayTransport.cs b/clients/dotnet/MxGateway.Client.Tests/FakeGatewayTransport.cs index 554d6a1..c21f2a4 100644 --- a/clients/dotnet/MxGateway.Client.Tests/FakeGatewayTransport.cs +++ b/clients/dotnet/MxGateway.Client.Tests/FakeGatewayTransport.cs @@ -47,9 +47,9 @@ internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMx public List<(AcknowledgeAlarmRequest Request, CallOptions CallOptions)> AcknowledgeAlarmCalls { get; } = []; /// - /// Gets the list of captured QueryActiveAlarmsAsync calls. + /// Gets the list of captured StreamAlarmsAsync calls. /// - public List<(QueryActiveAlarmsRequest Request, CallOptions CallOptions)> QueryActiveAlarmsCalls { get; } = []; + public List<(StreamAlarmsRequest Request, CallOptions CallOptions)> StreamAlarmsCalls { get; } = []; /// /// Gets the queue of exceptions to throw from AcknowledgeAlarmAsync. @@ -223,7 +223,6 @@ internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMx ? _acknowledgeReplies.Dequeue() : new AcknowledgeAlarmReply { - SessionId = request.SessionId, CorrelationId = request.ClientCorrelationId, ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, Status = new MxStatusProxy { Success = 1, Category = MxStatusCategory.Ok }, @@ -231,20 +230,23 @@ internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMx } /// - /// Records the query call and yields each enqueued snapshot. + /// Records the call and yields each enqueued snapshot as an active-alarm + /// feed message, then a snapshot-complete sentinel. /// - public async IAsyncEnumerable QueryActiveAlarmsAsync( - QueryActiveAlarmsRequest request, + public async IAsyncEnumerable StreamAlarmsAsync( + StreamAlarmsRequest request, CallOptions callOptions) { - QueryActiveAlarmsCalls.Add((request, callOptions)); + StreamAlarmsCalls.Add((request, callOptions)); foreach (ActiveAlarmSnapshot snapshot in _activeAlarmSnapshots) { callOptions.CancellationToken.ThrowIfCancellationRequested(); await Task.Yield(); - yield return snapshot; + yield return new AlarmFeedMessage { ActiveAlarm = snapshot }; } + + yield return new AlarmFeedMessage { SnapshotComplete = true }; } /// Enqueues an acknowledge reply. @@ -253,7 +255,7 @@ internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMx _acknowledgeReplies.Enqueue(reply); } - /// Enqueues a snapshot to be yielded from QueryActiveAlarmsAsync. + /// Enqueues a snapshot yielded from StreamAlarmsAsync as an active-alarm message. public void AddActiveAlarmSnapshot(ActiveAlarmSnapshot snapshot) { _activeAlarmSnapshots.Add(snapshot); diff --git a/clients/dotnet/MxGateway.Client.Tests/MxGatewayClientAlarmsTests.cs b/clients/dotnet/MxGateway.Client.Tests/MxGatewayClientAlarmsTests.cs index 3423c2b..9e43b60 100644 --- a/clients/dotnet/MxGateway.Client.Tests/MxGatewayClientAlarmsTests.cs +++ b/clients/dotnet/MxGateway.Client.Tests/MxGatewayClientAlarmsTests.cs @@ -5,9 +5,9 @@ using MxGateway.Contracts.Proto; namespace MxGateway.Client.Tests; /// -/// PR E.2 — pins the .NET SDK surface for the new alarm RPCs: +/// Pins the .NET SDK surface for the alarm RPCs: /// and -/// . +/// . /// public sealed class MxGatewayClientAlarmsTests { @@ -17,7 +17,6 @@ public sealed class MxGatewayClientAlarmsTests FakeGatewayTransport transport = CreateTransport(); transport.AddAcknowledgeReply(new AcknowledgeAlarmReply { - SessionId = "session-fixture", CorrelationId = "corr-1", ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, Status = new MxStatusProxy @@ -31,7 +30,6 @@ public sealed class MxGatewayClientAlarmsTests AcknowledgeAlarmReply reply = await client.AcknowledgeAlarmAsync(new AcknowledgeAlarmRequest { - SessionId = "session-fixture", ClientCorrelationId = "corr-1", AlarmFullReference = "Tank01.Level.HiHi", Comment = "investigating", @@ -64,7 +62,6 @@ public sealed class MxGatewayClientAlarmsTests client.AcknowledgeAlarmAsync( new AcknowledgeAlarmRequest { - SessionId = "session-fixture", AlarmFullReference = "Tank01.Level.HiHi", Comment = string.Empty, OperatorUser = "alice", @@ -87,7 +84,6 @@ public sealed class MxGatewayClientAlarmsTests var ex = await Assert.ThrowsAsync( () => client.AcknowledgeAlarmAsync(new AcknowledgeAlarmRequest { - SessionId = "session-fixture", AlarmFullReference = "Tank01.Level.HiHi", Comment = string.Empty, OperatorUser = "alice", @@ -113,7 +109,6 @@ public sealed class MxGatewayClientAlarmsTests var ex = await Assert.ThrowsAsync( () => client.AcknowledgeAlarmAsync(new AcknowledgeAlarmRequest { - SessionId = "session-fixture", AlarmFullReference = "Tank01.Level.HiHi", Comment = string.Empty, OperatorUser = "alice", @@ -122,50 +117,47 @@ public sealed class MxGatewayClientAlarmsTests } [Fact] - public async Task QueryActiveAlarmsAsync_StreamsEnqueuedSnapshots() + public async Task StreamAlarmsAsync_StreamsSnapshotThenSnapshotComplete() { FakeGatewayTransport transport = CreateTransport(); transport.AddActiveAlarmSnapshot(MakeSnapshot("Tank01.Level.HiHi", AlarmConditionState.Active)); transport.AddActiveAlarmSnapshot(MakeSnapshot("Tank02.Level.HiHi", AlarmConditionState.ActiveAcked)); await using MxGatewayClient client = CreateClient(transport); - List snapshots = []; - await foreach (ActiveAlarmSnapshot snapshot in client.QueryActiveAlarmsAsync(new QueryActiveAlarmsRequest + List messages = []; + await foreach (AlarmFeedMessage message in client.StreamAlarmsAsync(new StreamAlarmsRequest())) { - SessionId = "session-fixture", - })) - { - snapshots.Add(snapshot); + messages.Add(message); } - Assert.Equal(2, snapshots.Count); - Assert.Equal("Tank01.Level.HiHi", snapshots[0].AlarmFullReference); - Assert.Equal(AlarmConditionState.Active, snapshots[0].CurrentState); - Assert.Equal(AlarmConditionState.ActiveAcked, snapshots[1].CurrentState); - Assert.Single(transport.QueryActiveAlarmsCalls); + Assert.Equal(3, messages.Count); + Assert.Equal("Tank01.Level.HiHi", messages[0].ActiveAlarm.AlarmFullReference); + Assert.Equal(AlarmConditionState.Active, messages[0].ActiveAlarm.CurrentState); + Assert.Equal(AlarmConditionState.ActiveAcked, messages[1].ActiveAlarm.CurrentState); + Assert.True(messages[2].SnapshotComplete); + Assert.Single(transport.StreamAlarmsCalls); } [Fact] - public async Task QueryActiveAlarmsAsync_PassesFilterPrefix() + public async Task StreamAlarmsAsync_PassesFilterPrefix() { FakeGatewayTransport transport = CreateTransport(); await using MxGatewayClient client = CreateClient(transport); - await foreach (ActiveAlarmSnapshot _ in client.QueryActiveAlarmsAsync(new QueryActiveAlarmsRequest + await foreach (AlarmFeedMessage _ in client.StreamAlarmsAsync(new StreamAlarmsRequest { - SessionId = "session-fixture", AlarmFilterPrefix = "Tank01.", })) { - // no snapshots enqueued; just verifying the request passes through + // only the snapshot-complete sentinel; verifying the request passes through } - var call = Assert.Single(transport.QueryActiveAlarmsCalls); + var call = Assert.Single(transport.StreamAlarmsCalls); Assert.Equal("Tank01.", call.Request.AlarmFilterPrefix); } [Fact] - public async Task QueryActiveAlarmsAsync_HonorsCancellationDuringEnumeration() + public async Task StreamAlarmsAsync_HonorsCancellationDuringEnumeration() { FakeGatewayTransport transport = CreateTransport(); transport.AddActiveAlarmSnapshot(MakeSnapshot("Tank01.Level.HiHi", AlarmConditionState.Active)); @@ -175,8 +167,8 @@ public sealed class MxGatewayClientAlarmsTests using CancellationTokenSource cancellation = new(); await Assert.ThrowsAsync(async () => { - await foreach (ActiveAlarmSnapshot _ in client.QueryActiveAlarmsAsync( - new QueryActiveAlarmsRequest { SessionId = "session-fixture" }, + await foreach (AlarmFeedMessage _ in client.StreamAlarmsAsync( + new StreamAlarmsRequest(), cancellation.Token)) { cancellation.Cancel(); diff --git a/clients/dotnet/MxGateway.Client/GrpcMxGatewayClientTransport.cs b/clients/dotnet/MxGateway.Client/GrpcMxGatewayClientTransport.cs index ef967da..7cfa56f 100644 --- a/clients/dotnet/MxGateway.Client/GrpcMxGatewayClientTransport.cs +++ b/clients/dotnet/MxGateway.Client/GrpcMxGatewayClientTransport.cs @@ -134,8 +134,8 @@ internal sealed class GrpcMxGatewayClientTransport( } /// - public async IAsyncEnumerable QueryActiveAlarmsAsync( - QueryActiveAlarmsRequest request, + public async IAsyncEnumerable StreamAlarmsAsync( + StreamAlarmsRequest request, CallOptions callOptions, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default) { @@ -143,12 +143,12 @@ internal sealed class GrpcMxGatewayClientTransport( ? cancellationToken : callOptions.CancellationToken; - using AsyncServerStreamingCall call = RawClient.QueryActiveAlarms(request, callOptions); + using AsyncServerStreamingCall call = RawClient.StreamAlarms(request, callOptions); - IAsyncStreamReader responseStream = call.ResponseStream; + IAsyncStreamReader responseStream = call.ResponseStream; while (true) { - ActiveAlarmSnapshot? snapshot; + AlarmFeedMessage? message; try { if (!await responseStream.MoveNext(effectiveCancellationToken).ConfigureAwait(false)) @@ -156,22 +156,22 @@ internal sealed class GrpcMxGatewayClientTransport( break; } - snapshot = responseStream.Current; + message = responseStream.Current; } catch (RpcException exception) { throw RpcExceptionMapper.Map(exception, effectiveCancellationToken); } - yield return snapshot; + yield return message; } } /// - IAsyncEnumerable IMxGatewayClientTransport.QueryActiveAlarmsAsync( - QueryActiveAlarmsRequest request, + IAsyncEnumerable IMxGatewayClientTransport.StreamAlarmsAsync( + StreamAlarmsRequest request, CallOptions callOptions) { - return QueryActiveAlarmsAsync(request, callOptions); + return StreamAlarmsAsync(request, callOptions); } } diff --git a/clients/dotnet/MxGateway.Client/IMxGatewayClientTransport.cs b/clients/dotnet/MxGateway.Client/IMxGatewayClientTransport.cs index 26e25fc..bed8f2e 100644 --- a/clients/dotnet/MxGateway.Client/IMxGatewayClientTransport.cs +++ b/clients/dotnet/MxGateway.Client/IMxGatewayClientTransport.cs @@ -66,13 +66,13 @@ internal interface IMxGatewayClientTransport CallOptions callOptions); /// - /// Streams a snapshot of all alarms currently in Active or ActiveAcked state — the - /// ConditionRefresh equivalent for the gateway. + /// Attaches to the gateway's central alarm feed — the current active-alarm + /// snapshot followed by live transitions. /// - /// The query request, optionally scoped by alarm-reference prefix. + /// The stream request, optionally scoped by alarm-reference prefix. /// gRPC call options. - /// An async enumerable of active-alarm snapshots. - IAsyncEnumerable QueryActiveAlarmsAsync( - QueryActiveAlarmsRequest request, + /// An async enumerable of alarm feed messages. + IAsyncEnumerable StreamAlarmsAsync( + StreamAlarmsRequest request, CallOptions callOptions); } diff --git a/clients/dotnet/MxGateway.Client/MxGatewayClient.cs b/clients/dotnet/MxGateway.Client/MxGatewayClient.cs index f46b7ed..3d390f6 100644 --- a/clients/dotnet/MxGateway.Client/MxGatewayClient.cs +++ b/clients/dotnet/MxGateway.Client/MxGatewayClient.cs @@ -205,24 +205,25 @@ public sealed class MxGatewayClient : IAsyncDisposable } /// - /// Streams a snapshot of all alarms currently Active or ActiveAcked — the gateway's - /// ConditionRefresh equivalent. Used after reconnect to seed the local Part 9 state - /// machine, or to reconcile alarms that may have been missed during a transport - /// blip. Optionally scoped by alarm-reference prefix - /// () so a partial refresh - /// can target an equipment sub-tree. + /// Attaches to the gateway's central alarm feed. The stream opens with one + /// per currently-active alarm (the + /// ConditionRefresh snapshot), then a single snapshot_complete, then a + /// transition for every subsequent raise / acknowledge / clear. Served + /// by the gateway's always-on alarm monitor — no worker session is opened, so + /// any number of clients may attach. Optionally scoped by alarm-reference + /// prefix (). /// - /// The query request, optionally scoped by alarm-reference prefix. + /// The stream request, optionally scoped by alarm-reference prefix. /// Cancellation token for the stream. - /// An async enumerable of active-alarm snapshots. - public IAsyncEnumerable QueryActiveAlarmsAsync( - QueryActiveAlarmsRequest request, + /// An async enumerable of alarm feed messages. + public IAsyncEnumerable StreamAlarmsAsync( + StreamAlarmsRequest request, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(request); ThrowIfDisposed(); - return _transport.QueryActiveAlarmsAsync(request, CreateStreamCallOptions(cancellationToken)); + return _transport.StreamAlarmsAsync(request, CreateStreamCallOptions(cancellationToken)); } ///