From c278f98496b197ce6a05914044b15f498573f242 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 16 Mar 2026 16:46:32 -0400 Subject: [PATCH] feat: add suitelink client runtime and test harness --- README.md | 58 ++ SuiteLink.Client.slnx | 1 + .../Internal/SuiteLinkSession.cs | 233 ++++++++ .../Internal/SuiteLinkSessionState.cs | 12 + .../Protocol/SuiteLinkWriteCodec.cs | 92 ++++ src/SuiteLink.Client/SuiteLinkClient.cs | 511 ++++++++++++++++++ .../Transport/ISuiteLinkTransport.cs | 16 + .../Transport/SuiteLinkTcpTransport.cs | 186 +++++++ .../IntegrationSettings.cs | 81 +++ .../README.md | 36 ++ .../SuiteLink.Client.IntegrationTests.csproj | 24 + .../TagRoundTripTests.cs | 109 ++++ .../Fixtures/FixtureBytes.cs | 10 + .../SuiteLink.Client.Tests/Fixtures/README.md | 25 + .../Fixtures/advise-tagid-11223344-item-A.bin | Bin 0 -> 12 bytes .../Fixtures/handshake-ack-normal.bin | Bin 0 -> 8 bytes .../Fixtures/update-binary-tag-1234-true.bin | Bin 0 -> 15 bytes .../Internal/SuiteLinkSessionTests.cs | 194 +++++++ .../Protocol/SuiteLinkHandshakeCodecTests.cs | 5 +- .../SuiteLinkSubscriptionCodecTests.cs | 3 +- .../Protocol/SuiteLinkUpdateCodecTests.cs | 12 +- .../Protocol/SuiteLinkWriteCodecTests.cs | 66 +++ .../SuiteLink.Client.Tests.csproj | 6 +- .../SuiteLinkClientConnectionTests.cs | 209 +++++++ .../SuiteLinkClientSubscriptionTests.cs | 336 ++++++++++++ .../SuiteLinkClientWriteTests.cs | 126 +++++ .../Transport/SuiteLinkTcpTransportTests.cs | 179 ++++++ 27 files changed, 2515 insertions(+), 15 deletions(-) create mode 100644 README.md create mode 100644 src/SuiteLink.Client/Internal/SuiteLinkSession.cs create mode 100644 src/SuiteLink.Client/Internal/SuiteLinkSessionState.cs create mode 100644 src/SuiteLink.Client/Protocol/SuiteLinkWriteCodec.cs create mode 100644 src/SuiteLink.Client/SuiteLinkClient.cs create mode 100644 src/SuiteLink.Client/Transport/ISuiteLinkTransport.cs create mode 100644 src/SuiteLink.Client/Transport/SuiteLinkTcpTransport.cs create mode 100644 tests/SuiteLink.Client.IntegrationTests/IntegrationSettings.cs create mode 100644 tests/SuiteLink.Client.IntegrationTests/README.md create mode 100644 tests/SuiteLink.Client.IntegrationTests/SuiteLink.Client.IntegrationTests.csproj create mode 100644 tests/SuiteLink.Client.IntegrationTests/TagRoundTripTests.cs create mode 100644 tests/SuiteLink.Client.Tests/Fixtures/FixtureBytes.cs create mode 100644 tests/SuiteLink.Client.Tests/Fixtures/README.md create mode 100644 tests/SuiteLink.Client.Tests/Fixtures/advise-tagid-11223344-item-A.bin create mode 100644 tests/SuiteLink.Client.Tests/Fixtures/handshake-ack-normal.bin create mode 100644 tests/SuiteLink.Client.Tests/Fixtures/update-binary-tag-1234-true.bin create mode 100644 tests/SuiteLink.Client.Tests/Internal/SuiteLinkSessionTests.cs create mode 100644 tests/SuiteLink.Client.Tests/Protocol/SuiteLinkWriteCodecTests.cs create mode 100644 tests/SuiteLink.Client.Tests/SuiteLinkClientConnectionTests.cs create mode 100644 tests/SuiteLink.Client.Tests/SuiteLinkClientSubscriptionTests.cs create mode 100644 tests/SuiteLink.Client.Tests/SuiteLinkClientWriteTests.cs create mode 100644 tests/SuiteLink.Client.Tests/Transport/SuiteLinkTcpTransportTests.cs diff --git a/README.md b/README.md new file mode 100644 index 0000000..4d8296f --- /dev/null +++ b/README.md @@ -0,0 +1,58 @@ +# SuiteLink Client + +Cross-platform `.NET 10` C# client for AVEVA SuiteLink tag operations. + +## Scope + +Current implementation targets the normal SuiteLink tag protocol and supports: + +- startup handshake and connect message encoding +- subscribe and unadvise flows +- update decoding for `bool`, `int32`, `float32`, and `string` +- write (`POKE`) encoding for `bool`, `int32`, `float32`, and `string` +- client/session/transport layers suitable for macOS, Linux, and Windows + +## Unsupported + +This repository does not currently support: + +- AlarmMgr / alarms and events +- secure SuiteLink V3 / TLS transport +- automatic reconnect +- background receive loop / production retry behavior +- validated support for richer System Platform data types such as `double`, `int64`, or `DateTime` + +## Build + +```bash +dotnet build /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx -c Release +``` + +## Test + +Unit tests: + +```bash +dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx -v minimal +``` + +Focused protocol tests: + +```bash +dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter Protocol -v minimal +``` + +## Integration + +Live integration tests are safe by default and only run when explicitly enabled with environment variables. + +See [README.md](/Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.IntegrationTests/README.md) for: + +- required environment variables +- tag configuration +- enabling live round-trip tests for `bool`, `int`, `float`, and `string` + +## Notes + +- The repository includes fixture-backed protocol tests under [Fixtures](/Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/Fixtures). +- Protocol assumptions derived from reverse engineering are intentionally isolated in codec classes and tests so they can be refined against live captures later. diff --git a/SuiteLink.Client.slnx b/SuiteLink.Client.slnx index 898a283..e0a5531 100644 --- a/SuiteLink.Client.slnx +++ b/SuiteLink.Client.slnx @@ -4,5 +4,6 @@ + diff --git a/src/SuiteLink.Client/Internal/SuiteLinkSession.cs b/src/SuiteLink.Client/Internal/SuiteLinkSession.cs new file mode 100644 index 0000000..53c1b63 --- /dev/null +++ b/src/SuiteLink.Client/Internal/SuiteLinkSession.cs @@ -0,0 +1,233 @@ +using SuiteLink.Client.Protocol; + +namespace SuiteLink.Client.Internal; + +public sealed class SuiteLinkSession +{ + private readonly object _syncRoot = new(); + private readonly Dictionary _subscriptionsByItemName = new(StringComparer.Ordinal); + private readonly Dictionary _subscriptionsByTagId = []; + private SuiteLinkSessionState _state = SuiteLinkSessionState.Disconnected; + + public SuiteLinkSessionState State + { + get + { + lock (_syncRoot) + { + return _state; + } + } + } + + public int SubscriptionCount + { + get + { + lock (_syncRoot) + { + return _subscriptionsByTagId.Count; + } + } + } + + public void SetState(SuiteLinkSessionState state) + { + lock (_syncRoot) + { + EnsureValidTransition(_state, state); + _state = state; + } + } + + public bool TryTransitionState(SuiteLinkSessionState expectedCurrentState, SuiteLinkSessionState nextState) + { + lock (_syncRoot) + { + if (_state != expectedCurrentState) + { + return false; + } + + EnsureValidTransition(_state, nextState); + _state = nextState; + return true; + } + } + + public void RegisterSubscription(string itemName, uint tagId, Action onUpdate) + { + ArgumentException.ThrowIfNullOrWhiteSpace(itemName); + ArgumentNullException.ThrowIfNull(onUpdate); + + lock (_syncRoot) + { + if (_subscriptionsByItemName.TryGetValue(itemName, out var existingByItem)) + { + _subscriptionsByTagId.Remove(existingByItem.TagId); + } + + if (_subscriptionsByTagId.TryGetValue(tagId, out var existingByTag)) + { + _subscriptionsByItemName.Remove(existingByTag.ItemName); + } + + var entry = new SubscriptionEntry(itemName, tagId, onUpdate); + _subscriptionsByItemName[itemName] = entry; + _subscriptionsByTagId[tagId] = entry; + } + } + + public bool TryGetTagId(string itemName, out uint tagId) + { + ArgumentException.ThrowIfNullOrWhiteSpace(itemName); + + lock (_syncRoot) + { + if (_subscriptionsByItemName.TryGetValue(itemName, out var entry)) + { + tagId = entry.TagId; + return true; + } + } + + tagId = default; + return false; + } + + public bool TryGetItemName(uint tagId, out string? itemName) + { + lock (_syncRoot) + { + if (_subscriptionsByTagId.TryGetValue(tagId, out var entry)) + { + itemName = entry.ItemName; + return true; + } + } + + itemName = default; + return false; + } + + public bool TryUnregisterByItemName(string itemName, out uint removedTagId) + { + ArgumentException.ThrowIfNullOrWhiteSpace(itemName); + + lock (_syncRoot) + { + if (!_subscriptionsByItemName.TryGetValue(itemName, out var entry)) + { + removedTagId = default; + return false; + } + + _subscriptionsByItemName.Remove(itemName); + _subscriptionsByTagId.Remove(entry.TagId); + + removedTagId = entry.TagId; + return true; + } + } + + public bool TryUnregisterByTagId(uint tagId, out string? removedItemName) + { + lock (_syncRoot) + { + if (!_subscriptionsByTagId.TryGetValue(tagId, out var entry)) + { + removedItemName = default; + return false; + } + + _subscriptionsByTagId.Remove(tagId); + _subscriptionsByItemName.Remove(entry.ItemName); + + removedItemName = entry.ItemName; + return true; + } + } + + public bool TryDispatchUpdate(DecodedUpdate decodedUpdate, DateTimeOffset receivedAtUtc, out SuiteLinkTagUpdate? dispatchedUpdate) + { + return TryDispatchUpdate(decodedUpdate, receivedAtUtc, out dispatchedUpdate, out _); + } + + public bool TryDispatchUpdate( + DecodedUpdate decodedUpdate, + DateTimeOffset receivedAtUtc, + out SuiteLinkTagUpdate? dispatchedUpdate, + out Exception? callbackException) + { + Action? callback; + string itemName; + + lock (_syncRoot) + { + if (!_subscriptionsByTagId.TryGetValue(decodedUpdate.TagId, out var entry)) + { + dispatchedUpdate = default; + callbackException = default; + return false; + } + + itemName = entry.ItemName; + callback = entry.OnUpdate; + } + + dispatchedUpdate = new SuiteLinkTagUpdate( + itemName, + decodedUpdate.TagId, + decodedUpdate.Value, + decodedUpdate.Quality, + decodedUpdate.ElapsedMilliseconds, + receivedAtUtc); + + try + { + callback(dispatchedUpdate); + callbackException = default; + return true; + } + catch (Exception ex) + { + // User callback failures should not tear down the receive loop. + callbackException = ex; + return false; + } + } + + private static void EnsureValidTransition(SuiteLinkSessionState currentState, SuiteLinkSessionState nextState) + { + if (currentState == nextState) + { + return; + } + + if (!IsValidTransition(currentState, nextState)) + { + throw new InvalidOperationException($"Invalid state transition from {currentState} to {nextState}."); + } + } + + private static bool IsValidTransition(SuiteLinkSessionState currentState, SuiteLinkSessionState nextState) + { + return (currentState, nextState) switch + { + (SuiteLinkSessionState.Disconnected, SuiteLinkSessionState.TcpConnected) => true, + (SuiteLinkSessionState.TcpConnected, SuiteLinkSessionState.HandshakeComplete) => true, + (SuiteLinkSessionState.HandshakeComplete, SuiteLinkSessionState.ConnectSent) => true, + (SuiteLinkSessionState.ConnectSent, SuiteLinkSessionState.SessionConnected) => true, + (SuiteLinkSessionState.SessionConnected, SuiteLinkSessionState.Subscribed) => true, + (SuiteLinkSessionState.Subscribed, SuiteLinkSessionState.SessionConnected) => true, + (_, SuiteLinkSessionState.Disconnected) => true, + (_, SuiteLinkSessionState.Faulted) => true, + _ => false + }; + } + + private readonly record struct SubscriptionEntry( + string ItemName, + uint TagId, + Action OnUpdate); +} diff --git a/src/SuiteLink.Client/Internal/SuiteLinkSessionState.cs b/src/SuiteLink.Client/Internal/SuiteLinkSessionState.cs new file mode 100644 index 0000000..736d318 --- /dev/null +++ b/src/SuiteLink.Client/Internal/SuiteLinkSessionState.cs @@ -0,0 +1,12 @@ +namespace SuiteLink.Client.Internal; + +public enum SuiteLinkSessionState +{ + Disconnected = 0, + TcpConnected = 1, + HandshakeComplete = 2, + ConnectSent = 3, + SessionConnected = 4, + Subscribed = 5, + Faulted = 6 +} diff --git a/src/SuiteLink.Client/Protocol/SuiteLinkWriteCodec.cs b/src/SuiteLink.Client/Protocol/SuiteLinkWriteCodec.cs new file mode 100644 index 0000000..6b8bc01 --- /dev/null +++ b/src/SuiteLink.Client/Protocol/SuiteLinkWriteCodec.cs @@ -0,0 +1,92 @@ +using System.Text; + +namespace SuiteLink.Client.Protocol; + +public static class SuiteLinkWriteCodec +{ + public const ushort PokeMessageType = 0x080B; + + public static byte[] Encode(uint tagId, SuiteLinkValue value, Encoding? messageEncoding = null) + { + messageEncoding ??= Encoding.Latin1; + + return value.Kind switch + { + SuiteLinkValueKind.Boolean => EncodeBoolean(tagId, value), + SuiteLinkValueKind.Int32 => EncodeInt32(tagId, value), + SuiteLinkValueKind.Float32 => EncodeFloat32(tagId, value), + SuiteLinkValueKind.String => EncodeString(tagId, value, messageEncoding), + _ => throw new NotSupportedException( + $"Cannot encode unsupported write value kind '{value.Kind}'.") + }; + } + + private static byte[] EncodeBoolean(uint tagId, SuiteLinkValue value) + { + if (!value.TryGetBoolean(out var boolValue)) + { + throw new NotSupportedException("Cannot encode write value: boolean payload is missing."); + } + + Span payload = stackalloc byte[6]; + SuiteLinkEncoding.WriteUInt32LittleEndian(payload[..4], tagId); + payload[4] = (byte)SuiteLinkWireValueType.Binary; + payload[5] = boolValue ? (byte)1 : (byte)0; + + return SuiteLinkFrameWriter.WriteFrame(PokeMessageType, payload); + } + + private static byte[] EncodeInt32(uint tagId, SuiteLinkValue value) + { + if (!value.TryGetInt32(out var intValue)) + { + throw new NotSupportedException("Cannot encode write value: int32 payload is missing."); + } + + Span payload = stackalloc byte[9]; + SuiteLinkEncoding.WriteUInt32LittleEndian(payload[..4], tagId); + payload[4] = (byte)SuiteLinkWireValueType.Integer; + SuiteLinkEncoding.WriteInt32LittleEndian(payload[5..], intValue); + + return SuiteLinkFrameWriter.WriteFrame(PokeMessageType, payload); + } + + private static byte[] EncodeFloat32(uint tagId, SuiteLinkValue value) + { + if (!value.TryGetFloat32(out var floatValue)) + { + throw new NotSupportedException("Cannot encode write value: float32 payload is missing."); + } + + Span payload = stackalloc byte[9]; + SuiteLinkEncoding.WriteUInt32LittleEndian(payload[..4], tagId); + payload[4] = (byte)SuiteLinkWireValueType.Real; + SuiteLinkEncoding.WriteSingleLittleEndian(payload[5..], floatValue); + + return SuiteLinkFrameWriter.WriteFrame(PokeMessageType, payload); + } + + private static byte[] EncodeString(uint tagId, SuiteLinkValue value, Encoding messageEncoding) + { + if (!value.TryGetString(out var stringValue)) + { + throw new NotSupportedException("Cannot encode write value: string payload is missing."); + } + + var messageBytes = messageEncoding.GetBytes(stringValue ?? string.Empty); + if (messageBytes.Length > ushort.MaxValue) + { + throw new ArgumentOutOfRangeException( + nameof(value), + $"String write payload exceeds maximum message length of {ushort.MaxValue} bytes."); + } + + var payload = new byte[7 + messageBytes.Length]; + SuiteLinkEncoding.WriteUInt32LittleEndian(payload.AsSpan(0, 4), tagId); + payload[4] = (byte)SuiteLinkWireValueType.Message; + SuiteLinkEncoding.WriteUInt16LittleEndian(payload.AsSpan(5, 2), (ushort)messageBytes.Length); + messageBytes.CopyTo(payload.AsSpan(7)); + + return SuiteLinkFrameWriter.WriteFrame(PokeMessageType, payload); + } +} diff --git a/src/SuiteLink.Client/SuiteLinkClient.cs b/src/SuiteLink.Client/SuiteLinkClient.cs new file mode 100644 index 0000000..31eb416 --- /dev/null +++ b/src/SuiteLink.Client/SuiteLinkClient.cs @@ -0,0 +1,511 @@ +using SuiteLink.Client.Internal; +using SuiteLink.Client.Protocol; +using SuiteLink.Client.Transport; + +namespace SuiteLink.Client; + +public sealed class SuiteLinkClient : IAsyncDisposable +{ + private readonly ISuiteLinkTransport _transport; + private readonly bool _ownsTransport; + private readonly SemaphoreSlim _connectGate = new(1, 1); + private readonly SemaphoreSlim _operationGate = new(1, 1); + private readonly SuiteLinkSession _session = new(); + private byte[] _receiveBuffer = new byte[1024]; + private int _receiveCount; + private int _nextSubscriptionTagId; + private bool _disposed; + + public SuiteLinkClient() + : this(new SuiteLinkTcpTransport(), ownsTransport: true) + { + } + + public SuiteLinkClient(ISuiteLinkTransport transport, bool ownsTransport = false) + { + _transport = transport ?? throw new ArgumentNullException(nameof(transport)); + _ownsTransport = ownsTransport; + } + + public bool IsConnected => + !_disposed && + _session.State is SuiteLinkSessionState.SessionConnected or SuiteLinkSessionState.Subscribed; + + public async Task ConnectAsync(SuiteLinkConnectionOptions options, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(options); + ThrowIfDisposed(); + + await _connectGate.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + ThrowIfDisposed(); + + if (IsConnected || _session.State == SuiteLinkSessionState.ConnectSent) + { + return; + } + + if (_session.State == SuiteLinkSessionState.Faulted) + { + throw new InvalidOperationException("Client is faulted and cannot be reused."); + } + + await _transport.ConnectAsync(options.Host, options.Port, cancellationToken).ConfigureAwait(false); + _session.SetState(SuiteLinkSessionState.TcpConnected); + + var handshakeBytes = SuiteLinkHandshakeCodec.EncodeNormalQueryHandshake( + options.Application, + options.ClientNode, + options.UserName); + await _transport.SendAsync(handshakeBytes, cancellationToken).ConfigureAwait(false); + + var handshakeAckBytes = await ReceiveSingleFrameAsync(cancellationToken).ConfigureAwait(false); + _ = SuiteLinkHandshakeCodec.ParseNormalHandshakeAck(handshakeAckBytes); + _session.SetState(SuiteLinkSessionState.HandshakeComplete); + + var connectBytes = SuiteLinkConnectCodec.Encode(options); + await _transport.SendAsync(connectBytes, cancellationToken).ConfigureAwait(false); + // At this stage we've only submitted CONNECT. Do not report ready yet. + _session.SetState(SuiteLinkSessionState.ConnectSent); + } + catch + { + try + { + _session.SetState(SuiteLinkSessionState.Faulted); + } + catch + { + // Preserve original exception. + } + + throw; + } + finally + { + _connectGate.Release(); + } + } + + public async Task DisconnectAsync(CancellationToken cancellationToken = default) + { + await DisposeCoreAsync(cancellationToken).ConfigureAwait(false); + } + + public async Task SubscribeAsync( + string itemName, + Action onUpdate, + CancellationToken cancellationToken = default) + { + ThrowIfDisposed(); + + SubscriptionRegistration registration; + await _operationGate.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + registration = await SubscribeCoreAsync(itemName, onUpdate, cancellationToken).ConfigureAwait(false); + } + finally + { + _operationGate.Release(); + } + + DispatchDecodedUpdates(registration.DeferredUpdates); + return registration.Handle; + } + + public async Task ReadAsync( + string itemName, + TimeSpan timeout, + CancellationToken cancellationToken = default) + { + if (timeout <= TimeSpan.Zero && timeout != Timeout.InfiniteTimeSpan) + { + throw new ArgumentOutOfRangeException(nameof(timeout), timeout, "Timeout must be positive or infinite."); + } + + ThrowIfDisposed(); + + using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + if (timeout != Timeout.InfiniteTimeSpan) + { + timeoutCts.CancelAfter(timeout); + } + + var updateCompletion = new TaskCompletionSource( + TaskCreationOptions.RunContinuationsAsynchronously); + SubscriptionHandle? temporaryHandle = null; + Exception? primaryFailure = null; + + await _operationGate.WaitAsync(timeoutCts.Token).ConfigureAwait(false); + try + { + var registration = await SubscribeCoreAsync( + itemName, + update => updateCompletion.TrySetResult(update), + timeoutCts.Token).ConfigureAwait(false); + temporaryHandle = registration.Handle; + DispatchDecodedUpdates(registration.DeferredUpdates); + } + finally + { + _operationGate.Release(); + } + + try + { + while (!updateCompletion.Task.IsCompleted) + { + IReadOnlyList decodedUpdates; + await _operationGate.WaitAsync(timeoutCts.Token).ConfigureAwait(false); + try + { + decodedUpdates = await ProcessSingleIncomingFrameAsync(timeoutCts.Token).ConfigureAwait(false); + } + finally + { + _operationGate.Release(); + } + + DispatchDecodedUpdates(decodedUpdates); + } + + return await updateCompletion.Task.ConfigureAwait(false); + } + catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested) + { + primaryFailure = new TimeoutException($"No update for '{itemName}' was received within {timeout}."); + throw primaryFailure; + } + catch (Exception ex) + { + primaryFailure = ex; + throw; + } + finally + { + if (temporaryHandle is not null) + { + try + { + await _operationGate.WaitAsync(CancellationToken.None).ConfigureAwait(false); + try + { + await UnsubscribeCoreAsync(temporaryHandle.TagId, CancellationToken.None).ConfigureAwait(false); + } + finally + { + _operationGate.Release(); + } + } + catch when (primaryFailure is not null) + { + // Preserve the original read failure. + } + } + } + } + + public async Task ProcessIncomingAsync(CancellationToken cancellationToken = default) + { + ThrowIfDisposed(); + EnsureTagOperationsAllowed(); + + IReadOnlyList decodedUpdates; + await _operationGate.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + decodedUpdates = await ProcessSingleIncomingFrameAsync(cancellationToken).ConfigureAwait(false); + } + finally + { + _operationGate.Release(); + } + + DispatchDecodedUpdates(decodedUpdates); + } + + public async Task WriteAsync( + string itemName, + SuiteLinkValue value, + CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(itemName); + ThrowIfDisposed(); + + await _operationGate.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + ThrowIfDisposed(); + EnsureTagOperationsAllowed(); + + if (!_session.TryGetTagId(itemName, out var tagId)) + { + throw new InvalidOperationException( + $"Tag '{itemName}' is not subscribed. Subscribe before writing."); + } + + var pokeBytes = SuiteLinkWriteCodec.Encode(tagId, value); + await _transport.SendAsync(pokeBytes, cancellationToken).ConfigureAwait(false); + } + finally + { + _operationGate.Release(); + } + } + + public async ValueTask DisposeAsync() + { + await DisposeCoreAsync(CancellationToken.None).ConfigureAwait(false); + } + + private async ValueTask ReceiveSingleFrameAsync(CancellationToken cancellationToken) + { + while (true) + { + if (SuiteLinkFrameReader.TryParseFrame( + _receiveBuffer.AsSpan(0, _receiveCount), + out _, + out var consumed)) + { + var frameBytes = _receiveBuffer.AsSpan(0, consumed).ToArray(); + var remaining = _receiveCount - consumed; + if (remaining > 0) + { + _receiveBuffer.AsSpan(consumed, remaining).CopyTo(_receiveBuffer); + } + + _receiveCount = remaining; + return frameBytes; + } + + EnsureReceiveCapacity(); + var bytesRead = await _transport.ReceiveAsync( + _receiveBuffer.AsMemory(_receiveCount), + cancellationToken).ConfigureAwait(false); + + if (bytesRead == 0) + { + throw new IOException("Remote endpoint closed while waiting for a full frame."); + } + + _receiveCount += bytesRead; + } + } + + private void EnsureReceiveCapacity() + { + if (_receiveCount < _receiveBuffer.Length) + { + return; + } + + if (_receiveBuffer.Length >= 1024 * 1024) + { + throw new FormatException("Incoming frame exceeds maximum supported size."); + } + + Array.Resize(ref _receiveBuffer, _receiveBuffer.Length * 2); + } + + private void ThrowIfDisposed() + { + ObjectDisposedException.ThrowIf(_disposed, this); + } + + private async ValueTask DisposeCoreAsync(CancellationToken cancellationToken) + { + var connectGateHeld = false; + var operationGateHeld = false; + + try + { + await _connectGate.WaitAsync(cancellationToken).ConfigureAwait(false); + connectGateHeld = true; + + await _operationGate.WaitAsync(cancellationToken).ConfigureAwait(false); + operationGateHeld = true; + + if (_disposed) + { + return; + } + + _disposed = true; + _session.SetState(SuiteLinkSessionState.Disconnected); + _receiveCount = 0; + _receiveBuffer = new byte[1024]; + + if (_ownsTransport) + { + await _transport.DisposeAsync().ConfigureAwait(false); + } + } + finally + { + if (operationGateHeld) + { + _operationGate.Release(); + } + + if (connectGateHeld) + { + _connectGate.Release(); + } + } + } + + private async Task SubscribeCoreAsync( + string itemName, + Action onUpdate, + CancellationToken cancellationToken) + { + ArgumentException.ThrowIfNullOrWhiteSpace(itemName); + ArgumentNullException.ThrowIfNull(onUpdate); + + ThrowIfDisposed(); + EnsureTagOperationsAllowed(); + + var requestedTagId = unchecked((uint)Interlocked.Increment(ref _nextSubscriptionTagId)); + var adviseBytes = SuiteLinkSubscriptionCodec.EncodeAdvise(requestedTagId, itemName); + await _transport.SendAsync(adviseBytes, cancellationToken).ConfigureAwait(false); + + var adviseAckResult = await ReceiveAndCollectUpdatesUntilAsync( + messageType => messageType == SuiteLinkSubscriptionCodec.AdviseAckMessageType, + cancellationToken).ConfigureAwait(false); + var adviseAckBytes = adviseAckResult.FrameBytes; + + var ackItems = SuiteLinkSubscriptionCodec.DecodeAdviseAckMany(adviseAckBytes); + if (ackItems.Count != 1) + { + throw new FormatException( + $"Expected exactly one advise ACK item for a single subscribe request, but decoded {ackItems.Count}."); + } + + var acknowledgedTagId = ackItems[0].TagId; + if (acknowledgedTagId != requestedTagId) + { + throw new FormatException( + $"Advise ACK tag id 0x{acknowledgedTagId:x8} did not match requested tag id 0x{requestedTagId:x8}."); + } + + _session.RegisterSubscription(itemName, acknowledgedTagId, onUpdate); + if (_session.State == SuiteLinkSessionState.ConnectSent) + { + _session.SetState(SuiteLinkSessionState.SessionConnected); + } + + if (_session.State == SuiteLinkSessionState.SessionConnected) + { + _session.SetState(SuiteLinkSessionState.Subscribed); + } + + var handle = new SubscriptionHandle( + itemName, + acknowledgedTagId, + () => UnsubscribeAsync(acknowledgedTagId, CancellationToken.None)); + return new SubscriptionRegistration(handle, adviseAckResult.DeferredUpdates); + } + + private async ValueTask UnsubscribeAsync(uint tagId, CancellationToken cancellationToken) + { + if (_disposed) + { + return; + } + + await _operationGate.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + await UnsubscribeCoreAsync(tagId, cancellationToken).ConfigureAwait(false); + } + finally + { + _operationGate.Release(); + } + } + + private async ValueTask UnsubscribeCoreAsync(uint tagId, CancellationToken cancellationToken) + { + if (!_session.TryUnregisterByTagId(tagId, out _)) + { + return; + } + + var unadviseBytes = SuiteLinkSubscriptionCodec.EncodeUnadvise(tagId); + await _transport.SendAsync(unadviseBytes, cancellationToken).ConfigureAwait(false); + + if (_session.State == SuiteLinkSessionState.Subscribed && _session.SubscriptionCount == 0) + { + _session.SetState(SuiteLinkSessionState.SessionConnected); + } + } + + private async Task ReceiveAndCollectUpdatesUntilAsync( + Func messageTypePredicate, + CancellationToken cancellationToken) + { + var deferredUpdates = new List(); + while (true) + { + var frameBytes = await ReceiveSingleFrameAsync(cancellationToken).ConfigureAwait(false); + var frame = SuiteLinkFrameReader.ParseFrame(frameBytes); + + if (frame.MessageType == SuiteLinkUpdateCodec.UpdateMessageType) + { + deferredUpdates.AddRange(SuiteLinkUpdateCodec.DecodeMany(frameBytes)); + } + + if (messageTypePredicate(frame.MessageType)) + { + return new FrameReadResult(frameBytes, deferredUpdates); + } + } + } + + private async Task> ProcessSingleIncomingFrameAsync(CancellationToken cancellationToken) + { + var frameBytes = await ReceiveSingleFrameAsync(cancellationToken).ConfigureAwait(false); + var frame = SuiteLinkFrameReader.ParseFrame(frameBytes); + if (frame.MessageType == SuiteLinkUpdateCodec.UpdateMessageType) + { + return SuiteLinkUpdateCodec.DecodeMany(frameBytes); + } + + return []; + } + + private void DispatchDecodedUpdates(IReadOnlyList decodedUpdates) + { + if (decodedUpdates.Count == 0) + { + return; + } + + var receivedAtUtc = DateTimeOffset.UtcNow; + foreach (var decodedUpdate in decodedUpdates) + { + _ = _session.TryDispatchUpdate(decodedUpdate, receivedAtUtc, out _, out _); + } + } + + private void EnsureTagOperationsAllowed() + { + if (_session.State is + not SuiteLinkSessionState.ConnectSent and + not SuiteLinkSessionState.SessionConnected and + not SuiteLinkSessionState.Subscribed) + { + throw new InvalidOperationException("Client is not ready for tag operations."); + } + } + + private readonly record struct SubscriptionRegistration( + SubscriptionHandle Handle, + IReadOnlyList DeferredUpdates); + + private readonly record struct FrameReadResult( + byte[] FrameBytes, + IReadOnlyList DeferredUpdates); +} diff --git a/src/SuiteLink.Client/Transport/ISuiteLinkTransport.cs b/src/SuiteLink.Client/Transport/ISuiteLinkTransport.cs new file mode 100644 index 0000000..3f062e0 --- /dev/null +++ b/src/SuiteLink.Client/Transport/ISuiteLinkTransport.cs @@ -0,0 +1,16 @@ +namespace SuiteLink.Client.Transport; + +public interface ISuiteLinkTransport : IAsyncDisposable +{ + /// + /// Indicates local transport readiness for send/receive operations. + /// This does not guarantee that the remote socket is alive. + /// + bool IsConnected { get; } + + ValueTask ConnectAsync(string host, int port, CancellationToken cancellationToken = default); + + ValueTask SendAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default); + + ValueTask ReceiveAsync(Memory buffer, CancellationToken cancellationToken = default); +} diff --git a/src/SuiteLink.Client/Transport/SuiteLinkTcpTransport.cs b/src/SuiteLink.Client/Transport/SuiteLinkTcpTransport.cs new file mode 100644 index 0000000..0b18245 --- /dev/null +++ b/src/SuiteLink.Client/Transport/SuiteLinkTcpTransport.cs @@ -0,0 +1,186 @@ +using System.Net.Sockets; + +namespace SuiteLink.Client.Transport; + +public sealed class SuiteLinkTcpTransport : ISuiteLinkTransport +{ + private readonly bool _leaveOpen; + private readonly object _syncRoot = new(); + private readonly SemaphoreSlim _connectGate = new(1, 1); + private TcpClient? _tcpClient; + private Stream? _stream; + private bool _disposed; + + public SuiteLinkTcpTransport() + { + } + + public SuiteLinkTcpTransport(Stream stream, bool leaveOpen = false) + { + ArgumentNullException.ThrowIfNull(stream); + + _stream = stream; + _leaveOpen = leaveOpen; + } + + public SuiteLinkTcpTransport(TcpClient tcpClient, bool leaveOpen = false) + { + ArgumentNullException.ThrowIfNull(tcpClient); + + _tcpClient = tcpClient; + _leaveOpen = leaveOpen; + + if (tcpClient.Connected) + { + _stream = tcpClient.GetStream(); + } + } + + /// + /// Indicates local transport readiness for send/receive operations. + /// This does not guarantee that the remote socket is alive. + /// + public bool IsConnected + { + get + { + lock (_syncRoot) + { + return !_disposed && _stream is not null; + } + } + } + + public async ValueTask ConnectAsync(string host, int port, CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(host); + ArgumentOutOfRangeException.ThrowIfLessThan(port, 1); + ArgumentOutOfRangeException.ThrowIfGreaterThan(port, 65535); + + await _connectGate.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + ObjectDisposedException.ThrowIf(_disposed, this); + + lock (_syncRoot) + { + if (_stream is not null) + { + return; + } + } + + var tcpClient = _tcpClient; + var ownsClient = false; + if (tcpClient is null) + { + tcpClient = new TcpClient(); + ownsClient = true; + } + + try + { + if (!tcpClient.Connected) + { + await tcpClient.ConnectAsync(host, port, cancellationToken).ConfigureAwait(false); + } + + var connectedStream = tcpClient.GetStream(); + lock (_syncRoot) + { + ObjectDisposedException.ThrowIf(_disposed, this); + + if (_stream is not null) + { + if (ownsClient) + { + tcpClient.Dispose(); + } + return; + } + + _tcpClient = tcpClient; + _stream = connectedStream; + } + } + catch + { + if (ownsClient) + { + tcpClient.Dispose(); + } + throw; + } + } + finally + { + _connectGate.Release(); + } + } + + public async ValueTask SendAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + var stream = GetConnectedStream(); + await stream.WriteAsync(buffer, cancellationToken).ConfigureAwait(false); + } + + public async ValueTask ReceiveAsync(Memory buffer, CancellationToken cancellationToken = default) + { + var stream = GetConnectedStream(); + return await stream.ReadAsync(buffer, cancellationToken).ConfigureAwait(false); + } + + public async ValueTask DisposeAsync() + { + Stream? streamToDispose = null; + TcpClient? tcpClientToDispose = null; + + lock (_syncRoot) + { + if (_disposed) + { + return; + } + + _disposed = true; + + if (!_leaveOpen) + { + streamToDispose = _stream; + tcpClientToDispose = _tcpClient; + } + + _stream = null; + _tcpClient = null; + } + + if (tcpClientToDispose is not null) + { + tcpClientToDispose.Dispose(); + return; + } + + if (streamToDispose is IAsyncDisposable asyncDisposable) + { + await asyncDisposable.DisposeAsync().ConfigureAwait(false); + return; + } + + streamToDispose?.Dispose(); + } + + private Stream GetConnectedStream() + { + lock (_syncRoot) + { + ObjectDisposedException.ThrowIf(_disposed, this); + + if (_stream is null) + { + throw new InvalidOperationException("Transport is not connected."); + } + + return _stream; + } + } +} diff --git a/tests/SuiteLink.Client.IntegrationTests/IntegrationSettings.cs b/tests/SuiteLink.Client.IntegrationTests/IntegrationSettings.cs new file mode 100644 index 0000000..acef5e6 --- /dev/null +++ b/tests/SuiteLink.Client.IntegrationTests/IntegrationSettings.cs @@ -0,0 +1,81 @@ +namespace SuiteLink.Client.IntegrationTests; + +internal sealed record class IntegrationSettings( + SuiteLinkConnectionOptions Connection, + string? BooleanTag, + string? IntegerTag, + string? FloatTag, + string? StringTag) +{ + public static bool TryLoad(out IntegrationSettings settings, out string reason) + { + settings = null!; + + var enabled = Environment.GetEnvironmentVariable("SUITELINK_IT_ENABLED"); + if (!string.Equals(enabled, "true", StringComparison.OrdinalIgnoreCase)) + { + reason = "Set SUITELINK_IT_ENABLED=true to run live integration tests."; + return false; + } + + if (!TryGetRequired("SUITELINK_IT_HOST", out var host, out reason) || + !TryGetRequired("SUITELINK_IT_APPLICATION", out var application, out reason) || + !TryGetRequired("SUITELINK_IT_TOPIC", out var topic, out reason) || + !TryGetRequired("SUITELINK_IT_CLIENT_NAME", out var clientName, out reason) || + !TryGetRequired("SUITELINK_IT_CLIENT_NODE", out var clientNode, out reason) || + !TryGetRequired("SUITELINK_IT_USER_NAME", out var userName, out reason) || + !TryGetRequired("SUITELINK_IT_SERVER_NODE", out var serverNode, out reason)) + { + return false; + } + + var timezone = Environment.GetEnvironmentVariable("SUITELINK_IT_TIMEZONE"); + + var port = 5413; + var portRaw = Environment.GetEnvironmentVariable("SUITELINK_IT_PORT"); + if (!string.IsNullOrWhiteSpace(portRaw) && !int.TryParse(portRaw, out port)) + { + reason = "SUITELINK_IT_PORT must be a valid integer."; + return false; + } + + var connection = new SuiteLinkConnectionOptions( + host: host, + application: application, + topic: topic, + clientName: clientName, + clientNode: clientNode, + userName: userName, + serverNode: serverNode, + timezone: timezone, + port: port); + + settings = new IntegrationSettings( + Connection: connection, + BooleanTag: Normalize(Environment.GetEnvironmentVariable("SUITELINK_IT_BOOL_TAG")), + IntegerTag: Normalize(Environment.GetEnvironmentVariable("SUITELINK_IT_INT_TAG")), + FloatTag: Normalize(Environment.GetEnvironmentVariable("SUITELINK_IT_FLOAT_TAG")), + StringTag: Normalize(Environment.GetEnvironmentVariable("SUITELINK_IT_STRING_TAG"))); + + reason = string.Empty; + return true; + } + + private static string? Normalize(string? value) + { + return string.IsNullOrWhiteSpace(value) ? null : value; + } + + private static bool TryGetRequired(string name, out string value, out string reason) + { + value = Environment.GetEnvironmentVariable(name) ?? string.Empty; + if (!string.IsNullOrWhiteSpace(value)) + { + reason = string.Empty; + return true; + } + + reason = $"Missing required environment variable: {name}."; + return false; + } +} diff --git a/tests/SuiteLink.Client.IntegrationTests/README.md b/tests/SuiteLink.Client.IntegrationTests/README.md new file mode 100644 index 0000000..e622789 --- /dev/null +++ b/tests/SuiteLink.Client.IntegrationTests/README.md @@ -0,0 +1,36 @@ +# SuiteLink Integration Tests + +These tests are intentionally safe by default and run only when explicitly enabled. + +## Enable + +Set: + +- `SUITELINK_IT_ENABLED=true` + +Required connection variables: + +- `SUITELINK_IT_HOST` +- `SUITELINK_IT_APPLICATION` +- `SUITELINK_IT_TOPIC` +- `SUITELINK_IT_CLIENT_NAME` +- `SUITELINK_IT_CLIENT_NODE` +- `SUITELINK_IT_USER_NAME` +- `SUITELINK_IT_SERVER_NODE` + +Optional connection variables: + +- `SUITELINK_IT_PORT` (default `5413`) +- `SUITELINK_IT_TIMEZONE` (defaults to `UTC` via `SuiteLinkConnectionOptions`) + +Optional tag variables (tests run only for the tags provided): + +- `SUITELINK_IT_BOOL_TAG` +- `SUITELINK_IT_INT_TAG` +- `SUITELINK_IT_FLOAT_TAG` +- `SUITELINK_IT_STRING_TAG` + +## Notes + +- If integration settings are missing, tests return immediately and do not perform network calls. +- These tests are intended as a live harness, not deterministic CI tests. diff --git a/tests/SuiteLink.Client.IntegrationTests/SuiteLink.Client.IntegrationTests.csproj b/tests/SuiteLink.Client.IntegrationTests/SuiteLink.Client.IntegrationTests.csproj new file mode 100644 index 0000000..47f15b8 --- /dev/null +++ b/tests/SuiteLink.Client.IntegrationTests/SuiteLink.Client.IntegrationTests.csproj @@ -0,0 +1,24 @@ + + + + net10.0 + enable + enable + false + + + + + + + + + + + + + + + + + diff --git a/tests/SuiteLink.Client.IntegrationTests/TagRoundTripTests.cs b/tests/SuiteLink.Client.IntegrationTests/TagRoundTripTests.cs new file mode 100644 index 0000000..833c6db --- /dev/null +++ b/tests/SuiteLink.Client.IntegrationTests/TagRoundTripTests.cs @@ -0,0 +1,109 @@ +namespace SuiteLink.Client.IntegrationTests; + +public sealed class TagRoundTripTests +{ + [Fact] + public async Task BooleanTag_RoundTrip_WhenConfigured() + { + if (!TryGetTagSettings(out var settings, out var tagName, "bool")) + { + return; + } + + await RunRoundTripAsync( + settings, + tagName, + SuiteLinkValue.FromBoolean(true), + value => value.TryGetBoolean(out _)); + } + + [Fact] + public async Task IntegerTag_RoundTrip_WhenConfigured() + { + if (!TryGetTagSettings(out var settings, out var tagName, "int")) + { + return; + } + + await RunRoundTripAsync( + settings, + tagName, + SuiteLinkValue.FromInt32(42), + value => value.TryGetInt32(out _)); + } + + [Fact] + public async Task FloatTag_RoundTrip_WhenConfigured() + { + if (!TryGetTagSettings(out var settings, out var tagName, "float")) + { + return; + } + + await RunRoundTripAsync( + settings, + tagName, + SuiteLinkValue.FromFloat32(12.25f), + value => value.TryGetFloat32(out _)); + } + + [Fact] + public async Task StringTag_RoundTrip_WhenConfigured() + { + if (!TryGetTagSettings(out var settings, out var tagName, "string")) + { + return; + } + + await RunRoundTripAsync( + settings, + tagName, + SuiteLinkValue.FromString("integration-test"), + value => value.TryGetString(out _)); + } + + private static bool TryGetTagSettings( + out IntegrationSettings settings, + out string tagName, + string type) + { + settings = null!; + tagName = string.Empty; + + if (!IntegrationSettings.TryLoad(out settings, out _)) + { + return false; + } + + tagName = type switch + { + "bool" => settings.BooleanTag ?? string.Empty, + "int" => settings.IntegerTag ?? string.Empty, + "float" => settings.FloatTag ?? string.Empty, + "string" => settings.StringTag ?? string.Empty, + _ => string.Empty + }; + + return !string.IsNullOrWhiteSpace(tagName); + } + + private static async Task RunRoundTripAsync( + IntegrationSettings settings, + string tagName, + SuiteLinkValue writeValue, + Func typeCheck) + { + await using var client = new SuiteLinkClient(); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(20)); + await client.ConnectAsync(settings.Connection, cts.Token); + + var readBefore = await client.ReadAsync(tagName, TimeSpan.FromSeconds(10), cts.Token); + Assert.True(typeCheck(readBefore.Value)); + + await client.WriteAsync(tagName, writeValue, cts.Token); + + var readAfter = await client.ReadAsync(tagName, TimeSpan.FromSeconds(10), cts.Token); + Assert.True(typeCheck(readAfter.Value)); + } +} diff --git a/tests/SuiteLink.Client.Tests/Fixtures/FixtureBytes.cs b/tests/SuiteLink.Client.Tests/Fixtures/FixtureBytes.cs new file mode 100644 index 0000000..59aea61 --- /dev/null +++ b/tests/SuiteLink.Client.Tests/Fixtures/FixtureBytes.cs @@ -0,0 +1,10 @@ +namespace SuiteLink.Client.Tests.Fixtures; + +internal static class FixtureBytes +{ + public static byte[] Read(string fileName) + { + var path = Path.Combine(AppContext.BaseDirectory, "Fixtures", fileName); + return File.ReadAllBytes(path); + } +} diff --git a/tests/SuiteLink.Client.Tests/Fixtures/README.md b/tests/SuiteLink.Client.Tests/Fixtures/README.md new file mode 100644 index 0000000..cd86958 --- /dev/null +++ b/tests/SuiteLink.Client.Tests/Fixtures/README.md @@ -0,0 +1,25 @@ +# SuiteLink Test Fixtures + +This folder stores small binary packet fixtures used by protocol codec tests. + +## Source and intent + +- These fixtures are based on fixed vectors already used in tests and aligned with the reverse-engineered SuiteLink dissector behavior used in this repo's design docs. +- They are intentionally minimal and validate specific assumptions, not full protocol coverage. + +## Fixtures + +- `handshake-ack-normal.bin` + - Bytes: `06 00 01 00 A1 B2 C3 A5` + - Assumption validated: normal handshake ACK frame type is `0x0001` with payload bytes preserved. +- `advise-tagid-11223344-item-A.bin` + - Bytes: `0A 00 10 80 44 33 22 11 01 41 00 A5` + - Assumption validated: ADVISE message type bytes are `10 80` (little-endian `0x8010`) and item encoding is UTF-16LE length-prefixed. +- `update-binary-tag-1234-true.bin` + - Bytes: `0D 00 09 00 34 12 00 00 0A 00 C0 00 01 01 A5` + - Assumption validated: UPDATE type bytes are `09 00` and binary value decode path maps to `true`. + +## Notes + +- Keep fixture count modest and focused on wire-level assumptions that are easy to regress. +- If a fixture changes, update corresponding tests and document why the protocol assumption changed. diff --git a/tests/SuiteLink.Client.Tests/Fixtures/advise-tagid-11223344-item-A.bin b/tests/SuiteLink.Client.Tests/Fixtures/advise-tagid-11223344-item-A.bin new file mode 100644 index 0000000000000000000000000000000000000000..f91c389aaad0b8d3f22a7f3eb7a65e98750da4ee GIT binary patch literal 12 Tcmd;L5NL2QRuW`%WLOFS3$_9* literal 0 HcmV?d00001 diff --git a/tests/SuiteLink.Client.Tests/Fixtures/handshake-ack-normal.bin b/tests/SuiteLink.Client.Tests/Fixtures/handshake-ack-normal.bin new file mode 100644 index 0000000000000000000000000000000000000000..d3c43aa0b3487b3f0f90b4351d43baeaf6a504c1 GIT binary patch literal 8 PcmZQ$U}RXh>F`nj2Ll4b literal 0 HcmV?d00001 diff --git a/tests/SuiteLink.Client.Tests/Fixtures/update-binary-tag-1234-true.bin b/tests/SuiteLink.Client.Tests/Fixtures/update-binary-tag-1234-true.bin new file mode 100644 index 0000000000000000000000000000000000000000..9ec281f16b74035ac5f7a734af21d6cabca92c8b GIT binary patch literal 15 Wcmd;O;AAinVqoB6IKaTjxD)^g=K; { }); + + Assert.True(session.TryGetTagId("Pump001.Run", out var tagId)); + Assert.Equal(0x1234u, tagId); + Assert.True(session.TryGetItemName(0x1234, out var itemName)); + Assert.Equal("Pump001.Run", itemName); + } + + [Fact] + public void TryDispatchUpdate_KnownTag_InvokesRegisteredCallback() + { + var session = new SuiteLinkSession(); + SuiteLinkTagUpdate? callbackUpdate = null; + + session.RegisterSubscription("Pump001.Run", 0x1234, update => callbackUpdate = update); + + var decoded = new DecodedUpdate( + TagId: 0x1234, + Quality: 0x00C0, + ElapsedMilliseconds: 10, + Value: SuiteLinkValue.FromBoolean(true)); + + var receivedAtUtc = new DateTimeOffset(2026, 03, 16, 18, 00, 00, TimeSpan.Zero); + var dispatched = session.TryDispatchUpdate(decoded, receivedAtUtc, out var dispatchedUpdate); + + Assert.True(dispatched); + Assert.NotNull(dispatchedUpdate); + Assert.Equal("Pump001.Run", dispatchedUpdate.ItemName); + Assert.Equal(0x1234u, dispatchedUpdate.TagId); + Assert.Equal(0x00C0, dispatchedUpdate.Quality); + Assert.Equal(10, dispatchedUpdate.ElapsedMilliseconds); + Assert.Equal(receivedAtUtc, dispatchedUpdate.ReceivedAtUtc); + Assert.Equal(dispatchedUpdate, callbackUpdate); + } + + [Fact] + public void TryDispatchUpdate_UnknownTag_ReturnsFalseAndDoesNotInvokeCallback() + { + var session = new SuiteLinkSession(); + var callbackCount = 0; + + session.RegisterSubscription("Pump001.Run", 0x1234, _ => callbackCount++); + + var decoded = new DecodedUpdate( + TagId: 0x9999, + Quality: 0x00C0, + ElapsedMilliseconds: 5, + Value: SuiteLinkValue.FromInt32(42)); + + var dispatched = session.TryDispatchUpdate(decoded, DateTimeOffset.UtcNow, out var dispatchedUpdate); + + Assert.False(dispatched); + Assert.Null(dispatchedUpdate); + Assert.Equal(0, callbackCount); + } + + [Fact] + public void UnregisterByItemName_RemovesMappingsAndCallback() + { + var session = new SuiteLinkSession(); + var callbackCount = 0; + + session.RegisterSubscription("Pump001.Run", 0x1234, _ => callbackCount++); + + Assert.True(session.TryUnregisterByItemName("Pump001.Run", out var removedTagId)); + Assert.Equal(0x1234u, removedTagId); + Assert.False(session.TryGetTagId("Pump001.Run", out _)); + Assert.False(session.TryGetItemName(0x1234, out _)); + + var decoded = new DecodedUpdate( + TagId: 0x1234, + Quality: 0x00C0, + ElapsedMilliseconds: 1, + Value: SuiteLinkValue.FromBoolean(true)); + + var dispatched = session.TryDispatchUpdate(decoded, DateTimeOffset.UtcNow, out _); + + Assert.False(dispatched); + Assert.Equal(0, callbackCount); + } + + [Fact] + public void RegisterSubscription_SameItemName_ReplacesOldTagAndCallback() + { + var session = new SuiteLinkSession(); + var oldCount = 0; + var newCount = 0; + + session.RegisterSubscription("Pump001.Run", 0x1000, _ => oldCount++); + session.RegisterSubscription("Pump001.Run", 0x2000, _ => newCount++); + + Assert.False(session.TryGetItemName(0x1000, out _)); + Assert.True(session.TryGetTagId("Pump001.Run", out var currentTagId)); + Assert.Equal(0x2000u, currentTagId); + + var oldDecoded = new DecodedUpdate(0x1000, 0x00C0, 1, SuiteLinkValue.FromBoolean(true)); + var newDecoded = new DecodedUpdate(0x2000, 0x00C0, 1, SuiteLinkValue.FromBoolean(true)); + + Assert.False(session.TryDispatchUpdate(oldDecoded, DateTimeOffset.UtcNow, out _)); + Assert.True(session.TryDispatchUpdate(newDecoded, DateTimeOffset.UtcNow, out _)); + Assert.Equal(0, oldCount); + Assert.Equal(1, newCount); + } + + [Fact] + public void RegisterSubscription_SameTagId_ReplacesOldItemAndCallback() + { + var session = new SuiteLinkSession(); + var oldCount = 0; + var newCount = 0; + + session.RegisterSubscription("Pump001.Run", 0x1234, _ => oldCount++); + session.RegisterSubscription("Pump002.Run", 0x1234, _ => newCount++); + + Assert.False(session.TryGetTagId("Pump001.Run", out _)); + Assert.True(session.TryGetTagId("Pump002.Run", out var replacementTagId)); + Assert.Equal(0x1234u, replacementTagId); + + var decoded = new DecodedUpdate(0x1234, 0x00C0, 1, SuiteLinkValue.FromBoolean(true)); + Assert.True(session.TryDispatchUpdate(decoded, DateTimeOffset.UtcNow, out var dispatchedUpdate)); + Assert.NotNull(dispatchedUpdate); + Assert.Equal("Pump002.Run", dispatchedUpdate.ItemName); + Assert.Equal(0, oldCount); + Assert.Equal(1, newCount); + } + + [Fact] + public void TryDispatchUpdate_CallbackThrows_IsCaughtAndReported() + { + var session = new SuiteLinkSession(); + + session.RegisterSubscription("Pump001.Run", 0x1234, _ => throw new InvalidOperationException("callback failure")); + + var decoded = new DecodedUpdate( + TagId: 0x1234, + Quality: 0x00C0, + ElapsedMilliseconds: 5, + Value: SuiteLinkValue.FromInt32(42)); + + var dispatched = session.TryDispatchUpdate( + decoded, + DateTimeOffset.UtcNow, + out var dispatchedUpdate, + out var callbackException); + + Assert.False(dispatched); + Assert.NotNull(dispatchedUpdate); + Assert.NotNull(callbackException); + Assert.Equal("callback failure", callbackException.Message); + } + + [Fact] + public void SetState_InvalidTransition_ThrowsInvalidOperationException() + { + var session = new SuiteLinkSession(); + + var ex = Assert.Throws(() => session.SetState(SuiteLinkSessionState.SessionConnected)); + + Assert.Contains("Invalid state transition", ex.Message); + Assert.Equal(SuiteLinkSessionState.Disconnected, session.State); + } + + [Fact] + public void TryTransitionState_EnforcesExpectedCurrentStateAtomically() + { + var session = new SuiteLinkSession(); + + Assert.True(session.TryTransitionState(SuiteLinkSessionState.Disconnected, SuiteLinkSessionState.TcpConnected)); + Assert.Equal(SuiteLinkSessionState.TcpConnected, session.State); + + Assert.False(session.TryTransitionState(SuiteLinkSessionState.Disconnected, SuiteLinkSessionState.HandshakeComplete)); + Assert.Equal(SuiteLinkSessionState.TcpConnected, session.State); + } +} diff --git a/tests/SuiteLink.Client.Tests/Protocol/SuiteLinkHandshakeCodecTests.cs b/tests/SuiteLink.Client.Tests/Protocol/SuiteLinkHandshakeCodecTests.cs index d72b1d5..2ff334b 100644 --- a/tests/SuiteLink.Client.Tests/Protocol/SuiteLinkHandshakeCodecTests.cs +++ b/tests/SuiteLink.Client.Tests/Protocol/SuiteLinkHandshakeCodecTests.cs @@ -1,5 +1,6 @@ using System.Buffers.Binary; using SuiteLink.Client.Protocol; +using SuiteLink.Client.Tests.Fixtures; namespace SuiteLink.Client.Tests.Protocol; @@ -33,9 +34,7 @@ public sealed class SuiteLinkHandshakeCodecTests [Fact] public void ParseNormalHandshakeAck_WithNormalAckFrame_ReturnsAckData() { - // Fixed vector for normal ACK assumption: - // remaining=0x0006, type=0x0001, payload=0xA1B2C3, marker=0xA5. - byte[] frame = [0x06, 0x00, 0x01, 0x00, 0xA1, 0xB2, 0xC3, 0xA5]; + var frame = FixtureBytes.Read("handshake-ack-normal.bin"); var ack = SuiteLinkHandshakeCodec.ParseNormalHandshakeAck(frame); diff --git a/tests/SuiteLink.Client.Tests/Protocol/SuiteLinkSubscriptionCodecTests.cs b/tests/SuiteLink.Client.Tests/Protocol/SuiteLinkSubscriptionCodecTests.cs index e7f588e..0540162 100644 --- a/tests/SuiteLink.Client.Tests/Protocol/SuiteLinkSubscriptionCodecTests.cs +++ b/tests/SuiteLink.Client.Tests/Protocol/SuiteLinkSubscriptionCodecTests.cs @@ -1,4 +1,5 @@ using SuiteLink.Client.Protocol; +using SuiteLink.Client.Tests.Fixtures; namespace SuiteLink.Client.Tests.Protocol; @@ -9,7 +10,7 @@ public sealed class SuiteLinkSubscriptionCodecTests { var bytes = SuiteLinkSubscriptionCodec.EncodeAdvise(0x11223344, "A"); var frame = SuiteLinkFrameReader.ParseFrame(bytes); - byte[] expected = [0x0A, 0x00, 0x10, 0x80, 0x44, 0x33, 0x22, 0x11, 0x01, 0x41, 0x00, 0xA5]; + var expected = FixtureBytes.Read("advise-tagid-11223344-item-A.bin"); Assert.Equal(expected, bytes); Assert.Equal(0x10, bytes[2]); diff --git a/tests/SuiteLink.Client.Tests/Protocol/SuiteLinkUpdateCodecTests.cs b/tests/SuiteLink.Client.Tests/Protocol/SuiteLinkUpdateCodecTests.cs index ee8d6fd..9ebb737 100644 --- a/tests/SuiteLink.Client.Tests/Protocol/SuiteLinkUpdateCodecTests.cs +++ b/tests/SuiteLink.Client.Tests/Protocol/SuiteLinkUpdateCodecTests.cs @@ -1,5 +1,6 @@ using System.Text; using SuiteLink.Client.Protocol; +using SuiteLink.Client.Tests.Fixtures; namespace SuiteLink.Client.Tests.Protocol; @@ -8,16 +9,7 @@ public sealed class SuiteLinkUpdateCodecTests [Fact] public void DecodeUpdate_DecodesBinaryValue() { - byte[] frame = - [ - 0x0D, 0x00, 0x09, 0x00, - 0x34, 0x12, 0x00, 0x00, - 0x0A, 0x00, - 0xC0, 0x00, - 0x01, - 0x01, - 0xA5 - ]; + var frame = FixtureBytes.Read("update-binary-tag-1234-true.bin"); Assert.Equal(0x09, frame[2]); Assert.Equal(0x00, frame[3]); diff --git a/tests/SuiteLink.Client.Tests/Protocol/SuiteLinkWriteCodecTests.cs b/tests/SuiteLink.Client.Tests/Protocol/SuiteLinkWriteCodecTests.cs new file mode 100644 index 0000000..e70bb83 --- /dev/null +++ b/tests/SuiteLink.Client.Tests/Protocol/SuiteLinkWriteCodecTests.cs @@ -0,0 +1,66 @@ +using SuiteLink.Client.Protocol; + +namespace SuiteLink.Client.Tests.Protocol; + +public sealed class SuiteLinkWriteCodecTests +{ + [Fact] + public void EncodeWrite_BooleanValue_WritesExpectedGoldenVector() + { + var bytes = SuiteLinkWriteCodec.Encode(0x12345678, SuiteLinkValue.FromBoolean(true)); + byte[] expected = [0x09, 0x00, 0x0B, 0x08, 0x78, 0x56, 0x34, 0x12, 0x01, 0x01, 0xA5]; + + Assert.Equal(expected, bytes); + Assert.Equal(0x0B, bytes[2]); + Assert.Equal(0x08, bytes[3]); + } + + [Fact] + public void EncodeWrite_Int32Value_WritesExpectedPayload() + { + var bytes = SuiteLinkWriteCodec.Encode(0x89ABCDEF, SuiteLinkValue.FromInt32(42)); + var frame = SuiteLinkFrameReader.ParseFrame(bytes); + + Assert.Equal(0x0B, bytes[2]); + Assert.Equal(0x08, bytes[3]); + Assert.Equal(SuiteLinkWriteCodec.PokeMessageType, frame.MessageType); + Assert.Equal(0x89ABCDEFu, SuiteLinkEncoding.ReadUInt32LittleEndian(frame.Payload.Span)); + Assert.Equal((byte)SuiteLinkWireValueType.Integer, frame.Payload.Span[4]); + Assert.Equal(42, SuiteLinkEncoding.ReadInt32LittleEndian(frame.Payload.Span[5..])); + } + + [Fact] + public void EncodeWrite_Float32Value_WritesExpectedPayload() + { + var bytes = SuiteLinkWriteCodec.Encode(0x00000007, SuiteLinkValue.FromFloat32(12.5f)); + var frame = SuiteLinkFrameReader.ParseFrame(bytes); + + Assert.Equal(0x0B, bytes[2]); + Assert.Equal(0x08, bytes[3]); + Assert.Equal((byte)SuiteLinkWireValueType.Real, frame.Payload.Span[4]); + Assert.Equal(12.5f, SuiteLinkEncoding.ReadSingleLittleEndian(frame.Payload.Span[5..])); + } + + [Fact] + public void EncodeWrite_StringValue_WritesExpectedPayload() + { + var bytes = SuiteLinkWriteCodec.Encode(0x00000008, SuiteLinkValue.FromString("OK")); + var frame = SuiteLinkFrameReader.ParseFrame(bytes); + + Assert.Equal(0x0B, bytes[2]); + Assert.Equal(0x08, bytes[3]); + Assert.Equal((byte)SuiteLinkWireValueType.Message, frame.Payload.Span[4]); + Assert.Equal((ushort)2, SuiteLinkEncoding.ReadUInt16LittleEndian(frame.Payload.Span[5..])); + Assert.Equal((byte)'O', frame.Payload.Span[7]); + Assert.Equal((byte)'K', frame.Payload.Span[8]); + } + + [Fact] + public void EncodeWrite_NoneValue_ThrowsNotSupportedException() + { + var ex = Assert.Throws( + () => SuiteLinkWriteCodec.Encode(0x00000001, default)); + + Assert.Contains("unsupported", ex.Message, StringComparison.OrdinalIgnoreCase); + } +} diff --git a/tests/SuiteLink.Client.Tests/SuiteLink.Client.Tests.csproj b/tests/SuiteLink.Client.Tests/SuiteLink.Client.Tests.csproj index fd96ef7..90b53fa 100644 --- a/tests/SuiteLink.Client.Tests/SuiteLink.Client.Tests.csproj +++ b/tests/SuiteLink.Client.Tests/SuiteLink.Client.Tests.csproj @@ -22,4 +22,8 @@ - \ No newline at end of file + + + + + diff --git a/tests/SuiteLink.Client.Tests/SuiteLinkClientConnectionTests.cs b/tests/SuiteLink.Client.Tests/SuiteLinkClientConnectionTests.cs new file mode 100644 index 0000000..cc18da5 --- /dev/null +++ b/tests/SuiteLink.Client.Tests/SuiteLinkClientConnectionTests.cs @@ -0,0 +1,209 @@ +using SuiteLink.Client.Protocol; +using SuiteLink.Client.Transport; + +namespace SuiteLink.Client.Tests; + +public sealed class SuiteLinkClientConnectionTests +{ + [Fact] + public async Task ConnectAsync_SendsHandshakeThenConnect_ButDoesNotReportReadyYet() + { + var handshakeAckFrame = new byte[] { 0x06, 0x00, 0x01, 0x00, 0xA1, 0xB2, 0xC3, 0xA5 }; + var transport = new FakeTransport([handshakeAckFrame[..4], handshakeAckFrame[4..]]); + var client = new SuiteLinkClient(transport); + var options = CreateOptions(); + + await client.ConnectAsync(options); + + Assert.False(client.IsConnected); + Assert.Equal(2, transport.SentBuffers.Count); + Assert.Equal( + SuiteLinkHandshakeCodec.EncodeNormalQueryHandshake( + options.Application, + options.ClientNode, + options.UserName), + transport.SentBuffers[0]); + Assert.Equal(SuiteLinkConnectCodec.Encode(options), transport.SentBuffers[1]); + Assert.Equal(1, transport.ConnectCallCount); + Assert.Equal(options.Host, transport.ConnectedHost); + Assert.Equal(options.Port, transport.ConnectedPort); + } + + [Fact] + public async Task DisconnectAsync_UsesSingleUseClientSemantics_AndDoesNotDisposeExternalTransport() + { + var handshakeAckFrame = new byte[] { 0x06, 0x00, 0x01, 0x00, 0xA1, 0xB2, 0xC3, 0xA5 }; + var transport = new FakeTransport([handshakeAckFrame]); + var client = new SuiteLinkClient(transport); + + await client.ConnectAsync(CreateOptions()); + await client.DisconnectAsync(); + + Assert.False(client.IsConnected); + Assert.Equal(0, transport.DisposeCallCount); + await Assert.ThrowsAsync(() => client.ConnectAsync(CreateOptions())); + } + + [Fact] + public async Task DisposeAsync_WithOwnedTransport_DisposesUnderlyingTransport() + { + var handshakeAckFrame = new byte[] { 0x06, 0x00, 0x01, 0x00, 0xA1, 0xB2, 0xC3, 0xA5 }; + var transport = new FakeTransport([handshakeAckFrame]); + var client = new SuiteLinkClient(transport, ownsTransport: true); + + await client.ConnectAsync(CreateOptions()); + await client.DisposeAsync(); + + Assert.False(client.IsConnected); + Assert.Equal(1, transport.DisposeCallCount); + } + + [Fact] + public async Task ConnectAsync_MalformedHandshakeAck_ThrowsAndFaultsClient() + { + var malformedAck = new byte[] { 0x03, 0x00, 0x02, 0x00, 0xA5 }; + var transport = new FakeTransport([malformedAck]); + var client = new SuiteLinkClient(transport); + + await Assert.ThrowsAsync(() => client.ConnectAsync(CreateOptions())); + + Assert.False(client.IsConnected); + Assert.Equal(1, transport.ConnectCallCount); + Assert.Single(transport.SentBuffers); // handshake only + } + + [Fact] + public async Task ConnectAsync_RemoteEofDuringHandshake_ThrowsAndFaultsClient() + { + var transport = new FakeTransport(receiveChunks: []); + var client = new SuiteLinkClient(transport); + + await Assert.ThrowsAsync(() => client.ConnectAsync(CreateOptions())); + + Assert.False(client.IsConnected); + Assert.Equal(1, transport.ConnectCallCount); + Assert.Single(transport.SentBuffers); // handshake only + } + + [Fact] + public async Task ConnectAsync_RepeatedWhilePending_DoesNotSendDuplicateStartupFrames() + { + var handshakeAckFrame = new byte[] { 0x06, 0x00, 0x01, 0x00, 0xA1, 0xB2, 0xC3, 0xA5 }; + var transport = new FakeTransport([handshakeAckFrame]); + var client = new SuiteLinkClient(transport); + var options = CreateOptions(); + + await client.ConnectAsync(options); + await client.ConnectAsync(options); + + Assert.Equal(1, transport.ConnectCallCount); + Assert.Equal(2, transport.SentBuffers.Count); + } + + [Fact] + public async Task ConnectAsync_ConcurrentCalls_AreSerializedAndDoNotDuplicateStartupFrames() + { + var handshakeAckFrame = new byte[] { 0x06, 0x00, 0x01, 0x00, 0xA1, 0xB2, 0xC3, 0xA5 }; + var receiveGate = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var transport = new FakeTransport([handshakeAckFrame]) + { + ReceiveGate = receiveGate.Task + }; + var client = new SuiteLinkClient(transport); + var options = CreateOptions(); + + var connectTask1 = client.ConnectAsync(options); + while (transport.SentBuffers.Count == 0) + { + await Task.Delay(10); + } + + var connectTask2 = client.ConnectAsync(options); + receiveGate.SetResult(); + + await Task.WhenAll(connectTask1, connectTask2); + + Assert.Equal(1, transport.ConnectCallCount); + Assert.Equal(2, transport.SentBuffers.Count); + } + + private static SuiteLinkConnectionOptions CreateOptions() + { + return new SuiteLinkConnectionOptions( + host: "127.0.0.1", + application: "App", + topic: "Topic", + clientName: "Client", + clientNode: "Node", + userName: "User", + serverNode: "Server", + timezone: "UTC", + port: 5413); + } + + private sealed class FakeTransport : ISuiteLinkTransport + { + private readonly object _syncRoot = new(); + private readonly Queue _receiveChunks; + private bool _disposed; + + public FakeTransport(IEnumerable receiveChunks) + { + _receiveChunks = new Queue(receiveChunks); + } + + public Task? ReceiveGate { get; init; } + public string ConnectedHost { get; private set; } = string.Empty; + public int ConnectedPort { get; private set; } + public int ConnectCallCount { get; private set; } + public int DisposeCallCount { get; private set; } + public bool IsConnected => ConnectCallCount > 0 && !_disposed; + public List SentBuffers { get; } = []; + + public ValueTask ConnectAsync(string host, int port, CancellationToken cancellationToken = default) + { + ConnectCallCount++; + ConnectedHost = host; + ConnectedPort = port; + return ValueTask.CompletedTask; + } + + public ValueTask SendAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + lock (_syncRoot) + { + SentBuffers.Add(buffer.ToArray()); + } + return ValueTask.CompletedTask; + } + + public async ValueTask ReceiveAsync(Memory buffer, CancellationToken cancellationToken = default) + { + if (ReceiveGate is not null) + { + await ReceiveGate.WaitAsync(cancellationToken).ConfigureAwait(false); + } + + byte[]? next; + lock (_syncRoot) + { + if (_receiveChunks.Count == 0) + { + return 0; + } + + next = _receiveChunks.Dequeue(); + } + + next.CopyTo(buffer); + return next.Length; + } + + public ValueTask DisposeAsync() + { + _disposed = true; + DisposeCallCount++; + return ValueTask.CompletedTask; + } + } +} diff --git a/tests/SuiteLink.Client.Tests/SuiteLinkClientSubscriptionTests.cs b/tests/SuiteLink.Client.Tests/SuiteLinkClientSubscriptionTests.cs new file mode 100644 index 0000000..e2c2cf6 --- /dev/null +++ b/tests/SuiteLink.Client.Tests/SuiteLinkClientSubscriptionTests.cs @@ -0,0 +1,336 @@ +using SuiteLink.Client.Protocol; +using SuiteLink.Client.Transport; + +namespace SuiteLink.Client.Tests; + +public sealed class SuiteLinkClientSubscriptionTests +{ + [Fact] + public async Task SubscribeAsync_SendsAdvise_AndReturnsSubscriptionHandle() + { + var transport = new FakeTransport(); + transport.EnqueueReceive(BuildHandshakeAckFrame()); + transport.EnqueueReceive(BuildAdviseAckFrame(1)); + + var client = new SuiteLinkClient(transport); + await client.ConnectAsync(CreateOptions()); + + var handle = await client.SubscribeAsync("Pump001.Run", _ => { }); + + Assert.True(client.IsConnected); + Assert.Equal("Pump001.Run", handle.ItemName); + Assert.Equal(1u, handle.TagId); + Assert.Equal(3, transport.SentBuffers.Count); + + var adviseFrame = SuiteLinkFrameReader.ParseFrame(transport.SentBuffers[2]); + Assert.Equal(SuiteLinkSubscriptionCodec.AdviseMessageType, adviseFrame.MessageType); + Assert.Equal(1u, SuiteLinkEncoding.ReadUInt32LittleEndian(adviseFrame.Payload.Span[..4])); + } + + [Fact] + public async Task ProcessIncomingAsync_UpdateFrame_DispatchesToSubscriptionCallback() + { + var transport = new FakeTransport(); + transport.EnqueueReceive(BuildHandshakeAckFrame()); + transport.EnqueueReceive(BuildAdviseAckFrame(1)); + transport.EnqueueReceive(BuildBooleanUpdateFrame(1, true)); + + var client = new SuiteLinkClient(transport); + await client.ConnectAsync(CreateOptions()); + + SuiteLinkTagUpdate? callbackUpdate = null; + _ = await client.SubscribeAsync("Pump001.Run", update => callbackUpdate = update); + + await client.ProcessIncomingAsync(); + + Assert.NotNull(callbackUpdate); + Assert.Equal("Pump001.Run", callbackUpdate.ItemName); + Assert.True(callbackUpdate.Value.TryGetBoolean(out var value)); + Assert.True(value); + } + + [Fact] + public async Task ReadAsync_ReturnsFirstMatchingUpdate_UsingTemporarySubscription() + { + var transport = new FakeTransport(); + transport.EnqueueReceive(BuildHandshakeAckFrame()); + transport.EnqueueReceive(BuildAdviseAckFrame(1)); + transport.EnqueueReceive(BuildIntegerUpdateFrame(1, 42)); + + var client = new SuiteLinkClient(transport); + await client.ConnectAsync(CreateOptions()); + + var update = await client.ReadAsync("Pump001.Speed", TimeSpan.FromSeconds(2)); + + Assert.Equal("Pump001.Speed", update.ItemName); + Assert.True(update.Value.TryGetInt32(out var value)); + Assert.Equal(42, value); + + Assert.Equal(4, transport.SentBuffers.Count); + var unadviseFrame = SuiteLinkFrameReader.ParseFrame(transport.SentBuffers[3]); + Assert.Equal(SuiteLinkSubscriptionCodec.UnadviseMessageType, unadviseFrame.MessageType); + Assert.Equal(1u, SuiteLinkEncoding.ReadUInt32LittleEndian(unadviseFrame.Payload.Span)); + } + + [Fact] + public async Task SubscriptionHandleDisposeAsync_SendsUnadvise_AndStopsFurtherDispatch() + { + var transport = new FakeTransport(); + transport.EnqueueReceive(BuildHandshakeAckFrame()); + transport.EnqueueReceive(BuildAdviseAckFrame(1)); + transport.EnqueueReceive(BuildBooleanUpdateFrame(1, true)); + + var client = new SuiteLinkClient(transport); + await client.ConnectAsync(CreateOptions()); + + var callbackCount = 0; + var handle = await client.SubscribeAsync("Pump001.Run", _ => callbackCount++); + + await handle.DisposeAsync(); + await client.ProcessIncomingAsync(); + + Assert.Equal(4, transport.SentBuffers.Count); + var unadviseFrame = SuiteLinkFrameReader.ParseFrame(transport.SentBuffers[3]); + Assert.Equal(SuiteLinkSubscriptionCodec.UnadviseMessageType, unadviseFrame.MessageType); + Assert.Equal(1u, SuiteLinkEncoding.ReadUInt32LittleEndian(unadviseFrame.Payload.Span)); + Assert.Equal(0, callbackCount); + } + + [Fact] + public async Task ProcessIncomingAsync_CallbackDisposesSubscription_DoesNotDeadlock() + { + var transport = new FakeTransport(); + transport.EnqueueReceive(BuildHandshakeAckFrame()); + transport.EnqueueReceive(BuildAdviseAckFrame(1)); + transport.EnqueueReceive(BuildBooleanUpdateFrame(1, true)); + + var client = new SuiteLinkClient(transport); + await client.ConnectAsync(CreateOptions()); + + SubscriptionHandle? handle = null; + var callbackCompleted = false; + handle = await client.SubscribeAsync( + "Pump001.Run", + _ => + { + handle!.DisposeAsync().AsTask().GetAwaiter().GetResult(); + callbackCompleted = true; + }); + + await client.ProcessIncomingAsync().WaitAsync(TimeSpan.FromSeconds(1)); + + Assert.True(callbackCompleted); + Assert.Equal(4, transport.SentBuffers.Count); + var unadviseFrame = SuiteLinkFrameReader.ParseFrame(transport.SentBuffers[3]); + Assert.Equal(SuiteLinkSubscriptionCodec.UnadviseMessageType, unadviseFrame.MessageType); + } + + [Fact] + public async Task ProcessIncomingAsync_CallbackCanInvokeNestedProcessing_WithoutDeadlock() + { + var transport = new FakeTransport(); + transport.EnqueueReceive(BuildHandshakeAckFrame()); + transport.EnqueueReceive(BuildAdviseAckFrame(1)); + transport.EnqueueReceive(BuildBooleanUpdateFrame(1, true)); + transport.EnqueueReceive(BuildBooleanUpdateFrame(1, false)); + + var client = new SuiteLinkClient(transport); + await client.ConnectAsync(CreateOptions()); + + var values = new List(); + var nestedCalled = false; + _ = await client.SubscribeAsync( + "Pump001.Run", + update => + { + if (update.Value.TryGetBoolean(out var value)) + { + values.Add(value); + } + + if (!nestedCalled) + { + nestedCalled = true; + client.ProcessIncomingAsync().GetAwaiter().GetResult(); + } + }); + + await client.ProcessIncomingAsync().WaitAsync(TimeSpan.FromSeconds(1)); + + Assert.Equal(2, values.Count); + Assert.True(values[0]); + Assert.False(values[1]); + } + + [Fact] + public async Task ReadAsync_PreservesPrimaryReceiveFailure_WhenCleanupUnadviseFails() + { + var transport = new FakeTransport(); + transport.EnqueueReceive(BuildHandshakeAckFrame()); + transport.EnqueueReceive(BuildAdviseAckFrame(1)); + transport.SendFailureFactory = frameBytes => + { + var span = frameBytes.Span; + var isUnadviseFrame = span.Length >= 4 && + span[2] == 0x04 && + span[3] == 0x80; + return isUnadviseFrame + ? new IOException("Synthetic unadvise send failure.") + : null; + }; + + var client = new SuiteLinkClient(transport); + await client.ConnectAsync(CreateOptions()); + + var ex = await Assert.ThrowsAsync( + () => client.ReadAsync("Pump001.Speed", TimeSpan.FromSeconds(2))); + + Assert.Contains("Remote endpoint closed", ex.Message, StringComparison.OrdinalIgnoreCase); + } + + [Fact] + public async Task SubscribeAsync_RejectsMultiItemAdviseAck_AsUnsupportedCorrelationPolicy() + { + var transport = new FakeTransport(); + transport.EnqueueReceive(BuildHandshakeAckFrame()); + transport.EnqueueReceive(BuildAdviseAckFrame(1, 2)); + + var client = new SuiteLinkClient(transport); + await client.ConnectAsync(CreateOptions()); + + await Assert.ThrowsAsync( + () => client.SubscribeAsync("Pump001.Run", _ => { })); + } + + [Fact] + public async Task SubscribeAsync_RejectsMismatchedAdviseAckTagId() + { + var transport = new FakeTransport(); + transport.EnqueueReceive(BuildHandshakeAckFrame()); + transport.EnqueueReceive(BuildAdviseAckFrame(0x1234)); + + var client = new SuiteLinkClient(transport); + await client.ConnectAsync(CreateOptions()); + + await Assert.ThrowsAsync( + () => client.SubscribeAsync("Pump001.Run", _ => { })); + } + + private static SuiteLinkConnectionOptions CreateOptions() + { + return new SuiteLinkConnectionOptions( + host: "127.0.0.1", + application: "App", + topic: "Topic", + clientName: "Client", + clientNode: "Node", + userName: "User", + serverNode: "Server", + timezone: "UTC", + port: 5413); + } + + private static byte[] BuildHandshakeAckFrame() + { + return [0x06, 0x00, 0x01, 0x00, 0xA1, 0xB2, 0xC3, 0xA5]; + } + + private static byte[] BuildAdviseAckFrame(params uint[] tagIds) + { + var payload = new byte[Math.Max(1, tagIds.Length) * 5]; + var ids = tagIds.Length == 0 ? [0u] : tagIds; + var offset = 0; + foreach (var tagId in ids) + { + SuiteLinkEncoding.WriteUInt32LittleEndian(payload.AsSpan(offset, 4), tagId); + payload[offset + 4] = 0x00; + offset += 5; + } + + return SuiteLinkFrameWriter.WriteFrame(SuiteLinkSubscriptionCodec.AdviseAckMessageType, payload); + } + + private static byte[] BuildBooleanUpdateFrame(uint tagId, bool value) + { + var payload = new byte[10]; + SuiteLinkEncoding.WriteUInt32LittleEndian(payload.AsSpan(0, 4), tagId); + SuiteLinkEncoding.WriteUInt16LittleEndian(payload.AsSpan(4, 2), 1); + SuiteLinkEncoding.WriteUInt16LittleEndian(payload.AsSpan(6, 2), 0x00C0); + payload[8] = (byte)SuiteLinkWireValueType.Binary; + payload[9] = value ? (byte)1 : (byte)0; + return SuiteLinkFrameWriter.WriteFrame(SuiteLinkUpdateCodec.UpdateMessageType, payload); + } + + private static byte[] BuildIntegerUpdateFrame(uint tagId, int value) + { + var payload = new byte[13]; + SuiteLinkEncoding.WriteUInt32LittleEndian(payload.AsSpan(0, 4), tagId); + SuiteLinkEncoding.WriteUInt16LittleEndian(payload.AsSpan(4, 2), 1); + SuiteLinkEncoding.WriteUInt16LittleEndian(payload.AsSpan(6, 2), 0x00C0); + payload[8] = (byte)SuiteLinkWireValueType.Integer; + SuiteLinkEncoding.WriteInt32LittleEndian(payload.AsSpan(9, 4), value); + return SuiteLinkFrameWriter.WriteFrame(SuiteLinkUpdateCodec.UpdateMessageType, payload); + } + + private sealed class FakeTransport : ISuiteLinkTransport + { + private readonly Queue _receiveChunks = []; + private readonly object _syncRoot = new(); + + public bool IsConnected { get; private set; } + + public List SentBuffers { get; } = []; + public Func, Exception?>? SendFailureFactory { get; set; } + + public void EnqueueReceive(byte[] bytes) + { + lock (_syncRoot) + { + _receiveChunks.Enqueue(bytes); + } + } + + public ValueTask ConnectAsync(string host, int port, CancellationToken cancellationToken = default) + { + IsConnected = true; + return ValueTask.CompletedTask; + } + + public ValueTask SendAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + lock (_syncRoot) + { + SentBuffers.Add(buffer.ToArray()); + } + + var sendFailure = SendFailureFactory?.Invoke(buffer); + if (sendFailure is not null) + { + throw sendFailure; + } + + return ValueTask.CompletedTask; + } + + public ValueTask ReceiveAsync(Memory buffer, CancellationToken cancellationToken = default) + { + lock (_syncRoot) + { + if (_receiveChunks.Count == 0) + { + return ValueTask.FromResult(0); + } + + var next = _receiveChunks.Dequeue(); + next.CopyTo(buffer); + return ValueTask.FromResult(next.Length); + } + } + + public ValueTask DisposeAsync() + { + IsConnected = false; + return ValueTask.CompletedTask; + } + } +} diff --git a/tests/SuiteLink.Client.Tests/SuiteLinkClientWriteTests.cs b/tests/SuiteLink.Client.Tests/SuiteLinkClientWriteTests.cs new file mode 100644 index 0000000..4c70f75 --- /dev/null +++ b/tests/SuiteLink.Client.Tests/SuiteLinkClientWriteTests.cs @@ -0,0 +1,126 @@ +using SuiteLink.Client.Protocol; +using SuiteLink.Client.Transport; + +namespace SuiteLink.Client.Tests; + +public sealed class SuiteLinkClientWriteTests +{ + [Fact] + public async Task WriteAsync_SendsPokeFrame_ForSubscribedTag() + { + var transport = new FakeTransport(); + transport.EnqueueReceive(BuildHandshakeAckFrame()); + transport.EnqueueReceive(BuildAdviseAckFrame(1)); + + var client = new SuiteLinkClient(transport); + await client.ConnectAsync(CreateOptions()); + _ = await client.SubscribeAsync("Pump001.Run", _ => { }); + + await client.WriteAsync("Pump001.Run", SuiteLinkValue.FromBoolean(true)); + + Assert.Equal(4, transport.SentBuffers.Count); + var pokeFrame = SuiteLinkFrameReader.ParseFrame(transport.SentBuffers[3]); + Assert.Equal(SuiteLinkWriteCodec.PokeMessageType, pokeFrame.MessageType); + Assert.Equal(1u, SuiteLinkEncoding.ReadUInt32LittleEndian(pokeFrame.Payload.Span[..4])); + Assert.Equal((byte)SuiteLinkWireValueType.Binary, pokeFrame.Payload.Span[4]); + Assert.Equal((byte)1, pokeFrame.Payload.Span[5]); + } + + [Fact] + public async Task WriteAsync_UnknownTag_ThrowsInvalidOperationException() + { + var transport = new FakeTransport(); + transport.EnqueueReceive(BuildHandshakeAckFrame()); + + var client = new SuiteLinkClient(transport); + await client.ConnectAsync(CreateOptions()); + + var ex = await Assert.ThrowsAsync( + () => client.WriteAsync("Pump001.Unknown", SuiteLinkValue.FromInt32(42))); + + Assert.Contains("not subscribed", ex.Message, StringComparison.OrdinalIgnoreCase); + Assert.Equal(2, transport.SentBuffers.Count); + } + + private static SuiteLinkConnectionOptions CreateOptions() + { + return new SuiteLinkConnectionOptions( + host: "127.0.0.1", + application: "App", + topic: "Topic", + clientName: "Client", + clientNode: "Node", + userName: "User", + serverNode: "Server", + timezone: "UTC", + port: 5413); + } + + private static byte[] BuildHandshakeAckFrame() + { + return [0x06, 0x00, 0x01, 0x00, 0xA1, 0xB2, 0xC3, 0xA5]; + } + + private static byte[] BuildAdviseAckFrame(uint tagId) + { + Span payload = stackalloc byte[5]; + SuiteLinkEncoding.WriteUInt32LittleEndian(payload[..4], tagId); + payload[4] = 0x00; + return SuiteLinkFrameWriter.WriteFrame(SuiteLinkSubscriptionCodec.AdviseAckMessageType, payload); + } + + private sealed class FakeTransport : ISuiteLinkTransport + { + private readonly Queue _receiveChunks = []; + private readonly object _syncRoot = new(); + + public bool IsConnected { get; private set; } + + public List SentBuffers { get; } = []; + + public void EnqueueReceive(byte[] bytes) + { + lock (_syncRoot) + { + _receiveChunks.Enqueue(bytes); + } + } + + public ValueTask ConnectAsync(string host, int port, CancellationToken cancellationToken = default) + { + IsConnected = true; + return ValueTask.CompletedTask; + } + + public ValueTask SendAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + lock (_syncRoot) + { + SentBuffers.Add(buffer.ToArray()); + } + + return ValueTask.CompletedTask; + } + + public ValueTask ReceiveAsync(Memory buffer, CancellationToken cancellationToken = default) + { + lock (_syncRoot) + { + if (_receiveChunks.Count == 0) + { + return new ValueTask(0); + } + + var next = _receiveChunks.Dequeue(); + next.CopyTo(buffer); + return new ValueTask(next.Length); + } + } + + public ValueTask DisposeAsync() + { + IsConnected = false; + return ValueTask.CompletedTask; + } + } +} diff --git a/tests/SuiteLink.Client.Tests/Transport/SuiteLinkTcpTransportTests.cs b/tests/SuiteLink.Client.Tests/Transport/SuiteLinkTcpTransportTests.cs new file mode 100644 index 0000000..289a988 --- /dev/null +++ b/tests/SuiteLink.Client.Tests/Transport/SuiteLinkTcpTransportTests.cs @@ -0,0 +1,179 @@ +using SuiteLink.Client.Transport; +using System.Net; +using System.Net.Sockets; + +namespace SuiteLink.Client.Tests.Transport; + +public sealed class SuiteLinkTcpTransportTests +{ + [Fact] + public async Task SendAsync_WithInjectedStream_WritesBytes() + { + using var stream = new MemoryStream(); + await using var transport = new SuiteLinkTcpTransport(stream); + byte[] payload = [0x01, 0x02, 0x03]; + + await transport.SendAsync(payload, CancellationToken.None); + + Assert.Equal(payload, stream.ToArray()); + } + + [Fact] + public async Task ReceiveAsync_WithInjectedStream_ReadsBytes() + { + using var stream = new MemoryStream([0x10, 0x20, 0x30]); + await using var transport = new SuiteLinkTcpTransport(stream); + byte[] buffer = new byte[2]; + + var bytesRead = await transport.ReceiveAsync(buffer, CancellationToken.None); + + Assert.Equal(2, bytesRead); + Assert.Equal(0x10, buffer[0]); + Assert.Equal(0x20, buffer[1]); + } + + [Fact] + public async Task SendAsync_WithoutConnection_ThrowsInvalidOperationException() + { + await using var transport = new SuiteLinkTcpTransport(); + + var ex = await Assert.ThrowsAsync( + () => transport.SendAsync(new byte[] { 0xAA }, CancellationToken.None).AsTask()); + + Assert.Contains("connected", ex.Message, StringComparison.OrdinalIgnoreCase); + } + + [Fact] + public async Task ReceiveAsync_WithPartialReadStream_ReturnsPartialReadsAndEof() + { + using var stream = new PartialReadStream([0x10, 0x20, 0x30], maxChunkSize: 1); + await using var transport = new SuiteLinkTcpTransport(stream); + byte[] buffer = new byte[3]; + + var read1 = await transport.ReceiveAsync(buffer, CancellationToken.None); + var read2 = await transport.ReceiveAsync(buffer, CancellationToken.None); + var read3 = await transport.ReceiveAsync(buffer, CancellationToken.None); + var read4 = await transport.ReceiveAsync(buffer, CancellationToken.None); + + Assert.Equal(1, read1); + Assert.Equal(1, read2); + Assert.Equal(1, read3); + Assert.Equal(0, read4); + } + + [Fact] + public async Task DisposeAsync_AfterDisposal_SendAndReceiveThrowObjectDisposedException() + { + using var stream = new MemoryStream([0x01, 0x02, 0x03]); + var transport = new SuiteLinkTcpTransport(stream); + + await transport.DisposeAsync(); + + await Assert.ThrowsAsync( + () => transport.SendAsync(new byte[] { 0xAA }, CancellationToken.None).AsTask()); + + await Assert.ThrowsAsync( + () => transport.ReceiveAsync(new byte[1], CancellationToken.None).AsTask()); + } + + [Fact] + public async Task DisposeAsync_LeaveOpenTrue_DoesNotDisposeInjectedStream() + { + var stream = new TrackingStream(); + await using (var transport = new SuiteLinkTcpTransport(stream, leaveOpen: true)) + { + await transport.DisposeAsync(); + } + + Assert.False(stream.WasDisposed); + } + + [Fact] + public async Task ConnectAsync_ConcurrentCalls_CreateSingleConnection() + { + using var listener = new TcpListener(IPAddress.Loopback, 0); + listener.Start(); + + var endpoint = (IPEndPoint)listener.LocalEndpoint; + await using var transport = new SuiteLinkTcpTransport(); + + Task[] connectTasks = + [ + transport.ConnectAsync(endpoint.Address.ToString(), endpoint.Port).AsTask(), + transport.ConnectAsync(endpoint.Address.ToString(), endpoint.Port).AsTask(), + transport.ConnectAsync(endpoint.Address.ToString(), endpoint.Port).AsTask(), + transport.ConnectAsync(endpoint.Address.ToString(), endpoint.Port).AsTask() + ]; + + await Task.WhenAll(connectTasks); + + using var accepted1 = await listener.AcceptTcpClientAsync(); + + using var secondAcceptCts = new CancellationTokenSource(TimeSpan.FromMilliseconds(300)); + await Assert.ThrowsAnyAsync( + async () => await listener.AcceptTcpClientAsync(secondAcceptCts.Token)); + } + + private sealed class PartialReadStream : Stream + { + private readonly MemoryStream _inner; + private readonly int _maxChunkSize; + + public PartialReadStream(byte[] bytes, int maxChunkSize) + { + _inner = new MemoryStream(bytes); + _maxChunkSize = maxChunkSize; + } + + public override bool CanRead => _inner.CanRead; + public override bool CanSeek => false; + public override bool CanWrite => false; + public override long Length => _inner.Length; + public override long Position + { + get => _inner.Position; + set => throw new NotSupportedException(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + return _inner.Read(buffer, offset, Math.Min(count, _maxChunkSize)); + } + + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + { + return _inner.ReadAsync(buffer[..Math.Min(buffer.Length, _maxChunkSize)], cancellationToken); + } + + public override void Flush() + { + throw new NotSupportedException(); + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException(); + } + + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + throw new NotSupportedException(); + } + } + + private sealed class TrackingStream : MemoryStream + { + public bool WasDisposed { get; private set; } + + protected override void Dispose(bool disposing) + { + WasDisposed = true; + base.Dispose(disposing); + } + } +}