diff --git a/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Cli/IMxGatewayCliClient.cs b/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Cli/IMxGatewayCliClient.cs index 26979c9..a286f0e 100644 --- a/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Cli/IMxGatewayCliClient.cs +++ b/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Cli/IMxGatewayCliClient.cs @@ -45,6 +45,27 @@ public interface IMxGatewayCliClient : IAsyncDisposable StreamEventsRequest request, CancellationToken cancellationToken); + /// + /// Acknowledges an active MXAccess alarm condition through the gateway. + /// + /// The acknowledge request — alarm reference, comment, operator user. + /// Cancellation token for the operation. + /// The acknowledge reply with protocol + native MxStatus. + Task AcknowledgeAlarmAsync( + AcknowledgeAlarmRequest request, + CancellationToken cancellationToken); + + /// + /// Attaches to the gateway's central alarm feed — the current active-alarm + /// snapshot followed by live transitions. + /// + /// The stream request, optionally scoped by alarm-reference prefix. + /// Cancellation token for the operation. + /// An async enumerable of alarm feed messages. + IAsyncEnumerable StreamAlarmsAsync( + StreamAlarmsRequest request, + CancellationToken cancellationToken); + /// /// Tests connection to the Galaxy Repository. /// diff --git a/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Cli/MxGatewayCliClientAdapter.cs b/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Cli/MxGatewayCliClientAdapter.cs index 65e83dd..6601dcb 100644 --- a/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Cli/MxGatewayCliClientAdapter.cs +++ b/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Cli/MxGatewayCliClientAdapter.cs @@ -52,6 +52,22 @@ internal sealed class MxGatewayCliClientAdapter : IMxGatewayCliClient return _client.StreamEventsAsync(request, cancellationToken); } + /// + public Task AcknowledgeAlarmAsync( + AcknowledgeAlarmRequest request, + CancellationToken cancellationToken) + { + return _client.AcknowledgeAlarmAsync(request, cancellationToken); + } + + /// + public IAsyncEnumerable StreamAlarmsAsync( + StreamAlarmsRequest request, + CancellationToken cancellationToken) + { + return _client.StreamAlarmsAsync(request, cancellationToken); + } + /// public Task GalaxyTestConnectionAsync( TestConnectionRequest request, diff --git a/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Cli/MxGatewayClientCli.cs b/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Cli/MxGatewayClientCli.cs index 44ec41b..96f986e 100644 --- a/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Cli/MxGatewayClientCli.cs +++ b/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Cli/MxGatewayClientCli.cs @@ -128,6 +128,10 @@ public static class MxGatewayClientCli .ConfigureAwait(false), "stream-events" => await StreamEventsAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), + "stream-alarms" => await StreamAlarmsAsync(arguments, client, standardOutput, cancellation.Token) + .ConfigureAwait(false), + "acknowledge-alarm" => await AcknowledgeAlarmAsync(arguments, client, standardOutput, cancellation.Token) + .ConfigureAwait(false), "write" => await WriteAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), "write2" => await Write2Async(arguments, client, standardOutput, cancellation.Token) @@ -212,9 +216,13 @@ public static class MxGatewayClientCli forceJsonErrors: true) .ConfigureAwait(false); } - catch (Exception exception) when (exception is not OperationCanceledException) + catch (Exception exception) { // Unexpected exception that escaped RunCoreAsync (shouldn't happen, but be safe). + // OperationCanceledException from long-running streaming commands + // (e.g. galaxy-watch hit by --timeout) is caught here too — the + // batch process must continue with the next command rather than + // unwinding. commandError.WriteLine(JsonSerializer.Serialize( new { error = exception.Message, type = exception.GetType().Name }, JsonOptions)); @@ -993,29 +1001,37 @@ public static class MxGatewayClientCli AfterWorkerSequence = arguments.GetUInt64("after-worker-sequence", 0), }; - await foreach (MxEvent gatewayEvent in client.StreamEventsAsync(request, cancellationToken) - .WithCancellation(cancellationToken) - .ConfigureAwait(false)) + try { - if (jsonLines) + await foreach (MxEvent gatewayEvent in client.StreamEventsAsync(request, cancellationToken) + .WithCancellation(cancellationToken) + .ConfigureAwait(false)) { - output.WriteLine(ProtobufJsonFormatter.Format(gatewayEvent)); - } - else if (json) - { - events.Add(gatewayEvent); - } - else - { - output.WriteLine(ProtobufJsonFormatter.Format(gatewayEvent)); - } + if (jsonLines) + { + output.WriteLine(ProtobufJsonFormatter.Format(gatewayEvent)); + } + else if (json) + { + events.Add(gatewayEvent); + } + else + { + output.WriteLine(ProtobufJsonFormatter.Format(gatewayEvent)); + } - eventCount++; - if (maxEvents > 0 && eventCount >= maxEvents) - { - break; + eventCount++; + if (maxEvents > 0 && eventCount >= maxEvents) + { + break; + } } } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + // Client.Dotnet-017: graceful end-of-window completion mode for a + // finite-window event collector. Emit aggregate JSON below and exit 0. + } if (json && !jsonLines) { @@ -1027,6 +1043,124 @@ public static class MxGatewayClientCli return 0; } + private static async Task StreamAlarmsAsync( + CliArguments arguments, + IMxGatewayCliClient client, + TextWriter output, + CancellationToken cancellationToken) + { + uint maxEvents = arguments.GetUInt32("max-events", 0); + bool json = arguments.HasFlag("json"); + bool jsonLines = arguments.HasFlag("jsonl"); + if (json && !jsonLines && maxEvents is 0) + { + throw new ArgumentException("--json stream-alarms requires --max-events to bound aggregate output."); + } + + if (maxEvents > MaxAggregateEvents) + { + throw new ArgumentException($"--max-events cannot exceed {MaxAggregateEvents}."); + } + + var messages = json && !jsonLines + ? new List(checked((int)maxEvents)) + : []; + uint messageCount = 0; + var request = new StreamAlarmsRequest + { + ClientCorrelationId = CreateCorrelationId(), + AlarmFilterPrefix = arguments.GetOptional("filter-prefix") ?? string.Empty, + }; + + try + { + await foreach (AlarmFeedMessage feedMessage in client.StreamAlarmsAsync(request, cancellationToken) + .WithCancellation(cancellationToken) + .ConfigureAwait(false)) + { + if (jsonLines) + { + output.WriteLine(ProtobufJsonFormatter.Format(feedMessage)); + } + else if (json) + { + messages.Add(feedMessage); + } + else + { + output.WriteLine(FormatAlarmFeedMessage(feedMessage)); + } + + messageCount++; + if (maxEvents > 0 && messageCount >= maxEvents) + { + break; + } + } + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + // Mirrors stream-events (Client.Dotnet-017): the supplied token covers + // the user's --timeout wall-clock budget and external Ctrl+C / parent + // CTS cancellation. All are graceful completion modes for a + // finite-window alarm-feed collector: emit what arrived and exit 0. + } + + if (json && !jsonLines) + { + output.WriteLine(JsonSerializer.Serialize( + new { alarms = messages.Select(AlarmFeedMessageToJsonElement).ToArray() }, + JsonOptions)); + } + + return 0; + } + + private static Task AcknowledgeAlarmAsync( + CliArguments arguments, + IMxGatewayCliClient client, + TextWriter output, + CancellationToken cancellationToken) + { + var request = new AcknowledgeAlarmRequest + { + ClientCorrelationId = CreateCorrelationId(), + AlarmFullReference = arguments.GetRequired("reference"), + Comment = arguments.GetOptional("comment") ?? string.Empty, + OperatorUser = arguments.GetOptional("operator") ?? string.Empty, + }; + + return WriteReplyAsync( + client.AcknowledgeAlarmAsync(request, cancellationToken), + arguments, + output); + } + + /// + /// Renders one for the human-readable + /// (non-JSON) stream-alarms output, distinguishing the payload oneof + /// arms: a snapshot active alarm, the snapshot-complete sentinel, or a live + /// transition. + /// + private static string FormatAlarmFeedMessage(AlarmFeedMessage feedMessage) + { + return feedMessage.PayloadCase switch + { + AlarmFeedMessage.PayloadOneofCase.ActiveAlarm => + $"active-alarm {ProtobufJsonFormatter.Format(feedMessage.ActiveAlarm)}", + AlarmFeedMessage.PayloadOneofCase.SnapshotComplete => + $"snapshot-complete {feedMessage.SnapshotComplete}", + AlarmFeedMessage.PayloadOneofCase.Transition => + $"transition {ProtobufJsonFormatter.Format(feedMessage.Transition)}", + _ => $"unknown-payload {feedMessage.PayloadCase}", + }; + } + + private static JsonElement AlarmFeedMessageToJsonElement(AlarmFeedMessage feedMessage) + { + return JsonDocument.Parse(ProtobufJsonFormatter.Format(feedMessage)).RootElement.Clone(); + } + private static async Task SmokeAsync( CliArguments arguments, IMxGatewayCliClient client, @@ -1546,6 +1680,8 @@ public static class MxGatewayClientCli or "write-secured2-bulk" or "bench-read-bulk" or "stream-events" + or "stream-alarms" + or "acknowledge-alarm" or "write" or "write2" or "smoke" @@ -1605,6 +1741,8 @@ public static class MxGatewayClientCli writer.WriteLine("mxgw-dotnet write-secured2-bulk --session-id --server-handle --item-handles --type --values --current-user-id [--verifier-user-id ] [--timestamp ] [--json]"); writer.WriteLine("mxgw-dotnet bench-read-bulk [--duration-seconds ] [--warmup-seconds ] [--bulk-size ] [--tag-start ] [--tag-prefix ] [--tag-attribute ] [--timeout-ms ] [--client-name ]"); writer.WriteLine("mxgw-dotnet stream-events --session-id [--max-events ] [--json]"); + writer.WriteLine("mxgw-dotnet stream-alarms [--filter-prefix ] [--max-events ] [--json] [--jsonl]"); + writer.WriteLine("mxgw-dotnet acknowledge-alarm --reference [--comment ] [--operator ] [--json]"); writer.WriteLine("mxgw-dotnet write --session-id --server-handle --item-handle --type --value [--json]"); writer.WriteLine("mxgw-dotnet write2 --session-id --server-handle --item-handle --type --value [--timestamp ] [--json]"); writer.WriteLine("mxgw-dotnet smoke --item [--value --type ] [--json]"); diff --git a/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Tests/FakeGatewayTransport.cs b/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Tests/FakeGatewayTransport.cs index cd9ade9..d625650 100644 --- a/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Tests/FakeGatewayTransport.cs +++ b/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Tests/FakeGatewayTransport.cs @@ -51,6 +51,11 @@ internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMx /// public List<(QueryActiveAlarmsRequest Request, CallOptions CallOptions)> QueryActiveAlarmsCalls { get; } = []; + /// + /// Gets the list of captured StreamAlarmsAsync calls. + /// + public List<(StreamAlarmsRequest Request, CallOptions CallOptions)> StreamAlarmsCalls { get; } = []; + /// /// Gets the queue of exceptions to throw from AcknowledgeAlarmAsync. /// @@ -58,6 +63,7 @@ internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMx private readonly Queue _acknowledgeReplies = new(); private readonly List _activeAlarmSnapshots = []; + private readonly List _alarmFeedMessages = []; /// /// Gets or sets the reply to return from OpenSessionAsync. @@ -238,4 +244,27 @@ internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMx { _activeAlarmSnapshots.Add(snapshot); } + + /// + /// Records the stream-alarms call and yields each enqueued feed message. + /// + public async IAsyncEnumerable StreamAlarmsAsync( + StreamAlarmsRequest request, + CallOptions callOptions) + { + StreamAlarmsCalls.Add((request, callOptions)); + + foreach (AlarmFeedMessage message in _alarmFeedMessages) + { + callOptions.CancellationToken.ThrowIfCancellationRequested(); + await Task.Yield(); + yield return message; + } + } + + /// Enqueues an alarm feed message to be yielded from StreamAlarmsAsync. + public void AddAlarmFeedMessage(AlarmFeedMessage message) + { + _alarmFeedMessages.Add(message); + } } diff --git a/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Tests/MxGatewayClientCliTests.cs b/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Tests/MxGatewayClientCliTests.cs index d6122ca..fa790c0 100644 --- a/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Tests/MxGatewayClientCliTests.cs +++ b/clients/dotnet/ZB.MOM.WW.MxGateway.Client.Tests/MxGatewayClientCliTests.cs @@ -148,6 +148,87 @@ public sealed class MxGatewayClientCliTests } + /// Verifies that stream-alarms with --max-events stops output and distinguishes payload cases. + [Fact] + public async Task RunAsync_StreamAlarms_WithMaxEventsStopsAndDistinguishesPayloadCases() + { + using var output = new StringWriter(); + using var error = new StringWriter(); + FakeCliClient fakeClient = new(); + fakeClient.AlarmFeedMessages.Add(new AlarmFeedMessage + { + ActiveAlarm = new ActiveAlarmSnapshot { AlarmFullReference = "Tank01.Level.HiHi" }, + }); + fakeClient.AlarmFeedMessages.Add(new AlarmFeedMessage { SnapshotComplete = true }); + + int exitCode = await MxGatewayClientCli.RunAsync( + [ + "stream-alarms", + "--endpoint", + "http://localhost:5000", + "--api-key", + "test-api-key", + "--filter-prefix", + "Tank01", + "--max-events", + "1", + ], + output, + error, + _ => fakeClient); + + Assert.Equal(0, exitCode); + StreamAlarmsRequest request = Assert.Single(fakeClient.StreamAlarmsRequests); + Assert.Equal("Tank01", request.AlarmFilterPrefix); + string text = output.ToString(); + Assert.Contains("active-alarm", text); + Assert.Contains("Tank01.Level.HiHi", text); + Assert.DoesNotContain("snapshot-complete", text); + Assert.Equal(string.Empty, error.ToString()); + } + + /// Verifies that acknowledge-alarm builds a request and prints the JSON reply. + [Fact] + public async Task RunAsync_AcknowledgeAlarm_BuildsRequestAndPrintsJsonReply() + { + using var output = new StringWriter(); + using var error = new StringWriter(); + FakeCliClient fakeClient = new(); + fakeClient.AcknowledgeAlarmReplies.Enqueue(new AcknowledgeAlarmReply + { + CorrelationId = "ack-fixture", + ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, + Hresult = 0, + }); + + int exitCode = await MxGatewayClientCli.RunAsync( + [ + "acknowledge-alarm", + "--endpoint", + "http://localhost:5000", + "--api-key", + "test-api-key", + "--reference", + "Tank01.Level.HiHi", + "--comment", + "ack from cli", + "--operator", + "operator1", + "--json", + ], + output, + error, + _ => fakeClient); + + Assert.Equal(0, exitCode); + AcknowledgeAlarmRequest request = Assert.Single(fakeClient.AcknowledgeAlarmRequests); + Assert.Equal("Tank01.Level.HiHi", request.AlarmFullReference); + Assert.Equal("ack from cli", request.Comment); + Assert.Equal("operator1", request.OperatorUser); + Assert.Contains("ack-fixture", output.ToString()); + Assert.Equal(string.Empty, error.ToString()); + } + /// Verifies that smoke command closes opened session when a command fails. [Fact] public async Task RunAsync_Smoke_WhenCommandFails_ClosesOpenedSession() @@ -507,6 +588,41 @@ public sealed class MxGatewayClientCliTests } } + /// Queue of acknowledge-alarm replies to return. + public Queue AcknowledgeAlarmReplies { get; } = new(); + + /// List of received acknowledge-alarm requests. + public List AcknowledgeAlarmRequests { get; } = []; + + /// List of received stream-alarms requests. + public List StreamAlarmsRequests { get; } = []; + + /// List of alarm feed messages to yield when streaming alarms. + public List AlarmFeedMessages { get; } = []; + + /// + public Task AcknowledgeAlarmAsync( + AcknowledgeAlarmRequest request, + CancellationToken cancellationToken) + { + AcknowledgeAlarmRequests.Add(request); + return Task.FromResult(AcknowledgeAlarmReplies.Dequeue()); + } + + /// + public async IAsyncEnumerable StreamAlarmsAsync( + StreamAlarmsRequest request, + [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken) + { + StreamAlarmsRequests.Add(request); + foreach (AlarmFeedMessage feedMessage in AlarmFeedMessages) + { + cancellationToken.ThrowIfCancellationRequested(); + await Task.Yield(); + yield return feedMessage; + } + } + /// Galaxy test connection reply to return. public TestConnectionReply GalaxyTestConnectionReply { get; set; } = new() { Ok = true }; diff --git a/clients/dotnet/ZB.MOM.WW.MxGateway.Client/GrpcMxGatewayClientTransport.cs b/clients/dotnet/ZB.MOM.WW.MxGateway.Client/GrpcMxGatewayClientTransport.cs index 92c3f91..5ff4e75 100644 --- a/clients/dotnet/ZB.MOM.WW.MxGateway.Client/GrpcMxGatewayClientTransport.cs +++ b/clients/dotnet/ZB.MOM.WW.MxGateway.Client/GrpcMxGatewayClientTransport.cs @@ -175,6 +175,48 @@ internal sealed class GrpcMxGatewayClientTransport( return QueryActiveAlarmsAsync(request, callOptions); } + /// + public async IAsyncEnumerable StreamAlarmsAsync( + StreamAlarmsRequest request, + CallOptions callOptions, + [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default) + { + CancellationToken effectiveCancellationToken = cancellationToken.CanBeCanceled + ? cancellationToken + : callOptions.CancellationToken; + + using AsyncServerStreamingCall call = RawClient.StreamAlarms(request, callOptions); + + IAsyncStreamReader responseStream = call.ResponseStream; + while (true) + { + AlarmFeedMessage? message; + try + { + if (!await responseStream.MoveNext(effectiveCancellationToken).ConfigureAwait(false)) + { + break; + } + + message = responseStream.Current; + } + catch (RpcException exception) + { + throw MapRpcException(exception, effectiveCancellationToken); + } + + yield return message; + } + } + + /// + IAsyncEnumerable IMxGatewayClientTransport.StreamAlarmsAsync( + StreamAlarmsRequest request, + CallOptions callOptions) + { + return StreamAlarmsAsync(request, callOptions); + } + private static Exception MapRpcException( RpcException exception, CancellationToken cancellationToken) diff --git a/clients/dotnet/ZB.MOM.WW.MxGateway.Client/IMxGatewayClientTransport.cs b/clients/dotnet/ZB.MOM.WW.MxGateway.Client/IMxGatewayClientTransport.cs index 88a69ec..6e708a6 100644 --- a/clients/dotnet/ZB.MOM.WW.MxGateway.Client/IMxGatewayClientTransport.cs +++ b/clients/dotnet/ZB.MOM.WW.MxGateway.Client/IMxGatewayClientTransport.cs @@ -75,4 +75,15 @@ internal interface IMxGatewayClientTransport IAsyncEnumerable QueryActiveAlarmsAsync( QueryActiveAlarmsRequest request, CallOptions callOptions); + + /// + /// Attaches to the gateway's central alarm feed — the current active-alarm + /// snapshot followed by live transitions. + /// + /// The stream request, optionally scoped by alarm-reference prefix. + /// gRPC call options. + /// An async enumerable of alarm feed messages. + IAsyncEnumerable StreamAlarmsAsync( + StreamAlarmsRequest request, + CallOptions callOptions); } diff --git a/clients/dotnet/ZB.MOM.WW.MxGateway.Client/MxGatewayClient.cs b/clients/dotnet/ZB.MOM.WW.MxGateway.Client/MxGatewayClient.cs index 7254521..2ae95ff 100644 --- a/clients/dotnet/ZB.MOM.WW.MxGateway.Client/MxGatewayClient.cs +++ b/clients/dotnet/ZB.MOM.WW.MxGateway.Client/MxGatewayClient.cs @@ -224,6 +224,28 @@ public sealed class MxGatewayClient : IAsyncDisposable return _transport.QueryActiveAlarmsAsync(request, CreateStreamCallOptions(cancellationToken)); } + /// + /// Attaches to the gateway's central alarm feed. The stream opens with one + /// per currently-active alarm (the + /// ConditionRefresh snapshot), then a single snapshot_complete, then a + /// transition for every subsequent raise / acknowledge / clear. Served + /// by the gateway's always-on alarm monitor — no worker session is opened, so + /// any number of clients may attach. Optionally scoped by alarm-reference + /// prefix (). + /// + /// The stream request, optionally scoped by alarm-reference prefix. + /// Cancellation token for the stream. + /// An async enumerable of alarm feed messages. + public IAsyncEnumerable StreamAlarmsAsync( + StreamAlarmsRequest request, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(request); + ThrowIfDisposed(); + + return _transport.StreamAlarmsAsync(request, CreateStreamCallOptions(cancellationToken)); + } + /// /// Disposes the client and releases all resources. ///