Compare commits

11 Commits

Author SHA1 Message Date
Joseph Doherty dd7ca1634e Mark code-review findings Server-033..037 resolved
Records the resolutions for the five Galaxy snapshot-persistence findings
fixed in bdccdbf, and regenerates the code-reviews index. Server open
findings drop from 7 to 2 (Server-031, Server-032 remain — unrelated
event-channel backpressure findings).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 03:35:56 -04:00
Joseph Doherty bdccdbf6dd Resolve code-review findings Server-033..037
Server-033 (Medium): TryRestoreFromDiskAsync now completes the _firstLoad
gate once the restored snapshot is published, so a browse call racing the
first refresh is served immediately instead of waiting out the 5s bootstrap
budget while an unreachable-database query runs.

Server-034 (Low): GalaxyHierarchySnapshotStore.TryLoadAsync catches
JsonException / IOException / UnauthorizedAccessException and returns null,
honoring the Try contract for a corrupt or unreadable snapshot file.

Server-035 (Low): SaveAsync bounds the write with a linked CancellationToken
(CommandTimeoutSeconds budget) so a stuck disk cannot pin the refresh loop.

Server-036 (Low): PersistSnapshotAsync no longer logs a save cancelled by
gateway shutdown as a persistence failure.

Server-037 (Low): added cache tests for the corrupt-snapshot restore path
and for PersistSnapshot=false, plus a store test for corrupt JSON.

All 100 Galaxy tests pass; gateway builds clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 03:34:35 -04:00
Joseph Doherty fa491c752b Persist the Galaxy browse dataset to disk for offline startup
The gateway can lose connectivity to the Galaxy database, and the
database is often unreachable exactly when the gateway restarts. The
hierarchy cache was purely in-memory, so a cold start with no database
left clients with an Unavailable browse surface until SQL came back.

Add a JSON snapshot store: each successful heavy refresh writes the raw
hierarchy and attribute rowsets to disk atomically (temp file + rename),
and the first refresh after startup restores that snapshot before any
SQL runs. Restored data is served as Stale until a live query confirms
it; a live query that observes the same time_of_last_deploy promotes it
to Healthy with no heavy re-query.

Persistence is on by default (MxGateway:Galaxy:PersistSnapshot) and
writes to C:\ProgramData\MxGateway\galaxy-snapshot.json.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 02:03:00 -04:00
Joseph Doherty aba228f443 Surface built-in primitive attributes in Galaxy browse
AttributesSql enumerated only the dynamic_attribute table (user-configured
attributes), so engine/platform objects came back with zero attributes and
extension sub-attributes (TestAlarm001.Acked, .AckMsg, ...) were missing.
DiscoverHierarchy diverged badly from what System Platform's Object Viewer
shows.

AttributesSql now UNIONs dynamic_attribute with the built-in attributes
every object inherits from its primitives (attribute_definition joined via
primitive_instance). Built-in rows carry no category filter (the
attribute_definition category numbering differs from dynamic_attribute's)
and are never flagged is_historized/is_alarm, since those flags identify a
configured attribute that anchors an extension, not the extension's leaves.
dynamic_attribute wins on a reference collision.

This raises the attribute surface ~7x (verified 2,026 -> 14,334 against the
ZB database). AttributesSql no longer matches the OtOpcUa original;
HierarchySql still does. Column shape, ordinals, proto, and generated code
are unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 01:42:18 -04:00
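
The merge rules described in aba228f443 can be mirrored in memory with a short Go sketch. The real change is a SQL UNION that is not shown in this comparison, so everything here is an assumption made for illustration: the attribute type, its fields, and mergeAttributes are hypothetical. It models only the two rules the message states: built-in rows never carry the is_historized/is_alarm flags, and dynamic_attribute wins when both sources produce the same reference.

```go
package main

import (
	"fmt"
	"sort"
)

// attribute is a hypothetical, simplified row shape for illustration.
type attribute struct {
	Reference    string
	Source       string // "dynamic" or "builtin"
	IsHistorized bool
	IsAlarm      bool
}

// mergeAttributes combines both sources, keyed by reference. Built-in rows
// are inserted first with their flags cleared; dynamic rows are inserted
// second so they overwrite any built-in row with the same reference.
func mergeAttributes(dynamic, builtin []attribute) []attribute {
	byRef := make(map[string]attribute, len(dynamic)+len(builtin))
	for _, a := range builtin {
		a.IsHistorized = false
		a.IsAlarm = false
		byRef[a.Reference] = a
	}
	for _, a := range dynamic {
		byRef[a.Reference] = a
	}
	merged := make([]attribute, 0, len(byRef))
	for _, a := range byRef {
		merged = append(merged, a)
	}
	// Sort for a stable, readable result; map iteration order is random.
	sort.Slice(merged, func(i, j int) bool {
		return merged[i].Reference < merged[j].Reference
	})
	return merged
}

func main() {
	dynamic := []attribute{
		{Reference: "Tank01.Level", Source: "dynamic", IsHistorized: true},
	}
	builtin := []attribute{
		{Reference: "Tank01.Level", Source: "builtin"},
		{Reference: "Tank01.ScanState", Source: "builtin"},
	}
	for _, a := range mergeAttributes(dynamic, builtin) {
		fmt.Printf("%s source=%s historized=%t alarm=%t\n",
			a.Reference, a.Source, a.IsHistorized, a.IsAlarm)
	}
}
```

Here Tank01.Level keeps its dynamic row and its historized flag, while Tank01.ScanState appears only because the built-in source contributes it.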
Joseph Doherty 5e493484f1 Run the Rust CLI on a large-stack worker thread
The clap derive-generated argument parser is deeply recursive; in debug
builds (no inlining) parsing the Command enum exhausted the default
8 MiB main-thread stack once the alarm subcommands grew it, crashing
mxgw.exe with STATUS_STACK_OVERFLOW at startup — which failed the Rust
leg of the client e2e matrix. Move parse + dispatch onto a dedicated
32 MiB worker thread so the CLI is robust regardless of build profile.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 20:12:09 -04:00
Joseph Doherty 3e22285f09 Exercise the alarm subcommands in the client e2e matrix
Add an opt-in alarm phase (-VerifyAlarms) to run-client-e2e-tests.ps1:
each of the five client CLIs runs stream-alarms (asserting at least one
AlarmFeedMessage) and acknowledge-alarm against the gateway's central
alarm monitor. Both RPCs are session-less. -AlarmReference and
-AlarmStreamMax tune the phase; GatewayTesting.md documents it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 19:47:20 -04:00
Joseph Doherty 120cd0b1b6 Add stream-alarms and acknowledge-alarm to the Python CLI
Brings the Python mxgateway_cli in line with the other four client
CLIs: stream-alarms reads a bounded slice of the gateway's central
alarm feed (--filter-prefix, --max-messages, --timeout);
acknowledge-alarm is a unary session-less ack (--reference required,
--comment, --operator).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 19:47:19 -04:00
Joseph Doherty 56949c967b Add stream-alarms and acknowledge-alarm to the .NET CLI
stream-alarms attaches to the gateway's central alarm feed (mirrors
stream-events: --max-events cap, --json/--jsonl, --filter-prefix);
acknowledge-alarm is a unary session-less ack (--reference required,
--comment, --operator). Both wired through IMxGatewayCliClient and the
adapter.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 19:01:58 -04:00
Joseph Doherty 7dec9b30f5 Add stream-alarms and acknowledge-alarm to the Java CLI
stream-alarms attaches to the gateway's central alarm feed (mirrors
stream-events: --limit cap, --json, --filter-prefix); acknowledge-alarm
is a unary session-less ack (--reference required, --comment,
--operator). Both route through new session-less methods on the CLI
client abstraction.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 19:01:57 -04:00
Joseph Doherty 1d3c8edb44 Add stream-alarms and acknowledge-alarm to the Rust CLI
stream-alarms attaches to the gateway's central alarm feed (mirrors
stream-events: --max-events cap, --json/--jsonl, --filter-prefix);
acknowledge-alarm is a unary session-less ack (--reference required,
--comment, --operator).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 19:01:49 -04:00
Joseph Doherty 58259016b0 Add stream-alarms and acknowledge-alarm to the Go CLI
stream-alarms attaches to the gateway's central alarm feed (mirrors
stream-events: --limit cap, --json); acknowledge-alarm is a unary
session-less ack (--reference required, --comment, --operator).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 19:01:48 -04:00
26 changed files with 2399 additions and 104 deletions
@@ -51,6 +51,27 @@ public interface IMxGatewayCliClient : IAsyncDisposable
StreamEventsRequest request,
CancellationToken cancellationToken);
/// <summary>
/// Acknowledges an active MXAccess alarm condition through the gateway.
/// </summary>
/// <param name="request">The acknowledge request — alarm reference, comment, operator user.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
/// <returns>The acknowledge reply with protocol + native MxStatus.</returns>
Task<AcknowledgeAlarmReply> AcknowledgeAlarmAsync(
AcknowledgeAlarmRequest request,
CancellationToken cancellationToken);
/// <summary>
/// Attaches to the gateway's central alarm feed — the current active-alarm
/// snapshot followed by live transitions.
/// </summary>
/// <param name="request">The stream request, optionally scoped by alarm-reference prefix.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
/// <returns>An async enumerable of alarm feed messages.</returns>
IAsyncEnumerable<AlarmFeedMessage> StreamAlarmsAsync(
StreamAlarmsRequest request,
CancellationToken cancellationToken);
/// <summary>
/// Tests connection to the Galaxy Repository.
/// </summary>
@@ -52,6 +52,22 @@ internal sealed class MxGatewayCliClientAdapter : IMxGatewayCliClient
return _client.StreamEventsAsync(request, cancellationToken);
}
/// <inheritdoc />
public Task<AcknowledgeAlarmReply> AcknowledgeAlarmAsync(
AcknowledgeAlarmRequest request,
CancellationToken cancellationToken)
{
return _client.AcknowledgeAlarmAsync(request, cancellationToken);
}
/// <inheritdoc />
public IAsyncEnumerable<AlarmFeedMessage> StreamAlarmsAsync(
StreamAlarmsRequest request,
CancellationToken cancellationToken)
{
return _client.StreamAlarmsAsync(request, cancellationToken);
}
/// <inheritdoc />
public Task<TestConnectionReply> GalaxyTestConnectionAsync(
TestConnectionRequest request,
@@ -130,6 +130,10 @@ public static class MxGatewayClientCli
.ConfigureAwait(false),
"stream-events" => await StreamEventsAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"stream-alarms" => await StreamAlarmsAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"acknowledge-alarm" => await AcknowledgeAlarmAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"write" => await WriteAsync(arguments, client, standardOutput, cancellation.Token)
.ConfigureAwait(false),
"write2" => await Write2Async(arguments, client, standardOutput, cancellation.Token)
@@ -1353,6 +1357,124 @@ public static class MxGatewayClientCli
return 0;
}
private static async Task<int> StreamAlarmsAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
uint maxEvents = arguments.GetUInt32("max-events", 0);
bool json = arguments.HasFlag("json");
bool jsonLines = arguments.HasFlag("jsonl");
if (json && !jsonLines && maxEvents is 0)
{
throw new ArgumentException("--json stream-alarms requires --max-events to bound aggregate output.");
}
if (maxEvents > MaxAggregateEvents)
{
throw new ArgumentException($"--max-events cannot exceed {MaxAggregateEvents}.");
}
var messages = json && !jsonLines
? new List<AlarmFeedMessage>(checked((int)maxEvents))
: [];
uint messageCount = 0;
var request = new StreamAlarmsRequest
{
ClientCorrelationId = CreateCorrelationId(),
AlarmFilterPrefix = arguments.GetOptional("filter-prefix") ?? string.Empty,
};
try
{
await foreach (AlarmFeedMessage feedMessage in client.StreamAlarmsAsync(request, cancellationToken)
.WithCancellation(cancellationToken)
.ConfigureAwait(false))
{
if (jsonLines)
{
output.WriteLine(ProtobufJsonFormatter.Format(feedMessage));
}
else if (json)
{
messages.Add(feedMessage);
}
else
{
output.WriteLine(FormatAlarmFeedMessage(feedMessage));
}
messageCount++;
if (maxEvents > 0 && messageCount >= maxEvents)
{
break;
}
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// Mirrors stream-events (Client.Dotnet-017): the supplied token covers
// the user's --timeout wall-clock budget and external Ctrl+C / parent
// CTS cancellation. All are graceful completion modes for a
// finite-window alarm-feed collector: emit what arrived and exit 0.
}
if (json && !jsonLines)
{
output.WriteLine(JsonSerializer.Serialize(
new { alarms = messages.Select(AlarmFeedMessageToJsonElement).ToArray() },
JsonOptions));
}
return 0;
}
private static Task<int> AcknowledgeAlarmAsync(
CliArguments arguments,
IMxGatewayCliClient client,
TextWriter output,
CancellationToken cancellationToken)
{
var request = new AcknowledgeAlarmRequest
{
ClientCorrelationId = CreateCorrelationId(),
AlarmFullReference = arguments.GetRequired("reference"),
Comment = arguments.GetOptional("comment") ?? string.Empty,
OperatorUser = arguments.GetOptional("operator") ?? string.Empty,
};
return WriteReplyAsync(
client.AcknowledgeAlarmAsync(request, cancellationToken),
arguments,
output);
}
/// <summary>
/// Renders one <see cref="AlarmFeedMessage"/> for the human-readable
/// (non-JSON) stream-alarms output, distinguishing the <c>payload</c> oneof
/// arms: a snapshot active alarm, the snapshot-complete sentinel, or a live
/// transition.
/// </summary>
private static string FormatAlarmFeedMessage(AlarmFeedMessage feedMessage)
{
return feedMessage.PayloadCase switch
{
AlarmFeedMessage.PayloadOneofCase.ActiveAlarm =>
$"active-alarm {ProtobufJsonFormatter.Format(feedMessage.ActiveAlarm)}",
AlarmFeedMessage.PayloadOneofCase.SnapshotComplete =>
$"snapshot-complete {feedMessage.SnapshotComplete}",
AlarmFeedMessage.PayloadOneofCase.Transition =>
$"transition {ProtobufJsonFormatter.Format(feedMessage.Transition)}",
_ => $"unknown-payload {feedMessage.PayloadCase}",
};
}
private static JsonElement AlarmFeedMessageToJsonElement(AlarmFeedMessage feedMessage)
{
return JsonDocument.Parse(ProtobufJsonFormatter.Format(feedMessage)).RootElement.Clone();
}
private static async Task<int> SmokeAsync(
CliArguments arguments,
IMxGatewayCliClient client,
@@ -1908,6 +2030,8 @@ public static class MxGatewayClientCli
or "bench-read-bulk"
or "bench-stream-events"
or "stream-events"
or "stream-alarms"
or "acknowledge-alarm"
or "write"
or "write2"
or "smoke"
@@ -1966,6 +2090,8 @@ public static class MxGatewayClientCli
writer.WriteLine("mxgw-dotnet subscribe-bulk --session-id <id> --server-handle <n> --items <ref,ref> [--json]");
writer.WriteLine("mxgw-dotnet unsubscribe-bulk --session-id <id> --server-handle <n> --item-handles <n,n> [--json]");
writer.WriteLine("mxgw-dotnet stream-events --session-id <id> [--max-events <n>] [--json]");
writer.WriteLine("mxgw-dotnet stream-alarms [--filter-prefix <ref>] [--max-events <n>] [--json] [--jsonl]");
writer.WriteLine("mxgw-dotnet acknowledge-alarm --reference <ref> [--comment <text>] [--operator <user>] [--json]");
writer.WriteLine("mxgw-dotnet write --session-id <id> --server-handle <n> --item-handle <n> --type <type> --value <value> [--json]");
writer.WriteLine("mxgw-dotnet write2 --session-id <id> --server-handle <n> --item-handle <n> --type <type> --value <value> [--timestamp <iso>] [--json]");
writer.WriteLine("mxgw-dotnet smoke --item <ref> [--value <value> --type <type>] [--json]");
@@ -248,6 +248,87 @@ public sealed class MxGatewayClientCliTests
}
/// <summary>Verifies that stream-alarms with --max-events stops output and distinguishes payload cases.</summary>
[Fact]
public async Task RunAsync_StreamAlarms_WithMaxEventsStopsAndDistinguishesPayloadCases()
{
using var output = new StringWriter();
using var error = new StringWriter();
FakeCliClient fakeClient = new();
fakeClient.AlarmFeedMessages.Add(new AlarmFeedMessage
{
ActiveAlarm = new ActiveAlarmSnapshot { AlarmFullReference = "Tank01.Level.HiHi" },
});
fakeClient.AlarmFeedMessages.Add(new AlarmFeedMessage { SnapshotComplete = true });
int exitCode = await MxGatewayClientCli.RunAsync(
[
"stream-alarms",
"--endpoint",
"http://localhost:5000",
"--api-key",
"test-api-key",
"--filter-prefix",
"Tank01",
"--max-events",
"1",
],
output,
error,
_ => fakeClient);
Assert.Equal(0, exitCode);
StreamAlarmsRequest request = Assert.Single(fakeClient.StreamAlarmsRequests);
Assert.Equal("Tank01", request.AlarmFilterPrefix);
string text = output.ToString();
Assert.Contains("active-alarm", text);
Assert.Contains("Tank01.Level.HiHi", text);
Assert.DoesNotContain("snapshot-complete", text);
Assert.Equal(string.Empty, error.ToString());
}
/// <summary>Verifies that acknowledge-alarm builds a request and prints the JSON reply.</summary>
[Fact]
public async Task RunAsync_AcknowledgeAlarm_BuildsRequestAndPrintsJsonReply()
{
using var output = new StringWriter();
using var error = new StringWriter();
FakeCliClient fakeClient = new();
fakeClient.AcknowledgeAlarmReplies.Enqueue(new AcknowledgeAlarmReply
{
CorrelationId = "ack-fixture",
ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.Ok },
Hresult = 0,
});
int exitCode = await MxGatewayClientCli.RunAsync(
[
"acknowledge-alarm",
"--endpoint",
"http://localhost:5000",
"--api-key",
"test-api-key",
"--reference",
"Tank01.Level.HiHi",
"--comment",
"ack from cli",
"--operator",
"operator1",
"--json",
],
output,
error,
_ => fakeClient);
Assert.Equal(0, exitCode);
AcknowledgeAlarmRequest request = Assert.Single(fakeClient.AcknowledgeAlarmRequests);
Assert.Equal("Tank01.Level.HiHi", request.AlarmFullReference);
Assert.Equal("ack from cli", request.Comment);
Assert.Equal("operator1", request.OperatorUser);
Assert.Contains("ack-fixture", output.ToString());
Assert.Equal(string.Empty, error.ToString());
}
/// <summary>Verifies that smoke command closes opened session when a command fails.</summary>
[Fact]
public async Task RunAsync_Smoke_WhenCommandFails_ClosesOpenedSession()
@@ -695,6 +776,41 @@ public sealed class MxGatewayClientCliTests
}
}
/// <summary>Queue of acknowledge-alarm replies to return.</summary>
public Queue<AcknowledgeAlarmReply> AcknowledgeAlarmReplies { get; } = new();
/// <summary>List of received acknowledge-alarm requests.</summary>
public List<AcknowledgeAlarmRequest> AcknowledgeAlarmRequests { get; } = [];
/// <summary>List of received stream-alarms requests.</summary>
public List<StreamAlarmsRequest> StreamAlarmsRequests { get; } = [];
/// <summary>List of alarm feed messages to yield when streaming alarms.</summary>
public List<AlarmFeedMessage> AlarmFeedMessages { get; } = [];
/// <inheritdoc />
public Task<AcknowledgeAlarmReply> AcknowledgeAlarmAsync(
AcknowledgeAlarmRequest request,
CancellationToken cancellationToken)
{
AcknowledgeAlarmRequests.Add(request);
return Task.FromResult(AcknowledgeAlarmReplies.Dequeue());
}
/// <inheritdoc />
public async IAsyncEnumerable<AlarmFeedMessage> StreamAlarmsAsync(
StreamAlarmsRequest request,
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
StreamAlarmsRequests.Add(request);
foreach (AlarmFeedMessage feedMessage in AlarmFeedMessages)
{
cancellationToken.ThrowIfCancellationRequested();
await Task.Yield();
yield return feedMessage;
}
}
/// <summary>Galaxy test connection reply to return.</summary>
public TestConnectionReply GalaxyTestConnectionReply { get; set; } = new() { Ok = true };
+118 -1
@@ -107,6 +107,10 @@ func runWithIO(ctx context.Context, args []string, stdout, stderr io.Writer) err
return runWrite(ctx, args[1:], stdout, stderr)
case "stream-events":
return runStreamEvents(ctx, args[1:], stdout, stderr)
case "stream-alarms":
return runStreamAlarms(ctx, args[1:], stdout, stderr)
case "acknowledge-alarm":
return runAcknowledgeAlarm(ctx, args[1:], stdout, stderr)
case "smoke":
return runSmoke(ctx, args[1:], stdout, stderr)
case "galaxy-test-connection":
@@ -816,6 +820,119 @@ func runStreamEvents(ctx context.Context, args []string, stdout, stderr io.Write
return nil
}
func runStreamAlarms(ctx context.Context, args []string, stdout, stderr io.Writer) error {
flags := flag.NewFlagSet("stream-alarms", flag.ContinueOnError)
flags.SetOutput(stderr)
common := bindCommonFlags(flags)
jsonOutput := flags.Bool("json", false, "write JSON output")
filterPrefix := flags.String("filter-prefix", "", "alarm-reference prefix scoping the feed; empty means unscoped")
limit := flags.Int("limit", 0, "maximum feed messages to read; 0 means unbounded")
if err := flags.Parse(args); err != nil {
return err
}
client, _, err := dialForCommand(ctx, common)
if err != nil {
return err
}
defer client.Close()
// Mirror runStreamEvents so Ctrl+C on a long-running stream-alarms command
// cancels the gRPC stream cleanly (the gateway sees codes.Canceled rather
// than a torn TCP connection) and the deferred client.Close() actually runs.
signalCtx, stopSignals := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
defer stopSignals()
streamCtx, cancelStream := context.WithCancel(signalCtx)
defer cancelStream()
stream, err := client.StreamAlarms(streamCtx, &mxgateway.StreamAlarmsRequest{AlarmFilterPrefix: *filterPrefix})
if err != nil {
return err
}
count := 0
for {
message, err := stream.Recv()
if errors.Is(err, io.EOF) {
return nil
}
if err != nil {
return err
}
if *jsonOutput {
fmt.Fprintln(stdout, string(mustMarshalProto(message)))
} else {
fmt.Fprintln(stdout, formatAlarmFeedMessage(message))
}
count++
if *limit > 0 && count >= *limit {
cancelStream()
return nil
}
}
}
// formatAlarmFeedMessage renders one AlarmFeedMessage in the CLI's plain-text
// output style, distinguishing the active-alarm snapshot, snapshot-complete
// sentinel, and transition cases of the message's payload oneof.
func formatAlarmFeedMessage(message *mxgateway.AlarmFeedMessage) string {
switch {
case message.GetActiveAlarm() != nil:
alarm := message.GetActiveAlarm()
return fmt.Sprintf("active-alarm %s state=%s severity=%d", alarm.GetAlarmFullReference(), alarm.GetCurrentState(), alarm.GetSeverity())
case message.GetSnapshotComplete():
return "snapshot-complete"
case message.GetTransition() != nil:
transition := message.GetTransition()
return fmt.Sprintf("transition %s kind=%s severity=%d", transition.GetAlarmFullReference(), transition.GetTransitionKind(), transition.GetSeverity())
default:
return "unknown"
}
}
func runAcknowledgeAlarm(ctx context.Context, args []string, stdout, stderr io.Writer) error {
flags := flag.NewFlagSet("acknowledge-alarm", flag.ContinueOnError)
flags.SetOutput(stderr)
common := bindCommonFlags(flags)
jsonOutput := flags.Bool("json", false, "write JSON output")
reference := flags.String("reference", "", "full alarm reference to acknowledge")
comment := flags.String("comment", "", "operator acknowledge comment")
operator := flags.String("operator", "", "operator user performing the acknowledge")
if err := flags.Parse(args); err != nil {
return err
}
if *reference == "" {
return errors.New("reference is required")
}
client, options, err := dialForCommand(ctx, common)
if err != nil {
return err
}
defer client.Close()
reply, err := client.AcknowledgeAlarm(ctx, &mxgateway.AcknowledgeAlarmRequest{
AlarmFullReference: *reference,
Comment: *comment,
OperatorUser: *operator,
})
if err != nil {
return err
}
if *jsonOutput {
return writeJSON(stdout, commandReplyOutput{
Command: "acknowledge-alarm",
Options: options,
Reply: mustMarshalProto(reply),
})
}
fmt.Fprintln(stdout, reply.GetHresult())
return nil
}
func runSmoke(ctx context.Context, args []string, stdout, stderr io.Writer) error {
flags := flag.NewFlagSet("smoke", flag.ContinueOnError)
flags.SetOutput(stderr)
@@ -1120,7 +1237,7 @@ func runBatch(ctx context.Context, in io.Reader, stdout, stderr io.Writer) error
}
func writeUsage(writer io.Writer) {
-fmt.Fprintln(writer, "usage: mxgw-go <version|open-session|close-session|register|add-item|advise|subscribe-bulk|unsubscribe-bulk|read-bulk|write-bulk|write2-bulk|write-secured-bulk|write-secured2-bulk|bench-read-bulk|write|stream-events|smoke|galaxy-test-connection|galaxy-last-deploy|galaxy-discover|galaxy-watch|batch>")
+fmt.Fprintln(writer, "usage: mxgw-go <version|open-session|close-session|register|add-item|advise|subscribe-bulk|unsubscribe-bulk|read-bulk|write-bulk|write2-bulk|write-secured-bulk|write-secured2-bulk|bench-read-bulk|write|stream-events|stream-alarms|acknowledge-alarm|smoke|galaxy-test-connection|galaxy-last-deploy|galaxy-discover|galaxy-watch|batch>")
}
func dialGalaxyForCommand(ctx context.Context, common *commonOptions) (*mxgateway.GalaxyClient, commonOptions, error) {
@@ -3,6 +3,7 @@ package com.dohertylan.mxgateway.cli;
import com.dohertylan.mxgateway.client.DeployEventStream;
import com.dohertylan.mxgateway.client.GalaxyRepositoryClient;
import com.dohertylan.mxgateway.client.MxEventStream;
import com.dohertylan.mxgateway.client.MxGatewayAlarmFeedSubscription;
import com.dohertylan.mxgateway.client.MxGatewayClient;
import com.dohertylan.mxgateway.client.MxGatewayClientOptions;
import com.dohertylan.mxgateway.client.MxGatewayClientVersion;
@@ -28,14 +29,23 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import io.grpc.stub.StreamObserver;
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmReply;
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmRequest;
import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot;
import mxaccess_gateway.v1.MxaccessGateway.AlarmFeedMessage;
import mxaccess_gateway.v1.MxaccessGateway.BulkReadResult;
import mxaccess_gateway.v1.MxaccessGateway.BulkWriteResult;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
import mxaccess_gateway.v1.MxaccessGateway.MxValue;
import mxaccess_gateway.v1.MxaccessGateway.OnAlarmTransitionEvent;
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest;
import mxaccess_gateway.v1.MxaccessGateway.SubscribeResult;
import mxaccess_gateway.v1.MxaccessGateway.Write2BulkEntry;
import mxaccess_gateway.v1.MxaccessGateway.WriteBulkEntry;
@@ -127,6 +137,8 @@ public final class MxGatewayCli implements Callable<Integer> {
commandLine.addSubcommand("bench-read-bulk", new BenchReadBulkCommand(clientFactory));
commandLine.addSubcommand("write", new WriteCommand(clientFactory));
commandLine.addSubcommand("stream-events", new StreamEventsCommand(clientFactory));
commandLine.addSubcommand("stream-alarms", new StreamAlarmsCommand(clientFactory));
commandLine.addSubcommand("acknowledge-alarm", new AcknowledgeAlarmCommand(clientFactory));
commandLine.addSubcommand("smoke", new SmokeCommand(clientFactory));
commandLine.addSubcommand("galaxy-test", new GalaxyTestConnectionCommand());
commandLine.addSubcommand("galaxy-deploy-time", new GalaxyDeployTimeCommand());
@@ -139,6 +151,9 @@ public final class MxGatewayCli implements Callable<Integer> {
/** Sentinel written to stdout after every command result in batch mode. */
static final String BATCH_EOR = "__MXGW_BATCH_EOR__";
/** Sentinel queued by {@code stream-alarms} to mark a clean end of the alarm feed. */
private static final Object ALARM_FEED_END = new Object();
/**
* Reads one CLI invocation per stdin line, executes each via a fresh
* {@link CommandLine}, and writes {@value #BATCH_EOR} to stdout after
@@ -1155,6 +1170,115 @@ public final class MxGatewayCli implements Callable<Integer> {
}
}
@Command(name = "stream-alarms", description = "Streams the gateway central alarm feed.")
static final class StreamAlarmsCommand extends GatewayCommand {
@Option(names = "--filter-prefix", description = "Alarm-reference prefix scoping the feed; empty means unscoped.")
String filterPrefix = "";
@Option(names = "--limit", defaultValue = "0", description = "Maximum feed messages to print.")
int limit;
StreamAlarmsCommand(MxGatewayCliClientFactory clientFactory) {
super(clientFactory);
}
@Override
public Integer call() {
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
// The async alarm feed delivers on a background gRPC thread; buffer
// messages in a bounded queue and drain them on this thread so the
// --limit termination mirrors stream-events. 1024 absorbs the
// gateway's initial active-alarm snapshot burst.
BlockingQueue<Object> queue = new ArrayBlockingQueue<>(1024);
StreamAlarmsRequest request = StreamAlarmsRequest.newBuilder()
.setAlarmFilterPrefix(filterPrefix)
.build();
MxGatewayAlarmFeedSubscription subscription =
client.streamAlarms(request, new StreamObserver<>() {
@Override
public void onNext(AlarmFeedMessage value) {
queue.offer(value);
}
@Override
public void onError(Throwable error) {
queue.offer(error);
}
@Override
public void onCompleted() {
queue.offer(ALARM_FEED_END);
}
});
try {
int count = 0;
while (true) {
Object item = queue.take();
if (item == ALARM_FEED_END) {
break;
}
if (item instanceof Throwable error) {
throw new IllegalStateException(
"gateway stream alarms failed: " + error.getMessage(), error);
}
AlarmFeedMessage message = (AlarmFeedMessage) item;
if (json) {
client.out().println(protoJson(message));
} else {
client.out().println(formatAlarmFeedMessage(message));
}
client.out().flush();
count++;
if (limit > 0 && count >= limit) {
subscription.cancel();
break;
}
}
} catch (InterruptedException error) {
Thread.currentThread().interrupt();
subscription.cancel();
} finally {
subscription.cancel();
}
}
return 0;
}
}
@Command(name = "acknowledge-alarm", description = "Acknowledges an active MXAccess alarm.")
static final class AcknowledgeAlarmCommand extends GatewayCommand {
@Option(names = "--reference", required = true, description = "Full alarm reference to acknowledge.")
String reference;
@Option(names = "--comment", description = "Operator acknowledge comment.")
String comment = "";
@Option(names = "--operator", description = "Operator user performing the acknowledge.")
String operator = "";
AcknowledgeAlarmCommand(MxGatewayCliClientFactory clientFactory) {
super(clientFactory);
}
@Override
public Integer call() {
try (MxGatewayCliClient client = clientFactory.connect(common.resolved())) {
AcknowledgeAlarmReply reply = client.acknowledgeAlarm(AcknowledgeAlarmRequest.newBuilder()
.setAlarmFullReference(reference)
.setComment(comment)
.setOperatorUser(operator)
.build());
writeOutput(
"acknowledge-alarm",
common,
json,
reply,
() -> Integer.toString(reply.getHresult()));
}
return 0;
}
}
@Command(name = "smoke", description = "Runs a bounded open/register/add/advise flow.")
static final class SmokeCommand extends GatewayCommand {
@Option(names = "--client-name", defaultValue = "mxgw-java-smoke", description = "MXAccess client name.")
@@ -1329,6 +1453,11 @@ public final class MxGatewayCli implements Callable<Integer> {
MxGatewayCliSession session(String sessionId);
AcknowledgeAlarmReply acknowledgeAlarm(AcknowledgeAlarmRequest request);
MxGatewayAlarmFeedSubscription streamAlarms(
StreamAlarmsRequest request, StreamObserver<AlarmFeedMessage> observer);
@Override
void close();
}
@@ -1401,6 +1530,17 @@ public final class MxGatewayCli implements Callable<Integer> {
return new GrpcMxGatewayCliSession(MxGatewaySession.forSessionId(client, sessionId));
}
@Override
public AcknowledgeAlarmReply acknowledgeAlarm(AcknowledgeAlarmRequest request) {
return client.acknowledgeAlarm(request);
}
@Override
public MxGatewayAlarmFeedSubscription streamAlarms(
StreamAlarmsRequest request, StreamObserver<AlarmFeedMessage> observer) {
return client.streamAlarms(request, observer);
}
@Override
public void close() {
client.close();
@@ -1576,6 +1716,32 @@ public final class MxGatewayCli implements Callable<Integer> {
return values;
}
/**
* Renders one {@link AlarmFeedMessage} in the CLI's plain-text output
* style, distinguishing the active-alarm snapshot, snapshot-complete
* sentinel, and transition cases of the message's {@code payload} oneof.
*/
private static String formatAlarmFeedMessage(AlarmFeedMessage message) {
return switch (message.getPayloadCase()) {
case ACTIVE_ALARM -> {
ActiveAlarmSnapshot alarm = message.getActiveAlarm();
yield String.format(
"active-alarm %s state=%s severity=%d",
alarm.getAlarmFullReference(), alarm.getCurrentState().name(), alarm.getSeverity());
}
case SNAPSHOT_COMPLETE -> "snapshot-complete";
case TRANSITION -> {
OnAlarmTransitionEvent transition = message.getTransition();
yield String.format(
"transition %s kind=%s severity=%d",
transition.getAlarmFullReference(),
transition.getTransitionKind().name(),
transition.getSeverity());
}
case PAYLOAD_NOT_SET -> "unknown";
};
}
private static MxValue parseValue(String type, String text) {
return switch (type) {
case "bool" -> MxValues.boolValue(Boolean.parseBoolean(text));
@@ -8,10 +8,18 @@ import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import com.dohertylan.mxgateway.client.MxGatewayAlarmFeedSubscription;
import io.grpc.stub.StreamObserver;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmReply;
import mxaccess_gateway.v1.MxaccessGateway.AcknowledgeAlarmRequest;
import mxaccess_gateway.v1.MxaccessGateway.ActiveAlarmSnapshot;
import mxaccess_gateway.v1.MxaccessGateway.AddItemReply;
import mxaccess_gateway.v1.MxaccessGateway.AlarmConditionState;
import mxaccess_gateway.v1.MxaccessGateway.AlarmFeedMessage;
import mxaccess_gateway.v1.MxaccessGateway.AlarmTransitionKind;
import mxaccess_gateway.v1.MxaccessGateway.BulkReadResult;
import mxaccess_gateway.v1.MxaccessGateway.BulkWriteResult;
import mxaccess_gateway.v1.MxaccessGateway.CloseSessionReply;
@@ -20,9 +28,11 @@ import mxaccess_gateway.v1.MxaccessGateway.MxCommandKind;
import mxaccess_gateway.v1.MxaccessGateway.MxCommandReply;
import mxaccess_gateway.v1.MxaccessGateway.MxEvent;
import mxaccess_gateway.v1.MxaccessGateway.MxValue;
import mxaccess_gateway.v1.MxaccessGateway.OnAlarmTransitionEvent;
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionReply;
import mxaccess_gateway.v1.MxaccessGateway.OpenSessionRequest;
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatus;
import mxaccess_gateway.v1.MxaccessGateway.StreamAlarmsRequest;
import mxaccess_gateway.v1.MxaccessGateway.ProtocolStatusCode;
import mxaccess_gateway.v1.MxaccessGateway.RegisterReply;
import mxaccess_gateway.v1.MxaccessGateway.SessionState;
@@ -389,6 +399,70 @@ final class MxGatewayCliTests {
assertTrue(output.contains("TestMachine_002.TestChangingInt"), output);
}
// ---- stream-alarms / acknowledge-alarm subcommands ----
@Test
void streamAlarmsCommandForwardsFilterPrefixAndPrintsFeedMessages() {
FakeClientFactory factory = new FakeClientFactory();
CliRun run = execute(factory, "stream-alarms", "--filter-prefix", "Tank01");
assertEquals(0, run.exitCode());
assertEquals("Tank01", factory.client.lastStreamAlarmsRequest.getAlarmFilterPrefix());
String out = run.output();
assertTrue(out.contains("active-alarm Tank01.Level.HiHi"), out);
assertTrue(out.contains("snapshot-complete"), out);
assertTrue(out.contains("transition Tank01.Level.HiHi"), out);
}
@Test
void streamAlarmsCommandHonoursLimit() {
FakeClientFactory factory = new FakeClientFactory();
CliRun run = execute(factory, "stream-alarms", "--limit", "1");
assertEquals(0, run.exitCode());
long lines = run.output().lines().filter(line -> !line.isBlank()).count();
assertEquals(1, lines, "expected exactly one feed message with --limit 1, got: " + run.output());
}
@Test
void streamAlarmsCommandPrintsJson() {
FakeClientFactory factory = new FakeClientFactory();
CliRun run = execute(factory, "stream-alarms", "--json");
assertEquals(0, run.exitCode());
assertTrue(run.output().contains("\"activeAlarm\""), run.output());
assertTrue(run.output().contains("\"snapshotComplete\""), run.output());
}
@Test
void acknowledgeAlarmCommandForwardsOptionsAndPrintsReply() {
FakeClientFactory factory = new FakeClientFactory();
CliRun run = execute(
factory,
"acknowledge-alarm",
"--reference",
"Tank01.Level.HiHi",
"--comment",
"checked",
"--operator",
"operator1",
"--json");
assertEquals(0, run.exitCode());
assertEquals("Tank01.Level.HiHi", factory.client.lastAcknowledgeAlarmRequest.getAlarmFullReference());
assertEquals("checked", factory.client.lastAcknowledgeAlarmRequest.getComment());
assertEquals("operator1", factory.client.lastAcknowledgeAlarmRequest.getOperatorUser());
assertTrue(run.output().contains("\"command\":\"acknowledge-alarm\""), run.output());
}
@Test
void acknowledgeAlarmCommandRequiresReference() {
CliRun run = execute(new FakeClientFactory(), "acknowledge-alarm", "--comment", "checked");
assertFalse(run.exitCode() == 0, "expected non-zero exit without --reference");
assertTrue(run.errors().contains("--reference"), run.errors());
}
// ---- Client.Java-027: batch subcommand ----
@Test
@@ -501,6 +575,8 @@ final class MxGatewayCliTests {
private final PrintWriter out;
private final FakeSession session = new FakeSession();
private boolean closeCalled;
private AcknowledgeAlarmRequest lastAcknowledgeAlarmRequest;
private StreamAlarmsRequest lastStreamAlarmsRequest;
private FakeClient(PrintWriter out) {
this.out = out;
@@ -534,6 +610,40 @@ final class MxGatewayCliTests {
return session;
}
@Override
public AcknowledgeAlarmReply acknowledgeAlarm(AcknowledgeAlarmRequest request) {
lastAcknowledgeAlarmRequest = request;
return AcknowledgeAlarmReply.newBuilder()
.setCorrelationId(request.getClientCorrelationId())
.setProtocolStatus(ok())
.setHresult(0)
.build();
}
@Override
public MxGatewayAlarmFeedSubscription streamAlarms(
StreamAlarmsRequest request, StreamObserver<AlarmFeedMessage> observer) {
lastStreamAlarmsRequest = request;
// Replay a deterministic active-alarm snapshot, snapshot-complete
// sentinel, transition, then complete the feed so the CLI command
// drains a bounded stream without contacting a live gateway.
observer.onNext(AlarmFeedMessage.newBuilder()
.setActiveAlarm(ActiveAlarmSnapshot.newBuilder()
.setAlarmFullReference("Tank01.Level.HiHi")
.setCurrentState(AlarmConditionState.ALARM_CONDITION_STATE_ACTIVE)
.setSeverity(700))
.build());
observer.onNext(AlarmFeedMessage.newBuilder().setSnapshotComplete(true).build());
observer.onNext(AlarmFeedMessage.newBuilder()
.setTransition(OnAlarmTransitionEvent.newBuilder()
.setAlarmFullReference("Tank01.Level.HiHi")
.setTransitionKind(AlarmTransitionKind.ALARM_TRANSITION_KIND_ACKNOWLEDGE)
.setSeverity(700))
.build());
observer.onCompleted();
return new MxGatewayAlarmFeedSubscription();
}
@Override
public void close() {
}
@@ -404,6 +404,40 @@ def stream_events(**kwargs: Any) -> None:
)
@main.command("stream-alarms")
@gateway_options
@click.option("--filter-prefix", default="", help="Alarm-reference prefix filter.")
@click.option("--max-messages", default=1, type=int, show_default=True)
@click.option("--timeout", default=5.0, type=float, show_default=True)
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def stream_alarms(**kwargs: Any) -> None:
"""Stream a bounded number of messages from the gateway's central alarm feed."""
_run(
_stream_alarms(**kwargs),
output_json=kwargs["output_json"],
secrets=_secrets(kwargs),
)
@main.command("acknowledge-alarm")
@gateway_options
@click.option("--reference", required=True, help="Alarm full reference to acknowledge.")
@click.option("--comment", default="", help="Acknowledgement comment.")
@click.option("--operator", default="", help="Operator user name.")
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def acknowledge_alarm(**kwargs: Any) -> None:
"""Acknowledge an active MXAccess alarm condition (session-less)."""
_run(
_acknowledge_alarm(**kwargs),
output_json=kwargs["output_json"],
secrets=_secrets(kwargs),
)
@main.command()
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@@ -779,6 +813,34 @@ async def _stream_events(**kwargs: Any) -> dict[str, Any]:
return {"events": [_message_dict(event) for event in events]}
async def _stream_alarms(**kwargs: Any) -> dict[str, Any]:
async with await _connect(kwargs) as client:
messages = await _collect_alarm_messages(
client.stream_alarms(
pb.StreamAlarmsRequest(
client_correlation_id=kwargs["correlation_id"],
alarm_filter_prefix=kwargs["filter_prefix"],
),
),
max_messages=kwargs["max_messages"],
timeout=kwargs["timeout"],
)
return {"messages": [_message_dict(message) for message in messages]}
async def _acknowledge_alarm(**kwargs: Any) -> dict[str, Any]:
async with await _connect(kwargs) as client:
reply = await client.acknowledge_alarm(
pb.AcknowledgeAlarmRequest(
client_correlation_id=kwargs["correlation_id"],
alarm_full_reference=kwargs["reference"],
comment=kwargs["comment"],
operator_user=kwargs["operator"],
),
)
return {"rawReply": _message_dict(reply)}
async def _write(**kwargs: Any) -> dict[str, Any]:
value = _parse_value(kwargs["value"], kwargs["value_type"])
async with await _connect(kwargs) as client:
@@ -936,6 +998,34 @@ async def _collect_events(
return collected
async def _collect_alarm_messages(
messages: Any,
*,
max_messages: int,
timeout: float,
) -> list[pb.AlarmFeedMessage]:
if max_messages > MAX_AGGREGATE_EVENTS:
raise click.BadParameter(
f"must be less than or equal to {MAX_AGGREGATE_EVENTS}",
param_hint="--max-messages",
)
collected: list[pb.AlarmFeedMessage] = []
iterator = messages.__aiter__()
try:
while len(collected) < max_messages:
collected.append(await asyncio.wait_for(iterator.__anext__(), timeout=timeout))
except StopAsyncIteration:
pass
except asyncio.TimeoutError:
pass
finally:
close = getattr(iterator, "aclose", None)
if close is not None:
await close()
return collected
def _parse_value(raw_value: str, value_type: str) -> MxValueInput:
normalized = value_type.lower()
if normalized == "bool":
+22
@@ -52,6 +52,28 @@ def test_write_parser_rejects_unknown_value_type() -> None:
assert "unsupported value type" in result.output
def test_stream_alarms_is_registered() -> None:
runner = CliRunner()
result = runner.invoke(main, ["stream-alarms", "--help"])
assert result.exit_code == 0
assert "--filter-prefix" in result.output
assert "--max-messages" in result.output
def test_acknowledge_alarm_requires_reference() -> None:
runner = CliRunner()
result = runner.invoke(
main,
["acknowledge-alarm", "--api-key", "mxgw_test_secret", "--json"],
)
assert result.exit_code != 0
assert "--reference" in result.output
def test_cli_error_output_redacts_api_key() -> None:
runner = CliRunner()
+274 -8
@@ -18,8 +18,9 @@ use clap::{Args, Parser, Subcommand, ValueEnum};
use futures_util::StreamExt;
use mxgateway_client::generated::galaxy_repository::v1::DeployEvent;
use mxgateway_client::generated::mxaccess_gateway::v1::{
CloseSessionRequest, MxCommand, MxCommandKind, MxCommandRequest, MxEvent, MxEventFamily,
MxValue as ProtoMxValue, OpenSessionRequest, PingCommand, StreamEventsRequest, Write2BulkEntry,
alarm_feed_message, AcknowledgeAlarmRequest, AlarmFeedMessage, CloseSessionRequest, MxCommand,
MxCommandKind, MxCommandRequest, MxEvent, MxEventFamily, MxValue as ProtoMxValue,
OpenSessionRequest, PingCommand, StreamAlarmsRequest, StreamEventsRequest, Write2BulkEntry,
WriteBulkEntry, WriteSecured2BulkEntry, WriteSecuredBulkEntry,
};
use mxgateway_client::{
@@ -272,6 +273,24 @@ enum Command {
#[arg(long)]
jsonl: bool,
},
/// Attach to the gateway's session-less central alarm feed. The stream
/// opens with one `active_alarm` per currently-active alarm, then a
/// single `snapshot_complete`, then a `transition` for every subsequent
/// raise / acknowledge / clear.
StreamAlarms {
#[command(flatten)]
connection: ConnectionArgs,
/// Optional alarm-reference prefix scoping the feed to an equipment
/// sub-tree. Omit to stream every active alarm.
#[arg(long)]
filter_prefix: Option<String>,
#[arg(long, default_value_t = 1)]
max_events: usize,
#[arg(long)]
json: bool,
#[arg(long)]
jsonl: bool,
},
Write {
#[command(flatten)]
connection: ConnectionArgs,
@@ -310,6 +329,20 @@ enum Command {
#[arg(long)]
json: bool,
},
/// Acknowledge an active MXAccess alarm condition through the gateway's
/// session-less AcknowledgeAlarm RPC.
AcknowledgeAlarm {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
reference: String,
#[arg(long, default_value = "")]
comment: String,
#[arg(long, default_value = "")]
operator: String,
#[arg(long)]
json: bool,
},
Smoke {
#[command(flatten)]
connection: ConnectionArgs,
@@ -432,13 +465,32 @@ enum CliValueType {
String,
}
#[tokio::main]
async fn main() -> ExitCode {
/// Entry point. The real work runs on a dedicated thread with a large stack:
/// clap's derive-generated argument parser is deeply recursive, and in debug
/// builds (no inlining) parsing the `Command` enum can exhaust the default
/// main-thread stack (typically 8 MiB on Linux, 1 MiB on Windows) as the enum
/// grows. A 32 MiB worker stack keeps the CLI robust regardless of build
/// profile or future subcommand growth.
fn main() -> ExitCode {
let worker = std::thread::Builder::new()
.name("mxgw-cli".to_owned())
.stack_size(32 * 1024 * 1024)
.spawn(run)
.expect("failed to spawn the CLI worker thread");
worker.join().expect("the CLI worker thread panicked")
}
fn run() -> ExitCode {
let cli = Cli::parse();
let result = match cli.command {
Command::Batch => run_batch().await,
command => dispatch(command).await,
};
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to build the Tokio runtime");
let result = runtime.block_on(async {
match cli.command {
Command::Batch => run_batch().await,
command => dispatch(command).await,
}
});
match result {
Ok(()) => ExitCode::SUCCESS,
Err(error) => {
@@ -788,6 +840,52 @@ async fn dispatch(command: Command) -> Result<(), Error> {
println!("{}", json!({ "eventCount": event_count, "events": events }));
}
}
Command::StreamAlarms {
connection,
filter_prefix,
max_events,
json,
jsonl,
} => {
if max_events > MAX_AGGREGATE_EVENTS {
return Err(Error::InvalidArgument {
name: "max-events".to_owned(),
detail: format!("must be less than or equal to {MAX_AGGREGATE_EVENTS}"),
});
}
let client = connect(connection).await?;
let mut stream = client
.stream_alarms(StreamAlarmsRequest {
client_correlation_id: mxgateway_client::next_correlation_id(
"cli-stream-alarms",
),
alarm_filter_prefix: filter_prefix.unwrap_or_default(),
})
.await?;
let mut messages: Vec<Value> = Vec::new();
let mut message_count = 0usize;
while message_count < max_events {
let Some(message) = stream.next().await else {
break;
};
let message = message?;
message_count += 1;
if jsonl {
println!("{}", alarm_feed_message_to_json(&message));
} else if json {
messages.push(alarm_feed_message_to_json(&message));
} else {
println!("{}", alarm_feed_message_summary(&message));
}
}
if json {
println!(
"{}",
json!({ "messageCount": message_count, "messages": messages })
);
}
}
Command::Write {
connection,
session_id,
@@ -832,6 +930,26 @@ async fn dispatch(command: Command) -> Result<(), Error> {
.await?;
print_ok("write2", json);
}
Command::AcknowledgeAlarm {
connection,
reference,
comment,
operator,
json,
} => {
let client = connect(connection).await?;
let reply = client
.acknowledge_alarm(AcknowledgeAlarmRequest {
client_correlation_id: mxgateway_client::next_correlation_id(
"cli-acknowledge-alarm",
),
alarm_full_reference: reference,
comment,
operator_user: operator,
})
.await?;
print_acknowledge_alarm_reply(&reply, json);
}
Command::Galaxy(galaxy_command) => run_galaxy(galaxy_command).await?,
Command::Smoke {
connection,
@@ -1533,6 +1651,113 @@ fn print_deploy_event(event: &DeployEvent, use_json: bool) {
}
}
/// Render a streamed [`AlarmFeedMessage`] as a terse one-line summary that
/// distinguishes the three `payload` oneof cases.
fn alarm_feed_message_summary(message: &AlarmFeedMessage) -> String {
match &message.payload {
Some(alarm_feed_message::Payload::ActiveAlarm(snapshot)) => {
format!(
"active-alarm {} state={}",
snapshot.alarm_full_reference,
AlarmEnumName::condition_state(snapshot.current_state)
)
}
Some(alarm_feed_message::Payload::SnapshotComplete(complete)) => {
format!("snapshot-complete {complete}")
}
Some(alarm_feed_message::Payload::Transition(transition)) => {
format!(
"transition {} kind={}",
transition.alarm_full_reference,
AlarmEnumName::transition_kind(transition.transition_kind)
)
}
None => "(empty)".to_owned(),
}
}
/// Render a streamed [`AlarmFeedMessage`] as a JSON object whose single
/// top-level key names the active `payload` oneof case, mirroring the
/// protobuf-JSON the .NET/Go/Java/Python CLIs emit.
fn alarm_feed_message_to_json(message: &AlarmFeedMessage) -> Value {
match &message.payload {
Some(alarm_feed_message::Payload::ActiveAlarm(snapshot)) => json!({
"activeAlarm": {
"alarmFullReference": snapshot.alarm_full_reference,
"sourceObjectReference": snapshot.source_object_reference,
"alarmTypeName": snapshot.alarm_type_name,
"severity": snapshot.severity,
"currentState": AlarmEnumName::condition_state(snapshot.current_state),
"category": snapshot.category,
"description": snapshot.description,
"operatorUser": snapshot.operator_user,
"operatorComment": snapshot.operator_comment,
}
}),
Some(alarm_feed_message::Payload::SnapshotComplete(complete)) => json!({
"snapshotComplete": complete,
}),
Some(alarm_feed_message::Payload::Transition(transition)) => json!({
"transition": {
"alarmFullReference": transition.alarm_full_reference,
"sourceObjectReference": transition.source_object_reference,
"alarmTypeName": transition.alarm_type_name,
"transitionKind": AlarmEnumName::transition_kind(transition.transition_kind),
"severity": transition.severity,
"operatorUser": transition.operator_user,
"operatorComment": transition.operator_comment,
"category": transition.category,
"description": transition.description,
}
}),
None => Value::Null,
}
}
/// Tiny namespace for alarm-enum name lookups used by the alarm-feed
/// renderers; keeps the proto-enum imports off the `main.rs` top level.
struct AlarmEnumName;
impl AlarmEnumName {
fn condition_state(value: i32) -> String {
use mxgateway_client::generated::mxaccess_gateway::v1::AlarmConditionState;
AlarmConditionState::try_from(value)
.map(|state| state.as_str_name().to_owned())
.unwrap_or_else(|_| value.to_string())
}
fn transition_kind(value: i32) -> String {
use mxgateway_client::generated::mxaccess_gateway::v1::AlarmTransitionKind;
AlarmTransitionKind::try_from(value)
.map(|kind| kind.as_str_name().to_owned())
.unwrap_or_else(|_| value.to_string())
}
}
/// Render an [`AcknowledgeAlarmReply`] as a terse line or a JSON document.
fn print_acknowledge_alarm_reply(
reply: &mxgateway_client::generated::mxaccess_gateway::v1::AcknowledgeAlarmReply,
use_json: bool,
) {
if use_json {
println!(
"{}",
json!({
"operation": "acknowledge-alarm",
"correlationId": reply.correlation_id,
"protocolStatus": reply.protocol_status.as_ref().map(|status| json!({
"code": status.code,
"message": status.message,
})),
"hresult": reply.hresult,
"diagnosticMessage": reply.diagnostic_message,
})
);
} else {
println!("acknowledge-alarm completed");
}
}
/// Render a streamed [`MxEvent`] as a JSON object. The scalar value is
/// projected into protojson-style `*Value` keys so the cross-language e2e
/// matrix can extract and compare event values uniformly across all five
@@ -1793,6 +2018,47 @@ mod tests {
);
}
#[test]
fn parses_stream_alarms_command() {
let parsed = Cli::try_parse_from([
"mxgw",
"stream-alarms",
"--filter-prefix",
"Tank01",
"--max-events",
"3",
"--json",
]);
assert!(parsed.is_ok(), "parse failed: {parsed:?}");
}
#[test]
fn parses_stream_alarms_command_without_filter_prefix() {
let parsed = Cli::try_parse_from(["mxgw", "stream-alarms"]);
assert!(parsed.is_ok(), "parse failed: {parsed:?}");
}
#[test]
fn parses_acknowledge_alarm_command() {
let parsed = Cli::try_parse_from([
"mxgw",
"acknowledge-alarm",
"--reference",
"Tank01.Level.HiHi",
"--comment",
"ack from cli",
"--operator",
"operator1",
]);
assert!(parsed.is_ok(), "parse failed: {parsed:?}");
}
#[test]
fn acknowledge_alarm_requires_reference() {
let parsed = Cli::try_parse_from(["mxgw", "acknowledge-alarm"]);
assert!(parsed.is_err());
}
#[test]
fn parses_galaxy_watch_command_with_last_seen_and_max_events() {
let parsed = Cli::try_parse_from([
+10 -2
@@ -17,7 +17,7 @@ Each module's `findings.md` is the source of truth; this file is generated from
| [Client.Rust](Client.Rust/findings.md) | Claude Code | 2026-05-20 | `a020350` | Reviewed | 0 | 20 |
| [Contracts](Contracts/findings.md) | Claude Code | 2026-05-20 | `a020350` | Reviewed | 0 | 15 |
| [IntegrationTests](IntegrationTests/findings.md) | Claude Code | 2026-05-20 | `a020350` | Reviewed | 0 | 21 |
| [Server](Server/findings.md) | Claude Code | 2026-05-20 | `a020350` | Reviewed | 0 | 30 |
| [Server](Server/findings.md) | Claude Code | 2026-05-22 | `fa491c7` | Reviewed | 2 | 37 |
| [Tests](Tests/findings.md) | Claude Code | 2026-05-20 | `a020350` | Reviewed | 0 | 24 |
| [Worker](Worker/findings.md) | Claude Code | 2026-05-20 | `a020350` | Reviewed | 0 | 25 |
| [Worker.Tests](Worker.Tests/findings.md) | Claude Code | 2026-05-20 | `a020350` | Reviewed | 0 | 30 |
@@ -26,7 +26,10 @@ Each module's `findings.md` is the source of truth; this file is generated from
Findings with status `Open` or `In Progress`, ordered by severity.
_No pending findings._
| ID | Severity | Category | Location | Description |
|---|---|---|---|---|
| Server-031 | Medium | Concurrency & thread safety | `src/MxGateway.Server/Workers/WorkerClient.cs:392-422` (gateway-side heartbeat watchdog); `src/MxGateway.Worker/Ipc/WorkerPipeSession.cs:588-617` (worker-side heartbeat loop); `src/MxGateway.Worker/Ipc/WorkerFrameWriter.cs:14,67-76` (shared `_writeLock`) | Surfaced during the 2026-05-20 cross-language e2e re-run against gateway `b794c46`. The .NET phase succeeded through `open-session`/`register`/`bulk-subscribe`/`bulk-read`/`bulk-unsubscribe`/`stream-events`/`write` but then failed on its t… |
| Server-032 | Medium | Error handling & resilience | `src/MxGateway.Server/Workers/WorkerClient.cs:70-77,463-484` (gateway-side `_events` channel); `src/MxGateway.Server/Configuration/EventOptions.cs:8` (default capacity 10,000); `src/MxGateway.Server/Grpc/EventStreamService.cs` (consumer) | Surfaced during the 2026-05-20 cross-language e2e re-run against gateway `b794c46`. The Java phase advised ~55 items (`item-handle 63`) before failing on the next `advise` call with the Server-030 diagnostic `Session ... is not ready. Sess… |
## Closed findings
@@ -94,6 +97,7 @@ Findings with status `Resolved`, `Won't Fix`, or `Deferred`.
| Server-016 | Medium | Resolved | Error handling & resilience | `src/MxGateway.Server/Sessions/GatewaySession.cs:790-797`, `src/MxGateway.Server/Sessions/SessionManager.cs:237-258` |
| Server-021 | Medium | Resolved | Testing coverage | `src/MxGateway.Server/Grpc/MxAccessGatewayService.cs:266-664`, `src/MxGateway.Tests/Gateway/Grpc/MxAccessGatewayServiceTests.cs` |
| Server-030 | Medium | Resolved | Error handling & resilience | `src/MxGateway.Server/Sessions/GatewaySession.cs:952-980` |
| Server-033 | Medium | Resolved | Error handling & resilience | `src/MxGateway.Server/Galaxy/GalaxyHierarchyCache.cs:265-323` (`TryRestoreFromDiskAsync`), `:84-99` (`_firstLoad` / `WaitForFirstLoadAsync`); `src/MxGateway.Server/Grpc/GalaxyRepositoryGrpcService.cs:141-163` (`WaitForCacheBootstrap`) |
| Tests-003 | Medium | Resolved | Performance & resource management | `src/MxGateway.Tests/Security/Authentication/SqliteAuthStoreTests.cs:170-176`, `src/MxGateway.Tests/Security/Authentication/ApiKeyAdminCliRunnerTests.cs:252-258` |
| Tests-004 | Medium | Resolved | Testing coverage | `src/MxGateway.Tests/Security/Authorization/GatewayGrpcAuthorizationInterceptorTests.cs` |
| Tests-005 | Medium | Resolved | Testing coverage | `src/MxGateway.Tests/Gateway/Grpc/EventStreamServiceTests.cs:239-261`, `src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs` |
@@ -235,6 +239,10 @@ Findings with status `Resolved`, `Won't Fix`, or `Deferred`.
| Server-027 | Low | Resolved | Design-document adherence | `docs/Authorization.md:120-141,176-181` |
| Server-028 | Low | Resolved | Testing coverage | `src/MxGateway.Tests/Security/Authorization/GatewayGrpcScopeResolverTests.cs:13-20`, `src/MxGateway.Tests/Gateway/Sessions/GatewaySessionTests.cs` |
| Server-029 | Low | Resolved | Documentation & comments | `src/MxGateway.Server/Grpc/MxAccessGatewayService.cs:52-58` |
| Server-034 | Low | Resolved | Error handling & resilience | `src/MxGateway.Server/Galaxy/GalaxyHierarchySnapshotStore.cs:87-115` (`TryLoadAsync`) |
| Server-035 | Low | Resolved | Performance & resource management | `src/MxGateway.Server/Galaxy/GalaxyHierarchyCache.cs:176` (call site), `:327-352` (`PersistSnapshotAsync`) |
| Server-036 | Low | Resolved | Error handling & resilience | `src/MxGateway.Server/Galaxy/GalaxyHierarchyCache.cs:345-348` (`PersistSnapshotAsync` catch) |
| Server-037 | Low | Resolved | Testing coverage | `src/MxGateway.Tests/Galaxy/GalaxyHierarchySnapshotStoreTests.cs`, `src/MxGateway.Tests/Galaxy/GalaxyHierarchyCacheTests.cs` |
| Tests-007 | Low | Resolved | Code organization & conventions | `src/MxGateway.Tests/Gateway/Grpc/MxAccessGatewayServiceTests.cs:682`, `src/MxGateway.Tests/Gateway/Grpc/GalaxyRepositoryGrpcServiceTests.cs:324`, `src/MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs:460`, `src/MxGateway.Tests/Security/Authorization/GatewayGrpcAuthorizationInterceptorTests.cs:233` |
| Tests-008 | Low | Resolved | mxaccessgw conventions | `src/MxGateway.Tests/Gateway/Sessions/WorkerAlarmRpcDispatcherTests.cs:1-9`, `src/MxGateway.Tests/Gateway/Sessions/NotWiredAlarmRpcDispatcherTests.cs:1-3`, `src/MxGateway.Tests/Gateway/Sessions/SessionManagerAlarmAutoSubscribeTests.cs:1` |
| Tests-009 | Low | Resolved | Documentation & comments | `src/MxGateway.Tests/Gateway/Sessions/SessionManagerTests.cs:36-37,99,365` |
+100 -3
@@ -4,10 +4,10 @@
|---|---|
| Module | `src/MxGateway.Server` |
| Reviewer | Claude Code |
| Review date | 2026-05-20 |
| Commit reviewed | `a020350` |
| Review date | 2026-05-22 |
| Commit reviewed | `fa491c7` |
| Status | Reviewed |
| Open findings | 0 |
| Open findings | 2 |
## Checklist coverage
@@ -47,6 +47,28 @@ Re-review pass at `a020350` — the cross-module sweep that resolved Server-015
| 9 | Testing coverage | Issues found: Server-028 (`GatewayGrpcScopeResolverTests` does not exercise `WatchDeployEventsRequest` or `MxCommandKind.ReadBulk`; no `GatewaySessionTests` case asserts a `MarkFaulted` during in-flight Close). |
| 10 | Documentation & comments | Issues found: Server-023 (`NotWiredAlarmRpcDispatcher` class XML doc still says "PR A.6/A.7 — default … shipped while the worker-side AlarmClient event subscription is gated on dev-rig validation"; contradicts the cleanup that Server-014/Server-022 applied to the interface, gateway service, and `WorkerAlarmRpcDispatcher`). Issues found: Server-029 (`OpenSession` capability list advertises `bulk-subscribe-commands` but not the now-shipping bulk-read or bulk-write families — clients that gate on capability strings have no signal that those families exist). |
### 2026-05-22 review (commit fa491c7)
Re-review pass at `fa491c7`, scoped to the Galaxy hierarchy snapshot-persistence
change: the new `GalaxyHierarchySnapshot`, `IGalaxyHierarchySnapshotStore` /
`GalaxyHierarchySnapshotStore`, the restore / persist paths added to
`GalaxyHierarchyCache`, the two new `GalaxyRepositoryOptions`, and the
`docs/GalaxyRepository.md` / `docs/GatewayConfiguration.md` updates. Prior
findings (Server-001 through Server-032) are unchanged by this pass.
| # | Category | Result |
|---|---|---|
| 1 | Correctness & logic bugs | No issues found — restore/save sequencing and the shared `BuildEntry` materialization are sound. |
| 2 | mxaccessgw conventions | No issues found — file-scoped namespaces, `sealed`, `Async` suffixes, Options pattern, and XML docs all conform; the snapshot persists Galaxy metadata (names/types), not tag values or secrets. |
| 3 | Concurrency & thread safety | No issues found — `_restoreAttempted` and `_current` are touched only under `_refreshGate`; `_current` is published via `Volatile.Write`; the store serializes its file I/O on a private `SemaphoreSlim`. |
| 4 | Error handling & resilience | Issues found: Server-033 (restore never completes `_firstLoad`, so a cold-start browse waits the full 5s bootstrap budget), Server-034 (`TryLoadAsync` throws on a corrupt file despite the `Try` prefix), Server-036 (a save cancelled at shutdown logs a misleading warning). |
| 5 | Security | No issues found — the snapshot holds non-secret Galaxy metadata, is written under `C:\ProgramData\MxGateway` alongside the auth DB, and restored rows flow the same materialization path as live SQL with no injection surface. |
| 6 | Performance & resource management | Issues found: Server-035 (the snapshot write is awaited on the refresh critical path under `_refreshGate` with no timeout). |
| 7 | Design-document adherence | No issues found — `docs/GalaxyRepository.md` and `docs/GatewayConfiguration.md` were updated in the same commit; `docs/DesignDecisions.md` already defers to `GalaxyRepository.md` as the Galaxy authority. |
| 8 | Code organization & conventions | No issues found — the new options live on `GalaxyRepositoryOptions`, the store is a registered singleton, and the on-disk envelope (`PersistedFile`) is a private nested record. |
| 9 | Testing coverage | Issues found: Server-037 (no test for the corrupt-snapshot restore path or for `PersistSnapshot = false` at the cache level). |
| 10 | Documentation & comments | No issues found — XML docs match behavior; the `GalaxyRepository.md` "On-disk snapshot" section documents the Stale-on-restore lifecycle. |
## Findings
### Server-001
@@ -568,3 +590,78 @@ The diagnostic `"Worker event channel rejected an event."` also does not name th
Add a regression test that advises N items without an active `StreamEvents` consumer, lets the channel fill, and asserts the produced fault message contains the channel-depth diagnostic (#2) — gated so that #3 is not required.
**Resolution:** _(empty until closed; on close, record the fixing commit SHA, the date, and a one-line description of the fix)_
### Server-033
| Field | Value |
|---|---|
| Severity | Medium |
| Category | Error handling & resilience |
| Location | `src/MxGateway.Server/Galaxy/GalaxyHierarchyCache.cs:265-323` (`TryRestoreFromDiskAsync`), `:84-99` (`_firstLoad` / `WaitForFirstLoadAsync`); `src/MxGateway.Server/Grpc/GalaxyRepositoryGrpcService.cs:141-163` (`WaitForCacheBootstrap`) |
| Status | Resolved |
**Description:** `TryRestoreFromDiskAsync` populates `_current` with the on-disk snapshot (status `Stale`, `HasData == true`) but never completes the `_firstLoad` `TaskCompletionSource`; only the live-query paths (cheap / heavy / catch) in `RefreshCoreAsync` do. A `DiscoverHierarchy` or `GetLastDeployTime` call that arrives after gateway start but before the first refresh tick finishes sees `cache.Current` as `Empty` (status `Unknown`) when `WaitForCacheBootstrap` runs its initial check, so it falls through to `await WaitForFirstLoadAsync` with a 5-second budget. Restore then completes within milliseconds and makes the data available, but `_firstLoad` stays pending until the live query returns or fails. When the Galaxy database is unreachable (the exact scenario the snapshot feature exists for), the SQL connect attempt outlasts the 5s budget, so the caller waits out the full 5 seconds before the handler falls through to read the already-restored data. The result is correct, but the first browse calls after a cold offline start incur a needless ~5s latency, undercutting the feature's purpose.
**Recommendation:** Call `_firstLoad.TrySetResult()` at the end of `TryRestoreFromDiskAsync` once the restored entry is published — restored data is a valid completed first load. Add a regression test: a cache with a throwing repository plus a populated snapshot store should have `WaitForFirstLoadAsync` complete promptly after `RefreshAsync`, not block on the live query.
**Resolution:** Resolved in `bdccdbf` (2026-05-22): `TryRestoreFromDiskAsync` calls `_firstLoad.TrySetResult()` immediately after publishing the restored entry, so a restored snapshot satisfies the bootstrap gate without waiting on the live query. New test `GalaxyHierarchyCacheTests.RefreshAsync_RestoredSnapshotCompletesFirstLoadBeforeLiveQueryReturns` blocks the repository's deploy-time query and asserts `WaitForFirstLoadAsync` still completes from the snapshot.
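The gate behavior described in this finding can be illustrated with a small Python sketch. It is not the gateway's C# code: `HierarchyCache`, `refresh`, `wait_for_first_load`, and `demo` are hypothetical names, and an `asyncio` future stands in for the `_firstLoad` `TaskCompletionSource`. The point it shows is that completing the gate as soon as restored data is published lets a waiter proceed even while the live query is still blocked.

```python
import asyncio


class HierarchyCache:
    """Toy cache with a first-load gate (illustrative names, not the real API)."""

    def __init__(self, snapshot, live_query):
        self._snapshot = snapshot      # data previously saved to disk, or None
        self._live_query = live_query  # coroutine function standing in for the SQL query
        self._first_load = None        # gate future, created lazily on the running loop
        self.current = None

    def _gate(self):
        if self._first_load is None:
            self._first_load = asyncio.get_running_loop().create_future()
        return self._first_load

    async def refresh(self):
        gate = self._gate()
        # Restore path: publish the snapshot as stale data and complete the
        # gate immediately, so waiters do not depend on the live query.
        if self.current is None and self._snapshot is not None:
            self.current = ("stale", self._snapshot)
            if not gate.done():
                gate.set_result(None)
        try:
            self.current = ("fresh", await self._live_query())
        finally:
            # Live-query path: complete the gate on success or failure.
            if not gate.done():
                gate.set_result(None)

    async def wait_for_first_load(self, timeout):
        # shield() keeps a waiter's timeout from cancelling the shared gate.
        await asyncio.wait_for(asyncio.shield(self._gate()), timeout)


async def demo():
    blocked = asyncio.Event()  # never set: simulates an unreachable database

    async def live_query():
        await blocked.wait()
        return ["live"]

    cache = HierarchyCache(snapshot=["restored"], live_query=live_query)
    task = asyncio.create_task(cache.refresh())
    await cache.wait_for_first_load(timeout=0.5)
    result = cache.current
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return result
```

Running `asyncio.run(demo())` returns the restored entry while the simulated live query is still pending. Without the `set_result` call in the restore path, the same wait would only end when its timeout expired.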
### Server-034
| Field | Value |
|---|---|
| Severity | Low |
| Category | Error handling & resilience |
| Location | `src/MxGateway.Server/Galaxy/GalaxyHierarchySnapshotStore.cs:87-115` (`TryLoadAsync`) |
| Status | Resolved |
**Description:** `TryLoadAsync` carries the `Try` prefix and its XML doc says it returns `null` "when none exists, persistence is disabled, or the on-disk file uses an unrecognized schema version." But a corrupt or partially written JSON file makes `JsonSerializer.DeserializeAsync` throw `JsonException`, and an unreadable file (locked, denied ACL) throws `IOException` / `UnauthorizedAccessException` — none of which the method catches. End-to-end behavior is still safe because the sole caller, `GalaxyHierarchyCache.TryRestoreFromDiskAsync`, wraps the call in a `catch (Exception)`; but the store's own `Try`-prefixed contract is violated, and any future caller would be surprised by the throw.
**Recommendation:** Catch `JsonException`, `IOException`, and `UnauthorizedAccessException` inside `TryLoadAsync` (`UnauthorizedAccessException` does not derive from `IOException`, so it must be listed separately), log a warning, and return `null`, consistent with the unrecognized-schema-version branch already present and with the `Try` naming. A corrupt cache file is an expected failure mode for a disk cache.
**Resolution:** Resolved in `bdccdbf` (2026-05-22): `TryLoadAsync` now has a `catch (Exception) when (exception is JsonException or IOException or UnauthorizedAccessException)` that logs a warning and returns `null`. New test `GalaxyHierarchySnapshotStoreTests.TryLoadAsync_WhenFileIsCorruptJson_ReturnsNull`.
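A rough sketch of the `Try` contract with the exception filter quoted in the resolution. The `SnapshotSketch` record, the `SnapshotLoadSketch` class, and the console logging are assumptions made for illustration; the real store's types and logger are not shown in this diff.

```csharp
using System;
using System.IO;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical snapshot shape; the real on-disk schema is not shown here.
public sealed record SnapshotSketch(int SchemaVersion, string[] Rows);

public static class SnapshotLoadSketch
{
    // Returns null for a missing, corrupt, or unreadable file instead of throwing.
    public static async Task<SnapshotSketch?> TryLoadAsync(
        string path, CancellationToken cancellationToken = default)
    {
        if (!File.Exists(path))
        {
            return null;
        }

        try
        {
            await using FileStream stream = File.OpenRead(path);
            return await JsonSerializer
                .DeserializeAsync<SnapshotSketch>(stream, cancellationToken: cancellationToken)
                .ConfigureAwait(false);
        }
        catch (Exception exception)
            when (exception is JsonException or IOException or UnauthorizedAccessException)
        {
            // Stand-in for a logger warning.
            Console.Error.WriteLine($"Ignoring unreadable snapshot: {exception.Message}");
            return null;
        }
    }
}
```

The filter lists the three expected failure types explicitly, so cancellation and unexpected exceptions still propagate to the caller.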
### Server-035
| Field | Value |
|---|---|
| Severity | Low |
| Category | Performance & resource management |
| Location | `src/MxGateway.Server/Galaxy/GalaxyHierarchyCache.cs:176` (call site), `:327-352` (`PersistSnapshotAsync`) |
| Status | Resolved |
**Description:** After a heavy refresh, `RefreshCoreAsync` `await`s `PersistSnapshotAsync` while still holding `_refreshGate`, and the `SaveAsync` write has no timeout. The only caller of `RefreshAsync` is the sequential `GalaxyHierarchyRefreshService` loop, so a write that hangs — e.g. a `SnapshotCachePath` pointed at an unresponsive network share — blocks the gate and stalls all subsequent cache refreshes until gateway shutdown. Impact is bounded: clients keep being served the last entry (which flips to `Stale` after the 5-minute threshold), so this is a degradation rather than an outage, and the default `C:\ProgramData` path is local disk where a hang is unlikely.
**Recommendation:** Bound the snapshot write with a timeout — a linked `CancellationTokenSource` cancelling after, say, the SQL `CommandTimeoutSeconds` budget — so a stuck write fails fast and logs rather than pinning the refresh loop. Moving the write off the gate is an alternative but would need its own write-serialization.
**Resolution:** Resolved in `bdccdbf` (2026-05-22): `SaveAsync` wraps the write in a `CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)` cancelled after `Math.Max(1, CommandTimeoutSeconds)` seconds, so a stuck write fails fast instead of pinning the refresh loop. The timeout-expiry path itself is not unit-tested — exercising it would require a genuinely hanging filesystem.
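The bounded write can be sketched as follows. `BoundedWriteSketch` and the `write` delegate are hypothetical; only the linked `CancellationTokenSource` and the `Math.Max(1, CommandTimeoutSeconds)` floor come from the resolution text.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedWriteSketch
{
    // Runs the write under a token that fires on caller cancellation
    // or when the timeout budget elapses, whichever comes first.
    public static async Task SaveWithTimeoutAsync(
        Func<CancellationToken, Task> write,
        int commandTimeoutSeconds,
        CancellationToken cancellationToken)
    {
        using CancellationTokenSource timeout =
            CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

        // Floor of one second so a zero or negative setting cannot cancel immediately.
        timeout.CancelAfter(TimeSpan.FromSeconds(Math.Max(1, commandTimeoutSeconds)));

        await write(timeout.Token).ConfigureAwait(false);
    }
}
```

The timeout only helps if the underlying write observes the token; an I/O call that ignores cancellation would still block.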
### Server-036
| Field | Value |
|---|---|
| Severity | Low |
| Category | Error handling & resilience |
| Location | `src/MxGateway.Server/Galaxy/GalaxyHierarchyCache.cs:345-348` (`PersistSnapshotAsync` catch) |
| Status | Resolved |
**Description:** `PersistSnapshotAsync` passes the refresh `CancellationToken` to `SaveAsync` and catches every exception — including the `OperationCanceledException` thrown when that token is cancelled at gateway shutdown — in its general `catch (Exception)`, logging it as `Warning: "Failed to persist the Galaxy hierarchy snapshot to disk."`. A snapshot write interrupted by a normal shutdown is not a failure, but it surfaces as a misleading warning every time the gateway stops mid-write.
**Recommendation:** Let a cancellation-driven `OperationCanceledException` pass without the warning — e.g. add `catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { }` before the general catch — matching the cancellation handling already used in `RefreshCoreAsync` and `TryRestoreFromDiskAsync`.
**Resolution:** Resolved in `bdccdbf` (2026-05-22): `PersistSnapshotAsync` has a `catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)` ahead of the general catch, so a save aborted by gateway shutdown is silent while a genuine failure (including a write timeout) still logs. New test `GalaxyHierarchyCacheTests.RefreshAsync_WhenSnapshotSaveCancelledAtShutdown_DoesNotLogPersistFailure`.
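A small sketch of the catch ordering, assuming a hypothetical `PersistSketch` helper that returns whether a warning would have been logged. The real method logs through its own logger and returns nothing.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class PersistSketch
{
    // Returns true when the failure would be logged as a warning, false otherwise.
    public static async Task<bool> PersistAsync(
        Func<CancellationToken, Task> save, CancellationToken cancellationToken)
    {
        try
        {
            await save(cancellationToken).ConfigureAwait(false);
            return false;
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // The caller's token was cancelled (shutdown): not a persistence failure.
            return false;
        }
        catch (Exception exception)
        {
            // Genuine failure, including a write timeout whose cancellation
            // did not originate from the caller's token.
            Console.Error.WriteLine($"Failed to persist snapshot: {exception.Message}");
            return true;
        }
    }
}
```

The filter checks the caller's token rather than the exception type alone, which is what keeps a timeout-driven cancellation on the warning path.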
### Server-037
| Field | Value |
|---|---|
| Severity | Low |
| Category | Testing coverage |
| Location | `src/MxGateway.Tests/Galaxy/GalaxyHierarchySnapshotStoreTests.cs`, `src/MxGateway.Tests/Galaxy/GalaxyHierarchyCacheTests.cs` |
| Status | Resolved |
**Description:** The new snapshot tests cover the round-trip, missing-file, persistence-disabled, unrecognized-schema, and overwrite cases for the store, and the persist / restore-when-unreachable / promote-on-matching-deploy cases for the cache. Two resilience paths are untested: (1) `GalaxyHierarchyCache.TryRestoreFromDiskAsync`'s `catch` path when the snapshot file is corrupt — the cache must come up `Unavailable` rather than throwing; (2) the cache restore path when `PersistSnapshot = false` (the store yields `null` and the cache stays `Unavailable`). Both are the failure modes most likely to matter operationally.
**Recommendation:** Add a cache test that writes a corrupt snapshot file and asserts `RefreshAsync` with an unreachable repository leaves the cache `Unavailable` without throwing, and a test that confirms a `PersistSnapshot = false` store neither restores nor persists. If Server-034 is fixed, the corrupt-file test also pins the store's null-return.
**Resolution:** Resolved in `bdccdbf` (2026-05-22): added `GalaxyHierarchyCacheTests.RefreshAsync_WhenSnapshotFileCorrupt_ComesUpUnavailableWithoutThrowing` and `RefreshAsync_WhenPersistDisabled_DoesNotRestoreFromDisk`, plus the `TryLoadAsync_WhenFileIsCorruptJson_ReturnsNull` store test added for Server-034.
+80 -8
@@ -2,7 +2,7 @@
The gateway exposes a read-only browse surface over the AVEVA System Platform
Galaxy Repository (the SQL Server database named `ZB`). Clients use it to
enumerate the deployed object hierarchy and each object's dynamic attributes
enumerate the deployed object hierarchy and each object's attributes
before subscribing to runtime values via the existing `MxAccessGateway` RPCs.
This is a metadata layer: it never reads or writes runtime tag values, never
@@ -19,8 +19,10 @@ ArchestrA IDE renders the deployment tree. Surfacing that data over gRPC lets
remote clients build a navigable address space without any coupling to the
COM layer or the host platform.
The query bodies are kept byte-for-byte identical to the equivalent OPC UA
server in the OtOpcUa project so the two consumers see the same row sets.
`HierarchySql` is the object-hierarchy query originally ported from the
equivalent OPC UA server in the OtOpcUa project. `AttributesSql` has since
diverged from OtOpcUa — see [Built-in vs configured attributes](#built-in-vs-configured-attributes)
— and is no longer kept in sync with it.
## RPC Surface
@@ -32,7 +34,7 @@ The service is defined in
|-----|---------|
| `TestConnection` | Connectivity probe. Returns `{ ok: bool }` after a `SELECT 1`. Does not throw on SQL failure — returns `ok = false`. Always hits SQL directly so it remains a true health check. |
| `GetLastDeployTime` | Returns the cached `galaxy.time_of_last_deploy`. Served from the shared hierarchy cache; refreshed in the background. |
| `DiscoverHierarchy` | Returns one page of the deployed hierarchy plus each returned object's dynamic attributes. **Served from cache** — see [Hierarchy Cache](#hierarchy-cache). |
| `DiscoverHierarchy` | Returns one page of the deployed hierarchy plus each returned object's attributes (configured and built-in — see [Built-in vs configured attributes](#built-in-vs-configured-attributes)). **Served from cache** — see [Hierarchy Cache](#hierarchy-cache). |
| `WatchDeployEvents` | **Server-streaming.** The server emits the current state immediately on subscribe (so clients can bootstrap without waiting), then emits one event per detected deploy change. See [Deploy Notifications](#deploy-notifications). |
`DiscoverHierarchy` is a paged unary RPC. The raw request accepts `page_size`
@@ -87,6 +89,36 @@ load to complete before returning. If the first load fails or times out,
the client gets `Unavailable` with a short reason. Once any load completes
(success or failure), this wait is skipped on subsequent calls.
### On-disk snapshot
The gateway may lose connectivity to the Galaxy database — and the database is
often unreachable right when the gateway itself restarts. To keep browse
working across that gap, the cache persists its dataset to disk:
- After every successful **heavy** refresh (a deploy change), the raw
hierarchy and attribute rowsets are written to
`MxGateway:Galaxy:SnapshotCachePath`
(default `C:\ProgramData\MxGateway\galaxy-snapshot.json`). The write is
atomic — a temp file plus rename — so a crash mid-write cannot corrupt the
snapshot. Cheap no-change ticks write nothing; the file is already current.
- On the **first** refresh after startup, before any SQL runs, the cache
reloads that file. The restored data is served with `Stale` status —
it is last-known data, not live — so clients can browse immediately even
when the Galaxy database is unreachable.
- The first live query then reconciles: if it observes the **same**
`time_of_last_deploy` the snapshot was saved at, the entry is promoted to
`Healthy` with no heavy re-query (the snapshot is provably current); if it
observes a newer deploy, the heavy queries run and replace the snapshot; if
the database is still unreachable, the entry stays `Stale`.
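The three reconciliation outcomes above can be written as a small decision function. This is only a sketch: `ReconcileAction` and `ReconcileSketch` are hypothetical names, a `null` live value stands in for an unreachable database, and treating any non-matching deploy time as a trigger for the heavy refresh is an assumption (the text only describes the newer-deploy case).

```csharp
using System;

public enum ReconcileAction
{
    PromoteToHealthy,
    RunHeavyRefresh,
    KeepStale,
}

public static class ReconcileSketch
{
    // snapshotDeployTime: the time_of_last_deploy the snapshot was saved at.
    // liveDeployTime: the value from the first live query, or null when SQL is unreachable.
    public static ReconcileAction Decide(
        DateTimeOffset snapshotDeployTime, DateTimeOffset? liveDeployTime)
    {
        if (liveDeployTime is null)
        {
            return ReconcileAction.KeepStale;
        }

        // Assumption: any mismatch, not only a newer deploy, triggers the heavy refresh.
        return liveDeployTime.Value == snapshotDeployTime
            ? ReconcileAction.PromoteToHealthy
            : ReconcileAction.RunHeavyRefresh;
    }
}
```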
`is_alarm` / `is_historized` filters, paging, and the dashboard summary all
work against a restored snapshot exactly as against a live pull — the restore
path runs the same materialization. Persistence is disabled by setting
`MxGateway:Galaxy:PersistSnapshot` to `false`; the snapshot file is then
neither written nor read, and a cold start with an unreachable database comes
up `Unavailable` as before. The on-disk file is a cache, not a system of
record: deleting it only forces the next cold start to wait for live SQL.
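The temp-file-plus-rename write mentioned in this section might look roughly like the sketch below. The `.tmp` suffix, the `AtomicWriteSketch` name, and the synchronous API are illustrative assumptions, not the store's actual implementation.

```csharp
using System.IO;
using System.Text.Json;

public static class AtomicWriteSketch
{
    // Writes the payload to a sibling temp file, then renames it over the target,
    // so a reader never observes a partially written snapshot.
    public static void Save<T>(string path, T payload)
    {
        // Same directory as the target, so the rename stays on one volume.
        string tempPath = path + ".tmp";

        File.WriteAllText(tempPath, JsonSerializer.Serialize(payload));
        File.Move(tempPath, path, overwrite: true);
    }
}
```

A crash before the rename leaves the previous snapshot untouched; at worst a stray temp file remains beside it.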
## Deploy Notifications
`WatchDeployEvents` is a server-streaming RPC backed by
@@ -176,6 +208,43 @@ message DiscoverHierarchyReply {
}
```
### Built-in vs configured attributes
Each `GalaxyObject` carries two kinds of attribute, both surfaced the same way
in the `attributes` list:
- **Configured (dynamic) attributes** — attributes added in the ArchestrA IDE
attribute editor. Stored in the Galaxy `dynamic_attribute` table.
- **Built-in attributes** — attributes every object inherits from its
primitives: the object framework, the engine/platform primitives, and the
per-attribute extensions (Alarm, History, Boolean, …). Stored in
`attribute_definition` and reached through `primitive_instance`.
Built-in attributes are why an `AppEngine` or `WinPlatform` object reports its
`Engine.*` and `Alarm*` attributes, and why an alarmed attribute such as
`TestAlarm001` reports its extension leaves `TestAlarm001.Acked`,
`TestAlarm001.AckMsg`, `TestAlarm001.ActiveAlarmState`, and so on. An earlier
version of the browse query returned only configured attributes, so those
objects came back empty or partial; including built-ins makes the browse
surface match what System Platform's own Object Viewer shows. Expect roughly
seven times as many attributes as configured-only — the dashboard attribute
count reflects this.
Two rules govern the built-in rows:
- **No category filter.** `attribute_definition` uses a different
`mx_attribute_category` numbering than `dynamic_attribute`, so only the
`_`-prefixed-name and `.Description` exclusions apply to built-ins. (The
configured-attribute category allow-list is unchanged.)
- **`is_historized` / `is_alarm` are always `false` for built-in rows.** Those
flags identify a configured attribute that *anchors* a history or alarm
extension (e.g. `TestAlarm001`), not the extension's machinery leaves
(`TestAlarm001.Acked`). `alarm_bearing_only` and `historized_only` therefore
still select the anchor attributes, not their built-in children.
When a configured attribute and a built-in attribute resolve to the same
reference, the configured attribute wins.
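The precedence rule can be illustrated in memory, although the gateway may well apply it inside `AttributesSql` instead. Everything in this sketch is hypothetical: the `AttributeSketch` record, the `Merge` helper, and the ordinal comparison of references.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical attribute shape for illustration only.
public sealed record AttributeSketch(string Reference, bool IsBuiltIn);

public static class AttributeMergeSketch
{
    // Configured attributes are added first; a built-in attribute is kept
    // only when no configured attribute already claimed its reference.
    public static IReadOnlyList<AttributeSketch> Merge(
        IEnumerable<AttributeSketch> configured, IEnumerable<AttributeSketch> builtIn)
    {
        var byReference = new Dictionary<string, AttributeSketch>(StringComparer.Ordinal);

        foreach (AttributeSketch attribute in configured)
        {
            byReference[attribute.Reference] = attribute;
        }

        foreach (AttributeSketch attribute in builtIn)
        {
            byReference.TryAdd(attribute.Reference, attribute);
        }

        return new List<AttributeSketch>(byReference.Values);
    }
}
```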
### Contained name vs tag name
Galaxy objects carry two names. `tag_name` is globally unique and is what
@@ -219,10 +288,11 @@ GalaxyHierarchyRefreshService (BackgroundService)
Component breakdown:
- `GalaxyRepository` (`src/MxGateway.Server/Galaxy/GalaxyRepository.cs`) holds
the SQL. Its constants `HierarchySql` and `AttributesSql` are copied verbatim
from the OtOpcUa project; do not edit them in isolation here. The two
queries walk template-derivation and package-derivation chains via
recursive CTEs and pick the most-derived attribute override per object.
the SQL. Both `HierarchySql` and `AttributesSql` walk template-derivation and
package-derivation chains via recursive CTEs and pick the most-derived
override per object. `HierarchySql` still matches the OtOpcUa original;
`AttributesSql` does not — it additionally enumerates built-in primitive
attributes (see [Built-in vs configured attributes](#built-in-vs-configured-attributes)).
- `GalaxyHierarchyCache`
(`src/MxGateway.Server/Galaxy/GalaxyHierarchyCache.cs`) holds the most
recent immutable `GalaxyHierarchyCacheEntry` (materialized objects +
@@ -251,6 +321,8 @@ Bound to `MxGateway:Galaxy` via `GalaxyRepositoryOptions`.
|--------|---------|-------------|
| `MxGateway:Galaxy:ConnectionString` | `Server=localhost;Database=ZB;Integrated Security=True;TrustServerCertificate=True;Encrypt=False;` | SQL Server connection string for the Galaxy Repository. Integrated Security against `localhost` is the dev default; production deployments should override this through the standard double-underscore environment variable form, e.g. `MxGateway__Galaxy__ConnectionString`. |
| `MxGateway:Galaxy:CommandTimeoutSeconds` | `60` | Per-command SQL timeout. Applies to all three RPCs. |
| `MxGateway:Galaxy:PersistSnapshot` | `true` | Persists each successful browse dataset to disk and reloads it at startup. See [On-disk snapshot](#on-disk-snapshot). |
| `MxGateway:Galaxy:SnapshotCachePath` | `C:\ProgramData\MxGateway\galaxy-snapshot.json` | File path for the persisted browse snapshot. Ignored when `PersistSnapshot` is `false`. |
The connection string is not treated as a secret in dev (`Integrated
Security`), but production deployments that use SQL authentication should set
+5 -1
@@ -60,7 +60,9 @@ paths, timeouts, queue sizes, enum values, or protocol values are invalid.
"Galaxy": {
"ConnectionString": "Server=localhost;Database=ZB;Integrated Security=True;TrustServerCertificate=True;Encrypt=False;",
"CommandTimeoutSeconds": 60,
"DashboardRefreshIntervalSeconds": 30
"DashboardRefreshIntervalSeconds": 30,
"PersistSnapshot": true,
"SnapshotCachePath": "C:\\ProgramData\\MxGateway\\galaxy-snapshot.json"
},
"Alarms": {
"Enabled": false,
@@ -170,6 +172,8 @@ at startup.
| `MxGateway:Galaxy:ConnectionString` | `Server=localhost;Database=ZB;Integrated Security=True;TrustServerCertificate=True;Encrypt=False;` | SQL Server connection string for the Galaxy Repository (`ZB`) used by the `GalaxyRepository` browse RPCs. Override in production via `MxGateway__Galaxy__ConnectionString`. |
| `MxGateway:Galaxy:CommandTimeoutSeconds` | `60` | Per-command SQL timeout for all Galaxy browse RPCs. |
| `MxGateway:Galaxy:DashboardRefreshIntervalSeconds` | `30` | Interval between background refreshes of the dashboard Galaxy summary cache. SQL is hit at most once per interval regardless of dashboard render rate. |
| `MxGateway:Galaxy:PersistSnapshot` | `true` | Persists the latest successful Galaxy browse dataset to disk. When `true`, the cache reloads that snapshot at startup so clients can still browse last-known data while the Galaxy database is unreachable. The restored data is served with `Stale` status until a live query confirms it. |
| `MxGateway:Galaxy:SnapshotCachePath` | `C:\ProgramData\MxGateway\galaxy-snapshot.json` | File path for the persisted Galaxy browse snapshot. Ignored when `PersistSnapshot` is `false`. The snapshot is written atomically (temp file plus rename). |
See [Galaxy Repository Browse](./GalaxyRepository.md) for the RPC surface and
behavior.
+14
@@ -293,6 +293,18 @@ path and writes a JSON report under `artifacts/e2e/`:
write command is rejected — e.g. against a gateway whose worker predates
write support (`MxAccessCommandExecutor` returning `InvalidRequest` for
`Write`/`Write2`/`WriteSecured`/`WriteSecured2`).
8. **Alarm feed + acknowledge** *opt-in (`-VerifyAlarms`).* Runs after the
stream phase. Exercises the two session-less alarm subcommands against the
gateway's central alarm monitor: `stream-alarms` reads a bounded slice of
the feed (`-AlarmStreamMax`, default 1 — the feed's first message always
arrives immediately, whereas later ones depend on live transitions) and
asserts at least one `AlarmFeedMessage`; `acknowledge-alarm` acknowledges
`-AlarmReference` (default `Galaxy!TestArea.TestMachine_001.TestAlarm001`)
and asserts the RPC round-trips. The native ack outcome is not asserted —
it depends on whether that alarm is currently active.
It is opt-in because it depends on the gateway's central alarm monitor
being enabled (`MxGateway:Alarms:Enabled`) and a live alarm provider.
Each client CLI is driven through one long-lived `batch` process. Every CLI
exposes a `batch` subcommand: a process that reads one command line from stdin,
@@ -329,6 +341,8 @@ powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -SkipB
# Write round-trip (opt-in): point at a writable scalar attribute and its
# value type.
powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -VerifyWrite -WriteAttribute TestChangingInt -WriteType int32
# Alarm feed + acknowledge (opt-in): needs MxGateway:Alarms:Enabled on the gateway.
powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -VerifyAlarms -AlarmReference "Galaxy!TestArea.TestMachine_001.TestAlarm001"
# Auth rejection: also assert an insufficient-scope key is denied.
powershell -ExecutionPolicy Bypass -File scripts/run-client-e2e-tests.ps1 -RejectScopeApiKeyEnv MXGATEWAY_READONLY_API_KEY
# Run all five clients concurrently as isolated child processes.
+138 -1
@@ -7,7 +7,9 @@ Drives the .NET, Go, Rust, Python, and Java client CLIs against a running
gateway + worker. For each language the script exercises session open/close,
register, bulk subscribe/unsubscribe, per-tag add-item/advise, event
streaming, a write round-trip with value assertion, error-path (parity)
checks, and API-key auth rejection.
checks, and API-key auth rejection. With -VerifyAlarms it also exercises the
session-less stream-alarms and acknowledge-alarm subcommands against the
gateway's central alarm monitor.
Each client CLI is driven through one long-lived `batch` process: the harness
writes one command line to its stdin and reads the JSON result back, so the
@@ -60,6 +62,18 @@ param(
[string]$WriteType = "int32",
[int]$WriteValueBase = 424200,
[int]$WriteEchoMaxEvents = 200,
# Alarm feed + acknowledge coverage. Opt-in because it depends on the
# gateway's central alarm monitor being enabled (MxGateway:Alarms:Enabled)
# and a live alarm provider: stream-alarms reads the monitor's snapshot and
# acknowledge-alarm acknowledges -AlarmReference. Both RPCs are session-less
# — they exercise the gateway's always-on monitor, not a client session.
[switch]$VerifyAlarms,
[string]$AlarmReference = "Galaxy!TestArea.TestMachine_001.TestAlarm001",
# Messages to read from the central alarm feed. 1 is enough to confirm the
# subcommand round-trips: the feed's first message (an active-alarm
# snapshot, or snapshot-complete when no alarms are active) always arrives
# immediately, whereas later messages depend on live alarm transitions.
[int]$AlarmStreamMax = 1,
# Error-path (parity) checks.
[switch]$SkipParity,
# API-key auth rejection checks.
@@ -118,6 +132,10 @@ if ($WriteEchoMaxEvents -lt 1) {
throw "WriteEchoMaxEvents must be greater than zero."
}
if ($AlarmStreamMax -lt 1) {
throw "AlarmStreamMax must be greater than zero."
}
foreach ($client in $Clients) {
if ($validClients -notcontains $client) {
throw "Unsupported client '$client'. Supported clients: $($validClients -join ', ')."
@@ -327,6 +345,25 @@ function Get-StreamEvents {
}
}
# Counts the messages in a stream-alarms reply. The CLIs shape the aggregate
# JSON differently: .NET nests them under `alarms`, Rust under `messages` with
# a `messageCount`, Python under `messages`; Go and Java emit one AlarmFeedMessage
# object per line (Read-JsonObject collapses NDJSON into a bare array).
function Get-AlarmMessageCount {
param(
[string]$Client,
[object]$Json
)
switch ($Client) {
"dotnet" { return @($Json.alarms).Count }
"go" { return @($Json).Count }
"rust" { return [int]$Json.messageCount }
"python" { return @($Json.messages).Count }
"java" { return @($Json).Count }
}
}
function Get-PropertyValue {
param(
[object]$Object,
@@ -564,6 +601,13 @@ function Get-ClientCommand {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--type", $Values.valueType, "--value", $Values.value)
} elseif ($Operation -eq "stream-events") {
$arguments += @("--session-id", $Values.sessionId, "--max-events", "$streamMaxEvents")
} elseif ($Operation -eq "stream-alarms") {
$arguments += @("--max-events", "$streamMaxEvents")
if ($Values.ContainsKey("filterPrefix")) { $arguments += @("--filter-prefix", $Values.filterPrefix) }
} elseif ($Operation -eq "acknowledge-alarm") {
$arguments += @("--reference", $Values.alarmReference)
if ($Values.ContainsKey("comment")) { $arguments += @("--comment", $Values.comment) }
if ($Values.ContainsKey("operator")) { $arguments += @("--operator", $Values.operator) }
} elseif ($Operation -eq "close-session") {
$arguments += @("--session-id", $Values.sessionId)
}
@@ -600,6 +644,13 @@ function Get-ClientCommand {
$arguments += @("-session-id", $Values.sessionId, "-server-handle", "$($Values.serverHandle)", "-item-handle", "$($Values.itemHandle)", "-type", $Values.valueType, "-value", $Values.value)
} elseif ($Operation -eq "stream-events") {
$arguments += @("-session-id", $Values.sessionId, "-limit", "$streamMaxEvents")
} elseif ($Operation -eq "stream-alarms") {
$arguments += @("-limit", "$streamMaxEvents")
if ($Values.ContainsKey("filterPrefix")) { $arguments += @("-filter-prefix", $Values.filterPrefix) }
} elseif ($Operation -eq "acknowledge-alarm") {
$arguments += @("-reference", $Values.alarmReference)
if ($Values.ContainsKey("comment")) { $arguments += @("-comment", $Values.comment) }
if ($Values.ContainsKey("operator")) { $arguments += @("-operator", $Values.operator) }
} elseif ($Operation -eq "close-session") {
$arguments += @("-session-id", $Values.sessionId)
}
@@ -637,6 +688,13 @@ function Get-ClientCommand {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--value-type", $Values.valueType, "--value", $Values.value)
} elseif ($Operation -eq "stream-events") {
$arguments += @("--session-id", $Values.sessionId, "--max-events", "$streamMaxEvents")
} elseif ($Operation -eq "stream-alarms") {
$arguments += @("--max-events", "$streamMaxEvents")
if ($Values.ContainsKey("filterPrefix")) { $arguments += @("--filter-prefix", $Values.filterPrefix) }
} elseif ($Operation -eq "acknowledge-alarm") {
$arguments += @("--reference", $Values.alarmReference)
if ($Values.ContainsKey("comment")) { $arguments += @("--comment", $Values.comment) }
if ($Values.ContainsKey("operator")) { $arguments += @("--operator", $Values.operator) }
} elseif ($Operation -eq "close-session") {
$arguments += @("--session-id", $Values.sessionId)
}
@@ -673,6 +731,13 @@ function Get-ClientCommand {
$arguments += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--type", $Values.valueType, "--value", $Values.value)
} elseif ($Operation -eq "stream-events") {
$arguments += @("--session-id", $Values.sessionId, "--max-events", "$streamMaxEvents", "--timeout", "$pythonStreamTimeout")
} elseif ($Operation -eq "stream-alarms") {
$arguments += @("--max-messages", "$streamMaxEvents", "--timeout", "$pythonStreamTimeout")
if ($Values.ContainsKey("filterPrefix")) { $arguments += @("--filter-prefix", $Values.filterPrefix) }
} elseif ($Operation -eq "acknowledge-alarm") {
$arguments += @("--reference", $Values.alarmReference)
if ($Values.ContainsKey("comment")) { $arguments += @("--comment", $Values.comment) }
if ($Values.ContainsKey("operator")) { $arguments += @("--operator", $Values.operator) }
} elseif ($Operation -eq "close-session") {
$arguments += @("--session-id", $Values.sessionId)
}
@@ -712,6 +777,13 @@ function Get-ClientCommand {
$cliArgs += @("--session-id", $Values.sessionId, "--server-handle", "$($Values.serverHandle)", "--item-handle", "$($Values.itemHandle)", "--type", $Values.valueType, "--value", $Values.value)
} elseif ($Operation -eq "stream-events") {
$cliArgs += @("--session-id", $Values.sessionId, "--limit", "$streamMaxEvents")
} elseif ($Operation -eq "stream-alarms") {
$cliArgs += @("--limit", "$streamMaxEvents")
if ($Values.ContainsKey("filterPrefix")) { $cliArgs += @("--filter-prefix", $Values.filterPrefix) }
} elseif ($Operation -eq "acknowledge-alarm") {
$cliArgs += @("--reference", $Values.alarmReference)
if ($Values.ContainsKey("comment")) { $cliArgs += @("--comment", $Values.comment) }
if ($Values.ContainsKey("operator")) { $cliArgs += @("--operator", $Values.operator) }
} elseif ($Operation -eq "close-session") {
$cliArgs += @("--session-id", $Values.sessionId)
}
@@ -801,6 +873,36 @@ function Get-DryRunReply {
default { return [pscustomobject]@{ events = $events } }
}
}
"stream-alarms" {
# Synthesize an active-alarm snapshot followed by the
# snapshot-complete sentinel. The reply is shaped per client:
# Go and Java emit one message object per line (Read-JsonObject
# collapses NDJSON to a bare array), Rust aggregates under
# `messages` with a `messageCount`, Python under `messages`, and
# .NET under `alarms`.
$activeAlarm = [pscustomobject]@{
activeAlarm = [pscustomobject]@{
alarmFullReference = "Galaxy!TestArea.TestMachine_001.TestAlarm001"
currentState = "ALARM_CONDITION_STATE_ACTIVE"
severity = 500
}
}
$snapshotComplete = [pscustomobject]@{ snapshotComplete = $true }
$messages = @($activeAlarm, $snapshotComplete)
switch ($Client) {
"go" { return ,$messages }
"java" { return ,$messages }
"rust" { return [pscustomobject]@{ messageCount = $messages.Count; messages = $messages } }
"dotnet" { return [pscustomobject]@{ alarms = $messages } }
default { return [pscustomobject]@{ messages = $messages } }
}
}
"acknowledge-alarm" {
return [pscustomobject]@{
rawReply = [pscustomobject]@{ hresult = 0; diagnosticMessage = "dry-run ack" }
reply = [pscustomobject]@{ hresult = 0 }
}
}
default { return [pscustomobject]@{ ok = $true; reply = [pscustomobject]@{} } }
}
}
@@ -1053,6 +1155,7 @@ function Invoke-ClientFlow {
addedItems = @()
eventCount = 0
write = $null
alarms = $null
parity = @()
auth = @()
closed = $false
@@ -1285,6 +1388,35 @@ function Invoke-ClientFlow {
}
}
# --- Alarm feed + acknowledge -------------------------------------
# Session-less RPCs against the gateway's always-on central alarm
# monitor. Opt-in (-VerifyAlarms) because it needs the monitor enabled
# (MxGateway:Alarms:Enabled) and a live alarm provider.
if ($VerifyAlarms) {
$alarmStreamJson = Invoke-ClientOperation -Client $Client -Operation "stream-alarms" -Values @{
maxEvents = $AlarmStreamMax
}
$alarmMessageCount = Get-AlarmMessageCount -Client $Client -Json $alarmStreamJson
if ($alarmMessageCount -lt 1) {
throw "The $Client stream-alarms command returned no alarm-feed messages."
}
# The acknowledge round-trips against the central monitor; the
# native ack outcome depends on whether the referenced alarm is
# currently active, so only the RPC's success is asserted here.
Invoke-ClientOperation -Client $Client -Operation "acknowledge-alarm" -Values @{
alarmReference = $AlarmReference
comment = "e2e-matrix"
operator = "mxgw-e2e"
} | Out-Null
$clientResult.alarms = [ordered]@{
streamMessageCount = $alarmMessageCount
acknowledgeReference = $AlarmReference
acknowledged = $true
}
}
# --- Error-path (parity) checks -----------------------------------
# MXAccess parity: an invalid item handle and an unknown session must
# both be rejected rather than silently succeeding.
@@ -1391,6 +1523,8 @@ function Get-ChildArgumentList {
"-WriteType", $WriteType,
"-WriteValueBase", "$WriteValueBase",
"-WriteEchoMaxEvents", "$WriteEchoMaxEvents",
"-AlarmReference", $AlarmReference,
"-AlarmStreamMax", "$AlarmStreamMax",
"-ReportPath", $ChildReportPath,
"-EmitReport"
)
@@ -1400,6 +1534,7 @@ function Get-ChildArgumentList {
if ($SkipStream) { $childArgs += "-SkipStream" }
if ($SkipBulk) { $childArgs += "-SkipBulk" }
if ($VerifyWrite) { $childArgs += "-VerifyWrite" }
if ($VerifyAlarms) { $childArgs += "-VerifyAlarms" }
if ($SkipParity) { $childArgs += "-SkipParity" }
if ($SkipAuth) { $childArgs += "-SkipAuth" }
if ($DryRun) { $childArgs += "-DryRun" }
@@ -1479,6 +1614,7 @@ if ($Parallel -and $Clients.Count -gt 1) {
skipStream = [bool]$SkipStream
skipBulk = [bool]$SkipBulk
verifyWrite = [bool]$VerifyWrite
verifyAlarms = [bool]$VerifyAlarms
skipParity = [bool]$SkipParity
skipAuth = [bool]$SkipAuth
writeAttribute = $WriteAttribute
@@ -1540,6 +1676,7 @@ $run = [ordered]@{
skipStream = [bool]$SkipStream
skipBulk = [bool]$SkipBulk
verifyWrite = [bool]$VerifyWrite
verifyAlarms = [bool]$VerifyAlarms
skipParity = [bool]$SkipParity
skipAuth = [bool]$SkipAuth
writeAttribute = $WriteAttribute
@@ -12,6 +12,10 @@ namespace MxGateway.Server.Galaxy;
/// refresh and reused across requests. Refreshes are deploy-time gated: every tick
/// queries <c>galaxy.time_of_last_deploy</c> (cheap), and the heavy hierarchy +
/// attributes rowsets are pulled only when that timestamp has advanced.
/// Each successful heavy refresh is persisted to disk through
/// <see cref="IGalaxyHierarchySnapshotStore"/>; the first refresh restores that
/// snapshot (as <see cref="GalaxyCacheStatus.Stale"/>) so clients can browse
/// last-known data when the Galaxy database is unreachable on a cold start.
/// </summary>
public sealed class GalaxyHierarchyCache : IGalaxyHierarchyCache
{
@@ -19,27 +23,35 @@ public sealed class GalaxyHierarchyCache : IGalaxyHierarchyCache
private readonly IGalaxyRepository _repository;
private readonly IGalaxyDeployNotifier _notifier;
private readonly IGalaxyHierarchySnapshotStore? _snapshotStore;
private readonly TimeProvider _timeProvider;
private readonly ILogger<GalaxyHierarchyCache>? _logger;
private readonly TaskCompletionSource _firstLoad = new(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly SemaphoreSlim _refreshGate = new(1, 1);
private GalaxyHierarchyCacheEntry _current = GalaxyHierarchyCacheEntry.Empty;
private bool _restoreAttempted;
/// <summary>Initializes a new instance of the <see cref="GalaxyHierarchyCache"/> class.</summary>
/// <param name="repository">Galaxy Repository client for SQL queries.</param>
/// <param name="notifier">Galaxy deploy event notifier.</param>
/// <param name="timeProvider">Provider for current time; defaults to system time.</param>
/// <param name="logger">Optional logger for diagnostic output.</param>
/// <param name="snapshotStore">
/// Optional on-disk snapshot store. When supplied, the cache persists each
/// successful refresh and restores the last snapshot on first load.
/// </param>
public GalaxyHierarchyCache(
IGalaxyRepository repository,
IGalaxyDeployNotifier notifier,
TimeProvider? timeProvider = null,
ILogger<GalaxyHierarchyCache>? logger = null)
ILogger<GalaxyHierarchyCache>? logger = null,
IGalaxyHierarchySnapshotStore? snapshotStore = null)
{
_repository = repository;
_notifier = notifier;
_timeProvider = timeProvider ?? TimeProvider.System;
_logger = logger;
_snapshotStore = snapshotStore;
}
/// <summary>Gets the current Galaxy hierarchy cache entry with projected status.</summary>
@@ -88,6 +100,15 @@ public sealed class GalaxyHierarchyCache : IGalaxyHierarchyCache
private async Task RefreshCoreAsync(CancellationToken cancellationToken)
{
// First refresh only: seed the cache from the on-disk snapshot before
// querying SQL, so a cold start with an unreachable Galaxy database can
// still serve last-known browse data. Runs under the refresh gate.
if (!_restoreAttempted)
{
_restoreAttempted = true;
await TryRestoreFromDiskAsync(cancellationToken).ConfigureAwait(false);
}
GalaxyHierarchyCacheEntry previous = Volatile.Read(ref _current);
DateTimeOffset queriedAt = _timeProvider.GetUtcNow();
@@ -130,41 +151,17 @@ public sealed class GalaxyHierarchyCache : IGalaxyHierarchyCache
List<GalaxyHierarchyRow> hierarchy = hierarchyTask.Result;
List<GalaxyAttributeRow> attributes = attributesTask.Result;
IReadOnlyList<GalaxyObject> objects = BuildObjects(hierarchy, attributes);
GalaxyHierarchyIndex index = GalaxyHierarchyIndex.Build(objects);
int areaCount = hierarchy.Count(row => row.IsArea);
int historized = attributes.Count(row => row.IsHistorized);
int alarms = attributes.Count(row => row.IsAlarm);
DashboardGalaxySummary dashboardSummary = BuildDashboardSummary(
long nextSequence = previous.Sequence + 1;
GalaxyHierarchyCacheEntry next = BuildEntry(
status: GalaxyCacheStatus.Healthy,
sequence: nextSequence,
lastQueriedAt: queriedAt,
lastSuccessAt: queriedAt,
lastDeployTime: deployTime,
lastError: null,
hierarchy: hierarchy,
objectCount: hierarchy.Count,
areaCount: areaCount,
attributeCount: attributes.Count,
historizedAttributeCount: historized,
alarmAttributeCount: alarms);
long nextSequence = previous.Sequence + 1;
GalaxyHierarchyCacheEntry next = new(
Status: GalaxyCacheStatus.Healthy,
Sequence: nextSequence,
LastQueriedAt: queriedAt,
LastSuccessAt: queriedAt,
LastDeployTime: deployTime,
LastError: null,
Objects: objects,
Index: index,
DashboardSummary: dashboardSummary,
ObjectCount: hierarchy.Count,
AreaCount: areaCount,
AttributeCount: attributes.Count,
HistorizedAttributeCount: historized,
AlarmAttributeCount: alarms);
attributes: attributes);
Volatile.Write(ref _current, next);
_firstLoad.TrySetResult();
@@ -175,6 +172,8 @@ public sealed class GalaxyHierarchyCache : IGalaxyHierarchyCache
TimeOfLastDeploy: deployTime,
ObjectCount: hierarchy.Count,
AttributeCount: attributes.Count));
await PersistSnapshotAsync(deployTime, queriedAt, hierarchy, attributes, cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
@@ -205,6 +204,161 @@ public sealed class GalaxyHierarchyCache : IGalaxyHierarchyCache
}
}
/// <summary>
/// Materializes a complete <see cref="GalaxyHierarchyCacheEntry"/> from raw
/// hierarchy and attribute rowsets. Shared by the live refresh path and the
/// on-disk restore path so both produce an identical object list, index, and
/// dashboard summary.
/// </summary>
private static GalaxyHierarchyCacheEntry BuildEntry(
GalaxyCacheStatus status,
long sequence,
DateTimeOffset? lastQueriedAt,
DateTimeOffset? lastSuccessAt,
DateTimeOffset? lastDeployTime,
string? lastError,
IReadOnlyList<GalaxyHierarchyRow> hierarchy,
IReadOnlyList<GalaxyAttributeRow> attributes)
{
IReadOnlyList<GalaxyObject> objects = BuildObjects(hierarchy, attributes);
GalaxyHierarchyIndex index = GalaxyHierarchyIndex.Build(objects);
int areaCount = hierarchy.Count(row => row.IsArea);
int historized = attributes.Count(row => row.IsHistorized);
int alarms = attributes.Count(row => row.IsAlarm);
DashboardGalaxySummary dashboardSummary = BuildDashboardSummary(
status: status,
lastQueriedAt: lastQueriedAt,
lastSuccessAt: lastSuccessAt,
lastDeployTime: lastDeployTime,
lastError: lastError,
hierarchy: hierarchy,
objectCount: hierarchy.Count,
areaCount: areaCount,
attributeCount: attributes.Count,
historizedAttributeCount: historized,
alarmAttributeCount: alarms);
return new GalaxyHierarchyCacheEntry(
Status: status,
Sequence: sequence,
LastQueriedAt: lastQueriedAt,
LastSuccessAt: lastSuccessAt,
LastDeployTime: lastDeployTime,
LastError: lastError,
Objects: objects,
Index: index,
DashboardSummary: dashboardSummary,
ObjectCount: hierarchy.Count,
AreaCount: areaCount,
AttributeCount: attributes.Count,
HistorizedAttributeCount: historized,
AlarmAttributeCount: alarms);
}
/// <summary>
/// Seeds the cache from the on-disk snapshot when no live data has loaded yet.
/// The restored entry is marked <see cref="GalaxyCacheStatus.Stale"/> — it is
/// last-known data, not live. A later refresh that observes the same deploy
/// time promotes it to healthy; one that observes a newer deploy replaces it.
/// </summary>
private async Task TryRestoreFromDiskAsync(CancellationToken cancellationToken)
{
if (_snapshotStore is null)
{
return;
}
if (Volatile.Read(ref _current).HasData)
{
return;
}
GalaxyHierarchySnapshot? snapshot;
try
{
snapshot = await _snapshotStore.TryLoadAsync(cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
throw;
}
catch (Exception exception)
{
_logger?.LogWarning(exception, "Failed to restore the Galaxy hierarchy from the on-disk snapshot.");
return;
}
if (snapshot is null)
{
return;
}
long sequence = Volatile.Read(ref _current).Sequence + 1;
GalaxyHierarchyCacheEntry restored = BuildEntry(
status: GalaxyCacheStatus.Stale,
sequence: sequence,
lastQueriedAt: snapshot.SavedAt,
lastSuccessAt: snapshot.SavedAt,
lastDeployTime: snapshot.LastDeployTime,
lastError: null,
hierarchy: snapshot.Hierarchy,
attributes: snapshot.Attributes);
Volatile.Write(ref _current, restored);
// Restored data counts as a completed first load: unblock callers waiting on
// the bootstrap gate now, instead of making them wait out the full budget for
// a live query that may not return for seconds when the database is
// unreachable, which is the scenario this restore exists for.
_firstLoad.TrySetResult();
_notifier.Publish(new GalaxyDeployEventInfo(
Sequence: sequence,
ObservedAt: _timeProvider.GetUtcNow(),
TimeOfLastDeploy: snapshot.LastDeployTime,
ObjectCount: snapshot.Hierarchy.Count,
AttributeCount: snapshot.Attributes.Count));
_logger?.LogInformation(
"Restored Galaxy hierarchy from on-disk snapshot saved {SavedAt:o}: {ObjectCount} objects, {AttributeCount} attributes (status Stale until the Galaxy database confirms).",
snapshot.SavedAt,
snapshot.Hierarchy.Count,
snapshot.Attributes.Count);
}
/// <summary>
/// Persists a successful refresh to disk. Persistence failures are logged and
/// swallowed — a cache that cannot write its backup is still fully usable.
/// </summary>
private async Task PersistSnapshotAsync(
DateTimeOffset? deployTime,
DateTimeOffset savedAt,
IReadOnlyList<GalaxyHierarchyRow> hierarchy,
IReadOnlyList<GalaxyAttributeRow> attributes,
CancellationToken cancellationToken)
{
if (_snapshotStore is null)
{
return;
}
try
{
await _snapshotStore.SaveAsync(
new GalaxyHierarchySnapshot(deployTime, savedAt, hierarchy, attributes),
cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// The refresh was cancelled (gateway shutdown) before the write finished.
// That is not a persistence failure — do not log it as a warning.
}
catch (Exception exception)
{
_logger?.LogWarning(exception, "Failed to persist the Galaxy hierarchy snapshot to disk.");
}
}
private static IReadOnlyList<GalaxyObject> BuildObjects(
IReadOnlyList<GalaxyHierarchyRow> hierarchy,
IReadOnlyList<GalaxyAttributeRow> attributes)
@@ -0,0 +1,24 @@
namespace MxGateway.Server.Galaxy;
/// <summary>
/// A serializable point-in-time copy of the Galaxy Repository browse data.
/// Holds the raw hierarchy and attribute rowsets — not the materialized
/// protobuf objects — so the restore path runs the exact same
/// materialization as a live refresh. Persisted by
/// <see cref="IGalaxyHierarchySnapshotStore"/> after a successful refresh
/// and reloaded on the first refresh after startup, so browse data is
/// available even when the Galaxy database is unreachable.
/// </summary>
/// <param name="LastDeployTime">
/// The <c>galaxy.time_of_last_deploy</c> the rowsets were pulled at, or
/// <see langword="null"/> when the Galaxy table reported no deploy. A later
/// live refresh that observes this same timestamp can promote the restored
/// entry to healthy without re-running the heavy queries.
/// </param>
/// <param name="SavedAt">UTC wall-clock when the snapshot was written to disk.</param>
/// <param name="Hierarchy">The persisted object-hierarchy rowset.</param>
/// <param name="Attributes">The persisted attribute rowset.</param>
public sealed record GalaxyHierarchySnapshot(
DateTimeOffset? LastDeployTime,
DateTimeOffset SavedAt,
IReadOnlyList<GalaxyHierarchyRow> Hierarchy,
IReadOnlyList<GalaxyAttributeRow> Attributes);
@@ -0,0 +1,141 @@
using System.Text.Json;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace MxGateway.Server.Galaxy;
/// <summary>
/// JSON-file implementation of <see cref="IGalaxyHierarchySnapshotStore"/>.
/// Writes the on-disk snapshot atomically (temp file + rename) so a crash
/// mid-write can never leave a torn file, and ignores files whose schema
/// version it does not recognize. When
/// <see cref="GalaxyRepositoryOptions.PersistSnapshot"/> is <see langword="false"/>
/// both operations are no-ops.
/// </summary>
public sealed class GalaxyHierarchySnapshotStore : IGalaxyHierarchySnapshotStore
{
/// <summary>
/// On-disk format version. Bump this whenever the persisted shape changes
/// in a way an older or newer gateway cannot read; a mismatched file is
/// ignored rather than misparsed.
/// </summary>
private const int CurrentSchemaVersion = 1;
private static readonly JsonSerializerOptions SerializerOptions = new()
{
WriteIndented = false,
};
private readonly string? _path;
private readonly TimeSpan _writeTimeout;
private readonly ILogger<GalaxyHierarchySnapshotStore>? _logger;
private readonly SemaphoreSlim _ioGate = new(1, 1);
/// <summary>Initializes a new instance of the <see cref="GalaxyHierarchySnapshotStore"/> class.</summary>
/// <param name="options">Galaxy repository options carrying the snapshot path and enable flag.</param>
/// <param name="logger">Optional logger for diagnostic output.</param>
public GalaxyHierarchySnapshotStore(
IOptions<GalaxyRepositoryOptions> options,
ILogger<GalaxyHierarchySnapshotStore>? logger = null)
{
GalaxyRepositoryOptions value = options.Value;
_path = value.PersistSnapshot && !string.IsNullOrWhiteSpace(value.SnapshotCachePath)
? value.SnapshotCachePath
: null;
_writeTimeout = TimeSpan.FromSeconds(Math.Max(1, value.CommandTimeoutSeconds));
_logger = logger;
}
/// <inheritdoc />
public async Task SaveAsync(GalaxyHierarchySnapshot snapshot, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(snapshot);
if (_path is null)
{
return;
}
PersistedFile file = new(CurrentSchemaVersion, snapshot);
await _ioGate.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
// Bound the write so a stuck disk — e.g. a SnapshotCachePath on an
// unresponsive network share — cannot stall the caller. On the cache
// refresh path that would otherwise pin the whole refresh loop.
using CancellationTokenSource writeCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
writeCts.CancelAfter(_writeTimeout);
string? directory = Path.GetDirectoryName(_path);
if (!string.IsNullOrEmpty(directory))
{
Directory.CreateDirectory(directory);
}
string tempPath = _path + ".tmp";
await using (FileStream stream = new(tempPath, FileMode.Create, FileAccess.Write, FileShare.None))
{
await JsonSerializer.SerializeAsync(stream, file, SerializerOptions, writeCts.Token).ConfigureAwait(false);
}
File.Move(tempPath, _path, overwrite: true);
_logger?.LogDebug(
"Persisted Galaxy hierarchy snapshot to {Path} ({ObjectCount} objects, {AttributeCount} attributes).",
_path,
snapshot.Hierarchy.Count,
snapshot.Attributes.Count);
}
finally
{
_ioGate.Release();
}
}
/// <inheritdoc />
public async Task<GalaxyHierarchySnapshot?> TryLoadAsync(CancellationToken cancellationToken)
{
if (_path is null || !File.Exists(_path))
{
return null;
}
await _ioGate.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
PersistedFile? file;
await using (FileStream stream = new(_path, FileMode.Open, FileAccess.Read, FileShare.Read))
{
file = await JsonSerializer.DeserializeAsync<PersistedFile>(
stream, SerializerOptions, cancellationToken).ConfigureAwait(false);
}
if (file is null || file.SchemaVersion != CurrentSchemaVersion || file.Snapshot is null)
{
_logger?.LogWarning(
"Ignoring Galaxy hierarchy snapshot at {Path}: unrecognized schema version or missing snapshot payload.",
_path);
return null;
}
return file.Snapshot;
}
catch (Exception exception) when (exception is JsonException or IOException or UnauthorizedAccessException)
{
// A corrupt, truncated, locked, or access-denied snapshot file is an
// expected failure mode for a disk cache — honor the Try contract and
// return null rather than throwing.
_logger?.LogWarning(
exception,
"Ignoring Galaxy hierarchy snapshot at {Path}: the file is unreadable or not valid JSON.",
_path);
return null;
}
finally
{
_ioGate.Release();
}
}
/// <summary>On-disk envelope: a schema version plus the snapshot payload.</summary>
private sealed record PersistedFile(int SchemaVersion, GalaxyHierarchySnapshot? Snapshot);
}
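The store above combines two ideas: an atomic temp-file-plus-rename write, and a Try-style load that treats a missing, corrupt, or wrong-version file as "no snapshot". The following is a minimal sketch of that pattern only, not the gateway's implementation. `DemoEnvelope` and `AtomicJsonFileDemo` are hypothetical names introduced for illustration, the payload is a plain string array, and the sketch is synchronous and omits the I/O gate and write timeout that the real store adds.

```csharp
using System;
using System.IO;
using System.Text.Json;

// Hypothetical envelope for illustration: a schema version plus a payload.
public sealed record DemoEnvelope(int SchemaVersion, string[]? Payload);

public static class AtomicJsonFileDemo
{
    private const int CurrentSchemaVersion = 1;

    // Write to "<path>.tmp" first, then rename over the target, so a reader
    // sees either the previous complete file or the new complete file.
    public static void Save(string path, string[] payload)
    {
        string tempPath = path + ".tmp";
        string json = JsonSerializer.Serialize(new DemoEnvelope(CurrentSchemaVersion, payload));
        File.WriteAllText(tempPath, json);
        File.Move(tempPath, path, overwrite: true);
    }

    // Try-style load: a missing, unreadable, corrupt, or wrong-version file
    // yields null instead of throwing.
    public static string[]? TryLoad(string path)
    {
        if (!File.Exists(path))
        {
            return null;
        }

        try
        {
            DemoEnvelope? file = JsonSerializer.Deserialize<DemoEnvelope>(File.ReadAllText(path));
            return file is { SchemaVersion: CurrentSchemaVersion } ? file.Payload : null;
        }
        catch (Exception exception) when (exception is JsonException or IOException or UnauthorizedAccessException)
        {
            return null;
        }
    }
}
```

Calling `AtomicJsonFileDemo.Save(path, ["a", "b"])` and then `AtomicJsonFileDemo.TryLoad(path)` round-trips the payload; overwriting the file with invalid JSON or a different `SchemaVersion` makes `TryLoad` return `null`, which is the behavior the cache relies on to fall back to an empty start.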
+75 -49
@@ -3,10 +3,15 @@ using Microsoft.Data.SqlClient;
namespace MxGateway.Server.Galaxy;
/// <summary>
/// SQL access to the AVEVA System Platform Galaxy Repository (ZB) database. Ported from
/// the OtOpcUa project so the row sets stay byte-for-byte identical between the two
/// consumers — the same SQL drives the OPC UA server's address space and this gateway's
/// gRPC browse surface.
/// SQL access to the AVEVA System Platform Galaxy Repository (ZB) database.
/// <para>
/// <see cref="HierarchySql" /> is still the query originally ported from the OtOpcUa
/// project. <see cref="AttributesSql" /> has diverged: it additionally enumerates the
/// built-in attributes contributed by each object's primitives (from
/// <c>attribute_definition</c> via <c>primitive_instance</c>), so engine/platform objects
/// and extension sub-attributes (e.g. <c>TestAlarm001.Acked</c>) are surfaced. The
/// OtOpcUa query is not kept in sync — see docs/GalaxyRepository.md.
/// </para>
/// </summary>
public sealed class GalaxyRepository(GalaxyRepositoryOptions options) : IGalaxyRepository
{
@@ -158,6 +163,16 @@ WHERE td.category_id IN (1, 3, 4, 10, 11, 13, 17, 24, 26)
AND g.deployed_package_id <> 0
ORDER BY parent_gobject_id, g.tag_name";
// Unlike HierarchySql, this query has diverged from the OtOpcUa original. It returns two
// kinds of attribute: user-configured dynamic attributes (the original `dynamic_attribute`
// body, src_pri 0) and the built-in attributes every object inherits from its primitives
// (`attribute_definition` joined through `primitive_instance`, src_pri 1). Built-in
// attributes are why engine/platform objects and extension sub-attributes such as
// `TestAlarm001.Acked` show up at all. Built-in rows carry no category filter (the
// `attribute_definition` category numbering differs from `dynamic_attribute`'s — only the
// `_`-prefix and `.Description` name exclusions apply) and are never flagged
// `is_historized`/`is_alarm`: those flags describe a user attribute that anchors an
// extension, not the extension's machinery leaves. See docs/GalaxyRepository.md.
private const string AttributesSql = @"
;WITH deployed_package_chain AS (
SELECT g.gobject_id, p.package_id, p.derived_from_package_id, 0 AS depth
@@ -169,58 +184,69 @@ ORDER BY parent_gobject_id, g.tag_name";
FROM deployed_package_chain dpc
INNER JOIN package p ON p.package_id = dpc.derived_from_package_id
WHERE dpc.derived_from_package_id <> 0 AND dpc.depth < 10
)
SELECT gobject_id, tag_name, attribute_name, full_tag_reference,
mx_data_type, data_type_name, is_array, array_dimension,
mx_attribute_category, security_classification, is_historized, is_alarm
FROM (
),
candidate AS (
SELECT
dpc.gobject_id,
g.tag_name,
da.attribute_name,
g.tag_name + '.' + da.attribute_name
+ CASE WHEN da.is_array = 1 THEN '[]' ELSE '' END
AS full_tag_reference,
da.mx_data_type,
dt.description AS data_type_name,
da.is_array,
dpc.gobject_id, g.tag_name, da.attribute_name, da.mx_data_type, da.is_array,
CASE WHEN da.is_array = 1
THEN CONVERT(int, CONVERT(varbinary(2),
SUBSTRING(da.mx_value, 15, 2) + SUBSTRING(da.mx_value, 13, 2), 2))
ELSE NULL
END AS array_dimension,
da.mx_attribute_category,
da.security_classification,
CASE WHEN EXISTS (
SELECT 1 FROM deployed_package_chain dpc2
INNER JOIN primitive_instance pi ON pi.package_id = dpc2.package_id AND pi.primitive_name = da.attribute_name
INNER JOIN primitive_definition pd ON pd.primitive_definition_id = pi.primitive_definition_id AND pd.primitive_name = 'HistoryExtension'
WHERE dpc2.gobject_id = dpc.gobject_id
) THEN 1 ELSE 0 END AS is_historized,
CASE WHEN EXISTS (
SELECT 1 FROM deployed_package_chain dpc2
INNER JOIN primitive_instance pi ON pi.package_id = dpc2.package_id AND pi.primitive_name = da.attribute_name
INNER JOIN primitive_definition pd ON pd.primitive_definition_id = pi.primitive_definition_id AND pd.primitive_name = 'AlarmExtension'
WHERE dpc2.gobject_id = dpc.gobject_id
) THEN 1 ELSE 0 END AS is_alarm,
ROW_NUMBER() OVER (
PARTITION BY dpc.gobject_id, da.attribute_name
ORDER BY dpc.depth
) AS rn
ELSE NULL END AS array_dimension,
da.mx_attribute_category, da.security_classification, dpc.depth, 0 AS src_pri
FROM deployed_package_chain dpc
INNER JOIN dynamic_attribute da
ON da.package_id = dpc.package_id
INNER JOIN gobject g
ON g.gobject_id = dpc.gobject_id
INNER JOIN template_definition td
ON td.template_definition_id = g.template_definition_id
LEFT JOIN data_type dt
ON dt.mx_data_type = da.mx_data_type
INNER JOIN dynamic_attribute da ON da.package_id = dpc.package_id
INNER JOIN gobject g ON g.gobject_id = dpc.gobject_id
INNER JOIN template_definition td ON td.template_definition_id = g.template_definition_id
WHERE td.category_id IN (1, 3, 4, 10, 11, 13, 17, 24, 26)
AND da.attribute_name NOT LIKE '[_]%'
AND da.attribute_name NOT LIKE '%.Description'
AND da.mx_attribute_category IN (2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 24)
) ranked
WHERE rn = 1
ORDER BY tag_name, attribute_name";
UNION ALL
SELECT
dpc.gobject_id, g.tag_name,
CASE WHEN pi.primitive_name IS NULL OR pi.primitive_name = ''
THEN ad.attribute_name
ELSE pi.primitive_name + '.' + ad.attribute_name END AS attribute_name,
ad.mx_data_type, ad.is_array,
CASE WHEN ad.is_array = 1
THEN CONVERT(int, CONVERT(varbinary(2),
SUBSTRING(ad.mx_value, 15, 2) + SUBSTRING(ad.mx_value, 13, 2), 2))
ELSE NULL END AS array_dimension,
ad.mx_attribute_category, ad.security_classification, dpc.depth, 1 AS src_pri
FROM deployed_package_chain dpc
INNER JOIN primitive_instance pi ON pi.package_id = dpc.package_id
INNER JOIN attribute_definition ad ON ad.primitive_definition_id = pi.primitive_definition_id
INNER JOIN gobject g ON g.gobject_id = dpc.gobject_id
INNER JOIN template_definition td ON td.template_definition_id = g.template_definition_id
WHERE td.category_id IN (1, 3, 4, 10, 11, 13, 17, 24, 26)
AND ad.attribute_name NOT LIKE '[_]%'
AND ad.attribute_name NOT LIKE '%.Description'
),
ranked AS (
SELECT c.*, ROW_NUMBER() OVER (
PARTITION BY c.gobject_id, c.attribute_name ORDER BY c.src_pri, c.depth) AS rn
FROM candidate c
)
SELECT
r.gobject_id, r.tag_name, r.attribute_name,
r.tag_name + '.' + r.attribute_name
+ CASE WHEN r.is_array = 1 THEN '[]' ELSE '' END AS full_tag_reference,
r.mx_data_type, dt.description AS data_type_name, r.is_array, r.array_dimension,
r.mx_attribute_category, r.security_classification,
CASE WHEN r.src_pri = 0 AND EXISTS (
SELECT 1 FROM deployed_package_chain dpc2
INNER JOIN primitive_instance pi ON pi.package_id = dpc2.package_id AND pi.primitive_name = r.attribute_name
INNER JOIN primitive_definition pd ON pd.primitive_definition_id = pi.primitive_definition_id AND pd.primitive_name = 'HistoryExtension'
WHERE dpc2.gobject_id = r.gobject_id
) THEN 1 ELSE 0 END AS is_historized,
CASE WHEN r.src_pri = 0 AND EXISTS (
SELECT 1 FROM deployed_package_chain dpc2
INNER JOIN primitive_instance pi ON pi.package_id = dpc2.package_id AND pi.primitive_name = r.attribute_name
INNER JOIN primitive_definition pd ON pd.primitive_definition_id = pi.primitive_definition_id AND pd.primitive_name = 'AlarmExtension'
WHERE dpc2.gobject_id = r.gobject_id
) THEN 1 ELSE 0 END AS is_alarm
FROM ranked r
LEFT JOIN data_type dt ON dt.mx_data_type = r.mx_data_type
WHERE r.rn = 1
ORDER BY r.tag_name, r.attribute_name";
}
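The `ranked` CTE above resolves duplicates by keeping one row per `(gobject_id, attribute_name)`, preferring user-configured dynamic attributes (`src_pri` 0) over built-in ones (`src_pri` 1), and then the shallowest package depth. As a rough in-memory illustration of that rule, the sketch below does the same selection with LINQ. `CandidateRow` and `AttributeRankingDemo` are hypothetical names that mirror only the columns the ranking uses. Two differences from the SQL are assumptions of the sketch: grouping is case-sensitive here (SQL Server follows the column collation), and the final ordering is by object ID rather than tag name, purely to keep the output deterministic.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical row shape for illustration; only the ranking columns are kept.
public sealed record CandidateRow(int GobjectId, string AttributeName, int SrcPri, int Depth);

public static class AttributeRankingDemo
{
    // In-memory equivalent of
    //   ROW_NUMBER() OVER (PARTITION BY gobject_id, attribute_name ORDER BY src_pri, depth)
    // filtered to rn = 1: one winner per (object, attribute name).
    public static List<CandidateRow> PickWinners(IEnumerable<CandidateRow> candidates) =>
        candidates
            .GroupBy(row => (row.GobjectId, row.AttributeName))
            .Select(group => group.OrderBy(row => row.SrcPri).ThenBy(row => row.Depth).First())
            .OrderBy(row => row.GobjectId)
            .ThenBy(row => row.AttributeName, StringComparer.Ordinal)
            .ToList();
}
```

For example, if object 1 has `PV` as a dynamic attribute at depths 0 and 1 and also as a built-in attribute at depth 0, `PickWinners` keeps the dynamic row at depth 0; a name that exists only as a built-in attribute keeps its shallowest built-in row.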
@@ -27,4 +27,21 @@ public sealed class GalaxyRepositoryOptions
/// cache. SQL is hit at most once per interval regardless of dashboard render rate.
/// </summary>
public int DashboardRefreshIntervalSeconds { get; init; } = 30;
/// <summary>Default on-disk path for the persisted Galaxy browse snapshot.</summary>
public const string DefaultSnapshotCachePath =
@"C:\ProgramData\MxGateway\galaxy-snapshot.json";
/// <summary>
/// Whether the gateway persists the latest successful Galaxy browse dataset to
/// disk. When enabled, the cache reloads that snapshot at startup so clients can
/// still browse last-known data while the Galaxy database is unreachable.
/// </summary>
public bool PersistSnapshot { get; init; } = true;
/// <summary>
/// File path for the persisted Galaxy browse snapshot. Ignored when
/// <see cref="PersistSnapshot"/> is <see langword="false"/>.
/// </summary>
public string SnapshotCachePath { get; init; } = DefaultSnapshotCachePath;
}
@@ -19,6 +19,7 @@ public static class GalaxyRepositoryServiceCollectionExtensions
services.AddSingleton<IGalaxyRepository>(sp => sp.GetRequiredService<GalaxyRepository>());
services.AddSingleton<IGalaxyDeployNotifier, GalaxyDeployNotifier>();
services.AddSingleton<IGalaxyHierarchySnapshotStore, GalaxyHierarchySnapshotStore>();
services.AddSingleton<IGalaxyHierarchyCache, GalaxyHierarchyCache>();
services.AddHostedService<GalaxyHierarchyRefreshService>();
@@ -0,0 +1,28 @@
namespace MxGateway.Server.Galaxy;
/// <summary>
/// Persists the latest Galaxy Repository browse dataset to disk and reloads
/// it at startup. Lets <see cref="GalaxyHierarchyCache"/> serve last-known
/// browse data when the Galaxy database is unreachable on a cold start.
/// </summary>
public interface IGalaxyHierarchySnapshotStore
{
/// <summary>
/// Writes <paramref name="snapshot"/> to disk, replacing any previous
/// snapshot atomically. A no-op when snapshot persistence is disabled.
/// </summary>
/// <param name="snapshot">The browse dataset to persist.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
Task SaveAsync(GalaxyHierarchySnapshot snapshot, CancellationToken cancellationToken);
/// <summary>
/// Reads the persisted Galaxy browse dataset.
/// </summary>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
/// <returns>
/// The persisted snapshot, or <see langword="null"/> when none exists,
/// persistence is disabled, the on-disk file uses an unrecognized schema
/// version, or the file is corrupt or unreadable.
/// </returns>
Task<GalaxyHierarchySnapshot?> TryLoadAsync(CancellationToken cancellationToken);
}
+3 -1
@@ -65,7 +65,9 @@
"Galaxy": {
"ConnectionString": "Server=localhost;Database=ZB;Integrated Security=True;TrustServerCertificate=True;Encrypt=False;",
"CommandTimeoutSeconds": 60,
"DashboardRefreshIntervalSeconds": 30
"DashboardRefreshIntervalSeconds": 30,
"PersistSnapshot": true,
"SnapshotCachePath": "C:\\ProgramData\\MxGateway\\galaxy-snapshot.json"
},
"Alarms": {
"Enabled": true,
@@ -1,11 +1,15 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MxGateway.Server.Galaxy;
using MxGateway.Contracts.Proto.Galaxy;
using MxGateway.Tests.TestSupport;
namespace MxGateway.Tests.Galaxy;
public sealed class GalaxyHierarchyCacheTests
public sealed class GalaxyHierarchyCacheTests : IDisposable
{
private readonly List<string> _tempPaths = [];
/// <summary>
/// Verifies cache returns empty entry before any refresh occurs.
/// </summary>
@@ -121,6 +125,345 @@ public sealed class GalaxyHierarchyCacheTests
Assert.Same(root, index.ObjectViewsById[1].Object);
}
/// <summary>
/// Verifies a successful refresh writes the browse dataset to the on-disk
/// snapshot store so a later cold start can restore it.
/// </summary>
[Fact]
public async Task RefreshAsync_WhenSuccessful_PersistsSnapshotToDisk()
{
GalaxyDeployNotifier notifier = new();
StubGalaxyRepository repository = new(
deployTime: new DateTime(2026, 5, 20, 9, 0, 0, DateTimeKind.Utc),
hierarchy: [SampleHierarchyRow()],
attributes: [SampleAttributeRow()]);
GalaxyHierarchySnapshotStore store = CreateStore();
GalaxyHierarchyCache cache = new(repository, notifier, new ManualTimeProvider(), snapshotStore: store);
await cache.RefreshAsync(CancellationToken.None);
Assert.Equal(GalaxyCacheStatus.Healthy, cache.Current.Status);
GalaxyHierarchySnapshot? persisted = await store.TryLoadAsync(CancellationToken.None);
Assert.NotNull(persisted);
Assert.Equal(99, Assert.Single(persisted.Hierarchy).GobjectId);
Assert.Equal("PV", Assert.Single(persisted.Attributes).AttributeName);
}
/// <summary>
/// Verifies that when the Galaxy database is unreachable on first refresh but a
/// snapshot exists on disk, the cache serves that data with <c>Stale</c> status
/// rather than coming up empty.
/// </summary>
[Fact]
public async Task RefreshAsync_WhenDatabaseUnreachableButSnapshotOnDisk_RestoresStaleData()
{
GalaxyHierarchySnapshotStore store = CreateStore();
await store.SaveAsync(
new GalaxyHierarchySnapshot(
LastDeployTime: new DateTimeOffset(2026, 5, 20, 9, 0, 0, TimeSpan.Zero),
SavedAt: new DateTimeOffset(2026, 5, 20, 9, 1, 0, TimeSpan.Zero),
Hierarchy: [SampleHierarchyRow()],
Attributes: [SampleAttributeRow()]),
CancellationToken.None);
GalaxyDeployNotifier notifier = new();
ThrowingGalaxyRepository repository = new(new InvalidOperationException("Galaxy repository unreachable"));
GalaxyHierarchyCache cache = new(repository, notifier, new ManualTimeProvider(), snapshotStore: store);
await cache.RefreshAsync(CancellationToken.None);
Assert.Equal(GalaxyCacheStatus.Stale, cache.Current.Status);
Assert.True(cache.Current.HasData);
Assert.Equal(1, cache.Current.ObjectCount);
Assert.Equal(1, cache.Current.AttributeCount);
Assert.NotNull(notifier.Latest);
}
/// <summary>
/// Verifies that when the disk snapshot's deploy time still matches the live
/// Galaxy database, the cache promotes the restored data to <c>Healthy</c>
/// without re-running the heavy hierarchy and attribute queries.
/// </summary>
[Fact]
public async Task RefreshAsync_WhenSnapshotDeployMatchesLive_PromotesToHealthyWithoutHeavyQuery()
{
DateTime deployTime = new(2026, 5, 20, 9, 0, 0, DateTimeKind.Utc);
GalaxyHierarchySnapshotStore store = CreateStore();
await store.SaveAsync(
new GalaxyHierarchySnapshot(
LastDeployTime: new DateTimeOffset(deployTime, TimeSpan.Zero),
SavedAt: new DateTimeOffset(2026, 5, 20, 9, 1, 0, TimeSpan.Zero),
Hierarchy: [SampleHierarchyRow()],
Attributes: [SampleAttributeRow()]),
CancellationToken.None);
GalaxyDeployNotifier notifier = new();
StubGalaxyRepository repository = new(deployTime);
GalaxyHierarchyCache cache = new(repository, notifier, new ManualTimeProvider(), snapshotStore: store);
await cache.RefreshAsync(CancellationToken.None);
Assert.Equal(GalaxyCacheStatus.Healthy, cache.Current.Status);
Assert.Equal(1, cache.Current.ObjectCount);
Assert.Equal(0, repository.GetHierarchyCount);
Assert.Equal(0, repository.GetAttributesCount);
}
/// <summary>
/// Verifies that a restored on-disk snapshot completes the first-load gate
/// immediately, so a browse call racing the first refresh is not blocked for
/// the full bootstrap budget while the live Galaxy query is still running.
/// Regression test for Server-033.
/// </summary>
[Fact]
public async Task RefreshAsync_RestoredSnapshotCompletesFirstLoadBeforeLiveQueryReturns()
{
GalaxyHierarchySnapshotStore store = CreateStore();
await store.SaveAsync(
new GalaxyHierarchySnapshot(
LastDeployTime: new DateTimeOffset(2026, 5, 20, 9, 0, 0, TimeSpan.Zero),
SavedAt: new DateTimeOffset(2026, 5, 20, 9, 1, 0, TimeSpan.Zero),
Hierarchy: [SampleHierarchyRow()],
Attributes: [SampleAttributeRow()]),
CancellationToken.None);
GalaxyDeployNotifier notifier = new();
BlockingGalaxyRepository repository = new();
GalaxyHierarchyCache cache = new(repository, notifier, new ManualTimeProvider(), snapshotStore: store);
Task refresh = cache.RefreshAsync(CancellationToken.None);
// The live query is blocked inside the repository; first-load must still
// complete — from the restored snapshot — well within the wait budget.
await cache.WaitForFirstLoadAsync(CancellationToken.None).WaitAsync(TimeSpan.FromSeconds(5));
Assert.True(cache.Current.HasData);
Assert.Equal(GalaxyCacheStatus.Stale, cache.Current.Status);
repository.Release();
await refresh.WaitAsync(TimeSpan.FromSeconds(5));
}
/// <summary>
/// Verifies a corrupt on-disk snapshot does not crash startup: the cache
/// ignores the unreadable file and comes up Unavailable when the database is
/// also unreachable. Regression test for Server-037.
/// </summary>
[Fact]
public async Task RefreshAsync_WhenSnapshotFileCorrupt_ComesUpUnavailableWithoutThrowing()
{
string path = CreateTempPath();
await File.WriteAllTextAsync(path, "{ this is not valid json");
GalaxyHierarchySnapshotStore store = CreateStore(path);
GalaxyDeployNotifier notifier = new();
ThrowingGalaxyRepository repository = new(new InvalidOperationException("Galaxy repository unreachable"));
GalaxyHierarchyCache cache = new(repository, notifier, new ManualTimeProvider(), snapshotStore: store);
await cache.RefreshAsync(CancellationToken.None);
Assert.Equal(GalaxyCacheStatus.Unavailable, cache.Current.Status);
Assert.False(cache.Current.HasData);
}
/// <summary>
/// Verifies that with snapshot persistence disabled the cache does not
/// restore from disk — an unreachable database leaves it Unavailable.
/// Regression test for Server-037.
/// </summary>
[Fact]
public async Task RefreshAsync_WhenPersistDisabled_DoesNotRestoreFromDisk()
{
GalaxyHierarchySnapshotStore store = CreateStore(CreateTempPath(), persist: false);
GalaxyDeployNotifier notifier = new();
ThrowingGalaxyRepository repository = new(new InvalidOperationException("Galaxy repository unreachable"));
GalaxyHierarchyCache cache = new(repository, notifier, new ManualTimeProvider(), snapshotStore: store);
await cache.RefreshAsync(CancellationToken.None);
Assert.Equal(GalaxyCacheStatus.Unavailable, cache.Current.Status);
Assert.False(cache.Current.HasData);
}
/// <summary>
/// Verifies that a snapshot save aborted because the gateway is shutting down
/// (the refresh token is cancelled) is not logged as a persistence failure.
/// Regression test for Server-036.
/// </summary>
[Fact]
public async Task RefreshAsync_WhenSnapshotSaveCancelledAtShutdown_DoesNotLogPersistFailure()
{
using CancellationTokenSource cts = new();
GalaxyDeployNotifier notifier = new();
StubGalaxyRepository repository = new(
deployTime: new DateTime(2026, 5, 20, 9, 0, 0, DateTimeKind.Utc),
hierarchy: [SampleHierarchyRow()],
attributes: [SampleAttributeRow()]);
CancellingSaveStore store = new(cts);
RecordingLogger<GalaxyHierarchyCache> logger = new();
GalaxyHierarchyCache cache = new(repository, notifier, new ManualTimeProvider(), logger, store);
await cache.RefreshAsync(cts.Token);
Assert.DoesNotContain(
logger.Entries,
entry => entry.Level == LogLevel.Warning
&& entry.Message.Contains("persist", StringComparison.OrdinalIgnoreCase));
}
private static GalaxyHierarchyRow SampleHierarchyRow() => new()
{
GobjectId = 99,
TagName = "Pump_001",
ContainedName = "Pump",
BrowseName = "Pump",
CategoryId = 10,
TemplateChain = ["AppPump"],
};
private static GalaxyAttributeRow SampleAttributeRow() => new()
{
GobjectId = 99,
TagName = "Pump_001",
AttributeName = "PV",
FullTagReference = "Pump_001.PV",
MxDataType = 5,
DataTypeName = "Float",
};
private string CreateTempPath()
{
string path = Path.Combine(
Path.GetTempPath(),
$"mxgw-galaxy-cache-test-{Guid.NewGuid():N}.json");
_tempPaths.Add(path);
return path;
}
private GalaxyHierarchySnapshotStore CreateStore() => CreateStore(CreateTempPath());
private static GalaxyHierarchySnapshotStore CreateStore(string path, bool persist = true)
{
GalaxyRepositoryOptions options = new()
{
PersistSnapshot = persist,
SnapshotCachePath = path,
};
return new GalaxyHierarchySnapshotStore(Options.Create(options));
}
/// <summary><see cref="IGalaxyRepository"/> whose deploy-time query blocks until released.</summary>
private sealed class BlockingGalaxyRepository : IGalaxyRepository
{
private readonly TaskCompletionSource _release = new(TaskCreationOptions.RunContinuationsAsynchronously);
public void Release() => _release.TrySetResult();
public Task<bool> TestConnectionAsync(CancellationToken ct = default) => Task.FromResult(false);
public async Task<DateTime?> GetLastDeployTimeAsync(CancellationToken ct = default)
{
await _release.Task.WaitAsync(ct).ConfigureAwait(false);
throw new InvalidOperationException("Galaxy repository unreachable");
}
public Task<List<GalaxyHierarchyRow>> GetHierarchyAsync(CancellationToken ct = default)
=> throw new InvalidOperationException("GetHierarchyAsync should not be reached");
public Task<List<GalaxyAttributeRow>> GetAttributesAsync(CancellationToken ct = default)
=> throw new InvalidOperationException("GetAttributesAsync should not be reached");
}
/// <summary>Snapshot store whose <see cref="SaveAsync"/> cancels the token mid-save.</summary>
private sealed class CancellingSaveStore(CancellationTokenSource cts) : IGalaxyHierarchySnapshotStore
{
public Task<GalaxyHierarchySnapshot?> TryLoadAsync(CancellationToken cancellationToken)
=> Task.FromResult<GalaxyHierarchySnapshot?>(null);
public Task SaveAsync(GalaxyHierarchySnapshot snapshot, CancellationToken cancellationToken)
{
cts.Cancel();
cancellationToken.ThrowIfCancellationRequested();
return Task.CompletedTask;
}
}
/// <summary>Minimal <see cref="ILogger{T}"/> that records every emitted log entry.</summary>
private sealed class RecordingLogger<T> : ILogger<T>
{
public List<(LogLevel Level, string Message)> Entries { get; } = [];
public IDisposable BeginScope<TState>(TState state)
where TState : notnull => NullScope.Instance;
public bool IsEnabled(LogLevel logLevel) => true;
public void Log<TState>(
LogLevel logLevel,
EventId eventId,
TState state,
Exception? exception,
Func<TState, Exception?, string> formatter)
{
Entries.Add((logLevel, formatter(state, exception)));
}
private sealed class NullScope : IDisposable
{
public static readonly NullScope Instance = new();
public void Dispose()
{
}
}
}
/// <summary>In-memory <see cref="IGalaxyRepository"/> that returns fixed rowsets.</summary>
private sealed class StubGalaxyRepository(
DateTime? deployTime,
List<GalaxyHierarchyRow>? hierarchy = null,
List<GalaxyAttributeRow>? attributes = null) : IGalaxyRepository
{
private readonly List<GalaxyHierarchyRow> _hierarchy = hierarchy ?? [];
private readonly List<GalaxyAttributeRow> _attributes = attributes ?? [];
public int GetHierarchyCount { get; private set; }
public int GetAttributesCount { get; private set; }
public Task<bool> TestConnectionAsync(CancellationToken ct = default) => Task.FromResult(true);
public Task<DateTime?> GetLastDeployTimeAsync(CancellationToken ct = default) => Task.FromResult(deployTime);
public Task<List<GalaxyHierarchyRow>> GetHierarchyAsync(CancellationToken ct = default)
{
GetHierarchyCount++;
return Task.FromResult(_hierarchy);
}
public Task<List<GalaxyAttributeRow>> GetAttributesAsync(CancellationToken ct = default)
{
GetAttributesCount++;
return Task.FromResult(_attributes);
}
}
public void Dispose()
{
foreach (string path in _tempPaths)
{
try
{
File.Delete(path);
File.Delete(path + ".tmp");
}
catch (Exception ex) when (ex is IOException or UnauthorizedAccessException)
{
// Best-effort cleanup of test scratch files.
}
}
}
private sealed class ThrowingGalaxyRepository(Exception toThrow) : IGalaxyRepository
{
/// <summary>Gets the number of times <see cref="GetLastDeployTimeAsync"/> was called.</summary>
@@ -0,0 +1,177 @@
using Microsoft.Extensions.Options;
using MxGateway.Server.Galaxy;
namespace MxGateway.Tests.Galaxy;
/// <summary>
/// Covers <see cref="GalaxyHierarchySnapshotStore"/>: the on-disk persistence
/// that lets the Galaxy browse cache survive a cold start while the Galaxy
/// database is unreachable.
/// </summary>
public sealed class GalaxyHierarchySnapshotStoreTests : IDisposable
{
private readonly List<string> _tempPaths = [];
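/// <summary>
/// Verifies that a saved snapshot loads back with its deploy time, save time,
/// hierarchy rows, and attribute rows (including a null array dimension) intact.
/// </summary>
[Fact]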
public async Task SaveAsync_ThenTryLoadAsync_RoundTripsRows()
{
string path = CreateTempPath();
GalaxyHierarchySnapshotStore store = CreateStore(path);
GalaxyHierarchySnapshot snapshot = SampleSnapshot();
await store.SaveAsync(snapshot, CancellationToken.None);
GalaxyHierarchySnapshot? loaded = await store.TryLoadAsync(CancellationToken.None);
Assert.NotNull(loaded);
Assert.Equal(snapshot.LastDeployTime, loaded.LastDeployTime);
Assert.Equal(snapshot.SavedAt, loaded.SavedAt);
GalaxyHierarchyRow row = Assert.Single(loaded.Hierarchy);
Assert.Equal(7, row.GobjectId);
Assert.Equal("Pump_001", row.TagName);
Assert.Equal(["AppPump", "Pump"], row.TemplateChain);
Assert.Equal(2, loaded.Attributes.Count);
GalaxyAttributeRow withDimension = loaded.Attributes[0];
Assert.Equal("PV", withDimension.AttributeName);
Assert.Equal(8, withDimension.ArrayDimension);
Assert.True(withDimension.IsAlarm);
Assert.Null(loaded.Attributes[1].ArrayDimension);
}
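/// <summary>
/// Verifies that loading returns null when no snapshot file exists at the
/// configured path.
/// </summary>
[Fact]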
public async Task TryLoadAsync_WhenNoFileExists_ReturnsNull()
{
GalaxyHierarchySnapshotStore store = CreateStore(CreateTempPath());
Assert.Null(await store.TryLoadAsync(CancellationToken.None));
}
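/// <summary>
/// Verifies that with snapshot persistence disabled, saving writes no file
/// and loading returns null.
/// </summary>
[Fact]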
public async Task SaveAsync_WhenPersistenceDisabled_WritesNothing()
{
string path = CreateTempPath();
GalaxyHierarchySnapshotStore store = CreateStore(path, persist: false);
await store.SaveAsync(SampleSnapshot(), CancellationToken.None);
Assert.False(File.Exists(path));
Assert.Null(await store.TryLoadAsync(CancellationToken.None));
}
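/// <summary>
/// Verifies that a snapshot file containing invalid JSON makes the load return
/// null instead of throwing. Covers the Server-034 fix; added under Server-037.
/// </summary>
[Fact]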
public async Task TryLoadAsync_WhenFileIsCorruptJson_ReturnsNull()
{
string path = CreateTempPath();
await File.WriteAllTextAsync(path, "{ this is not valid json");
GalaxyHierarchySnapshotStore store = CreateStore(path);
Assert.Null(await store.TryLoadAsync(CancellationToken.None));
}
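// Hedged sketch, not part of the original change: a zero-byte file is another
// corrupt-file shape, for example after a crash before any bytes were flushed.
// This assumes the store parses the file with System.Text.Json, which rejects
// empty input with a JsonException, so the Try contract should yield null.
// The test name is hypothetical.
[Fact]
public async Task TryLoadAsync_WhenFileIsEmpty_ReturnsNull()
{
    string path = CreateTempPath();
    await File.WriteAllTextAsync(path, string.Empty);
    GalaxyHierarchySnapshotStore store = CreateStore(path);
    Assert.Null(await store.TryLoadAsync(CancellationToken.None));
}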
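/// <summary>
/// Verifies that a snapshot file with an unrecognized schema version is
/// ignored and the load returns null.
/// </summary>
[Fact]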
public async Task TryLoadAsync_WhenSchemaVersionUnrecognized_ReturnsNull()
{
string path = CreateTempPath();
await File.WriteAllTextAsync(path, """{"SchemaVersion":999,"Snapshot":null}""");
GalaxyHierarchySnapshotStore store = CreateStore(path);
Assert.Null(await store.TryLoadAsync(CancellationToken.None));
}
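/// <summary>
/// Verifies that a second save replaces the earlier snapshot rather than
/// merging with it.
/// </summary>
[Fact]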
public async Task SaveAsync_OverwritesAnEarlierSnapshot()
{
string path = CreateTempPath();
GalaxyHierarchySnapshotStore store = CreateStore(path);
await store.SaveAsync(SampleSnapshot(), CancellationToken.None);
GalaxyHierarchySnapshot second = SampleSnapshot() with
{
Hierarchy = [],
Attributes = [],
};
await store.SaveAsync(second, CancellationToken.None);
GalaxyHierarchySnapshot? loaded = await store.TryLoadAsync(CancellationToken.None);
Assert.NotNull(loaded);
Assert.Empty(loaded.Hierarchy);
Assert.Empty(loaded.Attributes);
}
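// Hedged sketch, not part of the original change: Dispose in this class also
// deletes "<path>.tmp", which suggests (an assumption, since the store source is
// not shown here) that SaveAsync stages the write in a sibling temp file before
// replacing the target. If so, a completed save should leave no staging file
// behind. The test name is hypothetical.
[Fact]
public async Task SaveAsync_WhenSaveCompletes_LeavesNoTempFile()
{
    string path = CreateTempPath();
    GalaxyHierarchySnapshotStore store = CreateStore(path);
    await store.SaveAsync(SampleSnapshot(), CancellationToken.None);
    // The snapshot itself must exist; the assumed staging file must not.
    Assert.True(File.Exists(path));
    Assert.False(File.Exists(path + ".tmp"));
}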
private static GalaxyHierarchySnapshot SampleSnapshot() => new(
LastDeployTime: new DateTimeOffset(2026, 5, 20, 9, 30, 0, TimeSpan.Zero),
SavedAt: new DateTimeOffset(2026, 5, 20, 9, 31, 0, TimeSpan.Zero),
Hierarchy:
[
new GalaxyHierarchyRow
{
GobjectId = 7,
TagName = "Pump_001",
ContainedName = "Pump",
BrowseName = "Pump",
CategoryId = 10,
TemplateChain = ["AppPump", "Pump"],
},
],
Attributes:
[
new GalaxyAttributeRow
{
GobjectId = 7,
TagName = "Pump_001",
AttributeName = "PV",
FullTagReference = "Pump_001.PV[]",
MxDataType = 5,
DataTypeName = "Float",
IsArray = true,
ArrayDimension = 8,
IsAlarm = true,
},
new GalaxyAttributeRow
{
GobjectId = 7,
TagName = "Pump_001",
AttributeName = "Mode",
FullTagReference = "Pump_001.Mode",
MxDataType = 3,
DataTypeName = "Integer",
ArrayDimension = null,
},
]);
private static GalaxyHierarchySnapshotStore CreateStore(string path, bool persist = true)
{
GalaxyRepositoryOptions options = new()
{
PersistSnapshot = persist,
SnapshotCachePath = path,
};
return new GalaxyHierarchySnapshotStore(Options.Create(options));
}
private string CreateTempPath()
{
string path = Path.Combine(
Path.GetTempPath(),
$"mxgw-galaxy-snapshot-{Guid.NewGuid():N}.json");
_tempPaths.Add(path);
return path;
}
public void Dispose()
{
foreach (string path in _tempPaths)
{
try
{
File.Delete(path);
File.Delete(path + ".tmp");
}
catch (Exception ex) when (ex is IOException or UnauthorizedAccessException)
{
// Best-effort cleanup of test scratch files.
}
}
}
}