diff --git a/README.md b/README.md index 4d8296f..5e7ce61 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,10 @@ Current implementation targets the normal SuiteLink tag protocol and supports: - subscribe and unadvise flows - update decoding for `bool`, `int32`, `float32`, and `string` - write (`POKE`) encoding for `bool`, `int32`, `float32`, and `string` +- background receive loop for subscription updates without manual polling +- automatic reconnect with durable subscription replay after runtime disconnects +- best-effort latest-value catch-up replay after reconnect when enabled +- policy-based reconnect retry timing with exponential backoff and jitter support - client/session/transport layers suitable for macOS, Linux, and Windows ## Unsupported @@ -18,8 +22,9 @@ This repository does not currently support: - AlarmMgr / alarms and events - secure SuiteLink V3 / TLS transport -- automatic reconnect -- background receive loop / production retry behavior +- write queuing while reconnecting +- full outage-history replay of every missed value +- validated reconnect behavior against a live AVEVA server deployment - validated support for richer System Platform data types such as `double`, `int64`, or `DateTime` ## Build @@ -56,3 +61,9 @@ See [README.md](/Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.I - The repository includes fixture-backed protocol tests under [Fixtures](/Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/Fixtures). - Protocol assumptions derived from reverse engineering are intentionally isolated in codec classes and tests so they can be refined against live captures later. +- Normal subscription use does not require calling `ProcessIncomingAsync`; the runtime loop dispatches updates in the background. +- After a runtime disconnect, the client enters `Reconnecting`, rebuilds the transport/session startup sequence, replays durable subscriptions, and resumes dispatch without caller resubscription. 
+- Reconnect timing is driven by `SuiteLinkRetryPolicy`; the default policy retries immediately, then uses bounded exponential backoff with jitter. +- Catch-up replay is best-effort latest-value refresh only. It does not represent every value missed during the outage window. +- If the client is reconnecting after a runtime disconnect, `WriteAsync` fails fast until the session is ready again. +- Reconnect behavior is currently verified with fixture-backed and fake-transport tests; live AVEVA validation is still required, especially for mixed-mode deployments and server-specific timing. diff --git a/docs/plans/2026-03-17-catchup-retry-implementation-plan.md b/docs/plans/2026-03-17-catchup-retry-implementation-plan.md new file mode 100644 index 0000000..1b6a80d --- /dev/null +++ b/docs/plans/2026-03-17-catchup-retry-implementation-plan.md @@ -0,0 +1,583 @@ +# Catch-Up Replay And Advanced Retry Implementation Plan + +> **For Codex:** REQUIRED SUB-SKILL: Use `executeplan` to implement this plan task-by-task. + +**Goal:** Add best-effort latest-value catch-up after reconnect and replace the fixed reconnect delay schedule with a production-grade retry policy, while also fixing the current reconnect quality issues. + +**Architecture:** Extend the existing reconnect runtime with a small runtime-options layer, a retry-policy calculator, and a post-reconnect catch-up refresh phase. Keep reconnect success defined as restored live subscriptions, and treat catch-up as a best-effort follow-on phase that emits synthetic updates marked separately from live traffic. 
+ +**Tech Stack:** .NET 10, C#, xUnit, existing SuiteLink protocol/client/runtime/transport layers + +--- + +### Task 1: Add Runtime Option Types + +**Files:** +- Create: `/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkRuntimeOptions.cs` +- Create: `/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkRetryPolicy.cs` +- Create: `/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkCatchUpPolicy.cs` +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkConnectionOptions.cs` +- Test: `/Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkConnectionOptionsTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public void ConnectionOptions_DefaultsRuntimeOptions() +{ + var options = new SuiteLinkConnectionOptions( + host: "127.0.0.1", + application: "App", + topic: "Topic", + clientName: "Client", + clientNode: "Node", + userName: "User", + serverNode: "Server"); + + Assert.NotNull(options.Runtime); + Assert.Equal(SuiteLinkCatchUpPolicy.None, options.Runtime.CatchUpPolicy); + Assert.NotNull(options.Runtime.RetryPolicy); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter ConnectionOptions_DefaultsRuntimeOptions -v minimal` +Expected: FAIL because runtime options do not exist yet + +**Step 3: Write minimal implementation** + +Create: + +```csharp +public enum SuiteLinkCatchUpPolicy +{ + None = 0, + RefreshLatestValue = 1 +} +``` + +```csharp +public sealed record class SuiteLinkRetryPolicy( + TimeSpan InitialDelay, + double Multiplier, + TimeSpan MaxDelay, + int? 
MaxAttempts = null, + bool UseJitter = true) +{ + public static SuiteLinkRetryPolicy Default { get; } = + new(TimeSpan.FromSeconds(1), 2.0, TimeSpan.FromSeconds(30)); +} +``` + +```csharp +public sealed record class SuiteLinkRuntimeOptions( + SuiteLinkRetryPolicy RetryPolicy, + SuiteLinkCatchUpPolicy CatchUpPolicy, + TimeSpan CatchUpTimeout) +{ + public static SuiteLinkRuntimeOptions Default { get; } = + new(SuiteLinkRetryPolicy.Default, SuiteLinkCatchUpPolicy.None, TimeSpan.FromSeconds(2)); +} +``` + +Update `SuiteLinkConnectionOptions` to expose: + +```csharp +public SuiteLinkRuntimeOptions Runtime { get; } +``` + +and default it to `SuiteLinkRuntimeOptions.Default`. + +**Step 4: Run test to verify it passes** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter SuiteLinkConnectionOptionsTests -v minimal` +Expected: PASS + +**Step 5: Commit** + +```bash +git add /Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkRuntimeOptions.cs /Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkRetryPolicy.cs /Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkCatchUpPolicy.cs /Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkConnectionOptions.cs /Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkConnectionOptionsTests.cs +git commit -m "feat: add runtime reconnect option types" +``` + +### Task 2: Add Retry Policy Delay Calculator + +**Files:** +- Create: `/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/Internal/SuiteLinkRetryDelayCalculator.cs` +- Test: `/Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/Internal/SuiteLinkRetryDelayCalculatorTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public void GetDelay_UsesImmediateThenExponentialCap() +{ + var policy = new SuiteLinkRetryPolicy( + InitialDelay: TimeSpan.FromSeconds(1), + Multiplier: 2.0, + MaxDelay: TimeSpan.FromSeconds(30), + 
UseJitter: false); + + Assert.Equal(TimeSpan.Zero, SuiteLinkRetryDelayCalculator.GetDelay(policy, 0)); + Assert.Equal(TimeSpan.FromSeconds(1), SuiteLinkRetryDelayCalculator.GetDelay(policy, 1)); + Assert.Equal(TimeSpan.FromSeconds(2), SuiteLinkRetryDelayCalculator.GetDelay(policy, 2)); + Assert.Equal(TimeSpan.FromSeconds(4), SuiteLinkRetryDelayCalculator.GetDelay(policy, 3)); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter SuiteLinkRetryDelayCalculatorTests -v minimal` +Expected: FAIL because calculator does not exist yet + +**Step 3: Write minimal implementation** + +Create: + +```csharp +internal static class SuiteLinkRetryDelayCalculator +{ + public static TimeSpan GetDelay(SuiteLinkRetryPolicy policy, int attempt) + { + if (attempt == 0) + { + return TimeSpan.Zero; + } + + var rawSeconds = policy.InitialDelay.TotalSeconds * Math.Pow(policy.Multiplier, attempt - 1); + var bounded = TimeSpan.FromSeconds(Math.Min(rawSeconds, policy.MaxDelay.TotalSeconds)); + return bounded; + } +} +``` + +Do not add jitter yet beyond the policy flag unless tests require it. 
+ +**Step 4: Run test to verify it passes** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter SuiteLinkRetryDelayCalculatorTests -v minimal` +Expected: PASS + +**Step 5: Commit** + +```bash +git add /Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/Internal/SuiteLinkRetryDelayCalculator.cs /Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/Internal/SuiteLinkRetryDelayCalculatorTests.cs +git commit -m "feat: add reconnect retry delay calculator" +``` + +### Task 3: Wire Retry Policy Into Reconnect Runtime + +**Files:** +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkClient.cs` +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkClientReconnectTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task Reconnect_UsesConfiguredRetryPolicy() +{ + var observed = new List<TimeSpan>(); + var options = CreateOptions() with + { + Runtime = new SuiteLinkRuntimeOptions( + new SuiteLinkRetryPolicy(TimeSpan.FromSeconds(3), 3.0, TimeSpan.FromSeconds(20), UseJitter: false), + SuiteLinkCatchUpPolicy.None, + TimeSpan.FromSeconds(2)) + }; + + var client = CreateReconnectClient(delayAsync: (delay, _) => + { + observed.Add(delay); + return Task.CompletedTask; + }); + + await client.ConnectAsync(options); + await EventuallyReconnectAsync(client); + + Assert.Contains(TimeSpan.FromSeconds(3), observed); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter Reconnect_UsesConfiguredRetryPolicy -v minimal` +Expected: FAIL because reconnect still uses a fixed schedule + +**Step 3: Write minimal implementation** + +In `SuiteLinkClient`: + +- remove direct use of `ReconnectDelaySchedule` +- read retry policy from `_connectionOptions!.Runtime.RetryPolicy` +- use `SuiteLinkRetryDelayCalculator.GetDelay(policy, attempt)` + +Keep the current injected 
`_delayAsync` test seam. + +**Step 4: Run test to verify it passes** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter SuiteLinkClientReconnectTests -v minimal` +Expected: PASS + +**Step 5: Commit** + +```bash +git add /Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkClient.cs /Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkClientReconnectTests.cs +git commit -m "feat: apply retry policy to reconnect runtime" +``` + +### Task 4: Fix Fast-Fail Writes During Reconnect + +**Files:** +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkClient.cs` +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkClientWriteTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task WriteAsync_DuringReconnect_ThrowsBeforeWaitingOnOperationGate() +{ + var client = CreateClientWithBlockedOperationGateAndReconnectState(); + + var ex = await Assert.ThrowsAsync<InvalidOperationException>( + () => client.WriteAsync("Pump001.Run", SuiteLinkValue.FromBoolean(true))); + + Assert.Contains("reconnecting", ex.Message, StringComparison.OrdinalIgnoreCase); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter WriteAsync_DuringReconnect_ThrowsBeforeWaitingOnOperationGate -v minimal` +Expected: FAIL because `WriteAsync` currently waits on `_operationGate` first + +**Step 3: Write minimal implementation** + +Move the reconnect state check ahead of: + +```csharp +await _operationGate.WaitAsync(...) +``` + +while keeping disposed-state checks intact. 
+ +**Step 4: Run test to verify it passes** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter SuiteLinkClientWriteTests -v minimal` +Expected: PASS + +**Step 5: Commit** + +```bash +git add /Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkClient.cs /Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkClientWriteTests.cs +git commit -m "fix: fail writes before reconnect gate contention" +``` + +### Task 5: Fix Transport Reset Ownership Semantics + +**Files:** +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/Transport/SuiteLinkTcpTransport.cs` +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/Transport/ISuiteLinkReconnectableTransport.cs` +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/Transport/SuiteLinkTcpTransportTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task ResetConnectionAsync_LeaveOpenTrue_DoesNotDisposeInjectedStream() +{ + var stream = new TrackingStream(); + await using var transport = new SuiteLinkTcpTransport(stream, leaveOpen: true); + + await transport.ResetConnectionAsync(); + + Assert.False(stream.WasDisposed); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter ResetConnectionAsync_LeaveOpenTrue_DoesNotDisposeInjectedStream -v minimal` +Expected: FAIL because reset currently disposes caller-owned resources + +**Step 3: Write minimal implementation** + +Update `ResetConnectionAsync` to respect the same ownership rule as `DisposeAsync`: + +- if `leaveOpen` is `true`, detach without disposing injected resources +- if `leaveOpen` is `false`, dispose detached resources + +Do not broaden interface scope unnecessarily. 
+ +**Step 4: Run test to verify it passes** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter SuiteLinkTcpTransportTests -v minimal` +Expected: PASS + +**Step 5: Commit** + +```bash +git add /Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/Transport/SuiteLinkTcpTransport.cs /Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/Transport/ISuiteLinkReconnectableTransport.cs /Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/Transport/SuiteLinkTcpTransportTests.cs +git commit -m "fix: preserve transport ownership during reconnect reset" +``` + +### Task 6: Add Update Source Metadata + +**Files:** +- Create: `/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkUpdateSource.cs` +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkTagUpdate.cs` +- Test: `/Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkValueTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public void TagUpdate_DefaultSource_IsLive() +{ + var update = new SuiteLinkTagUpdate( + "Pump001.Run", + 1, + SuiteLinkValue.FromBoolean(true), + 0x00C0, + 1, + DateTimeOffset.UtcNow); + + Assert.Equal(SuiteLinkUpdateSource.Live, update.Source); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter TagUpdate_DefaultSource_IsLive -v minimal` +Expected: FAIL because source metadata does not exist + +**Step 3: Write minimal implementation** + +Create: + +```csharp +public enum SuiteLinkUpdateSource +{ + Live = 0, + CatchUpReplay = 1 +} +``` + +Add `Source` to `SuiteLinkTagUpdate` with default: + +```csharp +SuiteLinkUpdateSource.Live +``` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter SuiteLinkValueTests -v minimal` +Expected: PASS + +**Step 5: Commit** + +```bash +git add 
/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkUpdateSource.cs /Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkTagUpdate.cs /Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkValueTests.cs +git commit -m "feat: add update source metadata" +``` + +### Task 7: Add Best-Effort Catch-Up Refresh Execution + +**Files:** +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkClient.cs` +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/Internal/SubscriptionRegistrationEntry.cs` +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkClientReconnectTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task Reconnect_WithRefreshLatestValue_CanDispatchCatchUpReplay() +{ + SuiteLinkTagUpdate? catchUp = null; + var client = CreateReconnectReplayClient( + catchUpPolicy: SuiteLinkCatchUpPolicy.RefreshLatestValue, + onUpdate: update => + { + if (update.Source == SuiteLinkUpdateSource.CatchUpReplay) + { + catchUp = update; + } + }); + + await client.ConnectAsync(CreateOptionsWithCatchUp()); + + Assert.NotNull(catchUp); + Assert.Equal(SuiteLinkUpdateSource.CatchUpReplay, catchUp.Source); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter Reconnect_WithRefreshLatestValue_CanDispatchCatchUpReplay -v minimal` +Expected: FAIL because reconnect only resumes live dispatch today + +**Step 3: Write minimal implementation** + +After successful reconnect and durable subscription replay: + +- if `Runtime.CatchUpPolicy == SuiteLinkCatchUpPolicy.RefreshLatestValue` +- run a sequential refresh pass over durable subscriptions +- obtain one fresh value per item using existing temporary-read machinery or a dedicated internal refresh path +- dispatch synthetic updates with: + +```csharp +Source: 
SuiteLinkUpdateSource.CatchUpReplay +``` + +Do not fail reconnect if one item refresh fails or times out. + +**Step 4: Run test to verify it passes** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter SuiteLinkClientReconnectTests -v minimal` +Expected: PASS + +**Step 5: Commit** + +```bash +git add /Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkClient.cs /Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/Internal/SubscriptionRegistrationEntry.cs /Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkClientReconnectTests.cs +git commit -m "feat: add reconnect catch-up refresh replay" +``` + +### Task 8: Make Catch-Up Partial Failure Non-Fatal + +**Files:** +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkClient.cs` +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkClientReconnectTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task Reconnect_CatchUpTimeout_DoesNotFailRecoveredSubscriptions() +{ + var client = CreateReconnectReplayClientWithTimedOutRefresh(); + + await client.ConnectAsync(CreateOptionsWithCatchUp()); + + await Eventually.AssertAsync(() => Assert.True(client.IsConnected)); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter Reconnect_CatchUpTimeout_DoesNotFailRecoveredSubscriptions -v minimal` +Expected: FAIL if catch-up failure tears down reconnect + +**Step 3: Write minimal implementation** + +Wrap each refresh item independently: + +- timeout per item from `Runtime.CatchUpTimeout` +- swallow per-item failure after optionally recording internal debug signal +- continue to remaining items + +Do not change the recovered `Ready`/`Subscribed` state. 
+ +**Step 4: Run test to verify it passes** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter SuiteLinkClientReconnectTests -v minimal` +Expected: PASS + +**Step 5: Commit** + +```bash +git add /Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkClient.cs /Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkClientReconnectTests.cs +git commit -m "feat: tolerate partial catch-up refresh failures" +``` + +### Task 9: Add Jitter Coverage Without Flaky Tests + +**Files:** +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/Internal/SuiteLinkRetryDelayCalculator.cs` +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/Internal/SuiteLinkRetryDelayCalculatorTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public void GetDelay_WithJitterEnabled_StaysWithinCap() +{ + var policy = new SuiteLinkRetryPolicy( + InitialDelay: TimeSpan.FromSeconds(2), + Multiplier: 2.0, + MaxDelay: TimeSpan.FromSeconds(10), + UseJitter: true); + + var delay = SuiteLinkRetryDelayCalculator.GetDelay(policy, 3, () => 0.5); + + Assert.InRange(delay, TimeSpan.Zero, TimeSpan.FromSeconds(10)); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter SuiteLinkRetryDelayCalculatorTests -v minimal` +Expected: FAIL because jitter injection does not exist yet + +**Step 3: Write minimal implementation** + +Add an injected random source overload: + +```csharp +public static TimeSpan GetDelay(SuiteLinkRetryPolicy policy, int attempt, Func<double>? 
nextDouble = null) +``` + +When jitter is enabled: + +- compute bounded base delay +- apply deterministic injected random value in tests +- keep final value within `[0, MaxDelay]` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter SuiteLinkRetryDelayCalculatorTests -v minimal` +Expected: PASS + +**Step 5: Commit** + +```bash +git add /Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/Internal/SuiteLinkRetryDelayCalculator.cs /Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/Internal/SuiteLinkRetryDelayCalculatorTests.cs +git commit -m "feat: add deterministic jitter coverage for retry policy" +``` + +### Task 10: Update Documentation And Final Verification + +**Files:** +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/README.md` +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.IntegrationTests/README.md` +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/docs/plans/2026-03-17-catchup-retry-design.md` +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/docs/plans/2026-03-17-catchup-retry-implementation-plan.md` + +**Step 1: Write the documentation diff** + +Document: + +- catch-up mode is latest-value refresh only +- retry policy is configurable and jittered by default +- reconnect success is separate from best-effort catch-up completion +- writes still fail during reconnect + +**Step 2: Run targeted verification** + +Run: `rg -n "catch-up|retry|reconnect|jitter|refresh latest|reconnecting" /Users/dohertj2/Desktop/suitelinkclient/README.md /Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.IntegrationTests/README.md` +Expected: PASS with updated wording + +**Step 3: Run full verification** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx -v minimal` +Expected: PASS + +**Step 4: Commit** + +```bash +git add /Users/dohertj2/Desktop/suitelinkclient/README.md 
/Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.IntegrationTests/README.md /Users/dohertj2/Desktop/suitelinkclient/docs/plans/2026-03-17-catchup-retry-design.md /Users/dohertj2/Desktop/suitelinkclient/docs/plans/2026-03-17-catchup-retry-implementation-plan.md +git commit -m "docs: describe catch-up replay and retry policy" +``` diff --git a/docs/plans/2026-03-17-runtime-reconnect-design.md b/docs/plans/2026-03-17-runtime-reconnect-design.md new file mode 100644 index 0000000..0b47b2d --- /dev/null +++ b/docs/plans/2026-03-17-runtime-reconnect-design.md @@ -0,0 +1,225 @@ +# SuiteLink Runtime Reconnect Design + +## Goal + +Add a background receive loop and automatic reconnect/recovery to the existing SuiteLink client so subscriptions are restored automatically and update callbacks resume without caller intervention. + +## Scope + +This design adds: + +- a background receive loop owned by `SuiteLinkClient` +- automatic reconnect with bounded retry backoff +- automatic subscription replay after reconnect +- resumed update dispatch after replay + +This design does not add: + +- write queuing during reconnect +- catch-up replay of missed values +- secure SuiteLink V3/TLS support +- AlarmMgr support + +## Runtime Model + +The current client uses explicit on-demand inbound processing. The new model shifts normal operation to a managed runtime loop. + +There are two categories of state: + +- durable desired state + - configured connection options + - caller subscription intent + - callbacks associated with subscribed items +- ephemeral connection state + - current transport connection + - current session state + - current `itemName <-> tagId` mappings + +Durable state survives reconnects. Ephemeral state is rebuilt on reconnect. + +## Recommended Approach + +Implement a supervised background receive loop inside `SuiteLinkClient`. + +Behavior: + +1. `ConnectAsync` establishes the initial transport/session and starts the receive loop. +2. 
The receive loop reads frames continuously. +3. Update frames are decoded and dispatched to user callbacks. +4. EOF, transport exceptions, malformed frames, or replay failures trigger recovery. +5. Recovery reconnects with bounded retry delays. +6. After reconnect succeeds, the client replays all current subscriptions and resumes dispatching. + +This keeps the public API simple and avoids forcing callers to manually poll `ProcessIncomingAsync`. + +## State Model + +Expand session/client lifecycle to distinguish pending vs ready vs reconnecting: + +- `Disconnected` +- `Connecting` +- `ConnectSent` +- `Ready` +- `Reconnecting` +- `Faulted` +- `Disposed` + +Definitions: + +- `Connecting`: transport connect + handshake in progress +- `ConnectSent`: startup connect has been sent but the runtime is not yet considered ready +- `Ready`: background receive loop active and subscriptions can be served normally +- `Reconnecting`: recovery loop active after a connection failure + +`IsConnected` should reflect `Ready` only. + +## Recovery Policy + +Failure triggers: + +- transport read returns `0` +- transport exception while sending or receiving +- malformed or unexpected frame during active runtime +- reconnect replay failure + +Recovery behavior: + +- stop the current receive loop +- mark the ephemeral session as disconnected/faulted +- start reconnect attempts until success or explicit shutdown + +Retry schedule: + +- first retry immediately +- then bounded retry delays such as: + - 1 second + - 2 seconds + - 5 seconds + - 10 seconds +- cap the delay instead of growing without bound + +Writes during `Reconnecting` are rejected with a clear exception. + +## Subscription Replay + +The client should maintain a durable subscription registry keyed by `itemName`. + +Each entry stores: + +- `itemName` +- callback +- requested tag id + +During reconnect: + +1. reconnect transport +2. send handshake +3. send connect +4. replay every subscribed item via `ADVISE` +5. 
rebuild live session mappings from fresh ACKs +6. transition to `Ready` + +Subscription replay is serialized and must not run concurrently with normal writes or new replay attempts. + +## Callback Rules + +Callbacks must never run under client locks or gates. + +Rules: + +- decode frames under internal synchronization +- dispatch callbacks only after releasing gates +- callback exceptions remain contained and do not crash the receive loop + +## Public API Effects + +Expected public behavior: + +- `ConnectAsync` + - establishes initial runtime and starts background receive +- `SubscribeAsync` + - records durable intent + - advises immediately when ready + - keeps durable subscription for replay after reconnect +- `ReadAsync` + - can remain implemented as a temporary subscription + - should still use the background runtime instead of manual caller polling +- `WriteAsync` + - allowed only in `Ready` + - fails during `Reconnecting` +- `DisconnectAsync` + - stops receive and reconnect tasks + - tears down transport + +`ProcessIncomingAsync` should stop being the primary runtime API. It can be retained only as an internal/test helper if still useful. + +## Internal Changes + +### `SuiteLinkClient` + +Add: + +- receive loop task +- reconnect supervisor task or integrated recovery loop +- cancellation tokens for runtime shutdown +- durable subscription registry +- reconnect backoff helper + +Responsibilities: + +- own runtime lifecycle +- coordinate reconnect attempts +- replay subscriptions safely +- ensure only one receive loop and one reconnect flow are active + +### `SuiteLinkSession` + +Continue to manage: + +- live connection/session state +- current `itemName <-> tagId` mappings +- live dispatch helpers + +Do not make it responsible for durable reconnect intent. + +### `SubscriptionHandle` + +Should continue to remove durable subscription intent and trigger `UNADVISE` when possible. 
+ +If called during reconnect/disconnect, removal of durable intent still succeeds even if wire unadvise cannot be sent. + +## Testing Strategy + +### Runtime Loop Tests + +Add tests proving: + +- updates received by the background loop reach callbacks +- no manual `ProcessIncomingAsync` call is needed in normal operation + +### Recovery Tests + +Add tests proving: + +- EOF triggers reconnect +- reconnect replays handshake/connect/subscriptions +- callback dispatch resumes after reconnect +- writes during reconnect fail predictably + +### Lifecycle Tests + +Add tests proving: + +- `DisconnectAsync` stops background tasks +- `DisposeAsync` stops reconnect attempts +- repeated failures do not start multiple reconnect loops + +## Recommended Next Step + +Create an implementation plan that breaks this into small tasks: + +- durable subscription registry +- background receive loop +- reconnect loop and backoff +- replay logic +- runtime tests diff --git a/docs/plans/2026-03-17-runtime-reconnect-implementation-plan.md b/docs/plans/2026-03-17-runtime-reconnect-implementation-plan.md new file mode 100644 index 0000000..a29a245 --- /dev/null +++ b/docs/plans/2026-03-17-runtime-reconnect-implementation-plan.md @@ -0,0 +1,519 @@ +# SuiteLink Runtime Reconnect Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** Add a background receive loop with automatic reconnect and subscription replay so the client continues dispatching updates after transport/session failures. + +**Architecture:** The implementation extends `SuiteLinkClient` with a supervised runtime loop and reconnect flow while keeping durable subscription intent separate from ephemeral session mappings. Recovery rebuilds transport/session state, replays subscriptions, and resumes update dispatch without caller polling. 
+ +**Tech Stack:** .NET 10, C#, xUnit, `SemaphoreSlim`, `CancellationTokenSource`, existing SuiteLink codec/session/transport layers + +--- + +### Task 1: Add Durable Subscription Registry + +**Files:** +- Create: `/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/Internal/SubscriptionRegistrationEntry.cs` +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkClient.cs` +- Test: `/Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkClientSubscriptionRegistryTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task SubscribeAsync_StoresDurableSubscriptionIntent() +{ + var client = TestClientFactory.CreateReadyClient(); + + await client.SubscribeAsync("Pump001.Run", _ => { }); + + Assert.True(client.HasSubscription("Pump001.Run")); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter SuiteLinkClientSubscriptionRegistryTests -v minimal` +Expected: FAIL with missing durable registry behavior + +**Step 3: Write minimal implementation** + +Add a durable registry entry model storing: + +- `ItemName` +- callback +- requested tag id + +Store these entries in `SuiteLinkClient` separately from `SuiteLinkSession`. 
+ +**Step 4: Run test to verify it passes** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter SuiteLinkClientSubscriptionRegistryTests -v minimal` +Expected: PASS + +**Step 5: Commit** + +```bash +git add /Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/Internal/SubscriptionRegistrationEntry.cs /Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkClient.cs /Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkClientSubscriptionRegistryTests.cs +git commit -m "feat: add durable subscription registry" +``` + +### Task 2: Make Subscription Handles Remove Durable Intent + +**Files:** +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkClient.cs` +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SubscriptionHandle.cs` +- Test: `/Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkClientSubscriptionRegistryTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task DisposingSubscription_RemovesDurableSubscriptionIntent() +{ + var client = TestClientFactory.CreateReadyClient(); + var handle = await client.SubscribeAsync("Pump001.Run", _ => { }); + + await handle.DisposeAsync(); + + Assert.False(client.HasSubscription("Pump001.Run")); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter DisposingSubscription_RemovesDurableSubscriptionIntent -v minimal` +Expected: FAIL + +**Step 3: Write minimal implementation** + +Ensure handle disposal removes durable registry entries even when wire unadvise cannot be sent. 
+ +**Step 4: Run test to verify it passes** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter SuiteLinkClientSubscriptionRegistryTests -v minimal` +Expected: PASS + +**Step 5: Commit** + +```bash +git add /Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkClient.cs /Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SubscriptionHandle.cs /Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkClientSubscriptionRegistryTests.cs +git commit -m "feat: persist subscription intent across reconnects" +``` + +### Task 3: Add Runtime State For Background Loop + +**Files:** +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/Internal/SuiteLinkSessionState.cs` +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkClient.cs` +- Test: `/Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkClientConnectionTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task ConnectAsync_TransitionsToReadyOnlyAfterRuntimeStarts() +{ + var client = TestClientFactory.CreateReadyHandshakeClient(); + + await client.ConnectAsync(TestOptions.Create()); + + Assert.True(client.IsConnected); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter ConnectAsync_TransitionsToReadyOnlyAfterRuntimeStarts -v minimal` +Expected: FAIL with missing ready/runtime state + +**Step 3: Write minimal implementation** + +Add: + +- `Ready` +- `Reconnecting` + +and transition `ConnectAsync` into `Ready` when the runtime loop has been established. 
+ +**Step 4: Run test to verify it passes** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter SuiteLinkClientConnectionTests -v minimal` +Expected: PASS + +**Step 5: Commit** + +```bash +git add /Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/Internal/SuiteLinkSessionState.cs /Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkClient.cs /Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkClientConnectionTests.cs +git commit -m "feat: add ready and reconnecting runtime states" +``` + +### Task 4: Start Background Receive Loop + +**Files:** +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkClient.cs` +- Test: `/Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkClientRuntimeLoopTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task ConnectAsync_StartsBackgroundLoop_AndDispatchesUpdateWithoutManualPolling() +{ + var updateReceived = new TaskCompletionSource<SuiteLinkTagUpdate>(); + var client = TestClientFactory.CreateClientWithQueuedUpdate(updateReceived); + + await client.ConnectAsync(TestOptions.Create()); + await client.SubscribeAsync("Pump001.Run", update => updateReceived.TrySetResult(update)); + + var update = await updateReceived.Task.WaitAsync(TimeSpan.FromSeconds(1)); + Assert.True(update.Value.TryGetBoolean(out var value) && value); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter SuiteLinkClientRuntimeLoopTests -v minimal` +Expected: FAIL because manual processing is still required + +**Step 3: Write minimal implementation** + +Start a long-lived receive loop task after initial connect, and dispatch updates through existing session logic. 
+ +**Step 4: Run test to verify it passes** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter SuiteLinkClientRuntimeLoopTests -v minimal` +Expected: PASS + +**Step 5: Commit** + +```bash +git add /Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkClient.cs /Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkClientRuntimeLoopTests.cs +git commit -m "feat: add suitelink background receive loop" +``` + +### Task 5: Make ProcessIncomingAsync Internal Or Non-Primary + +**Files:** +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkClient.cs` +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/README.md` +- Test: `/Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkClientRuntimeLoopTests.cs` + +**Step 1: Write the failing documentation/runtime check** + +Define the intended runtime contract: + +- normal operation uses background receive +- manual polling is not required for normal subscriptions + +**Step 2: Run targeted tests** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter SuiteLinkClientRuntimeLoopTests -v minimal` +Expected: PASS after Task 4 + +**Step 3: Write minimal implementation** + +Keep `ProcessIncomingAsync` only as an internal/test helper or document it as non-primary API. 
+ +**Step 4: Run test and docs verification** + +Run: `rg -n "background receive|manual polling|ProcessIncomingAsync" /Users/dohertj2/Desktop/suitelinkclient/README.md` +Expected: PASS with updated wording + +**Step 5: Commit** + +```bash +git add /Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkClient.cs /Users/dohertj2/Desktop/suitelinkclient/README.md +git commit -m "docs: describe background runtime model" +``` + +### Task 6: Detect EOF And Trigger Reconnect + +**Files:** +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkClient.cs` +- Test: `/Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkClientReconnectTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task ReceiveLoop_Eof_TransitionsToReconnecting() +{ + var client = TestClientFactory.CreateClientThatEofsAfterConnect(); + + await client.ConnectAsync(TestOptions.Create()); + + await Eventually.AssertAsync(() => Assert.Equal(SuiteLinkSessionState.Reconnecting, client.DebugState)); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter SuiteLinkClientReconnectTests -v minimal` +Expected: FAIL + +**Step 3: Write minimal implementation** + +Treat `ReceiveAsync == 0` as a disconnect trigger and start recovery. 
+ +**Step 4: Run test to verify it passes** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter SuiteLinkClientReconnectTests -v minimal` +Expected: PASS + +**Step 5: Commit** + +```bash +git add /Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkClient.cs /Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkClientReconnectTests.cs +git commit -m "feat: detect disconnects and enter reconnect state" +``` + +### Task 7: Add Bounded Reconnect Backoff + +**Files:** +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkClient.cs` +- Test: `/Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkClientReconnectTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task Reconnect_UsesBoundedRetrySchedule() +{ + var delays = new List<TimeSpan>(); + var client = TestClientFactory.CreateReconnectTestClient(delays); + + await client.ConnectAsync(TestOptions.Create()); + + Assert.Contains(TimeSpan.FromSeconds(1), delays); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter Reconnect_UsesBoundedRetrySchedule -v minimal` +Expected: FAIL + +**Step 3: Write minimal implementation** + +Add a small capped delay schedule: + +- 0s +- 1s +- 2s +- 5s +- 10s capped + +Inject delay behavior for tests if needed. 
+ +**Step 4: Run test to verify it passes** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter SuiteLinkClientReconnectTests -v minimal` +Expected: PASS + +**Step 5: Commit** + +```bash +git add /Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkClient.cs /Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkClientReconnectTests.cs +git commit -m "feat: add bounded reconnect backoff" +``` + +### Task 8: Replay Subscriptions After Reconnect + +**Files:** +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkClient.cs` +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/Internal/SuiteLinkSession.cs` +- Test: `/Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkClientReconnectTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task Reconnect_ReplaysSubscriptions_AndRestoresDispatch() +{ + var callbackCount = 0; + var client = TestClientFactory.CreateReconnectReplayClient(() => callbackCount++); + + await client.ConnectAsync(TestOptions.Create()); + await client.SubscribeAsync("Pump001.Run", _ => callbackCount++); + + await client.WaitForReconnectReadyAsync(); + Assert.True(callbackCount > 0); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter Reconnect_ReplaysSubscriptions_AndRestoresDispatch -v minimal` +Expected: FAIL + +**Step 3: Write minimal implementation** + +On successful reconnect: + +- reset live session mappings +- replay all durable subscriptions one-by-one +- rebuild tag mappings from fresh ACKs +- return to `Ready` + +**Step 4: Run test to verify it passes** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter SuiteLinkClientReconnectTests -v minimal` +Expected: PASS + +**Step 5: Commit** + +```bash +git add 
/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkClient.cs /Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/Internal/SuiteLinkSession.cs /Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkClientReconnectTests.cs +git commit -m "feat: replay subscriptions after reconnect" +``` + +### Task 9: Reject Writes During Reconnect + +**Files:** +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkClient.cs` +- Test: `/Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkClientWriteTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task WriteAsync_DuringReconnect_ThrowsClearException() +{ + var client = TestClientFactory.CreateReconnectingClient(); + + await Assert.ThrowsAsync<InvalidOperationException>(() => + client.WriteAsync("Pump001.Run", SuiteLinkValue.FromBoolean(true))); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter WriteAsync_DuringReconnect_ThrowsClearException -v minimal` +Expected: FAIL + +**Step 3: Write minimal implementation** + +Guard `WriteAsync` so it succeeds only in `Ready`. 
+ +**Step 4: Run test to verify it passes** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter SuiteLinkClientWriteTests -v minimal` +Expected: PASS + +**Step 5: Commit** + +```bash +git add /Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkClient.cs /Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkClientWriteTests.cs +git commit -m "feat: reject writes while reconnecting" +``` + +### Task 10: Stop Runtime Cleanly On Disconnect + +**Files:** +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkClient.cs` +- Test: `/Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkClientConnectionTests.cs` + +**Step 1: Write the failing test** + +```csharp +[Fact] +public async Task DisconnectAsync_StopsReceiveAndReconnectLoops() +{ + var client = TestClientFactory.CreateRunningClient(); + + await client.ConnectAsync(TestOptions.Create()); + await client.DisconnectAsync(); + + Assert.False(client.IsConnected); +} +``` + +**Step 2: Run test to verify it fails** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter DisconnectAsync_StopsReceiveAndReconnectLoops -v minimal` +Expected: FAIL + +**Step 3: Write minimal implementation** + +Cancel runtime loop tokens and stop reconnect attempts on disconnect/dispose. 
+ +**Step 4: Run test to verify it passes** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter SuiteLinkClientConnectionTests -v minimal` +Expected: PASS + +**Step 5: Commit** + +```bash +git add /Users/dohertj2/Desktop/suitelinkclient/src/SuiteLink.Client/SuiteLinkClient.cs /Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.Tests/SuiteLinkClientConnectionTests.cs +git commit -m "feat: stop runtime loops on disconnect" +``` + +### Task 11: Update README And Integration Docs + +**Files:** +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/README.md` +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.IntegrationTests/README.md` + +**Step 1: Write the failing documentation check** + +Define required README terms: + +- background receive loop +- automatic reconnect +- subscription replay +- writes rejected during reconnect + +**Step 2: Run documentation review** + +Run: `rg -n "background receive|automatic reconnect|subscription replay|reconnecting" /Users/dohertj2/Desktop/suitelinkclient/README.md /Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.IntegrationTests/README.md` +Expected: FAIL until docs are updated + +**Step 3: Write minimal implementation** + +Update docs to describe the runtime model and recovery behavior honestly. 
+ +**Step 4: Run documentation review** + +Run: `rg -n "background receive|automatic reconnect|subscription replay|reconnecting" /Users/dohertj2/Desktop/suitelinkclient/README.md /Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.IntegrationTests/README.md` +Expected: PASS + +**Step 5: Commit** + +```bash +git add /Users/dohertj2/Desktop/suitelinkclient/README.md /Users/dohertj2/Desktop/suitelinkclient/tests/SuiteLink.Client.IntegrationTests/README.md +git commit -m "docs: describe runtime reconnect behavior" +``` + +### Task 12: Full Verification Pass + +**Files:** +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/docs/plans/2026-03-17-runtime-reconnect-design.md` +- Modify: `/Users/dohertj2/Desktop/suitelinkclient/docs/plans/2026-03-17-runtime-reconnect-implementation-plan.md` + +**Step 1: Run full test suite** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx -v minimal` +Expected: PASS with integration harness still conditional by default + +**Step 2: Run release build** + +Run: `dotnet build /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx -c Release` +Expected: PASS + +**Step 3: Run reconnect-focused tests** + +Run: `dotnet test /Users/dohertj2/Desktop/suitelinkclient/SuiteLink.Client.slnx --filter Reconnect -v minimal` +Expected: PASS + +**Step 4: Update plan notes if implementation deviated** + +Add short notes to the design/plan docs if final runtime behavior differs from original assumptions. 
+ +**Step 5: Commit** + +```bash +git add /Users/dohertj2/Desktop/suitelinkclient/docs/plans/2026-03-17-runtime-reconnect-design.md /Users/dohertj2/Desktop/suitelinkclient/docs/plans/2026-03-17-runtime-reconnect-implementation-plan.md +git commit -m "docs: finalize reconnect implementation verification" +``` diff --git a/src/SuiteLink.Client/Internal/SubscriptionRegistrationEntry.cs b/src/SuiteLink.Client/Internal/SubscriptionRegistrationEntry.cs new file mode 100644 index 0000000..204779a --- /dev/null +++ b/src/SuiteLink.Client/Internal/SubscriptionRegistrationEntry.cs @@ -0,0 +1,6 @@ +namespace SuiteLink.Client.Internal; + +internal sealed record SubscriptionRegistrationEntry( + string ItemName, + Action OnUpdate, + uint RequestedTagId); diff --git a/src/SuiteLink.Client/Internal/SuiteLinkRetryDelayCalculator.cs b/src/SuiteLink.Client/Internal/SuiteLinkRetryDelayCalculator.cs new file mode 100644 index 0000000..af2051d --- /dev/null +++ b/src/SuiteLink.Client/Internal/SuiteLinkRetryDelayCalculator.cs @@ -0,0 +1,30 @@ +namespace SuiteLink.Client.Internal; + +internal static class SuiteLinkRetryDelayCalculator +{ + public static TimeSpan GetDelay(SuiteLinkRetryPolicy policy, int attempt, Func? nextDouble = null) + { + ArgumentNullException.ThrowIfNull(policy); + + if (attempt < 0) + { + throw new ArgumentOutOfRangeException(nameof(attempt)); + } + + if (attempt == 0) + { + return TimeSpan.Zero; + } + + var rawSeconds = policy.InitialDelay.TotalSeconds * Math.Pow(policy.Multiplier, attempt - 1); + var boundedSeconds = Math.Min(rawSeconds, policy.MaxDelay.TotalSeconds); + if (!policy.UseJitter) + { + return TimeSpan.FromSeconds(boundedSeconds); + } + + var jitter = (nextDouble ?? 
Random.Shared.NextDouble).Invoke(); + jitter = Math.Clamp(jitter, 0d, 1d); + return TimeSpan.FromSeconds(boundedSeconds * jitter); + } +} diff --git a/src/SuiteLink.Client/Internal/SuiteLinkSession.cs b/src/SuiteLink.Client/Internal/SuiteLinkSession.cs index 53c1b63..a25f606 100644 --- a/src/SuiteLink.Client/Internal/SuiteLinkSession.cs +++ b/src/SuiteLink.Client/Internal/SuiteLinkSession.cs @@ -148,9 +148,18 @@ public sealed class SuiteLinkSession } } + public void ClearSubscriptions() + { + lock (_syncRoot) + { + _subscriptionsByItemName.Clear(); + _subscriptionsByTagId.Clear(); + } + } + public bool TryDispatchUpdate(DecodedUpdate decodedUpdate, DateTimeOffset receivedAtUtc, out SuiteLinkTagUpdate? dispatchedUpdate) { - return TryDispatchUpdate(decodedUpdate, receivedAtUtc, out dispatchedUpdate, out _); + return TryDispatchUpdate(decodedUpdate, receivedAtUtc, SuiteLinkUpdateSource.Live, out dispatchedUpdate, out _); } public bool TryDispatchUpdate( @@ -158,6 +167,21 @@ public sealed class SuiteLinkSession DateTimeOffset receivedAtUtc, out SuiteLinkTagUpdate? dispatchedUpdate, out Exception? callbackException) + { + return TryDispatchUpdate( + decodedUpdate, + receivedAtUtc, + SuiteLinkUpdateSource.Live, + out dispatchedUpdate, + out callbackException); + } + + public bool TryDispatchUpdate( + DecodedUpdate decodedUpdate, + DateTimeOffset receivedAtUtc, + SuiteLinkUpdateSource source, + out SuiteLinkTagUpdate? dispatchedUpdate, + out Exception? callbackException) { Action? 
callback; string itemName; @@ -181,7 +205,8 @@ public sealed class SuiteLinkSession decodedUpdate.Value, decodedUpdate.Quality, decodedUpdate.ElapsedMilliseconds, - receivedAtUtc); + receivedAtUtc, + source); try { @@ -215,11 +240,17 @@ public sealed class SuiteLinkSession return (currentState, nextState) switch { (SuiteLinkSessionState.Disconnected, SuiteLinkSessionState.TcpConnected) => true, + (SuiteLinkSessionState.Reconnecting, SuiteLinkSessionState.TcpConnected) => true, (SuiteLinkSessionState.TcpConnected, SuiteLinkSessionState.HandshakeComplete) => true, (SuiteLinkSessionState.HandshakeComplete, SuiteLinkSessionState.ConnectSent) => true, - (SuiteLinkSessionState.ConnectSent, SuiteLinkSessionState.SessionConnected) => true, - (SuiteLinkSessionState.SessionConnected, SuiteLinkSessionState.Subscribed) => true, - (SuiteLinkSessionState.Subscribed, SuiteLinkSessionState.SessionConnected) => true, + (SuiteLinkSessionState.ConnectSent, SuiteLinkSessionState.Ready) => true, + (SuiteLinkSessionState.Ready, SuiteLinkSessionState.Subscribed) => true, + (SuiteLinkSessionState.Subscribed, SuiteLinkSessionState.Ready) => true, + (SuiteLinkSessionState.TcpConnected, SuiteLinkSessionState.Reconnecting) => true, + (SuiteLinkSessionState.HandshakeComplete, SuiteLinkSessionState.Reconnecting) => true, + (SuiteLinkSessionState.ConnectSent, SuiteLinkSessionState.Reconnecting) => true, + (SuiteLinkSessionState.Ready, SuiteLinkSessionState.Reconnecting) => true, + (SuiteLinkSessionState.Subscribed, SuiteLinkSessionState.Reconnecting) => true, (_, SuiteLinkSessionState.Disconnected) => true, (_, SuiteLinkSessionState.Faulted) => true, _ => false diff --git a/src/SuiteLink.Client/Internal/SuiteLinkSessionState.cs b/src/SuiteLink.Client/Internal/SuiteLinkSessionState.cs index 736d318..94ca6ad 100644 --- a/src/SuiteLink.Client/Internal/SuiteLinkSessionState.cs +++ b/src/SuiteLink.Client/Internal/SuiteLinkSessionState.cs @@ -6,7 +6,8 @@ public enum SuiteLinkSessionState TcpConnected 
= 1, HandshakeComplete = 2, ConnectSent = 3, - SessionConnected = 4, + Ready = 4, Subscribed = 5, - Faulted = 6 + Reconnecting = 6, + Faulted = 7 } diff --git a/src/SuiteLink.Client/Properties/AssemblyInfo.cs b/src/SuiteLink.Client/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..d75bfdb --- /dev/null +++ b/src/SuiteLink.Client/Properties/AssemblyInfo.cs @@ -0,0 +1,3 @@ +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("SuiteLink.Client.Tests")] diff --git a/src/SuiteLink.Client/SubscriptionHandle.cs b/src/SuiteLink.Client/SubscriptionHandle.cs index 78fff56..08383f4 100644 --- a/src/SuiteLink.Client/SubscriptionHandle.cs +++ b/src/SuiteLink.Client/SubscriptionHandle.cs @@ -19,7 +19,7 @@ public sealed class SubscriptionHandle : IAsyncDisposable public uint TagId { get; } - public bool IsDisposed => _disposeState == 1; + public bool IsDisposed => Volatile.Read(ref _disposeState) == 1; public async ValueTask DisposeAsync() { diff --git a/src/SuiteLink.Client/SuiteLinkCatchUpPolicy.cs b/src/SuiteLink.Client/SuiteLinkCatchUpPolicy.cs new file mode 100644 index 0000000..abd3d17 --- /dev/null +++ b/src/SuiteLink.Client/SuiteLinkCatchUpPolicy.cs @@ -0,0 +1,7 @@ +namespace SuiteLink.Client; + +public enum SuiteLinkCatchUpPolicy +{ + None = 0, + RefreshLatestValue = 1 +} diff --git a/src/SuiteLink.Client/SuiteLinkClient.cs b/src/SuiteLink.Client/SuiteLinkClient.cs index 31eb416..377f27d 100644 --- a/src/SuiteLink.Client/SuiteLinkClient.cs +++ b/src/SuiteLink.Client/SuiteLinkClient.cs @@ -1,3 +1,4 @@ +using System.Net.Sockets; using SuiteLink.Client.Internal; using SuiteLink.Client.Protocol; using SuiteLink.Client.Transport; @@ -8,12 +9,25 @@ public sealed class SuiteLinkClient : IAsyncDisposable { private readonly ISuiteLinkTransport _transport; private readonly bool _ownsTransport; + private readonly Func _delayAsync; + private readonly Func>? 
_reconnectAttemptAsyncOverride; private readonly SemaphoreSlim _connectGate = new(1, 1); private readonly SemaphoreSlim _operationGate = new(1, 1); private readonly SuiteLinkSession _session = new(); + private readonly object _durableSubscriptionsSync = new(); + private readonly Dictionary _durableSubscriptions = + new(StringComparer.Ordinal); + private static readonly TimeSpan RuntimeLoopIdleDelay = TimeSpan.FromMilliseconds(25); + private static readonly TimeSpan RuntimeLoopPollInterval = TimeSpan.FromMilliseconds(50); private byte[] _receiveBuffer = new byte[1024]; private int _receiveCount; private int _nextSubscriptionTagId; + private CancellationTokenSource? _runtimeReceiveLoopCts; + private Task? _runtimeReceiveLoopTask; + private CancellationTokenSource? _reconnectLoopCts; + private Task? _reconnectLoopTask; + private SuiteLinkConnectionOptions? _connectionOptions; + private bool _runtimeLoopEstablished; private bool _disposed; public SuiteLinkClient() @@ -22,14 +36,53 @@ public sealed class SuiteLinkClient : IAsyncDisposable } public SuiteLinkClient(ISuiteLinkTransport transport, bool ownsTransport = false) + : this(transport, ownsTransport, delayAsync: null, reconnectAttemptAsync: null) + { + } + + internal SuiteLinkClient( + ISuiteLinkTransport transport, + bool ownsTransport, + Func? delayAsync, + Func>? reconnectAttemptAsync = null) { _transport = transport ?? throw new ArgumentNullException(nameof(transport)); _ownsTransport = ownsTransport; + _delayAsync = delayAsync ?? 
DelayAsync; + _reconnectAttemptAsyncOverride = reconnectAttemptAsync; } public bool IsConnected => !_disposed && - _session.State is SuiteLinkSessionState.SessionConnected or SuiteLinkSessionState.Subscribed; + _session.State is SuiteLinkSessionState.Ready or SuiteLinkSessionState.Subscribed; + + internal SuiteLinkSessionState DebugState => _session.State; + + internal bool DebugHasDurableSubscription(string itemName) + { + ArgumentException.ThrowIfNullOrWhiteSpace(itemName); + + lock (_durableSubscriptionsSync) + { + return _durableSubscriptions.ContainsKey(itemName); + } + } + + internal async Task DebugHoldOperationGateAsync(Task releaseSignal, TaskCompletionSource? acquiredSignal = null) + { + ArgumentNullException.ThrowIfNull(releaseSignal); + + await _operationGate.WaitAsync().ConfigureAwait(false); + try + { + acquiredSignal?.TrySetResult(true); + await releaseSignal.ConfigureAwait(false); + } + finally + { + _operationGate.Release(); + } + } public async Task ConnectAsync(SuiteLinkConnectionOptions options, CancellationToken cancellationToken = default) { @@ -51,23 +104,11 @@ public sealed class SuiteLinkClient : IAsyncDisposable throw new InvalidOperationException("Client is faulted and cannot be reused."); } + _connectionOptions = options; await _transport.ConnectAsync(options.Host, options.Port, cancellationToken).ConfigureAwait(false); _session.SetState(SuiteLinkSessionState.TcpConnected); - - var handshakeBytes = SuiteLinkHandshakeCodec.EncodeNormalQueryHandshake( - options.Application, - options.ClientNode, - options.UserName); - await _transport.SendAsync(handshakeBytes, cancellationToken).ConfigureAwait(false); - - var handshakeAckBytes = await ReceiveSingleFrameAsync(cancellationToken).ConfigureAwait(false); - _ = SuiteLinkHandshakeCodec.ParseNormalHandshakeAck(handshakeAckBytes); - _session.SetState(SuiteLinkSessionState.HandshakeComplete); - - var connectBytes = SuiteLinkConnectCodec.Encode(options); - await _transport.SendAsync(connectBytes, 
cancellationToken).ConfigureAwait(false); - // At this stage we've only submitted CONNECT. Do not report ready yet. - _session.SetState(SuiteLinkSessionState.ConnectSent); + await SendStartupSequenceAsync(options, cancellationToken).ConfigureAwait(false); + await StartBackgroundReceiveLoopAsync(cancellationToken).ConfigureAwait(false); } catch { @@ -234,10 +275,21 @@ public sealed class SuiteLinkClient : IAsyncDisposable ArgumentException.ThrowIfNullOrWhiteSpace(itemName); ThrowIfDisposed(); + if (_session.State == SuiteLinkSessionState.Reconnecting) + { + throw new InvalidOperationException("Client is reconnecting and cannot accept writes until the session is ready."); + } + await _operationGate.WaitAsync(cancellationToken).ConfigureAwait(false); try { ThrowIfDisposed(); + + if (_session.State == SuiteLinkSessionState.Reconnecting) + { + throw new InvalidOperationException("Client is reconnecting and cannot accept writes until the session is ready."); + } + EnsureTagOperationsAllowed(); if (!_session.TryGetTagId(itemName, out var tagId)) @@ -260,6 +312,345 @@ public sealed class SuiteLinkClient : IAsyncDisposable await DisposeCoreAsync(CancellationToken.None).ConfigureAwait(false); } + private async Task StartBackgroundReceiveLoopAsync(CancellationToken cancellationToken) + { + if (_runtimeReceiveLoopTask is not null && !_runtimeReceiveLoopTask.IsCompleted) + { + return; + } + + var runtimeLoopCts = new CancellationTokenSource(); + var startedSignal = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + _runtimeLoopEstablished = false; + _runtimeReceiveLoopCts = runtimeLoopCts; + _runtimeReceiveLoopTask = RunBackgroundReceiveLoopAsync(startedSignal, runtimeLoopCts); + + try + { + await startedSignal.Task.WaitAsync(cancellationToken).ConfigureAwait(false); + } + catch + { + await StopBackgroundReceiveLoopAsync().ConfigureAwait(false); + throw; + } + } + + private async Task StopBackgroundReceiveLoopAsync() + { + var runtimeLoopCts = 
_runtimeReceiveLoopCts; + var runtimeLoopTask = _runtimeReceiveLoopTask; + _runtimeReceiveLoopCts = null; + _runtimeReceiveLoopTask = null; + + if (runtimeLoopCts is null && runtimeLoopTask is null) + { + return; + } + + runtimeLoopCts?.Cancel(); + + try + { + if (runtimeLoopTask is not null) + { + await runtimeLoopTask.ConfigureAwait(false); + } + } + catch + { + // Runtime-loop failures are handled in-loop via session faulting. + } + } + + private void StartReconnectLoopIfNeeded() + { + if (_disposed) + { + return; + } + + if (_reconnectLoopTask is not null && !_reconnectLoopTask.IsCompleted) + { + return; + } + + var reconnectLoopCts = new CancellationTokenSource(); + _reconnectLoopCts = reconnectLoopCts; + _reconnectLoopTask = RunReconnectLoopAsync(reconnectLoopCts); + } + + private async Task StopReconnectLoopAsync() + { + var reconnectLoopCts = _reconnectLoopCts; + var reconnectLoopTask = _reconnectLoopTask; + _reconnectLoopCts = null; + _reconnectLoopTask = null; + + if (reconnectLoopCts is null && reconnectLoopTask is null) + { + return; + } + + reconnectLoopCts?.Cancel(); + + try + { + if (reconnectLoopTask is not null) + { + await reconnectLoopTask.ConfigureAwait(false); + } + } + catch + { + // Reconnect-loop failures are handled in-loop via session state. 
+ } + } + + private void EstablishRuntimeLoopStateIfNeeded() + { + if (_runtimeLoopEstablished) + { + return; + } + + if (!_session.TryTransitionState(SuiteLinkSessionState.ConnectSent, SuiteLinkSessionState.Ready)) + { + return; + } + + _runtimeLoopEstablished = true; + if (_session.SubscriptionCount > 0 && _session.State == SuiteLinkSessionState.Ready) + { + _session.SetState(SuiteLinkSessionState.Subscribed); + } + } + + private async Task RunBackgroundReceiveLoopAsync( + TaskCompletionSource startedSignal, + CancellationTokenSource runtimeLoopCts) + { + var cancellationToken = runtimeLoopCts.Token; + try + { + EstablishRuntimeLoopStateIfNeeded(); + startedSignal.TrySetResult(true); + + while (!cancellationToken.IsCancellationRequested) + { + if (_disposed) + { + return; + } + + if (_session.SubscriptionCount == 0) + { + // Task 6 policy: in zero-subscription Ready mode, no background liveness probe is issued. + // Disconnect is detected on the next receive-bound operation (subscribe/read/manual process). 
+ try + { + await Task.Delay(RuntimeLoopIdleDelay, cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + return; + } + + continue; + } + + IReadOnlyList decodedUpdates = []; + var dispatchUpdates = false; + var operationGateHeld = false; + try + { + operationGateHeld = await _operationGate + .WaitAsync(RuntimeLoopPollInterval, cancellationToken) + .ConfigureAwait(false); + if (!operationGateHeld) + { + continue; + } + + using var pollCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + pollCts.CancelAfter(RuntimeLoopPollInterval); + + decodedUpdates = await ProcessSingleIncomingFrameAsync(pollCts.Token).ConfigureAwait(false); + EstablishRuntimeLoopStateIfNeeded(); + dispatchUpdates = decodedUpdates.Count > 0; + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + return; + } + catch (OperationCanceledException) + { + // Poll timeout: keep the runtime receive loop alive. + } + catch (Exception ex) when (ex is IOException or SocketException) + { + if (!_disposed) + { + TransitionToReconnecting(); + } + + return; + } + catch + { + if (!_disposed) + { + try + { + _session.SetState(SuiteLinkSessionState.Faulted); + } + catch + { + // Preserve original runtime loop failure behavior. 
+ } + } + + return; + } + finally + { + if (operationGateHeld) + { + _operationGate.Release(); + } + } + + if (dispatchUpdates) + { + DispatchDecodedUpdates(decodedUpdates); + } + } + } + finally + { + if (ReferenceEquals(_runtimeReceiveLoopCts, runtimeLoopCts)) + { + _runtimeReceiveLoopCts = null; + _runtimeReceiveLoopTask = null; + } + + runtimeLoopCts.Dispose(); + } + } + + private async Task RunReconnectLoopAsync(CancellationTokenSource reconnectLoopCts) + { + var cancellationToken = reconnectLoopCts.Token; + try + { + var attempt = 0; + while (!cancellationToken.IsCancellationRequested) + { + if (_disposed) + { + return; + } + + var reconnectDelay = GetReconnectDelay(attempt); + try + { + await _delayAsync(reconnectDelay, cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + return; + } + + if (_disposed || cancellationToken.IsCancellationRequested) + { + return; + } + + bool reconnectSucceeded; + try + { + reconnectSucceeded = await AttemptReconnectAsync(cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + return; + } + catch + { + reconnectSucceeded = false; + } + + if (reconnectSucceeded) + { + return; + } + + checked + { + attempt++; + } + } + } + finally + { + if (ReferenceEquals(_reconnectLoopCts, reconnectLoopCts)) + { + _reconnectLoopCts = null; + _reconnectLoopTask = null; + } + + reconnectLoopCts.Dispose(); + } + } + + private void TransitionToReconnecting() + { + if (_session.TryTransitionState(SuiteLinkSessionState.Subscribed, SuiteLinkSessionState.Reconnecting)) + { + _runtimeLoopEstablished = false; + StartReconnectLoopIfNeeded(); + return; + } + + if (_session.TryTransitionState(SuiteLinkSessionState.Ready, SuiteLinkSessionState.Reconnecting)) + { + _runtimeLoopEstablished = false; + StartReconnectLoopIfNeeded(); + } + } + + private TimeSpan GetReconnectDelay(int attempt) + { + 
if (attempt < 0) + { + throw new ArgumentOutOfRangeException(nameof(attempt)); + } + + var retryPolicy = _connectionOptions?.Runtime.RetryPolicy ?? SuiteLinkRetryPolicy.Default; + return SuiteLinkRetryDelayCalculator.GetDelay(retryPolicy, attempt); + } + + private static Task DelayAsync(TimeSpan delay, CancellationToken cancellationToken) + { + if (delay <= TimeSpan.Zero) + { + return Task.CompletedTask; + } + + return Task.Delay(delay, cancellationToken); + } + + private ValueTask AttemptReconnectAsync(CancellationToken cancellationToken) + { + if (_reconnectAttemptAsyncOverride is not null) + { + return _reconnectAttemptAsyncOverride(cancellationToken); + } + + return AttemptReconnectCoreAsync(cancellationToken); + } + private async ValueTask ReceiveSingleFrameAsync(CancellationToken cancellationToken) { while (true) @@ -324,18 +715,23 @@ public sealed class SuiteLinkClient : IAsyncDisposable await _connectGate.WaitAsync(cancellationToken).ConfigureAwait(false); connectGateHeld = true; - await _operationGate.WaitAsync(cancellationToken).ConfigureAwait(false); - operationGateHeld = true; - if (_disposed) { return; } _disposed = true; + await StopBackgroundReceiveLoopAsync().ConfigureAwait(false); + await StopReconnectLoopAsync().ConfigureAwait(false); + + await _operationGate.WaitAsync(cancellationToken).ConfigureAwait(false); + operationGateHeld = true; + + _runtimeLoopEstablished = false; _session.SetState(SuiteLinkSessionState.Disconnected); _receiveCount = 0; _receiveBuffer = new byte[1024]; + _connectionOptions = null; if (_ownsTransport) { @@ -366,50 +762,33 @@ public sealed class SuiteLinkClient : IAsyncDisposable ThrowIfDisposed(); EnsureTagOperationsAllowed(); + lock (_durableSubscriptionsSync) + { + if (_durableSubscriptions.ContainsKey(itemName)) + { + throw new InvalidOperationException( + $"Tag '{itemName}' is already subscribed. 
Dispose the existing subscription before subscribing again."); + } + } var requestedTagId = unchecked((uint)Interlocked.Increment(ref _nextSubscriptionTagId)); - var adviseBytes = SuiteLinkSubscriptionCodec.EncodeAdvise(requestedTagId, itemName); - await _transport.SendAsync(adviseBytes, cancellationToken).ConfigureAwait(false); - - var adviseAckResult = await ReceiveAndCollectUpdatesUntilAsync( - messageType => messageType == SuiteLinkSubscriptionCodec.AdviseAckMessageType, + var deferredUpdates = await AdviseAndRegisterSubscriptionAsync( + itemName, + onUpdate, + requestedTagId, + storeDurableSubscription: true, cancellationToken).ConfigureAwait(false); - var adviseAckBytes = adviseAckResult.FrameBytes; - - var ackItems = SuiteLinkSubscriptionCodec.DecodeAdviseAckMany(adviseAckBytes); - if (ackItems.Count != 1) - { - throw new FormatException( - $"Expected exactly one advise ACK item for a single subscribe request, but decoded {ackItems.Count}."); - } - - var acknowledgedTagId = ackItems[0].TagId; - if (acknowledgedTagId != requestedTagId) - { - throw new FormatException( - $"Advise ACK tag id 0x{acknowledgedTagId:x8} did not match requested tag id 0x{requestedTagId:x8}."); - } - - _session.RegisterSubscription(itemName, acknowledgedTagId, onUpdate); - if (_session.State == SuiteLinkSessionState.ConnectSent) - { - _session.SetState(SuiteLinkSessionState.SessionConnected); - } - - if (_session.State == SuiteLinkSessionState.SessionConnected) - { - _session.SetState(SuiteLinkSessionState.Subscribed); - } var handle = new SubscriptionHandle( itemName, - acknowledgedTagId, - () => UnsubscribeAsync(acknowledgedTagId, CancellationToken.None)); - return new SubscriptionRegistration(handle, adviseAckResult.DeferredUpdates); + requestedTagId, + () => UnsubscribeAsync(itemName, requestedTagId, CancellationToken.None)); + return new SubscriptionRegistration(handle, deferredUpdates); } - private async ValueTask UnsubscribeAsync(uint tagId, CancellationToken 
cancellationToken) + private async ValueTask UnsubscribeAsync(string itemName, uint tagId, CancellationToken cancellationToken) { + RemoveDurableSubscription(itemName); if (_disposed) { return; @@ -438,10 +817,209 @@ public sealed class SuiteLinkClient : IAsyncDisposable if (_session.State == SuiteLinkSessionState.Subscribed && _session.SubscriptionCount == 0) { - _session.SetState(SuiteLinkSessionState.SessionConnected); + _session.SetState(SuiteLinkSessionState.Ready); } } + private void RemoveDurableSubscription(string itemName) + { + lock (_durableSubscriptionsSync) + { + _durableSubscriptions.Remove(itemName); + } + } + + private async Task SendStartupSequenceAsync( + SuiteLinkConnectionOptions options, + CancellationToken cancellationToken) + { + var handshakeBytes = SuiteLinkHandshakeCodec.EncodeNormalQueryHandshake( + options.Application, + options.ClientNode, + options.UserName); + await _transport.SendAsync(handshakeBytes, cancellationToken).ConfigureAwait(false); + + var handshakeAckBytes = await ReceiveSingleFrameAsync(cancellationToken).ConfigureAwait(false); + _ = SuiteLinkHandshakeCodec.ParseNormalHandshakeAck(handshakeAckBytes); + _session.SetState(SuiteLinkSessionState.HandshakeComplete); + + var connectBytes = SuiteLinkConnectCodec.Encode(options); + await _transport.SendAsync(connectBytes, cancellationToken).ConfigureAwait(false); + _session.SetState(SuiteLinkSessionState.ConnectSent); + } + + private async ValueTask AttemptReconnectCoreAsync(CancellationToken cancellationToken) + { + var options = _connectionOptions; + if (options is null || _disposed) + { + return false; + } + + var connectGateHeld = false; + var operationGateHeld = false; + List? deferredUpdates = null; + List? 
catchUpUpdates = null; + + try + { + await _connectGate.WaitAsync(cancellationToken).ConfigureAwait(false); + connectGateHeld = true; + + if (_disposed || _session.State != SuiteLinkSessionState.Reconnecting) + { + return false; + } + + await _operationGate.WaitAsync(cancellationToken).ConfigureAwait(false); + operationGateHeld = true; + + _runtimeLoopEstablished = false; + _receiveCount = 0; + _receiveBuffer = new byte[1024]; + _session.ClearSubscriptions(); + await ResetTransportForReconnectAsync(cancellationToken).ConfigureAwait(false); + + await _transport.ConnectAsync(options.Host, options.Port, cancellationToken).ConfigureAwait(false); + _session.SetState(SuiteLinkSessionState.TcpConnected); + await SendStartupSequenceAsync(options, cancellationToken).ConfigureAwait(false); + + deferredUpdates = []; + var durableSubscriptions = SnapshotDurableSubscriptions(); + foreach (var registration in durableSubscriptions) + { + var replayedUpdates = await AdviseAndRegisterSubscriptionAsync( + registration.ItemName, + registration.OnUpdate, + registration.RequestedTagId, + storeDurableSubscription: false, + cancellationToken).ConfigureAwait(false); + deferredUpdates.AddRange(replayedUpdates); + } + + if (options.Runtime.CatchUpPolicy == SuiteLinkCatchUpPolicy.RefreshLatestValue) + { + catchUpUpdates = []; + foreach (var registration in durableSubscriptions) + { + var catchUpUpdate = await TryRefreshLatestValueAsync( + registration, + options.Runtime.CatchUpTimeout, + deferredUpdates, + cancellationToken).ConfigureAwait(false); + if (catchUpUpdate is { } pendingDispatch) + { + catchUpUpdates.Add(pendingDispatch); + } + } + } + + await StartBackgroundReceiveLoopAsync(cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + throw; + } + catch + { + if (!_disposed && _session.State != SuiteLinkSessionState.Reconnecting) + { + _session.SetState(SuiteLinkSessionState.Reconnecting); + } + + return 
false; + } + finally + { + if (operationGateHeld) + { + _operationGate.Release(); + } + + if (connectGateHeld) + { + _connectGate.Release(); + } + } + + if (deferredUpdates is not null) + { + DispatchDecodedUpdates(deferredUpdates); + } + + if (catchUpUpdates is not null) + { + DispatchSyntheticUpdates(catchUpUpdates); + } + + return true; + } + + private async ValueTask ResetTransportForReconnectAsync(CancellationToken cancellationToken) + { + if (_transport is not ISuiteLinkReconnectableTransport reconnectableTransport) + { + return; + } + + await reconnectableTransport.ResetConnectionAsync(cancellationToken).ConfigureAwait(false); + } + + private SubscriptionRegistrationEntry[] SnapshotDurableSubscriptions() + { + lock (_durableSubscriptionsSync) + { + return [.. _durableSubscriptions.Values.OrderBy(entry => entry.RequestedTagId)]; + } + } + + private async Task> AdviseAndRegisterSubscriptionAsync( + string itemName, + Action onUpdate, + uint requestedTagId, + bool storeDurableSubscription, + CancellationToken cancellationToken) + { + var adviseBytes = SuiteLinkSubscriptionCodec.EncodeAdvise(requestedTagId, itemName); + await _transport.SendAsync(adviseBytes, cancellationToken).ConfigureAwait(false); + + var adviseAckResult = await ReceiveAndCollectUpdatesUntilAsync( + messageType => messageType == SuiteLinkSubscriptionCodec.AdviseAckMessageType, + cancellationToken).ConfigureAwait(false); + var adviseAckBytes = adviseAckResult.FrameBytes; + + var ackItems = SuiteLinkSubscriptionCodec.DecodeAdviseAckMany(adviseAckBytes); + if (ackItems.Count != 1) + { + throw new FormatException( + $"Expected exactly one advise ACK item for a single subscribe request, but decoded {ackItems.Count}."); + } + + var acknowledgedTagId = ackItems[0].TagId; + if (acknowledgedTagId != requestedTagId) + { + throw new FormatException( + $"Advise ACK tag id 0x{acknowledgedTagId:x8} did not match requested tag id 0x{requestedTagId:x8}."); + } + + _session.RegisterSubscription(itemName, 
acknowledgedTagId, onUpdate); + if (storeDurableSubscription) + { + lock (_durableSubscriptionsSync) + { + _durableSubscriptions[itemName] = + new SubscriptionRegistrationEntry(itemName, onUpdate, requestedTagId); + } + } + + if (_session.State == SuiteLinkSessionState.Ready) + { + _session.SetState(SuiteLinkSessionState.Subscribed); + } + + return adviseAckResult.DeferredUpdates; + } + private async Task ReceiveAndCollectUpdatesUntilAsync( Func messageTypePredicate, CancellationToken cancellationToken) @@ -464,6 +1042,114 @@ public sealed class SuiteLinkClient : IAsyncDisposable } } + private async Task TryRefreshLatestValueAsync( + SubscriptionRegistrationEntry registration, + TimeSpan timeout, + List passthroughUpdates, + CancellationToken cancellationToken) + { + using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + timeoutCts.CancelAfter(timeout); + + var requestedTagId = unchecked((uint)Interlocked.Increment(ref _nextSubscriptionTagId)); + var adviseBytes = SuiteLinkSubscriptionCodec.EncodeAdvise(requestedTagId, registration.ItemName); + await _transport.SendAsync(adviseBytes, timeoutCts.Token).ConfigureAwait(false); + + DecodedUpdate? 
matchedUpdate = null; + + try + { + var adviseAckResult = await ReceiveAndCollectUpdatesUntilAsync( + messageType => messageType == SuiteLinkSubscriptionCodec.AdviseAckMessageType, + timeoutCts.Token).ConfigureAwait(false); + + var ackItems = SuiteLinkSubscriptionCodec.DecodeAdviseAckMany(adviseAckResult.FrameBytes); + if (ackItems.Count != 1 || ackItems[0].TagId != requestedTagId) + { + return null; + } + + matchedUpdate = ExtractMatchedUpdate(requestedTagId, adviseAckResult.DeferredUpdates, passthroughUpdates); + while (matchedUpdate is null) + { + var frameBytes = await ReceiveSingleFrameAsync(timeoutCts.Token).ConfigureAwait(false); + var frame = SuiteLinkFrameReader.ParseFrame(frameBytes); + if (frame.MessageType != SuiteLinkUpdateCodec.UpdateMessageType) + { + continue; + } + + matchedUpdate = ExtractMatchedUpdate( + requestedTagId, + SuiteLinkUpdateCodec.DecodeMany(frameBytes), + passthroughUpdates); + } + } + catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested && !cancellationToken.IsCancellationRequested) + { + return null; + } + finally + { + try + { + var unadviseBytes = SuiteLinkSubscriptionCodec.EncodeUnadvise(requestedTagId); + await _transport.SendAsync(unadviseBytes, cancellationToken).ConfigureAwait(false); + } + catch + { + // Best-effort cleanup for temporary catch-up subscriptions. 
+ } + } + + if (matchedUpdate is null) + { + return null; + } + + return new PendingSyntheticDispatch( + registration.OnUpdate, + new SuiteLinkTagUpdate( + registration.ItemName, + ResolveLiveTagId(registration.ItemName), + matchedUpdate.Value.Value, + matchedUpdate.Value.Quality, + matchedUpdate.Value.ElapsedMilliseconds, + DateTimeOffset.UtcNow, + SuiteLinkUpdateSource.CatchUpReplay)); + } + + private uint ResolveLiveTagId(string itemName) + { + if (_session.TryGetTagId(itemName, out var liveTagId)) + { + return liveTagId; + } + + throw new InvalidOperationException( + $"Live tag mapping for '{itemName}' was not available during catch-up replay."); + } + + private static DecodedUpdate? ExtractMatchedUpdate( + uint requestedTagId, + IReadOnlyList decodedUpdates, + List passthroughUpdates) + { + DecodedUpdate? matched = null; + foreach (var decodedUpdate in decodedUpdates) + { + if (decodedUpdate.TagId == requestedTagId && matched is null) + { + matched = decodedUpdate; + continue; + } + + passthroughUpdates.Add(decodedUpdate); + } + + return matched; + } + private async Task> ProcessSingleIncomingFrameAsync(CancellationToken cancellationToken) { var frameBytes = await ReceiveSingleFrameAsync(cancellationToken).ConfigureAwait(false); @@ -490,11 +1176,32 @@ public sealed class SuiteLinkClient : IAsyncDisposable } } + private static void DispatchSyntheticUpdates(IReadOnlyList updates) + { + if (updates.Count == 0) + { + return; + } + + foreach (var update in updates) + { + try + { + update.OnUpdate(update.Update); + } + catch + { + // Synthetic catch-up callback failures should not tear down reconnect recovery. + } + } + } + private void EnsureTagOperationsAllowed() { + // Pre-runtime-loop phase: tag operations still use explicit/on-demand reads after CONNECT. 
if (_session.State is not SuiteLinkSessionState.ConnectSent and - not SuiteLinkSessionState.SessionConnected and + not SuiteLinkSessionState.Ready and not SuiteLinkSessionState.Subscribed) { throw new InvalidOperationException("Client is not ready for tag operations."); @@ -508,4 +1215,8 @@ public sealed class SuiteLinkClient : IAsyncDisposable private readonly record struct FrameReadResult( byte[] FrameBytes, IReadOnlyList DeferredUpdates); + + private readonly record struct PendingSyntheticDispatch( + Action OnUpdate, + SuiteLinkTagUpdate Update); } diff --git a/src/SuiteLink.Client/SuiteLinkConnectionOptions.cs b/src/SuiteLink.Client/SuiteLinkConnectionOptions.cs index 980450e..e5dd0a4 100644 --- a/src/SuiteLink.Client/SuiteLinkConnectionOptions.cs +++ b/src/SuiteLink.Client/SuiteLinkConnectionOptions.cs @@ -11,7 +11,8 @@ public sealed record class SuiteLinkConnectionOptions string userName, string serverNode, string? timezone = null, - int port = 5413) + int port = 5413, + SuiteLinkRuntimeOptions? runtime = null) { ValidateRequired(host, nameof(host)); ValidateRequired(application, nameof(application)); @@ -35,6 +36,7 @@ public sealed record class SuiteLinkConnectionOptions ServerNode = serverNode; Timezone = string.IsNullOrWhiteSpace(timezone) ? "UTC" : timezone; Port = port; + Runtime = runtime ?? 
SuiteLinkRuntimeOptions.Default; } public string Host { get; } @@ -46,6 +48,7 @@ public sealed record class SuiteLinkConnectionOptions public string ServerNode { get; } public string Timezone { get; } public int Port { get; } + public SuiteLinkRuntimeOptions Runtime { get; } private static void ValidateRequired(string value, string paramName) { diff --git a/src/SuiteLink.Client/SuiteLinkRetryPolicy.cs b/src/SuiteLink.Client/SuiteLinkRetryPolicy.cs new file mode 100644 index 0000000..471a909 --- /dev/null +++ b/src/SuiteLink.Client/SuiteLinkRetryPolicy.cs @@ -0,0 +1,39 @@ +namespace SuiteLink.Client; + +public sealed record class SuiteLinkRetryPolicy +{ + public SuiteLinkRetryPolicy( + TimeSpan initialDelay, + double multiplier, + TimeSpan maxDelay, + bool useJitter = true) + { + if (initialDelay < TimeSpan.Zero) + { + throw new ArgumentOutOfRangeException(nameof(initialDelay), initialDelay, "Initial delay cannot be negative."); + } + + if (multiplier <= 0) + { + throw new ArgumentOutOfRangeException(nameof(multiplier), multiplier, "Multiplier must be greater than zero."); + } + + if (maxDelay < TimeSpan.Zero) + { + throw new ArgumentOutOfRangeException(nameof(maxDelay), maxDelay, "Max delay cannot be negative."); + } + + InitialDelay = initialDelay; + Multiplier = multiplier; + MaxDelay = maxDelay; + UseJitter = useJitter; + } + + public TimeSpan InitialDelay { get; init; } + public double Multiplier { get; init; } + public TimeSpan MaxDelay { get; init; } + public bool UseJitter { get; init; } + + public static SuiteLinkRetryPolicy Default { get; } = + new(TimeSpan.FromSeconds(1), 2.0, TimeSpan.FromSeconds(30)); +} diff --git a/src/SuiteLink.Client/SuiteLinkRuntimeOptions.cs b/src/SuiteLink.Client/SuiteLinkRuntimeOptions.cs new file mode 100644 index 0000000..6574ff8 --- /dev/null +++ b/src/SuiteLink.Client/SuiteLinkRuntimeOptions.cs @@ -0,0 +1,28 @@ +namespace SuiteLink.Client; + +public sealed record class SuiteLinkRuntimeOptions +{ + public 
SuiteLinkRuntimeOptions( + SuiteLinkRetryPolicy retryPolicy, + SuiteLinkCatchUpPolicy catchUpPolicy, + TimeSpan catchUpTimeout) + { + ArgumentNullException.ThrowIfNull(retryPolicy); + + if (catchUpTimeout <= TimeSpan.Zero) + { + throw new ArgumentOutOfRangeException(nameof(catchUpTimeout), catchUpTimeout, "Catch-up timeout must be positive."); + } + + RetryPolicy = retryPolicy; + CatchUpPolicy = catchUpPolicy; + CatchUpTimeout = catchUpTimeout; + } + + public SuiteLinkRetryPolicy RetryPolicy { get; init; } + public SuiteLinkCatchUpPolicy CatchUpPolicy { get; init; } + public TimeSpan CatchUpTimeout { get; init; } + + public static SuiteLinkRuntimeOptions Default { get; } = + new(SuiteLinkRetryPolicy.Default, SuiteLinkCatchUpPolicy.None, TimeSpan.FromSeconds(2)); +} diff --git a/src/SuiteLink.Client/SuiteLinkTagUpdate.cs b/src/SuiteLink.Client/SuiteLinkTagUpdate.cs index a662631..f7cde06 100644 --- a/src/SuiteLink.Client/SuiteLinkTagUpdate.cs +++ b/src/SuiteLink.Client/SuiteLinkTagUpdate.cs @@ -6,4 +6,5 @@ public sealed record class SuiteLinkTagUpdate( SuiteLinkValue Value, ushort Quality, ushort ElapsedMilliseconds, - DateTimeOffset ReceivedAtUtc); + DateTimeOffset ReceivedAtUtc, + SuiteLinkUpdateSource Source = SuiteLinkUpdateSource.Live); diff --git a/src/SuiteLink.Client/SuiteLinkUpdateSource.cs b/src/SuiteLink.Client/SuiteLinkUpdateSource.cs new file mode 100644 index 0000000..2a961a1 --- /dev/null +++ b/src/SuiteLink.Client/SuiteLinkUpdateSource.cs @@ -0,0 +1,7 @@ +namespace SuiteLink.Client; + +public enum SuiteLinkUpdateSource +{ + Live = 0, + CatchUpReplay = 1 +} diff --git a/src/SuiteLink.Client/Transport/ISuiteLinkReconnectableTransport.cs b/src/SuiteLink.Client/Transport/ISuiteLinkReconnectableTransport.cs new file mode 100644 index 0000000..9942edf --- /dev/null +++ b/src/SuiteLink.Client/Transport/ISuiteLinkReconnectableTransport.cs @@ -0,0 +1,6 @@ +namespace SuiteLink.Client.Transport; + +public interface ISuiteLinkReconnectableTransport +{ + 
ValueTask ResetConnectionAsync(CancellationToken cancellationToken = default); +} diff --git a/src/SuiteLink.Client/Transport/SuiteLinkTcpTransport.cs b/src/SuiteLink.Client/Transport/SuiteLinkTcpTransport.cs index 0b18245..4d5bf1a 100644 --- a/src/SuiteLink.Client/Transport/SuiteLinkTcpTransport.cs +++ b/src/SuiteLink.Client/Transport/SuiteLinkTcpTransport.cs @@ -2,7 +2,7 @@ using System.Net.Sockets; namespace SuiteLink.Client.Transport; -public sealed class SuiteLinkTcpTransport : ISuiteLinkTransport +public sealed class SuiteLinkTcpTransport : ISuiteLinkTransport, ISuiteLinkReconnectableTransport { private readonly bool _leaveOpen; private readonly object _syncRoot = new(); @@ -132,8 +132,8 @@ public sealed class SuiteLinkTcpTransport : ISuiteLinkTransport public async ValueTask DisposeAsync() { - Stream? streamToDispose = null; - TcpClient? tcpClientToDispose = null; + Stream? streamToDispose; + TcpClient? tcpClientToDispose; lock (_syncRoot) { @@ -143,30 +143,25 @@ public sealed class SuiteLinkTcpTransport : ISuiteLinkTransport } _disposed = true; - - if (!_leaveOpen) - { - streamToDispose = _stream; - tcpClientToDispose = _tcpClient; - } - - _stream = null; - _tcpClient = null; } - if (tcpClientToDispose is not null) + (streamToDispose, tcpClientToDispose) = DetachConnection(disposeResources: !_leaveOpen); + await DisposeDetachedConnectionAsync(streamToDispose, tcpClientToDispose).ConfigureAwait(false); + } + + public async ValueTask ResetConnectionAsync(CancellationToken cancellationToken = default) + { + ObjectDisposedException.ThrowIf(_disposed, this); + + Stream? streamToDispose; + TcpClient? 
tcpClientToDispose; + lock (_syncRoot) { - tcpClientToDispose.Dispose(); - return; + ObjectDisposedException.ThrowIf(_disposed, this); } - if (streamToDispose is IAsyncDisposable asyncDisposable) - { - await asyncDisposable.DisposeAsync().ConfigureAwait(false); - return; - } - - streamToDispose?.Dispose(); + (streamToDispose, tcpClientToDispose) = DetachConnection(disposeResources: !_leaveOpen); + await DisposeDetachedConnectionAsync(streamToDispose, tcpClientToDispose).ConfigureAwait(false); } private Stream GetConnectedStream() @@ -183,4 +178,35 @@ public sealed class SuiteLinkTcpTransport : ISuiteLinkTransport return _stream; } } + + private (Stream? Stream, TcpClient? TcpClient) DetachConnection(bool disposeResources) + { + lock (_syncRoot) + { + var stream = disposeResources ? _stream : null; + var tcpClient = disposeResources ? _tcpClient : null; + + _stream = null; + _tcpClient = null; + + return (stream, tcpClient); + } + } + + private static async ValueTask DisposeDetachedConnectionAsync(Stream? streamToDispose, TcpClient? tcpClientToDispose) + { + if (tcpClientToDispose is not null) + { + tcpClientToDispose.Dispose(); + return; + } + + if (streamToDispose is IAsyncDisposable asyncDisposable) + { + await asyncDisposable.DisposeAsync().ConfigureAwait(false); + return; + } + + streamToDispose?.Dispose(); + } } diff --git a/tests/SuiteLink.Client.IntegrationTests/README.md b/tests/SuiteLink.Client.IntegrationTests/README.md index e622789..b71bfa4 100644 --- a/tests/SuiteLink.Client.IntegrationTests/README.md +++ b/tests/SuiteLink.Client.IntegrationTests/README.md @@ -34,3 +34,7 @@ Optional tag variables (tests run only for the tags provided): - If integration settings are missing, tests return immediately and do not perform network calls. - These tests are intended as a live harness, not deterministic CI tests. 
+- The client runtime now uses a background receive loop with automatic reconnect, durable subscription replay, and optional best-effort latest-value catch-up replay after reconnect. +- Reconnect timing is policy-based and jittered by default. +- These live tests still need validation against a real AVEVA server that allows legacy or mixed-mode SuiteLink traffic. +- Writes are intentionally rejected while the client is in `Reconnecting`. diff --git a/tests/SuiteLink.Client.Tests/Internal/SuiteLinkRetryDelayCalculatorTests.cs b/tests/SuiteLink.Client.Tests/Internal/SuiteLinkRetryDelayCalculatorTests.cs new file mode 100644 index 0000000..5250b26 --- /dev/null +++ b/tests/SuiteLink.Client.Tests/Internal/SuiteLinkRetryDelayCalculatorTests.cs @@ -0,0 +1,52 @@ +using SuiteLink.Client.Internal; + +namespace SuiteLink.Client.Tests.Internal; + +public sealed class SuiteLinkRetryDelayCalculatorTests +{ + [Fact] + public void GetDelay_UsesImmediateThenExponentialCap() + { + var policy = new SuiteLinkRetryPolicy( + initialDelay: TimeSpan.FromSeconds(1), + multiplier: 2.0, + maxDelay: TimeSpan.FromSeconds(30), + useJitter: false); + + Assert.Equal(TimeSpan.Zero, SuiteLinkRetryDelayCalculator.GetDelay(policy, 0)); + Assert.Equal(TimeSpan.FromSeconds(1), SuiteLinkRetryDelayCalculator.GetDelay(policy, 1)); + Assert.Equal(TimeSpan.FromSeconds(2), SuiteLinkRetryDelayCalculator.GetDelay(policy, 2)); + Assert.Equal(TimeSpan.FromSeconds(4), SuiteLinkRetryDelayCalculator.GetDelay(policy, 3)); + } + + [Theory] + [InlineData(-1, 2.0, 30)] + [InlineData(1, 0.0, 30)] + [InlineData(1, -1.0, 30)] + [InlineData(1, 2.0, -1)] + public void RetryPolicy_InvalidArguments_Throw( + int initialDelaySeconds, + double multiplier, + int maxDelaySeconds) + { + Assert.ThrowsAny(() => new SuiteLinkRetryPolicy( + initialDelay: TimeSpan.FromSeconds(initialDelaySeconds), + multiplier: multiplier, + maxDelay: TimeSpan.FromSeconds(maxDelaySeconds), + useJitter: false)); + } + + [Fact] + public void 
GetDelay_WithJitterEnabled_StaysWithinCap() + { + var policy = new SuiteLinkRetryPolicy( + initialDelay: TimeSpan.FromSeconds(2), + multiplier: 2.0, + maxDelay: TimeSpan.FromSeconds(10), + useJitter: true); + + var delay = SuiteLinkRetryDelayCalculator.GetDelay(policy, 3, () => 0.5); + + Assert.InRange(delay, TimeSpan.Zero, TimeSpan.FromSeconds(10)); + } +} diff --git a/tests/SuiteLink.Client.Tests/Internal/SuiteLinkSessionTests.cs b/tests/SuiteLink.Client.Tests/Internal/SuiteLinkSessionTests.cs index 1bc4c9a..5df1c88 100644 --- a/tests/SuiteLink.Client.Tests/Internal/SuiteLinkSessionTests.cs +++ b/tests/SuiteLink.Client.Tests/Internal/SuiteLinkSessionTests.cs @@ -169,12 +169,56 @@ public sealed class SuiteLinkSessionTests Assert.Equal("callback failure", callbackException.Message); } + [Fact] + public void TryDispatchUpdate_WithExplicitSource_UsesProvidedSource() + { + var session = new SuiteLinkSession(); + SuiteLinkTagUpdate? callbackUpdate = null; + + session.RegisterSubscription("Pump001.Run", 0x1234, update => callbackUpdate = update); + + var decoded = new DecodedUpdate( + TagId: 0x1234, + Quality: 0x00C0, + ElapsedMilliseconds: 10, + Value: SuiteLinkValue.FromBoolean(true)); + + var dispatched = session.TryDispatchUpdate( + decoded, + DateTimeOffset.UtcNow, + SuiteLinkUpdateSource.CatchUpReplay, + out var dispatchedUpdate, + out _); + + Assert.True(dispatched); + Assert.NotNull(dispatchedUpdate); + Assert.Equal(SuiteLinkUpdateSource.CatchUpReplay, dispatchedUpdate.Source); + Assert.Equal(dispatchedUpdate, callbackUpdate); + } + + [Fact] + public void ClearSubscriptions_RemovesAllMappings() + { + var session = new SuiteLinkSession(); + + session.RegisterSubscription("Pump001.Run", 0x1234, _ => { }); + session.RegisterSubscription("Pump001.Speed", 0x5678, _ => { }); + + session.ClearSubscriptions(); + + Assert.False(session.TryGetTagId("Pump001.Run", out _)); + Assert.False(session.TryGetTagId("Pump001.Speed", out _)); + 
Assert.False(session.TryGetItemName(0x1234, out _)); + Assert.False(session.TryGetItemName(0x5678, out _)); + Assert.Equal(0, session.SubscriptionCount); + } + [Fact] public void SetState_InvalidTransition_ThrowsInvalidOperationException() { var session = new SuiteLinkSession(); - var ex = Assert.Throws(() => session.SetState(SuiteLinkSessionState.SessionConnected)); + var ex = Assert.Throws(() => session.SetState(SuiteLinkSessionState.Ready)); Assert.Contains("Invalid state transition", ex.Message); Assert.Equal(SuiteLinkSessionState.Disconnected, session.State); @@ -191,4 +235,17 @@ public sealed class SuiteLinkSessionTests Assert.False(session.TryTransitionState(SuiteLinkSessionState.Disconnected, SuiteLinkSessionState.HandshakeComplete)); Assert.Equal(SuiteLinkSessionState.TcpConnected, session.State); } + + [Fact] + public void SetState_ReconnectAttemptStartupFailure_CanReturnToReconnecting() + { + var session = new SuiteLinkSession(); + + session.SetState(SuiteLinkSessionState.TcpConnected); + session.SetState(SuiteLinkSessionState.HandshakeComplete); + session.SetState(SuiteLinkSessionState.ConnectSent); + session.SetState(SuiteLinkSessionState.Reconnecting); + + Assert.Equal(SuiteLinkSessionState.Reconnecting, session.State); + } } diff --git a/tests/SuiteLink.Client.Tests/SuiteLinkClientConnectionTests.cs b/tests/SuiteLink.Client.Tests/SuiteLinkClientConnectionTests.cs index cc18da5..9dcd4e7 100644 --- a/tests/SuiteLink.Client.Tests/SuiteLinkClientConnectionTests.cs +++ b/tests/SuiteLink.Client.Tests/SuiteLinkClientConnectionTests.cs @@ -6,7 +6,7 @@ namespace SuiteLink.Client.Tests; public sealed class SuiteLinkClientConnectionTests { [Fact] - public async Task ConnectAsync_SendsHandshakeThenConnect_ButDoesNotReportReadyYet() + public async Task ConnectAsync_SendsHandshakeThenConnect_AndTransitionsToReadyWhenRuntimeLoopStarts() { var handshakeAckFrame = new byte[] { 0x06, 0x00, 0x01, 0x00, 0xA1, 0xB2, 0xC3, 0xA5 }; var transport = new 
FakeTransport([handshakeAckFrame[..4], handshakeAckFrame[4..]]); @@ -15,7 +15,7 @@ public sealed class SuiteLinkClientConnectionTests await client.ConnectAsync(options); - Assert.False(client.IsConnected); + Assert.True(client.IsConnected); Assert.Equal(2, transport.SentBuffers.Count); Assert.Equal( SuiteLinkHandshakeCodec.EncodeNormalQueryHandshake( diff --git a/tests/SuiteLink.Client.Tests/SuiteLinkClientReconnectTests.cs b/tests/SuiteLink.Client.Tests/SuiteLinkClientReconnectTests.cs new file mode 100644 index 0000000..a2f5642 --- /dev/null +++ b/tests/SuiteLink.Client.Tests/SuiteLinkClientReconnectTests.cs @@ -0,0 +1,669 @@ +using System.Net.Sockets; +using SuiteLink.Client.Internal; +using SuiteLink.Client.Protocol; +using SuiteLink.Client.Transport; + +namespace SuiteLink.Client.Tests; + +public sealed class SuiteLinkClientReconnectTests +{ + [Fact] + public async Task Reconnect_UsesConfiguredRetryPolicy() + { + var observedDelays = new List(); + var capturedSchedule = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var syncRoot = new object(); + + Task CaptureDelayAsync(TimeSpan delay, CancellationToken _) + { + lock (syncRoot) + { + observedDelays.Add(delay); + if (observedDelays.Count >= 5) + { + capturedSchedule.TrySetResult(true); + } + } + + return Task.CompletedTask; + } + + var transport = new RuntimeDisconnectFakeTransport(DisconnectBehavior.ReturnEof) + .WithFrame(BuildHandshakeAckFrame()) + .WithFrame(BuildAdviseAckFrame(1)); + var client = new SuiteLinkClient(transport, ownsTransport: false, delayAsync: CaptureDelayAsync); + + await client.ConnectAsync(CreateOptions(runtime: new SuiteLinkRuntimeOptions( + retryPolicy: new SuiteLinkRetryPolicy( + initialDelay: TimeSpan.FromSeconds(3), + multiplier: 3.0, + maxDelay: TimeSpan.FromSeconds(20), + useJitter: false), + catchUpPolicy: SuiteLinkCatchUpPolicy.None, + catchUpTimeout: TimeSpan.FromSeconds(2)))); + _ = await client.SubscribeAsync("Pump001.Run", _ => { }); + _ = 
await capturedSchedule.Task.WaitAsync(TimeSpan.FromSeconds(2)); + + TimeSpan[] firstFiveObserved; + lock (syncRoot) + { + firstFiveObserved = + [ + observedDelays[0], + observedDelays[1], + observedDelays[2], + observedDelays[3], + observedDelays[4] + ]; + } + + Assert.Equal(TimeSpan.Zero, firstFiveObserved[0]); + Assert.Equal(TimeSpan.FromSeconds(3), firstFiveObserved[1]); + Assert.Equal(TimeSpan.FromSeconds(9), firstFiveObserved[2]); + Assert.Equal(TimeSpan.FromSeconds(20), firstFiveObserved[3]); + Assert.Equal(TimeSpan.FromSeconds(20), firstFiveObserved[4]); + + await client.DisposeAsync(); + } + + [Fact] + public async Task ReceiveLoop_Eof_TransitionsToReconnecting() + { + var transport = new RuntimeDisconnectFakeTransport(DisconnectBehavior.ReturnEof) + .WithFrame(BuildHandshakeAckFrame()) + .WithFrame(BuildAdviseAckFrame(1)); + + var client = new SuiteLinkClient(transport); + await client.ConnectAsync(CreateOptions()); + _ = await client.SubscribeAsync("Pump001.Run", _ => { }); + + await AssertStateEventuallyAsync(client, SuiteLinkSessionState.Reconnecting); + await client.DisposeAsync(); + } + + [Fact] + public async Task ReceiveLoop_ReceiveIOException_TransitionsToReconnecting() + { + var transport = new RuntimeDisconnectFakeTransport(DisconnectBehavior.ThrowIoException) + .WithFrame(BuildHandshakeAckFrame()) + .WithFrame(BuildAdviseAckFrame(1)); + + var client = new SuiteLinkClient(transport); + await client.ConnectAsync(CreateOptions()); + _ = await client.SubscribeAsync("Pump001.Run", _ => { }); + + await AssertStateEventuallyAsync(client, SuiteLinkSessionState.Reconnecting); + await client.DisposeAsync(); + } + + [Fact] + public async Task ReceiveLoop_ReceiveSocketException_TransitionsToReconnecting() + { + var transport = new RuntimeDisconnectFakeTransport(DisconnectBehavior.ThrowSocketException) + .WithFrame(BuildHandshakeAckFrame()) + .WithFrame(BuildAdviseAckFrame(1)); + + var client = new SuiteLinkClient(transport); + await 
client.ConnectAsync(CreateOptions()); + _ = await client.SubscribeAsync("Pump001.Run", _ => { }); + + await AssertStateEventuallyAsync(client, SuiteLinkSessionState.Reconnecting); + await client.DisposeAsync(); + } + + [Fact] + public async Task ReceiveLoop_PartialFrameThenEof_TransitionsToReconnecting() + { + var updateFrame = BuildBooleanUpdateFrame(1, true); + var transport = new RuntimeDisconnectFakeTransport(DisconnectBehavior.ReturnEof) + .WithFrame(BuildHandshakeAckFrame()) + .WithFrame(BuildAdviseAckFrame(1)) + .WithChunk(updateFrame.AsSpan(0, 5).ToArray()); + + var client = new SuiteLinkClient(transport); + await client.ConnectAsync(CreateOptions()); + _ = await client.SubscribeAsync("Pump001.Run", _ => { }); + + await AssertStateEventuallyAsync(client, SuiteLinkSessionState.Reconnecting); + await client.DisposeAsync(); + } + + [Theory] + [InlineData(true, DisconnectBehavior.ReturnEof)] + [InlineData(true, DisconnectBehavior.ThrowIoException)] + [InlineData(false, DisconnectBehavior.ReturnEof)] + [InlineData(false, DisconnectBehavior.ThrowIoException)] + public async Task CloseOperations_RacingRuntimeDisconnect_EndInDisconnectedState( + bool useDisposeAsync, + DisconnectBehavior behavior) + { + var runtimeReceiveEntered = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var allowDisconnectSignal = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var transport = new RuntimeDisconnectFakeTransport(behavior) + .WithFrame(BuildHandshakeAckFrame()) + .WithFrame(BuildAdviseAckFrame(1)); + transport.RuntimeReceiveEntered = runtimeReceiveEntered; + transport.AllowDisconnectSignal = allowDisconnectSignal.Task; + + var client = new SuiteLinkClient(transport); + await client.ConnectAsync(CreateOptions()); + _ = await client.SubscribeAsync("Pump001.Run", _ => { }); + + _ = await runtimeReceiveEntered.Task.WaitAsync(TimeSpan.FromSeconds(2)); + var closeTask = useDisposeAsync + ? 
client.DisposeAsync().AsTask() + : client.DisconnectAsync(); + + allowDisconnectSignal.TrySetResult(true); + await closeTask.WaitAsync(TimeSpan.FromSeconds(2)); + + Assert.Equal(SuiteLinkSessionState.Disconnected, client.DebugState); + Assert.False(client.IsConnected); + } + + [Fact] + public async Task ReadyWithNoSubscriptions_DoesNotProbeTransportLiveness_AndRemainsReady() + { + var transport = new RuntimeDisconnectFakeTransport(DisconnectBehavior.ReturnEof) + .WithFrame(BuildHandshakeAckFrame()); + var client = new SuiteLinkClient(transport); + + await client.ConnectAsync(CreateOptions()); + await Task.Delay(250); + + Assert.Equal(SuiteLinkSessionState.Ready, client.DebugState); + Assert.Equal(0, transport.RuntimeReceiveCallCount); + await client.DisposeAsync(); + } + + [Fact] + public async Task DisconnectAsync_CancelsPendingReconnectDelay_AndEndsDisconnected() + { + var reconnectDelayStarted = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var reconnectDelayCanceled = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + Task DelayAsync(TimeSpan delay, CancellationToken cancellationToken) + { + if (delay == TimeSpan.Zero) + { + return Task.CompletedTask; + } + + reconnectDelayStarted.TrySetResult(true); + cancellationToken.Register(() => reconnectDelayCanceled.TrySetResult(true)); + return Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken); + } + + var transport = new RuntimeDisconnectFakeTransport(DisconnectBehavior.ReturnEof) + .WithFrame(BuildHandshakeAckFrame()) + .WithFrame(BuildAdviseAckFrame(1)); + var client = new SuiteLinkClient( + transport, + ownsTransport: false, + delayAsync: DelayAsync, + reconnectAttemptAsync: static _ => ValueTask.FromResult(false)); + + await client.ConnectAsync(CreateOptions()); + _ = await client.SubscribeAsync("Pump001.Run", _ => { }); + _ = await reconnectDelayStarted.Task.WaitAsync(TimeSpan.FromSeconds(2)); + + await 
client.DisconnectAsync().WaitAsync(TimeSpan.FromSeconds(2)); + _ = await reconnectDelayCanceled.Task.WaitAsync(TimeSpan.FromSeconds(2)); + + Assert.Equal(SuiteLinkSessionState.Disconnected, client.DebugState); + Assert.False(client.IsConnected); + } + + [Fact] + public async Task Reconnect_ReplaysDurableSubscriptions_AndResumesUpdateDispatch() + { + var updateReceived = new TaskCompletionSource( + TaskCreationOptions.RunContinuationsAsynchronously); + var transport = new ReplayableReconnectFakeTransport( + new ConnectionPlan( + EmptyReceiveBehavior.ReturnEof, + BuildHandshakeAckFrame(), + BuildAdviseAckFrame(1)), + new ConnectionPlan( + EmptyReceiveBehavior.Block, + BuildHandshakeAckFrame(), + BuildAdviseAckFrame(1), + BuildBooleanUpdateFrame(1, true))); + var client = new SuiteLinkClient( + transport, + ownsTransport: false, + delayAsync: static (_, _) => Task.CompletedTask); + + await client.ConnectAsync(CreateOptions()); + _ = await client.SubscribeAsync("Pump001.Run", update => updateReceived.TrySetResult(update)); + + var update = await updateReceived.Task.WaitAsync(TimeSpan.FromSeconds(2)); + + Assert.Equal(2, transport.ConnectCallCount); + Assert.Equal(2, CountSentMessageType(transport.SentBuffers, SuiteLinkSubscriptionCodec.AdviseMessageType)); + Assert.Equal(SuiteLinkSessionState.Subscribed, client.DebugState); + Assert.True(update.Value.TryGetBoolean(out var value)); + Assert.True(value); + + await client.DisposeAsync(); + } + + [Fact] + public async Task Reconnect_RestoresLiveTagMappings_AndAllowsWriteAfterReplay() + { + var transport = new ReplayableReconnectFakeTransport( + new ConnectionPlan( + EmptyReceiveBehavior.ReturnEof, + BuildHandshakeAckFrame(), + BuildAdviseAckFrame(1)), + new ConnectionPlan( + EmptyReceiveBehavior.Block, + BuildHandshakeAckFrame(), + BuildAdviseAckFrame(1))); + var client = new SuiteLinkClient( + transport, + ownsTransport: false, + delayAsync: static (_, _) => Task.CompletedTask); + + await 
client.ConnectAsync(CreateOptions()); + _ = await client.SubscribeAsync("Pump001.Run", _ => { }); + await AssertStateEventuallyAsync(client, SuiteLinkSessionState.Subscribed); + + transport.ClearSentBuffers(); + await client.WriteAsync("Pump001.Run", SuiteLinkValue.FromBoolean(false)); + + Assert.Contains( + transport.SentBuffers, + frameBytes => frameBytes.AsSpan().SequenceEqual( + SuiteLinkWriteCodec.Encode(1, SuiteLinkValue.FromBoolean(false)))); + + await client.DisposeAsync(); + } + + [Fact] + public async Task Reconnect_WithRefreshLatestValue_DispatchesCatchUpReplay() + { + var catchUpReceived = new TaskCompletionSource( + TaskCreationOptions.RunContinuationsAsynchronously); + var transport = new ReplayableReconnectFakeTransport( + new ConnectionPlan( + EmptyReceiveBehavior.ReturnEof, + BuildHandshakeAckFrame(), + BuildAdviseAckFrame(1)), + new ConnectionPlan( + EmptyReceiveBehavior.Block, + BuildHandshakeAckFrame(), + BuildAdviseAckFrame(1), + BuildAdviseAckFrame(2), + BuildBooleanUpdateFrame(2, true))); + var client = new SuiteLinkClient( + transport, + ownsTransport: false, + delayAsync: static (_, _) => Task.CompletedTask); + + await client.ConnectAsync(CreateOptions(runtime: new SuiteLinkRuntimeOptions( + retryPolicy: SuiteLinkRetryPolicy.Default, + catchUpPolicy: SuiteLinkCatchUpPolicy.RefreshLatestValue, + catchUpTimeout: TimeSpan.FromSeconds(2)))); + _ = await client.SubscribeAsync("Pump001.Run", update => + { + if (update.Source == SuiteLinkUpdateSource.CatchUpReplay) + { + catchUpReceived.TrySetResult(update); + } + }); + + var catchUp = await catchUpReceived.Task.WaitAsync(TimeSpan.FromSeconds(2)); + + Assert.Equal(SuiteLinkUpdateSource.CatchUpReplay, catchUp.Source); + Assert.Equal(1u, catchUp.TagId); + Assert.True(catchUp.Value.TryGetBoolean(out var value)); + Assert.True(value); + + await client.DisposeAsync(); + } + + [Fact] + public async Task Reconnect_CatchUpTimeout_DoesNotFailRecoveredSubscriptions() + { + var transport = new 
ReplayableReconnectFakeTransport( + new ConnectionPlan( + EmptyReceiveBehavior.ReturnEof, + BuildHandshakeAckFrame(), + BuildAdviseAckFrame(1)), + new ConnectionPlan( + EmptyReceiveBehavior.Block, + BuildHandshakeAckFrame(), + BuildAdviseAckFrame(1), + BuildAdviseAckFrame(2))); + var client = new SuiteLinkClient( + transport, + ownsTransport: false, + delayAsync: static (_, _) => Task.CompletedTask); + + await client.ConnectAsync(CreateOptions(runtime: new SuiteLinkRuntimeOptions( + retryPolicy: SuiteLinkRetryPolicy.Default, + catchUpPolicy: SuiteLinkCatchUpPolicy.RefreshLatestValue, + catchUpTimeout: TimeSpan.FromMilliseconds(100)))); + _ = await client.SubscribeAsync("Pump001.Run", _ => { }); + + await AssertStateEventuallyAsync(client, SuiteLinkSessionState.Subscribed, TimeSpan.FromSeconds(2)); + Assert.True(client.IsConnected); + + await client.DisposeAsync(); + } + + [Fact] + public async Task WriteAsync_DuringReconnect_ThrowsPredictableInvalidOperationException() + { + var reconnectAttemptStarted = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var transport = new RuntimeDisconnectFakeTransport(DisconnectBehavior.ReturnEof) + .WithFrame(BuildHandshakeAckFrame()) + .WithFrame(BuildAdviseAckFrame(1)); + var client = new SuiteLinkClient( + transport, + ownsTransport: false, + delayAsync: static (_, _) => Task.CompletedTask, + reconnectAttemptAsync: async cancellationToken => + { + reconnectAttemptStarted.TrySetResult(true); + await Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken).ConfigureAwait(false); + return false; + }); + + await client.ConnectAsync(CreateOptions()); + _ = await client.SubscribeAsync("Pump001.Run", _ => { }); + await AssertStateEventuallyAsync(client, SuiteLinkSessionState.Reconnecting); + _ = await reconnectAttemptStarted.Task.WaitAsync(TimeSpan.FromSeconds(2)); + + var ex = await Assert.ThrowsAsync( + () => client.WriteAsync("Pump001.Run", SuiteLinkValue.FromBoolean(false))); + + 
Assert.Contains("reconnecting", ex.Message, StringComparison.OrdinalIgnoreCase); + await client.DisposeAsync(); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task CloseOperations_DuringReconnectAttempt_CancelRecoveryAndEndDisconnected(bool useDisposeAsync) + { + var reconnectAttemptStarted = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var reconnectAttemptCanceled = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var transport = new RuntimeDisconnectFakeTransport(DisconnectBehavior.ReturnEof) + .WithFrame(BuildHandshakeAckFrame()) + .WithFrame(BuildAdviseAckFrame(1)); + var client = new SuiteLinkClient( + transport, + ownsTransport: false, + delayAsync: static (_, _) => Task.CompletedTask, + reconnectAttemptAsync: async cancellationToken => + { + reconnectAttemptStarted.TrySetResult(true); + try + { + await Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken).ConfigureAwait(false); + return false; + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + reconnectAttemptCanceled.TrySetResult(true); + throw; + } + }); + + await client.ConnectAsync(CreateOptions()); + _ = await client.SubscribeAsync("Pump001.Run", _ => { }); + _ = await reconnectAttemptStarted.Task.WaitAsync(TimeSpan.FromSeconds(2)); + + if (useDisposeAsync) + { + await client.DisposeAsync(); + } + else + { + await client.DisconnectAsync(); + } + + _ = await reconnectAttemptCanceled.Task.WaitAsync(TimeSpan.FromSeconds(2)); + + Assert.Equal(SuiteLinkSessionState.Disconnected, client.DebugState); + Assert.False(client.IsConnected); + } + + private static async Task AssertStateEventuallyAsync( + SuiteLinkClient client, + SuiteLinkSessionState expectedState, + TimeSpan? timeout = null) + { + var deadline = DateTime.UtcNow + (timeout ?? 
TimeSpan.FromSeconds(2)); + while (DateTime.UtcNow < deadline) + { + if (client.DebugState == expectedState) + { + return; + } + + await Task.Delay(20); + } + + Assert.Equal(expectedState, client.DebugState); + } + + private static SuiteLinkConnectionOptions CreateOptions(SuiteLinkRuntimeOptions? runtime = null) + { + return new SuiteLinkConnectionOptions( + host: "127.0.0.1", + application: "App", + topic: "Topic", + clientName: "Client", + clientNode: "Node", + userName: "User", + serverNode: "Server", + timezone: "UTC", + port: 5413, + runtime: runtime); + } + + private static byte[] BuildHandshakeAckFrame() + { + return [0x06, 0x00, 0x01, 0x00, 0xA1, 0xB2, 0xC3, 0xA5]; + } + + private static byte[] BuildAdviseAckFrame(params uint[] tagIds) + { + var payload = new byte[Math.Max(1, tagIds.Length) * 5]; + var ids = tagIds.Length == 0 ? [0u] : tagIds; + var offset = 0; + foreach (var tagId in ids) + { + SuiteLinkEncoding.WriteUInt32LittleEndian(payload.AsSpan(offset, 4), tagId); + payload[offset + 4] = 0x00; + offset += 5; + } + + return SuiteLinkFrameWriter.WriteFrame(SuiteLinkSubscriptionCodec.AdviseAckMessageType, payload); + } + + private static byte[] BuildBooleanUpdateFrame(uint tagId, bool value) + { + var payload = new byte[10]; + SuiteLinkEncoding.WriteUInt32LittleEndian(payload.AsSpan(0, 4), tagId); + SuiteLinkEncoding.WriteUInt16LittleEndian(payload.AsSpan(4, 2), 1); + SuiteLinkEncoding.WriteUInt16LittleEndian(payload.AsSpan(6, 2), 0x00C0); + payload[8] = (byte)SuiteLinkWireValueType.Binary; + payload[9] = value ? 
(byte)1 : (byte)0;
+        return SuiteLinkFrameWriter.WriteFrame(SuiteLinkUpdateCodec.UpdateMessageType, payload);
+    }
+
+    /// <summary>
+    /// Counts the sent buffers that parse as a complete frame carrying <paramref name="messageType"/>.
+    /// Buffers that fail to parse are ignored.
+    /// </summary>
+    private static int CountSentMessageType(IEnumerable<byte[]> sentBuffers, ushort messageType)
+    {
+        return sentBuffers.Count(
+            frameBytes =>
+                SuiteLinkFrameReader.TryParseFrame(frameBytes, out var frame, out _) &&
+                frame.MessageType == messageType);
+    }
+
+    /// <summary>
+    /// How <see cref="RuntimeDisconnectFakeTransport"/> ends a receive once its queued chunks are drained.
+    /// </summary>
+    public enum DisconnectBehavior
+    {
+        ReturnEof,
+        ThrowIoException,
+        ThrowSocketException
+    }
+
+    /// <summary>
+    /// What <see cref="ReplayableReconnectFakeTransport"/> does on receive when the current plan has no frames left.
+    /// </summary>
+    private enum EmptyReceiveBehavior
+    {
+        ReturnEof,
+        Block
+    }
+
+    /// <summary>
+    /// Script for one connection attempt: the frames to serve, and the behavior once they are exhausted.
+    /// </summary>
+    private sealed record ConnectionPlan(
+        EmptyReceiveBehavior EmptyReceiveBehavior,
+        params byte[][] Frames);
+
+    /// <summary>
+    /// Fake transport that serves queued chunks and then simulates a runtime disconnect on every
+    /// later receive (EOF or an exception, per the configured <see cref="DisconnectBehavior"/>).
+    /// </summary>
+    private sealed class RuntimeDisconnectFakeTransport : ISuiteLinkTransport
+    {
+        private readonly Queue<byte[]> _receiveChunks = [];
+        private readonly DisconnectBehavior _disconnectBehavior;
+
+        public RuntimeDisconnectFakeTransport(DisconnectBehavior disconnectBehavior)
+        {
+            _disconnectBehavior = disconnectBehavior;
+        }
+
+        public bool IsConnected { get; private set; }
+
+        /// <summary>Number of receive calls made after the queued chunks were drained.</summary>
+        public int RuntimeReceiveCallCount { get; private set; }
+
+        /// <summary>Optional signal completed when a receive is entered with no queued chunks left.</summary>
+        public TaskCompletionSource<bool>? RuntimeReceiveEntered { get; set; }
+
+        /// <summary>Optional gate awaited before the simulated disconnect is reported.</summary>
+        public Task? AllowDisconnectSignal { get; set; }
+
+        /// <summary>Queues bytes to be returned by one receive call.</summary>
+        public RuntimeDisconnectFakeTransport WithFrame(byte[] frameBytes)
+        {
+            _receiveChunks.Enqueue(frameBytes);
+            return this;
+        }
+
+        /// <summary>Queues bytes to be returned by one receive call (same queue as <see cref="WithFrame"/>).</summary>
+        public RuntimeDisconnectFakeTransport WithChunk(byte[] bytes)
+        {
+            _receiveChunks.Enqueue(bytes);
+            return this;
+        }
+
+        public ValueTask ConnectAsync(string host, int port, CancellationToken cancellationToken = default)
+        {
+            IsConnected = true;
+            return ValueTask.CompletedTask;
+        }
+
+        /// <summary>Discards the buffer; this fake does not record sends.</summary>
+        public ValueTask SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
+        {
+            return ValueTask.CompletedTask;
+        }
+
+        /// <summary>
+        /// Returns the next queued chunk if one exists; otherwise records the call, signals
+        /// <see cref="RuntimeReceiveEntered"/>, waits for <see cref="AllowDisconnectSignal"/> when set,
+        /// and then reports the configured disconnect.
+        /// </summary>
+        public async ValueTask<int> ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
+        {
+            if (_receiveChunks.Count > 0)
+            {
+                var next = _receiveChunks.Dequeue();
+                next.CopyTo(buffer);
+                return next.Length;
+            }
+
+            RuntimeReceiveCallCount++;
+            RuntimeReceiveEntered?.TrySetResult(true);
+            if (AllowDisconnectSignal is not null)
+            {
+                await AllowDisconnectSignal.ConfigureAwait(false);
+            }
+
+            return _disconnectBehavior switch
+            {
+                DisconnectBehavior.ReturnEof => 0,
+                DisconnectBehavior.ThrowIoException =>
+                    throw new IOException("Synthetic runtime disconnect."),
+                DisconnectBehavior.ThrowSocketException =>
+                    throw new SocketException((int)SocketError.ConnectionReset),
+                _ => 0
+            };
+        }
+
+        public ValueTask DisposeAsync()
+        {
+            IsConnected = false;
+            return ValueTask.CompletedTask;
+        }
+    }
+
+    /// <summary>
+    /// Fake transport that can be connected repeatedly. Each <see cref="ConnectAsync"/> call loads the
+    /// next <see cref="ConnectionPlan"/>, and every sent buffer is recorded in <see cref="SentBuffers"/>.
+    /// </summary>
+    private sealed class ReplayableReconnectFakeTransport : ISuiteLinkTransport
+    {
+        private readonly object _syncRoot = new();
+        private readonly List<ConnectionPlan> _connectionPlans;
+        private Queue<byte[]> _receiveChunks = [];
+        private EmptyReceiveBehavior _emptyReceiveBehavior;
+        private bool _disposed;
+
+        public ReplayableReconnectFakeTransport(params ConnectionPlan[] connectionPlans)
+        {
+            _connectionPlans = [.. connectionPlans];
+        }
+
+        /// <summary>Number of <see cref="ConnectAsync"/> calls that were given a plan.</summary>
+        public int ConnectCallCount { get; private set; }
+        public bool IsConnected => !_disposed;
+        public List<byte[]> SentBuffers { get; } = [];
+
+        public void ClearSentBuffers()
+        {
+            lock (_syncRoot)
+            {
+                SentBuffers.Clear();
+            }
+        }
+
+        /// <summary>
+        /// Activates the plan for this attempt.
+        /// Throws <see cref="InvalidOperationException"/> when more connects are requested than plans were supplied.
+        /// </summary>
+        public ValueTask ConnectAsync(string host, int port, CancellationToken cancellationToken = default)
+        {
+            if (ConnectCallCount >= _connectionPlans.Count)
+            {
+                throw new InvalidOperationException("No reconnect plan is available for the requested attempt.");
+            }
+
+            var plan = _connectionPlans[ConnectCallCount];
+            ConnectCallCount++;
+            _receiveChunks = new Queue<byte[]>(plan.Frames);
+            _emptyReceiveBehavior = plan.EmptyReceiveBehavior;
+            return ValueTask.CompletedTask;
+        }
+
+        public ValueTask SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
+        {
+            lock (_syncRoot)
+            {
+                SentBuffers.Add(buffer.ToArray());
+            }
+
+            return ValueTask.CompletedTask;
+        }
+
+        /// <summary>
+        /// Serves the current plan's frames in order. Once they are exhausted, either returns 0 (EOF)
+        /// or waits until <paramref name="cancellationToken"/> is canceled, depending on the plan.
+        /// </summary>
+        public async ValueTask<int> ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
+        {
+            if (_receiveChunks.Count > 0)
+            {
+                var next = _receiveChunks.Dequeue();
+                next.CopyTo(buffer);
+                return next.Length;
+            }
+
+            if (_emptyReceiveBehavior == EmptyReceiveBehavior.ReturnEof)
+            {
+                return 0;
+            }
+
+            await Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken).ConfigureAwait(false);
+            return 0;
+        }
+
+        public ValueTask DisposeAsync()
+        {
+            _disposed = true;
+            return ValueTask.CompletedTask;
+        }
+    }
+}
diff --git a/tests/SuiteLink.Client.Tests/SuiteLinkClientRuntimeLoopTests.cs b/tests/SuiteLink.Client.Tests/SuiteLinkClientRuntimeLoopTests.cs
new file mode 100644
index 0000000..743f719
--- /dev/null
+++ b/tests/SuiteLink.Client.Tests/SuiteLinkClientRuntimeLoopTests.cs
@@ -0,0 +1,272 @@
+using System.Threading.Channels;
+using SuiteLink.Client.Protocol;
+using SuiteLink.Client.Transport;
+
+namespace SuiteLink.Client.Tests;
+
+public sealed class SuiteLinkClientRuntimeLoopTests
+{
+    [Fact]
+    public async Task
ConnectAsync_WithZeroSubscriptions_TransitionsToReadyOnceRuntimeLoopStarts() + { + var transport = new BlockingFakeTransport(); + transport.EnqueueReceive(BuildHandshakeAckFrame()); + + var client = new SuiteLinkClient(transport); + await client.ConnectAsync(CreateOptions()); + + Assert.True(client.IsConnected); + await client.DisposeAsync(); + } + + [Fact] + public async Task ConnectAsync_StartsBackgroundLoop_AndDispatchesUpdateWithoutManualPolling() + { + var updateReceived = new TaskCompletionSource( + TaskCreationOptions.RunContinuationsAsynchronously); + + var transport = new BlockingFakeTransport(); + transport.EnqueueReceive(BuildHandshakeAckFrame()); + + var client = new SuiteLinkClient(transport); + await client.ConnectAsync(CreateOptions()); + + transport.EnqueueReceive(BuildAdviseAckFrame(1)); + transport.EnqueueReceive(BuildBooleanUpdateFrame(1, true)); + + _ = await client.SubscribeAsync( + "Pump001.Run", + update => updateReceived.TrySetResult(update)); + + var update = await updateReceived.Task.WaitAsync(TimeSpan.FromSeconds(2)); + Assert.True(update.Value.TryGetBoolean(out var value) && value); + await client.DisposeAsync(); + } + + [Fact] + public async Task RuntimeLoop_CallbackCanReenterClientWriteWithoutDeadlock() + { + var callbackCompleted = new TaskCompletionSource( + TaskCreationOptions.RunContinuationsAsynchronously); + + var transport = new BlockingFakeTransport(); + transport.EnqueueReceive(BuildHandshakeAckFrame()); + + var client = new SuiteLinkClient(transport); + await client.ConnectAsync(CreateOptions()); + + transport.EnqueueReceive(BuildAdviseAckFrame(1)); + _ = await client.SubscribeAsync( + "Pump001.Run", + _ => + { + client.WriteAsync("Pump001.Run", SuiteLinkValue.FromBoolean(false)) + .GetAwaiter() + .GetResult(); + callbackCompleted.TrySetResult(true); + }); + + transport.EnqueueReceive(BuildBooleanUpdateFrame(1, true)); + + _ = await callbackCompleted.Task.WaitAsync(TimeSpan.FromSeconds(2)); + var expectedPoke = 
SuiteLinkWriteCodec.Encode(1, SuiteLinkValue.FromBoolean(false)); + Assert.Contains( + transport.SentBuffers, + frameBytes => frameBytes.AsSpan().SequenceEqual(expectedPoke)); + await client.DisposeAsync(); + } + + [Fact] + public async Task DisposeAsync_AwaitsRuntimeLoopStop_BeforeDisposingOwnedTransport() + { + var transport = new OrderedShutdownFakeTransport(); + transport.EnqueueReceive(BuildHandshakeAckFrame()); + transport.EnqueueReceive(BuildAdviseAckFrame(1)); + + var client = new SuiteLinkClient(transport, ownsTransport: true); + await client.ConnectAsync(CreateOptions()); + _ = await client.SubscribeAsync("Pump001.Run", _ => { }); + _ = await transport.RuntimeReceiveEntered.Task.WaitAsync(TimeSpan.FromSeconds(2)); + + var disposeTask = client.DisposeAsync().AsTask(); + + // The runtime loop is still blocked in receive and has not been allowed to return. + await Task.Delay(100); + Assert.False(disposeTask.IsCompleted); + Assert.Equal(0, transport.DisposeCallCount); + + transport.AllowRuntimeReceiveReturn.TrySetResult(true); + await disposeTask.WaitAsync(TimeSpan.FromSeconds(2)); + + Assert.Equal(1, transport.DisposeCallCount); + Assert.True(transport.DisposeObservedRuntimeReceiveReturned); + } + + private static SuiteLinkConnectionOptions CreateOptions() + { + return new SuiteLinkConnectionOptions( + host: "127.0.0.1", + application: "App", + topic: "Topic", + clientName: "Client", + clientNode: "Node", + userName: "User", + serverNode: "Server", + timezone: "UTC", + port: 5413); + } + + private static byte[] BuildHandshakeAckFrame() + { + return [0x06, 0x00, 0x01, 0x00, 0xA1, 0xB2, 0xC3, 0xA5]; + } + + private static byte[] BuildAdviseAckFrame(params uint[] tagIds) + { + var payload = new byte[Math.Max(1, tagIds.Length) * 5]; + var ids = tagIds.Length == 0 ? 
[0u] : tagIds;
+        var offset = 0;
+        foreach (var tagId in ids)
+        {
+            SuiteLinkEncoding.WriteUInt32LittleEndian(payload.AsSpan(offset, 4), tagId);
+            payload[offset + 4] = 0x00;
+            offset += 5;
+        }
+
+        return SuiteLinkFrameWriter.WriteFrame(SuiteLinkSubscriptionCodec.AdviseAckMessageType, payload);
+    }
+
+    /// <summary>
+    /// Builds an update frame with a 10-byte payload: the tag id, two 16-bit fields, the
+    /// <see cref="SuiteLinkWireValueType.Binary"/> type byte, and a 0/1 value byte.
+    /// </summary>
+    private static byte[] BuildBooleanUpdateFrame(uint tagId, bool value)
+    {
+        var payload = new byte[10];
+        SuiteLinkEncoding.WriteUInt32LittleEndian(payload.AsSpan(0, 4), tagId);
+        SuiteLinkEncoding.WriteUInt16LittleEndian(payload.AsSpan(4, 2), 1);
+        SuiteLinkEncoding.WriteUInt16LittleEndian(payload.AsSpan(6, 2), 0x00C0);
+        payload[8] = (byte)SuiteLinkWireValueType.Binary;
+        payload[9] = value ? (byte)1 : (byte)0;
+        return SuiteLinkFrameWriter.WriteFrame(SuiteLinkUpdateCodec.UpdateMessageType, payload);
+    }
+
+    /// <summary>
+    /// Fake transport whose receive waits on a channel until the test enqueues a frame,
+    /// so frames can be fed in after the client is already connected. Sent buffers are recorded.
+    /// </summary>
+    private sealed class BlockingFakeTransport : ISuiteLinkTransport
+    {
+        private readonly Channel<byte[]> _receiveChannel = Channel.CreateUnbounded<byte[]>();
+        private readonly object _syncRoot = new();
+        private bool _disposed;
+
+        public bool IsConnected => !_disposed;
+        public List<byte[]> SentBuffers { get; } = [];
+
+        /// <summary>Makes <paramref name="frameBytes"/> available to a pending or future receive.</summary>
+        public void EnqueueReceive(byte[] frameBytes)
+        {
+            if (!_receiveChannel.Writer.TryWrite(frameBytes))
+            {
+                throw new InvalidOperationException("Unable to enqueue receive frame.");
+            }
+        }
+
+        public ValueTask ConnectAsync(string host, int port, CancellationToken cancellationToken = default)
+        {
+            if (_disposed)
+            {
+                throw new ObjectDisposedException(nameof(BlockingFakeTransport));
+            }
+
+            return ValueTask.CompletedTask;
+        }
+
+        public ValueTask SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
+        {
+            lock (_syncRoot)
+            {
+                SentBuffers.Add(buffer.ToArray());
+            }
+
+            return ValueTask.CompletedTask;
+        }
+
+        /// <summary>Waits for the next enqueued frame, copies it into <paramref name="buffer"/>, and returns its length.</summary>
+        public async ValueTask<int> ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
+        {
+            var next = await _receiveChannel.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);
+            next.CopyTo(buffer);
+            return next.Length;
+        }
+
+        /// <summary>Marks the transport disposed and completes the channel writer.</summary>
+        public ValueTask DisposeAsync()
+        {
+            _disposed = true;
+            _receiveChannel.Writer.TryComplete();
+            return ValueTask.CompletedTask;
+        }
+    }
+
+    /// <summary>
+    /// Fake transport for checking shutdown ordering. It serves startup frames synchronously, then
+    /// holds the next receive until the test completes <see cref="AllowRuntimeReceiveReturn"/>, and
+    /// records at dispose time whether that receive had already returned.
+    /// </summary>
+    private sealed class OrderedShutdownFakeTransport : ISuiteLinkTransport
+    {
+        private readonly object _syncRoot = new();
+        private readonly Queue<byte[]> _startupFrames = [];
+        private bool _disposed;
+
+        public bool IsConnected => !_disposed;
+        public int DisposeCallCount { get; private set; }
+
+        /// <summary>Whether <see cref="RuntimeReceiveReturned"/> was already completed when dispose ran.</summary>
+        public bool DisposeObservedRuntimeReceiveReturned { get; private set; }
+        public TaskCompletionSource<bool> RuntimeReceiveEntered { get; } =
+            new(TaskCreationOptions.RunContinuationsAsynchronously);
+        public TaskCompletionSource<bool> AllowRuntimeReceiveReturn { get; } =
+            new(TaskCreationOptions.RunContinuationsAsynchronously);
+        public TaskCompletionSource<bool> RuntimeReceiveReturned { get; } =
+            new(TaskCreationOptions.RunContinuationsAsynchronously);
+
+        public void EnqueueReceive(byte[] frameBytes)
+        {
+            lock (_syncRoot)
+            {
+                _startupFrames.Enqueue(frameBytes);
+            }
+        }
+
+        public ValueTask ConnectAsync(string host, int port, CancellationToken cancellationToken = default)
+        {
+            if (_disposed)
+            {
+                throw new ObjectDisposedException(nameof(OrderedShutdownFakeTransport));
+            }
+
+            return ValueTask.CompletedTask;
+        }
+
+        public ValueTask SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
+        {
+            return ValueTask.CompletedTask;
+        }
+
+        /// <summary>
+        /// Returns queued startup frames first. Once none remain, signals
+        /// <see cref="RuntimeReceiveEntered"/> and hands off to the gated runtime receive.
+        /// </summary>
+        public ValueTask<int> ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
+        {
+            lock (_syncRoot)
+            {
+                if (_startupFrames.Count > 0)
+                {
+                    var startupFrame = _startupFrames.Dequeue();
+                    startupFrame.CopyTo(buffer);
+                    return ValueTask.FromResult(startupFrame.Length);
+                }
+            }
+
+            RuntimeReceiveEntered.TrySetResult(true);
+            return ReceiveRuntimeLoopBlockAsync();
+        }
+
+        /// <summary>Counts the dispose call and captures whether the gated receive had returned by then.</summary>
+        public ValueTask DisposeAsync()
+        {
+            _disposed = true;
+            DisposeCallCount++;
+            DisposeObservedRuntimeReceiveReturned = RuntimeReceiveReturned.Task.IsCompleted;
+            return ValueTask.CompletedTask;
+        
} + + private async ValueTask ReceiveRuntimeLoopBlockAsync() + { + await AllowRuntimeReceiveReturn.Task.ConfigureAwait(false); + RuntimeReceiveReturned.TrySetResult(true); + return 0; + } + } +} diff --git a/tests/SuiteLink.Client.Tests/SuiteLinkClientSubscriptionRegistryTests.cs b/tests/SuiteLink.Client.Tests/SuiteLinkClientSubscriptionRegistryTests.cs new file mode 100644 index 0000000..b05bf52 --- /dev/null +++ b/tests/SuiteLink.Client.Tests/SuiteLinkClientSubscriptionRegistryTests.cs @@ -0,0 +1,177 @@ +using SuiteLink.Client.Protocol; +using SuiteLink.Client.Transport; + +namespace SuiteLink.Client.Tests; + +public sealed class SuiteLinkClientSubscriptionRegistryTests +{ + [Fact] + public async Task SubscribeAsync_StoresDurableSubscriptionIntent() + { + var transport = new FakeTransport(); + transport.EnqueueReceive(BuildHandshakeAckFrame()); + transport.EnqueueReceive(BuildAdviseAckFrame(1)); + + var client = new SuiteLinkClient(transport); + await client.ConnectAsync(CreateOptions()); + + _ = await client.SubscribeAsync("Pump001.Run", _ => { }); + + Assert.True(client.DebugHasDurableSubscription("Pump001.Run")); + } + + [Fact] + public async Task SubscribeAsync_DuplicateItem_Throws_AndKeepsOriginalCallbackRegistration() + { + var transport = new FakeTransport(); + transport.EnqueueReceive(BuildHandshakeAckFrame()); + transport.EnqueueReceive(BuildAdviseAckFrame(1)); + transport.EnqueueReceive(BuildBooleanUpdateFrame(1, true)); + + var client = new SuiteLinkClient(transport); + await client.ConnectAsync(CreateOptions()); + + var firstCallbackCount = 0; + var secondCallbackCount = 0; + _ = await client.SubscribeAsync("Pump001.Run", _ => firstCallbackCount++); + + var duplicateException = await Assert.ThrowsAsync( + () => client.SubscribeAsync("Pump001.Run", _ => secondCallbackCount++)); + Assert.Contains("already subscribed", duplicateException.Message, StringComparison.OrdinalIgnoreCase); + + await client.ProcessIncomingAsync(); + + 
Assert.True(client.DebugHasDurableSubscription("Pump001.Run")); + Assert.Equal(1, firstCallbackCount); + Assert.Equal(0, secondCallbackCount); + } + + [Fact] + public async Task SubscriptionHandleDisposeAsync_RemovesDurableSubscriptionIntent() + { + var transport = new FakeTransport(); + transport.EnqueueReceive(BuildHandshakeAckFrame()); + transport.EnqueueReceive(BuildAdviseAckFrame(1)); + + var client = new SuiteLinkClient(transport); + await client.ConnectAsync(CreateOptions()); + + var handle = await client.SubscribeAsync("Pump001.Run", _ => { }); + Assert.True(client.DebugHasDurableSubscription("Pump001.Run")); + + await handle.DisposeAsync(); + + Assert.False(client.DebugHasDurableSubscription("Pump001.Run")); + } + + [Fact] + public async Task SubscriptionHandleDisposeAsync_RemovesDurableIntent_WhenUnadviseSendFails() + { + var transport = new FakeTransport(); + transport.EnqueueReceive(BuildHandshakeAckFrame()); + transport.EnqueueReceive(BuildAdviseAckFrame(1)); + transport.SendFailureFactory = frameBytes => + { + var span = frameBytes.Span; + var isUnadviseFrame = span.Length >= 4 && + span[2] == 0x04 && + span[3] == 0x80; + return isUnadviseFrame ? 
new IOException("Synthetic unadvise send failure.") : null; + }; + + var client = new SuiteLinkClient(transport); + await client.ConnectAsync(CreateOptions()); + + var handle = await client.SubscribeAsync("Pump001.Run", _ => { }); + Assert.True(client.DebugHasDurableSubscription("Pump001.Run")); + + await Assert.ThrowsAsync(() => handle.DisposeAsync().AsTask()); + Assert.False(client.DebugHasDurableSubscription("Pump001.Run")); + } + + private static SuiteLinkConnectionOptions CreateOptions() + { + return new SuiteLinkConnectionOptions( + host: "127.0.0.1", + application: "App", + topic: "Topic", + clientName: "Client", + clientNode: "Node", + userName: "User", + serverNode: "Server", + timezone: "UTC", + port: 5413); + } + + private static byte[] BuildHandshakeAckFrame() + { + return [0x06, 0x00, 0x01, 0x00, 0xA1, 0xB2, 0xC3, 0xA5]; + } + + private static byte[] BuildAdviseAckFrame(uint tagId) + { + Span payload = stackalloc byte[5]; + SuiteLinkEncoding.WriteUInt32LittleEndian(payload[..4], tagId); + payload[4] = 0x00; + return SuiteLinkFrameWriter.WriteFrame(SuiteLinkSubscriptionCodec.AdviseAckMessageType, payload); + } + + private static byte[] BuildBooleanUpdateFrame(uint tagId, bool value) + { + Span payload = stackalloc byte[10]; + SuiteLinkEncoding.WriteUInt32LittleEndian(payload[..4], tagId); + SuiteLinkEncoding.WriteUInt16LittleEndian(payload.Slice(4, 2), 1); + SuiteLinkEncoding.WriteUInt16LittleEndian(payload.Slice(6, 2), 0x00C0); + payload[8] = (byte)SuiteLinkWireValueType.Binary; + payload[9] = value ? (byte)1 : (byte)0; + return SuiteLinkFrameWriter.WriteFrame(SuiteLinkUpdateCodec.UpdateMessageType, payload); + } + + private sealed class FakeTransport : ISuiteLinkTransport + { + private readonly Queue _receiveChunks = []; + public Func, Exception?>? 
SendFailureFactory { get; set; } + + public bool IsConnected { get; private set; } + + public void EnqueueReceive(byte[] bytes) + { + _receiveChunks.Enqueue(bytes); + } + + public ValueTask ConnectAsync(string host, int port, CancellationToken cancellationToken = default) + { + IsConnected = true; + return ValueTask.CompletedTask; + } + + public ValueTask SendAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + var sendFailure = SendFailureFactory?.Invoke(buffer); + if (sendFailure is not null) + { + throw sendFailure; + } + + return ValueTask.CompletedTask; + } + + public ValueTask ReceiveAsync(Memory buffer, CancellationToken cancellationToken = default) + { + if (_receiveChunks.Count == 0) + { + return ValueTask.FromResult(0); + } + + var bytes = _receiveChunks.Dequeue(); + bytes.CopyTo(buffer); + return ValueTask.FromResult(bytes.Length); + } + + public ValueTask DisposeAsync() + { + IsConnected = false; + return ValueTask.CompletedTask; + } + } +} diff --git a/tests/SuiteLink.Client.Tests/SuiteLinkClientWriteTests.cs b/tests/SuiteLink.Client.Tests/SuiteLinkClientWriteTests.cs index 4c70f75..6df2ce8 100644 --- a/tests/SuiteLink.Client.Tests/SuiteLinkClientWriteTests.cs +++ b/tests/SuiteLink.Client.Tests/SuiteLinkClientWriteTests.cs @@ -1,4 +1,5 @@ using SuiteLink.Client.Protocol; +using SuiteLink.Client.Internal; using SuiteLink.Client.Transport; namespace SuiteLink.Client.Tests; @@ -42,6 +43,73 @@ public sealed class SuiteLinkClientWriteTests Assert.Equal(2, transport.SentBuffers.Count); } + [Fact] + public async Task WriteAsync_DuringReconnect_ThrowsClearException() + { + var transport = new RuntimeDisconnectFakeTransport() + .WithFrame(BuildHandshakeAckFrame()) + .WithFrame(BuildAdviseAckFrame(1)); + var client = new SuiteLinkClient( + transport, + ownsTransport: false, + delayAsync: static async (delay, cancellationToken) => + { + await Task.Yield(); + if (delay > TimeSpan.Zero) + { + await Task.Delay(delay, cancellationToken); 
+ } + }, + reconnectAttemptAsync: static _ => ValueTask.FromResult(false)); + + await client.ConnectAsync(CreateOptions()); + _ = await client.SubscribeAsync("Pump001.Run", _ => { }); + + var deadline = DateTime.UtcNow + TimeSpan.FromSeconds(2); + while (DateTime.UtcNow < deadline && client.DebugState != SuiteLinkSessionState.Reconnecting) + { + await Task.Delay(20); + } + + var ex = await Assert.ThrowsAsync( + () => client.WriteAsync("Pump001.Run", SuiteLinkValue.FromBoolean(true))); + + Assert.Contains("reconnecting", ex.Message, StringComparison.OrdinalIgnoreCase); + await client.DisposeAsync(); + } + + [Fact] + public async Task WriteAsync_DuringReconnect_ThrowsBeforeWaitingOnOperationGate() + { + var transport = new FakeTransport(); + transport.EnqueueReceive(BuildHandshakeAckFrame()); + transport.EnqueueReceive(BuildAdviseAckFrame(1)); + + var client = new SuiteLinkClient(transport); + await client.ConnectAsync(CreateOptions()); + _ = await client.SubscribeAsync("Pump001.Run", _ => { }); + + var releaseGate = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var acquiredGate = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var holdGateTask = client.DebugHoldOperationGateAsync(releaseGate.Task, acquiredGate); + _ = await acquiredGate.Task.WaitAsync(TimeSpan.FromSeconds(2)); + + var sessionField = typeof(SuiteLinkClient) + .GetField("_session", System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic)!; + var session = (SuiteLinkSession)sessionField.GetValue(client)!; + Assert.True(session.TryTransitionState(SuiteLinkSessionState.Subscribed, SuiteLinkSessionState.Reconnecting)); + + var writeTask = client.WriteAsync("Pump001.Run", SuiteLinkValue.FromBoolean(true)); + var completed = await Task.WhenAny(writeTask, Task.Delay(200)); + + releaseGate.TrySetResult(true); + await holdGateTask.WaitAsync(TimeSpan.FromSeconds(2)); + + Assert.Same(writeTask, completed); + var ex = 
await Assert.ThrowsAsync(() => writeTask); + Assert.Contains("reconnecting", ex.Message, StringComparison.OrdinalIgnoreCase); + } + private static SuiteLinkConnectionOptions CreateOptions() { return new SuiteLinkConnectionOptions( @@ -123,4 +191,54 @@ public sealed class SuiteLinkClientWriteTests return ValueTask.CompletedTask; } } + + private sealed class RuntimeDisconnectFakeTransport : ISuiteLinkTransport + { + private readonly Queue _receiveChunks = []; + private readonly object _syncRoot = new(); + + public bool IsConnected { get; private set; } + + public RuntimeDisconnectFakeTransport WithFrame(byte[] bytes) + { + lock (_syncRoot) + { + _receiveChunks.Enqueue(bytes); + } + + return this; + } + + public ValueTask ConnectAsync(string host, int port, CancellationToken cancellationToken = default) + { + IsConnected = true; + return ValueTask.CompletedTask; + } + + public ValueTask SendAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + return ValueTask.CompletedTask; + } + + public ValueTask ReceiveAsync(Memory buffer, CancellationToken cancellationToken = default) + { + lock (_syncRoot) + { + if (_receiveChunks.Count == 0) + { + return new ValueTask(0); + } + + var next = _receiveChunks.Dequeue(); + next.CopyTo(buffer); + return new ValueTask(next.Length); + } + } + + public ValueTask DisposeAsync() + { + IsConnected = false; + return ValueTask.CompletedTask; + } + } } diff --git a/tests/SuiteLink.Client.Tests/SuiteLinkConnectionOptionsTests.cs b/tests/SuiteLink.Client.Tests/SuiteLinkConnectionOptionsTests.cs index ff7a793..0e386ef 100644 --- a/tests/SuiteLink.Client.Tests/SuiteLinkConnectionOptionsTests.cs +++ b/tests/SuiteLink.Client.Tests/SuiteLinkConnectionOptionsTests.cs @@ -98,6 +98,37 @@ public sealed class SuiteLinkConnectionOptionsTests Assert.Equal("America/Indiana/Indianapolis", options.Timezone); } + [Fact] + public void Constructor_DefaultsRuntimeOptions() + { + var options = Create(); + + 
Assert.NotNull(options.Runtime); + Assert.Equal(SuiteLinkCatchUpPolicy.None, options.Runtime.CatchUpPolicy); + Assert.NotNull(options.Runtime.RetryPolicy); + Assert.Equal(TimeSpan.FromSeconds(2), options.Runtime.CatchUpTimeout); + } + + [Fact] + public void Constructor_RuntimeWithNullRetryPolicy_ThrowsArgumentNullException() + { + Assert.Throws(() => Create(runtime: new SuiteLinkRuntimeOptions( + retryPolicy: null!, + catchUpPolicy: SuiteLinkCatchUpPolicy.None, + catchUpTimeout: TimeSpan.FromSeconds(2)))); + } + + [Theory] + [InlineData(0)] + [InlineData(-1)] + public void Constructor_RuntimeWithNonPositiveCatchUpTimeout_ThrowsArgumentOutOfRangeException(int seconds) + { + Assert.Throws(() => Create(runtime: new SuiteLinkRuntimeOptions( + retryPolicy: SuiteLinkRetryPolicy.Default, + catchUpPolicy: SuiteLinkCatchUpPolicy.None, + catchUpTimeout: TimeSpan.FromSeconds(seconds)))); + } + private static SuiteLinkConnectionOptions Create( string host = "127.0.0.1", string application = "TestApp", @@ -107,7 +138,8 @@ public sealed class SuiteLinkConnectionOptionsTests string userName = "User", string serverNode = "Server", string? timezone = null, - int port = 5413) + int port = 5413, + SuiteLinkRuntimeOptions? 
runtime = null) { return new SuiteLinkConnectionOptions( host, @@ -118,6 +150,7 @@ public sealed class SuiteLinkConnectionOptionsTests userName, serverNode, timezone, - port); + port, + runtime); } } diff --git a/tests/SuiteLink.Client.Tests/Transport/SuiteLinkTcpTransportTests.cs b/tests/SuiteLink.Client.Tests/Transport/SuiteLinkTcpTransportTests.cs index 289a988..80063dc 100644 --- a/tests/SuiteLink.Client.Tests/Transport/SuiteLinkTcpTransportTests.cs +++ b/tests/SuiteLink.Client.Tests/Transport/SuiteLinkTcpTransportTests.cs @@ -114,6 +114,38 @@ public sealed class SuiteLinkTcpTransportTests async () => await listener.AcceptTcpClientAsync(secondAcceptCts.Token)); } + [Fact] + public async Task ResetConnectionAsync_AfterConnect_AllowsReconnect() + { + using var listener = new TcpListener(IPAddress.Loopback, 0); + listener.Start(); + + var endpoint = (IPEndPoint)listener.LocalEndpoint; + await using var transport = new SuiteLinkTcpTransport(); + + await transport.ConnectAsync(endpoint.Address.ToString(), endpoint.Port); + using var accepted1 = await listener.AcceptTcpClientAsync(); + + await transport.ResetConnectionAsync(); + Assert.False(transport.IsConnected); + + await transport.ConnectAsync(endpoint.Address.ToString(), endpoint.Port); + using var accepted2 = await listener.AcceptTcpClientAsync(); + + Assert.True(transport.IsConnected); + } + + [Fact] + public async Task ResetConnectionAsync_LeaveOpenTrue_DoesNotDisposeInjectedStream() + { + var stream = new TrackingStream(); + await using var transport = new SuiteLinkTcpTransport(stream, leaveOpen: true); + + await transport.ResetConnectionAsync(); + + Assert.False(stream.WasDisposed); + } + private sealed class PartialReadStream : Stream { private readonly MemoryStream _inner;