diff --git a/src/NATS.Server/IO/OutboundBufferPool.cs b/src/NATS.Server/IO/OutboundBufferPool.cs
index 288466d..8310431 100644
--- a/src/NATS.Server/IO/OutboundBufferPool.cs
+++ b/src/NATS.Server/IO/OutboundBufferPool.cs
@@ -1,15 +1,182 @@
using System.Buffers;
+using System.Collections.Concurrent;
namespace NATS.Server.IO;
+///
+/// Tiered write buffer pool with broadcast drain capability.
+/// Maintains internal pools for common sizes (512, 4096, 65536) to reduce
+/// allocation overhead on the hot publish path.
+/// Go reference: client.go — dynamic buffer sizing and broadcast flush coalescing for fan-out.
+///
public sealed class OutboundBufferPool
{
+ private const int SmallSize = 512;
+ private const int MediumSize = 4096;
+ private const int LargeSize = 65536;
+
+ private readonly ConcurrentBag _small = new(); // 512 B
+ private readonly ConcurrentBag _medium = new(); // 4 KiB
+ private readonly ConcurrentBag _large = new(); // 64 KiB
+
+ private long _rentCount;
+ private long _returnCount;
+ private long _broadcastCount;
+
+ public long RentCount => Interlocked.Read(ref _rentCount);
+ public long ReturnCount => Interlocked.Read(ref _returnCount);
+ public long BroadcastCount => Interlocked.Read(ref _broadcastCount);
+
+ // -----------------------------------------------------------------------
+ // IMemoryOwner surface (preserves existing callers)
+ // -----------------------------------------------------------------------
+
+ ///
+ /// Rents an whose Memory.Length is at least
+ /// bytes. Tries the internal pool first; falls back to
+ /// .
+ ///
public IMemoryOwner Rent(int size)
{
- if (size <= 512)
- return MemoryPool.Shared.Rent(512);
- if (size <= 4096)
- return MemoryPool.Shared.Rent(4096);
- return MemoryPool.Shared.Rent(64 * 1024);
+ Interlocked.Increment(ref _rentCount);
+
+ // Try to serve from the internal pool so that Dispose() returns the
+ // raw buffer back to us rather than to MemoryPool.Shared.
+ if (size <= SmallSize && _small.TryTake(out var sb))
+ return new PooledMemoryOwner(sb, _small);
+
+ if (size <= MediumSize && _medium.TryTake(out var mb))
+ return new PooledMemoryOwner(mb, _medium);
+
+ if (size <= LargeSize && _large.TryTake(out var lb))
+ return new PooledMemoryOwner(lb, _large);
+
+ // Nothing cached — rent from the system pool (which may return a larger
+ // buffer; that's fine, callers must honour Memory.Length, not the
+ // requested size).
+ int rounded = size <= SmallSize ? SmallSize
+ : size <= MediumSize ? MediumSize
+ : LargeSize;
+
+ return MemoryPool.Shared.Rent(rounded);
+ }
+
+ // -----------------------------------------------------------------------
+ // Raw byte[] surface
+ // -----------------------------------------------------------------------
+
+ ///
+ /// Returns a byte[] from the internal pool whose length is at least
+ /// bytes. The caller is responsible for calling
+ /// when finished.
+ ///
+ public byte[] RentBuffer(int size)
+ {
+ Interlocked.Increment(ref _rentCount);
+
+ if (size <= SmallSize)
+ {
+ if (_small.TryTake(out var b)) return b;
+ return new byte[SmallSize];
+ }
+
+ if (size <= MediumSize)
+ {
+ if (_medium.TryTake(out var b)) return b;
+ return new byte[MediumSize];
+ }
+
+ if (_large.TryTake(out var lb)) return lb;
+ return new byte[LargeSize];
+ }
+
+ ///
+ /// Returns to the appropriate tier so it can be
+ /// reused by a subsequent call.
+ ///
+ public void ReturnBuffer(byte[] buffer)
+ {
+ Interlocked.Increment(ref _returnCount);
+
+ switch (buffer.Length)
+ {
+ case SmallSize:
+ _small.Add(buffer);
+ break;
+ case MediumSize:
+ _medium.Add(buffer);
+ break;
+ case LargeSize:
+ _large.Add(buffer);
+ break;
+ // Buffers of unexpected sizes are simply dropped (GC reclaims them).
+ }
+ }
+
+ // -----------------------------------------------------------------------
+ // Broadcast drain
+ // -----------------------------------------------------------------------
+
+ ///
+ /// Coalesces multiple pending payloads into a single contiguous buffer for
+ /// batch writing. Copies every entry in
+ /// sequentially into and returns the total
+ /// number of bytes written.
+ ///
+ /// The caller must ensure is large enough
+ /// (use to pre-check).
+ ///
+ /// Go reference: client.go — broadcast flush coalescing for fan-out.
+ ///
+ public int BroadcastDrain(IReadOnlyList> pendingWrites, byte[] destination)
+ {
+ var offset = 0;
+ foreach (var write in pendingWrites)
+ {
+ write.Span.CopyTo(destination.AsSpan(offset));
+ offset += write.Length;
+ }
+ Interlocked.Increment(ref _broadcastCount);
+ return offset;
+ }
+
+ ///
+ /// Returns the total number of bytes needed to coalesce all
+ /// into a single buffer.
+ ///
+ public static int CalculateBroadcastSize(IReadOnlyList> pendingWrites)
+ {
+ var total = 0;
+ foreach (var w in pendingWrites) total += w.Length;
+ return total;
+ }
+
+ // -----------------------------------------------------------------------
+ // Inner type: pooled IMemoryOwner
+ // -----------------------------------------------------------------------
+
+ ///
+ /// Wraps a raw byte[] rented from an internal
+ /// and returns it to that bag on disposal.
+ ///
+ private sealed class PooledMemoryOwner : IMemoryOwner
+ {
+ private readonly ConcurrentBag _pool;
+ private byte[]? _buffer;
+
+ public PooledMemoryOwner(byte[] buffer, ConcurrentBag pool)
+ {
+ _buffer = buffer;
+ _pool = pool;
+ }
+
+ public Memory Memory =>
+ _buffer is { } b ? b.AsMemory() : Memory.Empty;
+
+ public void Dispose()
+ {
+ if (Interlocked.Exchange(ref _buffer, null) is { } b)
+ _pool.Add(b);
+ }
}
}
diff --git a/tests/NATS.Server.Tests/IO/DynamicBufferPoolTests.cs b/tests/NATS.Server.Tests/IO/DynamicBufferPoolTests.cs
new file mode 100644
index 0000000..92ca8fd
--- /dev/null
+++ b/tests/NATS.Server.Tests/IO/DynamicBufferPoolTests.cs
@@ -0,0 +1,179 @@
+using System.Text;
+using NATS.Server.IO;
+using Shouldly;
+
+// Go reference: client.go — dynamic buffer sizing and broadcast flush coalescing for fan-out.
+
+namespace NATS.Server.Tests.IO;
+
+public class DynamicBufferPoolTests
+{
+ // -----------------------------------------------------------------------
+ // Rent (IMemoryOwner)
+ // -----------------------------------------------------------------------
+
+ [Fact]
+ public void Rent_returns_buffer_of_requested_size_or_larger()
+ {
+ // Go ref: client.go — dynamic buffer sizing (512 → 65536).
+ var pool = new OutboundBufferPool();
+ using var owner = pool.Rent(100);
+ owner.Memory.Length.ShouldBeGreaterThanOrEqualTo(100);
+ }
+
+ // -----------------------------------------------------------------------
+ // RentBuffer — tier sizing
+ // -----------------------------------------------------------------------
+
+ [Fact]
+ public void RentBuffer_returns_small_buffer()
+ {
+ // Go ref: client.go — initial 512 B write buffer per connection.
+ var pool = new OutboundBufferPool();
+ var buf = pool.RentBuffer(100);
+ buf.Length.ShouldBeGreaterThanOrEqualTo(512);
+ pool.ReturnBuffer(buf);
+ }
+
+ [Fact]
+ public void RentBuffer_returns_medium_buffer()
+ {
+ // Go ref: client.go — 4 KiB write buffer growth step.
+ var pool = new OutboundBufferPool();
+ var buf = pool.RentBuffer(1000);
+ buf.Length.ShouldBeGreaterThanOrEqualTo(4096);
+ pool.ReturnBuffer(buf);
+ }
+
+ [Fact]
+ public void RentBuffer_returns_large_buffer()
+ {
+ // Go ref: client.go — max 64 KiB write buffer per connection.
+ var pool = new OutboundBufferPool();
+ var buf = pool.RentBuffer(10000);
+ buf.Length.ShouldBeGreaterThanOrEqualTo(65536);
+ pool.ReturnBuffer(buf);
+ }
+
+ // -----------------------------------------------------------------------
+ // ReturnBuffer + reuse
+ // -----------------------------------------------------------------------
+
+ [Fact]
+ public void ReturnBuffer_and_reuse()
+ {
+ // Verifies that a returned buffer is available for reuse on the next
+ // RentBuffer call of the same tier.
+ // Go ref: client.go — buffer pooling to avoid GC pressure.
+ var pool = new OutboundBufferPool();
+
+ var first = pool.RentBuffer(100); // small tier → 512 B
+ first.Length.ShouldBe(512);
+ pool.ReturnBuffer(first);
+
+ var second = pool.RentBuffer(100); // should reuse the returned buffer
+ second.Length.ShouldBe(512);
+ // ReferenceEquals confirms the exact same array instance was reused.
+ ReferenceEquals(first, second).ShouldBeTrue();
+ pool.ReturnBuffer(second);
+ }
+
+ // -----------------------------------------------------------------------
+ // BroadcastDrain — coalescing
+ // -----------------------------------------------------------------------
+
+ [Fact]
+ public void BroadcastDrain_coalesces_writes()
+ {
+ // Go ref: client.go — broadcast flush for fan-out publish.
+ var pool = new OutboundBufferPool();
+
+ var p1 = Encoding.UTF8.GetBytes("Hello");
+ var p2 = Encoding.UTF8.GetBytes(", ");
+ var p3 = Encoding.UTF8.GetBytes("World");
+
+ IReadOnlyList> pending =
+ [
+ p1.AsMemory(),
+ p2.AsMemory(),
+ p3.AsMemory(),
+ ];
+
+ var dest = new byte[OutboundBufferPool.CalculateBroadcastSize(pending)];
+ pool.BroadcastDrain(pending, dest);
+
+ Encoding.UTF8.GetString(dest).ShouldBe("Hello, World");
+ }
+
+ [Fact]
+ public void BroadcastDrain_returns_correct_byte_count()
+ {
+ // Go ref: client.go — total bytes written during coalesced drain.
+ var pool = new OutboundBufferPool();
+
+ IReadOnlyList> pending =
+ [
+ new byte[10].AsMemory(),
+ new byte[20].AsMemory(),
+ new byte[30].AsMemory(),
+ ];
+
+ var dest = new byte[60];
+ var written = pool.BroadcastDrain(pending, dest);
+
+ written.ShouldBe(60);
+ }
+
+ // -----------------------------------------------------------------------
+ // CalculateBroadcastSize
+ // -----------------------------------------------------------------------
+
+ [Fact]
+ public void CalculateBroadcastSize_sums_all_writes()
+ {
+ // Go ref: client.go — pre-check buffer capacity before coalesced drain.
+ IReadOnlyList> pending =
+ [
+ new byte[7].AsMemory(),
+ new byte[13].AsMemory(),
+ ];
+
+ OutboundBufferPool.CalculateBroadcastSize(pending).ShouldBe(20);
+ }
+
+ // -----------------------------------------------------------------------
+ // Stats counters
+ // -----------------------------------------------------------------------
+
+ [Fact]
+ public void RentCount_increments()
+ {
+ // Go ref: client.go — observability for buffer allocation rate.
+ var pool = new OutboundBufferPool();
+
+ pool.RentCount.ShouldBe(0L);
+
+ using var _ = pool.Rent(100);
+ pool.RentBuffer(200);
+
+ pool.RentCount.ShouldBe(2L);
+ }
+
+ [Fact]
+ public void BroadcastCount_increments()
+ {
+ // Go ref: client.go — observability for fan-out drain operations.
+ var pool = new OutboundBufferPool();
+
+ pool.BroadcastCount.ShouldBe(0L);
+
+ IReadOnlyList> pending = [new byte[4].AsMemory()];
+ var dest = new byte[4];
+
+ pool.BroadcastDrain(pending, dest);
+ pool.BroadcastDrain(pending, dest);
+ pool.BroadcastDrain(pending, dest);
+
+ pool.BroadcastCount.ShouldBe(3L);
+ }
+}