feat: add resilient reconnect and catch-up replay

This commit is contained in:
Joseph Doherty
2026-03-17 11:04:19 -04:00
parent c278f98496
commit 2f04ec9d1d
29 changed files with 3746 additions and 95 deletions

View File

@@ -0,0 +1,272 @@
using System.Threading.Channels;
using SuiteLink.Client.Protocol;
using SuiteLink.Client.Transport;
namespace SuiteLink.Client.Tests;
public sealed class SuiteLinkClientRuntimeLoopTests
{
[Fact]
public async Task ConnectAsync_WithZeroSubscriptions_TransitionsToReadyOnceRuntimeLoopStarts()
{
var transport = new BlockingFakeTransport();
transport.EnqueueReceive(BuildHandshakeAckFrame());
var client = new SuiteLinkClient(transport);
await client.ConnectAsync(CreateOptions());
Assert.True(client.IsConnected);
await client.DisposeAsync();
}
[Fact]
public async Task ConnectAsync_StartsBackgroundLoop_AndDispatchesUpdateWithoutManualPolling()
{
var updateReceived = new TaskCompletionSource<SuiteLinkTagUpdate>(
TaskCreationOptions.RunContinuationsAsynchronously);
var transport = new BlockingFakeTransport();
transport.EnqueueReceive(BuildHandshakeAckFrame());
var client = new SuiteLinkClient(transport);
await client.ConnectAsync(CreateOptions());
transport.EnqueueReceive(BuildAdviseAckFrame(1));
transport.EnqueueReceive(BuildBooleanUpdateFrame(1, true));
_ = await client.SubscribeAsync(
"Pump001.Run",
update => updateReceived.TrySetResult(update));
var update = await updateReceived.Task.WaitAsync(TimeSpan.FromSeconds(2));
Assert.True(update.Value.TryGetBoolean(out var value) && value);
await client.DisposeAsync();
}
[Fact]
public async Task RuntimeLoop_CallbackCanReenterClientWriteWithoutDeadlock()
{
var callbackCompleted = new TaskCompletionSource<bool>(
TaskCreationOptions.RunContinuationsAsynchronously);
var transport = new BlockingFakeTransport();
transport.EnqueueReceive(BuildHandshakeAckFrame());
var client = new SuiteLinkClient(transport);
await client.ConnectAsync(CreateOptions());
transport.EnqueueReceive(BuildAdviseAckFrame(1));
_ = await client.SubscribeAsync(
"Pump001.Run",
_ =>
{
client.WriteAsync("Pump001.Run", SuiteLinkValue.FromBoolean(false))
.GetAwaiter()
.GetResult();
callbackCompleted.TrySetResult(true);
});
transport.EnqueueReceive(BuildBooleanUpdateFrame(1, true));
_ = await callbackCompleted.Task.WaitAsync(TimeSpan.FromSeconds(2));
var expectedPoke = SuiteLinkWriteCodec.Encode(1, SuiteLinkValue.FromBoolean(false));
Assert.Contains(
transport.SentBuffers,
frameBytes => frameBytes.AsSpan().SequenceEqual(expectedPoke));
await client.DisposeAsync();
}
[Fact]
public async Task DisposeAsync_AwaitsRuntimeLoopStop_BeforeDisposingOwnedTransport()
{
var transport = new OrderedShutdownFakeTransport();
transport.EnqueueReceive(BuildHandshakeAckFrame());
transport.EnqueueReceive(BuildAdviseAckFrame(1));
var client = new SuiteLinkClient(transport, ownsTransport: true);
await client.ConnectAsync(CreateOptions());
_ = await client.SubscribeAsync("Pump001.Run", _ => { });
_ = await transport.RuntimeReceiveEntered.Task.WaitAsync(TimeSpan.FromSeconds(2));
var disposeTask = client.DisposeAsync().AsTask();
// The runtime loop is still blocked in receive and has not been allowed to return.
await Task.Delay(100);
Assert.False(disposeTask.IsCompleted);
Assert.Equal(0, transport.DisposeCallCount);
transport.AllowRuntimeReceiveReturn.TrySetResult(true);
await disposeTask.WaitAsync(TimeSpan.FromSeconds(2));
Assert.Equal(1, transport.DisposeCallCount);
Assert.True(transport.DisposeObservedRuntimeReceiveReturned);
}
private static SuiteLinkConnectionOptions CreateOptions()
{
return new SuiteLinkConnectionOptions(
host: "127.0.0.1",
application: "App",
topic: "Topic",
clientName: "Client",
clientNode: "Node",
userName: "User",
serverNode: "Server",
timezone: "UTC",
port: 5413);
}
private static byte[] BuildHandshakeAckFrame()
{
return [0x06, 0x00, 0x01, 0x00, 0xA1, 0xB2, 0xC3, 0xA5];
}
private static byte[] BuildAdviseAckFrame(params uint[] tagIds)
{
var payload = new byte[Math.Max(1, tagIds.Length) * 5];
var ids = tagIds.Length == 0 ? [0u] : tagIds;
var offset = 0;
foreach (var tagId in ids)
{
SuiteLinkEncoding.WriteUInt32LittleEndian(payload.AsSpan(offset, 4), tagId);
payload[offset + 4] = 0x00;
offset += 5;
}
return SuiteLinkFrameWriter.WriteFrame(SuiteLinkSubscriptionCodec.AdviseAckMessageType, payload);
}
private static byte[] BuildBooleanUpdateFrame(uint tagId, bool value)
{
var payload = new byte[10];
SuiteLinkEncoding.WriteUInt32LittleEndian(payload.AsSpan(0, 4), tagId);
SuiteLinkEncoding.WriteUInt16LittleEndian(payload.AsSpan(4, 2), 1);
SuiteLinkEncoding.WriteUInt16LittleEndian(payload.AsSpan(6, 2), 0x00C0);
payload[8] = (byte)SuiteLinkWireValueType.Binary;
payload[9] = value ? (byte)1 : (byte)0;
return SuiteLinkFrameWriter.WriteFrame(SuiteLinkUpdateCodec.UpdateMessageType, payload);
}
private sealed class BlockingFakeTransport : ISuiteLinkTransport
{
private readonly Channel<byte[]> _receiveChannel = Channel.CreateUnbounded<byte[]>();
private readonly object _syncRoot = new();
private bool _disposed;
public bool IsConnected => !_disposed;
public List<byte[]> SentBuffers { get; } = [];
public void EnqueueReceive(byte[] frameBytes)
{
if (!_receiveChannel.Writer.TryWrite(frameBytes))
{
throw new InvalidOperationException("Unable to enqueue receive frame.");
}
}
public ValueTask ConnectAsync(string host, int port, CancellationToken cancellationToken = default)
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(BlockingFakeTransport));
}
return ValueTask.CompletedTask;
}
public ValueTask SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
{
lock (_syncRoot)
{
SentBuffers.Add(buffer.ToArray());
}
return ValueTask.CompletedTask;
}
public async ValueTask<int> ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
var next = await _receiveChannel.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);
next.CopyTo(buffer);
return next.Length;
}
public ValueTask DisposeAsync()
{
_disposed = true;
_receiveChannel.Writer.TryComplete();
return ValueTask.CompletedTask;
}
}
private sealed class OrderedShutdownFakeTransport : ISuiteLinkTransport
{
private readonly object _syncRoot = new();
private readonly Queue<byte[]> _startupFrames = [];
private bool _disposed;
public bool IsConnected => !_disposed;
public int DisposeCallCount { get; private set; }
public bool DisposeObservedRuntimeReceiveReturned { get; private set; }
public TaskCompletionSource<bool> RuntimeReceiveEntered { get; } =
new(TaskCreationOptions.RunContinuationsAsynchronously);
public TaskCompletionSource<bool> AllowRuntimeReceiveReturn { get; } =
new(TaskCreationOptions.RunContinuationsAsynchronously);
public TaskCompletionSource<bool> RuntimeReceiveReturned { get; } =
new(TaskCreationOptions.RunContinuationsAsynchronously);
public void EnqueueReceive(byte[] frameBytes)
{
lock (_syncRoot)
{
_startupFrames.Enqueue(frameBytes);
}
}
public ValueTask ConnectAsync(string host, int port, CancellationToken cancellationToken = default)
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(OrderedShutdownFakeTransport));
}
return ValueTask.CompletedTask;
}
public ValueTask SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
{
return ValueTask.CompletedTask;
}
public ValueTask<int> ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
lock (_syncRoot)
{
if (_startupFrames.Count > 0)
{
var startupFrame = _startupFrames.Dequeue();
startupFrame.CopyTo(buffer);
return ValueTask.FromResult(startupFrame.Length);
}
}
RuntimeReceiveEntered.TrySetResult(true);
return ReceiveRuntimeLoopBlockAsync();
}
public ValueTask DisposeAsync()
{
_disposed = true;
DisposeCallCount++;
DisposeObservedRuntimeReceiveReturned = RuntimeReceiveReturned.Task.IsCompleted;
return ValueTask.CompletedTask;
}
private async ValueTask<int> ReceiveRuntimeLoopBlockAsync()
{
await AllowRuntimeReceiveReturn.Task.ConfigureAwait(false);
RuntimeReceiveReturned.TrySetResult(true);
return 0;
}
}
}