Compare commits
5 Commits
f90bff01db
...
8a0c59d7e8
| Author | SHA1 | Date | |
|---|---|---|---|
| 8a0c59d7e8 | |||
| 828e3e6cf6 | |||
| 7de4efeb02 | |||
| 6f0d142639 | |||
| 11cc6715ed |
@@ -45,6 +45,27 @@ public interface IMxGatewayCliClient : IAsyncDisposable
|
|||||||
StreamEventsRequest request,
|
StreamEventsRequest request,
|
||||||
CancellationToken cancellationToken);
|
CancellationToken cancellationToken);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Acknowledges an active MXAccess alarm condition through the gateway.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="request">The acknowledge request — alarm reference, comment, operator user.</param>
|
||||||
|
/// <param name="cancellationToken">Cancellation token for the operation.</param>
|
||||||
|
/// <returns>The acknowledge reply with protocol + native MxStatus.</returns>
|
||||||
|
Task<AcknowledgeAlarmReply> AcknowledgeAlarmAsync(
|
||||||
|
AcknowledgeAlarmRequest request,
|
||||||
|
CancellationToken cancellationToken);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Attaches to the gateway's central alarm feed — the current active-alarm
|
||||||
|
/// snapshot followed by live transitions.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="request">The stream request, optionally scoped by alarm-reference prefix.</param>
|
||||||
|
/// <param name="cancellationToken">Cancellation token for the operation.</param>
|
||||||
|
/// <returns>An async enumerable of alarm feed messages.</returns>
|
||||||
|
IAsyncEnumerable<AlarmFeedMessage> StreamAlarmsAsync(
|
||||||
|
StreamAlarmsRequest request,
|
||||||
|
CancellationToken cancellationToken);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Tests connection to the Galaxy Repository.
|
/// Tests connection to the Galaxy Repository.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
|||||||
@@ -52,6 +52,22 @@ internal sealed class MxGatewayCliClientAdapter : IMxGatewayCliClient
|
|||||||
return _client.StreamEventsAsync(request, cancellationToken);
|
return _client.StreamEventsAsync(request, cancellationToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public Task<AcknowledgeAlarmReply> AcknowledgeAlarmAsync(
|
||||||
|
AcknowledgeAlarmRequest request,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
return _client.AcknowledgeAlarmAsync(request, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public IAsyncEnumerable<AlarmFeedMessage> StreamAlarmsAsync(
|
||||||
|
StreamAlarmsRequest request,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
return _client.StreamAlarmsAsync(request, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public Task<TestConnectionReply> GalaxyTestConnectionAsync(
|
public Task<TestConnectionReply> GalaxyTestConnectionAsync(
|
||||||
TestConnectionRequest request,
|
TestConnectionRequest request,
|
||||||
|
|||||||
@@ -128,6 +128,10 @@ public static class MxGatewayClientCli
|
|||||||
.ConfigureAwait(false),
|
.ConfigureAwait(false),
|
||||||
"stream-events" => await StreamEventsAsync(arguments, client, standardOutput, cancellation.Token)
|
"stream-events" => await StreamEventsAsync(arguments, client, standardOutput, cancellation.Token)
|
||||||
.ConfigureAwait(false),
|
.ConfigureAwait(false),
|
||||||
|
"stream-alarms" => await StreamAlarmsAsync(arguments, client, standardOutput, cancellation.Token)
|
||||||
|
.ConfigureAwait(false),
|
||||||
|
"acknowledge-alarm" => await AcknowledgeAlarmAsync(arguments, client, standardOutput, cancellation.Token)
|
||||||
|
.ConfigureAwait(false),
|
||||||
"write" => await WriteAsync(arguments, client, standardOutput, cancellation.Token)
|
"write" => await WriteAsync(arguments, client, standardOutput, cancellation.Token)
|
||||||
.ConfigureAwait(false),
|
.ConfigureAwait(false),
|
||||||
"write2" => await Write2Async(arguments, client, standardOutput, cancellation.Token)
|
"write2" => await Write2Async(arguments, client, standardOutput, cancellation.Token)
|
||||||
@@ -212,9 +216,13 @@ public static class MxGatewayClientCli
|
|||||||
forceJsonErrors: true)
|
forceJsonErrors: true)
|
||||||
.ConfigureAwait(false);
|
.ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
catch (Exception exception) when (exception is not OperationCanceledException)
|
catch (Exception exception)
|
||||||
{
|
{
|
||||||
// Unexpected exception that escaped RunCoreAsync (shouldn't happen, but be safe).
|
// Unexpected exception that escaped RunCoreAsync (shouldn't happen, but be safe).
|
||||||
|
// OperationCanceledException from long-running streaming commands
|
||||||
|
// (e.g. galaxy-watch hit by --timeout) is caught here too — the
|
||||||
|
// batch process must continue with the next command rather than
|
||||||
|
// unwinding.
|
||||||
commandError.WriteLine(JsonSerializer.Serialize(
|
commandError.WriteLine(JsonSerializer.Serialize(
|
||||||
new { error = exception.Message, type = exception.GetType().Name },
|
new { error = exception.Message, type = exception.GetType().Name },
|
||||||
JsonOptions));
|
JsonOptions));
|
||||||
@@ -993,6 +1001,8 @@ public static class MxGatewayClientCli
|
|||||||
AfterWorkerSequence = arguments.GetUInt64("after-worker-sequence", 0),
|
AfterWorkerSequence = arguments.GetUInt64("after-worker-sequence", 0),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
await foreach (MxEvent gatewayEvent in client.StreamEventsAsync(request, cancellationToken)
|
await foreach (MxEvent gatewayEvent in client.StreamEventsAsync(request, cancellationToken)
|
||||||
.WithCancellation(cancellationToken)
|
.WithCancellation(cancellationToken)
|
||||||
.ConfigureAwait(false))
|
.ConfigureAwait(false))
|
||||||
@@ -1016,6 +1026,12 @@ public static class MxGatewayClientCli
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
// Client.Dotnet-017: graceful end-of-window completion mode for a
|
||||||
|
// finite-window event collector. Emit aggregate JSON below and exit 0.
|
||||||
|
}
|
||||||
|
|
||||||
if (json && !jsonLines)
|
if (json && !jsonLines)
|
||||||
{
|
{
|
||||||
@@ -1027,6 +1043,124 @@ public static class MxGatewayClientCli
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static async Task<int> StreamAlarmsAsync(
|
||||||
|
CliArguments arguments,
|
||||||
|
IMxGatewayCliClient client,
|
||||||
|
TextWriter output,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
uint maxEvents = arguments.GetUInt32("max-events", 0);
|
||||||
|
bool json = arguments.HasFlag("json");
|
||||||
|
bool jsonLines = arguments.HasFlag("jsonl");
|
||||||
|
if (json && !jsonLines && maxEvents is 0)
|
||||||
|
{
|
||||||
|
throw new ArgumentException("--json stream-alarms requires --max-events to bound aggregate output.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (maxEvents > MaxAggregateEvents)
|
||||||
|
{
|
||||||
|
throw new ArgumentException($"--max-events cannot exceed {MaxAggregateEvents}.");
|
||||||
|
}
|
||||||
|
|
||||||
|
var messages = json && !jsonLines
|
||||||
|
? new List<AlarmFeedMessage>(checked((int)maxEvents))
|
||||||
|
: [];
|
||||||
|
uint messageCount = 0;
|
||||||
|
var request = new StreamAlarmsRequest
|
||||||
|
{
|
||||||
|
ClientCorrelationId = CreateCorrelationId(),
|
||||||
|
AlarmFilterPrefix = arguments.GetOptional("filter-prefix") ?? string.Empty,
|
||||||
|
};
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await foreach (AlarmFeedMessage feedMessage in client.StreamAlarmsAsync(request, cancellationToken)
|
||||||
|
.WithCancellation(cancellationToken)
|
||||||
|
.ConfigureAwait(false))
|
||||||
|
{
|
||||||
|
if (jsonLines)
|
||||||
|
{
|
||||||
|
output.WriteLine(ProtobufJsonFormatter.Format(feedMessage));
|
||||||
|
}
|
||||||
|
else if (json)
|
||||||
|
{
|
||||||
|
messages.Add(feedMessage);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
output.WriteLine(FormatAlarmFeedMessage(feedMessage));
|
||||||
|
}
|
||||||
|
|
||||||
|
messageCount++;
|
||||||
|
if (maxEvents > 0 && messageCount >= maxEvents)
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
// Mirrors stream-events (Client.Dotnet-017): the supplied token covers
|
||||||
|
// the user's --timeout wall-clock budget and external Ctrl+C / parent
|
||||||
|
// CTS cancellation. All are graceful completion modes for a
|
||||||
|
// finite-window alarm-feed collector: emit what arrived and exit 0.
|
||||||
|
}
|
||||||
|
|
||||||
|
if (json && !jsonLines)
|
||||||
|
{
|
||||||
|
output.WriteLine(JsonSerializer.Serialize(
|
||||||
|
new { alarms = messages.Select(AlarmFeedMessageToJsonElement).ToArray() },
|
||||||
|
JsonOptions));
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Task<int> AcknowledgeAlarmAsync(
|
||||||
|
CliArguments arguments,
|
||||||
|
IMxGatewayCliClient client,
|
||||||
|
TextWriter output,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
var request = new AcknowledgeAlarmRequest
|
||||||
|
{
|
||||||
|
ClientCorrelationId = CreateCorrelationId(),
|
||||||
|
AlarmFullReference = arguments.GetRequired("reference"),
|
||||||
|
Comment = arguments.GetOptional("comment") ?? string.Empty,
|
||||||
|
OperatorUser = arguments.GetOptional("operator") ?? string.Empty,
|
||||||
|
};
|
||||||
|
|
||||||
|
return WriteReplyAsync(
|
||||||
|
client.AcknowledgeAlarmAsync(request, cancellationToken),
|
||||||
|
arguments,
|
||||||
|
output);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Renders one <see cref="AlarmFeedMessage"/> for the human-readable
|
||||||
|
/// (non-JSON) stream-alarms output, distinguishing the <c>payload</c> oneof
|
||||||
|
/// arms: a snapshot active alarm, the snapshot-complete sentinel, or a live
|
||||||
|
/// transition.
|
||||||
|
/// </summary>
|
||||||
|
private static string FormatAlarmFeedMessage(AlarmFeedMessage feedMessage)
|
||||||
|
{
|
||||||
|
return feedMessage.PayloadCase switch
|
||||||
|
{
|
||||||
|
AlarmFeedMessage.PayloadOneofCase.ActiveAlarm =>
|
||||||
|
$"active-alarm {ProtobufJsonFormatter.Format(feedMessage.ActiveAlarm)}",
|
||||||
|
AlarmFeedMessage.PayloadOneofCase.SnapshotComplete =>
|
||||||
|
$"snapshot-complete {feedMessage.SnapshotComplete}",
|
||||||
|
AlarmFeedMessage.PayloadOneofCase.Transition =>
|
||||||
|
$"transition {ProtobufJsonFormatter.Format(feedMessage.Transition)}",
|
||||||
|
_ => $"unknown-payload {feedMessage.PayloadCase}",
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static JsonElement AlarmFeedMessageToJsonElement(AlarmFeedMessage feedMessage)
|
||||||
|
{
|
||||||
|
return JsonDocument.Parse(ProtobufJsonFormatter.Format(feedMessage)).RootElement.Clone();
|
||||||
|
}
|
||||||
|
|
||||||
private static async Task<int> SmokeAsync(
|
private static async Task<int> SmokeAsync(
|
||||||
CliArguments arguments,
|
CliArguments arguments,
|
||||||
IMxGatewayCliClient client,
|
IMxGatewayCliClient client,
|
||||||
@@ -1546,6 +1680,8 @@ public static class MxGatewayClientCli
|
|||||||
or "write-secured2-bulk"
|
or "write-secured2-bulk"
|
||||||
or "bench-read-bulk"
|
or "bench-read-bulk"
|
||||||
or "stream-events"
|
or "stream-events"
|
||||||
|
or "stream-alarms"
|
||||||
|
or "acknowledge-alarm"
|
||||||
or "write"
|
or "write"
|
||||||
or "write2"
|
or "write2"
|
||||||
or "smoke"
|
or "smoke"
|
||||||
@@ -1605,6 +1741,8 @@ public static class MxGatewayClientCli
|
|||||||
writer.WriteLine("mxgw-dotnet write-secured2-bulk --session-id <id> --server-handle <n> --item-handles <n,n> --type <type> --values <v,v> --current-user-id <n> [--verifier-user-id <n>] [--timestamp <iso>] [--json]");
|
writer.WriteLine("mxgw-dotnet write-secured2-bulk --session-id <id> --server-handle <n> --item-handles <n,n> --type <type> --values <v,v> --current-user-id <n> [--verifier-user-id <n>] [--timestamp <iso>] [--json]");
|
||||||
writer.WriteLine("mxgw-dotnet bench-read-bulk [--duration-seconds <n>] [--warmup-seconds <n>] [--bulk-size <n>] [--tag-start <n>] [--tag-prefix <s>] [--tag-attribute <s>] [--timeout-ms <n>] [--client-name <name>]");
|
writer.WriteLine("mxgw-dotnet bench-read-bulk [--duration-seconds <n>] [--warmup-seconds <n>] [--bulk-size <n>] [--tag-start <n>] [--tag-prefix <s>] [--tag-attribute <s>] [--timeout-ms <n>] [--client-name <name>]");
|
||||||
writer.WriteLine("mxgw-dotnet stream-events --session-id <id> [--max-events <n>] [--json]");
|
writer.WriteLine("mxgw-dotnet stream-events --session-id <id> [--max-events <n>] [--json]");
|
||||||
|
writer.WriteLine("mxgw-dotnet stream-alarms [--filter-prefix <ref>] [--max-events <n>] [--json] [--jsonl]");
|
||||||
|
writer.WriteLine("mxgw-dotnet acknowledge-alarm --reference <ref> [--comment <text>] [--operator <user>] [--json]");
|
||||||
writer.WriteLine("mxgw-dotnet write --session-id <id> --server-handle <n> --item-handle <n> --type <type> --value <value> [--json]");
|
writer.WriteLine("mxgw-dotnet write --session-id <id> --server-handle <n> --item-handle <n> --type <type> --value <value> [--json]");
|
||||||
writer.WriteLine("mxgw-dotnet write2 --session-id <id> --server-handle <n> --item-handle <n> --type <type> --value <value> [--timestamp <iso>] [--json]");
|
writer.WriteLine("mxgw-dotnet write2 --session-id <id> --server-handle <n> --item-handle <n> --type <type> --value <value> [--timestamp <iso>] [--json]");
|
||||||
writer.WriteLine("mxgw-dotnet smoke --item <ref> [--value <value> --type <type>] [--json]");
|
writer.WriteLine("mxgw-dotnet smoke --item <ref> [--value <value> --type <type>] [--json]");
|
||||||
|
|||||||
@@ -51,6 +51,11 @@ internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMx
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public List<(QueryActiveAlarmsRequest Request, CallOptions CallOptions)> QueryActiveAlarmsCalls { get; } = [];
|
public List<(QueryActiveAlarmsRequest Request, CallOptions CallOptions)> QueryActiveAlarmsCalls { get; } = [];
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Gets the list of captured StreamAlarmsAsync calls.
|
||||||
|
/// </summary>
|
||||||
|
public List<(StreamAlarmsRequest Request, CallOptions CallOptions)> StreamAlarmsCalls { get; } = [];
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Gets the queue of exceptions to throw from AcknowledgeAlarmAsync.
|
/// Gets the queue of exceptions to throw from AcknowledgeAlarmAsync.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
@@ -58,6 +63,7 @@ internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMx
|
|||||||
|
|
||||||
private readonly Queue<AcknowledgeAlarmReply> _acknowledgeReplies = new();
|
private readonly Queue<AcknowledgeAlarmReply> _acknowledgeReplies = new();
|
||||||
private readonly List<ActiveAlarmSnapshot> _activeAlarmSnapshots = [];
|
private readonly List<ActiveAlarmSnapshot> _activeAlarmSnapshots = [];
|
||||||
|
private readonly List<AlarmFeedMessage> _alarmFeedMessages = [];
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Gets or sets the reply to return from OpenSessionAsync.
|
/// Gets or sets the reply to return from OpenSessionAsync.
|
||||||
@@ -238,4 +244,27 @@ internal sealed class FakeGatewayTransport(MxGatewayClientOptions options) : IMx
|
|||||||
{
|
{
|
||||||
_activeAlarmSnapshots.Add(snapshot);
|
_activeAlarmSnapshots.Add(snapshot);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Records the stream-alarms call and yields each enqueued feed message.
|
||||||
|
/// </summary>
|
||||||
|
public async IAsyncEnumerable<AlarmFeedMessage> StreamAlarmsAsync(
|
||||||
|
StreamAlarmsRequest request,
|
||||||
|
CallOptions callOptions)
|
||||||
|
{
|
||||||
|
StreamAlarmsCalls.Add((request, callOptions));
|
||||||
|
|
||||||
|
foreach (AlarmFeedMessage message in _alarmFeedMessages)
|
||||||
|
{
|
||||||
|
callOptions.CancellationToken.ThrowIfCancellationRequested();
|
||||||
|
await Task.Yield();
|
||||||
|
yield return message;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Enqueues an alarm feed message to be yielded from StreamAlarmsAsync.</summary>
|
||||||
|
public void AddAlarmFeedMessage(AlarmFeedMessage message)
|
||||||
|
{
|
||||||
|
_alarmFeedMessages.Add(message);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -148,6 +148,87 @@ public sealed class MxGatewayClientCliTests
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// <summary>Verifies that stream-alarms with --max-events stops output and distinguishes payload cases.</summary>
|
||||||
|
[Fact]
|
||||||
|
public async Task RunAsync_StreamAlarms_WithMaxEventsStopsAndDistinguishesPayloadCases()
|
||||||
|
{
|
||||||
|
using var output = new StringWriter();
|
||||||
|
using var error = new StringWriter();
|
||||||
|
FakeCliClient fakeClient = new();
|
||||||
|
fakeClient.AlarmFeedMessages.Add(new AlarmFeedMessage
|
||||||
|
{
|
||||||
|
ActiveAlarm = new ActiveAlarmSnapshot { AlarmFullReference = "Tank01.Level.HiHi" },
|
||||||
|
});
|
||||||
|
fakeClient.AlarmFeedMessages.Add(new AlarmFeedMessage { SnapshotComplete = true });
|
||||||
|
|
||||||
|
int exitCode = await MxGatewayClientCli.RunAsync(
|
||||||
|
[
|
||||||
|
"stream-alarms",
|
||||||
|
"--endpoint",
|
||||||
|
"http://localhost:5000",
|
||||||
|
"--api-key",
|
||||||
|
"test-api-key",
|
||||||
|
"--filter-prefix",
|
||||||
|
"Tank01",
|
||||||
|
"--max-events",
|
||||||
|
"1",
|
||||||
|
],
|
||||||
|
output,
|
||||||
|
error,
|
||||||
|
_ => fakeClient);
|
||||||
|
|
||||||
|
Assert.Equal(0, exitCode);
|
||||||
|
StreamAlarmsRequest request = Assert.Single(fakeClient.StreamAlarmsRequests);
|
||||||
|
Assert.Equal("Tank01", request.AlarmFilterPrefix);
|
||||||
|
string text = output.ToString();
|
||||||
|
Assert.Contains("active-alarm", text);
|
||||||
|
Assert.Contains("Tank01.Level.HiHi", text);
|
||||||
|
Assert.DoesNotContain("snapshot-complete", text);
|
||||||
|
Assert.Equal(string.Empty, error.ToString());
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Verifies that acknowledge-alarm builds a request and prints the JSON reply.</summary>
|
||||||
|
[Fact]
|
||||||
|
public async Task RunAsync_AcknowledgeAlarm_BuildsRequestAndPrintsJsonReply()
|
||||||
|
{
|
||||||
|
using var output = new StringWriter();
|
||||||
|
using var error = new StringWriter();
|
||||||
|
FakeCliClient fakeClient = new();
|
||||||
|
fakeClient.AcknowledgeAlarmReplies.Enqueue(new AcknowledgeAlarmReply
|
||||||
|
{
|
||||||
|
CorrelationId = "ack-fixture",
|
||||||
|
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
|
||||||
|
Hresult = 0,
|
||||||
|
});
|
||||||
|
|
||||||
|
int exitCode = await MxGatewayClientCli.RunAsync(
|
||||||
|
[
|
||||||
|
"acknowledge-alarm",
|
||||||
|
"--endpoint",
|
||||||
|
"http://localhost:5000",
|
||||||
|
"--api-key",
|
||||||
|
"test-api-key",
|
||||||
|
"--reference",
|
||||||
|
"Tank01.Level.HiHi",
|
||||||
|
"--comment",
|
||||||
|
"ack from cli",
|
||||||
|
"--operator",
|
||||||
|
"operator1",
|
||||||
|
"--json",
|
||||||
|
],
|
||||||
|
output,
|
||||||
|
error,
|
||||||
|
_ => fakeClient);
|
||||||
|
|
||||||
|
Assert.Equal(0, exitCode);
|
||||||
|
AcknowledgeAlarmRequest request = Assert.Single(fakeClient.AcknowledgeAlarmRequests);
|
||||||
|
Assert.Equal("Tank01.Level.HiHi", request.AlarmFullReference);
|
||||||
|
Assert.Equal("ack from cli", request.Comment);
|
||||||
|
Assert.Equal("operator1", request.OperatorUser);
|
||||||
|
Assert.Contains("ack-fixture", output.ToString());
|
||||||
|
Assert.Equal(string.Empty, error.ToString());
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>Verifies that smoke command closes opened session when a command fails.</summary>
|
/// <summary>Verifies that smoke command closes opened session when a command fails.</summary>
|
||||||
[Fact]
|
[Fact]
|
||||||
public async Task RunAsync_Smoke_WhenCommandFails_ClosesOpenedSession()
|
public async Task RunAsync_Smoke_WhenCommandFails_ClosesOpenedSession()
|
||||||
@@ -507,6 +588,41 @@ public sealed class MxGatewayClientCliTests
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>Queue of acknowledge-alarm replies to return.</summary>
|
||||||
|
public Queue<AcknowledgeAlarmReply> AcknowledgeAlarmReplies { get; } = new();
|
||||||
|
|
||||||
|
/// <summary>List of received acknowledge-alarm requests.</summary>
|
||||||
|
public List<AcknowledgeAlarmRequest> AcknowledgeAlarmRequests { get; } = [];
|
||||||
|
|
||||||
|
/// <summary>List of received stream-alarms requests.</summary>
|
||||||
|
public List<StreamAlarmsRequest> StreamAlarmsRequests { get; } = [];
|
||||||
|
|
||||||
|
/// <summary>List of alarm feed messages to yield when streaming alarms.</summary>
|
||||||
|
public List<AlarmFeedMessage> AlarmFeedMessages { get; } = [];
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public Task<AcknowledgeAlarmReply> AcknowledgeAlarmAsync(
|
||||||
|
AcknowledgeAlarmRequest request,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
AcknowledgeAlarmRequests.Add(request);
|
||||||
|
return Task.FromResult(AcknowledgeAlarmReplies.Dequeue());
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public async IAsyncEnumerable<AlarmFeedMessage> StreamAlarmsAsync(
|
||||||
|
StreamAlarmsRequest request,
|
||||||
|
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
StreamAlarmsRequests.Add(request);
|
||||||
|
foreach (AlarmFeedMessage feedMessage in AlarmFeedMessages)
|
||||||
|
{
|
||||||
|
cancellationToken.ThrowIfCancellationRequested();
|
||||||
|
await Task.Yield();
|
||||||
|
yield return feedMessage;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>Galaxy test connection reply to return.</summary>
|
/// <summary>Galaxy test connection reply to return.</summary>
|
||||||
public TestConnectionReply GalaxyTestConnectionReply { get; set; } = new() { Ok = true };
|
public TestConnectionReply GalaxyTestConnectionReply { get; set; } = new() { Ok = true };
|
||||||
|
|
||||||
|
|||||||
@@ -175,6 +175,48 @@ internal sealed class GrpcMxGatewayClientTransport(
|
|||||||
return QueryActiveAlarmsAsync(request, callOptions);
|
return QueryActiveAlarmsAsync(request, callOptions);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public async IAsyncEnumerable<AlarmFeedMessage> StreamAlarmsAsync(
|
||||||
|
StreamAlarmsRequest request,
|
||||||
|
CallOptions callOptions,
|
||||||
|
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
CancellationToken effectiveCancellationToken = cancellationToken.CanBeCanceled
|
||||||
|
? cancellationToken
|
||||||
|
: callOptions.CancellationToken;
|
||||||
|
|
||||||
|
using AsyncServerStreamingCall<AlarmFeedMessage> call = RawClient.StreamAlarms(request, callOptions);
|
||||||
|
|
||||||
|
IAsyncStreamReader<AlarmFeedMessage> responseStream = call.ResponseStream;
|
||||||
|
while (true)
|
||||||
|
{
|
||||||
|
AlarmFeedMessage? message;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (!await responseStream.MoveNext(effectiveCancellationToken).ConfigureAwait(false))
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
message = responseStream.Current;
|
||||||
|
}
|
||||||
|
catch (RpcException exception)
|
||||||
|
{
|
||||||
|
throw MapRpcException(exception, effectiveCancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
yield return message;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
IAsyncEnumerable<AlarmFeedMessage> IMxGatewayClientTransport.StreamAlarmsAsync(
|
||||||
|
StreamAlarmsRequest request,
|
||||||
|
CallOptions callOptions)
|
||||||
|
{
|
||||||
|
return StreamAlarmsAsync(request, callOptions);
|
||||||
|
}
|
||||||
|
|
||||||
private static Exception MapRpcException(
|
private static Exception MapRpcException(
|
||||||
RpcException exception,
|
RpcException exception,
|
||||||
CancellationToken cancellationToken)
|
CancellationToken cancellationToken)
|
||||||
|
|||||||
@@ -75,4 +75,15 @@ internal interface IMxGatewayClientTransport
|
|||||||
IAsyncEnumerable<ActiveAlarmSnapshot> QueryActiveAlarmsAsync(
|
IAsyncEnumerable<ActiveAlarmSnapshot> QueryActiveAlarmsAsync(
|
||||||
QueryActiveAlarmsRequest request,
|
QueryActiveAlarmsRequest request,
|
||||||
CallOptions callOptions);
|
CallOptions callOptions);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Attaches to the gateway's central alarm feed — the current active-alarm
|
||||||
|
/// snapshot followed by live transitions.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="request">The stream request, optionally scoped by alarm-reference prefix.</param>
|
||||||
|
/// <param name="callOptions">gRPC call options.</param>
|
||||||
|
/// <returns>An async enumerable of alarm feed messages.</returns>
|
||||||
|
IAsyncEnumerable<AlarmFeedMessage> StreamAlarmsAsync(
|
||||||
|
StreamAlarmsRequest request,
|
||||||
|
CallOptions callOptions);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -224,6 +224,28 @@ public sealed class MxGatewayClient : IAsyncDisposable
|
|||||||
return _transport.QueryActiveAlarmsAsync(request, CreateStreamCallOptions(cancellationToken));
|
return _transport.QueryActiveAlarmsAsync(request, CreateStreamCallOptions(cancellationToken));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Attaches to the gateway's central alarm feed. The stream opens with one
|
||||||
|
/// <see cref="AlarmFeedMessage"/> per currently-active alarm (the
|
||||||
|
/// ConditionRefresh snapshot), then a single <c>snapshot_complete</c>, then a
|
||||||
|
/// <c>transition</c> for every subsequent raise / acknowledge / clear. Served
|
||||||
|
/// by the gateway's always-on alarm monitor — no worker session is opened, so
|
||||||
|
/// any number of clients may attach. Optionally scoped by alarm-reference
|
||||||
|
/// prefix (<see cref="StreamAlarmsRequest.AlarmFilterPrefix"/>).
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="request">The stream request, optionally scoped by alarm-reference prefix.</param>
|
||||||
|
/// <param name="cancellationToken">Cancellation token for the stream.</param>
|
||||||
|
/// <returns>An async enumerable of alarm feed messages.</returns>
|
||||||
|
public IAsyncEnumerable<AlarmFeedMessage> StreamAlarmsAsync(
|
||||||
|
StreamAlarmsRequest request,
|
||||||
|
CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(request);
|
||||||
|
ThrowIfDisposed();
|
||||||
|
|
||||||
|
return _transport.StreamAlarmsAsync(request, CreateStreamCallOptions(cancellationToken));
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Disposes the client and releases all resources.
|
/// Disposes the client and releases all resources.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
|||||||
@@ -107,6 +107,10 @@ func runWithIO(ctx context.Context, args []string, stdout, stderr io.Writer) err
|
|||||||
return runWrite(ctx, args[1:], stdout, stderr)
|
return runWrite(ctx, args[1:], stdout, stderr)
|
||||||
case "stream-events":
|
case "stream-events":
|
||||||
return runStreamEvents(ctx, args[1:], stdout, stderr)
|
return runStreamEvents(ctx, args[1:], stdout, stderr)
|
||||||
|
case "stream-alarms":
|
||||||
|
return runStreamAlarms(ctx, args[1:], stdout, stderr)
|
||||||
|
case "acknowledge-alarm":
|
||||||
|
return runAcknowledgeAlarm(ctx, args[1:], stdout, stderr)
|
||||||
case "smoke":
|
case "smoke":
|
||||||
return runSmoke(ctx, args[1:], stdout, stderr)
|
return runSmoke(ctx, args[1:], stdout, stderr)
|
||||||
case "galaxy-test-connection":
|
case "galaxy-test-connection":
|
||||||
@@ -796,6 +800,119 @@ func runStreamEvents(ctx context.Context, args []string, stdout, stderr io.Write
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func runStreamAlarms(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
||||||
|
flags := flag.NewFlagSet("stream-alarms", flag.ContinueOnError)
|
||||||
|
flags.SetOutput(stderr)
|
||||||
|
common := bindCommonFlags(flags)
|
||||||
|
jsonOutput := flags.Bool("json", false, "write JSON output")
|
||||||
|
filterPrefix := flags.String("filter-prefix", "", "alarm-reference prefix scoping the feed; empty means unscoped")
|
||||||
|
limit := flags.Int("limit", 0, "maximum feed messages to read; 0 means unbounded")
|
||||||
|
|
||||||
|
if err := flags.Parse(args); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
client, _, err := dialForCommand(ctx, common)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
// Mirror runStreamEvents so Ctrl+C on a long-running stream-alarms command
|
||||||
|
// cancels the gRPC stream cleanly (the gateway sees codes.Canceled rather
|
||||||
|
// than a torn TCP connection) and the deferred client.Close() actually runs.
|
||||||
|
signalCtx, stopSignals := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
|
||||||
|
defer stopSignals()
|
||||||
|
|
||||||
|
streamCtx, cancelStream := context.WithCancel(signalCtx)
|
||||||
|
defer cancelStream()
|
||||||
|
stream, err := client.StreamAlarms(streamCtx, &mxgateway.StreamAlarmsRequest{AlarmFilterPrefix: *filterPrefix})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
count := 0
|
||||||
|
for {
|
||||||
|
message, err := stream.Recv()
|
||||||
|
if errors.Is(err, io.EOF) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if *jsonOutput {
|
||||||
|
fmt.Fprintln(stdout, string(mustMarshalProto(message)))
|
||||||
|
} else {
|
||||||
|
fmt.Fprintln(stdout, formatAlarmFeedMessage(message))
|
||||||
|
}
|
||||||
|
count++
|
||||||
|
if *limit > 0 && count >= *limit {
|
||||||
|
cancelStream()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// formatAlarmFeedMessage renders one AlarmFeedMessage in the CLI's plain-text
|
||||||
|
// output style, distinguishing the active-alarm snapshot, snapshot-complete
|
||||||
|
// sentinel, and transition cases of the message's payload oneof.
|
||||||
|
func formatAlarmFeedMessage(message *mxgateway.AlarmFeedMessage) string {
|
||||||
|
switch {
|
||||||
|
case message.GetActiveAlarm() != nil:
|
||||||
|
alarm := message.GetActiveAlarm()
|
||||||
|
return fmt.Sprintf("active-alarm %s state=%s severity=%d", alarm.GetAlarmFullReference(), alarm.GetCurrentState(), alarm.GetSeverity())
|
||||||
|
case message.GetSnapshotComplete():
|
||||||
|
return "snapshot-complete"
|
||||||
|
case message.GetTransition() != nil:
|
||||||
|
transition := message.GetTransition()
|
||||||
|
return fmt.Sprintf("transition %s kind=%s severity=%d", transition.GetAlarmFullReference(), transition.GetTransitionKind(), transition.GetSeverity())
|
||||||
|
default:
|
||||||
|
return "unknown"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func runAcknowledgeAlarm(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
||||||
|
flags := flag.NewFlagSet("acknowledge-alarm", flag.ContinueOnError)
|
||||||
|
flags.SetOutput(stderr)
|
||||||
|
common := bindCommonFlags(flags)
|
||||||
|
jsonOutput := flags.Bool("json", false, "write JSON output")
|
||||||
|
reference := flags.String("reference", "", "full alarm reference to acknowledge")
|
||||||
|
comment := flags.String("comment", "", "operator acknowledge comment")
|
||||||
|
operator := flags.String("operator", "", "operator user performing the acknowledge")
|
||||||
|
|
||||||
|
if err := flags.Parse(args); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if *reference == "" {
|
||||||
|
return errors.New("reference is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
client, options, err := dialForCommand(ctx, common)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
reply, err := client.AcknowledgeAlarm(ctx, &mxgateway.AcknowledgeAlarmRequest{
|
||||||
|
AlarmFullReference: *reference,
|
||||||
|
Comment: *comment,
|
||||||
|
OperatorUser: *operator,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if *jsonOutput {
|
||||||
|
return writeJSON(stdout, commandReplyOutput{
|
||||||
|
Command: "acknowledge-alarm",
|
||||||
|
Options: options,
|
||||||
|
Reply: mustMarshalProto(reply),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Fprintln(stdout, reply.GetHresult())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func runSmoke(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
func runSmoke(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
||||||
flags := flag.NewFlagSet("smoke", flag.ContinueOnError)
|
flags := flag.NewFlagSet("smoke", flag.ContinueOnError)
|
||||||
flags.SetOutput(stderr)
|
flags.SetOutput(stderr)
|
||||||
@@ -1064,7 +1181,7 @@ type protojsonMessage interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func writeUsage(writer io.Writer) {
|
func writeUsage(writer io.Writer) {
|
||||||
fmt.Fprintln(writer, "usage: mxgw-go <version|open-session|close-session|register|add-item|advise|subscribe-bulk|unsubscribe-bulk|read-bulk|write-bulk|write2-bulk|write-secured-bulk|write-secured2-bulk|bench-read-bulk|write|stream-events|smoke|galaxy-test-connection|galaxy-last-deploy|galaxy-discover|galaxy-watch|batch>")
|
fmt.Fprintln(writer, "usage: mxgw-go <version|open-session|close-session|register|add-item|advise|subscribe-bulk|unsubscribe-bulk|read-bulk|write-bulk|write2-bulk|write-secured-bulk|write-secured2-bulk|bench-read-bulk|write|stream-events|stream-alarms|acknowledge-alarm|smoke|galaxy-test-connection|galaxy-last-deploy|galaxy-discover|galaxy-watch|batch>")
|
||||||
}
|
}
|
||||||
|
|
||||||
// batchEOR is the end-of-result sentinel emitted to stdout after every command
|
// batchEOR is the end-of-result sentinel emitted to stdout after every command
|
||||||
|
|||||||
@@ -51,3 +51,26 @@ func (c *Client) QueryActiveAlarms(ctx context.Context, req *QueryActiveAlarmsRe
|
|||||||
|
|
||||||
return stream, nil
|
return stream, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StreamAlarms attaches to the gateway's central alarm feed. The stream opens
|
||||||
|
// with one AlarmFeedMessage per currently-active alarm (the ConditionRefresh
|
||||||
|
// snapshot), then a single snapshot-complete sentinel, then a transition for
|
||||||
|
// every subsequent raise / acknowledge / clear. It is served by the gateway's
|
||||||
|
// always-on alarm monitor — no worker session is opened — so any number of
|
||||||
|
// clients may attach.
|
||||||
|
//
|
||||||
|
// The returned stream is owned by the caller; cancel ctx to release it.
|
||||||
|
// Optional alarm-reference prefix scoping (req.AlarmFilterPrefix) limits the
|
||||||
|
// stream to a sub-tree.
|
||||||
|
func (c *Client) StreamAlarms(ctx context.Context, req *StreamAlarmsRequest) (StreamAlarmsClient, error) {
|
||||||
|
if req == nil {
|
||||||
|
return nil, errors.New("mxgateway: stream alarms request is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
stream, err := c.raw.StreamAlarms(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, &GatewayError{Op: "stream alarms", Err: err}
|
||||||
|
}
|
||||||
|
|
||||||
|
return stream, nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -112,6 +112,11 @@ type (
|
|||||||
AcknowledgeAlarmReply = pb.AcknowledgeAlarmReply
|
AcknowledgeAlarmReply = pb.AcknowledgeAlarmReply
|
||||||
// QueryActiveAlarmsRequest is the gateway QueryActiveAlarms request message.
|
// QueryActiveAlarmsRequest is the gateway QueryActiveAlarms request message.
|
||||||
QueryActiveAlarmsRequest = pb.QueryActiveAlarmsRequest
|
QueryActiveAlarmsRequest = pb.QueryActiveAlarmsRequest
|
||||||
|
// StreamAlarmsRequest is the gateway StreamAlarms request message.
|
||||||
|
StreamAlarmsRequest = pb.StreamAlarmsRequest
|
||||||
|
// AlarmFeedMessage is one message on the StreamAlarms feed — an
|
||||||
|
// active-alarm snapshot row, a snapshot-complete sentinel, or a transition.
|
||||||
|
AlarmFeedMessage = pb.AlarmFeedMessage
|
||||||
// ActiveAlarmSnapshot is one row in a ConditionRefresh stream.
|
// ActiveAlarmSnapshot is one row in a ConditionRefresh stream.
|
||||||
ActiveAlarmSnapshot = pb.ActiveAlarmSnapshot
|
ActiveAlarmSnapshot = pb.ActiveAlarmSnapshot
|
||||||
// OnAlarmTransitionEvent is the body carried by alarm-transition MxEvents.
|
// OnAlarmTransitionEvent is the body carried by alarm-transition MxEvents.
|
||||||
@@ -130,6 +135,10 @@ type AlarmConditionState = pb.AlarmConditionState
|
|||||||
// QueryActiveAlarms RPC.
|
// QueryActiveAlarms RPC.
|
||||||
type QueryActiveAlarmsClient = pb.MxAccessGateway_QueryActiveAlarmsClient
|
type QueryActiveAlarmsClient = pb.MxAccessGateway_QueryActiveAlarmsClient
|
||||||
|
|
||||||
|
// StreamAlarmsClient is the generated server-streaming client for the
|
||||||
|
// StreamAlarms RPC.
|
||||||
|
type StreamAlarmsClient = pb.MxAccessGateway_StreamAlarmsClient
|
||||||
|
|
||||||
// Enumerations from the generated contract re-exported for client callers.
|
// Enumerations from the generated contract re-exported for client callers.
|
||||||
type (
|
type (
|
||||||
// MxCommandKind discriminates which MXAccess command an MxCommand carries.
|
// MxCommandKind discriminates which MXAccess command an MxCommand carries.
|
||||||
|
|||||||
+166
@@ -3,6 +3,7 @@ package com.zb.mom.ww.mxgateway.cli;
|
|||||||
import com.zb.mom.ww.mxgateway.client.DeployEventStream;
|
import com.zb.mom.ww.mxgateway.client.DeployEventStream;
|
||||||
import com.zb.mom.ww.mxgateway.client.GalaxyRepositoryClient;
|
import com.zb.mom.ww.mxgateway.client.GalaxyRepositoryClient;
|
||||||
import com.zb.mom.ww.mxgateway.client.MxEventStream;
|
import com.zb.mom.ww.mxgateway.client.MxEventStream;
|
||||||
|
import com.zb.mom.ww.mxgateway.client.MxGatewayAlarmFeedSubscription;
|
||||||
import com.zb.mom.ww.mxgateway.client.MxGatewayClient;
|
import com.zb.mom.ww.mxgateway.client.MxGatewayClient;
|
||||||
import com.zb.mom.ww.mxgateway.client.MxGatewayClientOptions;
|
import com.zb.mom.ww.mxgateway.client.MxGatewayClientOptions;
|
||||||
import com.zb.mom.ww.mxgateway.client.MxGatewayClientVersion;
|
import com.zb.mom.ww.mxgateway.client.MxGatewayClientVersion;
|
||||||
@@ -14,6 +15,7 @@ import galaxy_repository.v1.GalaxyRepositoryOuterClass.GalaxyAttribute;
|
|||||||
import galaxy_repository.v1.GalaxyRepositoryOuterClass.GalaxyObject;
|
import galaxy_repository.v1.GalaxyRepositoryOuterClass.GalaxyObject;
|
||||||
import com.google.protobuf.Message;
|
import com.google.protobuf.Message;
|
||||||
import com.google.protobuf.util.JsonFormat;
|
import com.google.protobuf.util.JsonFormat;
|
||||||
|
import io.grpc.stub.StreamObserver;
|
||||||
import java.io.BufferedReader;
|
import java.io.BufferedReader;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
@@ -28,14 +30,22 @@ import java.util.LinkedHashMap;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
|
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmReply;
|
||||||
|
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmRequest;
|
||||||
|
import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot;
|
||||||
|
import mxaccess_gateway.v1.MxaccessGateway.AlarmFeedMessage;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.BulkReadResult;
|
import mxaccess_gateway.v1.MxaccessGateway.BulkReadResult;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.BulkWriteResult;
|
import mxaccess_gateway.v1.MxaccessGateway.BulkWriteResult;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
|
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
|
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
|
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.MxValue;
|
import mxaccess_gateway.v1.MxaccessGateway.MxValue;
|
||||||
|
import mxaccess_gateway.v1.MxaccessGateway.OnAlarmTransitionEvent;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest;
|
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest;
|
||||||
|
import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.SubscribeResult;
|
import mxaccess_gateway.v1.MxaccessGateway.SubscribeResult;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.Write2BulkEntry;
|
import mxaccess_gateway.v1.MxaccessGateway.Write2BulkEntry;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.WriteBulkEntry;
|
import mxaccess_gateway.v1.MxaccessGateway.WriteBulkEntry;
|
||||||
@@ -127,6 +137,8 @@ public final class MxGatewayCli implements Callable<Integer> {
|
|||||||
commandLine.addSubcommand("bench-read-bulk", new BenchReadBulkCommand(clientFactory));
|
commandLine.addSubcommand("bench-read-bulk", new BenchReadBulkCommand(clientFactory));
|
||||||
commandLine.addSubcommand("write", new WriteCommand(clientFactory));
|
commandLine.addSubcommand("write", new WriteCommand(clientFactory));
|
||||||
commandLine.addSubcommand("stream-events", new StreamEventsCommand(clientFactory));
|
commandLine.addSubcommand("stream-events", new StreamEventsCommand(clientFactory));
|
||||||
|
commandLine.addSubcommand("stream-alarms", new StreamAlarmsCommand(clientFactory));
|
||||||
|
commandLine.addSubcommand("acknowledge-alarm", new AcknowledgeAlarmCommand(clientFactory));
|
||||||
commandLine.addSubcommand("smoke", new SmokeCommand(clientFactory));
|
commandLine.addSubcommand("smoke", new SmokeCommand(clientFactory));
|
||||||
commandLine.addSubcommand("galaxy-test", new GalaxyTestConnectionCommand());
|
commandLine.addSubcommand("galaxy-test", new GalaxyTestConnectionCommand());
|
||||||
commandLine.addSubcommand("galaxy-deploy-time", new GalaxyDeployTimeCommand());
|
commandLine.addSubcommand("galaxy-deploy-time", new GalaxyDeployTimeCommand());
|
||||||
@@ -139,6 +151,9 @@ public final class MxGatewayCli implements Callable<Integer> {
|
|||||||
/** Sentinel written to stdout after every command result in batch mode. */
|
/** Sentinel written to stdout after every command result in batch mode. */
|
||||||
static final String BATCH_EOR = "__MXGW_BATCH_EOR__";
|
static final String BATCH_EOR = "__MXGW_BATCH_EOR__";
|
||||||
|
|
||||||
|
/** Sentinel queued by {@code stream-alarms} to mark a clean end of the alarm feed. */
|
||||||
|
private static final Object ALARM_FEED_END = new Object();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reads one CLI invocation per stdin line, executes each via a fresh
|
* Reads one CLI invocation per stdin line, executes each via a fresh
|
||||||
* {@link CommandLine}, and writes {@value #BATCH_EOR} to stdout after
|
* {@link CommandLine}, and writes {@value #BATCH_EOR} to stdout after
|
||||||
@@ -1041,6 +1056,115 @@ public final class MxGatewayCli implements Callable<Integer> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Command(name = "stream-alarms", description = "Streams the gateway central alarm feed.")
|
||||||
|
static final class StreamAlarmsCommand extends GatewayCommand {
|
||||||
|
@Option(names = "--filter-prefix", description = "Alarm-reference prefix scoping the feed; empty means unscoped.")
|
||||||
|
String filterPrefix = "";
|
||||||
|
|
||||||
|
@Option(names = "--limit", defaultValue = "0", description = "Maximum feed messages to print.")
|
||||||
|
int limit;
|
||||||
|
|
||||||
|
StreamAlarmsCommand(MxGatewayCliClientFactory clientFactory) {
|
||||||
|
super(clientFactory);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Integer call() {
|
||||||
|
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
|
||||||
|
// The async alarm feed delivers on a background gRPC thread; buffer
|
||||||
|
// messages in a bounded queue and drain them on this thread so the
|
||||||
|
// --limit termination mirrors stream-events. 1024 absorbs the
|
||||||
|
// gateway's initial active-alarm snapshot burst.
|
||||||
|
BlockingQueue<Object> queue = new ArrayBlockingQueue<>(1024);
|
||||||
|
StreamAlarmsRequest request = StreamAlarmsRequest.newBuilder()
|
||||||
|
.setAlarmFilterPrefix(filterPrefix)
|
||||||
|
.build();
|
||||||
|
MxGatewayAlarmFeedSubscription subscription =
|
||||||
|
client.streamAlarms(request, new StreamObserver<>() {
|
||||||
|
@Override
|
||||||
|
public void onNext(AlarmFeedMessage value) {
|
||||||
|
queue.offer(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Throwable error) {
|
||||||
|
queue.offer(error);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onCompleted() {
|
||||||
|
queue.offer(ALARM_FEED_END);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
try {
|
||||||
|
int count = 0;
|
||||||
|
while (true) {
|
||||||
|
Object item = queue.take();
|
||||||
|
if (item == ALARM_FEED_END) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (item instanceof Throwable error) {
|
||||||
|
throw new IllegalStateException(
|
||||||
|
"gateway stream alarms failed: " + error.getMessage(), error);
|
||||||
|
}
|
||||||
|
AlarmFeedMessage message = (AlarmFeedMessage) item;
|
||||||
|
if (json) {
|
||||||
|
client.out().println(protoJson(message));
|
||||||
|
} else {
|
||||||
|
client.out().println(formatAlarmFeedMessage(message));
|
||||||
|
}
|
||||||
|
client.out().flush();
|
||||||
|
count++;
|
||||||
|
if (limit > 0 && count >= limit) {
|
||||||
|
subscription.cancel();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (InterruptedException error) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
subscription.cancel();
|
||||||
|
} finally {
|
||||||
|
subscription.cancel();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Command(name = "acknowledge-alarm", description = "Acknowledges an active MXAccess alarm.")
|
||||||
|
static final class AcknowledgeAlarmCommand extends GatewayCommand {
|
||||||
|
@Option(names = "--reference", required = true, description = "Full alarm reference to acknowledge.")
|
||||||
|
String reference;
|
||||||
|
|
||||||
|
@Option(names = "--comment", description = "Operator acknowledge comment.")
|
||||||
|
String comment = "";
|
||||||
|
|
||||||
|
@Option(names = "--operator", description = "Operator user performing the acknowledge.")
|
||||||
|
String operator = "";
|
||||||
|
|
||||||
|
AcknowledgeAlarmCommand(MxGatewayCliClientFactory clientFactory) {
|
||||||
|
super(clientFactory);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Integer call() {
|
||||||
|
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
|
||||||
|
AcknowledgeAlarmReply reply = client.acknowledgeAlarm(AcknowledgeAlarmRequest.newBuilder()
|
||||||
|
.setAlarmFullReference(reference)
|
||||||
|
.setComment(comment)
|
||||||
|
.setOperatorUser(operator)
|
||||||
|
.build());
|
||||||
|
writeOutput(
|
||||||
|
"acknowledge-alarm",
|
||||||
|
common,
|
||||||
|
json,
|
||||||
|
reply,
|
||||||
|
() -> Integer.toString(reply.getHresult()));
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Command(name = "smoke", description = "Runs a bounded open/register/add/advise flow.")
|
@Command(name = "smoke", description = "Runs a bounded open/register/add/advise flow.")
|
||||||
static final class SmokeCommand extends GatewayCommand {
|
static final class SmokeCommand extends GatewayCommand {
|
||||||
@Option(names = "--client-name", defaultValue = "mxgw-java-smoke", description = "MXAccess client name.")
|
@Option(names = "--client-name", defaultValue = "mxgw-java-smoke", description = "MXAccess client name.")
|
||||||
@@ -1160,6 +1284,11 @@ public final class MxGatewayCli implements Callable<Integer> {
|
|||||||
|
|
||||||
MxGatewayCliSession session(String sessionId);
|
MxGatewayCliSession session(String sessionId);
|
||||||
|
|
||||||
|
AcknowledgeAlarmReply acknowledgeAlarm(AcknowledgeAlarmRequest request);
|
||||||
|
|
||||||
|
MxGatewayAlarmFeedSubscription streamAlarms(
|
||||||
|
StreamAlarmsRequest request, StreamObserver<AlarmFeedMessage> observer);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
void close();
|
void close();
|
||||||
}
|
}
|
||||||
@@ -1232,6 +1361,17 @@ public final class MxGatewayCli implements Callable<Integer> {
|
|||||||
return new GrpcMxGatewayCliSession(MxGatewaySession.forSessionId(client, sessionId));
|
return new GrpcMxGatewayCliSession(MxGatewaySession.forSessionId(client, sessionId));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public AcknowledgeAlarmReply acknowledgeAlarm(AcknowledgeAlarmRequest request) {
|
||||||
|
return client.acknowledgeAlarm(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MxGatewayAlarmFeedSubscription streamAlarms(
|
||||||
|
StreamAlarmsRequest request, StreamObserver<AlarmFeedMessage> observer) {
|
||||||
|
return client.streamAlarms(request, observer);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
client.close();
|
client.close();
|
||||||
@@ -1407,6 +1547,32 @@ public final class MxGatewayCli implements Callable<Integer> {
|
|||||||
return values;
|
return values;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Renders one {@link AlarmFeedMessage} in the CLI's plain-text output
|
||||||
|
* style, distinguishing the active-alarm snapshot, snapshot-complete
|
||||||
|
* sentinel, and transition cases of the message's {@code payload} oneof.
|
||||||
|
*/
|
||||||
|
private static String formatAlarmFeedMessage(AlarmFeedMessage message) {
|
||||||
|
return switch (message.getPayloadCase()) {
|
||||||
|
case ACTIVE_ALARM -> {
|
||||||
|
ActiveAlarmSnapshot alarm = message.getActiveAlarm();
|
||||||
|
yield String.format(
|
||||||
|
"active-alarm %s state=%s severity=%d",
|
||||||
|
alarm.getAlarmFullReference(), alarm.getCurrentState().name(), alarm.getSeverity());
|
||||||
|
}
|
||||||
|
case SNAPSHOT_COMPLETE -> "snapshot-complete";
|
||||||
|
case TRANSITION -> {
|
||||||
|
OnAlarmTransitionEvent transition = message.getTransition();
|
||||||
|
yield String.format(
|
||||||
|
"transition %s kind=%s severity=%d",
|
||||||
|
transition.getAlarmFullReference(),
|
||||||
|
transition.getTransitionKind().name(),
|
||||||
|
transition.getSeverity());
|
||||||
|
}
|
||||||
|
case PAYLOAD_NOT_SET -> "unknown";
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
private static MxValue parseValue(String type, String text) {
|
private static MxValue parseValue(String type, String text) {
|
||||||
return switch (type) {
|
return switch (type) {
|
||||||
case "bool" -> MxValues.boolValue(Boolean.parseBoolean(text));
|
case "bool" -> MxValues.boolValue(Boolean.parseBoolean(text));
|
||||||
|
|||||||
+110
@@ -4,6 +4,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
|
|||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
import com.zb.mom.ww.mxgateway.client.MxGatewayAlarmFeedSubscription;
|
||||||
|
import io.grpc.stub.StreamObserver;
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
@@ -12,7 +14,13 @@ import java.nio.charset.StandardCharsets;
|
|||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmReply;
|
||||||
|
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmRequest;
|
||||||
|
import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.AddItemReply;
|
import mxaccess_gateway.v1.MxaccessGateway.AddItemReply;
|
||||||
|
import mxaccess_gateway.v1.MxaccessGateway.AlarmConditionState;
|
||||||
|
import mxaccess_gateway.v1.MxaccessGateway.AlarmFeedMessage;
|
||||||
|
import mxaccess_gateway.v1.MxaccessGateway.AlarmTransitionKind;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.BulkReadResult;
|
import mxaccess_gateway.v1.MxaccessGateway.BulkReadResult;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.BulkWriteResult;
|
import mxaccess_gateway.v1.MxaccessGateway.BulkWriteResult;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply;
|
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply;
|
||||||
@@ -21,12 +29,14 @@ import mxaccess_gateway.v1.MxaccessGateway.MxCommandKind;
|
|||||||
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
|
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
|
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.MxValue;
|
import mxaccess_gateway.v1.MxaccessGateway.MxValue;
|
||||||
|
import mxaccess_gateway.v1.MxaccessGateway.OnAlarmTransitionEvent;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply;
|
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest;
|
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatus;
|
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatus;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode;
|
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.RegisterReply;
|
import mxaccess_gateway.v1.MxaccessGateway.RegisterReply;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.SessionState;
|
import mxaccess_gateway.v1.MxaccessGateway.SessionState;
|
||||||
|
import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.SubscribeResult;
|
import mxaccess_gateway.v1.MxaccessGateway.SubscribeResult;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.Write2BulkEntry;
|
import mxaccess_gateway.v1.MxaccessGateway.Write2BulkEntry;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.WriteBulkEntry;
|
import mxaccess_gateway.v1.MxaccessGateway.WriteBulkEntry;
|
||||||
@@ -151,6 +161,70 @@ final class MxGatewayCliTests {
|
|||||||
assertTrue(run.output().contains("\"wasSuccessful\":true"));
|
assertTrue(run.output().contains("\"wasSuccessful\":true"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---- stream-alarms / acknowledge-alarm subcommands ----
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void streamAlarmsCommandForwardsFilterPrefixAndPrintsFeedMessages() {
|
||||||
|
FakeClientFactory factory = new FakeClientFactory();
|
||||||
|
CliRun run = execute(factory, "stream-alarms", "--filter-prefix", "Tank01");
|
||||||
|
|
||||||
|
assertEquals(0, run.exitCode());
|
||||||
|
assertEquals("Tank01", factory.client.lastStreamAlarmsRequest.getAlarmFilterPrefix());
|
||||||
|
String out = run.output();
|
||||||
|
assertTrue(out.contains("active-alarm Tank01.Level.HiHi"), out);
|
||||||
|
assertTrue(out.contains("snapshot-complete"), out);
|
||||||
|
assertTrue(out.contains("transition Tank01.Level.HiHi"), out);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void streamAlarmsCommandHonoursLimit() {
|
||||||
|
FakeClientFactory factory = new FakeClientFactory();
|
||||||
|
CliRun run = execute(factory, "stream-alarms", "--limit", "1");
|
||||||
|
|
||||||
|
assertEquals(0, run.exitCode());
|
||||||
|
long lines = run.output().lines().filter(line -> !line.isBlank()).count();
|
||||||
|
assertEquals(1, lines, "expected exactly one feed message with --limit 1, got: " + run.output());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void streamAlarmsCommandPrintsJson() {
|
||||||
|
FakeClientFactory factory = new FakeClientFactory();
|
||||||
|
CliRun run = execute(factory, "stream-alarms", "--json");
|
||||||
|
|
||||||
|
assertEquals(0, run.exitCode());
|
||||||
|
assertTrue(run.output().contains("\"activeAlarm\""), run.output());
|
||||||
|
assertTrue(run.output().contains("\"snapshotComplete\""), run.output());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void acknowledgeAlarmCommandForwardsOptionsAndPrintsReply() {
|
||||||
|
FakeClientFactory factory = new FakeClientFactory();
|
||||||
|
CliRun run = execute(
|
||||||
|
factory,
|
||||||
|
"acknowledge-alarm",
|
||||||
|
"--reference",
|
||||||
|
"Tank01.Level.HiHi",
|
||||||
|
"--comment",
|
||||||
|
"checked",
|
||||||
|
"--operator",
|
||||||
|
"operator1",
|
||||||
|
"--json");
|
||||||
|
|
||||||
|
assertEquals(0, run.exitCode());
|
||||||
|
assertEquals("Tank01.Level.HiHi", factory.client.lastAcknowledgeAlarmRequest.getAlarmFullReference());
|
||||||
|
assertEquals("checked", factory.client.lastAcknowledgeAlarmRequest.getComment());
|
||||||
|
assertEquals("operator1", factory.client.lastAcknowledgeAlarmRequest.getOperatorUser());
|
||||||
|
assertTrue(run.output().contains("\"command\":\"acknowledge-alarm\""), run.output());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void acknowledgeAlarmCommandRequiresReference() {
|
||||||
|
CliRun run = execute(new FakeClientFactory(), "acknowledge-alarm", "--comment", "checked");
|
||||||
|
|
||||||
|
assertFalse(run.exitCode() == 0, "expected non-zero exit without --reference");
|
||||||
|
assertTrue(run.errors().contains("--reference"), run.errors());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void batchCommandExecutesVersionAndEmitsEorMarker() {
|
void batchCommandExecutesVersionAndEmitsEorMarker() {
|
||||||
CliRun run = executeBatch(new FakeClientFactory(), "version --json\n");
|
CliRun run = executeBatch(new FakeClientFactory(), "version --json\n");
|
||||||
@@ -220,6 +294,8 @@ final class MxGatewayCliTests {
|
|||||||
private final PrintWriter out;
|
private final PrintWriter out;
|
||||||
private final FakeSession session = new FakeSession();
|
private final FakeSession session = new FakeSession();
|
||||||
private boolean closeCalled;
|
private boolean closeCalled;
|
||||||
|
private AcknowledgeAlarmRequest lastAcknowledgeAlarmRequest;
|
||||||
|
private StreamAlarmsRequest lastStreamAlarmsRequest;
|
||||||
|
|
||||||
private FakeClient(PrintWriter out) {
|
private FakeClient(PrintWriter out) {
|
||||||
this.out = out;
|
this.out = out;
|
||||||
@@ -253,6 +329,40 @@ final class MxGatewayCliTests {
|
|||||||
return session;
|
return session;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public AcknowledgeAlarmReply acknowledgeAlarm(AcknowledgeAlarmRequest request) {
|
||||||
|
lastAcknowledgeAlarmRequest = request;
|
||||||
|
return AcknowledgeAlarmReply.newBuilder()
|
||||||
|
.setCorrelationId(request.getClientCorrelationId())
|
||||||
|
.setProtocolStatus(ok())
|
||||||
|
.setHresult(0)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MxGatewayAlarmFeedSubscription streamAlarms(
|
||||||
|
StreamAlarmsRequest request, StreamObserver<AlarmFeedMessage> observer) {
|
||||||
|
lastStreamAlarmsRequest = request;
|
||||||
|
// Replay a deterministic active-alarm snapshot, snapshot-complete
|
||||||
|
// sentinel, transition, then complete the feed so the CLI command
|
||||||
|
// drains a bounded stream without contacting a live gateway.
|
||||||
|
observer.onNext(AlarmFeedMessage.newBuilder()
|
||||||
|
.setActiveAlarm(ActiveAlarmSnapshot.newBuilder()
|
||||||
|
.setAlarmFullReference("Tank01.Level.HiHi")
|
||||||
|
.setCurrentState(AlarmConditionState.ALARM_CONDITION_STATE_ACTIVE)
|
||||||
|
.setSeverity(700))
|
||||||
|
.build());
|
||||||
|
observer.onNext(AlarmFeedMessage.newBuilder().setSnapshotComplete(true).build());
|
||||||
|
observer.onNext(AlarmFeedMessage.newBuilder()
|
||||||
|
.setTransition(OnAlarmTransitionEvent.newBuilder()
|
||||||
|
.setAlarmFullReference("Tank01.Level.HiHi")
|
||||||
|
.setTransitionKind(AlarmTransitionKind.ALARM_TRANSITION_KIND_ACKNOWLEDGE)
|
||||||
|
.setSeverity(700))
|
||||||
|
.build());
|
||||||
|
observer.onCompleted();
|
||||||
|
return new MxGatewayAlarmFeedSubscription();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
}
|
}
|
||||||
|
|||||||
+67
@@ -0,0 +1,67 @@
|
|||||||
|
package com.zb.mom.ww.mxgateway.client;
|
||||||
|
|
||||||
|
import io.grpc.stub.ClientCallStreamObserver;
|
||||||
|
import io.grpc.stub.ClientResponseObserver;
|
||||||
|
import io.grpc.stub.StreamObserver;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import mxaccess_gateway.v1.MxaccessGateway.AlarmFeedMessage;
|
||||||
|
import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cancellable handle returned by {@code streamAlarms}.
|
||||||
|
*
|
||||||
|
* <p>Wraps a caller-supplied {@link StreamObserver} and exposes a
|
||||||
|
* {@link #cancel()} entry point that aborts the underlying gRPC call. The
|
||||||
|
* subscription also implements {@link AutoCloseable} so it can participate in
|
||||||
|
* try-with-resources blocks.
|
||||||
|
*/
|
||||||
|
public final class MxGatewayAlarmFeedSubscription implements AutoCloseable {
|
||||||
|
private final AtomicReference<ClientCallStreamObserver<StreamAlarmsRequest>> requestStream = new AtomicReference<>();
|
||||||
|
private final AtomicBoolean cancelled = new AtomicBoolean();
|
||||||
|
|
||||||
|
ClientResponseObserver<StreamAlarmsRequest, AlarmFeedMessage> wrap(StreamObserver<AlarmFeedMessage> observer) {
|
||||||
|
return new ClientResponseObserver<>() {
|
||||||
|
@Override
|
||||||
|
public void beforeStart(ClientCallStreamObserver<StreamAlarmsRequest> stream) {
|
||||||
|
requestStream.set(stream);
|
||||||
|
if (cancelled.get()) {
|
||||||
|
stream.cancel("client cancelled alarm feed", null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onNext(AlarmFeedMessage value) {
|
||||||
|
observer.onNext(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Throwable error) {
|
||||||
|
observer.onError(error);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onCompleted() {
|
||||||
|
observer.onCompleted();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cancels the underlying gRPC call. Safe to invoke before the call has
|
||||||
|
* started; cancellation is recorded and applied as soon as the stream
|
||||||
|
* attaches.
|
||||||
|
*/
|
||||||
|
public void cancel() {
|
||||||
|
cancelled.set(true);
|
||||||
|
ClientCallStreamObserver<StreamAlarmsRequest> stream = requestStream.get();
|
||||||
|
if (stream != null) {
|
||||||
|
stream.cancel("client cancelled alarm feed", null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
cancel();
|
||||||
|
}
|
||||||
|
}
|
||||||
+23
@@ -18,6 +18,7 @@ import mxaccess_gateway.v1.MxAccessGatewayGrpc;
|
|||||||
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmReply;
|
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmReply;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmRequest;
|
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmRequest;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot;
|
import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot;
|
||||||
|
import mxaccess_gateway.v1.MxaccessGateway.AlarmFeedMessage;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply;
|
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
|
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
|
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
|
||||||
@@ -27,6 +28,7 @@ import mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply;
|
|||||||
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest;
|
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode;
|
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.QueryActiveAlarmsRequest;
|
import mxaccess_gateway.v1.MxaccessGateway.QueryActiveAlarmsRequest;
|
||||||
|
import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest;
|
||||||
import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest;
|
import mxaccess_gateway.v1.MxaccessGateway.StreamEventsRequest;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -320,6 +322,27 @@ public final class MxGatewayClient implements AutoCloseable {
|
|||||||
return subscription;
|
return subscription;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Attaches to the gateway's central alarm feed. The stream opens with one
|
||||||
|
* {@code AlarmFeedMessage} per currently-active alarm (the ConditionRefresh
|
||||||
|
* snapshot), then a single {@code snapshot_complete}, then a
|
||||||
|
* {@code transition} for every subsequent raise / acknowledge / clear.
|
||||||
|
*
|
||||||
|
* <p>Served by the gateway's always-on alarm monitor — no worker session is
|
||||||
|
* opened — so any number of clients may attach.
|
||||||
|
*
|
||||||
|
* @param request the {@code StreamAlarmsRequest}, optionally scoped by
|
||||||
|
* alarm-reference prefix
|
||||||
|
* @param observer caller-supplied observer that receives feed messages and completion
|
||||||
|
* @return a cancellable subscription handle
|
||||||
|
*/
|
||||||
|
public MxGatewayAlarmFeedSubscription streamAlarms(
|
||||||
|
StreamAlarmsRequest request, StreamObserver<AlarmFeedMessage> observer) {
|
||||||
|
MxGatewayAlarmFeedSubscription subscription = new MxGatewayAlarmFeedSubscription();
|
||||||
|
withStreamDeadline(rawAsyncStub()).streamAlarms(request, subscription.wrap(observer));
|
||||||
|
return subscription;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
if (ownedChannel != null) {
|
if (ownedChannel != null) {
|
||||||
|
|||||||
@@ -172,6 +172,28 @@ class GatewayClient:
|
|||||||
call = self.raw_stub.QueryActiveAlarms(request, **kwargs)
|
call = self.raw_stub.QueryActiveAlarms(request, **kwargs)
|
||||||
return _canceling_active_alarms_iterator(call)
|
return _canceling_active_alarms_iterator(call)
|
||||||
|
|
||||||
|
def stream_alarms(
|
||||||
|
self,
|
||||||
|
request: pb.StreamAlarmsRequest,
|
||||||
|
*,
|
||||||
|
metadata: Sequence[tuple[str, str]] | None = None,
|
||||||
|
) -> AsyncIterator[pb.AlarmFeedMessage]:
|
||||||
|
"""Attach to the gateway's central alarm feed.
|
||||||
|
|
||||||
|
The stream opens with one ``AlarmFeedMessage`` per currently-active
|
||||||
|
alarm (the ConditionRefresh snapshot), then a single
|
||||||
|
``snapshot_complete``, then a ``transition`` for every subsequent
|
||||||
|
raise / acknowledge / clear. Served by the gateway's always-on alarm
|
||||||
|
monitor — no worker session is opened — so any number of clients may
|
||||||
|
attach. Optionally scoped by alarm-reference prefix
|
||||||
|
(``request.alarm_filter_prefix``).
|
||||||
|
"""
|
||||||
|
kwargs: dict[str, Any] = {"metadata": merge_metadata(self.options.api_key, metadata)}
|
||||||
|
if self.options.stream_timeout is not None:
|
||||||
|
kwargs["timeout"] = self.options.stream_timeout
|
||||||
|
call = self.raw_stub.StreamAlarms(request, **kwargs)
|
||||||
|
return _canceling_alarm_feed_iterator(call)
|
||||||
|
|
||||||
async def _unary(
|
async def _unary(
|
||||||
self,
|
self,
|
||||||
operation: str,
|
operation: str,
|
||||||
@@ -223,3 +245,15 @@ async def _canceling_active_alarms_iterator(call: Any) -> AsyncIterator[pb.Activ
|
|||||||
cancel = getattr(call, "cancel", None)
|
cancel = getattr(call, "cancel", None)
|
||||||
if cancel is not None:
|
if cancel is not None:
|
||||||
cancel()
|
cancel()
|
||||||
|
|
||||||
|
|
||||||
|
async def _canceling_alarm_feed_iterator(call: Any) -> AsyncIterator[pb.AlarmFeedMessage]:
|
||||||
|
try:
|
||||||
|
async for message in call:
|
||||||
|
yield message
|
||||||
|
except grpc.RpcError as error:
|
||||||
|
raise map_rpc_error("stream alarms", error) from error
|
||||||
|
finally:
|
||||||
|
cancel = getattr(call, "cancel", None)
|
||||||
|
if cancel is not None:
|
||||||
|
cancel()
|
||||||
|
|||||||
@@ -386,6 +386,40 @@ def stream_events(**kwargs: Any) -> None:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@main.command("stream-alarms")
|
||||||
|
@gateway_options
|
||||||
|
@click.option("--filter-prefix", default="", help="Alarm-reference prefix filter.")
|
||||||
|
@click.option("--max-messages", default=1, type=int, show_default=True)
|
||||||
|
@click.option("--timeout", default=5.0, type=float, show_default=True)
|
||||||
|
@click.option("--correlation-id", default="", help="Client correlation id.")
|
||||||
|
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
|
||||||
|
def stream_alarms(**kwargs: Any) -> None:
|
||||||
|
"""Stream a bounded number of messages from the gateway's central alarm feed."""
|
||||||
|
|
||||||
|
_run(
|
||||||
|
_stream_alarms(**kwargs),
|
||||||
|
output_json=kwargs["output_json"],
|
||||||
|
secrets=_secrets(kwargs),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@main.command("acknowledge-alarm")
|
||||||
|
@gateway_options
|
||||||
|
@click.option("--reference", required=True, help="Alarm full reference to acknowledge.")
|
||||||
|
@click.option("--comment", default="", help="Acknowledgement comment.")
|
||||||
|
@click.option("--operator", default="", help="Operator user name.")
|
||||||
|
@click.option("--correlation-id", default="", help="Client correlation id.")
|
||||||
|
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
|
||||||
|
def acknowledge_alarm(**kwargs: Any) -> None:
|
||||||
|
"""Acknowledge an active MXAccess alarm condition (session-less)."""
|
||||||
|
|
||||||
|
_run(
|
||||||
|
_acknowledge_alarm(**kwargs),
|
||||||
|
output_json=kwargs["output_json"],
|
||||||
|
secrets=_secrets(kwargs),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@main.command()
|
@main.command()
|
||||||
@gateway_options
|
@gateway_options
|
||||||
@click.option("--session-id", required=True, help="Gateway session id.")
|
@click.option("--session-id", required=True, help="Gateway session id.")
|
||||||
@@ -761,6 +795,34 @@ async def _stream_events(**kwargs: Any) -> dict[str, Any]:
|
|||||||
return {"events": [_message_dict(event) for event in events]}
|
return {"events": [_message_dict(event) for event in events]}
|
||||||
|
|
||||||
|
|
||||||
|
async def _stream_alarms(**kwargs: Any) -> dict[str, Any]:
|
||||||
|
async with await _connect(kwargs) as client:
|
||||||
|
messages = await _collect_alarm_messages(
|
||||||
|
client.stream_alarms(
|
||||||
|
pb.StreamAlarmsRequest(
|
||||||
|
client_correlation_id=kwargs["correlation_id"],
|
||||||
|
alarm_filter_prefix=kwargs["filter_prefix"],
|
||||||
|
),
|
||||||
|
),
|
||||||
|
max_messages=kwargs["max_messages"],
|
||||||
|
timeout=kwargs["timeout"],
|
||||||
|
)
|
||||||
|
return {"messages": [_message_dict(message) for message in messages]}
|
||||||
|
|
||||||
|
|
||||||
|
async def _acknowledge_alarm(**kwargs: Any) -> dict[str, Any]:
|
||||||
|
async with await _connect(kwargs) as client:
|
||||||
|
reply = await client.acknowledge_alarm(
|
||||||
|
pb.AcknowledgeAlarmRequest(
|
||||||
|
client_correlation_id=kwargs["correlation_id"],
|
||||||
|
alarm_full_reference=kwargs["reference"],
|
||||||
|
comment=kwargs["comment"],
|
||||||
|
operator_user=kwargs["operator"],
|
||||||
|
),
|
||||||
|
)
|
||||||
|
return _message_dict(reply)
|
||||||
|
|
||||||
|
|
||||||
async def _write(**kwargs: Any) -> dict[str, Any]:
|
async def _write(**kwargs: Any) -> dict[str, Any]:
|
||||||
value = _parse_value(kwargs["value"], kwargs["value_type"])
|
value = _parse_value(kwargs["value"], kwargs["value_type"])
|
||||||
async with await _connect(kwargs) as client:
|
async with await _connect(kwargs) as client:
|
||||||
@@ -912,6 +974,34 @@ async def _collect_events(
|
|||||||
return collected
|
return collected
|
||||||
|
|
||||||
|
|
||||||
|
async def _collect_alarm_messages(
|
||||||
|
messages: Any,
|
||||||
|
*,
|
||||||
|
max_messages: int,
|
||||||
|
timeout: float,
|
||||||
|
) -> list[pb.AlarmFeedMessage]:
|
||||||
|
if max_messages > MAX_AGGREGATE_EVENTS:
|
||||||
|
raise click.BadParameter(
|
||||||
|
f"must be less than or equal to {MAX_AGGREGATE_EVENTS}",
|
||||||
|
param_hint="--max-messages",
|
||||||
|
)
|
||||||
|
|
||||||
|
collected: list[pb.AlarmFeedMessage] = []
|
||||||
|
iterator = messages.__aiter__()
|
||||||
|
try:
|
||||||
|
while len(collected) < max_messages:
|
||||||
|
collected.append(await asyncio.wait_for(iterator.__anext__(), timeout=timeout))
|
||||||
|
except StopAsyncIteration:
|
||||||
|
pass
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
close = getattr(iterator, "aclose", None)
|
||||||
|
if close is not None:
|
||||||
|
await close()
|
||||||
|
return collected
|
||||||
|
|
||||||
|
|
||||||
def _parse_value(raw_value: str, value_type: str) -> MxValueInput:
|
def _parse_value(raw_value: str, value_type: str) -> MxValueInput:
|
||||||
normalized = value_type.lower()
|
normalized = value_type.lower()
|
||||||
if normalized == "bool":
|
if normalized == "bool":
|
||||||
|
|||||||
@@ -50,6 +50,28 @@ def test_write_parser_rejects_unknown_value_type() -> None:
|
|||||||
assert "unsupported value type" in result.output
|
assert "unsupported value type" in result.output
|
||||||
|
|
||||||
|
|
||||||
|
def test_stream_alarms_is_registered() -> None:
|
||||||
|
runner = CliRunner()
|
||||||
|
|
||||||
|
result = runner.invoke(main, ["stream-alarms", "--help"])
|
||||||
|
|
||||||
|
assert result.exit_code == 0
|
||||||
|
assert "--filter-prefix" in result.output
|
||||||
|
assert "--max-messages" in result.output
|
||||||
|
|
||||||
|
|
||||||
|
def test_acknowledge_alarm_requires_reference() -> None:
|
||||||
|
runner = CliRunner()
|
||||||
|
|
||||||
|
result = runner.invoke(
|
||||||
|
main,
|
||||||
|
["acknowledge-alarm", "--api-key", "mxgw_test_secret", "--json"],
|
||||||
|
)
|
||||||
|
|
||||||
|
assert result.exit_code != 0
|
||||||
|
assert "--reference" in result.output
|
||||||
|
|
||||||
|
|
||||||
def test_cli_error_output_redacts_api_key() -> None:
|
def test_cli_error_output_redacts_api_key() -> None:
|
||||||
runner = CliRunner()
|
runner = CliRunner()
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,9 @@
|
|||||||
|
[target.'cfg(windows)']
|
||||||
|
# Bump the default 1 MB Windows stack to 8 MB. clap-derive builds a large
|
||||||
|
# Command enum in this CLI (one variant per subcommand, each carrying flag
|
||||||
|
# args); in debug builds the enum is materialized on the stack without
|
||||||
|
# optimization and overflows the default Windows main-thread stack before
|
||||||
|
# even reaching our code. Release builds are unaffected but the e2e matrix
|
||||||
|
# drives the CLI through `cargo run` (debug), so the link-arg ships with
|
||||||
|
# every dev-time invocation.
|
||||||
|
rustflags = ["-C", "link-arg=/STACK:8388608"]
|
||||||
@@ -16,18 +16,19 @@ use std::time::{Duration, Instant};
|
|||||||
|
|
||||||
use clap::{Args, Parser, Subcommand, ValueEnum};
|
use clap::{Args, Parser, Subcommand, ValueEnum};
|
||||||
use futures_util::StreamExt;
|
use futures_util::StreamExt;
|
||||||
use zb_mom_ww_mxgateway_client::generated::galaxy_repository::v1::DeployEvent;
|
|
||||||
use zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::{
|
|
||||||
CloseSessionRequest, MxCommand, MxCommandKind, MxCommandRequest, MxValue as ProtoMxValue,
|
|
||||||
OpenSessionRequest, PingCommand, StreamEventsRequest, Write2BulkEntry, WriteBulkEntry,
|
|
||||||
WriteSecured2BulkEntry, WriteSecuredBulkEntry,
|
|
||||||
};
|
|
||||||
use zb_mom_ww_mxgateway_client::{
|
|
||||||
ApiKey, ClientOptions, Error, GalaxyClient, GatewayClient, MxValue, CLIENT_VERSION,
|
|
||||||
GATEWAY_PROTOCOL_VERSION, WORKER_PROTOCOL_VERSION,
|
|
||||||
};
|
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
use zb_mom_ww_mxgateway_client::generated::galaxy_repository::v1::DeployEvent;
|
||||||
|
use zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::{
|
||||||
|
alarm_feed_message, AcknowledgeAlarmRequest, AlarmFeedMessage, CloseSessionRequest, MxCommand,
|
||||||
|
MxCommandKind, MxCommandRequest, MxEvent, MxEventFamily, MxValue as ProtoMxValue,
|
||||||
|
OpenSessionRequest, PingCommand, StreamAlarmsRequest, StreamEventsRequest, Write2BulkEntry,
|
||||||
|
WriteBulkEntry, WriteSecured2BulkEntry, WriteSecuredBulkEntry,
|
||||||
|
};
|
||||||
|
use zb_mom_ww_mxgateway_client::{
|
||||||
|
ApiKey, ClientOptions, Error, GalaxyClient, GatewayClient, MxValue, MxValueProjection,
|
||||||
|
CLIENT_VERSION, GATEWAY_PROTOCOL_VERSION, WORKER_PROTOCOL_VERSION,
|
||||||
|
};
|
||||||
|
|
||||||
const MAX_AGGREGATE_EVENTS: usize = 10_000;
|
const MAX_AGGREGATE_EVENTS: usize = 10_000;
|
||||||
|
|
||||||
@@ -274,6 +275,38 @@ enum Command {
|
|||||||
#[arg(long)]
|
#[arg(long)]
|
||||||
jsonl: bool,
|
jsonl: bool,
|
||||||
},
|
},
|
||||||
|
/// Attach to the gateway's session-less central alarm feed. The stream
|
||||||
|
/// opens with one `active_alarm` per currently-active alarm, then a
|
||||||
|
/// single `snapshot_complete`, then a `transition` for every subsequent
|
||||||
|
/// raise / acknowledge / clear.
|
||||||
|
StreamAlarms {
|
||||||
|
#[command(flatten)]
|
||||||
|
connection: ConnectionArgs,
|
||||||
|
/// Optional alarm-reference prefix scoping the feed to an equipment
|
||||||
|
/// sub-tree. Omit to stream every active alarm.
|
||||||
|
#[arg(long)]
|
||||||
|
filter_prefix: Option<String>,
|
||||||
|
#[arg(long, default_value_t = 1)]
|
||||||
|
max_events: usize,
|
||||||
|
#[arg(long)]
|
||||||
|
json: bool,
|
||||||
|
#[arg(long)]
|
||||||
|
jsonl: bool,
|
||||||
|
},
|
||||||
|
/// Acknowledge an active MXAccess alarm condition through the gateway's
|
||||||
|
/// session-less AcknowledgeAlarm RPC.
|
||||||
|
AcknowledgeAlarm {
|
||||||
|
#[command(flatten)]
|
||||||
|
connection: ConnectionArgs,
|
||||||
|
#[arg(long)]
|
||||||
|
reference: String,
|
||||||
|
#[arg(long, default_value = "")]
|
||||||
|
comment: String,
|
||||||
|
#[arg(long, default_value = "")]
|
||||||
|
operator: String,
|
||||||
|
#[arg(long)]
|
||||||
|
json: bool,
|
||||||
|
},
|
||||||
Write {
|
Write {
|
||||||
#[command(flatten)]
|
#[command(flatten)]
|
||||||
connection: ConnectionArgs,
|
connection: ConnectionArgs,
|
||||||
@@ -760,7 +793,7 @@ async fn dispatch(command: Command) -> Result<(), Error> {
|
|||||||
after_worker_sequence,
|
after_worker_sequence,
|
||||||
})
|
})
|
||||||
.await?;
|
.await?;
|
||||||
let mut events = Vec::new();
|
let mut events: Vec<Value> = Vec::new();
|
||||||
let mut event_count = 0usize;
|
let mut event_count = 0usize;
|
||||||
while event_count < max_events {
|
while event_count < max_events {
|
||||||
let Some(event) = stream.next().await else {
|
let Some(event) = stream.next().await else {
|
||||||
@@ -769,23 +802,81 @@ async fn dispatch(command: Command) -> Result<(), Error> {
|
|||||||
let event = event?;
|
let event = event?;
|
||||||
event_count += 1;
|
event_count += 1;
|
||||||
if jsonl {
|
if jsonl {
|
||||||
println!(
|
println!("{}", event_to_json(&event));
|
||||||
"{}",
|
|
||||||
json!({
|
|
||||||
"workerSequence": event.worker_sequence,
|
|
||||||
"family": event.family,
|
|
||||||
})
|
|
||||||
);
|
|
||||||
} else if json {
|
} else if json {
|
||||||
events.push(event);
|
events.push(event_to_json(&event));
|
||||||
} else {
|
} else {
|
||||||
println!("{} {}", event.worker_sequence, event.family);
|
println!("{} {}", event.worker_sequence, event.family);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if json {
|
if json {
|
||||||
println!("{}", json!({ "eventCount": event_count }));
|
// `eventCount` is preserved for back-compat; `events` carries
|
||||||
|
// the per-event detail the cross-language e2e matrix compares.
|
||||||
|
println!("{}", json!({ "eventCount": event_count, "events": events }));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Command::StreamAlarms {
|
||||||
|
connection,
|
||||||
|
filter_prefix,
|
||||||
|
max_events,
|
||||||
|
json,
|
||||||
|
jsonl,
|
||||||
|
} => {
|
||||||
|
if max_events > MAX_AGGREGATE_EVENTS {
|
||||||
|
return Err(Error::InvalidArgument {
|
||||||
|
name: "max-events".to_owned(),
|
||||||
|
detail: format!("must be less than or equal to {MAX_AGGREGATE_EVENTS}"),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
let client = connect(connection).await?;
|
||||||
|
let mut stream = client
|
||||||
|
.stream_alarms(StreamAlarmsRequest {
|
||||||
|
client_correlation_id: "rust-cli-stream-alarms".to_owned(),
|
||||||
|
alarm_filter_prefix: filter_prefix.unwrap_or_default(),
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
let mut messages: Vec<Value> = Vec::new();
|
||||||
|
let mut message_count = 0usize;
|
||||||
|
while message_count < max_events {
|
||||||
|
let Some(message) = stream.next().await else {
|
||||||
|
break;
|
||||||
|
};
|
||||||
|
let message = message?;
|
||||||
|
message_count += 1;
|
||||||
|
if jsonl {
|
||||||
|
println!("{}", alarm_feed_message_to_json(&message));
|
||||||
|
} else if json {
|
||||||
|
messages.push(alarm_feed_message_to_json(&message));
|
||||||
|
} else {
|
||||||
|
println!("{}", alarm_feed_message_summary(&message));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if json {
|
||||||
|
println!(
|
||||||
|
"{}",
|
||||||
|
json!({ "messageCount": message_count, "messages": messages })
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Command::AcknowledgeAlarm {
|
||||||
|
connection,
|
||||||
|
reference,
|
||||||
|
comment,
|
||||||
|
operator,
|
||||||
|
json,
|
||||||
|
} => {
|
||||||
|
let client = connect(connection).await?;
|
||||||
|
let reply = client
|
||||||
|
.acknowledge_alarm(AcknowledgeAlarmRequest {
|
||||||
|
client_correlation_id: "rust-cli-acknowledge-alarm".to_owned(),
|
||||||
|
alarm_full_reference: reference,
|
||||||
|
comment,
|
||||||
|
operator_user: operator,
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
print_acknowledge_alarm_reply(&reply, json);
|
||||||
|
}
|
||||||
Command::Write {
|
Command::Write {
|
||||||
connection,
|
connection,
|
||||||
session_id,
|
session_id,
|
||||||
@@ -1296,9 +1387,7 @@ async fn run_bench_read_bulk(
|
|||||||
// successfully-subscribed subset.
|
// successfully-subscribed subset.
|
||||||
let bench_outcome = async {
|
let bench_outcome = async {
|
||||||
let server_handle = session.register(&client_name).await?;
|
let server_handle = session.register(&client_name).await?;
|
||||||
let subscribe_results = session
|
let subscribe_results = session.subscribe_bulk(server_handle, tags.clone()).await?;
|
||||||
.subscribe_bulk(server_handle, tags.clone())
|
|
||||||
.await?;
|
|
||||||
let item_handles: Vec<i32> = subscribe_results
|
let item_handles: Vec<i32> = subscribe_results
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|r| r.was_successful)
|
.filter(|r| r.was_successful)
|
||||||
@@ -1351,9 +1440,7 @@ async fn run_bench_read_bulk(
|
|||||||
|
|
||||||
// Best-effort cleanup: unsubscribe so the worker can release cache slots.
|
// Best-effort cleanup: unsubscribe so the worker can release cache slots.
|
||||||
if !item_handles.is_empty() {
|
if !item_handles.is_empty() {
|
||||||
let _ = session
|
let _ = session.unsubscribe_bulk(server_handle, item_handles).await;
|
||||||
.unsubscribe_bulk(server_handle, item_handles)
|
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let total_calls = successful_calls + failed_calls;
|
let total_calls = successful_calls + failed_calls;
|
||||||
@@ -1577,6 +1664,158 @@ fn print_deploy_event(event: &DeployEvent, use_json: bool) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Render a streamed [`MxEvent`] as a JSON object. The scalar value is
|
||||||
|
/// projected into protojson-style `*Value` keys so the cross-language e2e
|
||||||
|
/// matrix can extract and compare event values uniformly across all five
|
||||||
|
/// client CLIs.
|
||||||
|
fn event_to_json(event: &MxEvent) -> Value {
|
||||||
|
// Match the other four CLIs by rendering the family as its protobuf enum
|
||||||
|
// name (e.g. MX_EVENT_FAMILY_ON_WRITE_COMPLETE). The e2e write round-trip
|
||||||
|
// looks up this name to confirm the OnWriteComplete echo arrived; emitting
|
||||||
|
// the raw i32 leaves it unable to recognise any event.
|
||||||
|
let family = MxEventFamily::try_from(event.family)
|
||||||
|
.map(|f| f.as_str_name())
|
||||||
|
.unwrap_or("MX_EVENT_FAMILY_UNSPECIFIED");
|
||||||
|
json!({
|
||||||
|
"family": family,
|
||||||
|
"sessionId": event.session_id,
|
||||||
|
"serverHandle": event.server_handle,
|
||||||
|
"itemHandle": event.item_handle,
|
||||||
|
"quality": event.quality,
|
||||||
|
"workerSequence": event.worker_sequence,
|
||||||
|
"value": event.value.as_ref().map(event_value_to_json),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Project an [`MxValue`] into a protojson-shaped JSON object whose single
|
||||||
|
/// key names the scalar kind (`int32Value`, `stringValue`, ...), matching
|
||||||
|
/// the protobuf-JSON the .NET/Go/Java CLIs emit.
|
||||||
|
fn event_value_to_json(value: &ProtoMxValue) -> Value {
|
||||||
|
match MxValue::from_proto(value.clone()).projection() {
|
||||||
|
MxValueProjection::Bool(inner) => json!({ "boolValue": inner }),
|
||||||
|
MxValueProjection::Int32(inner) => json!({ "int32Value": inner }),
|
||||||
|
// protojson renders 64-bit integers as strings; mirror that here.
|
||||||
|
MxValueProjection::Int64(inner) => json!({ "int64Value": inner.to_string() }),
|
||||||
|
MxValueProjection::Float(inner) => json!({ "floatValue": inner }),
|
||||||
|
MxValueProjection::Double(inner) => json!({ "doubleValue": inner }),
|
||||||
|
MxValueProjection::String(inner) => json!({ "stringValue": inner }),
|
||||||
|
MxValueProjection::Timestamp(ts) => {
|
||||||
|
json!({ "timestampValue": { "seconds": ts.seconds, "nanos": ts.nanos } })
|
||||||
|
}
|
||||||
|
MxValueProjection::Array(_) => json!({ "arrayValue": {} }),
|
||||||
|
MxValueProjection::Raw(bytes) => json!({ "rawValue": { "byteCount": bytes.len() } }),
|
||||||
|
MxValueProjection::Null => json!({ "isNull": true }),
|
||||||
|
MxValueProjection::Unset => Value::Null,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Render a streamed [`AlarmFeedMessage`] as a terse one-line summary that
|
||||||
|
/// distinguishes the three `payload` oneof cases.
|
||||||
|
fn alarm_feed_message_summary(message: &AlarmFeedMessage) -> String {
|
||||||
|
match &message.payload {
|
||||||
|
Some(alarm_feed_message::Payload::ActiveAlarm(snapshot)) => {
|
||||||
|
format!(
|
||||||
|
"active-alarm {} state={}",
|
||||||
|
snapshot.alarm_full_reference,
|
||||||
|
AlarmEnumName::condition_state(snapshot.current_state)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
Some(alarm_feed_message::Payload::SnapshotComplete(complete)) => {
|
||||||
|
format!("snapshot-complete {complete}")
|
||||||
|
}
|
||||||
|
Some(alarm_feed_message::Payload::Transition(transition)) => {
|
||||||
|
format!(
|
||||||
|
"transition {} kind={}",
|
||||||
|
transition.alarm_full_reference,
|
||||||
|
AlarmEnumName::transition_kind(transition.transition_kind)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
None => "(empty)".to_owned(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Render a streamed [`AlarmFeedMessage`] as a JSON object whose single
|
||||||
|
/// top-level key names the active `payload` oneof case, mirroring the
|
||||||
|
/// protobuf-JSON the .NET/Go/Java/Python CLIs emit.
|
||||||
|
fn alarm_feed_message_to_json(message: &AlarmFeedMessage) -> Value {
|
||||||
|
match &message.payload {
|
||||||
|
Some(alarm_feed_message::Payload::ActiveAlarm(snapshot)) => json!({
|
||||||
|
"activeAlarm": {
|
||||||
|
"alarmFullReference": snapshot.alarm_full_reference,
|
||||||
|
"sourceObjectReference": snapshot.source_object_reference,
|
||||||
|
"alarmTypeName": snapshot.alarm_type_name,
|
||||||
|
"severity": snapshot.severity,
|
||||||
|
"currentState": AlarmEnumName::condition_state(snapshot.current_state),
|
||||||
|
"category": snapshot.category,
|
||||||
|
"description": snapshot.description,
|
||||||
|
"operatorUser": snapshot.operator_user,
|
||||||
|
"operatorComment": snapshot.operator_comment,
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
Some(alarm_feed_message::Payload::SnapshotComplete(complete)) => json!({
|
||||||
|
"snapshotComplete": complete,
|
||||||
|
}),
|
||||||
|
Some(alarm_feed_message::Payload::Transition(transition)) => json!({
|
||||||
|
"transition": {
|
||||||
|
"alarmFullReference": transition.alarm_full_reference,
|
||||||
|
"sourceObjectReference": transition.source_object_reference,
|
||||||
|
"alarmTypeName": transition.alarm_type_name,
|
||||||
|
"transitionKind": AlarmEnumName::transition_kind(transition.transition_kind),
|
||||||
|
"severity": transition.severity,
|
||||||
|
"operatorUser": transition.operator_user,
|
||||||
|
"operatorComment": transition.operator_comment,
|
||||||
|
"category": transition.category,
|
||||||
|
"description": transition.description,
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
None => Value::Null,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tiny namespace for alarm-enum name lookups used by the alarm-feed
|
||||||
|
/// renderers; keeps the proto-enum imports off the `main.rs` top level.
|
||||||
|
struct AlarmEnumName;
|
||||||
|
|
||||||
|
impl AlarmEnumName {
|
||||||
|
fn condition_state(value: i32) -> String {
|
||||||
|
use zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::AlarmConditionState;
|
||||||
|
AlarmConditionState::try_from(value)
|
||||||
|
.map(|state| state.as_str_name().to_owned())
|
||||||
|
.unwrap_or_else(|_| value.to_string())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn transition_kind(value: i32) -> String {
|
||||||
|
use zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::AlarmTransitionKind;
|
||||||
|
AlarmTransitionKind::try_from(value)
|
||||||
|
.map(|kind| kind.as_str_name().to_owned())
|
||||||
|
.unwrap_or_else(|_| value.to_string())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Render an [`AcknowledgeAlarmReply`] as a terse line or a JSON document.
|
||||||
|
fn print_acknowledge_alarm_reply(
|
||||||
|
reply: &zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::AcknowledgeAlarmReply,
|
||||||
|
use_json: bool,
|
||||||
|
) {
|
||||||
|
if use_json {
|
||||||
|
println!(
|
||||||
|
"{}",
|
||||||
|
json!({
|
||||||
|
"operation": "acknowledge-alarm",
|
||||||
|
"correlationId": reply.correlation_id,
|
||||||
|
"protocolStatus": reply.protocol_status.as_ref().map(|status| json!({
|
||||||
|
"code": status.code,
|
||||||
|
"message": status.message,
|
||||||
|
})),
|
||||||
|
"hresult": reply.hresult,
|
||||||
|
"diagnosticMessage": reply.diagnostic_message,
|
||||||
|
})
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
println!("acknowledge-alarm completed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Parse a small but practically-complete subset of RFC3339:
|
/// Parse a small but practically-complete subset of RFC3339:
|
||||||
/// `YYYY-MM-DDTHH:MM:SS[.fffffffff][Z|+HH:MM|-HH:MM]`. Returns the
|
/// `YYYY-MM-DDTHH:MM:SS[.fffffffff][Z|+HH:MM|-HH:MM]`. Returns the
|
||||||
/// corresponding `prost_types::Timestamp` (Unix seconds + nanoseconds).
|
/// corresponding `prost_types::Timestamp` (Unix seconds + nanoseconds).
|
||||||
@@ -1788,6 +2027,47 @@ mod tests {
|
|||||||
assert_eq!(value["workerProtocolVersion"], 1);
|
assert_eq!(value["workerProtocolVersion"], 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parses_stream_alarms_command() {
|
||||||
|
let parsed = Cli::try_parse_from([
|
||||||
|
"mxgw",
|
||||||
|
"stream-alarms",
|
||||||
|
"--filter-prefix",
|
||||||
|
"Tank01",
|
||||||
|
"--max-events",
|
||||||
|
"3",
|
||||||
|
"--json",
|
||||||
|
]);
|
||||||
|
assert!(parsed.is_ok(), "parse failed: {parsed:?}");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parses_stream_alarms_command_without_filter_prefix() {
|
||||||
|
let parsed = Cli::try_parse_from(["mxgw", "stream-alarms"]);
|
||||||
|
assert!(parsed.is_ok(), "parse failed: {parsed:?}");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parses_acknowledge_alarm_command() {
|
||||||
|
let parsed = Cli::try_parse_from([
|
||||||
|
"mxgw",
|
||||||
|
"acknowledge-alarm",
|
||||||
|
"--reference",
|
||||||
|
"Tank01.Level.HiHi",
|
||||||
|
"--comment",
|
||||||
|
"ack from cli",
|
||||||
|
"--operator",
|
||||||
|
"operator1",
|
||||||
|
]);
|
||||||
|
assert!(parsed.is_ok(), "parse failed: {parsed:?}");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn acknowledge_alarm_requires_reference() {
|
||||||
|
let parsed = Cli::try_parse_from(["mxgw", "acknowledge-alarm"]);
|
||||||
|
assert!(parsed.is_err());
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn parses_galaxy_watch_command_with_last_seen_and_max_events() {
|
fn parses_galaxy_watch_command_with_last_seen_and_max_events() {
|
||||||
let parsed = Cli::try_parse_from([
|
let parsed = Cli::try_parse_from([
|
||||||
|
|||||||
@@ -16,9 +16,10 @@ use crate::auth::AuthInterceptor;
|
|||||||
use crate::error::{ensure_command_success, ensure_protocol_success, Error};
|
use crate::error::{ensure_command_success, ensure_protocol_success, Error};
|
||||||
use crate::generated::mxaccess_gateway::v1::mx_access_gateway_client::MxAccessGatewayClient;
|
use crate::generated::mxaccess_gateway::v1::mx_access_gateway_client::MxAccessGatewayClient;
|
||||||
use crate::generated::mxaccess_gateway::v1::{
|
use crate::generated::mxaccess_gateway::v1::{
|
||||||
AcknowledgeAlarmReply, AcknowledgeAlarmRequest, ActiveAlarmSnapshot, CloseSessionReply,
|
AcknowledgeAlarmReply, AcknowledgeAlarmRequest, ActiveAlarmSnapshot, AlarmFeedMessage,
|
||||||
CloseSessionRequest, MxCommandReply, MxCommandRequest, MxEvent, OpenSessionReply,
|
CloseSessionReply, CloseSessionRequest, MxCommandReply, MxCommandRequest, MxEvent,
|
||||||
OpenSessionRequest, QueryActiveAlarmsRequest, StreamEventsRequest,
|
OpenSessionReply, OpenSessionRequest, QueryActiveAlarmsRequest, StreamAlarmsRequest,
|
||||||
|
StreamEventsRequest,
|
||||||
};
|
};
|
||||||
use crate::options::ClientOptions;
|
use crate::options::ClientOptions;
|
||||||
use crate::session::Session;
|
use crate::session::Session;
|
||||||
@@ -40,6 +41,13 @@ pub type ActiveAlarmStream = std::pin::Pin<
|
|||||||
Box<dyn futures_core::Stream<Item = Result<ActiveAlarmSnapshot, Error>> + Send + 'static>,
|
Box<dyn futures_core::Stream<Item = Result<ActiveAlarmSnapshot, Error>> + Send + 'static>,
|
||||||
>;
|
>;
|
||||||
|
|
||||||
|
/// Pinned, boxed [`AlarmFeedMessage`] stream returned by
|
||||||
|
/// [`GatewayClient::stream_alarms`]. Errors are pre-mapped from
|
||||||
|
/// `tonic::Status` to [`Error`]; dropping the stream cancels the call.
|
||||||
|
pub type AlarmFeedStream = std::pin::Pin<
|
||||||
|
Box<dyn futures_core::Stream<Item = Result<AlarmFeedMessage, Error>> + Send + 'static>,
|
||||||
|
>;
|
||||||
|
|
||||||
/// Thin async wrapper around the generated gateway client.
|
/// Thin async wrapper around the generated gateway client.
|
||||||
///
|
///
|
||||||
/// The wrapper is `Clone`: every clone shares the underlying tonic channel
|
/// The wrapper is `Clone`: every clone shares the underlying tonic channel
|
||||||
@@ -219,7 +227,9 @@ impl GatewayClient {
|
|||||||
request: AcknowledgeAlarmRequest,
|
request: AcknowledgeAlarmRequest,
|
||||||
) -> Result<AcknowledgeAlarmReply, Error> {
|
) -> Result<AcknowledgeAlarmReply, Error> {
|
||||||
let mut client = self.inner.clone();
|
let mut client = self.inner.clone();
|
||||||
let response = client.acknowledge_alarm(self.unary_request(request)).await?;
|
let response = client
|
||||||
|
.acknowledge_alarm(self.unary_request(request))
|
||||||
|
.await?;
|
||||||
let reply = response.into_inner();
|
let reply = response.into_inner();
|
||||||
ensure_protocol_success("acknowledge alarm", reply.protocol_status.as_ref())?;
|
ensure_protocol_success("acknowledge alarm", reply.protocol_status.as_ref())?;
|
||||||
Ok(reply)
|
Ok(reply)
|
||||||
@@ -252,6 +262,34 @@ impl GatewayClient {
|
|||||||
Ok(Box::pin(stream))
|
Ok(Box::pin(stream))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Attach to the gateway's central `StreamAlarms` feed.
|
||||||
|
///
|
||||||
|
/// The returned [`AlarmFeedStream`] opens with one [`AlarmFeedMessage`]
|
||||||
|
/// per currently-active alarm (the ConditionRefresh snapshot), then a
|
||||||
|
/// single `snapshot_complete`, then a `transition` for every subsequent
|
||||||
|
/// raise / acknowledge / clear. It is served by the gateway's always-on
|
||||||
|
/// alarm monitor — no worker session is opened — so any number of clients
|
||||||
|
/// may attach. Dropping the stream cancels the gRPC call cooperatively.
|
||||||
|
/// Optional alarm-reference prefix scoping (`request.alarm_filter_prefix`)
|
||||||
|
/// limits the stream to a sub-tree.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns the `tonic::Status` mapped through [`Error::from`] if the
|
||||||
|
/// server rejects the request.
|
||||||
|
pub async fn stream_alarms(
|
||||||
|
&self,
|
||||||
|
request: StreamAlarmsRequest,
|
||||||
|
) -> Result<AlarmFeedStream, Error> {
|
||||||
|
let mut client = self.inner.clone();
|
||||||
|
let response = client.stream_alarms(self.stream_request(request)).await?;
|
||||||
|
let stream = futures_util::StreamExt::map(response.into_inner(), |result| {
|
||||||
|
result.map_err(Error::from)
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(Box::pin(stream))
|
||||||
|
}
|
||||||
|
|
||||||
fn unary_request<T>(&self, message: T) -> Request<T> {
|
fn unary_request<T>(&self, message: T) -> Request<T> {
|
||||||
let mut request = Request::new(message);
|
let mut request = Request::new(message);
|
||||||
request.set_timeout(self.call_timeout);
|
request.set_timeout(self.call_timeout);
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ pub mod version;
|
|||||||
#[doc(inline)]
|
#[doc(inline)]
|
||||||
pub use auth::{ApiKey, AuthInterceptor};
|
pub use auth::{ApiKey, AuthInterceptor};
|
||||||
#[doc(inline)]
|
#[doc(inline)]
|
||||||
pub use client::{EventStream, GatewayClient};
|
pub use client::{AlarmFeedStream, EventStream, GatewayClient};
|
||||||
#[doc(inline)]
|
#[doc(inline)]
|
||||||
pub use error::{CommandError, Error};
|
pub use error::{CommandError, Error};
|
||||||
#[doc(inline)]
|
#[doc(inline)]
|
||||||
|
|||||||
@@ -8,6 +8,12 @@ use std::time::Duration;
|
|||||||
|
|
||||||
use futures_core::Stream;
|
use futures_core::Stream;
|
||||||
use futures_util::StreamExt;
|
use futures_util::StreamExt;
|
||||||
|
use serde_json::Value;
|
||||||
|
use tokio::net::TcpListener;
|
||||||
|
use tokio::sync::{mpsc, Mutex};
|
||||||
|
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
|
||||||
|
use tonic::transport::Server;
|
||||||
|
use tonic::{Request, Response, Status};
|
||||||
use zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::mx_access_gateway_server::{
|
use zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::mx_access_gateway_server::{
|
||||||
MxAccessGateway, MxAccessGatewayServer,
|
MxAccessGateway, MxAccessGatewayServer,
|
||||||
};
|
};
|
||||||
@@ -25,12 +31,6 @@ use zb_mom_ww_mxgateway_client::{
|
|||||||
ApiKey, ClientOptions, CommandError, Error, GatewayClient, MxStatus, MxValue as ClientMxValue,
|
ApiKey, ClientOptions, CommandError, Error, GatewayClient, MxStatus, MxValue as ClientMxValue,
|
||||||
MxValueProjection,
|
MxValueProjection,
|
||||||
};
|
};
|
||||||
use serde_json::Value;
|
|
||||||
use tokio::net::TcpListener;
|
|
||||||
use tokio::sync::{mpsc, Mutex};
|
|
||||||
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
|
|
||||||
use tonic::transport::Server;
|
|
||||||
use tonic::{Request, Response, Status};
|
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn fake_server_receives_bearer_metadata_and_raw_client_is_reachable() {
|
async fn fake_server_receives_bearer_metadata_and_raw_client_is_reachable() {
|
||||||
@@ -320,7 +320,9 @@ impl MxAccessGateway for FakeGateway {
|
|||||||
|
|
||||||
async fn invoke(
|
async fn invoke(
|
||||||
&self,
|
&self,
|
||||||
request: Request<zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::MxCommandRequest>,
|
request: Request<
|
||||||
|
zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::MxCommandRequest,
|
||||||
|
>,
|
||||||
) -> Result<Response<MxCommandReply>, Status> {
|
) -> Result<Response<MxCommandReply>, Status> {
|
||||||
let request = request.into_inner();
|
let request = request.into_inner();
|
||||||
let kind = request
|
let kind = request
|
||||||
|
|||||||
@@ -1,12 +1,12 @@
|
|||||||
use std::fs;
|
use std::fs;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
|
||||||
|
use serde_json::Value;
|
||||||
use zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::{
|
use zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::{
|
||||||
mx_command, mx_value, MxCommand, MxCommandKind, MxCommandRequest, MxDataType, MxEvent,
|
mx_command, mx_value, MxCommand, MxCommandKind, MxCommandRequest, MxDataType, MxEvent,
|
||||||
MxEventFamily, MxValue, OpenSessionReply, ProtocolStatusCode, RegisterCommand,
|
MxEventFamily, MxValue, OpenSessionReply, ProtocolStatusCode, RegisterCommand,
|
||||||
};
|
};
|
||||||
use zb_mom_ww_mxgateway_client::{GATEWAY_PROTOCOL_VERSION, WORKER_PROTOCOL_VERSION};
|
use zb_mom_ww_mxgateway_client::{GATEWAY_PROTOCOL_VERSION, WORKER_PROTOCOL_VERSION};
|
||||||
use serde_json::Value;
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn generated_golden_fixtures_are_available() {
|
fn generated_golden_fixtures_are_available() {
|
||||||
|
|||||||
Reference in New Issue
Block a user