.NET client: port stream-alarms and acknowledge-alarm + fix stream-events OCE

Adds the session-less alarm CLI subcommands. stream-alarms attaches to the
gateway's central alarm feed (--filter-prefix, --max-events, --json/--jsonl);
acknowledge-alarm is a unary ack (--reference required, --comment, --operator).
StreamAlarmsAsync joins QueryActiveAlarmsAsync on MxGatewayClient and the
transport interface; the CLI client interface, adapter, and FakeGatewayTransport
follow.

Also fixes the OCE bug exposed by -VerifyWrite in the cross-language e2e:
StreamEventsAsync's await foreach now swallows OperationCanceledException when
the supplied cancellation token is the one that fired (graceful end-of-window),
and RunBatchAsync no longer excludes OCE from its outer catch — so a streaming
command that hits its --timeout reports a JSON error inside its EOR-delimited
record instead of killing the long-lived batch process.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-24 06:45:24 -04:00
parent f90bff01db
commit 11cc6715ed
8 changed files with 414 additions and 19 deletions
@@ -51,6 +51,11 @@ internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMx
/// </summary>
public List<(QueryActiveAlarmsRequest Request, CallOptions CallOptions)> QueryActiveAlarmsCalls { get; } = [];
/// <summary>
/// Gets the list of captured StreamAlarmsAsync calls.
/// </summary>
public List<(StreamAlarmsRequest Request, CallOptions CallOptions)> StreamAlarmsCalls { get; } = [];
/// <summary>
/// Gets the queue of exceptions to throw from AcknowledgeAlarmAsync.
/// </summary>
@@ -58,6 +63,7 @@ internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMx
private readonly Queue<AcknowledgeAlarmReply> _acknowledgeReplies = new();
private readonly List<ActiveAlarmSnapshot> _activeAlarmSnapshots = [];
private readonly List<AlarmFeedMessage> _alarmFeedMessages = [];
/// <summary>
/// Gets or sets the reply to return from OpenSessionAsync.
@@ -238,4 +244,27 @@ internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMx
{
_activeAlarmSnapshots.Add(snapshot);
}
/// <summary>
/// Records the stream-alarms call and yields each enqueued feed message.
/// </summary>
public async IAsyncEnumerable<AlarmFeedMessage> StreamAlarmsAsync(
StreamAlarmsRequest request,
CallOptions callOptions)
{
StreamAlarmsCalls.Add((request, callOptions));
foreach (AlarmFeedMessage message in _alarmFeedMessages)
{
callOptions.CancellationToken.ThrowIfCancellationRequested();
await Task.Yield();
yield return message;
}
}
/// <summary>Enqueues an alarm feed message to be yielded from StreamAlarmsAsync.</summary>
public void AddAlarmFeedMessage(AlarmFeedMessage message)
{
_alarmFeedMessages.Add(message);
}
}
@@ -148,6 +148,87 @@ public sealed class MxGatewayClientCliTests
}
/// <summary>Verifies that stream-alarms with --max-events stops output and distinguishes payload cases.</summary>
[Fact]
public async Task RunAsync_StreamAlarms_WithMaxEventsStopsAndDistinguishesPayloadCases()
{
using var output = new StringWriter();
using var error = new StringWriter();
FakeCliClient fakeClient = new();
fakeClient.AlarmFeedMessages.Add(new AlarmFeedMessage
{
ActiveAlarm = new ActiveAlarmSnapshot { AlarmFullReference = "Tank01.Level.HiHi" },
});
fakeClient.AlarmFeedMessages.Add(new AlarmFeedMessage { SnapshotComplete = true });
int exitCode = await MxGatewayClientCli.RunAsync(
[
"stream-alarms",
"--endpoint",
"http://localhost:5000",
"--api-key",
"test-api-key",
"--filter-prefix",
"Tank01",
"--max-events",
"1",
],
output,
error,
_ => fakeClient);
Assert.Equal(0, exitCode);
StreamAlarmsRequest request = Assert.Single(fakeClient.StreamAlarmsRequests);
Assert.Equal("Tank01", request.AlarmFilterPrefix);
string text = output.ToString();
Assert.Contains("active-alarm", text);
Assert.Contains("Tank01.Level.HiHi", text);
Assert.DoesNotContain("snapshot-complete", text);
Assert.Equal(string.Empty, error.ToString());
}
/// <summary>Verifies that acknowledge-alarm builds a request and prints the JSON reply.</summary>
[Fact]
public async Task RunAsync_AcknowledgeAlarm_BuildsRequestAndPrintsJsonReply()
{
using var output = new StringWriter();
using var error = new StringWriter();
FakeCliClient fakeClient = new();
fakeClient.AcknowledgeAlarmReplies.Enqueue(new AcknowledgeAlarmReply
{
CorrelationId = "ack-fixture",
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
Hresult = 0,
});
int exitCode = await MxGatewayClientCli.RunAsync(
[
"acknowledge-alarm",
"--endpoint",
"http://localhost:5000",
"--api-key",
"test-api-key",
"--reference",
"Tank01.Level.HiHi",
"--comment",
"ack from cli",
"--operator",
"operator1",
"--json",
],
output,
error,
_ => fakeClient);
Assert.Equal(0, exitCode);
AcknowledgeAlarmRequest request = Assert.Single(fakeClient.AcknowledgeAlarmRequests);
Assert.Equal("Tank01.Level.HiHi", request.AlarmFullReference);
Assert.Equal("ack from cli", request.Comment);
Assert.Equal("operator1", request.OperatorUser);
Assert.Contains("ack-fixture", output.ToString());
Assert.Equal(string.Empty, error.ToString());
}
/// <summary>Verifies that smoke command closes opened session when a command fails.</summary>
[Fact]
public async Task RunAsync_Smoke_WhenCommandFails_ClosesOpenedSession()
@@ -507,6 +588,41 @@ public sealed class MxGatewayClientCliTests
}
}
/// <summary>Queue of acknowledge-alarm replies to return.</summary>
public Queue<AcknowledgeAlarmReply> AcknowledgeAlarmReplies { get; } = new();
/// <summary>List of received acknowledge-alarm requests.</summary>
public List<AcknowledgeAlarmRequest> AcknowledgeAlarmRequests { get; } = [];
/// <summary>List of received stream-alarms requests.</summary>
public List<StreamAlarmsRequest> StreamAlarmsRequests { get; } = [];
/// <summary>List of alarm feed messages to yield when streaming alarms.</summary>
public List<AlarmFeedMessage> AlarmFeedMessages { get; } = [];
/// <inheritdoc />
public Task<AcknowledgeAlarmReply> AcknowledgeAlarmAsync(
AcknowledgeAlarmRequest request,
CancellationToken cancellationToken)
{
AcknowledgeAlarmRequests.Add(request);
return Task.FromResult(AcknowledgeAlarmReplies.Dequeue());
}
/// <inheritdoc />
public async IAsyncEnumerable<AlarmFeedMessage> StreamAlarmsAsync(
StreamAlarmsRequest request,
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
StreamAlarmsRequests.Add(request);
foreach (AlarmFeedMessage feedMessage in AlarmFeedMessages)
{
cancellationToken.ThrowIfCancellationRequested();
await Task.Yield();
yield return feedMessage;
}
}
/// <summary>Galaxy test connection reply to return.</summary>
public TestConnectionReply GalaxyTestConnectionReply { get; set; } = new() { Ok = true };