using System.Threading.Channels; using SuiteLink.Client.Protocol; using SuiteLink.Client.Transport; namespace SuiteLink.Client.Tests; public sealed class SuiteLinkClientRuntimeLoopTests { [Fact] public async Task ConnectAsync_WithZeroSubscriptions_TransitionsToReadyOnceRuntimeLoopStarts() { var transport = new BlockingFakeTransport(); transport.EnqueueReceive(BuildHandshakeAckFrame()); var client = new SuiteLinkClient(transport); await client.ConnectAsync(CreateOptions()); Assert.True(client.IsConnected); await client.DisposeAsync(); } [Fact] public async Task ConnectAsync_StartsBackgroundLoop_AndDispatchesUpdateWithoutManualPolling() { var updateReceived = new TaskCompletionSource( TaskCreationOptions.RunContinuationsAsynchronously); var transport = new BlockingFakeTransport(); transport.EnqueueReceive(BuildHandshakeAckFrame()); var client = new SuiteLinkClient(transport); await client.ConnectAsync(CreateOptions()); transport.EnqueueReceive(BuildAdviseAckFrame(1)); transport.EnqueueReceive(BuildBooleanUpdateFrame(1, true)); _ = await client.SubscribeAsync( "Pump001.Run", update => updateReceived.TrySetResult(update)); var update = await updateReceived.Task.WaitAsync(TimeSpan.FromSeconds(2)); Assert.True(update.Value.TryGetBoolean(out var value) && value); await client.DisposeAsync(); } [Fact] public async Task RuntimeLoop_CallbackCanReenterClientWriteWithoutDeadlock() { var callbackCompleted = new TaskCompletionSource( TaskCreationOptions.RunContinuationsAsynchronously); var transport = new BlockingFakeTransport(); transport.EnqueueReceive(BuildHandshakeAckFrame()); var client = new SuiteLinkClient(transport); await client.ConnectAsync(CreateOptions()); transport.EnqueueReceive(BuildAdviseAckFrame(1)); _ = await client.SubscribeAsync( "Pump001.Run", _ => { client.WriteAsync("Pump001.Run", SuiteLinkValue.FromBoolean(false)) .GetAwaiter() .GetResult(); callbackCompleted.TrySetResult(true); }); transport.EnqueueReceive(BuildBooleanUpdateFrame(1, true)); _ = await callbackCompleted.Task.WaitAsync(TimeSpan.FromSeconds(2)); var expectedPoke = SuiteLinkWriteCodec.Encode(1, SuiteLinkValue.FromBoolean(false)); Assert.Contains( transport.SentBuffers, frameBytes => frameBytes.AsSpan().SequenceEqual(expectedPoke)); await client.DisposeAsync(); } [Fact] public async Task DisposeAsync_AwaitsRuntimeLoopStop_BeforeDisposingOwnedTransport() { var transport = new OrderedShutdownFakeTransport(); transport.EnqueueReceive(BuildHandshakeAckFrame()); transport.EnqueueReceive(BuildAdviseAckFrame(1)); var client = new SuiteLinkClient(transport, ownsTransport: true); await client.ConnectAsync(CreateOptions()); _ = await client.SubscribeAsync("Pump001.Run", _ => { }); _ = await transport.RuntimeReceiveEntered.Task.WaitAsync(TimeSpan.FromSeconds(2)); var disposeTask = client.DisposeAsync().AsTask(); // The runtime loop is still blocked in receive and has not been allowed to return. await Task.Delay(100); Assert.False(disposeTask.IsCompleted); Assert.Equal(0, transport.DisposeCallCount); transport.AllowRuntimeReceiveReturn.TrySetResult(true); await disposeTask.WaitAsync(TimeSpan.FromSeconds(2)); Assert.Equal(1, transport.DisposeCallCount); Assert.True(transport.DisposeObservedRuntimeReceiveReturned); } private static SuiteLinkConnectionOptions CreateOptions() { return new SuiteLinkConnectionOptions( host: "127.0.0.1", application: "App", topic: "Topic", clientName: "Client", clientNode: "Node", userName: "User", serverNode: "Server", timezone: "UTC", port: 5413); } private static byte[] BuildHandshakeAckFrame() { return [0x06, 0x00, 0x01, 0x00, 0xA1, 0xB2, 0xC3, 0xA5]; } private static byte[] BuildAdviseAckFrame(params uint[] tagIds) { var payload = new byte[Math.Max(1, tagIds.Length) * 5]; var ids = tagIds.Length == 0 ? [0u] : tagIds; var offset = 0; foreach (var tagId in ids) { SuiteLinkEncoding.WriteUInt32LittleEndian(payload.AsSpan(offset, 4), tagId); payload[offset + 4] = 0x00; offset += 5; } return SuiteLinkFrameWriter.WriteFrame(SuiteLinkSubscriptionCodec.AdviseAckMessageType, payload); } private static byte[] BuildBooleanUpdateFrame(uint tagId, bool value) { var payload = new byte[10]; SuiteLinkEncoding.WriteUInt32LittleEndian(payload.AsSpan(0, 4), tagId); SuiteLinkEncoding.WriteUInt16LittleEndian(payload.AsSpan(4, 2), 1); SuiteLinkEncoding.WriteUInt16LittleEndian(payload.AsSpan(6, 2), 0x00C0); payload[8] = (byte)SuiteLinkWireValueType.Binary; payload[9] = value ? (byte)1 : (byte)0; return SuiteLinkFrameWriter.WriteFrame(SuiteLinkUpdateCodec.UpdateMessageType, payload); } private sealed class BlockingFakeTransport : ISuiteLinkTransport { private readonly Channel _receiveChannel = Channel.CreateUnbounded(); private readonly object _syncRoot = new(); private bool _disposed; public bool IsConnected => !_disposed; public List SentBuffers { get; } = []; public void EnqueueReceive(byte[] frameBytes) { if (!_receiveChannel.Writer.TryWrite(frameBytes)) { throw new InvalidOperationException("Unable to enqueue receive frame."); } } public ValueTask ConnectAsync(string host, int port, CancellationToken cancellationToken = default) { if (_disposed) { throw new ObjectDisposedException(nameof(BlockingFakeTransport)); } return ValueTask.CompletedTask; } public ValueTask SendAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) { lock (_syncRoot) { SentBuffers.Add(buffer.ToArray()); } return ValueTask.CompletedTask; } public async ValueTask ReceiveAsync(Memory buffer, CancellationToken cancellationToken = default) { var next = await _receiveChannel.Reader.ReadAsync(cancellationToken).ConfigureAwait(false); next.CopyTo(buffer); return next.Length; } public ValueTask DisposeAsync() { _disposed = true; _receiveChannel.Writer.TryComplete(); return ValueTask.CompletedTask; } } private sealed class OrderedShutdownFakeTransport : ISuiteLinkTransport { private readonly object _syncRoot = new(); private readonly Queue _startupFrames = []; private bool _disposed; public bool IsConnected => !_disposed; public int DisposeCallCount { get; private set; } public bool DisposeObservedRuntimeReceiveReturned { get; private set; } public TaskCompletionSource RuntimeReceiveEntered { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously); public TaskCompletionSource AllowRuntimeReceiveReturn { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously); public TaskCompletionSource RuntimeReceiveReturned { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously); public void EnqueueReceive(byte[] frameBytes) { lock (_syncRoot) { _startupFrames.Enqueue(frameBytes); } } public ValueTask ConnectAsync(string host, int port, CancellationToken cancellationToken = default) { if (_disposed) { throw new ObjectDisposedException(nameof(OrderedShutdownFakeTransport)); } return ValueTask.CompletedTask; } public ValueTask SendAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) { return ValueTask.CompletedTask; } public ValueTask ReceiveAsync(Memory buffer, CancellationToken cancellationToken = default) { lock (_syncRoot) { if (_startupFrames.Count > 0) { var startupFrame = _startupFrames.Dequeue(); startupFrame.CopyTo(buffer); return ValueTask.FromResult(startupFrame.Length); } } RuntimeReceiveEntered.TrySetResult(true); return ReceiveRuntimeLoopBlockAsync(); } public ValueTask DisposeAsync() { _disposed = true; DisposeCallCount++; DisposeObservedRuntimeReceiveReturned = RuntimeReceiveReturned.Task.IsCompleted; return ValueTask.CompletedTask; } private async ValueTask ReceiveRuntimeLoopBlockAsync() { await AllowRuntimeReceiveReturn.Task.ConfigureAwait(false); RuntimeReceiveReturned.TrySetResult(true); return 0; } } }