From 18f0ca0587c9508ba678d67f45fcd9d198fb67bb Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 25 Feb 2026 11:36:12 -0500 Subject: [PATCH] feat: add consecutive short-read counter to prevent buffer oscillation (Gap 5.10) Require 4 consecutive short reads before shrinking AdaptiveReadBuffer, matching the Go server's readLoop behaviour and preventing buffer size thrashing. --- src/NATS.Server/IO/AdaptiveReadBuffer.cs | 27 +++- .../IO/AdaptiveReadBufferShortReadTests.cs | 126 ++++++++++++++++++ 2 files changed, 152 insertions(+), 1 deletion(-) create mode 100644 tests/NATS.Server.Tests/IO/AdaptiveReadBufferShortReadTests.cs diff --git a/src/NATS.Server/IO/AdaptiveReadBuffer.cs b/src/NATS.Server/IO/AdaptiveReadBuffer.cs index 6cf6bd8..039b1a3 100644 --- a/src/NATS.Server/IO/AdaptiveReadBuffer.cs +++ b/src/NATS.Server/IO/AdaptiveReadBuffer.cs @@ -1,19 +1,44 @@ namespace NATS.Server.IO; +/// +/// Dynamically sized read buffer that grows on full reads and shrinks +/// only after consecutive short reads exceed a threshold. +/// Go reference: server/client.go — readLoop buffer sizing with short-read counter. +/// public sealed class AdaptiveReadBuffer { + private const int ShortReadThreshold = 4; private int _target = 4096; + private int _consecutiveShortReads; public int CurrentSize => Math.Clamp(_target, 512, 64 * 1024); + /// Number of consecutive short reads since last full read or grow. + public int ConsecutiveShortReads => _consecutiveShortReads; + public void RecordRead(int bytesRead) { if (bytesRead <= 0) return; if (bytesRead >= _target) + { _target = Math.Min(_target * 2, 64 * 1024); + _consecutiveShortReads = 0; // Reset on grow + } else if (bytesRead < _target / 4) - _target = Math.Max(_target / 2, 512); + { + _consecutiveShortReads++; + if (_consecutiveShortReads >= ShortReadThreshold) + { + _target = Math.Max(_target / 2, 512); + _consecutiveShortReads = 0; // Reset after shrink + } + } + else + { + // Medium read — not short, not full. Reset counter. + _consecutiveShortReads = 0; + } } } diff --git a/tests/NATS.Server.Tests/IO/AdaptiveReadBufferShortReadTests.cs b/tests/NATS.Server.Tests/IO/AdaptiveReadBufferShortReadTests.cs new file mode 100644 index 0000000..658e6ac --- /dev/null +++ b/tests/NATS.Server.Tests/IO/AdaptiveReadBufferShortReadTests.cs @@ -0,0 +1,126 @@ +using NATS.Server.IO; +using Shouldly; + +namespace NATS.Server.Tests.IO; + +/// +/// Tests for the consecutive short-read counter in AdaptiveReadBuffer. +/// Go reference: server/client.go — readLoop buffer sizing with short-read counter. +/// +public class AdaptiveReadBufferShortReadTests +{ + [Fact] + public void Initial_size_is_4096() + { + var b = new AdaptiveReadBuffer(); + b.CurrentSize.ShouldBe(4096); + } + + [Fact] + public void Full_read_doubles_size() + { + var b = new AdaptiveReadBuffer(); + b.RecordRead(4096); + b.CurrentSize.ShouldBe(8192); + } + + [Fact] + public void Single_short_read_does_not_shrink() + { + // A short read is less than target/4 = 4096/4 = 1024 + var b = new AdaptiveReadBuffer(); + b.RecordRead(100); + b.CurrentSize.ShouldBe(4096); + } + + [Fact] + public void Three_short_reads_do_not_shrink() + { + var b = new AdaptiveReadBuffer(); + b.RecordRead(100); + b.RecordRead(100); + b.RecordRead(100); + b.CurrentSize.ShouldBe(4096); + } + + [Fact] + public void Four_short_reads_triggers_shrink() + { + var b = new AdaptiveReadBuffer(); + b.RecordRead(100); + b.RecordRead(100); + b.RecordRead(100); + b.RecordRead(100); + b.CurrentSize.ShouldBe(2048); + } + + [Fact] + public void Short_read_counter_resets_on_full_read() + { + // 3 short reads, then a full read resets the counter — subsequent short read should not shrink + var b = new AdaptiveReadBuffer(); + b.RecordRead(100); // short + b.RecordRead(100); // short + b.RecordRead(100); // short (3 total, not yet at threshold) + b.RecordRead(4096); // full read — doubles size and resets counter + b.RecordRead(512); // short (relative to new size 8192; 512 < 8192/4=2048) — only 1 consecutive + b.CurrentSize.ShouldBe(8192); + } + + [Fact] + public void Short_read_counter_resets_on_medium_read() + { + // A medium read is >= target/4 but < target + // For target 4096: medium range is [1024, 4096) + var b = new AdaptiveReadBuffer(); + b.RecordRead(100); // short — counter = 1 + b.RecordRead(100); // short — counter = 2 + b.RecordRead(100); // short — counter = 3 + b.RecordRead(2000); // medium (>= 4096/4=1024, < 4096) — resets counter + b.RecordRead(100); // short — counter = 1, should not shrink + b.CurrentSize.ShouldBe(4096); + } + + [Fact] + public void Short_read_counter_resets_after_shrink() + { + // After 4 short reads trigger a shrink, counter resets to 0 + var b = new AdaptiveReadBuffer(); + b.RecordRead(100); // short — counter = 1 + b.RecordRead(100); // short — counter = 2 + b.RecordRead(100); // short — counter = 3 + b.RecordRead(100); // short — counter = 4 → shrinks to 2048, resets counter to 0 + b.ConsecutiveShortReads.ShouldBe(0); + // One more short read should be counter = 1 (not triggering another shrink) + b.RecordRead(50); // short relative to 2048 (50 < 2048/4=512) — counter = 1 + b.ConsecutiveShortReads.ShouldBe(1); + b.CurrentSize.ShouldBe(2048); + } + + [Fact] + public void Size_never_goes_below_512() + { + // Force the buffer down to 512 then attempt to shrink further + var b = new AdaptiveReadBuffer(); + + // Drive target down to 512 via repeated shrink cycles + for (var i = 0; i < 4; i++) b.RecordRead(1); // target 4096 → 2048 + for (var i = 0; i < 4; i++) b.RecordRead(1); // 2048 → 1024 + for (var i = 0; i < 4; i++) b.RecordRead(1); // 1024 → 512 + + b.CurrentSize.ShouldBe(512); + + // Now try to shrink again — should stay at 512 + for (var i = 0; i < 4; i++) b.RecordRead(1); + b.CurrentSize.ShouldBe(512); + } + + [Fact] + public void ConsecutiveShortReads_property_reflects_count() + { + var b = new AdaptiveReadBuffer(); + b.RecordRead(100); // short — counter = 1 + b.RecordRead(100); // short — counter = 2 + b.ConsecutiveShortReads.ShouldBe(2); + } +}