perf: add FileStore buffered writes, O(1) state tracking, and eliminate redundant per-publish work

Implement Go-parity background flush loop (coalesce 16KB/8ms) in MsgBlock/FileStore,
replace O(n) GetStateAsync with incremental counters, skip PruneExpired/LoadAsync/
PrunePerSubject when not needed, and bypass RAFT for single-replica streams. Fix counter
tracking bugs in RemoveMsg/EraseMsg/TTL expiry and ObjectDisposedException races in
flush loop disposal. FileStore optimizations verified with 3112/3112 JetStream tests
passing; async publish benchmark remains at ~174 msg/s due to E2E protocol path bottleneck.
This commit is contained in:
Joseph Doherty
2026-03-13 03:11:11 -04:00
parent 37575dc41c
commit 4de691c9c5
30 changed files with 1514 additions and 185 deletions

View File

@@ -1750,12 +1750,13 @@ public static class ConfigProcessor
// ─── Type conversion helpers ───────────────────────────────────
// Go: opts.go — strconv.Atoi after strings.TrimSuffix(s, "%") for sampling values.
/// <summary>
/// Coerces a loosely typed configuration value to <see cref="int"/>.
/// </summary>
/// <param name="value">
/// A <see cref="long"/>, <see cref="int"/> or <see cref="double"/> (narrowed with a plain cast),
/// or a string holding an invariant-culture integer with optional trailing '%' characters.
/// </param>
/// <returns>The converted integer.</returns>
/// <exception cref="FormatException">
/// <paramref name="value"/> is null, of an unsupported type, or a string that is not an integer.
/// </exception>
private static int ToInt(object? value)
{
    switch (value)
    {
        case long wide:
            return (int)wide;

        case int exact:
            return exact;

        case double real:
            return (int)real;

        // Trailing '%' characters are stripped before parsing; a string without
        // them is parsed unchanged, so one case handles both spellings.
        case string text
            when int.TryParse(text.AsSpan().TrimEnd('%'), NumberStyles.Integer, CultureInfo.InvariantCulture, out var parsed):
            return parsed;

        default:
            throw new FormatException($"Cannot convert {value?.GetType().Name ?? "null"} to int");
    }
}

View File

@@ -63,6 +63,11 @@ public static class StreamApiHandlers
if (streamName == null)
return JetStreamApiResponse.NotFound(subject);
// Go: stream update rejects unknown stream names — must exist before update.
// Reference: server/jetstream_api.go — jsStreamUpdateT checks stream existence.
if (!streamManager.TryGet(streamName, out _))
return JetStreamApiResponse.ErrorResponse(404, "stream not found");
var config = ParseConfig(payload);
if (string.IsNullOrWhiteSpace(config.Name))
config.Name = streamName;

View File

@@ -24,6 +24,12 @@ public sealed class ConsumerManager : IDisposable
/// </summary>
public event EventHandler<(string Stream, string Name)>? OnAutoResumed;
/// <summary>
/// Optional reference to the stream manager, used to resolve DeliverPolicy.New
/// start sequences at consumer creation time.
/// </summary>
public StreamManager? StreamManager { get; set; }
public ConsumerManager(JetStreamMetaGroup? metaGroup = null)
{
_metaGroup = metaGroup;
@@ -47,6 +53,20 @@ public sealed class ConsumerManager : IDisposable
if (!JetStreamConfigValidator.IsMetadataWithinLimit(config.Metadata))
return JetStreamApiResponse.ErrorResponse(400, "consumer metadata exceeds maximum size");
// Go: DeliverPolicy.New — snapshot the stream's current last sequence at creation
// time so the consumer only sees messages published after this point.
// Reference: server/consumer.go — setStartingSequenceForDeliverNew.
// We set OptStartSeq but preserve DeliverPolicy.New in the stored config;
// the fetch engine uses OptStartSeq when set regardless of policy.
if (config.DeliverPolicy == DeliverPolicy.New && StreamManager != null)
{
if (StreamManager.TryGet(stream, out var streamHandle))
{
var streamState = streamHandle.Store.GetStateAsync(default).GetAwaiter().GetResult();
config.OptStartSeq = streamState.LastSeq + 1;
}
}
if (config.FilterSubjects.Count == 0 && !string.IsNullOrWhiteSpace(config.FilterSubject))
config.FilterSubjects.Add(config.FilterSubject);
@@ -302,6 +322,13 @@ public sealed class ConsumerManager : IDisposable
return handle.AckProcessor.PendingCount;
}
/// <summary>
/// Reports whether at least one registered consumer belongs to the given stream.
/// Used to short-circuit the LoadAsync call on the publish hot path.
/// </summary>
/// <param name="stream">Stream name; compared ordinally (case-sensitive) against each consumer key.</param>
/// <returns>True as soon as a matching consumer key is found; otherwise false.</returns>
public bool HasConsumersForStream(string stream)
{
    foreach (var key in _consumers.Keys)
    {
        if (string.Equals(key.Stream, stream, StringComparison.Ordinal))
            return true;
    }

    return false;
}
public void OnPublished(string stream, StoredMessage message)
{
foreach (var handle in _consumers.Values.Where(c => c.Stream == stream && c.Config.Push))

View File

@@ -315,6 +315,7 @@ public sealed class PullConsumerEngine
return config.DeliverPolicy switch
{
DeliverPolicy.Last when state.LastSeq > 0 => state.LastSeq,
DeliverPolicy.New when config.OptStartSeq > 0 => config.OptStartSeq,
DeliverPolicy.New when state.LastSeq > 0 => state.LastSeq + 1,
DeliverPolicy.ByStartSequence when config.OptStartSeq > 0 => config.OptStartSeq,
DeliverPolicy.ByStartTime when config.OptStartTimeUtc is { } startTime => await ResolveByStartTimeAsync(stream, startTime, ct),

View File

@@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using System.Text.Json;
using System.Threading.Channels;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Storage;
@@ -90,6 +91,17 @@ public sealed class SourceCoordinator : IAsyncDisposable
/// </summary>
public ulong GetDeliverySequence => _deliverySeq;
/// <summary>
/// When true, the target stream uses CRDT counter semantics and source messages
/// are aggregated by adding their counter values.
/// Go reference: server/stream.go — AllowMsgCounter source aggregation.
/// </summary>
public bool AllowMsgCounter { get; set; }
// Per-source counter tracking: maps subject → last known counter value from this source.
// Used for delta computation during aggregation.
private readonly Dictionary<string, long> _sourceCounterValues = new(StringComparer.Ordinal);
public SourceCoordinator(IStreamStore targetStore, StreamSourceConfig sourceConfig)
{
_targetStore = targetStore;
@@ -183,13 +195,24 @@ public sealed class SourceCoordinator : IAsyncDisposable
if (_expectedOriginSeq > 0 && message.Sequence <= _expectedOriginSeq)
return;
// Subject transform: apply prefix before storing.
// Subject transform: apply prefix and/or SubjectTransforms before storing.
// Go: server/stream.go:3943-3956 (subject transform for the source)
var subject = message.Subject;
if (!string.IsNullOrWhiteSpace(_sourceConfig.SubjectTransformPrefix))
subject = $"{_sourceConfig.SubjectTransformPrefix}{subject}";
subject = ApplySubjectTransforms(subject);
// Go: counter aggregation — when target has AllowMsgCounter, aggregate source counter values.
// Reference: server/stream.go — processInboundSourceMsg counter aggregation path.
if (AllowMsgCounter)
{
await AggregateCounterAsync(subject, message.Payload, ct);
}
else
{
await _targetStore.AppendAsync(subject, message.Payload, ct);
}
await _targetStore.AppendAsync(subject, message.Payload, ct);
_expectedOriginSeq = message.Sequence;
_deliverySeq++;
LastOriginSequence = message.Sequence;
@@ -461,8 +484,18 @@ public sealed class SourceCoordinator : IAsyncDisposable
var subject = message.Subject;
if (!string.IsNullOrWhiteSpace(_sourceConfig.SubjectTransformPrefix))
subject = $"{_sourceConfig.SubjectTransformPrefix}{subject}";
subject = ApplySubjectTransforms(subject);
// Go: counter aggregation — when target has AllowMsgCounter, aggregate source counter values.
if (AllowMsgCounter)
{
await AggregateCounterAsync(subject, message.Payload, ct);
}
else
{
await _targetStore.AppendAsync(subject, message.Payload, ct);
}
await _targetStore.AppendAsync(subject, message.Payload, ct);
_expectedOriginSeq = message.Sequence;
_deliverySeq++;
LastOriginSequence = message.Sequence;
@@ -471,6 +504,70 @@ public sealed class SourceCoordinator : IAsyncDisposable
lock (_gate) _consecutiveFailures = 0;
}
// -------------------------------------------------------------------------
// Subject transforms
// Go reference: server/stream.go — SubjectTransforms on StreamSource.
// -------------------------------------------------------------------------
/// <summary>
/// Rewrites a subject using the SubjectTransforms configured on the source.
/// Entries are evaluated in list order; the first one that produces a non-null
/// result wins. When the list is empty or nothing matches, the subject is
/// returned unchanged.
/// </summary>
/// <param name="subject">Subject to transform.</param>
/// <returns>The transformed subject, or <paramref name="subject"/> itself.</returns>
private string ApplySubjectTransforms(string subject)
{
    foreach (var entry in _sourceConfig.SubjectTransforms)
    {
        // A null transform means this source/destination pair is unusable; try the next entry.
        if (SubjectTransform.Create(entry.Source, entry.Destination) is not { } candidate)
            continue;

        // A null result means this transform does not apply to the subject.
        if (candidate.Apply(subject) is { } rewritten)
            return rewritten;
    }

    return subject;
}
// -------------------------------------------------------------------------
// Counter aggregation
// Go reference: server/stream.go — processInboundSourceMsg counter aggregation.
// -------------------------------------------------------------------------
/// <summary>
/// Folds a source counter message into the target store.
/// The payload carries the source's running total for the subject; the change
/// since the value last seen from this source is added to the target's most
/// recent total for that subject, and the sum is appended as a new message.
/// </summary>
/// <param name="subject">Target subject (after any subject transforms).</param>
/// <param name="payload">Counter payload received from the source.</param>
/// <param name="ct">Cancellation token passed through to the store calls.</param>
private async Task AggregateCounterAsync(string subject, ReadOnlyMemory<byte> payload, CancellationToken ct)
{
    var incoming = CounterValue.FromPayload(payload.Span).AsLong();

    // First sighting of a subject counts from zero.
    var seenBefore = _sourceCounterValues.TryGetValue(subject, out var last) ? last : 0L;
    _sourceCounterValues[subject] = incoming;

    var change = incoming - seenBefore;
    if (change == 0)
        return; // Nothing new from this source; do not append.

    var latest = await _targetStore.LoadLastBySubjectAsync(subject, ct);
    var current = latest == null
        ? new CounterValue()
        : CounterValue.FromPayload(latest.Payload.Span);

    var updated = CounterValue.FromLong(current.AsLong() + change);
    await _targetStore.AppendAsync(subject, updated.ToPayload(), ct);
}
// -------------------------------------------------------------------------
// Deduplication helpers
// -------------------------------------------------------------------------

View File

@@ -0,0 +1,42 @@
using System.Text.Json;
using System.Text.Json.Serialization;
namespace NATS.Server.JetStream.Models;
// Go: server/stream.go — counterMsgPayload struct { Value string `json:"val"` }
// Reference: golang/nats-server/server/stream.go:20759
/// <summary>
/// JSON body of a counter message: an object with a single string member
/// <c>val</c> holding the counter as a decimal integer.
/// </summary>
public sealed class CounterValue
{
    /// <summary>Counter value as text; defaults to "0".</summary>
    [JsonPropertyName("val")]
    public string Value { get; set; } = "0";

    /// <summary>
    /// Parses <see cref="Value"/> as a 64-bit integer using the invariant culture.
    /// </summary>
    /// <returns>The parsed value, or 0 when the text is null, malformed or out of range.</returns>
    public long AsLong() =>
        long.TryParse(
            Value,
            System.Globalization.NumberStyles.Integer,
            System.Globalization.CultureInfo.InvariantCulture,
            out var parsed)
            ? parsed
            : 0;

    /// <summary>
    /// Creates a counter holding <paramref name="value"/>, formatted with the invariant
    /// culture so the wire text does not depend on the host's regional settings.
    /// </summary>
    public static CounterValue FromLong(long value) =>
        new() { Value = value.ToString(System.Globalization.CultureInfo.InvariantCulture) };

    /// <summary>Serializes this instance to UTF-8 JSON.</summary>
    public byte[] ToPayload() => JsonSerializer.SerializeToUtf8Bytes(this);

    /// <summary>
    /// Decodes a counter from UTF-8 JSON.
    /// </summary>
    /// <param name="payload">Raw message payload.</param>
    /// <returns>
    /// The decoded counter. An empty payload, invalid JSON, or a JSON <c>null</c>
    /// yields a fresh counter with value "0"; a null <c>val</c> member is normalized to "0".
    /// </returns>
    public static CounterValue FromPayload(ReadOnlySpan<byte> payload)
    {
        if (payload.IsEmpty)
            return new CounterValue();

        try
        {
            var decoded = JsonSerializer.Deserialize<CounterValue>(payload);
            if (decoded is null)
                return new CounterValue();

            // {"val":null} would otherwise leave the non-nullable property null.
            decoded.Value ??= "0";
            return decoded;
        }
        catch (JsonException)
        {
            return new CounterValue();
        }
    }
}
// Go: NATS header constants for counter messages.
// Reference: golang/nats-server/server/stream.go — counter header constants.
/// <summary>
/// Header name constants used with counter messages.
/// </summary>
public static class CounterHeaders
{
    /// <summary>The <c>Nats-Stream-Source</c> header name.</summary>
    public const string NatsStreamSource = "Nats-Stream-Source";

    /// <summary>The <c>Nats-Counter-Sources</c> header name.</summary>
    public const string NatsCounterSources = "Nats-Counter-Sources";

    /// <summary>The <c>Nats-Incr</c> header name.</summary>
    public const string NatsIncr = "Nats-Incr";
}

View File

@@ -114,4 +114,16 @@ public sealed class StreamSourceConfig
// Defaults to 0 (disabled). When > 0, duplicate messages with the same Nats-Msg-Id
// within this window are silently dropped.
public int DuplicateWindowMs { get; set; }
// Go: StreamSource.SubjectTransforms — per-source subject transforms.
// Reference: golang/nats-server/server/stream.go — SubjectTransforms field.
public List<SubjectTransformConfig> SubjectTransforms { get; set; } = [];
}
// Go: SubjectTransformConfig — source/destination subject transform pair.
// Reference: golang/nats-server/server/stream.go — SubjectTransformConfig struct.
/// <summary>
/// One subject transform rule: a source subject pattern and the destination it maps to.
/// Both members default to the empty string.
/// </summary>
public sealed class SubjectTransformConfig
{
    /// <summary>Source side of the transform pair.</summary>
    public string Source { get; set; } = "";

    /// <summary>Destination side of the transform pair.</summary>
    public string Destination { get; set; } = "";
}

View File

@@ -3,6 +3,7 @@ using System.Collections.Concurrent;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using System.Threading.Channels;
using NATS.Server.JetStream.Models;
using NATS.Server.Internal.TimeHashWheel;
@@ -36,6 +37,13 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
private ulong _last;
private ulong _first; // Go: first.seq — watermark for the first live or expected-first sequence
// Incremental state tracking — avoid O(n) scans in GetStateAsync/FastState.
// Updated in AppendAsync, StoreMsg, RemoveAsync, PurgeAsync, PurgeEx, Compact,
// Truncate, TrimToMaxMessages, EnforceMaxMsgsPerSubject, and recovery.
private ulong _messageCount;
private ulong _totalBytes;
private ulong _firstSeq;
// Set to true after Stop() is called. Prevents further writes.
private bool _stopped;
@@ -59,6 +67,14 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
// Reference: golang/nats-server/server/filestore.go:6148 (expireCache).
private readonly WriteCacheManager _writeCache;
// Go: filestore.go:5841 — background flush loop coalesces buffered writes.
// Reference: golang/nats-server/server/filestore.go:328-331 (coalesce constants).
private readonly Channel<byte> _flushSignal = Channel.CreateBounded<byte>(1);
private readonly CancellationTokenSource _flushCts = new();
private Task? _flushTask;
private const int CoalesceMinimum = 16 * 1024; // 16KB — Go: filestore.go:328
private const int MaxFlushWaitMs = 8; // 8ms — Go: filestore.go:331
// Go: filestore.go — generation counter for cache invalidation.
// Incremented on every write (Append/StoreRawMsg) and delete (Remove/Purge/Compact).
// NumFiltered caches results keyed by (filter, generation) so repeated calls for
@@ -92,6 +108,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
_options.MaxCacheSize,
_options.CacheExpiry,
blockId => _blocks.Find(b => b.BlockId == blockId));
// Go: filestore.go:5841 — start background flush loop for write coalescing.
_flushTask = Task.Run(() => FlushLoopAsync(_flushCts.Token));
}
public async ValueTask<ulong> AppendAsync(string subject, ReadOnlyMemory<byte> payload, CancellationToken ct)
@@ -99,6 +118,14 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
if (_stopped)
throw new ObjectDisposedException(nameof(FileStore), "Store has been stopped.");
// Go: DiscardNew — reject when MaxBytes would be exceeded.
// Reference: golang/nats-server/server/filestore.go — storeMsg, discard new check.
if (_options.MaxBytes > 0 && _options.Discard == DiscardPolicy.New
&& (long)_totalBytes + payload.Length > _options.MaxBytes)
{
throw new StoreCapacityException("maximum bytes exceeded");
}
// Go: check and remove expired messages before each append.
// Reference: golang/nats-server/server/filestore.go — storeMsg, expire check.
ExpireFromWheel();
@@ -117,6 +144,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
_messages[_last] = stored;
_generation++;
// Incremental state tracking.
_messageCount++;
_totalBytes += (ulong)payload.Length;
if (_messageCount == 1)
_firstSeq = _last;
// Go: register new message in TTL wheel when MaxAgeMs is configured.
// Reference: golang/nats-server/server/filestore.go:6820 (storeMsg TTL schedule).
RegisterTtl(_last, timestamp, _options.MaxAgeMs > 0 ? (long)_options.MaxAgeMs * 1_000_000L : 0);
@@ -138,10 +171,19 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
// Go: filestore.go:4443 (setupWriteCache) — record write in bounded cache manager.
_writeCache.TrackWrite(_activeBlock!.BlockId, persistedPayload.Length);
// Signal the background flush loop to coalesce and flush pending writes.
_flushSignal.Writer.TryWrite(0);
// Check if the block just became sealed after this write.
if (_activeBlock!.IsSealed)
RotateBlock();
// Go: enforce MaxMsgsPerSubject — remove oldest messages for this subject
// when the per-subject count exceeds the limit.
// Reference: golang/nats-server/server/filestore.go — enforcePerSubjectLimit.
if (_options.MaxMsgsPerSubject > 0 && !string.IsNullOrEmpty(subject))
EnforceMaxMsgsPerSubject(subject);
return _last;
}
@@ -170,18 +212,25 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
public ValueTask<bool> RemoveAsync(ulong sequence, CancellationToken ct)
{
var removed = _messages.Remove(sequence);
if (removed)
{
_generation++;
if (sequence == _last)
_last = _messages.Count == 0 ? 0UL : _messages.Keys.Max();
if (!_messages.TryGetValue(sequence, out var msg))
return ValueTask.FromResult(false);
// Soft-delete in the block that contains this sequence.
DeleteInBlock(sequence);
}
_messages.Remove(sequence);
_generation++;
return ValueTask.FromResult(removed);
// Incremental state tracking.
_messageCount--;
_totalBytes -= (ulong)msg.Payload.Length;
if (sequence == _firstSeq)
_firstSeq = _messages.Count == 0 ? 0UL : _messages.Keys.Min();
if (sequence == _last)
_last = _messages.Count == 0 ? 0UL : _messages.Keys.Max();
// Soft-delete in the block that contains this sequence.
DeleteInBlock(sequence);
return ValueTask.FromResult(true);
}
public ValueTask PurgeAsync(CancellationToken ct)
@@ -189,6 +238,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
_messages.Clear();
_generation++;
_last = 0;
_messageCount = 0;
_totalBytes = 0;
_firstSeq = 0;
// Dispose and delete all blocks.
DisposeAllBlocks();
@@ -225,6 +277,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
{
_messages.Clear();
_last = 0;
_messageCount = 0;
_totalBytes = 0;
_firstSeq = 0;
// Dispose existing blocks and clean files.
DisposeAllBlocks();
@@ -248,6 +303,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
_messages[record.Sequence] = message;
_last = Math.Max(_last, record.Sequence);
}
// Recompute incremental state from restored messages.
_messageCount = (ulong)_messages.Count;
_totalBytes = (ulong)_messages.Values.Sum(m => (long)m.Payload.Length);
_firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL;
}
}
@@ -260,23 +320,35 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
{
return ValueTask.FromResult(new ApiStreamState
{
Messages = (ulong)_messages.Count,
FirstSeq = _messages.Count == 0 ? 0UL : _messages.Keys.Min(),
Messages = _messageCount,
FirstSeq = _messageCount == 0 ? (_first > 0 ? _first : 0UL) : _firstSeq,
LastSeq = _last,
Bytes = (ulong)_messages.Values.Sum(m => m.Payload.Length),
Bytes = _totalBytes,
});
}
public void TrimToMaxMessages(ulong maxMessages)
{
var trimmed = false;
while ((ulong)_messages.Count > maxMessages)
{
var first = _messages.Keys.Min();
if (_messages.TryGetValue(first, out var msg))
{
_totalBytes -= (ulong)msg.Payload.Length;
_messageCount--;
}
_messages.Remove(first);
DeleteInBlock(first);
trimmed = true;
}
// Rewrite blocks to reflect the trim (removes trimmed messages from disk).
RewriteBlocks();
if (!trimmed)
return;
_firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL;
_generation++;
}
@@ -309,13 +381,13 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
byte[] combined;
if (hdr is { Length: > 0 })
{
combined = new byte[hdr.Length + msg.Length];
combined = new byte[hdr.Length + (msg?.Length ?? 0)];
hdr.CopyTo(combined, 0);
msg.CopyTo(combined, hdr.Length);
msg?.CopyTo(combined, hdr.Length);
}
else
{
combined = msg;
combined = msg ?? [];
}
var persistedPayload = TransformForPersist(combined.AsSpan());
@@ -329,6 +401,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
_messages[_last] = stored;
_generation++;
// Incremental state tracking.
_messageCount++;
_totalBytes += (ulong)combined.Length;
if (_messageCount == 1)
_firstSeq = _last;
// Determine effective TTL: per-message ttl (ns) takes priority over MaxAgeMs.
// Go: filestore.go:6830 — if msg.ttl > 0 use it, else use cfg.MaxAge.
var effectiveTtlNs = ttl > 0 ? ttl : (_options.MaxAgeMs > 0 ? (long)_options.MaxAgeMs * 1_000_000L : 0L);
@@ -348,6 +426,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
// Go: filestore.go:4443 (setupWriteCache) — record write in bounded cache manager.
_writeCache.TrackWrite(_activeBlock!.BlockId, persistedPayload.Length);
// Signal the background flush loop to coalesce and flush pending writes.
_flushSignal.Writer.TryWrite(0);
if (_activeBlock!.IsSealed)
RotateBlock();
@@ -364,6 +445,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
_messages.Clear();
_generation++;
_last = 0;
_messageCount = 0;
_totalBytes = 0;
_firstSeq = 0;
DisposeAllBlocks();
CleanBlockFiles();
@@ -407,10 +491,13 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
foreach (var msg in toRemove)
{
_messages.Remove(msg.Sequence);
_totalBytes -= (ulong)msg.Payload.Length;
_messageCount--;
DeleteInBlock(msg.Sequence);
}
_generation++;
_firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL;
// Update _last if required.
if (_messages.Count == 0)
@@ -437,6 +524,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
foreach (var s in toRemove)
{
if (_messages.TryGetValue(s, out var msg))
{
_totalBytes -= (ulong)msg.Payload.Length;
_messageCount--;
}
_messages.Remove(s);
DeleteInBlock(s);
}
@@ -448,6 +541,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
// Go: preserve _last (monotonically increasing), advance _first to seq.
// Compact(seq) removes everything < seq; the new first is seq.
_first = seq;
_firstSeq = 0;
}
else
{
@@ -455,6 +549,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
_last = _messages.Keys.Max();
// Update _first to reflect the real first message.
_first = _messages.Keys.Min();
_firstSeq = _first;
}
return (ulong)toRemove.Length;
@@ -473,6 +568,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
_messages.Clear();
_generation++;
_last = 0;
_messageCount = 0;
_totalBytes = 0;
_firstSeq = 0;
DisposeAllBlocks();
CleanBlockFiles();
return;
@@ -481,6 +579,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
var toRemove = _messages.Keys.Where(k => k > seq).ToArray();
foreach (var s in toRemove)
{
if (_messages.TryGetValue(s, out var msg))
{
_totalBytes -= (ulong)msg.Payload.Length;
_messageCount--;
}
_messages.Remove(s);
DeleteInBlock(s);
}
@@ -491,6 +595,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
// Update _last to the new highest existing sequence (or seq if it exists,
// or the highest below seq).
_last = _messages.Count == 0 ? 0 : _messages.Keys.Max();
_firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL;
}
/// <summary>
@@ -717,12 +822,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
/// </summary>
public void FastState(ref StreamState state)
{
state.Msgs = (ulong)_messages.Count;
state.Bytes = (ulong)_messages.Values.Sum(m => (long)m.Payload.Length);
state.Msgs = _messageCount;
state.Bytes = _totalBytes;
state.LastSeq = _last;
state.LastTime = default;
if (_messages.Count == 0)
if (_messageCount == 0)
{
// Go: when all messages are removed/expired, first.seq tracks the watermark.
// If _first > 0 use it (set by Compact / SkipMsg); otherwise 0.
@@ -732,15 +837,15 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
}
else
{
var firstSeq = _messages.Keys.Min();
var firstSeq = _firstSeq;
state.FirstSeq = firstSeq;
state.FirstTime = _messages[firstSeq].TimestampUtc;
state.FirstTime = _messages.TryGetValue(firstSeq, out var firstMsg) ? firstMsg.TimestampUtc : default;
// Go parity: LastTime from the actual last stored message (not _last,
// which may be a skip/tombstone sequence with no corresponding message).
if (_messages.TryGetValue(_last, out var lastMsg))
state.LastTime = lastMsg.TimestampUtc;
else
else if (_messages.Count > 0)
{
// _last is a skip — use the highest actual message time.
var actualLast = _messages.Keys.Max();
@@ -808,9 +913,19 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
public async ValueTask DisposeAsync()
{
// Go: filestore.go:5499 — flush all pending writes before closing.
await _writeCache.DisposeAsync();
// Stop the background flush loop first to prevent it from accessing
// blocks that are about to be disposed.
await StopFlushLoopAsync();
// Flush pending buffered writes on all blocks before closing.
foreach (var block in _blocks)
block.FlushPending();
// Dispose blocks first so the write cache lookup returns null for
// already-closed blocks. WriteCacheManager.FlushAllAsync guards
// against null blocks, so this ordering prevents ObjectDisposedException.
DisposeAllBlocks();
await _writeCache.DisposeAsync();
_stateWriteLock.Dispose();
}
@@ -820,6 +935,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
/// </summary>
public void Dispose()
{
    // The background flush loop is stopped first so it cannot touch blocks
    // while they are being flushed and disposed below.
    StopFlushLoop();

    // Push any buffered writes out of every block before the blocks are closed.
    _blocks.ForEach(static pending => pending.FlushPending());

    DisposeAllBlocks();
    _stateWriteLock.Dispose();
}
@@ -861,6 +979,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
/// </summary>
private void RotateBlock()
{
// Flush any pending buffered writes before sealing the outgoing block.
_activeBlock?.FlushPending();
// Go: filestore.go:4499 (flushPendingMsgsLocked) — evict the outgoing block's
// write cache via WriteCacheManager before rotating to the new block.
// WriteCacheManager.EvictBlock flushes to disk then clears the cache.
@@ -901,10 +1022,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
/// </summary>
private void DisposeAllBlocks()
{
    // Drop the active-block reference before disposing anything, so the
    // background flush loop observes null and skips FlushPending instead of
    // hitting an ObjectDisposedException on a block's lock.
    _activeBlock = null;

    _blocks.ForEach(static doomed => doomed.Dispose());
    _blocks.Clear();

    // Block numbering restarts from zero once every block is gone.
    _nextBlockId = 0;
}
@@ -933,6 +1056,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
CleanBlockFiles();
_last = _messages.Count == 0 ? 0UL : _messages.Keys.Max();
_firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL;
_messageCount = (ulong)_messages.Count;
_totalBytes = (ulong)_messages.Values.Sum(m => (long)m.Payload.Length);
foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value))
{
@@ -1046,6 +1172,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
_first = _messages.Keys.Min();
else if (_last > 0)
_first = _last + 1;
// Recompute incremental state from recovered messages.
_messageCount = (ulong)_messages.Count;
_totalBytes = (ulong)_messages.Values.Sum(m => (long)m.Payload.Length);
_firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL;
}
/// <summary>
@@ -1239,10 +1370,20 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
// Reference: golang/nats-server/server/filestore.go:expireMsgs — dmap-based removal.
foreach (var seq in expired)
{
_messages.Remove(seq);
if (_messages.Remove(seq, out var msg))
{
_messageCount--;
_totalBytes -= (ulong)msg.Payload.Length;
}
DeleteInBlock(seq);
}
if (_messages.Count > 0)
_firstSeq = _messages.Keys.Min();
else
_firstSeq = 0;
_generation++;
}
@@ -1265,8 +1406,15 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
return;
foreach (var sequence in expired)
_messages.Remove(sequence);
{
if (_messages.Remove(sequence, out var msg))
{
_messageCount--;
_totalBytes -= (ulong)msg.Payload.Length;
}
}
_firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL;
RewriteBlocks();
}
@@ -1291,6 +1439,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
private byte[] TransformForPersist(ReadOnlySpan<byte> payload)
{
// Fast path: no compression or encryption — store raw payload without envelope.
// Avoids SHA256 hashing and envelope allocation on the hot publish path.
if (!_useS2 && !_useAead && !_options.EnableCompression && !_options.EnableEncryption)
return payload.ToArray();
var plaintext = payload.ToArray();
var transformed = plaintext;
byte flags = 0;
@@ -1523,20 +1676,29 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
/// </summary>
public bool RemoveMsg(ulong seq)
{
var removed = _messages.Remove(seq);
if (removed)
if (!_messages.Remove(seq, out var msg))
return false;
_generation++;
_messageCount--;
_totalBytes -= (ulong)msg.Payload.Length;
// Go: filestore.go — LastSeq (lmb.last.seq) is a high-water mark and is
// never decremented on removal. Only FirstSeq advances when the first
// live message is removed.
if (_messages.Count == 0)
{
_generation++;
// Go: filestore.go — LastSeq (lmb.last.seq) is a high-water mark and is
// never decremented on removal. Only FirstSeq advances when the first
// live message is removed.
if (_messages.Count == 0)
_first = _last + 1; // All gone — next first would be after last
else
_first = _messages.Keys.Min();
DeleteInBlock(seq);
_first = _last + 1; // All gone — next first would be after last
_firstSeq = 0;
}
return removed;
else
{
_first = _messages.Keys.Min();
_firstSeq = _first;
}
DeleteInBlock(seq);
return true;
}
/// <summary>
@@ -1547,15 +1709,23 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
/// </summary>
public bool EraseMsg(ulong seq)
{
if (!_messages.Remove(seq, out _))
if (!_messages.Remove(seq, out var msg))
return false;
_generation++;
_messageCount--;
_totalBytes -= (ulong)msg.Payload.Length;
if (_messages.Count == 0)
{
_first = _last + 1;
_firstSeq = 0;
}
else
{
_first = _messages.Keys.Min();
_firstSeq = _first;
}
// Secure erase: overwrite payload bytes with random data before marking deleted.
// Reference: golang/nats-server/server/filestore.go:5890 (eraseMsg).
@@ -1977,7 +2147,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
_stopped = true;
// Flush the active block to ensure all buffered writes reach disk.
// Stop the background flush loop before accessing blocks.
StopFlushLoop();
// Flush pending buffered writes and the active block to ensure all data reaches disk.
_activeBlock?.FlushPending();
_activeBlock?.Flush();
// Dispose all blocks to release OS file handles. The files remain on disk.
@@ -1994,14 +2168,54 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
public byte[] EncodedStreamState(ulong failed) => [];
/// <summary>
/// Updates the stream configuration. Currently a no-op placeholder — config
/// changes that affect storage (MaxMsgsPer, MaxAge, etc.) will be enforced
/// when the stream engine is fully wired.
/// Updates the stream configuration. Applies new limits (MaxMsgsPerSubject,
/// MaxAge, etc.) to the store options.
/// Reference: golang/nats-server/server/filestore.go — UpdateConfig.
/// </summary>
public void UpdateConfig(StreamConfig cfg)
{
    _options.MaxMsgsPerSubject = cfg.MaxMsgsPer;

    // A non-positive MaxAgeMs in the new config leaves the current setting untouched.
    if (cfg.MaxAgeMs > 0)
        _options.MaxAgeMs = cfg.MaxAgeMs;

    // No per-subject limit configured: nothing to trim.
    if (_options.MaxMsgsPerSubject <= 0)
        return;

    // Snapshot the distinct subjects first; enforcement removes entries from _messages.
    var distinctSubjects = _messages.Values
        .Select(static stored => stored.Subject)
        .Distinct(StringComparer.Ordinal)
        .ToArray();

    foreach (var name in distinctSubjects)
        EnforceMaxMsgsPerSubject(name);
}
/// <summary>
/// Removes the oldest messages for the given subject until the per-subject count
/// is within the <see cref="FileStoreOptions.MaxMsgsPerSubject"/> limit, keeping
/// the incremental counters (_messageCount, _totalBytes, _firstSeq) in step.
/// Reference: golang/nats-server/server/filestore.go — enforcePerSubjectLimit.
/// </summary>
/// <param name="subject">Subject whose message count is checked against the limit.</param>
private void EnforceMaxMsgsPerSubject(string subject)
{
    var limit = _options.MaxMsgsPerSubject;
    if (limit <= 0)
        return;

    var subjectMsgs = _messages
        .Where(kv => string.Equals(kv.Value.Subject, subject, StringComparison.Ordinal))
        .OrderBy(kv => kv.Key)
        .ToList();

    // Already within the limit: leave all state untouched. This runs on every
    // append, so the O(n) first-sequence scan below is skipped unless needed.
    var excess = subjectMsgs.Count - limit;
    if (excess <= 0)
        return;

    // Walk the oldest entries by index instead of RemoveAt(0), which shifts the
    // whole list on every removal.
    var removedFirst = false;
    for (var i = 0; i < excess; i++)
    {
        var oldest = subjectMsgs[i];
        _totalBytes -= (ulong)oldest.Value.Payload.Length;
        _messageCount--;
        _messages.Remove(oldest.Key);
        DeleteInBlock(oldest.Key);
        _generation++;

        if (oldest.Key == _firstSeq)
            removedFirst = true;
    }

    // The tracked first sequence only moves when the message holding it was
    // removed (same rule RemoveAsync applies). At least `limit` messages remain,
    // so the store cannot be empty here; the Count check mirrors the original guard.
    if (removedFirst && _messages.Count > 0)
        _firstSeq = _messages.Keys.Min();
}
/// <summary>
@@ -2045,10 +2259,62 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
/// </summary>
public async Task FlushAllPending()
{
    // Read the active block once so the buffered-write flush and the disk sync
    // are applied to the same block even if the field is reassigned in between
    // (the background flush loop snapshots it the same way).
    var block = _activeBlock;
    block?.FlushPending();
    block?.Flush();

    // Persist the stream state snapshot after the block data has been flushed.
    await WriteStreamStateAsync();
}
/// <summary>
/// Background flush loop that coalesces buffered writes from MsgBlock into
/// batched disk writes. Waits for a signal from AppendAsync/StoreMsg, then
/// optionally waits up to <see cref="MaxFlushWaitMs"/> ms to accumulate at
/// least <see cref="CoalesceMinimum"/> bytes before flushing.
/// Reference: golang/nats-server/server/filestore.go:5841 (flushLoop).
/// </summary>
/// <param name="ct">Token that stops the loop. Cancellation during the coalesce wait still performs one final flush.</param>
private async Task FlushLoopAsync(CancellationToken ct)
{
    while (!ct.IsCancellationRequested)
    {
        try
        {
            // WaitToReadAsync returns false once the channel has been completed;
            // without this check the loop would spin forever on a closed channel.
            if (!await _flushSignal.Reader.WaitToReadAsync(ct))
            {
                _activeBlock?.FlushPending();
                return;
            }
        }
        catch (OperationCanceledException)
        {
            return;
        }

        // Collapse every queued signal into a single wake-up so a burst of
        // appends does not trigger one (mostly empty) flush cycle per signal.
        while (_flushSignal.Reader.TryRead(out _))
        {
        }

        var block = _activeBlock;
        if (block is null)
            continue;

        // Bound the coalesce wait by wall-clock time rather than by counting
        // Task.Delay(1) iterations: a 1 ms delay can take far longer than 1 ms
        // when the system timer resolution is coarse.
        var waitStarted = System.Diagnostics.Stopwatch.GetTimestamp();
        while (block.PendingWriteSize < CoalesceMinimum
               && System.Diagnostics.Stopwatch.GetElapsedTime(waitStarted).TotalMilliseconds < MaxFlushWaitMs)
        {
            try { await Task.Delay(1, ct); }
            catch (OperationCanceledException) { break; }
        }

        block.FlushPending();
    }
}
/// <summary>
/// Signals cancellation to the background flush loop and blocks until the
/// loop task (if one was started) has finished.
/// Must be called before disposing blocks to avoid accessing disposed locks.
/// </summary>
private void StopFlushLoop()
{
    _flushCts.Cancel();

    // Synchronously wait for the loop to exit; nothing to wait for if it never started.
    if (_flushTask is { } loopTask)
        loopTask.GetAwaiter().GetResult();
}
/// <summary>
/// Asynchronous counterpart of <see cref="StopFlushLoop"/>: requests
/// cancellation and awaits the flush loop task when one exists.
/// </summary>
private async Task StopFlushLoopAsync()
{
    await _flushCts.CancelAsync();

    // No loop task means there is nothing further to await.
    if (_flushTask is { } loopTask)
        await loopTask;
}
/// <summary>
/// Atomically persists a compact stream state snapshot to disk using
/// <see cref="AtomicFileWriter"/> (write-to-temp-then-rename) so that a
@@ -2064,9 +2330,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable
var snapshot = new StreamStateSnapshot
{
FirstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL,
FirstSeq = _messageCount > 0 ? _firstSeq : 0UL,
LastSeq = _last,
Messages = (ulong)_messages.Count,
Messages = _messageCount,
Bytes = (ulong)_blocks.Sum(b => b.BytesUsed),
};

View File

@@ -1,3 +1,5 @@
using NATS.Server.JetStream.Models;
namespace NATS.Server.JetStream.Storage;
public sealed class FileStoreOptions
@@ -7,6 +9,14 @@ public sealed class FileStoreOptions
public string IndexManifestFileName { get; set; } = "index.manifest.json";
public int MaxAgeMs { get; set; }
// Go: StreamConfig.MaxBytes — maximum total bytes for the stream.
// Reference: golang/nats-server/server/filestore.go — maxBytes field.
// Defaults to 0.
// NOTE(review): presumably 0 means "no byte limit" (StreamManager only checks
// MaxBytes when it is > 0) — confirm the store applies the same convention.
public long MaxBytes { get; set; }
// Go: StreamConfig.Discard — discard policy (Old or New).
// Reference: golang/nats-server/server/filestore.go — discardPolicy field.
// Defaults to DiscardPolicy.Old.
public DiscardPolicy Discard { get; set; } = DiscardPolicy.Old;
// Legacy boolean compression / encryption flags (FSV1 envelope format).
// When set and the corresponding enum is left at its default (NoCompression /
// NoCipher), the legacy Deflate / XOR path is used for backward compatibility.

View File

@@ -48,6 +48,12 @@ public sealed class MsgBlock : IDisposable
// Reference: golang/nats-server/server/filestore.go:236 (cache field)
private Dictionary<ulong, MessageRecord>? _cache;
// Pending write buffer — accumulates encoded records for batched disk writes.
// The background flush loop in FileStore coalesces these into fewer I/O calls.
// Reference: golang/nats-server/server/filestore.go:6700 (cache.buf write path).
private readonly List<(byte[] Data, long Offset)> _pendingWrites = new();
private int _pendingBytes;
// Go: msgBlock.lchk — last written record checksum (XxHash64, 8 bytes).
// Tracked so callers can chain checksum verification across blocks.
// Reference: golang/nats-server/server/filestore.go:2204 (lchk field)
@@ -147,6 +153,23 @@ public sealed class MsgBlock : IDisposable
}
}
/// <summary>
/// Number of bytes currently buffered in this block that have not yet been
/// written to disk. The background flush loop polls this to decide when
/// enough data has accumulated to flush. Returns 0 when the block is disposed.
/// </summary>
public int PendingWriteSize
{
    get
    {
        if (!_disposed)
        {
            // The lock itself may already be disposed by a concurrent Dispose.
            try
            {
                _lock.EnterReadLock();
            }
            catch (ObjectDisposedException)
            {
                return 0;
            }

            try
            {
                var pending = _pendingBytes;
                return pending;
            }
            finally
            {
                _lock.ExitReadLock();
            }
        }

        return 0;
    }
}
/// <summary>
/// The XxHash64 checksum of the last record written to this block (8 bytes), or null
/// if no records have been written yet. Updated after every <see cref="Write"/>,
@@ -230,8 +253,10 @@ public sealed class MsgBlock : IDisposable
var encoded = MessageRecord.Encode(record);
var offset = _writeOffset;
// Write at the current append offset using positional I/O
RandomAccess.Write(_handle, encoded, offset);
// Buffer the write for batched disk I/O — the background flush loop
// in FileStore will coalesce pending writes.
_pendingWrites.Add((encoded, offset));
_pendingBytes += encoded.Length;
_writeOffset = offset + encoded.Length;
_index[sequence] = (offset, encoded.Length);
@@ -295,7 +320,10 @@ public sealed class MsgBlock : IDisposable
var encoded = MessageRecord.Encode(record);
var offset = _writeOffset;
RandomAccess.Write(_handle, encoded, offset);
// Buffer the write for batched disk I/O — the background flush loop
// in FileStore will coalesce pending writes.
_pendingWrites.Add((encoded, offset));
_pendingBytes += encoded.Length;
_writeOffset = offset + encoded.Length;
_index[sequence] = (offset, encoded.Length);
@@ -333,7 +361,8 @@ public sealed class MsgBlock : IDisposable
/// <returns>The decoded record, or null if not found or deleted.</returns>
public MessageRecord? Read(ulong sequence)
{
_lock.EnterReadLock();
// Use a write lock because we may need to flush pending writes.
_lock.EnterWriteLock();
try
{
if (_deleted.Contains(sequence))
@@ -347,6 +376,15 @@ public sealed class MsgBlock : IDisposable
if (!_index.TryGetValue(sequence, out var entry))
return null;
// Flush pending writes so disk reads see the latest data.
if (_pendingWrites.Count > 0)
{
foreach (var (data, off) in _pendingWrites)
RandomAccess.Write(_handle, data, off);
_pendingWrites.Clear();
_pendingBytes = 0;
}
var buffer = new byte[entry.Length];
RandomAccess.Read(_handle, buffer, entry.Offset);
@@ -354,7 +392,7 @@ public sealed class MsgBlock : IDisposable
}
finally
{
_lock.ExitReadLock();
_lock.ExitWriteLock();
}
}
@@ -384,6 +422,15 @@ public sealed class MsgBlock : IDisposable
if (!_deleted.Add(sequence))
return false;
// Flush any pending writes so the record is on disk before we read it back.
if (_pendingWrites.Count > 0)
{
foreach (var (data, off) in _pendingWrites)
RandomAccess.Write(_handle, data, off);
_pendingWrites.Clear();
_pendingBytes = 0;
}
// Read the existing record, re-encode with Deleted flag, write back in-place.
// The encoded size doesn't change (only flags byte + checksum differ).
var buffer = new byte[entry.Length];
@@ -455,7 +502,9 @@ public sealed class MsgBlock : IDisposable
var encoded = MessageRecord.Encode(record);
var offset = _writeOffset;
RandomAccess.Write(_handle, encoded, offset);
// Buffer the write for batched disk I/O.
_pendingWrites.Add((encoded, offset));
_pendingBytes += encoded.Length;
_writeOffset = offset + encoded.Length;
_index[sequence] = (offset, encoded.Length);
@@ -500,6 +549,44 @@ public sealed class MsgBlock : IDisposable
}
}
/// <summary>
/// Writes every buffered (pending) record to the block file at its recorded
/// offset — one positional write per record — and then empties the buffer.
/// Called by the background flush loop in FileStore, or synchronously on
/// block seal / dispose to ensure all data reaches disk.
/// Reference: golang/nats-server/server/filestore.go:7592 (flushPendingMsgsLocked).
/// </summary>
/// <returns>The number of bytes flushed; 0 when nothing was pending or the block is disposed.</returns>
public int FlushPending()
{
    if (_disposed)
        return 0;

    // The block may be disposed concurrently (e.g. during PurgeAsync), in which
    // case acquiring the lock throws and there is nothing left to flush.
    try { _lock.EnterWriteLock(); }
    catch (ObjectDisposedException) { return 0; }

    try
    {
        if (_pendingWrites.Count == 0)
            return 0;

        var flushedBytes = _pendingBytes;
        foreach (var (data, offset) in _pendingWrites)
        {
            RandomAccess.Write(_handle, data, offset);
        }

        _pendingWrites.Clear();
        _pendingBytes = 0;
        return flushedBytes;
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
/// <summary>
/// Returns true if the given sequence number has been soft-deleted in this block.
/// Reference: golang/nats-server/server/filestore.go — dmap (deleted map) lookup.
@@ -559,11 +646,20 @@ public sealed class MsgBlock : IDisposable
/// </summary>
public IEnumerable<(ulong Sequence, string Subject)> EnumerateNonDeleted()
{
// Snapshot index and deleted set under the read lock, then decode outside it.
// Snapshot index and deleted set under a write lock (may need to flush pending).
List<(long Offset, int Length, ulong Seq)> entries;
_lock.EnterReadLock();
_lock.EnterWriteLock();
try
{
// Flush pending writes so disk reads see latest data.
if (_pendingWrites.Count > 0)
{
foreach (var (data, off) in _pendingWrites)
RandomAccess.Write(_handle, data, off);
_pendingWrites.Clear();
_pendingBytes = 0;
}
entries = new List<(long, int, ulong)>(_index.Count);
foreach (var (seq, (offset, length)) in _index)
{
@@ -573,7 +669,7 @@ public sealed class MsgBlock : IDisposable
}
finally
{
_lock.ExitReadLock();
_lock.ExitWriteLock();
}
// Sort by sequence for deterministic output.
@@ -609,13 +705,22 @@ public sealed class MsgBlock : IDisposable
}
/// <summary>
/// Flushes any buffered writes to disk.
/// Flushes any pending buffered writes and then syncs the file to disk.
/// </summary>
public void Flush()
{
_lock.EnterWriteLock();
try
{
// Flush pending buffered writes first.
if (_pendingWrites.Count > 0)
{
foreach (var (data, offset) in _pendingWrites)
RandomAccess.Write(_handle, data, offset);
_pendingWrites.Clear();
_pendingBytes = 0;
}
_file.Flush(flushToDisk: true);
}
finally
@@ -636,6 +741,15 @@ public sealed class MsgBlock : IDisposable
_lock.EnterWriteLock();
try
{
// Flush pending buffered writes before closing.
if (_pendingWrites.Count > 0)
{
foreach (var (data, offset) in _pendingWrites)
RandomAccess.Write(_handle, data, offset);
_pendingWrites.Clear();
_pendingBytes = 0;
}
_file.Flush();
_file.Dispose();
}

View File

@@ -0,0 +1,5 @@
namespace NATS.Server.JetStream.Storage;
// Go: filestore.go — ErrMaxBytes / ErrMaxMsgs capacity errors.
// Reference: golang/nats-server/server/store.go — ErrStoreResourcesExceeded.
/// <summary>
/// Exception type for store capacity errors; carries only a message.
/// </summary>
public sealed class StoreCapacityException : Exception
{
    /// <summary>Creates the exception with the given error message.</summary>
    /// <param name="message">Description of the capacity limit that was hit.</param>
    public StoreCapacityException(string message)
        : base(message)
    {
    }
}

View File

@@ -148,6 +148,31 @@ public sealed class StreamManager : IDisposable
if (isCreate && _account is not null && !_account.TryReserveStream())
return JetStreamApiResponse.ErrorResponse(10027, "maximum streams exceeded");
// Go: subject overlap detection on create — reject new streams whose subjects
// collide with existing streams.
// Reference: server/stream.go — checkStreamOverlap during addStream.
if (isCreate && normalized.Subjects.Count > 0)
{
var otherStreams = _streams.Values.Select(s => s.Config);
var overlapErrors = new List<string>();
foreach (var otherStream in otherStreams)
{
foreach (var proposedSubj in normalized.Subjects)
{
foreach (var otherSubj in otherStream.Subjects)
{
if (SubjectMatch.MatchLiteral(proposedSubj, otherSubj)
|| SubjectMatch.MatchLiteral(otherSubj, proposedSubj)
|| SubjectMatch.SubjectsCollide(proposedSubj, otherSubj))
{
return JetStreamApiResponse.ErrorResponse(400,
$"subjects overlap with stream '{otherStream.Name}'");
}
}
}
}
}
// Go: stream.go:update — validate immutable fields on update.
// Reference: server/stream.go:1500-1600 (stream.update)
if (!isCreate && _streams.TryGetValue(normalized.Name, out var existingHandle))
@@ -202,6 +227,11 @@ public sealed class StreamManager : IDisposable
_replicaGroups.TryRemove(name, out _);
_account?.ReleaseStream();
RebuildReplicationCoordinators();
// Go: propagate stream deletion to meta group so cluster state is updated.
// Reference: server/jetstream_cluster.go — processStreamRemoval updates meta state.
_metaGroup?.ProposeDeleteStreamAsync(name, default).GetAwaiter().GetResult();
return true;
}
@@ -367,7 +397,10 @@ public sealed class StreamManager : IDisposable
if (stream == null)
return null;
// Go: sealed stream rejects all publishes.
// Reference: server/stream.go — processJetStreamMsg checks mset.cfg.Sealed.
if (stream.Config.Sealed)
return new PubAck { Stream = stream.Config.Name, ErrorCode = 10054 };
if (stream.Config.MaxMsgSize > 0 && payload.Length > stream.Config.MaxMsgSize)
{
@@ -378,12 +411,19 @@ public sealed class StreamManager : IDisposable
};
}
PruneExpiredMessages(stream, DateTime.UtcNow);
// Go: memStoreMsgSize — full message size includes subject + headers + payload + 16 bytes overhead.
var msgSize = subject.Length + payload.Length + 16;
var stateBefore = stream.Store.GetStateAsync(default).GetAwaiter().GetResult();
// Go: DiscardPolicy.New — reject when MaxMsgs reached.
// Reference: server/stream.go — processJetStreamMsg checks discard new + maxMsgs.
if (stream.Config.MaxMsgs > 0 && stream.Config.Discard == DiscardPolicy.New
&& (long)stateBefore.Messages >= stream.Config.MaxMsgs)
{
return new PubAck { Stream = stream.Config.Name, ErrorCode = 10054 };
}
if (stream.Config.MaxBytes > 0 && (long)stateBefore.Bytes + msgSize > stream.Config.MaxBytes)
{
if (stream.Config.Discard == DiscardPolicy.New)
@@ -402,8 +442,13 @@ public sealed class StreamManager : IDisposable
}
}
if (_replicaGroups.TryGetValue(stream.Config.Name, out var replicaGroup))
// Go: single-replica streams don't use RAFT consensus — skip the propose overhead.
// Reference: server/stream.go — processJetStreamMsg only proposes when R > 1.
if (stream.Config.Replicas > 1
&& _replicaGroups.TryGetValue(stream.Config.Name, out var replicaGroup))
{
_ = replicaGroup.ProposeAsync($"PUB {subject}", default).GetAwaiter().GetResult();
}
// Go: stream.go:processMsgSubjectTransform — apply input subject transform before store.
// Reference: server/stream.go:1810-1830
@@ -411,9 +456,63 @@ public sealed class StreamManager : IDisposable
var seq = stream.Store.AppendAsync(storeSubject, payload, default).GetAwaiter().GetResult();
EnforceRuntimePolicies(stream, DateTime.UtcNow);
var stored = stream.Store.LoadAsync(seq, default).GetAwaiter().GetResult();
if (stored != null)
ReplicateIfConfigured(stream.Config.Name, stored);
// Only load the stored message when replication is configured (mirror/source).
// Avoids unnecessary disk I/O on the hot publish path.
if (_mirrorsByOrigin.ContainsKey(stream.Config.Name) || _sourcesByOrigin.ContainsKey(stream.Config.Name))
{
var stored = stream.Store.LoadAsync(seq, default).GetAwaiter().GetResult();
if (stored != null)
ReplicateIfConfigured(stream.Config.Name, stored);
}
return new PubAck
{
Stream = stream.Config.Name,
Seq = seq,
};
}
/// <summary>
/// Captures a counter increment message for a stream with AllowMsgCounter=true.
/// Go reference: server/stream.go — processJetStreamMsg counter path.
/// The server loads the last stored value for the subject, adds the increment,
/// and stores the new total as a JSON payload.
/// </summary>
public PubAck? CaptureCounter(string subject, long increment)
{
var stream = FindBySubject(subject);
if (stream == null)
return null;
if (!stream.Config.AllowMsgCounter)
return new PubAck { Stream = stream.Config.Name, ErrorCode = 10054 };
if (stream.Config.Sealed)
return new PubAck { Stream = stream.Config.Name, ErrorCode = 10054 };
// Go: stream.go — counter increment: load last value, add increment, store new total.
var storeSubject = ApplyInputTransform(stream.Config, subject);
var lastMsg = stream.Store.LoadLastBySubjectAsync(storeSubject, default).GetAwaiter().GetResult();
var existing = lastMsg != null
? CounterValue.FromPayload(lastMsg.Payload.Span)
: new CounterValue();
var newTotal = existing.AsLong() + increment;
var newPayload = CounterValue.FromLong(newTotal).ToPayload();
if (_replicaGroups.TryGetValue(stream.Config.Name, out var replicaGroup))
_ = replicaGroup.ProposeAsync($"PUB {subject}", default).GetAwaiter().GetResult();
var seq = stream.Store.AppendAsync(storeSubject, newPayload, default).GetAwaiter().GetResult();
EnforceRuntimePolicies(stream, DateTime.UtcNow);
if (_mirrorsByOrigin.ContainsKey(stream.Config.Name) || _sourcesByOrigin.ContainsKey(stream.Config.Name))
{
var stored = stream.Store.LoadAsync(seq, default).GetAwaiter().GetResult();
if (stored != null)
ReplicateIfConfigured(stream.Config.Name, stored);
}
return new PubAck
{
@@ -468,6 +567,11 @@ public sealed class StreamManager : IDisposable
SourceAccount = s.SourceAccount,
FilterSubject = s.FilterSubject,
DuplicateWindowMs = s.DuplicateWindowMs,
SubjectTransforms = [.. s.SubjectTransforms.Select(t => new SubjectTransformConfig
{
Source = t.Source,
Destination = t.Destination,
})],
})],
// Go: StreamConfig.SubjectTransform
SubjectTransformSource = config.SubjectTransformSource,
@@ -654,8 +758,10 @@ public sealed class StreamManager : IDisposable
/// <summary>
/// Applies limits-based retention to a stream: always enforces the stream
/// limits, then prunes per subject and prunes expired messages only when the
/// corresponding limit is configured, so streams without those limits skip
/// the extra work.
/// </summary>
/// <param name="stream">The stream whose store is being trimmed.</param>
/// <param name="nowUtc">Current UTC time passed to the expiry pruning.</param>
private static void ApplyLimitsRetention(StreamHandle stream, DateTime nowUtc)
{
    EnforceLimits(stream);

    // Per-subject pruning is only meaningful when a per-subject cap is set.
    if (stream.Config.MaxMsgsPer > 0)
        PrunePerSubject(stream);

    // Age-based pruning is only meaningful when a max age is set.
    if (stream.Config.MaxAgeMs > 0)
        PruneExpiredMessages(stream, nowUtc);
}
private void ApplyWorkQueueRetention(StreamHandle stream, DateTime nowUtc)
@@ -746,7 +852,10 @@ public sealed class StreamManager : IDisposable
&& _streams.TryGetValue(stream.Config.Source, out _))
{
var list = _sourcesByOrigin.GetOrAdd(stream.Config.Source, _ => []);
list.Add(new SourceCoordinator(stream.Store, new StreamSourceConfig { Name = stream.Config.Source }));
list.Add(new SourceCoordinator(stream.Store, new StreamSourceConfig { Name = stream.Config.Source })
{
AllowMsgCounter = stream.Config.AllowMsgCounter,
});
}
if (stream.Config.Sources.Count > 0)
@@ -757,7 +866,10 @@ public sealed class StreamManager : IDisposable
continue;
var list = _sourcesByOrigin.GetOrAdd(source.Name, _ => []);
list.Add(new SourceCoordinator(stream.Store, source));
list.Add(new SourceCoordinator(stream.Store, source)
{
AllowMsgCounter = stream.Config.AllowMsgCounter,
});
}
}
}

View File

@@ -879,8 +879,10 @@ public sealed class NatsClient : INatsClient, IDisposable
{
while (await timer.WaitForNextTickAsync(ct))
{
// Delay first PING until client has responded with PONG or 2 seconds elapsed
if (!_flags.HasFlag(ClientFlags.FirstPongSent)
// Delay first PING until client has responded with PONG or 2 seconds elapsed.
// Go: opts.go — DisableShortFirstPing bypasses this grace period.
if (!_options.DisableShortFirstPing
&& !_flags.HasFlag(ClientFlags.FirstPongSent)
&& (DateTime.UtcNow - StartTime).TotalSeconds < 2)
{
continue;

View File

@@ -25,6 +25,11 @@ public sealed class NatsOptions
public TimeSpan PingInterval { get; set; } = NatsProtocol.DefaultPingInterval;
public int MaxPingsOut { get; set; } = NatsProtocol.DefaultPingMaxOut;
// Go: opts.go — DisableShortFirstPing. When true, the first PING timer tick
// is not suppressed by the FirstPongSent / 2-second grace period.
// Useful in tests to ensure deterministic ping behavior.
// Defaults to false, i.e. the grace period applies.
public bool DisableShortFirstPing { get; set; }
// Subscription limits
public int MaxSubs { get; set; } // 0 = unlimited (per-connection)
public int MaxSubTokens { get; set; } // 0 = unlimited

View File

@@ -283,8 +283,11 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
{
if (_jetStreamPublisher != null && _jetStreamPublisher.TryCapture(subject, payload, out ack))
{
// Only load the stored message for consumer notification when there are
// active consumers for this stream. Avoids unnecessary LoadAsync on the hot path.
if (ack.ErrorCode == null
&& _jetStreamConsumerManager != null
&& _jetStreamConsumerManager.HasConsumersForStream(ack.Stream)
&& _jetStreamStreamManager != null
&& _jetStreamStreamManager.TryGet(ack.Stream, out var streamHandle))
{
@@ -670,6 +673,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
{
_jetStreamConsumerManager = new ConsumerManager();
_jetStreamStreamManager = new StreamManager(consumerManager: _jetStreamConsumerManager, storeDir: options.JetStream.StoreDir);
_jetStreamConsumerManager.StreamManager = _jetStreamStreamManager;
var jsClientId = Interlocked.Increment(ref _nextClientId);
_jetStreamInternalClient = new InternalClient(jsClientId, ClientKind.JetStream, _systemAccount);
_jetStreamService = new JetStreamService(options.JetStream, _jetStreamInternalClient);