Compare commits

...

4 Commits

Author SHA1 Message Date
Joseph Doherty 56949c967b Add stream-alarms and acknowledge-alarm to the .NET CLI
stream-alarms attaches to the gateway's central alarm feed (mirrors
stream-events: --max-events cap, --json/--jsonl, --filter-prefix);
acknowledge-alarm is a unary session-less ack (--reference required,
--comment, --operator). Both wired through IMxGatewayCliClient and the
adapter.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 19:01:58 -04:00
Joseph Doherty 7dec9b30f5 Add stream-alarms and acknowledge-alarm to the Java CLI
stream-alarms attaches to the gateway's central alarm feed (mirrors
stream-events: --limit cap, --json, --filter-prefix); acknowledge-alarm
is a unary session-less ack (--reference required, --comment,
--operator). Both route through new session-less methods on the CLI
client abstraction.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 19:01:57 -04:00
Joseph Doherty 1d3c8edb44 Add stream-alarms and acknowledge-alarm to the Rust CLI
stream-alarms attaches to the gateway's central alarm feed (mirrors
stream-events: --max-events cap, --json/--jsonl, --filter-prefix);
acknowledge-alarm is a unary session-less ack (--reference required,
--comment, --operator).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 19:01:49 -04:00
Joseph Doherty 58259016b0 Add stream-alarms and acknowledge-alarm to the Go CLI
stream-alarms attaches to the gateway's central alarm feed (mirrors
stream-events: --limit cap, --json); acknowledge-alarm is a unary
session-less ack (--reference required, --comment, --operator).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 19:01:48 -04:00
8 changed files with 922 additions and 3 deletions
@@ -51,6 +51,27 @@ public interface IMxGatewayCliClient : IAsyncDisposable
StreamEventsRequest request,
CancellationToken cancellationToken);
/// <summary>
/// Acknowledges an active MXAccess alarm condition through the gateway.
/// </summary>
/// <param name="request">The acknowledge request — alarm reference, comment, operator user.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
/// <returns>The acknowledge reply with protocol + native MxStatus.</returns>
Task<AcknowledgeAlarmReply> AcknowledgeAlarmAsync(
AcknowledgeAlarmRequest request,
CancellationToken cancellationToken);
/// <summary>
/// Attaches to the gateway's central alarm feed — the current active-alarm
/// snapshot followed by live transitions.
/// </summary>
/// <param name="request">The stream request, optionally scoped by alarm-reference prefix.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
/// <returns>An async enumerable of alarm feed messages.</returns>
IAsyncEnumerable<AlarmFeedMessage> StreamAlarmsAsync(
StreamAlarmsRequest request,
CancellationToken cancellationToken);
/// <summary>
/// Tests connection to the Galaxy Repository.
/// </summary>
@@ -52,6 +52,22 @@ internal sealed class MxGatewayCliClientAdapter : IMxGatewayCliClient
return _client.StreamEventsAsync(request, cancellationToken);
}
/// <inheritdoc />
public Task<AcknowledgeAlarmReply> AcknowledgeAlarmAsync(
AcknowledgeAlarmRequest request,
CancellationToken cancellationToken)
{
return _client.AcknowledgeAlarmAsync(request, cancellationToken);
}
/// <inheritdoc />
public IAsyncEnumerable<AlarmFeedMessage> StreamAlarmsAsync(
StreamAlarmsRequest request,
CancellationToken cancellationToken)
{
return _client.StreamAlarmsAsync(request, cancellationToken);
}
/// <inheritdoc />
public Task<TestConnectionReply> GalaxyTestConnectionAsync(
TestConnectionRequest request,
@@ -130,6 +130,10 @@ public static class MxGatewayClientCli
.ConfigureAwait(false),
"stream-events" => await StreamEventsAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"stream-alarms" => await StreamAlarmsAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"acknowledge-alarm" => await AcknowledgeAlarmAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"write" => await WriteAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"write2" => await Write2Async(arguments, client, standardOutput, cancellation.Token)
@@ -1353,6 +1357,124 @@ public static class MxGatewayClientCli
return 0;
}
private static async Task<int> StreamAlarmsAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
uint maxEvents = arguments.GetUInt32("max-events", 0);
bool json = arguments.HasFlag("json");
bool jsonLines = arguments.HasFlag("jsonl");
if (json && !jsonLines && maxEvents is 0)
{
throw new ArgumentException("--json stream-alarms requires --max-events to bound aggregate output.");
}
if (maxEvents > MaxAggregateEvents)
{
throw new ArgumentException($"--max-events cannot exceed {MaxAggregateEvents}.");
}
var messages = json && !jsonLines
? new List<AlarmFeedMessage>(checked((int)maxEvents))
: [];
uint messageCount = 0;
var request = new StreamAlarmsRequest
{
ClientCorrelationId = CreateCorrelationId(),
AlarmFilterPrefix = arguments.GetOptional("filter-prefix") ?? string.Empty,
};
try
{
await foreach (AlarmFeedMessage feedMessage in client.StreamAlarmsAsync(request, cancellationToken)
.WithCancellation(cancellationToken)
.ConfigureAwait(false))
{
if (jsonLines)
{
output.WriteLine(ProtobufJsonFormatter.Format(feedMessage));
}
else if (json)
{
messages.Add(feedMessage);
}
else
{
output.WriteLine(FormatAlarmFeedMessage(feedMessage));
}
messageCount++;
if (maxEvents > 0 && messageCount >= maxEvents)
{
break;
}
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// Mirrors stream-events (Client.Dotnet-017): the supplied token covers
// the user's --timeout wall-clock budget and external Ctrl+C / parent
// CTS cancellation. All are graceful completion modes for a
// finite-window alarm-feed collector: emit what arrived and exit 0.
}
if (json && !jsonLines)
{
output.WriteLine(JsonSerializer.Serialize(
new { alarms = messages.Select(AlarmFeedMessageToJsonElement).ToArray() },
JsonOptions));
}
return 0;
}
private static Task<int> AcknowledgeAlarmAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
var request = new AcknowledgeAlarmRequest
{
ClientCorrelationId = CreateCorrelationId(),
AlarmFullReference = arguments.GetRequired("reference"),
Comment = arguments.GetOptional("comment") ?? string.Empty,
OperatorUser = arguments.GetOptional("operator") ?? string.Empty,
};
return WriteReplyAsync(
client.AcknowledgeAlarmAsync(request, cancellationToken),
arguments,
output);
}
/// <summary>
/// Renders one <see cref="AlarmFeedMessage"/> for the human-readable
/// (non-JSON) stream-alarms output, distinguishing the <c>payload</c> oneof
/// arms: a snapshot active alarm, the snapshot-complete sentinel, or a live
/// transition.
/// </summary>
private static string FormatAlarmFeedMessage(AlarmFeedMessage feedMessage)
{
return feedMessage.PayloadCase switch
{
AlarmFeedMessage.PayloadOneofCase.ActiveAlarm =>
$"active-alarm {ProtobufJsonFormatter.Format(feedMessage.ActiveAlarm)}",
AlarmFeedMessage.PayloadOneofCase.SnapshotComplete =>
$"snapshot-complete {feedMessage.SnapshotComplete}",
AlarmFeedMessage.PayloadOneofCase.Transition =>
$"transition {ProtobufJsonFormatter.Format(feedMessage.Transition)}",
_ => $"unknown-payload {feedMessage.PayloadCase}",
};
}
private static JsonElement AlarmFeedMessageToJsonElement(AlarmFeedMessage feedMessage)
{
return JsonDocument.Parse(ProtobufJsonFormatter.Format(feedMessage)).RootElement.Clone();
}
private static async Task<int> SmokeAsync(
CliArguments arguments,
IMxGatewayCliClient client,
@@ -1908,6 +2030,8 @@ public static class MxGatewayClientCli
or "bench-read-bulk"
or "bench-stream-events"
or "stream-events"
or "stream-alarms"
or "acknowledge-alarm"
or "write"
or "write2"
or "smoke"
@@ -1966,6 +2090,8 @@ public static class MxGatewayClientCli
writer.WriteLine("mxgw-dotnet subscribe-bulk --session-id <id> --server-handle <n> --items <ref,ref> [--json]");
writer.WriteLine("mxgw-dotnet unsubscribe-bulk --session-id <id> --server-handle <n> --item-handles <n,n> [--json]");
writer.WriteLine("mxgw-dotnet stream-events --session-id <id> [--max-events <n>] [--json]");
writer.WriteLine("mxgw-dotnet stream-alarms [--filter-prefix <ref>] [--max-events <n>] [--json] [--jsonl]");
writer.WriteLine("mxgw-dotnet acknowledge-alarm --reference <ref> [--comment <text>] [--operator <user>] [--json]");
writer.WriteLine("mxgw-dotnet write --session-id <id> --server-handle <n> --item-handle <n> --type <type> --value <value> [--json]");
writer.WriteLine("mxgw-dotnet write2 --session-id <id> --server-handle <n> --item-handle <n> --type <type> --value <value> [--timestamp <iso>] [--json]");
writer.WriteLine("mxgw-dotnet smoke --item <ref> [--value <value> --type <type>] [--json]");
@@ -248,6 +248,87 @@ public sealed class MxGatewayClientCliTests
}
/// <summary>Verifies that stream-alarms with --max-events stops output and distinguishes payload cases.</summary>
[Fact]
public async Task RunAsync_StreamAlarms_WithMaxEventsStopsAndDistinguishesPayloadCases()
{
using var output = new StringWriter();
using var error = new StringWriter();
FakeCliClient fakeClient = new();
fakeClient.AlarmFeedMessages.Add(new AlarmFeedMessage
{
ActiveAlarm = new ActiveAlarmSnapshot { AlarmFullReference = "Tank01.Level.HiHi" },
});
fakeClient.AlarmFeedMessages.Add(new AlarmFeedMessage { SnapshotComplete = true });
int exitCode = await MxGatewayClientCli.RunAsync(
[
"stream-alarms",
"--endpoint",
"http://localhost:5000",
"--api-key",
"test-api-key",
"--filter-prefix",
"Tank01",
"--max-events",
"1",
],
output,
error,
_ => fakeClient);
Assert.Equal(0, exitCode);
StreamAlarmsRequest request = Assert.Single(fakeClient.StreamAlarmsRequests);
Assert.Equal("Tank01", request.AlarmFilterPrefix);
string text = output.ToString();
Assert.Contains("active-alarm", text);
Assert.Contains("Tank01.Level.HiHi", text);
Assert.DoesNotContain("snapshot-complete", text);
Assert.Equal(string.Empty, error.ToString());
}
/// <summary>Verifies that acknowledge-alarm builds a request and prints the JSON reply.</summary>
[Fact]
public async Task RunAsync_AcknowledgeAlarm_BuildsRequestAndPrintsJsonReply()
{
using var output = new StringWriter();
using var error = new StringWriter();
FakeCliClient fakeClient = new();
fakeClient.AcknowledgeAlarmReplies.Enqueue(new AcknowledgeAlarmReply
{
CorrelationId = "ack-fixture",
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
Hresult = 0,
});
int exitCode = await MxGatewayClientCli.RunAsync(
[
"acknowledge-alarm",
"--endpoint",
"http://localhost:5000",
"--api-key",
"test-api-key",
"--reference",
"Tank01.Level.HiHi",
"--comment",
"ack from cli",
"--operator",
"operator1",
"--json",
],
output,
error,
_ => fakeClient);
Assert.Equal(0, exitCode);
AcknowledgeAlarmRequest request = Assert.Single(fakeClient.AcknowledgeAlarmRequests);
Assert.Equal("Tank01.Level.HiHi", request.AlarmFullReference);
Assert.Equal("ack from cli", request.Comment);
Assert.Equal("operator1", request.OperatorUser);
Assert.Contains("ack-fixture", output.ToString());
Assert.Equal(string.Empty, error.ToString());
}
/// <summary>Verifies that smoke command closes opened session when a command fails.</summary>
[Fact]
public async Task RunAsync_Smoke_WhenCommandFails_ClosesOpenedSession()
@@ -695,6 +776,41 @@ public sealed class MxGatewayClientCliTests
}
}
/// <summary>Queue of acknowledge-alarm replies to return.</summary>
public Queue<AcknowledgeAlarmReply> AcknowledgeAlarmReplies { get; } = new();
/// <summary>List of received acknowledge-alarm requests.</summary>
public List<AcknowledgeAlarmRequest> AcknowledgeAlarmRequests { get; } = [];
/// <summary>List of received stream-alarms requests.</summary>
public List<StreamAlarmsRequest> StreamAlarmsRequests { get; } = [];
/// <summary>List of alarm feed messages to yield when streaming alarms.</summary>
public List<AlarmFeedMessage> AlarmFeedMessages { get; } = [];
/// <inheritdoc />
public Task<AcknowledgeAlarmReply> AcknowledgeAlarmAsync(
AcknowledgeAlarmRequest request,
CancellationToken cancellationToken)
{
AcknowledgeAlarmRequests.Add(request);
return Task.FromResult(AcknowledgeAlarmReplies.Dequeue());
}
/// <inheritdoc />
public async IAsyncEnumerable<AlarmFeedMessage> StreamAlarmsAsync(
StreamAlarmsRequest request,
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
StreamAlarmsRequests.Add(request);
foreach (AlarmFeedMessage feedMessage in AlarmFeedMessages)
{
cancellationToken.ThrowIfCancellationRequested();
await Task.Yield();
yield return feedMessage;
}
}
/// <summary>Galaxy test connection reply to return.</summary>
public TestConnectionReply GalaxyTestConnectionReply { get; set; } = new() { Ok = true };
+118 -1
View File
@@ -107,6 +107,10 @@ func runWithIO(ctx context.Context, args []string, stdout, stderr io.Writer) err
return runWrite(ctx, args[1:], stdout, stderr)
case "stream-events":
return runStreamEvents(ctx, args[1:], stdout, stderr)
case "stream-alarms":
return runStreamAlarms(ctx, args[1:], stdout, stderr)
case "acknowledge-alarm":
return runAcknowledgeAlarm(ctx, args[1:], stdout, stderr)
case "smoke":
return runSmoke(ctx, args[1:], stdout, stderr)
case "galaxy-test-connection":
@@ -816,6 +820,119 @@ func runStreamEvents(ctx context.Context, args []string, stdout, stderr io.Write
return nil
}
func runStreamAlarms(ctx context.Context, args []string, stdout, stderr io.Writer) error {
flags := flag.NewFlagSet("stream-alarms", flag.ContinueOnError)
flags.SetOutput(stderr)
common := bindCommonFlags(flags)
jsonOutput := flags.Bool("json", false, "write JSON output")
filterPrefix := flags.String("filter-prefix", "", "alarm-reference prefix scoping the feed; empty means unscoped")
limit := flags.Int("limit", 0, "maximum feed messages to read; 0 means unbounded")
if err := flags.Parse(args); err != nil {
return err
}
client, _, err := dialForCommand(ctx, common)
if err != nil {
return err
}
defer client.Close()
// Mirror runStreamEvents so Ctrl+C on a long-running stream-alarms command
// cancels the gRPC stream cleanly (the gateway sees codes.Canceled rather
// than a torn TCP connection) and the deferred client.Close() actually runs.
signalCtx, stopSignals := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
defer stopSignals()
streamCtx, cancelStream := context.WithCancel(signalCtx)
defer cancelStream()
stream, err := client.StreamAlarms(streamCtx, &mxgateway.StreamAlarmsRequest{AlarmFilterPrefix: *filterPrefix})
if err != nil {
return err
}
count := 0
for {
message, err := stream.Recv()
if errors.Is(err, io.EOF) {
return nil
}
if err != nil {
return err
}
if *jsonOutput {
fmt.Fprintln(stdout, string(mustMarshalProto(message)))
} else {
fmt.Fprintln(stdout, formatAlarmFeedMessage(message))
}
count++
if *limit > 0 && count >= *limit {
cancelStream()
return nil
}
}
}
// formatAlarmFeedMessage renders one AlarmFeedMessage in the CLI's plain-text
// output style, distinguishing the active-alarm snapshot, snapshot-complete
// sentinel, and transition cases of the message's payload oneof.
func formatAlarmFeedMessage(message *mxgateway.AlarmFeedMessage) string {
switch {
case message.GetActiveAlarm() != nil:
alarm := message.GetActiveAlarm()
return fmt.Sprintf("active-alarm %s state=%s severity=%d", alarm.GetAlarmFullReference(), alarm.GetCurrentState(), alarm.GetSeverity())
case message.GetSnapshotComplete():
return "snapshot-complete"
case message.GetTransition() != nil:
transition := message.GetTransition()
return fmt.Sprintf("transition %s kind=%s severity=%d", transition.GetAlarmFullReference(), transition.GetTransitionKind(), transition.GetSeverity())
default:
return "unknown"
}
}
func runAcknowledgeAlarm(ctx context.Context, args []string, stdout, stderr io.Writer) error {
flags := flag.NewFlagSet("acknowledge-alarm", flag.ContinueOnError)
flags.SetOutput(stderr)
common := bindCommonFlags(flags)
jsonOutput := flags.Bool("json", false, "write JSON output")
reference := flags.String("reference", "", "full alarm reference to acknowledge")
comment := flags.String("comment", "", "operator acknowledge comment")
operator := flags.String("operator", "", "operator user performing the acknowledge")
if err := flags.Parse(args); err != nil {
return err
}
if *reference == "" {
return errors.New("reference is required")
}
client, options, err := dialForCommand(ctx, common)
if err != nil {
return err
}
defer client.Close()
reply, err := client.AcknowledgeAlarm(ctx, &mxgateway.AcknowledgeAlarmRequest{
AlarmFullReference: *reference,
Comment: *comment,
OperatorUser: *operator,
})
if err != nil {
return err
}
if *jsonOutput {
return writeJSON(stdout, commandReplyOutput{
Command: "acknowledge-alarm",
Options: options,
Reply: mustMarshalProto(reply),
})
}
fmt.Fprintln(stdout, reply.GetHresult())
return nil
}
func runSmoke(ctx context.Context, args []string, stdout, stderr io.Writer) error {
flags := flag.NewFlagSet("smoke", flag.ContinueOnError)
flags.SetOutput(stderr)
@@ -1120,7 +1237,7 @@ func runBatch(ctx context.Context, in io.Reader, stdout, stderr io.Writer) error
}
func writeUsage(writer io.Writer) {
fmt.Fprintln(writer, "usage: mxgw-go <version|open-session|close-session|register|add-item|advise|subscribe-bulk|unsubscribe-bulk|read-bulk|write-bulk|write2-bulk|write-secured-bulk|write-secured2-bulk|bench-read-bulk|write|stream-events|smoke|galaxy-test-connection|galaxy-last-deploy|galaxy-discover|galaxy-watch|batch>")
fmt.Fprintln(writer, "usage: mxgw-go <version|open-session|close-session|register|add-item|advise|subscribe-bulk|unsubscribe-bulk|read-bulk|write-bulk|write2-bulk|write-secured-bulk|write-secured2-bulk|bench-read-bulk|write|stream-events|stream-alarms|acknowledge-alarm|smoke|galaxy-test-connection|galaxy-last-deploy|galaxy-discover|galaxy-watch|batch>")
}
func dialGalaxyForCommand(ctx context.Context, common *commonOptions) (*mxgateway.GalaxyClient, commonOptions, error) {
@@ -3,6 +3,7 @@ package com.dohertylan.mxgateway.cli;
import com.dohertylan.mxgateway.client.DeployEventStream;
import com.dohertylan.mxgateway.client.GalaxyRepositoryClient;
import com.dohertylan.mxgateway.client.MxEventStream;
import com.dohertylan.mxgateway.client.MxGatewayAlarmFeedSubscription;
import com.dohertylan.mxgateway.client.MxGatewayClient;
import com.dohertylan.mxgateway.client.MxGatewayClientOptions;
import com.dohertylan.mxgateway.client.MxGatewayClientVersion;
@@ -28,14 +29,23 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import io.grpc.stub.StreamObserver;
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmReply;
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmRequest;
import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot;
import mxaccess_gateway.v1.MxaccessGateway.AlarmFeedMessage;
import mxaccess_gateway.v1.MxaccessGateway.BulkReadResult;
import mxaccess_gateway.v1.MxaccessGateway.BulkWriteResult;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
import mxaccess_gateway.v1.MxaccessGateway.MxValue;
import mxaccess_gateway.v1.MxaccessGateway.OnAlarmTransitionEvent;
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest;
import mxaccess_gateway.v1.MxaccessGateway.SubscribeResult;
import mxaccess_gateway.v1.MxaccessGateway.Write2BulkEntry;
import mxaccess_gateway.v1.MxaccessGateway.WriteBulkEntry;
@@ -127,6 +137,8 @@ public final class MxGatewayCli implements Callable<Integer> {
commandLine.addSubcommand("bench-read-bulk", new BenchReadBulkCommand(clientFactory));
commandLine.addSubcommand("write", new WriteCommand(clientFactory));
commandLine.addSubcommand("stream-events", new StreamEventsCommand(clientFactory));
commandLine.addSubcommand("stream-alarms", new StreamAlarmsCommand(clientFactory));
commandLine.addSubcommand("acknowledge-alarm", new AcknowledgeAlarmCommand(clientFactory));
commandLine.addSubcommand("smoke", new SmokeCommand(clientFactory));
commandLine.addSubcommand("galaxy-test", new GalaxyTestConnectionCommand());
commandLine.addSubcommand("galaxy-deploy-time", new GalaxyDeployTimeCommand());
@@ -139,6 +151,9 @@ public final class MxGatewayCli implements Callable<Integer> {
/** Sentinel written to stdout after every command result in batch mode. */
static final String BATCH_EOR = "__MXGW_BATCH_EOR__";
/** Sentinel queued by {@code stream-alarms} to mark a clean end of the alarm feed. */
private static final Object ALARM_FEED_END = new Object();
/**
* Reads one CLI invocation per stdin line, executes each via a fresh
* {@link CommandLine}, and writes {@value #BATCH_EOR} to stdout after
@@ -1155,6 +1170,115 @@ public final class MxGatewayCli implements Callable<Integer> {
}
}
@Command(name = "stream-alarms", description = "Streams the gateway central alarm feed.")
static final class StreamAlarmsCommand extends GatewayCommand {
@Option(names = "--filter-prefix", description = "Alarm-reference prefix scoping the feed; empty means unscoped.")
String filterPrefix = "";
@Option(names = "--limit", defaultValue = "0", description = "Maximum feed messages to print.")
int limit;
StreamAlarmsCommand(MxGatewayCliClientFactory clientFactory) {
super(clientFactory);
}
@Override
public Integer call() {
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
// The async alarm feed delivers on a background gRPC thread; buffer
// messages in a bounded queue and drain them on this thread so the
// --limit termination mirrors stream-events. 1024 absorbs the
// gateway's initial active-alarm snapshot burst.
BlockingQueue<Object> queue = new ArrayBlockingQueue<>(1024);
StreamAlarmsRequest request = StreamAlarmsRequest.newBuilder()
.setAlarmFilterPrefix(filterPrefix)
.build();
MxGatewayAlarmFeedSubscription subscription =
client.streamAlarms(request, new StreamObserver<>() {
@Override
public void onNext(AlarmFeedMessage value) {
queue.offer(value);
}
@Override
public void onError(Throwable error) {
queue.offer(error);
}
@Override
public void onCompleted() {
queue.offer(ALARM_FEED_END);
}
});
try {
int count = 0;
while (true) {
Object item = queue.take();
if (item == ALARM_FEED_END) {
break;
}
if (item instanceof Throwable error) {
throw new IllegalStateException(
"gateway stream alarms failed: " + error.getMessage(), error);
}
AlarmFeedMessage message = (AlarmFeedMessage) item;
if (json) {
client.out().println(protoJson(message));
} else {
client.out().println(formatAlarmFeedMessage(message));
}
client.out().flush();
count++;
if (limit > 0 && count >= limit) {
subscription.cancel();
break;
}
}
} catch (InterruptedException error) {
Thread.currentThread().interrupt();
subscription.cancel();
} finally {
subscription.cancel();
}
}
return 0;
}
}
@Command(name = "acknowledge-alarm", description = "Acknowledges an active MXAccess alarm.")
static final class AcknowledgeAlarmCommand extends GatewayCommand {
@Option(names = "--reference", required = true, description = "Full alarm reference to acknowledge.")
String reference;
@Option(names = "--comment", description = "Operator acknowledge comment.")
String comment = "";
@Option(names = "--operator", description = "Operator user performing the acknowledge.")
String operator = "";
AcknowledgeAlarmCommand(MxGatewayCliClientFactory clientFactory) {
super(clientFactory);
}
@Override
public Integer call() {
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
AcknowledgeAlarmReply reply = client.acknowledgeAlarm(AcknowledgeAlarmRequest.newBuilder()
.setAlarmFullReference(reference)
.setComment(comment)
.setOperatorUser(operator)
.build());
writeOutput(
"acknowledge-alarm",
common,
json,
reply,
() -> Integer.toString(reply.getHresult()));
}
return 0;
}
}
@Command(name = "smoke", description = "Runs a bounded open/register/add/advise flow.")
static final class SmokeCommand extends GatewayCommand {
@Option(names = "--client-name", defaultValue = "mxgw-java-smoke", description = "MXAccess client name.")
@@ -1329,6 +1453,11 @@ public final class MxGatewayCli implements Callable<Integer> {
MxGatewayCliSession session(String sessionId);
AcknowledgeAlarmReply acknowledgeAlarm(AcknowledgeAlarmRequest request);
MxGatewayAlarmFeedSubscription streamAlarms(
StreamAlarmsRequest request, StreamObserver<AlarmFeedMessage> observer);
@Override
void close();
}
@@ -1401,6 +1530,17 @@ public final class MxGatewayCli implements Callable<Integer> {
return new GrpcMxGatewayCliSession(MxGatewaySession.forSessionId(client, sessionId));
}
@Override
public AcknowledgeAlarmReply acknowledgeAlarm(AcknowledgeAlarmRequest request) {
return client.acknowledgeAlarm(request);
}
@Override
public MxGatewayAlarmFeedSubscription streamAlarms(
StreamAlarmsRequest request, StreamObserver<AlarmFeedMessage> observer) {
return client.streamAlarms(request, observer);
}
@Override
public void close() {
client.close();
@@ -1576,6 +1716,32 @@ public final class MxGatewayCli implements Callable<Integer> {
return values;
}
/**
* Renders one {@link AlarmFeedMessage} in the CLI's plain-text output
* style, distinguishing the active-alarm snapshot, snapshot-complete
* sentinel, and transition cases of the message's {@code payload} oneof.
*/
private static String formatAlarmFeedMessage(AlarmFeedMessage message) {
return switch (message.getPayloadCase()) {
case ACTIVE_ALARM -> {
ActiveAlarmSnapshot alarm = message.getActiveAlarm();
yield String.format(
"active-alarm %s state=%s severity=%d",
alarm.getAlarmFullReference(), alarm.getCurrentState().name(), alarm.getSeverity());
}
case SNAPSHOT_COMPLETE -> "snapshot-complete";
case TRANSITION -> {
OnAlarmTransitionEvent transition = message.getTransition();
yield String.format(
"transition %s kind=%s severity=%d",
transition.getAlarmFullReference(),
transition.getTransitionKind().name(),
transition.getSeverity());
}
case PAYLOAD_NOT_SET -> "unknown";
};
}
private static MxValue parseValue(String type, String text) {
return switch (type) {
case "bool" -> MxValues.boolValue(Boolean.parseBoolean(text));
@@ -8,10 +8,18 @@ import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import com.dohertylan.mxgateway.client.MxGatewayAlarmFeedSubscription;
import io.grpc.stub.StreamObserver;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmReply;
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmRequest;
import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot;
import mxaccess_gateway.v1.MxaccessGateway.AddItemReply;
import mxaccess_gateway.v1.MxaccessGateway.AlarmConditionState;
import mxaccess_gateway.v1.MxaccessGateway.AlarmFeedMessage;
import mxaccess_gateway.v1.MxaccessGateway.AlarmTransitionKind;
import mxaccess_gateway.v1.MxaccessGateway.BulkReadResult;
import mxaccess_gateway.v1.MxaccessGateway.BulkWriteResult;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply;
@@ -20,9 +28,11 @@ import mxaccess_gateway.v1.MxaccessGateway.MxCommandKind;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
import mxaccess_gateway.v1.MxaccessGateway.MxValue;
import mxaccess_gateway.v1.MxaccessGateway.OnAlarmTransitionEvent;
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply;
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatus;
import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest;
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode;
import mxaccess_gateway.v1.MxaccessGateway.RegisterReply;
import mxaccess_gateway.v1.MxaccessGateway.SessionState;
@@ -389,6 +399,70 @@ final class MxGatewayCliTests {
assertTrue(output.contains("TestMachine_002.TestChangingInt"), output);
}
// ---- stream-alarms / acknowledge-alarm subcommands ----
@Test
void streamAlarmsCommandForwardsFilterPrefixAndPrintsFeedMessages() {
FakeClientFactory factory = new FakeClientFactory();
CliRun run = execute(factory, "stream-alarms", "--filter-prefix", "Tank01");
assertEquals(0, run.exitCode());
assertEquals("Tank01", factory.client.lastStreamAlarmsRequest.getAlarmFilterPrefix());
String out = run.output();
assertTrue(out.contains("active-alarm Tank01.Level.HiHi"), out);
assertTrue(out.contains("snapshot-complete"), out);
assertTrue(out.contains("transition Tank01.Level.HiHi"), out);
}
@Test
void streamAlarmsCommandHonoursLimit() {
FakeClientFactory factory = new FakeClientFactory();
CliRun run = execute(factory, "stream-alarms", "--limit", "1");
assertEquals(0, run.exitCode());
long lines = run.output().lines().filter(line -> !line.isBlank()).count();
assertEquals(1, lines, "expected exactly one feed message with --limit 1, got: " + run.output());
}
@Test
void streamAlarmsCommandPrintsJson() {
FakeClientFactory factory = new FakeClientFactory();
CliRun run = execute(factory, "stream-alarms", "--json");
assertEquals(0, run.exitCode());
assertTrue(run.output().contains("\"activeAlarm\""), run.output());
assertTrue(run.output().contains("\"snapshotComplete\""), run.output());
}
@Test
void acknowledgeAlarmCommandForwardsOptionsAndPrintsReply() {
FakeClientFactory factory = new FakeClientFactory();
CliRun run = execute(
factory,
"acknowledge-alarm",
"--reference",
"Tank01.Level.HiHi",
"--comment",
"checked",
"--operator",
"operator1",
"--json");
assertEquals(0, run.exitCode());
assertEquals("Tank01.Level.HiHi", factory.client.lastAcknowledgeAlarmRequest.getAlarmFullReference());
assertEquals("checked", factory.client.lastAcknowledgeAlarmRequest.getComment());
assertEquals("operator1", factory.client.lastAcknowledgeAlarmRequest.getOperatorUser());
assertTrue(run.output().contains("\"command\":\"acknowledge-alarm\""), run.output());
}
@Test
void acknowledgeAlarmCommandRequiresReference() {
CliRun run = execute(new FakeClientFactory(), "acknowledge-alarm", "--comment", "checked");
assertFalse(run.exitCode() == 0, "expected non-zero exit without --reference");
assertTrue(run.errors().contains("--reference"), run.errors());
}
// ---- Client.Java-027: batch subcommand ----
@Test
@@ -501,6 +575,8 @@ final class MxGatewayCliTests {
private final PrintWriter out;
private final FakeSession session = new FakeSession();
private boolean closeCalled;
private AcknowledgeAlarmRequest lastAcknowledgeAlarmRequest;
private StreamAlarmsRequest lastStreamAlarmsRequest;
private FakeClient(PrintWriter out) {
this.out = out;
@@ -534,6 +610,40 @@ final class MxGatewayCliTests {
return session;
}
@Override
public AcknowledgeAlarmReply acknowledgeAlarm(AcknowledgeAlarmRequest request) {
lastAcknowledgeAlarmRequest = request;
return AcknowledgeAlarmReply.newBuilder()
.setCorrelationId(request.getClientCorrelationId())
.setProtocolStatus(ok())
.setHresult(0)
.build();
}
@Override
public MxGatewayAlarmFeedSubscription streamAlarms(
StreamAlarmsRequest request, StreamObserver<AlarmFeedMessage> observer) {
lastStreamAlarmsRequest = request;
// Replay a deterministic active-alarm snapshot, snapshot-complete
// sentinel, transition, then complete the feed so the CLI command
// drains a bounded stream without contacting a live gateway.
observer.onNext(AlarmFeedMessage.newBuilder()
.setActiveAlarm(ActiveAlarmSnapshot.newBuilder()
.setAlarmFullReference("Tank01.Level.HiHi")
.setCurrentState(AlarmConditionState.ALARM_CONDITION_STATE_ACTIVE)
.setSeverity(700))
.build());
observer.onNext(AlarmFeedMessage.newBuilder().setSnapshotComplete(true).build());
observer.onNext(AlarmFeedMessage.newBuilder()
.setTransition(OnAlarmTransitionEvent.newBuilder()
.setAlarmFullReference("Tank01.Level.HiHi")
.setTransitionKind(AlarmTransitionKind.ALARM_TRANSITION_KIND_ACKNOWLEDGE)
.setSeverity(700))
.build());
observer.onCompleted();
return new MxGatewayAlarmFeedSubscription();
}
@Override
public void close() {
}
+249 -2
View File
@@ -18,8 +18,9 @@ use clap::{Args, Parser, Subcommand, ValueEnum};
use futures_util::StreamExt;
use mxgateway_client::generated::galaxy_repository::v1::DeployEvent;
use mxgateway_client::generated::mxaccess_gateway::v1::{
CloseSessionRequest, MxCommand, MxCommandKind, MxCommandRequest, MxEvent, MxEventFamily,
MxValue as ProtoMxValue, OpenSessionRequest, PingCommand, StreamEventsRequest, Write2BulkEntry,
alarm_feed_message, AcknowledgeAlarmRequest, AlarmFeedMessage, CloseSessionRequest, MxCommand,
MxCommandKind, MxCommandRequest, MxEvent, MxEventFamily, MxValue as ProtoMxValue,
OpenSessionRequest, PingCommand, StreamAlarmsRequest, StreamEventsRequest, Write2BulkEntry,
WriteBulkEntry, WriteSecured2BulkEntry, WriteSecuredBulkEntry,
};
use mxgateway_client::{
@@ -272,6 +273,24 @@ enum Command {
#[arg(long)]
jsonl: bool,
},
/// Attach to the gateway's session-less central alarm feed. The stream
/// opens with one `active_alarm` per currently-active alarm, then a
/// single `snapshot_complete`, then a `transition` for every subsequent
/// raise / acknowledge / clear.
StreamAlarms {
#[command(flatten)]
connection: ConnectionArgs,
/// Optional alarm-reference prefix scoping the feed to an equipment
/// sub-tree. Omit to stream every active alarm.
#[arg(long)]
filter_prefix: Option<String>,
#[arg(long, default_value_t = 1)]
max_events: usize,
#[arg(long)]
json: bool,
#[arg(long)]
jsonl: bool,
},
Write {
#[command(flatten)]
connection: ConnectionArgs,
@@ -310,6 +329,20 @@ enum Command {
#[arg(long)]
json: bool,
},
/// Acknowledge an active MXAccess alarm condition through the gateway's
/// session-less AcknowledgeAlarm RPC.
AcknowledgeAlarm {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
reference: String,
#[arg(long, default_value = "")]
comment: String,
#[arg(long, default_value = "")]
operator: String,
#[arg(long)]
json: bool,
},
Smoke {
#[command(flatten)]
connection: ConnectionArgs,
@@ -788,6 +821,52 @@ async fn dispatch(command: Command) -> Result<(), Error> {
println!("{}", json!({ "eventCount": event_count, "events": events }));
}
}
Command::StreamAlarms {
connection,
filter_prefix,
max_events,
json,
jsonl,
} => {
if max_events > MAX_AGGREGATE_EVENTS {
return Err(Error::InvalidArgument {
name: "max-events".to_owned(),
detail: format!("must be less than or equal to {MAX_AGGREGATE_EVENTS}"),
});
}
let client = connect(connection).await?;
let mut stream = client
.stream_alarms(StreamAlarmsRequest {
client_correlation_id: mxgateway_client::next_correlation_id(
"cli-stream-alarms",
),
alarm_filter_prefix: filter_prefix.unwrap_or_default(),
})
.await?;
let mut messages: Vec<Value> = Vec::new();
let mut message_count = 0usize;
while message_count < max_events {
let Some(message) = stream.next().await else {
break;
};
let message = message?;
message_count += 1;
if jsonl {
println!("{}", alarm_feed_message_to_json(&message));
} else if json {
messages.push(alarm_feed_message_to_json(&message));
} else {
println!("{}", alarm_feed_message_summary(&message));
}
}
if json {
println!(
"{}",
json!({ "messageCount": message_count, "messages": messages })
);
}
}
Command::Write {
connection,
session_id,
@@ -832,6 +911,26 @@ async fn dispatch(command: Command) -> Result<(), Error> {
.await?;
print_ok("write2", json);
}
Command::AcknowledgeAlarm {
connection,
reference,
comment,
operator,
json,
} => {
let client = connect(connection).await?;
let reply = client
.acknowledge_alarm(AcknowledgeAlarmRequest {
client_correlation_id: mxgateway_client::next_correlation_id(
"cli-acknowledge-alarm",
),
alarm_full_reference: reference,
comment,
operator_user: operator,
})
.await?;
print_acknowledge_alarm_reply(&reply, json);
}
Command::Galaxy(galaxy_command) => run_galaxy(galaxy_command).await?,
Command::Smoke {
connection,
@@ -1533,6 +1632,113 @@ fn print_deploy_event(event: &DeployEvent, use_json: bool) {
}
}
/// Render a streamed [`AlarmFeedMessage`] as a terse one-line summary that
/// distinguishes the three `payload` oneof cases.
fn alarm_feed_message_summary(message: &AlarmFeedMessage) -> String {
match &message.payload {
Some(alarm_feed_message::Payload::ActiveAlarm(snapshot)) => {
format!(
"active-alarm {} state={}",
snapshot.alarm_full_reference,
AlarmEnumName::condition_state(snapshot.current_state)
)
}
Some(alarm_feed_message::Payload::SnapshotComplete(complete)) => {
format!("snapshot-complete {complete}")
}
Some(alarm_feed_message::Payload::Transition(transition)) => {
format!(
"transition {} kind={}",
transition.alarm_full_reference,
AlarmEnumName::transition_kind(transition.transition_kind)
)
}
None => "(empty)".to_owned(),
}
}
/// Render a streamed [`AlarmFeedMessage`] as a JSON object whose single
/// top-level key names the active `payload` oneof case, mirroring the
/// protobuf-JSON the .NET/Go/Java/Python CLIs emit.
fn alarm_feed_message_to_json(message: &AlarmFeedMessage) -> Value {
match &message.payload {
Some(alarm_feed_message::Payload::ActiveAlarm(snapshot)) => json!({
"activeAlarm": {
"alarmFullReference": snapshot.alarm_full_reference,
"sourceObjectReference": snapshot.source_object_reference,
"alarmTypeName": snapshot.alarm_type_name,
"severity": snapshot.severity,
"currentState": AlarmEnumName::condition_state(snapshot.current_state),
"category": snapshot.category,
"description": snapshot.description,
"operatorUser": snapshot.operator_user,
"operatorComment": snapshot.operator_comment,
}
}),
Some(alarm_feed_message::Payload::SnapshotComplete(complete)) => json!({
"snapshotComplete": complete,
}),
Some(alarm_feed_message::Payload::Transition(transition)) => json!({
"transition": {
"alarmFullReference": transition.alarm_full_reference,
"sourceObjectReference": transition.source_object_reference,
"alarmTypeName": transition.alarm_type_name,
"transitionKind": AlarmEnumName::transition_kind(transition.transition_kind),
"severity": transition.severity,
"operatorUser": transition.operator_user,
"operatorComment": transition.operator_comment,
"category": transition.category,
"description": transition.description,
}
}),
None => Value::Null,
}
}
/// Tiny namespace for alarm-enum name lookups used by the alarm-feed
/// renderers; keeps the proto-enum imports off the `main.rs` top level.
struct AlarmEnumName;
impl AlarmEnumName {
fn condition_state(value: i32) -> String {
use mxgateway_client::generated::mxaccess_gateway::v1::AlarmConditionState;
AlarmConditionState::try_from(value)
.map(|state| state.as_str_name().to_owned())
.unwrap_or_else(|_| value.to_string())
}
fn transition_kind(value: i32) -> String {
use mxgateway_client::generated::mxaccess_gateway::v1::AlarmTransitionKind;
AlarmTransitionKind::try_from(value)
.map(|kind| kind.as_str_name().to_owned())
.unwrap_or_else(|_| value.to_string())
}
}
/// Render an [`AcknowledgeAlarmReply`] as a terse line or a JSON document.
fn print_acknowledge_alarm_reply(
reply: &mxgateway_client::generated::mxaccess_gateway::v1::AcknowledgeAlarmReply,
use_json: bool,
) {
if use_json {
println!(
"{}",
json!({
"operation": "acknowledge-alarm",
"correlationId": reply.correlation_id,
"protocolStatus": reply.protocol_status.as_ref().map(|status| json!({
"code": status.code,
"message": status.message,
})),
"hresult": reply.hresult,
"diagnosticMessage": reply.diagnostic_message,
})
);
} else {
println!("acknowledge-alarm completed");
}
}
/// Render a streamed [`MxEvent`] as a JSON object. The scalar value is
/// projected into protojson-style `*Value` keys so the cross-language e2e
/// matrix can extract and compare event values uniformly across all five
@@ -1793,6 +1999,47 @@ mod tests {
);
}
#[test]
fn parses_stream_alarms_command() {
let parsed = Cli::try_parse_from([
"mxgw",
"stream-alarms",
"--filter-prefix",
"Tank01",
"--max-events",
"3",
"--json",
]);
assert!(parsed.is_ok(), "parse failed: {parsed:?}");
}
#[test]
fn parses_stream_alarms_command_without_filter_prefix() {
let parsed = Cli::try_parse_from(["mxgw", "stream-alarms"]);
assert!(parsed.is_ok(), "parse failed: {parsed:?}");
}
#[test]
fn parses_acknowledge_alarm_command() {
let parsed = Cli::try_parse_from([
"mxgw",
"acknowledge-alarm",
"--reference",
"Tank01.Level.HiHi",
"--comment",
"ack from cli",
"--operator",
"operator1",
]);
assert!(parsed.is_ok(), "parse failed: {parsed:?}");
}
#[test]
fn acknowledge_alarm_requires_reference() {
let parsed = Cli::try_parse_from(["mxgw", "acknowledge-alarm"]);
assert!(parsed.is_err());
}
#[test]
fn parses_galaxy_watch_command_with_last_seen_and_max_events() {
let parsed = Cli::try_parse_from([