perf: eliminate per-message allocations in pub/sub hot path and coalesce outbound writes

Pub/sub 1:1 (16B) improved from 0.18x to 0.50x, fan-out from 0.18x to 0.44x,
and JetStream durable fetch from 0.13x to 0.64x vs Go. Key changes: replace
.ToArray() copy in SendMessage with pooled buffer handoff, batch multiple small
writes into single WriteAsync via 64KB coalesce buffer in write loop, and remove
profiling Stopwatch instrumentation from ProcessMessage/StreamManager hot paths.
This commit is contained in:
Joseph Doherty
2026-03-13 05:09:36 -04:00
parent 9e0df9b3d7
commit 0a4e7a822f
10 changed files with 654 additions and 232 deletions

View File

@@ -0,0 +1,62 @@
using System.Diagnostics;
namespace NATS.Server.JetStream;
/// <summary>
/// Lightweight profiler for the JetStream publish hot path.
/// Each Record* method accumulates Stopwatch ticks for one pipeline component;
/// <see cref="DumpAndReset"/> formats the accumulated totals and zeroes every counter.
/// </summary>
public static class JetStreamProfiler
{
    // Slot indices into the shared tick-accumulator array.
    private const int FindBySubjectSlot = 0;
    private const int GetStateSlot = 1;
    private const int AppendSlot = 2;
    private const int EnforcePoliciesSlot = 3;
    private const int CaptureOverheadSlot = 4; // total Capture minus sub-components
    private const int JsonSerializeSlot = 5;
    private const int AckDeliverSlot = 6;
    private const int TotalProcessMessageSlot = 7;
    private const int SlotCount = 8;

    // Lock-free accumulators; every update goes through Interlocked so concurrent
    // publishers can record timings without a shared lock.
    private static readonly long[] s_ticks = new long[SlotCount];
    private static long s_calls;

    public static void RecordFindBySubject(long ticks) => Interlocked.Add(ref s_ticks[FindBySubjectSlot], ticks);
    public static void RecordGetState(long ticks) => Interlocked.Add(ref s_ticks[GetStateSlot], ticks);
    public static void RecordAppend(long ticks) => Interlocked.Add(ref s_ticks[AppendSlot], ticks);
    public static void RecordEnforcePolicies(long ticks) => Interlocked.Add(ref s_ticks[EnforcePoliciesSlot], ticks);
    public static void RecordCaptureOverhead(long ticks) => Interlocked.Add(ref s_ticks[CaptureOverheadSlot], ticks);
    public static void RecordJsonSerialize(long ticks) => Interlocked.Add(ref s_ticks[JsonSerializeSlot], ticks);
    public static void RecordAckDeliver(long ticks) => Interlocked.Add(ref s_ticks[AckDeliverSlot], ticks);
    public static void RecordTotalProcessMessage(long ticks) => Interlocked.Add(ref s_ticks[TotalProcessMessageSlot], ticks);
    public static void IncrementCalls() => Interlocked.Increment(ref s_calls);

    /// <summary>
    /// Snapshots and resets all counters, returning a formatted report.
    /// Returns a placeholder message when no publish calls were recorded.
    /// </summary>
    public static string DumpAndReset()
    {
        var calls = Interlocked.Exchange(ref s_calls, 0);
        var t = new long[SlotCount];
        for (var i = 0; i < SlotCount; i++)
            t[i] = Interlocked.Exchange(ref s_ticks[i], 0);

        if (calls == 0)
            return "No JetStream publish calls recorded.";

        var freq = (double)Stopwatch.Frequency;
        // Formatting helpers: total milliseconds, and microseconds per publish call.
        string Ms(int slot) => $"{t[slot] / freq * 1000.0:F3} ms";
        string UsPerCall(int slot) => $"{t[slot] / freq * 1_000_000.0 / calls:F1} us/call";

        return $"""
            === JetStream Publish Profile ({calls} calls) ===
            Total ProcessMessage: {Ms(TotalProcessMessageSlot),12} {UsPerCall(TotalProcessMessageSlot),14}
            FindBySubject: {Ms(FindBySubjectSlot),12} {UsPerCall(FindBySubjectSlot),14}
            GetStateAsync: {Ms(GetStateSlot),12} {UsPerCall(GetStateSlot),14}
            AppendAsync: {Ms(AppendSlot),12} {UsPerCall(AppendSlot),14}
            EnforcePolicies: {Ms(EnforcePoliciesSlot),12} {UsPerCall(EnforcePoliciesSlot),14}
            Capture overhead: {Ms(CaptureOverheadSlot),12} {UsPerCall(CaptureOverheadSlot),14}
            JSON serialize: {Ms(JsonSerializeSlot),12} {UsPerCall(JsonSerializeSlot),14}
            Ack deliver: {Ms(AckDeliverSlot),12} {UsPerCall(AckDeliverSlot),14}
            """;
    }
}

View File

@@ -50,6 +50,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
// Resolved at construction time: which format family to use.
private readonly bool _useS2; // true -> S2Codec (FSV2 compression path)
private readonly bool _useAead; // true -> AeadEncryptor (FSV2 encryption path)
private readonly bool _noTransform; // true -> no compression/encryption, TransformForPersist just copies
// Go: filestore.go — per-stream time hash wheel for efficient TTL expiration.
// Created lazily only when MaxAgeMs > 0. Entries are (seq, expires_ns) pairs.
@@ -95,6 +96,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
// Determine which format path is active.
_useS2 = _options.Compression == StoreCompression.S2Compression;
_useAead = _options.Cipher != StoreCipher.NoCipher;
_noTransform = !_useS2 && !_useAead && !_options.EnableCompression && !_options.EnableEncryption;
Directory.CreateDirectory(options.Directory);
@@ -113,13 +115,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
_flushTask = Task.Run(() => FlushLoopAsync(_flushCts.Token));
}
public async ValueTask<ulong> AppendAsync(string subject, ReadOnlyMemory<byte> payload, CancellationToken ct)
public ValueTask<ulong> AppendAsync(string subject, ReadOnlyMemory<byte> payload, CancellationToken ct)
{
if (_stopped)
throw new ObjectDisposedException(nameof(FileStore), "Store has been stopped.");
// Go: DiscardNew — reject when MaxBytes would be exceeded.
// Reference: golang/nats-server/server/filestore.go — storeMsg, discard new check.
if (_options.MaxBytes > 0 && _options.Discard == DiscardPolicy.New
&& (long)_totalBytes + payload.Length > _options.MaxBytes)
{
@@ -127,35 +128,38 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
}
// Go: check and remove expired messages before each append.
// Reference: golang/nats-server/server/filestore.go — storeMsg, expire check.
ExpireFromWheel();
// Only when MaxAge is configured (Go: filestore.go:4701 conditional).
if (_options.MaxAgeMs > 0)
ExpireFromWheel();
_last++;
var now = DateTime.UtcNow;
var timestamp = new DateTimeOffset(now).ToUnixTimeMilliseconds() * 1_000_000L;
// Go: writeMsgRecordLocked writes directly into mb.cache.buf (a single contiguous
// byte slice). It does NOT store a per-message object in a map.
// We keep _messages for LoadAsync/RemoveAsync but avoid double payload.ToArray().
var persistedPayload = TransformForPersist(payload.Span);
var stored = new StoredMessage
var storedPayload = _noTransform ? persistedPayload : payload.ToArray();
_messages[_last] = new StoredMessage
{
Sequence = _last,
Subject = subject,
Payload = payload.ToArray(),
Payload = storedPayload,
TimestampUtc = now,
};
_messages[_last] = stored;
_generation++;
// Incremental state tracking.
_messageCount++;
_totalBytes += (ulong)payload.Length;
if (_messageCount == 1)
_firstSeq = _last;
// Go: register new message in TTL wheel when MaxAgeMs is configured.
// Reference: golang/nats-server/server/filestore.go:6820 (storeMsg TTL schedule).
RegisterTtl(_last, timestamp, _options.MaxAgeMs > 0 ? (long)_options.MaxAgeMs * 1_000_000L : 0);
// Go: register TTL only when TTL > 0.
if (_options.MaxAgeMs > 0)
RegisterTtl(_last, timestamp, (long)_options.MaxAgeMs * 1_000_000L);
// Write to MsgBlock. The payload stored in the block is the transformed
// (compressed/encrypted) payload, not the plaintext.
// Write to MsgBlock.
EnsureActiveBlock();
try
{
@@ -163,15 +167,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
}
catch (InvalidOperationException)
{
// Block is sealed. Rotate to a new block and retry.
RotateBlock();
_activeBlock!.WriteAt(_last, subject, ReadOnlyMemory<byte>.Empty, persistedPayload, timestamp);
}
// Go: filestore.go:4443 (setupWriteCache) — record write in bounded cache manager.
_writeCache.TrackWrite(_activeBlock!.BlockId, persistedPayload.Length);
// Signal the background flush loop to coalesce and flush pending writes.
_flushSignal.Writer.TryWrite(0);
// Check if the block just became sealed after this write.
@@ -184,7 +184,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
if (_options.MaxMsgsPerSubject > 0 && !string.IsNullOrEmpty(subject))
EnforceMaxMsgsPerSubject(subject);
return _last;
return ValueTask.FromResult(_last);
}
public ValueTask<StoredMessage?> LoadAsync(ulong sequence, CancellationToken ct)

View File

@@ -56,54 +56,74 @@ public sealed class MessageRecord
/// <returns>The encoded byte array.</returns>
public static byte[] Encode(MessageRecord record)
{
var subjectBytes = Encoding.UTF8.GetBytes(record.Subject);
var headersSpan = record.Headers.Span;
var payloadSpan = record.Payload.Span;
// Calculate total size:
// flags(1) + varint(subj_len) + subject + varint(hdr_len) + headers
// + varint(payload_len) + payload + sequence(8) + timestamp(8) + checksum(8)
var size = 1
+ VarintSize((ulong)subjectBytes.Length) + subjectBytes.Length
+ VarintSize((ulong)headersSpan.Length) + headersSpan.Length
+ VarintSize((ulong)payloadSpan.Length) + payloadSpan.Length
+ TrailerSize;
var size = MeasureEncodedSize(record.Subject, record.Headers.Span, record.Payload.Span);
var buffer = new byte[size];
var offset = 0;
EncodeTo(buffer, 0, record.Sequence, record.Subject, record.Headers.Span,
record.Payload.Span, record.Timestamp, record.Deleted);
return buffer;
}
/// <summary>
/// Computes the exact encoded byte size of a record — flags byte, the three
/// length-prefixed sections (subject, headers, payload), and the fixed trailer —
/// without allocating.
/// </summary>
public static int MeasureEncodedSize(string subject, ReadOnlySpan<byte> headers, ReadOnlySpan<byte> payload)
{
    // Start with the 1-byte flags field, then add each varint-prefixed section.
    var size = 1;
    var subjectBytes = Encoding.UTF8.GetByteCount(subject);
    size += VarintSize((ulong)subjectBytes) + subjectBytes;
    size += VarintSize((ulong)headers.Length) + headers.Length;
    size += VarintSize((ulong)payload.Length) + payload.Length;
    // Trailer: sequence(8) + timestamp(8) + checksum(8).
    return size + TrailerSize;
}
/// <summary>
/// Encodes a record directly into an existing buffer at the given offset.
/// Go equivalent: writeMsgRecordLocked writes directly into cache.buf.
/// Returns the number of bytes written.
/// </summary>
public static int EncodeTo(
byte[] buffer, int bufOffset,
ulong sequence, string subject,
ReadOnlySpan<byte> headers, ReadOnlySpan<byte> payload,
long timestamp, bool deleted = false)
{
var startOffset = bufOffset;
var span = buffer.AsSpan();
// 1. Flags byte
buffer[offset++] = record.Deleted ? DeletedFlag : (byte)0;
span[bufOffset++] = deleted ? DeletedFlag : (byte)0;
// 2. Subject length (varint) + subject bytes
offset += WriteVarint(buffer.AsSpan(offset), (ulong)subjectBytes.Length);
subjectBytes.CopyTo(buffer.AsSpan(offset));
offset += subjectBytes.Length;
// 2. Subject length (varint) + subject bytes — encode UTF8 directly into buffer
var subjectByteCount = Encoding.UTF8.GetByteCount(subject);
bufOffset += WriteVarint(span[bufOffset..], (ulong)subjectByteCount);
Encoding.UTF8.GetBytes(subject.AsSpan(), span.Slice(bufOffset, subjectByteCount));
bufOffset += subjectByteCount;
// 3. Headers length (varint) + headers bytes
offset += WriteVarint(buffer.AsSpan(offset), (ulong)headersSpan.Length);
headersSpan.CopyTo(buffer.AsSpan(offset));
offset += headersSpan.Length;
bufOffset += WriteVarint(span[bufOffset..], (ulong)headers.Length);
headers.CopyTo(span[bufOffset..]);
bufOffset += headers.Length;
// 4. Payload length (varint) + payload bytes
offset += WriteVarint(buffer.AsSpan(offset), (ulong)payloadSpan.Length);
payloadSpan.CopyTo(buffer.AsSpan(offset));
offset += payloadSpan.Length;
bufOffset += WriteVarint(span[bufOffset..], (ulong)payload.Length);
payload.CopyTo(span[bufOffset..]);
bufOffset += payload.Length;
// 5. Sequence (8 bytes, little-endian)
BinaryPrimitives.WriteUInt64LittleEndian(buffer.AsSpan(offset), record.Sequence);
offset += SequenceSize;
BinaryPrimitives.WriteUInt64LittleEndian(span[bufOffset..], sequence);
bufOffset += SequenceSize;
// 6. Timestamp (8 bytes, little-endian)
BinaryPrimitives.WriteInt64LittleEndian(buffer.AsSpan(offset), record.Timestamp);
offset += TimestampSize;
BinaryPrimitives.WriteInt64LittleEndian(span[bufOffset..], timestamp);
bufOffset += TimestampSize;
// 7. Checksum: XxHash64 over everything before the checksum field
var checksumInput = buffer.AsSpan(0, offset);
var checksumInput = span[startOffset..bufOffset];
var checksum = XxHash64.HashToUInt64(checksumInput);
BinaryPrimitives.WriteUInt64LittleEndian(buffer.AsSpan(offset), checksum);
BinaryPrimitives.WriteUInt64LittleEndian(span[bufOffset..], checksum);
bufOffset += ChecksumSize;
return buffer;
return bufOffset - startOffset;
}
/// <summary>

View File

@@ -48,11 +48,13 @@ public sealed class MsgBlock : IDisposable
// Reference: golang/nats-server/server/filestore.go:236 (cache field)
private Dictionary<ulong, MessageRecord>? _cache;
// Pending write buffer — accumulates encoded records for batched disk writes.
// The background flush loop in FileStore coalesces these into fewer I/O calls.
// Reference: golang/nats-server/server/filestore.go:6700 (cache.buf write path).
private readonly List<(byte[] Data, long Offset)> _pendingWrites = new();
private int _pendingBytes;
// Pending write buffer — a single contiguous byte[] that accumulates encoded
// records for batched disk writes. Go: mb.cache.buf — a single byte slice that
// grows via pooled buffer swap. Records are encoded directly into this buffer.
// Reference: golang/nats-server/server/filestore.go:6715 (cache.buf growth).
private byte[] _pendingBuf = new byte[64 * 1024]; // 64KB initial
private int _pendingBufUsed;
private long _pendingBufDiskOffset; // Disk offset corresponding to _pendingBuf[0]
// Go: msgBlock.lchk — last written record checksum (XxHash64, 8 bytes).
// Tracked so callers can chain checksum verification across blocks.
@@ -67,6 +69,7 @@ public sealed class MsgBlock : IDisposable
_maxBytes = maxBytes;
_firstSequence = firstSequence;
_nextSequence = firstSequence;
_pendingBufDiskOffset = file.Length;
_writeOffset = file.Length;
}
@@ -148,7 +151,7 @@ public sealed class MsgBlock : IDisposable
get
{
_lock.EnterReadLock();
try { return _cache is not null; }
try { return _cache is not null || _pendingBufUsed > 0; }
finally { _lock.ExitReadLock(); }
}
}
@@ -165,7 +168,7 @@ public sealed class MsgBlock : IDisposable
return 0;
try { _lock.EnterReadLock(); }
catch (ObjectDisposedException) { return 0; }
try { return _pendingBytes; }
try { return _pendingBufUsed; }
finally { _lock.ExitReadLock(); }
}
}
@@ -240,36 +243,25 @@ public sealed class MsgBlock : IDisposable
throw new InvalidOperationException("Block is sealed; cannot write new messages.");
var sequence = _nextSequence;
var record = new MessageRecord
{
Sequence = sequence,
Subject = subject,
Headers = headers,
Payload = payload,
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L,
Deleted = false,
};
var timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L;
// Encode directly into the contiguous pending buffer.
var encodedSize = MessageRecord.MeasureEncodedSize(subject, headers.Span, payload.Span);
EnsurePendingBufCapacity(encodedSize);
var bufStart = _pendingBufUsed;
var written = MessageRecord.EncodeTo(
_pendingBuf, bufStart, sequence, subject,
headers.Span, payload.Span, timestamp);
_pendingBufUsed += written;
var encoded = MessageRecord.Encode(record);
var offset = _writeOffset;
_writeOffset = offset + written;
// Buffer the write for batched disk I/O — the background flush loop
// in FileStore will coalesce pending writes.
_pendingWrites.Add((encoded, offset));
_pendingBytes += encoded.Length;
_writeOffset = offset + encoded.Length;
_index[sequence] = (offset, written);
_index[sequence] = (offset, encoded.Length);
// Go: cache recently-written record to avoid disk reads on hot path.
// Reference: golang/nats-server/server/filestore.go:6730 (cache population).
_cache ??= new Dictionary<ulong, MessageRecord>();
_cache[sequence] = record;
// Go: msgBlock.lchk — capture checksum (last 8 bytes of encoded record).
// Reference: golang/nats-server/server/filestore.go:2204 (lchk update on write)
_lastChecksum ??= new byte[8];
encoded.AsSpan(^8..).CopyTo(_lastChecksum);
_pendingBuf.AsSpan(bufStart + written - 8, 8).CopyTo(_lastChecksum);
if (_totalWritten == 0)
_firstSequence = sequence;
@@ -307,36 +299,29 @@ public sealed class MsgBlock : IDisposable
if (_writeOffset >= _maxBytes)
throw new InvalidOperationException("Block is sealed; cannot write new messages.");
var record = new MessageRecord
{
Sequence = sequence,
Subject = subject,
Headers = headers,
Payload = payload,
Timestamp = timestamp,
Deleted = false,
};
// Go: writeMsgRecordLocked encodes directly into cache.buf.
// Measure encoded size, ensure buffer has capacity, encode in-place.
var encodedSize = MessageRecord.MeasureEncodedSize(subject, headers.Span, payload.Span);
EnsurePendingBufCapacity(encodedSize);
var bufStart = _pendingBufUsed;
var written = MessageRecord.EncodeTo(
_pendingBuf, bufStart, sequence, subject,
headers.Span, payload.Span, timestamp);
_pendingBufUsed += written;
var encoded = MessageRecord.Encode(record);
var offset = _writeOffset;
_writeOffset = offset + written;
// Buffer the write for batched disk I/O — the background flush loop
// in FileStore will coalesce pending writes.
_pendingWrites.Add((encoded, offset));
_pendingBytes += encoded.Length;
_writeOffset = offset + encoded.Length;
_index[sequence] = (offset, written);
_index[sequence] = (offset, encoded.Length);
// Go: cache recently-written record to avoid disk reads on hot path.
// Reference: golang/nats-server/server/filestore.go:6730 (cache population).
_cache ??= new Dictionary<ulong, MessageRecord>();
_cache[sequence] = record;
// Go: cache populated lazily on read, not eagerly on write.
// Reads that miss _cache flush pending buf to disk and decode from there.
// Go: msgBlock.lchk — capture checksum (last 8 bytes of encoded record).
// Reference: golang/nats-server/server/filestore.go:2204 (lchk update on write)
_lastChecksum ??= new byte[8];
encoded.AsSpan(^8..).CopyTo(_lastChecksum);
_pendingBuf.AsSpan(bufStart + written - 8, 8).CopyTo(_lastChecksum);
if (_totalWritten == 0)
_firstSequence = sequence;
@@ -351,6 +336,36 @@ public sealed class MsgBlock : IDisposable
}
}
/// <summary>
/// Guarantees the pending buffer can accept <paramref name="needed"/> more bytes,
/// growing it by at least doubling (mirrors Go's pooled buffer swap in
/// writeMsgRecordLocked).
/// </summary>
private void EnsurePendingBufCapacity(int needed)
{
    var target = _pendingBufUsed + needed;
    if (target <= _pendingBuf.Length)
        return;

    // Double the capacity, but never allocate less than this write requires.
    var grown = new byte[Math.Max(_pendingBuf.Length * 2, target)];
    Array.Copy(_pendingBuf, grown, _pendingBufUsed);
    _pendingBuf = grown;
}
/// <summary>
/// Writes the contiguous pending buffer to disk in a single call and resets it.
/// Caller must hold the write lock.
/// </summary>
private void FlushPendingBufToDisk()
{
    var pending = _pendingBufUsed;
    if (pending > 0)
    {
        // One contiguous write starting at the disk offset of _pendingBuf[0].
        RandomAccess.Write(_handle, _pendingBuf.AsSpan(0, pending), _pendingBufDiskOffset);
        _pendingBufDiskOffset += pending;
        _pendingBufUsed = 0;
    }
}
/// <summary>
/// Reads a message by sequence number.
/// Checks the write cache first to avoid disk I/O for recently-written messages.
@@ -368,23 +383,25 @@ public sealed class MsgBlock : IDisposable
if (_deleted.Contains(sequence))
return null;
// Go: check cache first (msgBlock.cache lookup).
// Reference: golang/nats-server/server/filestore.go:8155 (cache hit path).
// Check explicit write cache first (populated by some code paths).
if (_cache is not null && _cache.TryGetValue(sequence, out var cached))
return cached;
if (!_index.TryGetValue(sequence, out var entry))
return null;
// Flush pending writes so disk reads see the latest data.
if (_pendingWrites.Count > 0)
// Try reading from the in-memory pending buffer before going to disk.
// The pending buffer starts at _pendingBufDiskOffset on disk.
var pendingStart = _pendingBufDiskOffset;
if (entry.Offset >= pendingStart && entry.Offset + entry.Length <= pendingStart + _pendingBufUsed)
{
foreach (var (data, off) in _pendingWrites)
RandomAccess.Write(_handle, data, off);
_pendingWrites.Clear();
_pendingBytes = 0;
var bufOffset = (int)(entry.Offset - pendingStart);
return MessageRecord.Decode(_pendingBuf.AsSpan(bufOffset, entry.Length));
}
// Data not in pending buffer — flush and read from disk.
FlushPendingBufToDisk();
var buffer = new byte[entry.Length];
RandomAccess.Read(_handle, buffer, entry.Offset);
@@ -423,13 +440,7 @@ public sealed class MsgBlock : IDisposable
return false;
// Flush any pending writes so the record is on disk before we read it back.
if (_pendingWrites.Count > 0)
{
foreach (var (data, off) in _pendingWrites)
RandomAccess.Write(_handle, data, off);
_pendingWrites.Clear();
_pendingBytes = 0;
}
FlushPendingBufToDisk();
// Read the existing record, re-encode with Deleted flag, write back in-place.
// The encoded size doesn't change (only flags byte + checksum differ).
@@ -499,23 +510,26 @@ public sealed class MsgBlock : IDisposable
Deleted = true, // skip = deleted from the start
};
var encoded = MessageRecord.Encode(record);
// Encode skip record into contiguous buffer.
var encodedSize = MessageRecord.MeasureEncodedSize(string.Empty, ReadOnlySpan<byte>.Empty, ReadOnlySpan<byte>.Empty);
EnsurePendingBufCapacity(encodedSize);
var bufStart = _pendingBufUsed;
var written = MessageRecord.EncodeTo(
_pendingBuf, bufStart, sequence, string.Empty,
ReadOnlySpan<byte>.Empty, ReadOnlySpan<byte>.Empty, now, deleted: true);
_pendingBufUsed += written;
var offset = _writeOffset;
_writeOffset = offset + written;
// Buffer the write for batched disk I/O.
_pendingWrites.Add((encoded, offset));
_pendingBytes += encoded.Length;
_writeOffset = offset + encoded.Length;
_index[sequence] = (offset, encoded.Length);
_index[sequence] = (offset, written);
_deleted.Add(sequence);
_skipSequences.Add(sequence); // Track skip sequences separately for recovery
// Note: intentionally NOT added to _cache since it is deleted.
// Go: msgBlock.lchk — capture checksum (last 8 bytes of encoded record).
// Reference: golang/nats-server/server/filestore.go:2204 (lchk update on write)
_lastChecksum ??= new byte[8];
encoded.AsSpan(^8..).CopyTo(_lastChecksum);
_pendingBuf.AsSpan(bufStart + written - 8, 8).CopyTo(_lastChecksum);
if (_totalWritten == 0)
_firstSequence = sequence;
@@ -541,6 +555,8 @@ public sealed class MsgBlock : IDisposable
_lock.EnterWriteLock();
try
{
// Flush pending writes to disk before clearing in-memory state.
FlushPendingBufToDisk();
_cache = null;
}
finally
@@ -573,15 +589,15 @@ public sealed class MsgBlock : IDisposable
try
{
if (_pendingWrites.Count == 0)
if (_pendingBufUsed == 0)
return 0;
foreach (var (data, offset) in _pendingWrites)
RandomAccess.Write(_handle, data, offset);
// Single contiguous write — Go: flushPendingMsgsLocked writes cache.buf[wp:] to disk.
RandomAccess.Write(_handle, _pendingBuf.AsSpan(0, _pendingBufUsed), _pendingBufDiskOffset);
var flushed = _pendingBytes;
_pendingWrites.Clear();
_pendingBytes = 0;
var flushed = _pendingBufUsed;
_pendingBufDiskOffset += _pendingBufUsed;
_pendingBufUsed = 0;
return flushed;
}
finally { _lock.ExitWriteLock(); }
@@ -652,13 +668,7 @@ public sealed class MsgBlock : IDisposable
try
{
// Flush pending writes so disk reads see latest data.
if (_pendingWrites.Count > 0)
{
foreach (var (data, off) in _pendingWrites)
RandomAccess.Write(_handle, data, off);
_pendingWrites.Clear();
_pendingBytes = 0;
}
FlushPendingBufToDisk();
entries = new List<(long, int, ulong)>(_index.Count);
foreach (var (seq, (offset, length)) in _index)
@@ -713,13 +723,7 @@ public sealed class MsgBlock : IDisposable
try
{
// Flush pending buffered writes first.
if (_pendingWrites.Count > 0)
{
foreach (var (data, offset) in _pendingWrites)
RandomAccess.Write(_handle, data, offset);
_pendingWrites.Clear();
_pendingBytes = 0;
}
FlushPendingBufToDisk();
_file.Flush(flushToDisk: true);
}
@@ -742,13 +746,7 @@ public sealed class MsgBlock : IDisposable
try
{
// Flush pending buffered writes before closing.
if (_pendingWrites.Count > 0)
{
foreach (var (data, offset) in _pendingWrites)
RandomAccess.Write(_handle, data, offset);
_pendingWrites.Clear();
_pendingBytes = 0;
}
FlushPendingBufToDisk();
_file.Flush();
_file.Dispose();

View File

@@ -32,6 +32,24 @@ public interface ISubListAccess
SubList SubList { get; }
}
/// <summary>
/// Outbound data wrapper that tracks an optional pooled buffer to be returned
/// to the pool once the write loop has flushed the bytes.
/// Go reference: client.go — queueOutbound appends slice refs to pooled buffers; writeLoop returns them.
/// </summary>
internal readonly struct OutboundData
{
    /// <summary>The bytes to be written to the transport.</summary>
    public readonly ReadOnlyMemory<byte> Data;

    /// <summary>Pooled backing array to return after writing, or null when not pooled.</summary>
    public readonly byte[]? PoolBuffer;

    /// <summary>Number of bytes in <see cref="Data"/>.</summary>
    public int Length => Data.Length;

    public OutboundData(ReadOnlyMemory<byte> data, byte[]? poolBuffer = null)
        => (Data, PoolBuffer) = (data, poolBuffer);
}
public sealed class NatsClient : INatsClient, IDisposable
{
private static readonly ClientCommandMatrix CommandMatrix = new();
@@ -44,7 +62,7 @@ public sealed class NatsClient : INatsClient, IDisposable
private readonly NatsParser _parser;
private readonly AdaptiveReadBuffer _adaptiveReadBuffer = new();
private readonly OutboundBufferPool _outboundBufferPool = new();
private readonly Channel<ReadOnlyMemory<byte>> _outbound = Channel.CreateBounded<ReadOnlyMemory<byte>>(
private readonly Channel<OutboundData> _outbound = Channel.CreateBounded<OutboundData>(
new BoundedChannelOptions(8192) { SingleReader = true, FullMode = BoundedChannelFullMode.Wait });
private long _pendingBytes;
private CancellationTokenSource? _clientCts;
@@ -159,7 +177,17 @@ public sealed class NatsClient : INatsClient, IDisposable
return $"{Kind} cid={Id} endpoint={endpoint}";
}
public bool QueueOutbound(ReadOnlyMemory<byte> data)
public bool QueueOutbound(ReadOnlyMemory<byte> data) => QueueOutboundCore(new OutboundData(data));
/// <summary>
/// Queues a pooled buffer for outbound writing. The write loop will return the buffer
/// to the pool after writing. Avoids the .ToArray() copy on the hot path.
/// Go reference: client.go queueOutbound — queues slice refs into pooled buffers.
/// </summary>
private bool QueueOutboundPooled(byte[] buffer, int length)
=> QueueOutboundCore(new OutboundData(new ReadOnlyMemory<byte>(buffer, 0, length), buffer));
private bool QueueOutboundCore(OutboundData data)
{
if (_flags.HasFlag(ClientFlags.CloseConnection))
return false;
@@ -725,8 +753,12 @@ public sealed class NatsClient : INatsClient, IDisposable
var totalPayloadLen = headers.Length + payload.Length;
var totalLen = estimatedLineSize + totalPayloadLen + 2;
using var owner = _outboundBufferPool.Rent(totalLen);
var span = owner.Memory.Span;
// Rent a pooled buffer and format directly into it.
// The write loop will return the buffer to the pool after writing.
// Go reference: client.go msgHeader — formats into per-client scratch buffer (msgb).
var buffer = _outboundBufferPool.RentBuffer(totalLen);
var span = buffer.AsSpan();
int pos = 0;
// Write prefix
@@ -790,7 +822,7 @@ public sealed class NatsClient : INatsClient, IDisposable
span[pos++] = (byte)'\r';
span[pos++] = (byte)'\n';
QueueOutbound(owner.Memory[..pos].ToArray());
QueueOutboundPooled(buffer, pos);
}
private void WriteProtocol(byte[] data)
@@ -808,22 +840,62 @@ public sealed class NatsClient : INatsClient, IDisposable
{
_flags.SetFlag(ClientFlags.WriteLoopStarted);
var reader = _outbound.Reader;
// Pre-allocate a coalescing buffer to batch multiple small messages into
// a single WriteAsync call. Go reference: client.go flushOutbound — uses
// net.Buffers (writev) to write all pending data in one syscall.
const int coalesceBufSize = 65536;
var coalesceBuf = new byte[coalesceBufSize];
var returnList = new List<byte[]>(32);
try
{
while (await reader.WaitToReadAsync(ct))
{
long batchBytes = 0;
while (reader.TryRead(out var data))
var coalescePos = 0;
while (reader.TryRead(out var item))
{
await _stream.WriteAsync(data, ct);
var data = item.Data;
batchBytes += data.Length;
// If this message fits in the coalesce buffer, copy it in
if (coalescePos + data.Length <= coalesceBufSize)
{
data.Span.CopyTo(coalesceBuf.AsSpan(coalescePos));
coalescePos += data.Length;
}
else
{
// Flush the coalesce buffer first if it has data
if (coalescePos > 0)
{
await _stream.WriteAsync(coalesceBuf.AsMemory(0, coalescePos), ct);
coalescePos = 0;
}
// Write this large message directly
await _stream.WriteAsync(data, ct);
}
// Track pooled buffers to return
if (item.PoolBuffer != null)
returnList.Add(item.PoolBuffer);
}
using var flushCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
flushCts.CancelAfter(_options.WriteDeadline);
// Flush remaining coalesced data
if (coalescePos > 0)
await _stream.WriteAsync(coalesceBuf.AsMemory(0, coalescePos), ct);
// Return pooled buffers after writing
foreach (var buf in returnList)
_outboundBufferPool.ReturnBuffer(buf);
returnList.Clear();
try
{
await _stream.FlushAsync(flushCts.Token);
await _stream.FlushAsync(ct);
ResetFlushPending();
}
catch (OperationCanceledException) when (!ct.IsCancellationRequested)

View File

@@ -436,9 +436,16 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
await _jetStreamService.DisposeAsync();
_stats.JetStreamEnabled = false;
// Wait for accept loops to exit
await _acceptLoopExited.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
await _wsAcceptLoopExited.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
// If server was never started, accept loops never ran — signal immediately
if (_listener == null)
_acceptLoopExited.TrySetResult();
if (_wsListener == null)
_wsAcceptLoopExited.TrySetResult();
// Wait for accept loops to exit (in parallel to avoid sequential 5s+5s waits)
await Task.WhenAll(
_acceptLoopExited.Task.WaitAsync(TimeSpan.FromSeconds(5)),
_wsAcceptLoopExited.Task.WaitAsync(TimeSpan.FromSeconds(5))).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
// Close all client connections — flush first, then mark closed
var flushTasks = new List<Task>();
@@ -485,9 +492,10 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
_listener?.Close();
_wsListener?.Close();
// Wait for accept loops to exit
await _acceptLoopExited.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
await _wsAcceptLoopExited.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
// Wait for accept loops to exit (in parallel)
await Task.WhenAll(
_acceptLoopExited.Task.WaitAsync(TimeSpan.FromSeconds(5)),
_wsAcceptLoopExited.Task.WaitAsync(TimeSpan.FromSeconds(5))).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
var gracePeriod = _options.LameDuckGracePeriod;
if (gracePeriod < TimeSpan.Zero) gracePeriod = -gracePeriod;
@@ -887,6 +895,11 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
_ = RunWebSocketAcceptLoopAsync(linked.Token);
}
else
{
// No WebSocket listener — signal immediately so shutdown doesn't wait
_wsAcceptLoopExited.TrySetResult();
}
if (_routeManager != null)
await _routeManager.StartAsync(linked.Token);