fix(galaxy): reconnect recreates a faulted session instead of no-op'ing
This commit is contained in:
@@ -282,15 +282,17 @@ public sealed class GalaxyDriver
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Reopen callback for <see cref="ReconnectSupervisor"/>: re-Register the gw session.
|
/// Reopen callback for <see cref="ReconnectSupervisor"/>: recreate the gw session. After a
|
||||||
/// If the session never connected, this is a fresh ConnectAsync; otherwise it's a
|
/// gateway restart the existing session handle is faulted / stale but non-null, so a plain
|
||||||
/// reconnect against the existing client.
|
/// <c>ConnectAsync</c> would no-op and the supervisor would loop forever replaying against a
|
||||||
|
/// dead session. <see cref="GalaxyMxSession.RecreateAsync"/> disposes the stale session +
|
||||||
|
/// owned client and rebuilds; a never-connected session has nothing to dispose and simply opens fresh.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
private async Task ReopenAsync(CancellationToken cancellationToken)
|
private async Task ReopenAsync(CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
if (_ownedMxSession is null) return;
|
if (_ownedMxSession is null) return;
|
||||||
var clientOptions = BuildClientOptions(_options.Gateway);
|
var clientOptions = BuildClientOptions(_options.Gateway);
|
||||||
await _ownedMxSession.ConnectAsync(clientOptions, cancellationToken).ConfigureAwait(false);
|
await _ownedMxSession.RecreateAsync(clientOptions, cancellationToken).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ public sealed class GalaxyMxSession : IAsyncDisposable
|
|||||||
private MxGatewayClient? _ownedClient;
|
private MxGatewayClient? _ownedClient;
|
||||||
private MxGatewaySession? _session;
|
private MxGatewaySession? _session;
|
||||||
private int _serverHandle;
|
private int _serverHandle;
|
||||||
|
private bool _connected;
|
||||||
private bool _disposed;
|
private bool _disposed;
|
||||||
|
|
||||||
/// <summary>Initializes a new instance of the GalaxyMxSession class.</summary>
|
/// <summary>Initializes a new instance of the GalaxyMxSession class.</summary>
|
||||||
@@ -48,24 +49,65 @@ public sealed class GalaxyMxSession : IAsyncDisposable
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public int ServerHandle => _serverHandle;
|
public int ServerHandle => _serverHandle;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Test seam — when set, replaces the real "open session + register" work so unit tests can
|
||||||
|
/// exercise the connect / recreate orchestration without a live gateway (the SDK session/client
|
||||||
|
/// types are sealed + internal-ctor and cannot be faked). Null in production = real gateway path.
|
||||||
|
/// </summary>
|
||||||
|
internal Func<CancellationToken, Task>? OpenAndRegisterOverrideForTests { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Connect the underlying gateway client + open an MXAccess session + register the
|
/// Connect the underlying gateway client + open an MXAccess session + register the
|
||||||
/// configured client name. Idempotent — second calls are no-ops while
|
/// configured client name. Idempotent — repeat calls are no-ops once a connect has
|
||||||
/// <see cref="IsConnected"/> is true.
|
/// already succeeded. Use <see cref="RecreateAsync"/> to force a rebuild after the
|
||||||
|
/// gateway restarts and leaves a faulted / stale session handle.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="clientOptions">The MX gateway client options.</param>
|
/// <param name="clientOptions">The MX gateway client options.</param>
|
||||||
/// <param name="cancellationToken">The cancellation token.</param>
|
/// <param name="cancellationToken">The cancellation token.</param>
|
||||||
public async Task ConnectAsync(MxGatewayClientOptions clientOptions, CancellationToken cancellationToken)
|
public async Task ConnectAsync(MxGatewayClientOptions clientOptions, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
ObjectDisposedException.ThrowIf(_disposed, this);
|
ObjectDisposedException.ThrowIf(_disposed, this);
|
||||||
if (_session is not null) return;
|
if (_connected) return;
|
||||||
|
|
||||||
_ownedClient = MxGatewayClient.Create(clientOptions);
|
try
|
||||||
_session = await _ownedClient.OpenSessionAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
|
{
|
||||||
_serverHandle = await _session.RegisterAsync(_options.ClientName, cancellationToken).ConfigureAwait(false);
|
if (OpenAndRegisterOverrideForTests is not null)
|
||||||
_logger.LogInformation(
|
{
|
||||||
"GalaxyMxSession connected — clientName={ClientName} serverHandle={Handle}",
|
await OpenAndRegisterOverrideForTests(cancellationToken).ConfigureAwait(false);
|
||||||
_options.ClientName, _serverHandle);
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
_ownedClient = MxGatewayClient.Create(clientOptions);
|
||||||
|
_session = await _ownedClient.OpenSessionAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
|
||||||
|
_serverHandle = await _session.RegisterAsync(_options.ClientName, cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
_connected = true;
|
||||||
|
_logger.LogInformation(
|
||||||
|
"GalaxyMxSession connected — clientName={ClientName} serverHandle={Handle}",
|
||||||
|
_options.ClientName, _serverHandle);
|
||||||
|
}
|
||||||
|
catch
|
||||||
|
{
|
||||||
|
// A partial open (e.g. OpenSession succeeded but Register threw) must not leave a half-open
|
||||||
|
// handle that blocks the next reconnect. Tear the partial state down so a retry rebuilds cleanly.
|
||||||
|
await TeardownSessionAsync().ConfigureAwait(false);
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Disposes the current (faulted / stale) session + owned client and rebuilds. Use this on
|
||||||
|
/// reconnect: <see cref="ConnectAsync"/> alone no-ops once a connect has succeeded, even when that session is dead but
|
||||||
|
/// still present, so the supervisor's reopen must route through here to actually re-establish.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="clientOptions">The MX gateway client options.</param>
|
||||||
|
/// <param name="cancellationToken">The cancellation token.</param>
|
||||||
|
public async Task RecreateAsync(MxGatewayClientOptions clientOptions, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
ObjectDisposedException.ThrowIf(_disposed, this);
|
||||||
|
await TeardownSessionAsync().ConfigureAwait(false);
|
||||||
|
await ConnectAsync(clientOptions, cancellationToken).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -93,19 +135,31 @@ public sealed class GalaxyMxSession : IAsyncDisposable
|
|||||||
{
|
{
|
||||||
if (_disposed) return;
|
if (_disposed) return;
|
||||||
_disposed = true;
|
_disposed = true;
|
||||||
|
await TeardownSessionAsync().ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Resets the connect flag and best-effort disposes + nulls the owned session and client.
|
||||||
|
/// Shared by <see cref="DisposeAsync"/> (final teardown) and <see cref="RecreateAsync"/> /
|
||||||
|
/// the partial-open recovery path (rebuild teardown) — it does NOT set <c>_disposed</c>, so
|
||||||
|
/// a torn-down session can be re-opened.
|
||||||
|
/// </summary>
|
||||||
|
private async Task TeardownSessionAsync()
|
||||||
|
{
|
||||||
|
_connected = false;
|
||||||
|
|
||||||
if (_session is not null)
|
if (_session is not null)
|
||||||
{
|
{
|
||||||
try { await _session.DisposeAsync().ConfigureAwait(false); }
|
try { await _session.DisposeAsync().ConfigureAwait(false); }
|
||||||
catch (Exception ex) { _logger.LogWarning(ex, "GalaxyMxSession session dispose failed (best-effort)"); }
|
catch (Exception ex) { _logger.LogWarning(ex, "GalaxyMxSession session dispose failed (best-effort)"); }
|
||||||
|
_session = null;
|
||||||
}
|
}
|
||||||
_session = null;
|
|
||||||
|
|
||||||
if (_ownedClient is not null)
|
if (_ownedClient is not null)
|
||||||
{
|
{
|
||||||
try { await _ownedClient.DisposeAsync().ConfigureAwait(false); }
|
try { await _ownedClient.DisposeAsync().ConfigureAwait(false); }
|
||||||
catch (Exception ex) { _logger.LogWarning(ex, "GalaxyMxSession client dispose failed (best-effort)"); }
|
catch (Exception ex) { _logger.LogWarning(ex, "GalaxyMxSession client dispose failed (best-effort)"); }
|
||||||
|
_ownedClient = null;
|
||||||
}
|
}
|
||||||
_ownedClient = null;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
+106
@@ -0,0 +1,106 @@
|
|||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Config;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Runtime;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Connect / recreate orchestration tests for <see cref="GalaxyMxSession"/>. The SDK
|
||||||
|
/// session/client types are sealed with internal ctors and cannot be faked, so these
|
||||||
|
/// drive the open body through the <c>OpenAndRegisterOverrideForTests</c> seam. The
|
||||||
|
/// core regression guard is <see cref="Recreate_after_connect_opens_a_fresh_session"/>:
|
||||||
|
/// a stale-session reconnect must rebuild rather than no-op (the gateway-restart bug).
|
||||||
|
/// </summary>
|
||||||
|
public sealed class GalaxyMxSessionReconnectTests
|
||||||
|
{
|
||||||
|
private static GalaxyMxAccessOptions MinimalOptions() => new(ClientName: "OtOpcUaTest");
|
||||||
|
|
||||||
|
private static GalaxyMxSession NewSession() => new(MinimalOptions());
|
||||||
|
|
||||||
|
/// <summary>A second <c>ConnectAsync</c> while connected must be a no-op (idempotent guard).</summary>
|
||||||
|
[Fact]
|
||||||
|
public async Task Connect_then_connect_again_is_a_noop()
|
||||||
|
{
|
||||||
|
var session = NewSession();
|
||||||
|
var openCount = 0;
|
||||||
|
session.OpenAndRegisterOverrideForTests = _ =>
|
||||||
|
{
|
||||||
|
openCount++;
|
||||||
|
return Task.CompletedTask;
|
||||||
|
};
|
||||||
|
|
||||||
|
await session.ConnectAsync(null!, CancellationToken.None);
|
||||||
|
openCount.ShouldBe(1);
|
||||||
|
|
||||||
|
await session.ConnectAsync(null!, CancellationToken.None);
|
||||||
|
openCount.ShouldBe(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// The regression guard for the gateway-restart bug: <c>RecreateAsync</c> must bypass
|
||||||
|
/// the no-op guard and re-run the open body even though an earlier connect already succeeded (the stale-session case).
|
||||||
|
/// </summary>
|
||||||
|
[Fact]
|
||||||
|
public async Task Recreate_after_connect_opens_a_fresh_session()
|
||||||
|
{
|
||||||
|
var session = NewSession();
|
||||||
|
var openCount = 0;
|
||||||
|
session.OpenAndRegisterOverrideForTests = _ =>
|
||||||
|
{
|
||||||
|
openCount++;
|
||||||
|
return Task.CompletedTask;
|
||||||
|
};
|
||||||
|
|
||||||
|
await session.ConnectAsync(null!, CancellationToken.None);
|
||||||
|
openCount.ShouldBe(1);
|
||||||
|
|
||||||
|
await session.RecreateAsync(null!, CancellationToken.None);
|
||||||
|
openCount.ShouldBe(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary><c>RecreateAsync</c> on a never-connected session still opens (teardown is a no-op first).</summary>
|
||||||
|
[Fact]
|
||||||
|
public async Task Recreate_when_never_connected_still_opens()
|
||||||
|
{
|
||||||
|
var session = NewSession();
|
||||||
|
var openCount = 0;
|
||||||
|
session.OpenAndRegisterOverrideForTests = _ =>
|
||||||
|
{
|
||||||
|
openCount++;
|
||||||
|
return Task.CompletedTask;
|
||||||
|
};
|
||||||
|
|
||||||
|
await session.RecreateAsync(null!, CancellationToken.None);
|
||||||
|
openCount.ShouldBe(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// A failed first connect must not leave <c>_connected</c> set — the next attempt has
|
||||||
|
/// to reach the open body again (partial-open teardown).
|
||||||
|
/// </summary>
|
||||||
|
[Fact]
|
||||||
|
public async Task Connect_failure_is_not_left_half_open()
|
||||||
|
{
|
||||||
|
var session = NewSession();
|
||||||
|
var openCount = 0;
|
||||||
|
session.OpenAndRegisterOverrideForTests = _ =>
|
||||||
|
{
|
||||||
|
// Throw on the first attempt, succeed on the second.
|
||||||
|
if (openCount++ == 0)
|
||||||
|
{
|
||||||
|
throw new InvalidOperationException("simulated open failure");
|
||||||
|
}
|
||||||
|
|
||||||
|
return Task.CompletedTask;
|
||||||
|
};
|
||||||
|
|
||||||
|
await Should.ThrowAsync<InvalidOperationException>(
|
||||||
|
async () => await session.ConnectAsync(null!, CancellationToken.None));
|
||||||
|
openCount.ShouldBe(1);
|
||||||
|
|
||||||
|
// The failed first attempt must not have latched _connected — the retry reaches the body.
|
||||||
|
await session.ConnectAsync(null!, CancellationToken.None);
|
||||||
|
openCount.ShouldBe(2);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user