diff --git a/clients/dotnet/MxGateway.Client.Cli/IMxGatewayCliClient.cs b/clients/dotnet/MxGateway.Client.Cli/IMxGatewayCliClient.cs index e7e5e6b..72697ba 100644 --- a/clients/dotnet/MxGateway.Client.Cli/IMxGatewayCliClient.cs +++ b/clients/dotnet/MxGateway.Client.Cli/IMxGatewayCliClient.cs @@ -51,6 +51,27 @@ public interface IMxGatewayCliClient : IAsyncDisposable StreamEventsRequest request, CancellationToken cancellationToken); + /// + /// Acknowledges an active MXAccess alarm condition through the gateway. + /// + /// The acknowledge request — alarm reference, comment, operator user. + /// Cancellation token for the operation. + /// The acknowledge reply with protocol + native MxStatus. + Task AcknowledgeAlarmAsync( + AcknowledgeAlarmRequest request, + CancellationToken cancellationToken); + + /// + /// Attaches to the gateway's central alarm feed — the current active-alarm + /// snapshot followed by live transitions. + /// + /// The stream request, optionally scoped by alarm-reference prefix. + /// Cancellation token for the operation. + /// An async enumerable of alarm feed messages. + IAsyncEnumerable StreamAlarmsAsync( + StreamAlarmsRequest request, + CancellationToken cancellationToken); + /// /// Tests connection to the Galaxy Repository. /// diff --git a/clients/dotnet/MxGateway.Client.Cli/MxGatewayCliClientAdapter.cs b/clients/dotnet/MxGateway.Client.Cli/MxGatewayCliClientAdapter.cs index 47f0d06..3b3d51b 100644 --- a/clients/dotnet/MxGateway.Client.Cli/MxGatewayCliClientAdapter.cs +++ b/clients/dotnet/MxGateway.Client.Cli/MxGatewayCliClientAdapter.cs @@ -52,6 +52,22 @@ internal sealed class MxGatewayCliClientAdapter : IMxGatewayCliClient return _client.StreamEventsAsync(request, cancellationToken); } + /// + public Task AcknowledgeAlarmAsync( + AcknowledgeAlarmRequest request, + CancellationToken cancellationToken) + { + return _client.AcknowledgeAlarmAsync(request, cancellationToken); + } + + /// + public IAsyncEnumerable StreamAlarmsAsync( + StreamAlarmsRequest request, + CancellationToken cancellationToken) + { + return _client.StreamAlarmsAsync(request, cancellationToken); + } + /// public Task GalaxyTestConnectionAsync( TestConnectionRequest request, diff --git a/clients/dotnet/MxGateway.Client.Cli/MxGatewayClientCli.cs b/clients/dotnet/MxGateway.Client.Cli/MxGatewayClientCli.cs index 5f15e2e..c240d9a 100644 --- a/clients/dotnet/MxGateway.Client.Cli/MxGatewayClientCli.cs +++ b/clients/dotnet/MxGateway.Client.Cli/MxGatewayClientCli.cs @@ -130,6 +130,10 @@ public static class MxGatewayClientCli .ConfigureAwait(false), "stream-events" => await StreamEventsAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), + "stream-alarms" => await StreamAlarmsAsync(arguments, client, standardOutput, cancellation.Token) + .ConfigureAwait(false), + "acknowledge-alarm" => await AcknowledgeAlarmAsync(arguments, client, standardOutput, cancellation.Token) + .ConfigureAwait(false), "write" => await WriteAsync(arguments, client, standardOutput, cancellation.Token) .ConfigureAwait(false), "write2" => await Write2Async(arguments, client, standardOutput, cancellation.Token) @@ -1353,6 +1357,124 @@ public static class MxGatewayClientCli return 0; } + private static async Task StreamAlarmsAsync( + CliArguments arguments, + IMxGatewayCliClient client, + TextWriter output, + CancellationToken cancellationToken) + { + uint maxEvents = arguments.GetUInt32("max-events", 0); + bool json = arguments.HasFlag("json"); + bool jsonLines = arguments.HasFlag("jsonl"); + if (json && !jsonLines && maxEvents is 0) + { + throw new ArgumentException("--json stream-alarms requires --max-events to bound aggregate output."); + } + + if (maxEvents > MaxAggregateEvents) + { + throw new ArgumentException($"--max-events cannot exceed {MaxAggregateEvents}."); + } + + var messages = json && !jsonLines + ? new List(checked((int)maxEvents)) + : []; + uint messageCount = 0; + var request = new StreamAlarmsRequest + { + ClientCorrelationId = CreateCorrelationId(), + AlarmFilterPrefix = arguments.GetOptional("filter-prefix") ?? string.Empty, + }; + + try + { + await foreach (AlarmFeedMessage feedMessage in client.StreamAlarmsAsync(request, cancellationToken) + .WithCancellation(cancellationToken) + .ConfigureAwait(false)) + { + if (jsonLines) + { + output.WriteLine(ProtobufJsonFormatter.Format(feedMessage)); + } + else if (json) + { + messages.Add(feedMessage); + } + else + { + output.WriteLine(FormatAlarmFeedMessage(feedMessage)); + } + + messageCount++; + if (maxEvents > 0 && messageCount >= maxEvents) + { + break; + } + } + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + // Mirrors stream-events (Client.Dotnet-017): the supplied token covers + // the user's --timeout wall-clock budget and external Ctrl+C / parent + // CTS cancellation. All are graceful completion modes for a + // finite-window alarm-feed collector: emit what arrived and exit 0. + } + + if (json && !jsonLines) + { + output.WriteLine(JsonSerializer.Serialize( + new { alarms = messages.Select(AlarmFeedMessageToJsonElement).ToArray() }, + JsonOptions)); + } + + return 0; + } + + private static Task AcknowledgeAlarmAsync( + CliArguments arguments, + IMxGatewayCliClient client, + TextWriter output, + CancellationToken cancellationToken) + { + var request = new AcknowledgeAlarmRequest + { + ClientCorrelationId = CreateCorrelationId(), + AlarmFullReference = arguments.GetRequired("reference"), + Comment = arguments.GetOptional("comment") ?? string.Empty, + OperatorUser = arguments.GetOptional("operator") ?? string.Empty, + }; + + return WriteReplyAsync( + client.AcknowledgeAlarmAsync(request, cancellationToken), + arguments, + output); + } + + /// + /// Renders one for the human-readable + /// (non-JSON) stream-alarms output, distinguishing the payload oneof + /// arms: a snapshot active alarm, the snapshot-complete sentinel, or a live + /// transition. + /// + private static string FormatAlarmFeedMessage(AlarmFeedMessage feedMessage) + { + return feedMessage.PayloadCase switch + { + AlarmFeedMessage.PayloadOneofCase.ActiveAlarm => + $"active-alarm {ProtobufJsonFormatter.Format(feedMessage.ActiveAlarm)}", + AlarmFeedMessage.PayloadOneofCase.SnapshotComplete => + $"snapshot-complete {feedMessage.SnapshotComplete}", + AlarmFeedMessage.PayloadOneofCase.Transition => + $"transition {ProtobufJsonFormatter.Format(feedMessage.Transition)}", + _ => $"unknown-payload {feedMessage.PayloadCase}", + }; + } + + private static JsonElement AlarmFeedMessageToJsonElement(AlarmFeedMessage feedMessage) + { + return JsonDocument.Parse(ProtobufJsonFormatter.Format(feedMessage)).RootElement.Clone(); + } + private static async Task SmokeAsync( CliArguments arguments, IMxGatewayCliClient client, @@ -1908,6 +2030,8 @@ public static class MxGatewayClientCli or "bench-read-bulk" or "bench-stream-events" or "stream-events" + or "stream-alarms" + or "acknowledge-alarm" or "write" or "write2" or "smoke" @@ -1966,6 +2090,8 @@ public static class MxGatewayClientCli writer.WriteLine("mxgw-dotnet subscribe-bulk --session-id --server-handle --items [--json]"); writer.WriteLine("mxgw-dotnet unsubscribe-bulk --session-id --server-handle --item-handles [--json]"); writer.WriteLine("mxgw-dotnet stream-events --session-id [--max-events ] [--json]"); + writer.WriteLine("mxgw-dotnet stream-alarms [--filter-prefix ] [--max-events ] [--json] [--jsonl]"); + writer.WriteLine("mxgw-dotnet acknowledge-alarm --reference [--comment ] [--operator ] [--json]"); writer.WriteLine("mxgw-dotnet write --session-id --server-handle --item-handle --type --value [--json]"); writer.WriteLine("mxgw-dotnet write2 --session-id --server-handle --item-handle --type --value [--timestamp ] [--json]"); writer.WriteLine("mxgw-dotnet smoke --item [--value --type ] [--json]"); diff --git a/clients/dotnet/MxGateway.Client.Tests/MxGatewayClientCliTests.cs b/clients/dotnet/MxGateway.Client.Tests/MxGatewayClientCliTests.cs index 6e718ac..7b5b850 100644 --- a/clients/dotnet/MxGateway.Client.Tests/MxGatewayClientCliTests.cs +++ b/clients/dotnet/MxGateway.Client.Tests/MxGatewayClientCliTests.cs @@ -248,6 +248,87 @@ public sealed class MxGatewayClientCliTests } + /// Verifies that stream-alarms with --max-events stops output and distinguishes payload cases. + [Fact] + public async Task RunAsync_StreamAlarms_WithMaxEventsStopsAndDistinguishesPayloadCases() + { + using var output = new StringWriter(); + using var error = new StringWriter(); + FakeCliClient fakeClient = new(); + fakeClient.AlarmFeedMessages.Add(new AlarmFeedMessage + { + ActiveAlarm = new ActiveAlarmSnapshot { AlarmFullReference = "Tank01.Level.HiHi" }, + }); + fakeClient.AlarmFeedMessages.Add(new AlarmFeedMessage { SnapshotComplete = true }); + + int exitCode = await MxGatewayClientCli.RunAsync( + [ + "stream-alarms", + "--endpoint", + "http://localhost:5000", + "--api-key", + "test-api-key", + "--filter-prefix", + "Tank01", + "--max-events", + "1", + ], + output, + error, + _ => fakeClient); + + Assert.Equal(0, exitCode); + StreamAlarmsRequest request = Assert.Single(fakeClient.StreamAlarmsRequests); + Assert.Equal("Tank01", request.AlarmFilterPrefix); + string text = output.ToString(); + Assert.Contains("active-alarm", text); + Assert.Contains("Tank01.Level.HiHi", text); + Assert.DoesNotContain("snapshot-complete", text); + Assert.Equal(string.Empty, error.ToString()); + } + + /// Verifies that acknowledge-alarm builds a request and prints the JSON reply. + [Fact] + public async Task RunAsync_AcknowledgeAlarm_BuildsRequestAndPrintsJsonReply() + { + using var output = new StringWriter(); + using var error = new StringWriter(); + FakeCliClient fakeClient = new(); + fakeClient.AcknowledgeAlarmReplies.Enqueue(new AcknowledgeAlarmReply + { + CorrelationId = "ack-fixture", + ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok }, + Hresult = 0, + }); + + int exitCode = await MxGatewayClientCli.RunAsync( + [ + "acknowledge-alarm", + "--endpoint", + "http://localhost:5000", + "--api-key", + "test-api-key", + "--reference", + "Tank01.Level.HiHi", + "--comment", + "ack from cli", + "--operator", + "operator1", + "--json", + ], + output, + error, + _ => fakeClient); + + Assert.Equal(0, exitCode); + AcknowledgeAlarmRequest request = Assert.Single(fakeClient.AcknowledgeAlarmRequests); + Assert.Equal("Tank01.Level.HiHi", request.AlarmFullReference); + Assert.Equal("ack from cli", request.Comment); + Assert.Equal("operator1", request.OperatorUser); + Assert.Contains("ack-fixture", output.ToString()); + Assert.Equal(string.Empty, error.ToString()); + } + /// Verifies that smoke command closes opened session when a command fails. [Fact] public async Task RunAsync_Smoke_WhenCommandFails_ClosesOpenedSession() @@ -695,6 +776,41 @@ public sealed class MxGatewayClientCliTests } } + /// Queue of acknowledge-alarm replies to return. + public Queue AcknowledgeAlarmReplies { get; } = new(); + + /// List of received acknowledge-alarm requests. + public List AcknowledgeAlarmRequests { get; } = []; + + /// List of received stream-alarms requests. + public List StreamAlarmsRequests { get; } = []; + + /// List of alarm feed messages to yield when streaming alarms. + public List AlarmFeedMessages { get; } = []; + + /// + public Task AcknowledgeAlarmAsync( + AcknowledgeAlarmRequest request, + CancellationToken cancellationToken) + { + AcknowledgeAlarmRequests.Add(request); + return Task.FromResult(AcknowledgeAlarmReplies.Dequeue()); + } + + /// + public async IAsyncEnumerable StreamAlarmsAsync( + StreamAlarmsRequest request, + [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken) + { + StreamAlarmsRequests.Add(request); + foreach (AlarmFeedMessage feedMessage in AlarmFeedMessages) + { + cancellationToken.ThrowIfCancellationRequested(); + await Task.Yield(); + yield return feedMessage; + } + } + /// Galaxy test connection reply to return. public TestConnectionReply GalaxyTestConnectionReply { get; set; } = new() { Ok = true };