diff --git a/src/NATS.Server/IO/AdaptiveReadBuffer.cs b/src/NATS.Server/IO/AdaptiveReadBuffer.cs
index 6cf6bd8..039b1a3 100644
--- a/src/NATS.Server/IO/AdaptiveReadBuffer.cs
+++ b/src/NATS.Server/IO/AdaptiveReadBuffer.cs
@@ -1,19 +1,44 @@
namespace NATS.Server.IO;
+///
+/// Dynamically sized read buffer that grows on full reads and shrinks
+/// only after consecutive short reads exceed a threshold.
+/// Go reference: server/client.go — readLoop buffer sizing with short-read counter.
+///
public sealed class AdaptiveReadBuffer
{
+ private const int ShortReadThreshold = 4;
private int _target = 4096;
+ private int _consecutiveShortReads;
public int CurrentSize => Math.Clamp(_target, 512, 64 * 1024);
+ /// Number of consecutive short reads since last full read or grow.
+ public int ConsecutiveShortReads => _consecutiveShortReads;
+
public void RecordRead(int bytesRead)
{
if (bytesRead <= 0)
return;
if (bytesRead >= _target)
+ {
_target = Math.Min(_target * 2, 64 * 1024);
+ _consecutiveShortReads = 0; // Reset on grow
+ }
else if (bytesRead < _target / 4)
- _target = Math.Max(_target / 2, 512);
+ {
+ _consecutiveShortReads++;
+ if (_consecutiveShortReads >= ShortReadThreshold)
+ {
+ _target = Math.Max(_target / 2, 512);
+ _consecutiveShortReads = 0; // Reset after shrink
+ }
+ }
+ else
+ {
+ // Medium read — not short, not full. Reset counter.
+ _consecutiveShortReads = 0;
+ }
}
}
diff --git a/tests/NATS.Server.Tests/IO/AdaptiveReadBufferShortReadTests.cs b/tests/NATS.Server.Tests/IO/AdaptiveReadBufferShortReadTests.cs
new file mode 100644
index 0000000..658e6ac
--- /dev/null
+++ b/tests/NATS.Server.Tests/IO/AdaptiveReadBufferShortReadTests.cs
@@ -0,0 +1,126 @@
+using NATS.Server.IO;
+using Shouldly;
+
+namespace NATS.Server.Tests.IO;
+
+///
+/// Tests for the consecutive short-read counter in AdaptiveReadBuffer.
+/// Go reference: server/client.go — readLoop buffer sizing with short-read counter.
+///
+public class AdaptiveReadBufferShortReadTests
+{
+ [Fact]
+ public void Initial_size_is_4096()
+ {
+ var b = new AdaptiveReadBuffer();
+ b.CurrentSize.ShouldBe(4096);
+ }
+
+ [Fact]
+ public void Full_read_doubles_size()
+ {
+ var b = new AdaptiveReadBuffer();
+ b.RecordRead(4096);
+ b.CurrentSize.ShouldBe(8192);
+ }
+
+ [Fact]
+ public void Single_short_read_does_not_shrink()
+ {
+ // A short read is less than target/4 = 4096/4 = 1024
+ var b = new AdaptiveReadBuffer();
+ b.RecordRead(100);
+ b.CurrentSize.ShouldBe(4096);
+ }
+
+ [Fact]
+ public void Three_short_reads_do_not_shrink()
+ {
+ var b = new AdaptiveReadBuffer();
+ b.RecordRead(100);
+ b.RecordRead(100);
+ b.RecordRead(100);
+ b.CurrentSize.ShouldBe(4096);
+ }
+
+ [Fact]
+ public void Four_short_reads_triggers_shrink()
+ {
+ var b = new AdaptiveReadBuffer();
+ b.RecordRead(100);
+ b.RecordRead(100);
+ b.RecordRead(100);
+ b.RecordRead(100);
+ b.CurrentSize.ShouldBe(2048);
+ }
+
+ [Fact]
+ public void Short_read_counter_resets_on_full_read()
+ {
+ // 3 short reads, then a full read resets the counter — subsequent short read should not shrink
+ var b = new AdaptiveReadBuffer();
+ b.RecordRead(100); // short
+ b.RecordRead(100); // short
+ b.RecordRead(100); // short (3 total, not yet at threshold)
+ b.RecordRead(4096); // full read — doubles size and resets counter
+ b.RecordRead(512); // short (relative to new size 8192; 512 < 8192/4=2048) — only 1 consecutive
+ b.CurrentSize.ShouldBe(8192);
+ }
+
+ [Fact]
+ public void Short_read_counter_resets_on_medium_read()
+ {
+ // A medium read is >= target/4 but < target
+ // For target 4096: medium range is [1024, 4096)
+ var b = new AdaptiveReadBuffer();
+ b.RecordRead(100); // short — counter = 1
+ b.RecordRead(100); // short — counter = 2
+ b.RecordRead(100); // short — counter = 3
+ b.RecordRead(2000); // medium (>= 4096/4=1024, < 4096) — resets counter
+ b.RecordRead(100); // short — counter = 1, should not shrink
+ b.CurrentSize.ShouldBe(4096);
+ }
+
+ [Fact]
+ public void Short_read_counter_resets_after_shrink()
+ {
+ // After 4 short reads trigger a shrink, counter resets to 0
+ var b = new AdaptiveReadBuffer();
+ b.RecordRead(100); // short — counter = 1
+ b.RecordRead(100); // short — counter = 2
+ b.RecordRead(100); // short — counter = 3
+ b.RecordRead(100); // short — counter = 4 → shrinks to 2048, resets counter to 0
+ b.ConsecutiveShortReads.ShouldBe(0);
+ // One more short read should be counter = 1 (not triggering another shrink)
+ b.RecordRead(50); // short relative to 2048 (50 < 2048/4=512) — counter = 1
+ b.ConsecutiveShortReads.ShouldBe(1);
+ b.CurrentSize.ShouldBe(2048);
+ }
+
+ [Fact]
+ public void Size_never_goes_below_512()
+ {
+ // Force the buffer down to 512 then attempt to shrink further
+ var b = new AdaptiveReadBuffer();
+
+ // Drive target down to 512 via repeated shrink cycles
+ for (var i = 0; i < 4; i++) b.RecordRead(1); // target 4096 → 2048
+ for (var i = 0; i < 4; i++) b.RecordRead(1); // 2048 → 1024
+ for (var i = 0; i < 4; i++) b.RecordRead(1); // 1024 → 512
+
+ b.CurrentSize.ShouldBe(512);
+
+ // Now try to shrink again — should stay at 512
+ for (var i = 0; i < 4; i++) b.RecordRead(1);
+ b.CurrentSize.ShouldBe(512);
+ }
+
+ [Fact]
+ public void ConsecutiveShortReads_property_reflects_count()
+ {
+ var b = new AdaptiveReadBuffer();
+ b.RecordRead(100); // short — counter = 1
+ b.RecordRead(100); // short — counter = 2
+ b.ConsecutiveShortReads.ShouldBe(2);
+ }
+}