test+docs(galaxy): verify alarm acknowledger recovers after transient fault; document alarm-client reconnect
This commit is contained in:
@@ -92,6 +92,8 @@ Full per-field descriptions live in `Config/GalaxyDriverOptions.cs`. The full JS
|
||||
|
||||
`ReconnectSupervisor` owns an exponential-backoff loop bounded by `Reconnect.InitialBackoffMs` / `MaxBackoffMs`. On session loss it calls `GalaxyDriver.ReopenAsync`, which invokes `GalaxyMxSession.RecreateAsync` to dispose the stale/faulted session and client before rebuilding (`OpenSessionAsync` + `RegisterAsync`). Previously `ConnectAsync` was a no-op when a stale session handle was still present, so the reopen supervisor looped forever without recovering. After a successful reopen — when `ReplayOnSessionLost = true` — the supervisor calls the gateway's `ReplaySubscriptions` RPC with the cached subscription set from `SubscriptionRegistry` instead of re-subscribing tag-by-tag. The gateway's worker then re-issues `AdviseSupervisory` server-side under the apartment lock.
|
||||
|
||||
The session-less alarm feed (`GatewayGalaxyAlarmFeed`) and alarm acknowledger (`GatewayGalaxyAlarmAcknowledger`) run on a separate always-on `_ownedMxClient` that is intentionally **not** recreated when `ReconnectSupervisor` rebuilds the worker session. The feed has its own re-invoke loop (~5 s backoff) that reopens `StreamAlarms` after any stream fault, and gRPC.NET's channel auto-reconnect recovers the underlying HTTP/2 connection after a gateway restart — so both the alarm stream and individual ack calls recover without client replacement. The acknowledger is completely stateless between calls: each `AcknowledgeAsync` issues a single unary RPC and returns; no dead-client state is latched on failure. Channel-level keepalive hardening (TCP keep-alive intervals, gRPC ping frames) would require exposing additional knobs on `MxGatewayClientOptions` in the sibling `mxaccessgw` repo — a future option if flaky long-lived connections are observed in production.
|
||||
|
||||
## Testing
|
||||
|
||||
- **Unit tests**: `tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/` — fakes the gateway gRPC surface; covers Browse, Runtime, Health, and Config in isolation.
|
||||
|
||||
+202
@@ -0,0 +1,202 @@
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.MxGateway.Client;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Config;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Runtime;
|
||||
|
||||
/// <summary>
|
||||
/// Pins <see cref="GatewayGalaxyAlarmAcknowledger"/> — the session-less unary caller
|
||||
/// that routes operator acknowledgements through
|
||||
/// <c>MxGatewayClient.AcknowledgeAlarmAsync</c>.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// The acknowledger is completely stateless w.r.t. transport faults: it holds an
|
||||
/// <see cref="MxGatewayClient"/> but has no internal dead-client latch. Each
|
||||
/// <c>AcknowledgeAsync</c> call issues exactly one unary RPC; if that RPC throws the
|
||||
/// call site (the scripted-alarm engine) is responsible for retry. gRPC.NET channel
|
||||
/// auto-reconnect ensures the underlying HTTP/2 connection recovers between calls
|
||||
/// without the client being replaced. These tests pin the no-latch contract only; they do not exercise a live reconnect.
|
||||
/// </remarks>
|
||||
public sealed class GatewayGalaxyAlarmAcknowledgerTests
|
||||
{
|
||||
// ------------------------------------------------------------------ helpers
|
||||
|
||||
/// <summary>
/// Scriptable <see cref="IGalaxyAlarmAcknowledger"/> test double. Each invocation is
/// appended to <see cref="Calls"/>, and its result is taken from a first-in/first-out
/// list of outcomes supplied through <see cref="Enqueue"/>. When no outcome is
/// pending, the invocation completes successfully.
/// </summary>
/// <remarks>
/// The tests in this class pass the fake to the <see cref="GalaxyDriver"/> constructor
/// through its <c>alarmAcknowledger</c> argument, standing in for
/// <see cref="GatewayGalaxyAlarmAcknowledger"/>.
/// </remarks>
private sealed class FakeAcknowledger : IGalaxyAlarmAcknowledger
{
    // Pending outcomes, consumed oldest-first: one per AcknowledgeAsync invocation.
    private readonly Queue<Func<Task>> _scripted = new();

    /// <summary>Every acknowledge invocation seen so far, oldest first.</summary>
    public List<(string AlarmRef, string Comment, string Operator)> Calls { get; } = [];

    /// <summary>Schedules the result of a future invocation.</summary>
    /// <param name="outcome">
    /// Factory invoked when its turn comes; the task it returns (completed or faulted)
    /// becomes the result of that <see cref="AcknowledgeAsync"/> invocation.
    /// </param>
    public void Enqueue(Func<Task> outcome)
    {
        _scripted.Enqueue(outcome);
    }

    /// <summary>Records the invocation, then plays back the next scripted outcome.</summary>
    /// <param name="alarmFullReference">Stored as <c>AlarmRef</c> in <see cref="Calls"/>.</param>
    /// <param name="comment">Stored as <c>Comment</c> in <see cref="Calls"/>.</param>
    /// <param name="operatorUser">Stored as <c>Operator</c> in <see cref="Calls"/>.</param>
    /// <param name="cancellationToken">Ignored by this fake.</param>
    /// <returns>
    /// The task produced by the oldest pending outcome, or a completed task when
    /// nothing has been scripted.
    /// </returns>
    public Task AcknowledgeAsync(
        string alarmFullReference,
        string comment,
        string operatorUser,
        CancellationToken cancellationToken)
    {
        // Record first so that invocations which end up faulting are still counted.
        Calls.Add((alarmFullReference, comment, operatorUser));

        if (_scripted.TryDequeue(out var next))
        {
            return next();
        }

        return Task.CompletedTask;
    }
}
|
||||
|
||||
/// <summary>
/// Builds the <see cref="GalaxyDriverOptions"/> shared by the driver-level tests:
/// fixed gateway and MXAccess values plus default-constructed repository and
/// reconnect options.
/// </summary>
// NOTE(review): the gateway URL and key look like placeholders that are never dialed
// because the tests inject a fake acknowledger — confirm against GalaxyDriver.
private static GalaxyDriverOptions Opts()
{
    var gateway = new GalaxyGatewayOptions("https://mxgw.test:5001", "key");
    var mxAccess = new GalaxyMxAccessOptions("OtOpcUa-A");
    var repository = new GalaxyRepositoryOptions();
    var reconnect = new GalaxyReconnectOptions();

    return new GalaxyDriverOptions(gateway, mxAccess, repository, reconnect);
}
|
||||
|
||||
/// <summary>Shorthand for building an <see cref="AlarmAcknowledgeRequest"/>.</summary>
/// <param name="conditionId">Value for <c>ConditionId</c>.</param>
/// <param name="comment">Value for <c>Comment</c>; defaults to <c>"c"</c>.</param>
/// <param name="source">Value for <c>SourceNodeId</c>; defaults to the empty string.</param>
private static AlarmAcknowledgeRequest Ack(string conditionId, string comment = "c", string source = "")
{
    return new AlarmAcknowledgeRequest(
        SourceNodeId: source,
        ConditionId: conditionId,
        Comment: comment);
}
|
||||
|
||||
// ------------------------------------------------------------------ tests
|
||||
|
||||
/// <summary>
/// A faulted acknowledge must not poison later ones. The first call through the
/// driver is expected to surface the scripted <see cref="IOException"/>; a second
/// call through the same driver must still reach the acknowledger and complete.
/// </summary>
/// <remarks>
/// This runs against <see cref="FakeAcknowledger"/>, so it pins how
/// <see cref="GalaxyDriver"/> forwards calls after a failure rather than the
/// behaviour of the gateway-backed acknowledger itself.
/// </remarks>
[Fact]
public async Task Acknowledge_after_transient_fault_succeeds_on_retry()
{
    // Arrange: script one fault (standing in for a transport blip), then one success.
    var acknowledger = new FakeAcknowledger();
    acknowledger.Enqueue(() => Task.FromException(new IOException("synthetic transport fault")));
    acknowledger.Enqueue(() => Task.CompletedTask);

    // NOTE(review): the driver is not disposed here — confirm it owns no resources
    // when constructed this way.
    var sut = new GalaxyDriver("g", Opts(),
        hierarchySource: null, alarmAcknowledger: acknowledger);

    // Act + assert: the first attempt faults with the scripted exception type.
    Func<Task> faultingAttempt = () => sut.AcknowledgeAsync(
        [Ack("Tank01.Level.HiHi", "investigating")], CancellationToken.None);

    await Should.ThrowAsync<IOException>(faultingAttempt);

    // Act: the retry goes through the same driver and fake; it must not throw.
    await sut.AcknowledgeAsync(
        [Ack("Tank01.Level.HiHi", "resolved")], CancellationToken.None);

    // Assert: both attempts reached the acknowledger, in order.
    acknowledger.Calls.Count.ShouldBe(2);

    var faulted = acknowledger.Calls[0];
    faulted.AlarmRef.ShouldBe("Tank01.Level.HiHi");
    faulted.Comment.ShouldBe("investigating");

    var retried = acknowledger.Calls[1];
    retried.Comment.ShouldBe("resolved");
}
|
||||
|
||||
/// <summary>
/// Same shape as the transient-fault test, but the first scripted outcome is an
/// <see cref="InvalidOperationException"/>. After that failure surfaces, a second
/// acknowledge through the same driver must still be handed to the acknowledger.
/// </summary>
[Fact]
public async Task Acknowledge_after_protocol_failure_still_issues_next_call()
{
    // Arrange: one scripted failure, then one scripted success.
    var acknowledger = new FakeAcknowledger();
    acknowledger.Enqueue(() => Task.FromException(new InvalidOperationException("gateway overload")));
    acknowledger.Enqueue(() => Task.CompletedTask);

    var sut = new GalaxyDriver("g", Opts(),
        hierarchySource: null, alarmAcknowledger: acknowledger);

    // Act + assert: the first attempt surfaces the scripted exception type.
    Func<Task> failingAttempt = () => sut.AcknowledgeAsync(
        [Ack("Valve01.PV.Hi", "c1")], CancellationToken.None);

    await Should.ThrowAsync<InvalidOperationException>(failingAttempt);

    // Act: the follow-up attempt must complete without throwing.
    await sut.AcknowledgeAsync(
        [Ack("Valve01.PV.Hi", "c2")], CancellationToken.None);

    // Assert: only the invocation count is checked here, not the recorded payloads.
    acknowledger.Calls.Count.ShouldBe(2);
}
|
||||
|
||||
/// <summary>
/// Checks that a single acknowledge request passed to the driver results in exactly
/// one acknowledger invocation carrying the request's condition ID (as the alarm
/// reference) and comment unchanged.
/// </summary>
/// <remarks>
/// NOTE(review): despite the "all fields" name, the recorded <c>Operator</c> value is
/// not asserted — confirm whether the driver's operator mapping should be pinned too.
/// </remarks>
[Fact]
public async Task Acknowledge_forwards_all_fields_to_acknowledger()
{
    // Arrange: nothing is scripted, so the fake completes successfully.
    var acknowledger = new FakeAcknowledger();

    var sut = new GalaxyDriver("g", Opts(),
        hierarchySource: null, alarmAcknowledger: acknowledger);

    // Act
    await sut.AcknowledgeAsync(
        [Ack("Pump01.Flow.Low", "operator comment")], CancellationToken.None);

    // Assert: exactly one invocation, with reference and comment passed through.
    var recorded = acknowledger.Calls.ShouldHaveSingleItem();
    recorded.AlarmRef.ShouldBe("Pump01.Flow.Low");
    recorded.Comment.ShouldBe("operator comment");
}
|
||||
|
||||
// ------------------------------------------------------------------
|
||||
// Production class smoke — exercises GatewayGalaxyAlarmAcknowledger directly.
|
||||
//
|
||||
// IMxGatewayClientTransport (the transport seam on MxGatewayClient) is internal
|
||||
// to the NuGet package and cannot be implemented from outside the package.
|
||||
// The lightest available smoke is: construct GatewayGalaxyAlarmAcknowledger with
|
||||
// a real MxGatewayClient pointing at an unreachable endpoint, call it twice, and
|
||||
// verify both calls fault with the same transport-level exception type — NOT an
|
||||
// ObjectDisposedException or any "client is in a terminal dead state" variant.
|
||||
// That proves the concrete class latches no dead-client state between calls.
|
||||
// ------------------------------------------------------------------
|
||||
|
||||
/// <summary>
/// Smoke test for the concrete <see cref="GatewayGalaxyAlarmAcknowledger"/>: two
/// back-to-back acknowledge calls against an endpoint expected to be unreachable
/// must both fail, and neither failure may be an <see cref="ObjectDisposedException"/>.
/// </summary>
/// <remarks>
/// The assertions cover only "an exception was thrown" and "it is not
/// <see cref="ObjectDisposedException"/>". They do not compare the two exception
/// types, so "same transport fault" is the intent rather than something enforced here.
/// </remarks>
[Fact]
public async Task GatewayAcknowledger_consecutive_faults_both_reach_grpc_layer()
{
    // Arrange: a real client aimed at loopback port 1 with short timeouts, so each
    // call is expected to fail quickly instead of hanging.
    var unreachable = new MxGatewayClientOptions
    {
        Endpoint = new Uri("http://127.0.0.1:1", UriKind.Absolute), // port 1 = unreachable
        ApiKey = "test-key",
        UseTls = false,
        ConnectTimeout = TimeSpan.FromMilliseconds(200),
        DefaultCallTimeout = TimeSpan.FromMilliseconds(200),
    };

    // Disposed asynchronously when the test method exits.
    await using var client = MxGatewayClient.Create(unreachable);

    var acknowledger = new GatewayGalaxyAlarmAcknowledger(client, NullLogger.Instance);

    // Act: capture whatever each call throws, without failing on the throw itself.
    var firstFault = await Record.ExceptionAsync(
        () => acknowledger.AcknowledgeAsync(
            "Tank01.Level.HiHi", "c1", "alice", CancellationToken.None));

    var secondFault = await Record.ExceptionAsync(
        () => acknowledger.AcknowledgeAsync(
            "Tank01.Level.HiHi", "c2", "alice", CancellationToken.None));

    // Assert: both calls failed.
    firstFault.ShouldNotBeNull("first call to unreachable endpoint should throw");
    secondFault.ShouldNotBeNull("second call to unreachable endpoint should throw");

    // Assert: neither failure is a disposed-object error. A latched dead-client
    // state would be expected to show up as one on the second call.
    firstFault.ShouldNotBeOfType<ObjectDisposedException>(
        "first call must not get an 'object is disposed' error");
    secondFault.ShouldNotBeOfType<ObjectDisposedException>(
        "second call must not get a 'client is dead/disposed' error — " +
        "the acknowledger must not latch any dead state between calls");
}
|
||||
}
|
||||
Reference in New Issue
Block a user