using SuiteLink.Client.Protocol;
using SuiteLink.Client.Transport;

namespace SuiteLink.Client.Tests;

/// <summary>
/// Subscription-related tests for <see cref="SuiteLinkClient"/>. Each test drives the client through an
/// in-memory <see cref="FakeTransport"/> that replays pre-built frames and records every buffer sent.
/// </summary>
public sealed class SuiteLinkClientSubscriptionTests
{
    /// <summary>
    /// Subscribing returns a handle carrying the item name and tag id, and the advise frame is the
    /// third buffer written to the transport.
    /// </summary>
    [Fact]
    public async Task SubscribeAsync_SendsAdvise_AndReturnsSubscriptionHandle()
    {
        var transport = new FakeTransport();
        transport.EnqueueReceive(BuildHandshakeAckFrame());
        transport.EnqueueReceive(BuildAdviseAckFrame(1));
        var client = new SuiteLinkClient(transport);
        await client.ConnectAsync(CreateOptions());

        var handle = await client.SubscribeAsync("Pump001.Run", _ => { });

        Assert.True(client.IsConnected);
        Assert.Equal("Pump001.Run", handle.ItemName);
        Assert.Equal(1u, handle.TagId);

        // Index 2 is the advise frame; the earlier buffers are presumably written by ConnectAsync.
        Assert.Equal(3, transport.SentBuffers.Count);
        var adviseFrame = SuiteLinkFrameReader.ParseFrame(transport.SentBuffers[2]);
        Assert.Equal(SuiteLinkSubscriptionCodec.AdviseMessageType, adviseFrame.MessageType);
        Assert.Equal(1u, SuiteLinkEncoding.ReadUInt32LittleEndian(adviseFrame.Payload.Span[..4]));
    }

    /// <summary>
    /// An update frame for a subscribed tag id is delivered to that subscription's callback.
    /// </summary>
    [Fact]
    public async Task ProcessIncomingAsync_UpdateFrame_DispatchesToSubscriptionCallback()
    {
        var transport = new FakeTransport();
        transport.EnqueueReceive(BuildHandshakeAckFrame());
        transport.EnqueueReceive(BuildAdviseAckFrame(1));
        transport.EnqueueReceive(BuildBooleanUpdateFrame(1, true));
        var client = new SuiteLinkClient(transport);
        await client.ConnectAsync(CreateOptions());
        SuiteLinkTagUpdate? callbackUpdate = null;
        _ = await client.SubscribeAsync("Pump001.Run", update => callbackUpdate = update);

        await client.ProcessIncomingAsync();

        Assert.NotNull(callbackUpdate);
        Assert.Equal("Pump001.Run", callbackUpdate.ItemName);
        Assert.True(callbackUpdate.Value.TryGetBoolean(out var value));
        Assert.True(value);
    }

    /// <summary>
    /// ReadAsync yields the first update for the item and then sends an unadvise frame for the same
    /// tag id as the fourth buffer.
    /// </summary>
    [Fact]
    public async Task ReadAsync_ReturnsFirstMatchingUpdate_UsingTemporarySubscription()
    {
        var transport = new FakeTransport();
        transport.EnqueueReceive(BuildHandshakeAckFrame());
        transport.EnqueueReceive(BuildAdviseAckFrame(1));
        transport.EnqueueReceive(BuildIntegerUpdateFrame(1, 42));
        var client = new SuiteLinkClient(transport);
        await client.ConnectAsync(CreateOptions());

        var update = await client.ReadAsync("Pump001.Speed", TimeSpan.FromSeconds(2));

        Assert.Equal("Pump001.Speed", update.ItemName);
        Assert.True(update.Value.TryGetInt32(out var value));
        Assert.Equal(42, value);

        Assert.Equal(4, transport.SentBuffers.Count);
        var unadviseFrame = SuiteLinkFrameReader.ParseFrame(transport.SentBuffers[3]);
        Assert.Equal(SuiteLinkSubscriptionCodec.UnadviseMessageType, unadviseFrame.MessageType);
        Assert.Equal(1u, SuiteLinkEncoding.ReadUInt32LittleEndian(unadviseFrame.Payload.Span));
    }

    /// <summary>
    /// Disposing the handle sends an unadvise frame, and an update that is processed afterwards
    /// does not reach the callback.
    /// </summary>
    [Fact]
    public async Task SubscriptionHandleDisposeAsync_SendsUnadvise_AndStopsFurtherDispatch()
    {
        var transport = new FakeTransport();
        transport.EnqueueReceive(BuildHandshakeAckFrame());
        transport.EnqueueReceive(BuildAdviseAckFrame(1));
        transport.EnqueueReceive(BuildBooleanUpdateFrame(1, true));
        var client = new SuiteLinkClient(transport);
        await client.ConnectAsync(CreateOptions());
        var callbackCount = 0;
        var handle = await client.SubscribeAsync("Pump001.Run", _ => callbackCount++);

        await handle.DisposeAsync();
        await client.ProcessIncomingAsync();

        Assert.Equal(4, transport.SentBuffers.Count);
        var unadviseFrame = SuiteLinkFrameReader.ParseFrame(transport.SentBuffers[3]);
        Assert.Equal(SuiteLinkSubscriptionCodec.UnadviseMessageType, unadviseFrame.MessageType);
        Assert.Equal(1u, SuiteLinkEncoding.ReadUInt32LittleEndian(unadviseFrame.Payload.Span));
        Assert.Equal(0, callbackCount);
    }

    /// <summary>
    /// A callback that synchronously disposes its own subscription must complete, and the unadvise
    /// frame must still be sent.
    /// </summary>
    [Fact]
    public async Task ProcessIncomingAsync_CallbackDisposesSubscription_DoesNotDeadlock()
    {
        var transport = new FakeTransport();
        transport.EnqueueReceive(BuildHandshakeAckFrame());
        transport.EnqueueReceive(BuildAdviseAckFrame(1));
        transport.EnqueueReceive(BuildBooleanUpdateFrame(1, true));
        var client = new SuiteLinkClient(transport);
        await client.ConnectAsync(CreateOptions());
        SubscriptionHandle? handle = null;
        var callbackCompleted = false;
        handle = await client.SubscribeAsync(
            "Pump001.Run",
            _ =>
            {
                // Deliberately blocking: the callback is synchronous and this is the re-entrancy
                // scenario under test.
                handle!.DisposeAsync().AsTask().GetAwaiter().GetResult();
                callbackCompleted = true;
            });

        // The wait is bounded so that a hang surfaces as a timeout instead of a stuck test run.
        await client.ProcessIncomingAsync().WaitAsync(TimeSpan.FromSeconds(1));

        Assert.True(callbackCompleted);
        Assert.Equal(4, transport.SentBuffers.Count);
        var unadviseFrame = SuiteLinkFrameReader.ParseFrame(transport.SentBuffers[3]);
        Assert.Equal(SuiteLinkSubscriptionCodec.UnadviseMessageType, unadviseFrame.MessageType);
    }

    /// <summary>
    /// A callback that synchronously re-enters <c>ProcessIncomingAsync</c> once must not hang, and
    /// both queued updates must be observed in the order they were enqueued.
    /// </summary>
    [Fact]
    public async Task ProcessIncomingAsync_CallbackCanInvokeNestedProcessing_WithoutDeadlock()
    {
        var transport = new FakeTransport();
        transport.EnqueueReceive(BuildHandshakeAckFrame());
        transport.EnqueueReceive(BuildAdviseAckFrame(1));
        transport.EnqueueReceive(BuildBooleanUpdateFrame(1, true));
        transport.EnqueueReceive(BuildBooleanUpdateFrame(1, false));
        var client = new SuiteLinkClient(transport);
        await client.ConnectAsync(CreateOptions());
        var values = new List<bool>();
        var nestedCalled = false;
        _ = await client.SubscribeAsync(
            "Pump001.Run",
            update =>
            {
                if (update.Value.TryGetBoolean(out var value))
                {
                    values.Add(value);
                }

                // Re-enter exactly once; the nested call consumes the second queued update.
                if (!nestedCalled)
                {
                    nestedCalled = true;
                    client.ProcessIncomingAsync().GetAwaiter().GetResult();
                }
            });

        await client.ProcessIncomingAsync().WaitAsync(TimeSpan.FromSeconds(1));

        Assert.Equal(2, values.Count);
        Assert.True(values[0]);
        Assert.False(values[1]);
    }

    /// <summary>
    /// When no update is queued and the cleanup unadvise send also fails, the exception surfaced by
    /// ReadAsync must carry the "Remote endpoint closed" message rather than the synthetic send failure.
    /// </summary>
    [Fact]
    public async Task ReadAsync_PreservesPrimaryReceiveFailure_WhenCleanupUnadviseFails()
    {
        var transport = new FakeTransport();
        transport.EnqueueReceive(BuildHandshakeAckFrame());
        transport.EnqueueReceive(BuildAdviseAckFrame(1));
        transport.SendFailureFactory = frameBytes =>
        {
            var span = frameBytes.Span;

            // NOTE(review): bytes 2..3 == 04 80 presumably encode the unadvise message type in
            // little-endian order; confirm against SuiteLinkSubscriptionCodec.UnadviseMessageType.
            var isUnadviseFrame = span.Length >= 4 && span[2] == 0x04 && span[3] == 0x80;
            return isUnadviseFrame
                ? new IOException("Synthetic unadvise send failure.")
                : null;
        };
        var client = new SuiteLinkClient(transport);
        await client.ConnectAsync(CreateOptions());

        // NOTE(review): the specific expected exception type was not recoverable from this file;
        // tighten to Assert.ThrowsAsync<TExpected> once it is confirmed.
        var ex = await Assert.ThrowsAnyAsync<Exception>(
            () => client.ReadAsync("Pump001.Speed", TimeSpan.FromSeconds(2)));

        Assert.Contains("Remote endpoint closed", ex.Message, StringComparison.OrdinalIgnoreCase);
    }

    /// <summary>
    /// An advise acknowledgement that carries two tag ids for a single subscribe request is rejected.
    /// </summary>
    [Fact]
    public async Task SubscribeAsync_RejectsMultiItemAdviseAck_AsUnsupportedCorrelationPolicy()
    {
        var transport = new FakeTransport();
        transport.EnqueueReceive(BuildHandshakeAckFrame());
        transport.EnqueueReceive(BuildAdviseAckFrame(1, 2));
        var client = new SuiteLinkClient(transport);
        await client.ConnectAsync(CreateOptions());

        // NOTE(review): the specific expected exception type was not recoverable from this file;
        // tighten to Assert.ThrowsAsync<TExpected> once it is confirmed.
        await Assert.ThrowsAnyAsync<Exception>(
            () => client.SubscribeAsync("Pump001.Run", _ => { }));
    }

    /// <summary>
    /// An advise acknowledgement whose tag id (0x1234) differs from the requested one is rejected.
    /// </summary>
    [Fact]
    public async Task SubscribeAsync_RejectsMismatchedAdviseAckTagId()
    {
        var transport = new FakeTransport();
        transport.EnqueueReceive(BuildHandshakeAckFrame());
        transport.EnqueueReceive(BuildAdviseAckFrame(0x1234));
        var client = new SuiteLinkClient(transport);
        await client.ConnectAsync(CreateOptions());

        // NOTE(review): the specific expected exception type was not recoverable from this file;
        // tighten to Assert.ThrowsAsync<TExpected> once it is confirmed.
        await Assert.ThrowsAnyAsync<Exception>(
            () => client.SubscribeAsync("Pump001.Run", _ => { }));
    }

    /// <summary>Builds the fixed connection options shared by every test.</summary>
    private static SuiteLinkConnectionOptions CreateOptions()
    {
        return new SuiteLinkConnectionOptions(
            host: "127.0.0.1",
            application: "App",
            topic: "Topic",
            clientName: "Client",
            clientNode: "Node",
            userName: "User",
            serverNode: "Server",
            timezone: "UTC",
            port: 5413);
    }

    /// <summary>Returns the canned bytes the tests use as the server's handshake acknowledgement.</summary>
    private static byte[] BuildHandshakeAckFrame()
    {
        return [0x06, 0x00, 0x01, 0x00, 0xA1, 0xB2, 0xC3, 0xA5];
    }

    /// <summary>
    /// Builds an advise-acknowledgement frame with one 5-byte entry per tag id: the id as a
    /// little-endian 32-bit value followed by a single zero byte.
    /// </summary>
    /// <param name="tagIds">Tag ids to acknowledge; when empty, a single entry with id 0 is written.</param>
    /// <returns>The frame bytes produced by <c>SuiteLinkFrameWriter.WriteFrame</c>.</returns>
    private static byte[] BuildAdviseAckFrame(params uint[] tagIds)
    {
        const int entrySize = 5;

        uint[] ids = tagIds.Length == 0 ? [0u] : tagIds;
        var payload = new byte[ids.Length * entrySize];
        var offset = 0;
        foreach (var tagId in ids)
        {
            SuiteLinkEncoding.WriteUInt32LittleEndian(payload.AsSpan(offset, 4), tagId);
            payload[offset + 4] = 0x00;
            offset += entrySize;
        }

        return SuiteLinkFrameWriter.WriteFrame(SuiteLinkSubscriptionCodec.AdviseAckMessageType, payload);
    }

    /// <summary>
    /// Builds an update frame for <paramref name="tagId"/> whose value is a binary (boolean) wire value.
    /// </summary>
    /// <param name="tagId">Tag id written to the first four payload bytes.</param>
    /// <param name="value">Boolean encoded as a single byte, 1 or 0.</param>
    private static byte[] BuildBooleanUpdateFrame(uint tagId, bool value)
    {
        var payload = new byte[10];
        SuiteLinkEncoding.WriteUInt32LittleEndian(payload.AsSpan(0, 4), tagId);

        // NOTE(review): the two 16-bit header fields (1 and 0x00C0) are copied as-is; their protocol
        // meaning is not visible in this file.
        SuiteLinkEncoding.WriteUInt16LittleEndian(payload.AsSpan(4, 2), 1);
        SuiteLinkEncoding.WriteUInt16LittleEndian(payload.AsSpan(6, 2), 0x00C0);
        payload[8] = (byte)SuiteLinkWireValueType.Binary;
        payload[9] = value ? (byte)1 : (byte)0;
        return SuiteLinkFrameWriter.WriteFrame(SuiteLinkUpdateCodec.UpdateMessageType, payload);
    }

    /// <summary>
    /// Builds an update frame for <paramref name="tagId"/> whose value is a 32-bit integer wire value.
    /// </summary>
    /// <param name="tagId">Tag id written to the first four payload bytes.</param>
    /// <param name="value">Integer written little-endian to the last four payload bytes.</param>
    private static byte[] BuildIntegerUpdateFrame(uint tagId, int value)
    {
        var payload = new byte[13];
        SuiteLinkEncoding.WriteUInt32LittleEndian(payload.AsSpan(0, 4), tagId);

        // NOTE(review): the two 16-bit header fields (1 and 0x00C0) are copied as-is; their protocol
        // meaning is not visible in this file.
        SuiteLinkEncoding.WriteUInt16LittleEndian(payload.AsSpan(4, 2), 1);
        SuiteLinkEncoding.WriteUInt16LittleEndian(payload.AsSpan(6, 2), 0x00C0);
        payload[8] = (byte)SuiteLinkWireValueType.Integer;
        SuiteLinkEncoding.WriteInt32LittleEndian(payload.AsSpan(9, 4), value);
        return SuiteLinkFrameWriter.WriteFrame(SuiteLinkUpdateCodec.UpdateMessageType, payload);
    }

    /// <summary>
    /// In-memory <see cref="ISuiteLinkTransport"/>: replays queued receive chunks one per call,
    /// records a copy of every sent buffer, and can inject a send failure per buffer.
    /// </summary>
    private sealed class FakeTransport : ISuiteLinkTransport
    {
        private readonly Queue<byte[]> _receiveChunks = [];
        private readonly object _syncRoot = new();

        public bool IsConnected { get; private set; }

        /// <summary>Copies of every buffer passed to <see cref="SendAsync"/>, in call order.</summary>
        public List<byte[]> SentBuffers { get; } = [];

        /// <summary>
        /// Optional hook consulted on every send; a non-null result is thrown after the buffer has
        /// been recorded in <see cref="SentBuffers"/>.
        /// </summary>
        public Func<ReadOnlyMemory<byte>, Exception?>? SendFailureFactory { get; set; }

        /// <summary>Queues a chunk to be returned by a later <see cref="ReceiveAsync"/> call.</summary>
        public void EnqueueReceive(byte[] bytes)
        {
            lock (_syncRoot)
            {
                _receiveChunks.Enqueue(bytes);
            }
        }

        /// <summary>Marks the transport as connected; host and port are ignored.</summary>
        public ValueTask ConnectAsync(string host, int port, CancellationToken cancellationToken = default)
        {
            IsConnected = true;
            return ValueTask.CompletedTask;
        }

        /// <summary>
        /// Records a copy of <paramref name="buffer"/>, then throws whatever
        /// <see cref="SendFailureFactory"/> returns for it, if anything.
        /// </summary>
        public ValueTask SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
        {
            lock (_syncRoot)
            {
                SentBuffers.Add(buffer.ToArray());
            }

            var sendFailure = SendFailureFactory?.Invoke(buffer);
            if (sendFailure is not null)
            {
                throw sendFailure;
            }

            return ValueTask.CompletedTask;
        }

        /// <summary>
        /// Copies the next queued chunk into <paramref name="buffer"/> and returns its length, or
        /// returns 0 when nothing is queued.
        /// </summary>
        public ValueTask<int> ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
        {
            lock (_syncRoot)
            {
                if (_receiveChunks.Count == 0)
                {
                    // Zero bytes; the client presumably treats this as the remote end closing.
                    return ValueTask.FromResult(0);
                }

                var next = _receiveChunks.Dequeue();
                next.CopyTo(buffer);
                return ValueTask.FromResult(next.Length);
            }
        }

        /// <summary>Marks the transport as disconnected.</summary>
        public ValueTask DisposeAsync()
        {
            IsConnected = false;
            return ValueTask.CompletedTask;
        }
    }
}