diff --git a/src/NATS.Server/JetStream/Publish/JetStreamPubAckFormatter.cs b/src/NATS.Server/JetStream/Publish/JetStreamPubAckFormatter.cs new file mode 100644 index 0000000..565b523 --- /dev/null +++ b/src/NATS.Server/JetStream/Publish/JetStreamPubAckFormatter.cs @@ -0,0 +1,41 @@ +using System.Text; + +namespace NATS.Server.JetStream.Publish; + +/// +/// Hand-rolled UTF-8 formatter for the common success PubAck case. +/// Avoids JsonSerializer overhead (~100-200B internal allocations + reflection). +/// For error/duplicate/batch acks, callers fall back to JsonSerializer. +/// +internal static class JetStreamPubAckFormatter +{ + // Pre-encoded UTF-8 fragments for {"stream":"NAME","seq":N} + private static readonly byte[] Prefix = "{\"stream\":\""u8.ToArray(); + private static readonly byte[] SeqField = "\",\"seq\":"u8.ToArray(); + private static readonly byte[] Suffix = "}"u8.ToArray(); + + /// + /// Formats a success PubAck directly into a span. Returns bytes written. + /// Caller must ensure dest is large enough (256 bytes is safe for any stream name). + /// + public static int FormatSuccess(Span dest, string streamName, ulong seq) + { + var pos = 0; + Prefix.CopyTo(dest); + pos += Prefix.Length; + pos += Encoding.UTF8.GetBytes(streamName, dest[pos..]); + SeqField.CopyTo(dest[pos..]); + pos += SeqField.Length; + seq.TryFormat(dest[pos..], out var written); + pos += written; + Suffix.CopyTo(dest[pos..]); + pos += Suffix.Length; + return pos; + } + + /// + /// Returns true if this PubAck is a simple success that can use the fast formatter. + /// + public static bool IsSimpleSuccess(PubAck ack) + => ack.ErrorCode == null && !ack.Duplicate && ack.BatchId == null; +} diff --git a/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs b/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs index 3941499..4c4e2ad 100644 --- a/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs +++ b/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs @@ -37,8 +37,8 @@ public sealed class JetStreamPublisher } // --- Normal (non-batch) publish path --- - var state = stream.Store.GetStateAsync(default).GetAwaiter().GetResult(); - if (!_preconditions.CheckExpectedLastSeq(options.ExpectedLastSeq, state.LastSeq)) + // Use cached LastSeq property instead of GetStateAsync to avoid allocation. + if (!_preconditions.CheckExpectedLastSeq(options.ExpectedLastSeq, stream.Store.LastSeq)) { ack = new PubAck { ErrorCode = 10071 }; return true; @@ -54,7 +54,8 @@ public sealed class JetStreamPublisher return true; } - var captured = _streamManager.Capture(subject, payload); + // Pass resolved stream to avoid double FindBySubject lookup. + var captured = _streamManager.Capture(stream, subject, payload); ack = captured ?? new PubAck(); _preconditions.Record(options.MsgId, ack.Seq); _preconditions.TrimOlderThan(stream.Config.DuplicateWindowMs); @@ -136,15 +137,14 @@ public sealed class JetStreamPublisher stream.Config.DuplicateWindowMs, staged => { - // Check expected last sequence. + // Check expected last sequence using cached property. if (staged.ExpectedLastSeq > 0) { - var st = stream.Store.GetStateAsync(default).GetAwaiter().GetResult(); - if (st.LastSeq != staged.ExpectedLastSeq) + if (stream.Store.LastSeq != staged.ExpectedLastSeq) return new PubAck { ErrorCode = 10071, Stream = stream.Config.Name }; } - var captured = _streamManager.Capture(staged.Subject, staged.Payload); + var captured = _streamManager.Capture(stream, staged.Subject, staged.Payload); return captured ?? new PubAck { Stream = stream.Config.Name }; }); diff --git a/src/NATS.Server/JetStream/Storage/FileStore.cs b/src/NATS.Server/JetStream/Storage/FileStore.cs index 07c2603..373bc9d 100644 --- a/src/NATS.Server/JetStream/Storage/FileStore.cs +++ b/src/NATS.Server/JetStream/Storage/FileStore.cs @@ -28,7 +28,8 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // In-memory cache: keyed by sequence number. This is the primary data structure // for reads and queries. The blocks are the on-disk persistence layer. private readonly Dictionary _messages = new(); - private readonly Dictionary _messageIndexes = new(); + // _messageIndexes removed — all lookups now use _messages directly to avoid + // per-message StoredMessageIndex allocation on the write path. private readonly Dictionary _lastSequenceBySubject = new(StringComparer.Ordinal); // Block-based storage: the active (writable) block and sealed blocks. @@ -89,6 +90,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public int BlockCount => _blocks.Count; public bool UsedIndexManifestOnStartup { get; private set; } + // IStreamStore cached state properties — O(1), maintained incrementally. + public ulong LastSeq => _last; + public ulong MessageCount => _messageCount; + public ulong TotalBytes => _totalBytes; + ulong IStreamStore.FirstSeq => _messageCount == 0 ? (_first > 0 ? _first : 0UL) : _firstSeq; + public FileStore(FileStoreOptions options) { _options = options; @@ -266,7 +273,6 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public ValueTask RestoreSnapshotAsync(ReadOnlyMemory snapshot, CancellationToken ct) { _messages.Clear(); - _messageIndexes.Clear(); _lastSequenceBySubject.Clear(); _last = 0; _messageCount = 0; @@ -415,7 +421,6 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { var count = (ulong)_messages.Count; _messages.Clear(); - _messageIndexes.Clear(); _lastSequenceBySubject.Clear(); _generation++; _last = 0; @@ -542,7 +547,6 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { // Truncate to nothing. _messages.Clear(); - _messageIndexes.Clear(); _lastSequenceBySubject.Clear(); _generation++; _last = 0; @@ -845,7 +849,6 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable private void TrackMessage(StoredMessage message) { _messages[message.Sequence] = message; - _messageIndexes[message.Sequence] = message.ToIndex(); _lastSequenceBySubject[message.Subject] = message.Sequence; _messageCount++; _totalBytes += (ulong)(message.RawHeaders.Length + message.Payload.Length); @@ -861,8 +864,6 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { if (!_messages.Remove(sequence, out var message)) return false; - - _messageIndexes.Remove(sequence); _messageCount--; _totalBytes -= (ulong)(message.RawHeaders.Length + message.Payload.Length); UpdateLastSequenceForSubject(message.Subject, sequence); @@ -890,7 +891,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable private void AdvanceFirstSequence(ulong start) { var candidate = start; - while (!_messageIndexes.ContainsKey(candidate) && candidate <= _last) + while (!_messages.ContainsKey(candidate) && candidate <= _last) candidate++; if (candidate <= _last) @@ -911,7 +912,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable for (var seq = startExclusive - 1; ; seq--) { - if (_messageIndexes.ContainsKey(seq)) + if (_messages.ContainsKey(seq)) return seq; if (seq == 0) @@ -926,7 +927,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable for (var seq = removedSequence - 1; ; seq--) { - if (_messageIndexes.TryGetValue(seq, out var candidate) && string.Equals(candidate.Subject, subject, StringComparison.Ordinal)) + if (_messages.TryGetValue(seq, out var candidate) && string.Equals(candidate.Subject, subject, StringComparison.Ordinal)) { _lastSequenceBySubject[subject] = seq; return; @@ -941,7 +942,6 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable private void RebuildIndexesFromMessages() { - _messageIndexes.Clear(); _lastSequenceBySubject.Clear(); _messageCount = 0; _totalBytes = 0; @@ -2317,12 +2317,16 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable if (block is null) continue; - var waited = 0; - while (block.PendingWriteSize < CoalesceMinimum && waited < MaxFlushWaitMs) + // Go-style exponential backoff: 1→2→4→8ms (vs linear 1ms × 8). + var waitMs = 1; + var totalWaited = 0; + while (block.PendingWriteSize < CoalesceMinimum && totalWaited < MaxFlushWaitMs) { - try { await Task.Delay(1, ct); } + var delay = Math.Min(waitMs, MaxFlushWaitMs - totalWaited); + try { await Task.Delay(delay, ct); } catch (OperationCanceledException) { break; } - waited++; + totalWaited += delay; + waitMs *= 2; } block.FlushPending(); diff --git a/src/NATS.Server/JetStream/Storage/IStreamStore.cs b/src/NATS.Server/JetStream/Storage/IStreamStore.cs index a034a6f..1601897 100644 --- a/src/NATS.Server/JetStream/Storage/IStreamStore.cs +++ b/src/NATS.Server/JetStream/Storage/IStreamStore.cs @@ -32,6 +32,13 @@ public interface IStreamStore // Existing MemStore/FileStore implementations return this type. ValueTask GetStateAsync(CancellationToken ct); + // Cached state properties — avoid GetStateAsync on the publish hot path. + // These are maintained incrementally by FileStore/MemStore and are O(1). + ulong LastSeq => throw new NotSupportedException("LastSeq not implemented."); + ulong MessageCount => throw new NotSupportedException("MessageCount not implemented."); + ulong TotalBytes => throw new NotSupportedException("TotalBytes not implemented."); + ulong FirstSeq => throw new NotSupportedException("FirstSeq not implemented."); + // ------------------------------------------------------------------------- // Go-parity sync interface — mirrors server/store.go StreamStore // Default implementations throw NotSupportedException so existing diff --git a/src/NATS.Server/JetStream/Storage/MemStore.cs b/src/NATS.Server/JetStream/Storage/MemStore.cs index b141768..750ba46 100644 --- a/src/NATS.Server/JetStream/Storage/MemStore.cs +++ b/src/NATS.Server/JetStream/Storage/MemStore.cs @@ -122,6 +122,12 @@ public sealed class MemStore : IStreamStore } } + // IStreamStore cached state properties — O(1), maintained incrementally. + public ulong LastSeq { get { lock (_gate) return _st.LastSeq; } } + public ulong MessageCount { get { lock (_gate) return _st.Msgs; } } + public ulong TotalBytes { get { lock (_gate) return _st.Bytes; } } + ulong IStreamStore.FirstSeq { get { lock (_gate) return _st.Msgs == 0 ? (_st.FirstSeq > 0 ? _st.FirstSeq : 0UL) : _st.FirstSeq; } } + // ------------------------------------------------------------------------- // Async helpers (used by existing JetStream layer) // ------------------------------------------------------------------------- diff --git a/src/NATS.Server/JetStream/StreamManager.cs b/src/NATS.Server/JetStream/StreamManager.cs index 82699d0..e0b5a65 100644 --- a/src/NATS.Server/JetStream/StreamManager.cs +++ b/src/NATS.Server/JetStream/StreamManager.cs @@ -397,6 +397,11 @@ public sealed class StreamManager : IDisposable if (stream == null) return null; + return Capture(stream, subject, payload); + } + + public PubAck? Capture(StreamHandle stream, string subject, ReadOnlyMemory payload) + { // Go: sealed stream rejects all publishes. // Reference: server/stream.go — processJetStreamMsg checks mset.cfg.Sealed. if (stream.Config.Sealed) @@ -414,17 +419,20 @@ public sealed class StreamManager : IDisposable // Go: memStoreMsgSize — full message size includes subject + headers + payload + 16 bytes overhead. var msgSize = subject.Length + payload.Length + 16; - var stateBefore = stream.Store.GetStateAsync(default).GetAwaiter().GetResult(); + // Use cached state properties instead of GetStateAsync to avoid allocation on hot path. + var currentMsgCount = stream.Store.MessageCount; + var currentBytes = stream.Store.TotalBytes; + var currentFirstSeq = stream.Store.FirstSeq; // Go: DiscardPolicy.New — reject when MaxMsgs reached. // Reference: server/stream.go — processJetStreamMsg checks discard new + maxMsgs. if (stream.Config.MaxMsgs > 0 && stream.Config.Discard == DiscardPolicy.New - && (long)stateBefore.Messages >= stream.Config.MaxMsgs) + && (long)currentMsgCount >= stream.Config.MaxMsgs) { return new PubAck { Stream = stream.Config.Name, ErrorCode = 10054 }; } - if (stream.Config.MaxBytes > 0 && (long)stateBefore.Bytes + msgSize > stream.Config.MaxBytes) + if (stream.Config.MaxBytes > 0 && (long)currentBytes + msgSize > stream.Config.MaxBytes) { if (stream.Config.Discard == DiscardPolicy.New) { @@ -435,10 +443,9 @@ public sealed class StreamManager : IDisposable }; } - while ((long)stateBefore.Bytes + msgSize > stream.Config.MaxBytes && stateBefore.FirstSeq > 0) + while ((long)stream.Store.TotalBytes + msgSize > stream.Config.MaxBytes && stream.Store.FirstSeq > 0) { - stream.Store.RemoveAsync(stateBefore.FirstSeq, default).GetAwaiter().GetResult(); - stateBefore = stream.Store.GetStateAsync(default).GetAwaiter().GetResult(); + stream.Store.RemoveAsync(stream.Store.FirstSeq, default).GetAwaiter().GetResult(); } } diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 87aea24..a8daf14 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -1399,8 +1399,19 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable // Go reference: server/jetstream.go — jsPubAckResponse sent to reply. if (replyTo != null) { - var ackData = JsonSerializer.SerializeToUtf8Bytes(pubAck, s_jetStreamJsonOptions); - ProcessMessage(replyTo, null, default, ackData, sender); + if (JetStream.Publish.JetStreamPubAckFormatter.IsSimpleSuccess(pubAck)) + { + // Fast path: hand-rolled UTF-8 formatter avoids JsonSerializer overhead. + Span ackBuf = stackalloc byte[256]; + var ackLen = JetStream.Publish.JetStreamPubAckFormatter.FormatSuccess(ackBuf, pubAck.Stream, pubAck.Seq); + ProcessMessage(replyTo, null, default, ackBuf[..ackLen].ToArray(), sender); + } + else + { + var ackData = JsonSerializer.SerializeToUtf8Bytes(pubAck, s_jetStreamJsonOptions); + ProcessMessage(replyTo, null, default, ackData, sender); + } + return; } }