Files
suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkClientReconnectTests.cs
2026-03-17 11:04:19 -04:00

670 lines
25 KiB
C#

using System.Net.Sockets;
using SuiteLink.Client.Internal;
using SuiteLink.Client.Protocol;
using SuiteLink.Client.Transport;
namespace SuiteLink.Client.Tests;
public sealed class SuiteLinkClientReconnectTests
{
/// <summary>
/// Verifies that the delays requested while reconnecting follow the configured retry policy
/// (initial 3s, multiplier 3, capped at 20s, no jitter): the first five observed delays must be
/// 0s, 3s, 9s, 20s and 20s.
/// </summary>
[Fact]
public async Task Reconnect_UsesConfiguredRetryPolicy()
{
    var observedDelays = new List<TimeSpan>();
    var capturedSchedule = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    var syncRoot = new object();

    // Records each requested delay and completes immediately, so the test never actually sleeps.
    Task CaptureDelayAsync(TimeSpan delay, CancellationToken _)
    {
        lock (syncRoot)
        {
            observedDelays.Add(delay);
            if (observedDelays.Count >= 5)
            {
                capturedSchedule.TrySetResult(true);
            }
        }

        return Task.CompletedTask;
    }

    // After the two scripted acks the fake transport reports EOF on every receive.
    var transport = new RuntimeDisconnectFakeTransport(DisconnectBehavior.ReturnEof)
        .WithFrame(BuildHandshakeAckFrame())
        .WithFrame(BuildAdviseAckFrame(1));

    // await using guarantees disposal even when the wait below times out or an assertion fails.
    // Because the delay hook never blocks, a leaked client would presumably keep retrying
    // (and appending to observedDelays) for the remainder of the test run.
    await using var client = new SuiteLinkClient(transport, ownsTransport: false, delayAsync: CaptureDelayAsync);

    await client.ConnectAsync(CreateOptions(runtime: new SuiteLinkRuntimeOptions(
        retryPolicy: new SuiteLinkRetryPolicy(
            initialDelay: TimeSpan.FromSeconds(3),
            multiplier: 3.0,
            maxDelay: TimeSpan.FromSeconds(20),
            useJitter: false),
        catchUpPolicy: SuiteLinkCatchUpPolicy.None,
        catchUpTimeout: TimeSpan.FromSeconds(2))));
    _ = await client.SubscribeAsync("Pump001.Run", _ => { });
    _ = await capturedSchedule.Task.WaitAsync(TimeSpan.FromSeconds(2));

    // Snapshot under the lock: the delay hook may still be appending concurrently.
    TimeSpan[] firstFiveObserved;
    lock (syncRoot)
    {
        firstFiveObserved = observedDelays.Take(5).ToArray();
    }

    Assert.Equal(TimeSpan.Zero, firstFiveObserved[0]);
    Assert.Equal(TimeSpan.FromSeconds(3), firstFiveObserved[1]);
    Assert.Equal(TimeSpan.FromSeconds(9), firstFiveObserved[2]);
    Assert.Equal(TimeSpan.FromSeconds(20), firstFiveObserved[3]);
    Assert.Equal(TimeSpan.FromSeconds(20), firstFiveObserved[4]);
}
/// <summary>
/// Verifies that the client moves to <c>Reconnecting</c> when the transport reports EOF
/// (a zero-byte receive) after the handshake ack and advise ack have been consumed.
/// </summary>
[Fact]
public async Task ReceiveLoop_Eof_TransitionsToReconnecting()
{
    var transport = new RuntimeDisconnectFakeTransport(DisconnectBehavior.ReturnEof)
        .WithFrame(BuildHandshakeAckFrame())
        .WithFrame(BuildAdviseAckFrame(1));

    // await using disposes the client even if the state assertion below throws.
    await using var client = new SuiteLinkClient(transport);

    await client.ConnectAsync(CreateOptions());
    _ = await client.SubscribeAsync("Pump001.Run", _ => { });

    await AssertStateEventuallyAsync(client, SuiteLinkSessionState.Reconnecting);
}
/// <summary>
/// Verifies that the client moves to <c>Reconnecting</c> when the transport throws an
/// <see cref="IOException"/> from a receive issued after the scripted acks are consumed.
/// </summary>
[Fact]
public async Task ReceiveLoop_ReceiveIOException_TransitionsToReconnecting()
{
    var transport = new RuntimeDisconnectFakeTransport(DisconnectBehavior.ThrowIoException)
        .WithFrame(BuildHandshakeAckFrame())
        .WithFrame(BuildAdviseAckFrame(1));

    // await using disposes the client even if the state assertion below throws.
    await using var client = new SuiteLinkClient(transport);

    await client.ConnectAsync(CreateOptions());
    _ = await client.SubscribeAsync("Pump001.Run", _ => { });

    await AssertStateEventuallyAsync(client, SuiteLinkSessionState.Reconnecting);
}
/// <summary>
/// Verifies that the client moves to <c>Reconnecting</c> when the transport throws a
/// <see cref="SocketException"/> from a receive issued after the scripted acks are consumed.
/// </summary>
[Fact]
public async Task ReceiveLoop_ReceiveSocketException_TransitionsToReconnecting()
{
    var transport = new RuntimeDisconnectFakeTransport(DisconnectBehavior.ThrowSocketException)
        .WithFrame(BuildHandshakeAckFrame())
        .WithFrame(BuildAdviseAckFrame(1));

    // await using disposes the client even if the state assertion below throws.
    await using var client = new SuiteLinkClient(transport);

    await client.ConnectAsync(CreateOptions());
    _ = await client.SubscribeAsync("Pump001.Run", _ => { });

    await AssertStateEventuallyAsync(client, SuiteLinkSessionState.Reconnecting);
}
/// <summary>
/// Verifies that the client moves to <c>Reconnecting</c> when the transport delivers only the
/// first five bytes of an update frame and then reports EOF.
/// </summary>
[Fact]
public async Task ReceiveLoop_PartialFrameThenEof_TransitionsToReconnecting()
{
    var updateFrame = BuildBooleanUpdateFrame(1, true);
    var transport = new RuntimeDisconnectFakeTransport(DisconnectBehavior.ReturnEof)
        .WithFrame(BuildHandshakeAckFrame())
        .WithFrame(BuildAdviseAckFrame(1))
        // Truncated on purpose: the frame never completes before the EOF that follows.
        .WithChunk(updateFrame.AsSpan(0, 5).ToArray());

    // await using disposes the client even if the state assertion below throws.
    await using var client = new SuiteLinkClient(transport);

    await client.ConnectAsync(CreateOptions());
    _ = await client.SubscribeAsync("Pump001.Run", _ => { });

    await AssertStateEventuallyAsync(client, SuiteLinkSessionState.Reconnecting);
}
/// <summary>
/// Starts a close operation (dispose or disconnect) while a runtime receive is parked inside the
/// fake transport, then lets that receive report the disconnect. Whichever side wins, the client
/// must finish in <c>Disconnected</c> and report that it is not connected.
/// </summary>
[Theory]
[InlineData(true, DisconnectBehavior.ReturnEof)]
[InlineData(true, DisconnectBehavior.ThrowIoException)]
[InlineData(false, DisconnectBehavior.ReturnEof)]
[InlineData(false, DisconnectBehavior.ThrowIoException)]
public async Task CloseOperations_RacingRuntimeDisconnect_EndInDisconnectedState(
    bool useDisposeAsync,
    DisconnectBehavior behavior)
{
    var receiveEnteredSignal = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    var disconnectGate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

    // The transport signals when the first unscripted receive starts and holds it until the gate opens.
    var transport = new RuntimeDisconnectFakeTransport(behavior)
    {
        RuntimeReceiveEntered = receiveEnteredSignal,
        AllowDisconnectSignal = disconnectGate.Task,
    };
    _ = transport
        .WithFrame(BuildHandshakeAckFrame())
        .WithFrame(BuildAdviseAckFrame(1));

    var client = new SuiteLinkClient(transport);
    await client.ConnectAsync(CreateOptions());
    _ = await client.SubscribeAsync("Pump001.Run", _ => { });
    _ = await receiveEnteredSignal.Task.WaitAsync(TimeSpan.FromSeconds(2));

    // Begin closing first, then release the parked receive so the two paths overlap.
    Task pendingClose;
    if (useDisposeAsync)
    {
        pendingClose = client.DisposeAsync().AsTask();
    }
    else
    {
        pendingClose = client.DisconnectAsync();
    }

    disconnectGate.TrySetResult(true);
    await pendingClose.WaitAsync(TimeSpan.FromSeconds(2));

    Assert.Equal(SuiteLinkSessionState.Disconnected, client.DebugState);
    Assert.False(client.IsConnected);
}
/// <summary>
/// Verifies that a connected client with no subscriptions stays in <c>Ready</c> and issues no
/// receive beyond the scripted handshake ack during a 250 ms observation window.
/// </summary>
[Fact]
public async Task ReadyWithNoSubscriptions_DoesNotProbeTransportLiveness_AndRemainsReady()
{
    var transport = new RuntimeDisconnectFakeTransport(DisconnectBehavior.ReturnEof)
        .WithFrame(BuildHandshakeAckFrame());

    // await using disposes the client even if one of the assertions below throws.
    await using var client = new SuiteLinkClient(transport);

    await client.ConnectAsync(CreateOptions());

    // A real delay is needed here: the test asserts that something does NOT happen.
    await Task.Delay(250);

    Assert.Equal(SuiteLinkSessionState.Ready, client.DebugState);
    Assert.Equal(0, transport.RuntimeReceiveCallCount);
}
/// <summary>
/// Parks the client in a never-ending reconnect back-off delay, calls <c>DisconnectAsync</c>,
/// and checks that the delay's cancellation token fires and the client ends in <c>Disconnected</c>.
/// </summary>
[Fact]
public async Task DisconnectAsync_CancelsPendingReconnectDelay_AndEndsDisconnected()
{
    var delayEntered = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    var delayCanceled = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

    // Zero-length delays complete at once; any other delay blocks until its token is canceled.
    Task DelayAsync(TimeSpan delay, CancellationToken cancellationToken)
    {
        if (delay != TimeSpan.Zero)
        {
            delayEntered.TrySetResult(true);
            cancellationToken.Register(() => delayCanceled.TrySetResult(true));
            return Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken);
        }

        return Task.CompletedTask;
    }

    var transport = new RuntimeDisconnectFakeTransport(DisconnectBehavior.ReturnEof)
        .WithFrame(BuildHandshakeAckFrame())
        .WithFrame(BuildAdviseAckFrame(1));

    // Every reconnect attempt reports failure, which pushes the client into the blocking delay.
    var client = new SuiteLinkClient(
        transport,
        ownsTransport: false,
        delayAsync: DelayAsync,
        reconnectAttemptAsync: static _ => ValueTask.FromResult(false));

    await client.ConnectAsync(CreateOptions());
    _ = await client.SubscribeAsync("Pump001.Run", _ => { });

    var waitLimit = TimeSpan.FromSeconds(2);
    _ = await delayEntered.Task.WaitAsync(waitLimit);
    await client.DisconnectAsync().WaitAsync(waitLimit);
    _ = await delayCanceled.Task.WaitAsync(waitLimit);

    Assert.Equal(SuiteLinkSessionState.Disconnected, client.DebugState);
    Assert.False(client.IsConnected);
}
/// <summary>
/// Verifies that after the first connection ends with EOF the client connects a second time,
/// sends the advise message again, and dispatches the update served by the second connection
/// to the original subscription callback.
/// </summary>
[Fact]
public async Task Reconnect_ReplaysDurableSubscriptions_AndResumesUpdateDispatch()
{
    var updateReceived = new TaskCompletionSource<SuiteLinkTagUpdate>(
        TaskCreationOptions.RunContinuationsAsynchronously);

    // Plan 1 drops after the acks; plan 2 serves the acks plus one update and then blocks.
    var transport = new ReplayableReconnectFakeTransport(
        new ConnectionPlan(
            EmptyReceiveBehavior.ReturnEof,
            BuildHandshakeAckFrame(),
            BuildAdviseAckFrame(1)),
        new ConnectionPlan(
            EmptyReceiveBehavior.Block,
            BuildHandshakeAckFrame(),
            BuildAdviseAckFrame(1),
            BuildBooleanUpdateFrame(1, true)));

    // await using disposes the client even if a wait times out or an assertion fails.
    await using var client = new SuiteLinkClient(
        transport,
        ownsTransport: false,
        delayAsync: static (_, _) => Task.CompletedTask);

    await client.ConnectAsync(CreateOptions());

    // The callback parameter is named differently from the local below to avoid shadowing.
    _ = await client.SubscribeAsync("Pump001.Run", tagUpdate => updateReceived.TrySetResult(tagUpdate));
    var receivedUpdate = await updateReceived.Task.WaitAsync(TimeSpan.FromSeconds(2));

    Assert.Equal(2, transport.ConnectCallCount);
    Assert.Equal(2, CountSentMessageType(transport.SentBuffers, SuiteLinkSubscriptionCodec.AdviseMessageType));
    Assert.Equal(SuiteLinkSessionState.Subscribed, client.DebugState);
    Assert.True(receivedUpdate.Value.TryGetBoolean(out var value));
    Assert.True(value);
}
/// <summary>
/// Verifies that after a reconnect the client can write to a previously subscribed tag and that
/// the frame sent equals <c>SuiteLinkWriteCodec.Encode(1, false)</c>, i.e. the tag is mapped to id 1.
/// </summary>
[Fact]
public async Task Reconnect_RestoresLiveTagMappings_AndAllowsWriteAfterReplay()
{
    // Plan 1 drops after the acks; plan 2 serves the acks and then blocks.
    var transport = new ReplayableReconnectFakeTransport(
        new ConnectionPlan(
            EmptyReceiveBehavior.ReturnEof,
            BuildHandshakeAckFrame(),
            BuildAdviseAckFrame(1)),
        new ConnectionPlan(
            EmptyReceiveBehavior.Block,
            BuildHandshakeAckFrame(),
            BuildAdviseAckFrame(1)));

    // await using disposes the client even if a wait times out or an assertion fails.
    await using var client = new SuiteLinkClient(
        transport,
        ownsTransport: false,
        delayAsync: static (_, _) => Task.CompletedTask);

    await client.ConnectAsync(CreateOptions());
    _ = await client.SubscribeAsync("Pump001.Run", _ => { });

    // Wait for the second ConnectAsync before checking the state. Polling for Subscribed alone
    // could be satisfied by the initial subscription, before the EOF has even been observed,
    // which would let the write below race the reconnect instead of following the replay.
    var reconnectWait = System.Diagnostics.Stopwatch.StartNew();
    while (transport.ConnectCallCount < 2 && reconnectWait.Elapsed < TimeSpan.FromSeconds(2))
    {
        await Task.Delay(20);
    }

    Assert.Equal(2, transport.ConnectCallCount);
    await AssertStateEventuallyAsync(client, SuiteLinkSessionState.Subscribed);

    // Drop everything sent during connect/replay so only the write remains to be inspected.
    transport.ClearSentBuffers();
    await client.WriteAsync("Pump001.Run", SuiteLinkValue.FromBoolean(false));

    Assert.Contains(
        transport.SentBuffers,
        frameBytes => frameBytes.AsSpan().SequenceEqual(
            SuiteLinkWriteCodec.Encode(1, SuiteLinkValue.FromBoolean(false))));
}
/// <summary>
/// Verifies that with the <c>RefreshLatestValue</c> catch-up policy the subscription callback
/// receives an update whose source is <c>CatchUpReplay</c> after the reconnect.
/// </summary>
[Fact]
public async Task Reconnect_WithRefreshLatestValue_DispatchesCatchUpReplay()
{
    var catchUpReceived = new TaskCompletionSource<SuiteLinkTagUpdate>(
        TaskCreationOptions.RunContinuationsAsynchronously);

    // Plan 2 acks tag id 1, then acks tag id 2 and serves an update for tag id 2.
    // NOTE(review): the assertion below expects TagId 1, so the client presumably maps the
    // catch-up advise (id 2) back onto the durable subscription - confirm against the client.
    var transport = new ReplayableReconnectFakeTransport(
        new ConnectionPlan(
            EmptyReceiveBehavior.ReturnEof,
            BuildHandshakeAckFrame(),
            BuildAdviseAckFrame(1)),
        new ConnectionPlan(
            EmptyReceiveBehavior.Block,
            BuildHandshakeAckFrame(),
            BuildAdviseAckFrame(1),
            BuildAdviseAckFrame(2),
            BuildBooleanUpdateFrame(2, true)));

    // await using disposes the client even if a wait times out or an assertion fails.
    await using var client = new SuiteLinkClient(
        transport,
        ownsTransport: false,
        delayAsync: static (_, _) => Task.CompletedTask);

    await client.ConnectAsync(CreateOptions(runtime: new SuiteLinkRuntimeOptions(
        retryPolicy: SuiteLinkRetryPolicy.Default,
        catchUpPolicy: SuiteLinkCatchUpPolicy.RefreshLatestValue,
        catchUpTimeout: TimeSpan.FromSeconds(2))));

    // Only catch-up replays complete the signal; updates from any other source are ignored.
    _ = await client.SubscribeAsync("Pump001.Run", update =>
    {
        if (update.Source == SuiteLinkUpdateSource.CatchUpReplay)
        {
            catchUpReceived.TrySetResult(update);
        }
    });

    var catchUp = await catchUpReceived.Task.WaitAsync(TimeSpan.FromSeconds(2));

    Assert.Equal(SuiteLinkUpdateSource.CatchUpReplay, catchUp.Source);
    Assert.Equal(1u, catchUp.TagId);
    Assert.True(catchUp.Value.TryGetBoolean(out var value));
    Assert.True(value);
}
/// <summary>
/// Verifies that when the catch-up value never arrives within the 100 ms catch-up timeout,
/// the reconnected client still reaches <c>Subscribed</c> and reports itself connected.
/// </summary>
[Fact]
public async Task Reconnect_CatchUpTimeout_DoesNotFailRecoveredSubscriptions()
{
    // Plan 2 acks both advises but never serves an update, so the catch-up has to time out.
    var transport = new ReplayableReconnectFakeTransport(
        new ConnectionPlan(
            EmptyReceiveBehavior.ReturnEof,
            BuildHandshakeAckFrame(),
            BuildAdviseAckFrame(1)),
        new ConnectionPlan(
            EmptyReceiveBehavior.Block,
            BuildHandshakeAckFrame(),
            BuildAdviseAckFrame(1),
            BuildAdviseAckFrame(2)));

    // await using disposes the client even if a wait times out or an assertion fails.
    await using var client = new SuiteLinkClient(
        transport,
        ownsTransport: false,
        delayAsync: static (_, _) => Task.CompletedTask);

    await client.ConnectAsync(CreateOptions(runtime: new SuiteLinkRuntimeOptions(
        retryPolicy: SuiteLinkRetryPolicy.Default,
        catchUpPolicy: SuiteLinkCatchUpPolicy.RefreshLatestValue,
        catchUpTimeout: TimeSpan.FromMilliseconds(100))));
    _ = await client.SubscribeAsync("Pump001.Run", _ => { });

    // Wait for the second ConnectAsync before checking the state. Polling for Subscribed alone
    // could be satisfied by the initial subscription, letting the test pass without the
    // reconnect (and therefore the catch-up timeout) ever being exercised.
    var reconnectWait = System.Diagnostics.Stopwatch.StartNew();
    while (transport.ConnectCallCount < 2 && reconnectWait.Elapsed < TimeSpan.FromSeconds(2))
    {
        await Task.Delay(20);
    }

    Assert.Equal(2, transport.ConnectCallCount);
    await AssertStateEventuallyAsync(client, SuiteLinkSessionState.Subscribed, TimeSpan.FromSeconds(2));
    Assert.True(client.IsConnected);
}
/// <summary>
/// Verifies that <c>WriteAsync</c> called while a reconnect attempt is in flight throws an
/// <see cref="InvalidOperationException"/> whose message contains "reconnecting" (case-insensitive).
/// </summary>
[Fact]
public async Task WriteAsync_DuringReconnect_ThrowsPredictableInvalidOperationException()
{
    var reconnectAttemptStarted = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    var transport = new RuntimeDisconnectFakeTransport(DisconnectBehavior.ReturnEof)
        .WithFrame(BuildHandshakeAckFrame())
        .WithFrame(BuildAdviseAckFrame(1));

    // await using disposes the client even if a wait times out or an assertion fails; otherwise
    // the reconnect attempt below would stay parked because nothing cancels its token.
    await using var client = new SuiteLinkClient(
        transport,
        ownsTransport: false,
        delayAsync: static (_, _) => Task.CompletedTask,
        reconnectAttemptAsync: async cancellationToken =>
        {
            // Signal that the attempt started, then block until canceled.
            reconnectAttemptStarted.TrySetResult(true);
            await Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken).ConfigureAwait(false);
            return false;
        });

    await client.ConnectAsync(CreateOptions());
    _ = await client.SubscribeAsync("Pump001.Run", _ => { });
    await AssertStateEventuallyAsync(client, SuiteLinkSessionState.Reconnecting);
    _ = await reconnectAttemptStarted.Task.WaitAsync(TimeSpan.FromSeconds(2));

    var ex = await Assert.ThrowsAsync<InvalidOperationException>(
        () => client.WriteAsync("Pump001.Run", SuiteLinkValue.FromBoolean(false)));

    Assert.Contains("reconnecting", ex.Message, StringComparison.OrdinalIgnoreCase);
}
/// <summary>
/// Verifies that disposing or disconnecting while a reconnect attempt is in flight cancels that
/// attempt's token and leaves the client in <c>Disconnected</c>.
/// </summary>
/// <param name="useDisposeAsync">
/// <c>true</c> to close via <c>DisposeAsync</c>; <c>false</c> to close via <c>DisconnectAsync</c>.
/// </param>
[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task CloseOperations_DuringReconnectAttempt_CancelRecoveryAndEndDisconnected(bool useDisposeAsync)
{
    var reconnectAttemptStarted = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    var reconnectAttemptCanceled = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    var transport = new RuntimeDisconnectFakeTransport(DisconnectBehavior.ReturnEof)
        .WithFrame(BuildHandshakeAckFrame())
        .WithFrame(BuildAdviseAckFrame(1));
    var client = new SuiteLinkClient(
        transport,
        ownsTransport: false,
        delayAsync: static (_, _) => Task.CompletedTask,
        reconnectAttemptAsync: async cancellationToken =>
        {
            reconnectAttemptStarted.TrySetResult(true);
            try
            {
                // Blocks until the close operation cancels the token.
                await Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken).ConfigureAwait(false);
                return false;
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Record that cancellation came from the supplied token, then let it propagate.
                reconnectAttemptCanceled.TrySetResult(true);
                throw;
            }
        });

    await client.ConnectAsync(CreateOptions());
    _ = await client.SubscribeAsync("Pump001.Run", _ => { });
    _ = await reconnectAttemptStarted.Task.WaitAsync(TimeSpan.FromSeconds(2));

    // Bound the close with a timeout so a close that never completes fails this test
    // instead of hanging the whole run.
    var closeTask = useDisposeAsync
        ? client.DisposeAsync().AsTask()
        : client.DisconnectAsync();
    await closeTask.WaitAsync(TimeSpan.FromSeconds(2));

    _ = await reconnectAttemptCanceled.Task.WaitAsync(TimeSpan.FromSeconds(2));
    Assert.Equal(SuiteLinkSessionState.Disconnected, client.DebugState);
    Assert.False(client.IsConnected);
}
/// <summary>
/// Polls <c>client.DebugState</c> every 20 ms until it equals <paramref name="expectedState"/>
/// or the timeout elapses, then fails with an equality assertion if the state never matched.
/// </summary>
/// <param name="client">Client whose debug state is observed.</param>
/// <param name="expectedState">State the client is expected to reach.</param>
/// <param name="timeout">Maximum time to keep polling; defaults to two seconds.</param>
private static async Task AssertStateEventuallyAsync(
    SuiteLinkClient client,
    SuiteLinkSessionState expectedState,
    TimeSpan? timeout = null)
{
    var budget = timeout ?? TimeSpan.FromSeconds(2);

    // Stopwatch is monotonic, so a wall-clock adjustment during the test run cannot
    // shorten or stretch the polling window the way a DateTime-based deadline could.
    var elapsed = System.Diagnostics.Stopwatch.StartNew();
    while (elapsed.Elapsed < budget)
    {
        if (client.DebugState == expectedState)
        {
            return;
        }

        await Task.Delay(20);
    }

    // Final check doubles as the failure message: it reports expected vs. actual state.
    Assert.Equal(expectedState, client.DebugState);
}
/// <summary>
/// Builds the fixed connection options shared by every test in this class, optionally
/// overriding the runtime options.
/// </summary>
/// <param name="runtime">Runtime options to pass through, or <c>null</c> to pass none.</param>
private static SuiteLinkConnectionOptions CreateOptions(SuiteLinkRuntimeOptions? runtime = null) =>
    new(
        host: "127.0.0.1",
        application: "App",
        topic: "Topic",
        clientName: "Client",
        clientNode: "Node",
        userName: "User",
        serverNode: "Server",
        timezone: "UTC",
        port: 5413,
        runtime: runtime);
/// <summary>
/// Returns a fresh copy of the fixed eight-byte frame these tests serve as the handshake ack.
/// </summary>
private static byte[] BuildHandshakeAckFrame() =>
    new byte[] { 0x06, 0x00, 0x01, 0x00, 0xA1, 0xB2, 0xC3, 0xA5 };
/// <summary>
/// Builds an advise-ack frame whose payload holds one five-byte entry per tag id:
/// the id as a little-endian 32-bit value followed by a zero byte.
/// </summary>
/// <param name="tagIds">Tag ids to acknowledge; when empty, a single entry for id 0 is written.</param>
private static byte[] BuildAdviseAckFrame(params uint[] tagIds)
{
    const int EntrySize = 5;

    // An empty argument list still yields one entry, for tag id 0.
    uint[] effectiveIds = tagIds.Length > 0 ? tagIds : [0u];
    var payload = new byte[effectiveIds.Length * EntrySize];

    for (var index = 0; index < effectiveIds.Length; index++)
    {
        var entryStart = index * EntrySize;
        SuiteLinkEncoding.WriteUInt32LittleEndian(payload.AsSpan(entryStart, 4), effectiveIds[index]);
        payload[entryStart + 4] = 0x00;
    }

    return SuiteLinkFrameWriter.WriteFrame(SuiteLinkSubscriptionCodec.AdviseAckMessageType, payload);
}
/// <summary>
/// Builds an update frame with a ten-byte payload: the tag id (32-bit LE), the 16-bit values
/// 1 and 0x00C0, the <c>Binary</c> wire-type byte, and a final byte of 1 or 0 for the value.
/// </summary>
/// <param name="tagId">Tag id written into the first four payload bytes.</param>
/// <param name="value">Boolean value encoded in the last payload byte.</param>
private static byte[] BuildBooleanUpdateFrame(uint tagId, bool value)
{
    var payload = new byte[10];
    var cursor = payload.AsSpan();

    SuiteLinkEncoding.WriteUInt32LittleEndian(cursor.Slice(0, 4), tagId);
    SuiteLinkEncoding.WriteUInt16LittleEndian(cursor.Slice(4, 2), 1);
    SuiteLinkEncoding.WriteUInt16LittleEndian(cursor.Slice(6, 2), 0x00C0);
    cursor[8] = (byte)SuiteLinkWireValueType.Binary;
    cursor[9] = (byte)(value ? 1 : 0);

    return SuiteLinkFrameWriter.WriteFrame(SuiteLinkUpdateCodec.UpdateMessageType, payload);
}
/// <summary>
/// Counts the buffers that parse as a frame and carry the given message type.
/// Buffers that fail to parse are not counted.
/// </summary>
/// <param name="sentBuffers">Raw buffers captured by a fake transport.</param>
/// <param name="messageType">Message type to look for.</param>
private static int CountSentMessageType(IEnumerable<byte[]> sentBuffers, ushort messageType)
{
    var matches = 0;
    foreach (var frameBytes in sentBuffers)
    {
        if (!SuiteLinkFrameReader.TryParseFrame(frameBytes, out var frame, out _))
        {
            continue;
        }

        if (frame.MessageType == messageType)
        {
            matches++;
        }
    }

    return matches;
}
/// <summary>
/// How a fake transport signals a runtime disconnect once its scripted chunks are exhausted.
/// </summary>
public enum DisconnectBehavior
{
    /// <summary>The receive completes with zero bytes.</summary>
    ReturnEof = 0,

    /// <summary>The receive throws an <c>IOException</c>.</summary>
    ThrowIoException = 1,

    /// <summary>The receive throws a <c>SocketException</c>.</summary>
    ThrowSocketException = 2
}
/// <summary>
/// What a planned connection does when a receive is issued after its scripted frames run out.
/// </summary>
private enum EmptyReceiveBehavior
{
    /// <summary>The receive completes with zero bytes.</summary>
    ReturnEof = 0,

    /// <summary>The receive waits until its cancellation token is canceled.</summary>
    Block = 1
}
/// <summary>
/// Script for a single connection of a replayable fake transport.
/// </summary>
/// <param name="EmptyReceiveBehavior">What a receive does once <paramref name="Frames"/> are exhausted.</param>
/// <param name="Frames">Byte chunks handed out, in order, one per receive call.</param>
private sealed record ConnectionPlan(
EmptyReceiveBehavior EmptyReceiveBehavior,
params byte[][] Frames);
/// <summary>
/// Fake transport that serves a queue of scripted chunks and, once the queue is empty,
/// simulates a runtime disconnect according to the configured <see cref="DisconnectBehavior"/>.
/// Sends are accepted and discarded.
/// </summary>
private sealed class RuntimeDisconnectFakeTransport : ISuiteLinkTransport
{
    private readonly DisconnectBehavior _disconnectBehavior;
    private readonly Queue<byte[]> _receiveChunks = new();

    public RuntimeDisconnectFakeTransport(DisconnectBehavior disconnectBehavior) =>
        _disconnectBehavior = disconnectBehavior;

    /// <summary>True after <see cref="ConnectAsync"/>, false after <see cref="DisposeAsync"/>.</summary>
    public bool IsConnected { get; private set; }

    /// <summary>Number of receives issued after the scripted chunks ran out.</summary>
    public int RuntimeReceiveCallCount { get; private set; }

    /// <summary>Optional signal completed when a receive finds the chunk queue empty.</summary>
    public TaskCompletionSource<bool>? RuntimeReceiveEntered { get; set; }

    /// <summary>Optional gate awaited before the disconnect is reported.</summary>
    public Task? AllowDisconnectSignal { get; set; }

    /// <summary>Queues a complete frame for a later receive and returns this instance for chaining.</summary>
    public RuntimeDisconnectFakeTransport WithFrame(byte[] frameBytes)
    {
        _receiveChunks.Enqueue(frameBytes);
        return this;
    }

    /// <summary>Queues an arbitrary chunk (e.g. a partial frame) and returns this instance for chaining.</summary>
    public RuntimeDisconnectFakeTransport WithChunk(byte[] bytes)
    {
        _receiveChunks.Enqueue(bytes);
        return this;
    }

    public ValueTask ConnectAsync(string host, int port, CancellationToken cancellationToken = default)
    {
        IsConnected = true;
        return ValueTask.CompletedTask;
    }

    public ValueTask SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default) =>
        ValueTask.CompletedTask;

    public async ValueTask<int> ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
    {
        // Scripted data first: hand out one queued chunk per call.
        if (_receiveChunks.TryDequeue(out var scripted))
        {
            scripted.CopyTo(buffer);
            return scripted.Length;
        }

        RuntimeReceiveCallCount++;
        RuntimeReceiveEntered?.TrySetResult(true);

        // Let the test decide when the disconnect becomes visible.
        if (AllowDisconnectSignal is { } gate)
        {
            await gate.ConfigureAwait(false);
        }

        switch (_disconnectBehavior)
        {
            case DisconnectBehavior.ThrowIoException:
                throw new IOException("Synthetic runtime disconnect.");
            case DisconnectBehavior.ThrowSocketException:
                throw new SocketException((int)SocketError.ConnectionReset);
            default:
                // ReturnEof and any unrecognised value both report end-of-stream.
                return 0;
        }
    }

    public ValueTask DisposeAsync()
    {
        IsConnected = false;
        return ValueTask.CompletedTask;
    }
}
/// <summary>
/// Fake transport that can be connected several times. Each <see cref="ConnectAsync"/> call
/// activates the next <see cref="ConnectionPlan"/>, which supplies the frames to serve and
/// the behaviour once they run out. Every sent buffer is copied into <see cref="SentBuffers"/>.
/// </summary>
private sealed class ReplayableReconnectFakeTransport : ISuiteLinkTransport
{
    private readonly object _syncRoot = new();
    private readonly List<ConnectionPlan> _connectionPlans;
    private Queue<byte[]> _receiveChunks = new();
    private EmptyReceiveBehavior _emptyReceiveBehavior;
    private bool _disposed;

    public ReplayableReconnectFakeTransport(params ConnectionPlan[] connectionPlans) =>
        _connectionPlans = new List<ConnectionPlan>(connectionPlans);

    /// <summary>Number of successful <see cref="ConnectAsync"/> calls so far.</summary>
    public int ConnectCallCount { get; private set; }

    /// <summary>True until <see cref="DisposeAsync"/> has been called.</summary>
    public bool IsConnected => !_disposed;

    /// <summary>Copies of every buffer passed to <see cref="SendAsync"/> since the last clear.</summary>
    public List<byte[]> SentBuffers { get; } = new();

    /// <summary>Removes all captured send buffers.</summary>
    public void ClearSentBuffers()
    {
        lock (_syncRoot)
        {
            SentBuffers.Clear();
        }
    }

    public ValueTask ConnectAsync(string host, int port, CancellationToken cancellationToken = default)
    {
        var planIndex = ConnectCallCount;
        if (planIndex >= _connectionPlans.Count)
        {
            throw new InvalidOperationException("No reconnect plan is available for the requested attempt.");
        }

        // Activate the plan for this attempt: fresh frame queue plus its end-of-data behaviour.
        var plan = _connectionPlans[planIndex];
        ConnectCallCount = planIndex + 1;
        _receiveChunks = new Queue<byte[]>(plan.Frames);
        _emptyReceiveBehavior = plan.EmptyReceiveBehavior;
        return ValueTask.CompletedTask;
    }

    public ValueTask SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
    {
        // Copy the bytes: the caller's memory is not retained.
        var captured = buffer.ToArray();
        lock (_syncRoot)
        {
            SentBuffers.Add(captured);
        }

        return ValueTask.CompletedTask;
    }

    public async ValueTask<int> ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
    {
        if (_receiveChunks.TryDequeue(out var scripted))
        {
            scripted.CopyTo(buffer);
            return scripted.Length;
        }

        if (_emptyReceiveBehavior != EmptyReceiveBehavior.ReturnEof)
        {
            // Block: wait here until the receive is canceled (the delay then throws).
            await Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken).ConfigureAwait(false);
        }

        return 0;
    }

    public ValueTask DisposeAsync()
    {
        _disposed = true;
        return ValueTask.CompletedTask;
    }
}
}