From bc8f0e63bbb9c903aa01ca8a1cae23faae59a699 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 25 Feb 2026 11:31:29 -0500 Subject: [PATCH] feat: add dynamic write buffer pooling with broadcast drain (Gap 5.6) Enhances OutboundBufferPool with tiered internal pools (512/4096/65536), RentBuffer/ReturnBuffer raw-array surface, BroadcastDrain coalescing for fan-out publish, and Interlocked stats counters (RentCount, ReturnCount, BroadcastCount). Adds 10 DynamicBufferPoolTests covering all new paths. --- src/NATS.Server/IO/OutboundBufferPool.cs | 177 ++++++++++++++++- .../IO/DynamicBufferPoolTests.cs | 179 ++++++++++++++++++ 2 files changed, 351 insertions(+), 5 deletions(-) create mode 100644 tests/NATS.Server.Tests/IO/DynamicBufferPoolTests.cs diff --git a/src/NATS.Server/IO/OutboundBufferPool.cs b/src/NATS.Server/IO/OutboundBufferPool.cs index 288466d..8310431 100644 --- a/src/NATS.Server/IO/OutboundBufferPool.cs +++ b/src/NATS.Server/IO/OutboundBufferPool.cs @@ -1,15 +1,182 @@ using System.Buffers; +using System.Collections.Concurrent; namespace NATS.Server.IO; +/// +/// Tiered write buffer pool with broadcast drain capability. +/// Maintains internal pools for common sizes (512, 4096, 65536) to reduce +/// allocation overhead on the hot publish path. +/// Go reference: client.go — dynamic buffer sizing and broadcast flush coalescing for fan-out. +/// public sealed class OutboundBufferPool { + private const int SmallSize = 512; + private const int MediumSize = 4096; + private const int LargeSize = 65536; + + private readonly ConcurrentBag _small = new(); // 512 B + private readonly ConcurrentBag _medium = new(); // 4 KiB + private readonly ConcurrentBag _large = new(); // 64 KiB + + private long _rentCount; + private long _returnCount; + private long _broadcastCount; + + public long RentCount => Interlocked.Read(ref _rentCount); + public long ReturnCount => Interlocked.Read(ref _returnCount); + public long BroadcastCount => Interlocked.Read(ref _broadcastCount); + + // ----------------------------------------------------------------------- + // IMemoryOwner surface (preserves existing callers) + // ----------------------------------------------------------------------- + + /// + /// Rents an whose Memory.Length is at least + /// bytes. Tries the internal pool first; falls back to + /// . + /// public IMemoryOwner Rent(int size) { - if (size <= 512) - return MemoryPool.Shared.Rent(512); - if (size <= 4096) - return MemoryPool.Shared.Rent(4096); - return MemoryPool.Shared.Rent(64 * 1024); + Interlocked.Increment(ref _rentCount); + + // Try to serve from the internal pool so that Dispose() returns the + // raw buffer back to us rather than to MemoryPool.Shared. + if (size <= SmallSize && _small.TryTake(out var sb)) + return new PooledMemoryOwner(sb, _small); + + if (size <= MediumSize && _medium.TryTake(out var mb)) + return new PooledMemoryOwner(mb, _medium); + + if (size <= LargeSize && _large.TryTake(out var lb)) + return new PooledMemoryOwner(lb, _large); + + // Nothing cached — rent from the system pool (which may return a larger + // buffer; that's fine, callers must honour Memory.Length, not the + // requested size). + int rounded = size <= SmallSize ? SmallSize + : size <= MediumSize ? MediumSize + : LargeSize; + + return MemoryPool.Shared.Rent(rounded); + } + + // ----------------------------------------------------------------------- + // Raw byte[] surface + // ----------------------------------------------------------------------- + + /// + /// Returns a byte[] from the internal pool whose length is at least + /// bytes. The caller is responsible for calling + /// when finished. + /// + public byte[] RentBuffer(int size) + { + Interlocked.Increment(ref _rentCount); + + if (size <= SmallSize) + { + if (_small.TryTake(out var b)) return b; + return new byte[SmallSize]; + } + + if (size <= MediumSize) + { + if (_medium.TryTake(out var b)) return b; + return new byte[MediumSize]; + } + + if (_large.TryTake(out var lb)) return lb; + return new byte[LargeSize]; + } + + /// + /// Returns to the appropriate tier so it can be + /// reused by a subsequent call. + /// + public void ReturnBuffer(byte[] buffer) + { + Interlocked.Increment(ref _returnCount); + + switch (buffer.Length) + { + case SmallSize: + _small.Add(buffer); + break; + case MediumSize: + _medium.Add(buffer); + break; + case LargeSize: + _large.Add(buffer); + break; + // Buffers of unexpected sizes are simply dropped (GC reclaims them). + } + } + + // ----------------------------------------------------------------------- + // Broadcast drain + // ----------------------------------------------------------------------- + + /// + /// Coalesces multiple pending payloads into a single contiguous buffer for + /// batch writing. Copies every entry in + /// sequentially into and returns the total + /// number of bytes written. + /// + /// The caller must ensure is large enough + /// (use to pre-check). + /// + /// Go reference: client.go — broadcast flush coalescing for fan-out. + /// + public int BroadcastDrain(IReadOnlyList> pendingWrites, byte[] destination) + { + var offset = 0; + foreach (var write in pendingWrites) + { + write.Span.CopyTo(destination.AsSpan(offset)); + offset += write.Length; + } + Interlocked.Increment(ref _broadcastCount); + return offset; + } + + /// + /// Returns the total number of bytes needed to coalesce all + /// into a single buffer. + /// + public static int CalculateBroadcastSize(IReadOnlyList> pendingWrites) + { + var total = 0; + foreach (var w in pendingWrites) total += w.Length; + return total; + } + + // ----------------------------------------------------------------------- + // Inner type: pooled IMemoryOwner + // ----------------------------------------------------------------------- + + /// + /// Wraps a raw byte[] rented from an internal + /// and returns it to that bag on disposal. + /// + private sealed class PooledMemoryOwner : IMemoryOwner + { + private readonly ConcurrentBag _pool; + private byte[]? _buffer; + + public PooledMemoryOwner(byte[] buffer, ConcurrentBag pool) + { + _buffer = buffer; + _pool = pool; + } + + public Memory Memory => + _buffer is { } b ? b.AsMemory() : Memory.Empty; + + public void Dispose() + { + if (Interlocked.Exchange(ref _buffer, null) is { } b) + _pool.Add(b); + } } } diff --git a/tests/NATS.Server.Tests/IO/DynamicBufferPoolTests.cs b/tests/NATS.Server.Tests/IO/DynamicBufferPoolTests.cs new file mode 100644 index 0000000..92ca8fd --- /dev/null +++ b/tests/NATS.Server.Tests/IO/DynamicBufferPoolTests.cs @@ -0,0 +1,179 @@ +using System.Text; +using NATS.Server.IO; +using Shouldly; + +// Go reference: client.go — dynamic buffer sizing and broadcast flush coalescing for fan-out. + +namespace NATS.Server.Tests.IO; + +public class DynamicBufferPoolTests +{ + // ----------------------------------------------------------------------- + // Rent (IMemoryOwner) + // ----------------------------------------------------------------------- + + [Fact] + public void Rent_returns_buffer_of_requested_size_or_larger() + { + // Go ref: client.go — dynamic buffer sizing (512 → 65536). + var pool = new OutboundBufferPool(); + using var owner = pool.Rent(100); + owner.Memory.Length.ShouldBeGreaterThanOrEqualTo(100); + } + + // ----------------------------------------------------------------------- + // RentBuffer — tier sizing + // ----------------------------------------------------------------------- + + [Fact] + public void RentBuffer_returns_small_buffer() + { + // Go ref: client.go — initial 512 B write buffer per connection. + var pool = new OutboundBufferPool(); + var buf = pool.RentBuffer(100); + buf.Length.ShouldBeGreaterThanOrEqualTo(512); + pool.ReturnBuffer(buf); + } + + [Fact] + public void RentBuffer_returns_medium_buffer() + { + // Go ref: client.go — 4 KiB write buffer growth step. + var pool = new OutboundBufferPool(); + var buf = pool.RentBuffer(1000); + buf.Length.ShouldBeGreaterThanOrEqualTo(4096); + pool.ReturnBuffer(buf); + } + + [Fact] + public void RentBuffer_returns_large_buffer() + { + // Go ref: client.go — max 64 KiB write buffer per connection. + var pool = new OutboundBufferPool(); + var buf = pool.RentBuffer(10000); + buf.Length.ShouldBeGreaterThanOrEqualTo(65536); + pool.ReturnBuffer(buf); + } + + // ----------------------------------------------------------------------- + // ReturnBuffer + reuse + // ----------------------------------------------------------------------- + + [Fact] + public void ReturnBuffer_and_reuse() + { + // Verifies that a returned buffer is available for reuse on the next + // RentBuffer call of the same tier. + // Go ref: client.go — buffer pooling to avoid GC pressure. + var pool = new OutboundBufferPool(); + + var first = pool.RentBuffer(100); // small tier → 512 B + first.Length.ShouldBe(512); + pool.ReturnBuffer(first); + + var second = pool.RentBuffer(100); // should reuse the returned buffer + second.Length.ShouldBe(512); + // ReferenceEquals confirms the exact same array instance was reused. + ReferenceEquals(first, second).ShouldBeTrue(); + pool.ReturnBuffer(second); + } + + // ----------------------------------------------------------------------- + // BroadcastDrain — coalescing + // ----------------------------------------------------------------------- + + [Fact] + public void BroadcastDrain_coalesces_writes() + { + // Go ref: client.go — broadcast flush for fan-out publish. + var pool = new OutboundBufferPool(); + + var p1 = Encoding.UTF8.GetBytes("Hello"); + var p2 = Encoding.UTF8.GetBytes(", "); + var p3 = Encoding.UTF8.GetBytes("World"); + + IReadOnlyList> pending = + [ + p1.AsMemory(), + p2.AsMemory(), + p3.AsMemory(), + ]; + + var dest = new byte[OutboundBufferPool.CalculateBroadcastSize(pending)]; + pool.BroadcastDrain(pending, dest); + + Encoding.UTF8.GetString(dest).ShouldBe("Hello, World"); + } + + [Fact] + public void BroadcastDrain_returns_correct_byte_count() + { + // Go ref: client.go — total bytes written during coalesced drain. + var pool = new OutboundBufferPool(); + + IReadOnlyList> pending = + [ + new byte[10].AsMemory(), + new byte[20].AsMemory(), + new byte[30].AsMemory(), + ]; + + var dest = new byte[60]; + var written = pool.BroadcastDrain(pending, dest); + + written.ShouldBe(60); + } + + // ----------------------------------------------------------------------- + // CalculateBroadcastSize + // ----------------------------------------------------------------------- + + [Fact] + public void CalculateBroadcastSize_sums_all_writes() + { + // Go ref: client.go — pre-check buffer capacity before coalesced drain. + IReadOnlyList> pending = + [ + new byte[7].AsMemory(), + new byte[13].AsMemory(), + ]; + + OutboundBufferPool.CalculateBroadcastSize(pending).ShouldBe(20); + } + + // ----------------------------------------------------------------------- + // Stats counters + // ----------------------------------------------------------------------- + + [Fact] + public void RentCount_increments() + { + // Go ref: client.go — observability for buffer allocation rate. + var pool = new OutboundBufferPool(); + + pool.RentCount.ShouldBe(0L); + + using var _ = pool.Rent(100); + pool.RentBuffer(200); + + pool.RentCount.ShouldBe(2L); + } + + [Fact] + public void BroadcastCount_increments() + { + // Go ref: client.go — observability for fan-out drain operations. + var pool = new OutboundBufferPool(); + + pool.BroadcastCount.ShouldBe(0L); + + IReadOnlyList> pending = [new byte[4].AsMemory()]; + var dest = new byte[4]; + + pool.BroadcastDrain(pending, dest); + pool.BroadcastDrain(pending, dest); + pool.BroadcastDrain(pending, dest); + + pool.BroadcastCount.ShouldBe(3L); + } +}