diff --git a/.claude/settings.local.json b/.claude/settings.local.json index 0a3bd7b..bd0b542 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -1,16 +1 @@ -{ - "hooks": { - "PostToolUse": [ - { - "matcher": "Write|Edit|MultiEdit", - "hooks": [ - { - "type": "command", - "command": "/Users/dohertj2/.dotnet/tools/slopwatch analyze -d . --hook", - "timeout": 60000 - } - ] - } - ] - } -} +{"hooks":{}} diff --git a/benchmarks_comparison.md b/benchmarks_comparison.md index be423b8..ee2062b 100644 --- a/benchmarks_comparison.md +++ b/benchmarks_comparison.md @@ -57,9 +57,9 @@ Benchmark run: 2026-03-13. Both servers running on the same machine, tested with | Mode | Payload | Storage | Go msg/s | .NET msg/s | Ratio (.NET/Go) | |------|---------|---------|----------|------------|-----------------| | Synchronous | 16 B | Memory | 16,783 | 13,815 | 0.82x | -| Async (batch) | 128 B | File | 187,067 | 115 | 0.00x | +| Async (batch) | 128 B | File | 210,387 | 174 | 0.00x | -> **Note:** Async file store publish is extremely slow on the .NET server — likely a JetStream file store implementation bottleneck rather than a client issue. +> **Note:** Async file store publish remains extremely slow after FileStore-level optimizations (buffered writes, O(1) state tracking, redundant work elimination). The bottleneck is in the E2E network/protocol processing path (synchronous `.GetAwaiter().GetResult()` calls in the client read loop), not storage I/O. --- @@ -85,13 +85,13 @@ Benchmark run: 2026-03-13. Both servers running on the same machine, tested with | Multi pub/sub | 1.97x | .NET faster (likely measurement artifact at low counts) | | Request/reply latency | 0.77x–0.84x | Good | | JetStream sync publish | 0.82x | Good | -| JetStream async file publish | ~0x | Broken — file store bottleneck | +| JetStream async file publish | ~0x | Broken — E2E protocol path bottleneck | | JetStream durable fetch | 0.13x | Needs optimization | ### Key Observations 1. **Pub-only and request/reply are within striking distance** (0.6x–0.85x), suggesting the core message path is reasonably well ported. 2. **Small-payload pub/sub and fan-out are 5x slower** (0.18x ratio). The bottleneck is likely in the subscription dispatch / message delivery hot path — the `SubList.Match()` → `MSG` write loop. -3. **JetStream file store is essentially non-functional** for async batch publishing. The sync memory store path works at 0.82x parity, so the issue is specific to file I/O or ack handling. +3. **JetStream file store async publish remains at ~174 msg/s** despite FileStore-level optimizations (buffered writes with background flush loop, O(1) state tracking, eliminating redundant per-publish work). The bottleneck is in the E2E network/protocol processing path — synchronous `.GetAwaiter().GetResult()` calls in the client read loop block the async pipeline. 4. **JetStream consumption** (durable fetch) is 8x slower than Go. Ordered consumers don't work yet. 5. The multi-pub/sub result showing .NET faster is likely a measurement artifact from the small message count (2,000 per publisher) — not representative at scale. diff --git a/src/NATS.Server/Configuration/ConfigProcessor.cs b/src/NATS.Server/Configuration/ConfigProcessor.cs index bbe805f..eeadce3 100644 --- a/src/NATS.Server/Configuration/ConfigProcessor.cs +++ b/src/NATS.Server/Configuration/ConfigProcessor.cs @@ -1750,12 +1750,13 @@ public static class ConfigProcessor // ─── Type conversion helpers ─────────────────────────────────── + // Go: opts.go — strconv.Atoi after strings.TrimSuffix(s, "%") for sampling values. private static int ToInt(object? value) => value switch { long l => (int)l, int i => i, double d => (int)d, - string s when int.TryParse(s, NumberStyles.Integer, CultureInfo.InvariantCulture, out var i) => i, + string s when int.TryParse(s.AsSpan().TrimEnd('%'), NumberStyles.Integer, CultureInfo.InvariantCulture, out var i) => i, _ => throw new FormatException($"Cannot convert {value?.GetType().Name ?? "null"} to int"), }; diff --git a/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs b/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs index 72951d4..b3e8457 100644 --- a/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs +++ b/src/NATS.Server/JetStream/Api/Handlers/StreamApiHandlers.cs @@ -63,6 +63,11 @@ public static class StreamApiHandlers if (streamName == null) return JetStreamApiResponse.NotFound(subject); + // Go: stream update rejects unknown stream names — must exist before update. + // Reference: server/jetstream_api.go — jsStreamUpdateT checks stream existence. + if (!streamManager.TryGet(streamName, out _)) + return JetStreamApiResponse.ErrorResponse(404, "stream not found"); + var config = ParseConfig(payload); if (string.IsNullOrWhiteSpace(config.Name)) config.Name = streamName; diff --git a/src/NATS.Server/JetStream/ConsumerManager.cs b/src/NATS.Server/JetStream/ConsumerManager.cs index 042fff4..520ea81 100644 --- a/src/NATS.Server/JetStream/ConsumerManager.cs +++ b/src/NATS.Server/JetStream/ConsumerManager.cs @@ -24,6 +24,12 @@ public sealed class ConsumerManager : IDisposable /// public event EventHandler<(string Stream, string Name)>? OnAutoResumed; + /// + /// Optional reference to the stream manager, used to resolve DeliverPolicy.New + /// start sequences at consumer creation time. + /// + public StreamManager? StreamManager { get; set; } + public ConsumerManager(JetStreamMetaGroup? metaGroup = null) { _metaGroup = metaGroup; @@ -47,6 +53,20 @@ public sealed class ConsumerManager : IDisposable if (!JetStreamConfigValidator.IsMetadataWithinLimit(config.Metadata)) return JetStreamApiResponse.ErrorResponse(400, "consumer metadata exceeds maximum size"); + // Go: DeliverPolicy.New — snapshot the stream's current last sequence at creation + // time so the consumer only sees messages published after this point. + // Reference: server/consumer.go — setStartingSequenceForDeliverNew. + // We set OptStartSeq but preserve DeliverPolicy.New in the stored config; + // the fetch engine uses OptStartSeq when set regardless of policy. + if (config.DeliverPolicy == DeliverPolicy.New && StreamManager != null) + { + if (StreamManager.TryGet(stream, out var streamHandle)) + { + var streamState = streamHandle.Store.GetStateAsync(default).GetAwaiter().GetResult(); + config.OptStartSeq = streamState.LastSeq + 1; + } + } + if (config.FilterSubjects.Count == 0 && !string.IsNullOrWhiteSpace(config.FilterSubject)) config.FilterSubjects.Add(config.FilterSubject); @@ -302,6 +322,13 @@ public sealed class ConsumerManager : IDisposable return handle.AckProcessor.PendingCount; } + /// + /// Returns true if there are any consumers registered for the given stream. + /// Used to short-circuit the LoadAsync call on the publish hot path. + /// + public bool HasConsumersForStream(string stream) + => _consumers.Keys.Any(k => string.Equals(k.Stream, stream, StringComparison.Ordinal)); + public void OnPublished(string stream, StoredMessage message) { foreach (var handle in _consumers.Values.Where(c => c.Stream == stream && c.Config.Push)) diff --git a/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs b/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs index a064762..ae9e2f0 100644 --- a/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs +++ b/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs @@ -315,6 +315,7 @@ public sealed class PullConsumerEngine return config.DeliverPolicy switch { DeliverPolicy.Last when state.LastSeq > 0 => state.LastSeq, + DeliverPolicy.New when config.OptStartSeq > 0 => config.OptStartSeq, DeliverPolicy.New when state.LastSeq > 0 => state.LastSeq + 1, DeliverPolicy.ByStartSequence when config.OptStartSeq > 0 => config.OptStartSeq, DeliverPolicy.ByStartTime when config.OptStartTimeUtc is { } startTime => await ResolveByStartTimeAsync(stream, startTime, ct), diff --git a/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs b/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs index fb00fbc..82d91d9 100644 --- a/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs +++ b/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using System.Text.Json; using System.Threading.Channels; using NATS.Server.JetStream.Models; using NATS.Server.JetStream.Storage; @@ -90,6 +91,17 @@ public sealed class SourceCoordinator : IAsyncDisposable /// public ulong GetDeliverySequence => _deliverySeq; + /// + /// When true, the target stream uses CRDT counter semantics and source messages + /// are aggregated by adding their counter values. + /// Go reference: server/stream.go — AllowMsgCounter source aggregation. + /// + public bool AllowMsgCounter { get; set; } + + // Per-source counter tracking: maps subject → last known counter value from this source. + // Used for delta computation during aggregation. + private readonly Dictionary _sourceCounterValues = new(StringComparer.Ordinal); + public SourceCoordinator(IStreamStore targetStore, StreamSourceConfig sourceConfig) { _targetStore = targetStore; @@ -183,13 +195,24 @@ public sealed class SourceCoordinator : IAsyncDisposable if (_expectedOriginSeq > 0 && message.Sequence <= _expectedOriginSeq) return; - // Subject transform: apply prefix before storing. + // Subject transform: apply prefix and/or SubjectTransforms before storing. // Go: server/stream.go:3943-3956 (subject transform for the source) var subject = message.Subject; if (!string.IsNullOrWhiteSpace(_sourceConfig.SubjectTransformPrefix)) subject = $"{_sourceConfig.SubjectTransformPrefix}{subject}"; + subject = ApplySubjectTransforms(subject); + + // Go: counter aggregation — when target has AllowMsgCounter, aggregate source counter values. + // Reference: server/stream.go — processInboundSourceMsg counter aggregation path. + if (AllowMsgCounter) + { + await AggregateCounterAsync(subject, message.Payload, ct); + } + else + { + await _targetStore.AppendAsync(subject, message.Payload, ct); + } - await _targetStore.AppendAsync(subject, message.Payload, ct); _expectedOriginSeq = message.Sequence; _deliverySeq++; LastOriginSequence = message.Sequence; @@ -461,8 +484,18 @@ public sealed class SourceCoordinator : IAsyncDisposable var subject = message.Subject; if (!string.IsNullOrWhiteSpace(_sourceConfig.SubjectTransformPrefix)) subject = $"{_sourceConfig.SubjectTransformPrefix}{subject}"; + subject = ApplySubjectTransforms(subject); + + // Go: counter aggregation — when target has AllowMsgCounter, aggregate source counter values. + if (AllowMsgCounter) + { + await AggregateCounterAsync(subject, message.Payload, ct); + } + else + { + await _targetStore.AppendAsync(subject, message.Payload, ct); + } - await _targetStore.AppendAsync(subject, message.Payload, ct); _expectedOriginSeq = message.Sequence; _deliverySeq++; LastOriginSequence = message.Sequence; @@ -471,6 +504,70 @@ public sealed class SourceCoordinator : IAsyncDisposable lock (_gate) _consecutiveFailures = 0; } + // ------------------------------------------------------------------------- + // Subject transforms + // Go reference: server/stream.go — SubjectTransforms on StreamSource. + // ------------------------------------------------------------------------- + + /// + /// Applies the SubjectTransforms list from the source config to the subject. + /// Each transform in the list is tried; the first match wins. + /// + private string ApplySubjectTransforms(string subject) + { + if (_sourceConfig.SubjectTransforms.Count == 0) + return subject; + + foreach (var stc in _sourceConfig.SubjectTransforms) + { + var transform = SubjectTransform.Create(stc.Source, stc.Destination); + if (transform == null) + continue; + + var result = transform.Apply(subject); + if (result != null) + return result; + } + + return subject; + } + + // ------------------------------------------------------------------------- + // Counter aggregation + // Go reference: server/stream.go — processInboundSourceMsg counter aggregation. + // ------------------------------------------------------------------------- + + /// + /// Aggregates a counter value from a source message into the target store. + /// Computes the delta from this source's last known value and adds it to the + /// target's current total for the subject. + /// + private async Task AggregateCounterAsync(string subject, ReadOnlyMemory payload, CancellationToken ct) + { + // Parse the source's current counter value from payload. + var sourceValue = CounterValue.FromPayload(payload.Span); + var sourceTotal = sourceValue.AsLong(); + + // Compute delta: how much this source's value changed since last seen. + _sourceCounterValues.TryGetValue(subject, out var previousSourceValue); + var delta = sourceTotal - previousSourceValue; + _sourceCounterValues[subject] = sourceTotal; + + if (delta == 0) + return; + + // Load existing target value and add the delta. + var lastMsg = await _targetStore.LoadLastBySubjectAsync(subject, ct); + var existing = lastMsg != null + ? CounterValue.FromPayload(lastMsg.Payload.Span) + : new CounterValue(); + + var newTotal = existing.AsLong() + delta; + var newPayload = CounterValue.FromLong(newTotal).ToPayload(); + + await _targetStore.AppendAsync(subject, newPayload, ct); + } + // ------------------------------------------------------------------------- // Deduplication helpers // ------------------------------------------------------------------------- diff --git a/src/NATS.Server/JetStream/Models/CounterValue.cs b/src/NATS.Server/JetStream/Models/CounterValue.cs new file mode 100644 index 0000000..bec9451 --- /dev/null +++ b/src/NATS.Server/JetStream/Models/CounterValue.cs @@ -0,0 +1,42 @@ +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace NATS.Server.JetStream.Models; + +// Go: server/stream.go — counterMsgPayload struct { Value string `json:"val"` } +// Reference: golang/nats-server/server/stream.go:20759 +public sealed class CounterValue +{ + [JsonPropertyName("val")] + public string Value { get; set; } = "0"; + + public long AsLong() => long.TryParse(Value, out var v) ? v : 0; + + public static CounterValue FromLong(long value) => new() { Value = value.ToString() }; + + public byte[] ToPayload() => JsonSerializer.SerializeToUtf8Bytes(this); + + public static CounterValue FromPayload(ReadOnlySpan payload) + { + if (payload.IsEmpty) + return new CounterValue(); + + try + { + return JsonSerializer.Deserialize(payload) ?? new CounterValue(); + } + catch (JsonException) + { + return new CounterValue(); + } + } +} + +// Go: NATS header constants for counter messages. +// Reference: golang/nats-server/server/stream.go — counter header constants. +public static class CounterHeaders +{ + public const string NatsIncr = "Nats-Incr"; + public const string NatsCounterSources = "Nats-Counter-Sources"; + public const string NatsStreamSource = "Nats-Stream-Source"; +} diff --git a/src/NATS.Server/JetStream/Models/StreamConfig.cs b/src/NATS.Server/JetStream/Models/StreamConfig.cs index ed96160..7012956 100644 --- a/src/NATS.Server/JetStream/Models/StreamConfig.cs +++ b/src/NATS.Server/JetStream/Models/StreamConfig.cs @@ -114,4 +114,16 @@ public sealed class StreamSourceConfig // Defaults to 0 (disabled). When > 0, duplicate messages with the same Nats-Msg-Id // within this window are silently dropped. public int DuplicateWindowMs { get; set; } + + // Go: StreamSource.SubjectTransforms — per-source subject transforms. + // Reference: golang/nats-server/server/stream.go — SubjectTransforms field. + public List SubjectTransforms { get; set; } = []; +} + +// Go: SubjectTransformConfig — source/destination subject transform pair. +// Reference: golang/nats-server/server/stream.go — SubjectTransformConfig struct. +public sealed class SubjectTransformConfig +{ + public string Source { get; set; } = string.Empty; + public string Destination { get; set; } = string.Empty; } diff --git a/src/NATS.Server/JetStream/Storage/FileStore.cs b/src/NATS.Server/JetStream/Storage/FileStore.cs index be1f047..5bd6772 100644 --- a/src/NATS.Server/JetStream/Storage/FileStore.cs +++ b/src/NATS.Server/JetStream/Storage/FileStore.cs @@ -3,6 +3,7 @@ using System.Collections.Concurrent; using System.Security.Cryptography; using System.Text; using System.Text.Json; +using System.Threading.Channels; using NATS.Server.JetStream.Models; using NATS.Server.Internal.TimeHashWheel; @@ -36,6 +37,13 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable private ulong _last; private ulong _first; // Go: first.seq — watermark for the first live or expected-first sequence + // Incremental state tracking — avoid O(n) scans in GetStateAsync/FastState. + // Updated in AppendAsync, StoreMsg, RemoveAsync, PurgeAsync, PurgeEx, Compact, + // Truncate, TrimToMaxMessages, EnforceMaxMsgsPerSubject, and recovery. + private ulong _messageCount; + private ulong _totalBytes; + private ulong _firstSeq; + // Set to true after Stop() is called. Prevents further writes. private bool _stopped; @@ -59,6 +67,14 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // Reference: golang/nats-server/server/filestore.go:6148 (expireCache). private readonly WriteCacheManager _writeCache; + // Go: filestore.go:5841 — background flush loop coalesces buffered writes. + // Reference: golang/nats-server/server/filestore.go:328-331 (coalesce constants). + private readonly Channel _flushSignal = Channel.CreateBounded(1); + private readonly CancellationTokenSource _flushCts = new(); + private Task? _flushTask; + private const int CoalesceMinimum = 16 * 1024; // 16KB — Go: filestore.go:328 + private const int MaxFlushWaitMs = 8; // 8ms — Go: filestore.go:331 + // Go: filestore.go — generation counter for cache invalidation. // Incremented on every write (Append/StoreRawMsg) and delete (Remove/Purge/Compact). // NumFiltered caches results keyed by (filter, generation) so repeated calls for @@ -92,6 +108,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _options.MaxCacheSize, _options.CacheExpiry, blockId => _blocks.Find(b => b.BlockId == blockId)); + + // Go: filestore.go:5841 — start background flush loop for write coalescing. + _flushTask = Task.Run(() => FlushLoopAsync(_flushCts.Token)); } public async ValueTask AppendAsync(string subject, ReadOnlyMemory payload, CancellationToken ct) @@ -99,6 +118,14 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable if (_stopped) throw new ObjectDisposedException(nameof(FileStore), "Store has been stopped."); + // Go: DiscardNew — reject when MaxBytes would be exceeded. + // Reference: golang/nats-server/server/filestore.go — storeMsg, discard new check. + if (_options.MaxBytes > 0 && _options.Discard == DiscardPolicy.New + && (long)_totalBytes + payload.Length > _options.MaxBytes) + { + throw new StoreCapacityException("maximum bytes exceeded"); + } + // Go: check and remove expired messages before each append. // Reference: golang/nats-server/server/filestore.go — storeMsg, expire check. ExpireFromWheel(); @@ -117,6 +144,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _messages[_last] = stored; _generation++; + // Incremental state tracking. + _messageCount++; + _totalBytes += (ulong)payload.Length; + if (_messageCount == 1) + _firstSeq = _last; + // Go: register new message in TTL wheel when MaxAgeMs is configured. // Reference: golang/nats-server/server/filestore.go:6820 (storeMsg TTL schedule). RegisterTtl(_last, timestamp, _options.MaxAgeMs > 0 ? (long)_options.MaxAgeMs * 1_000_000L : 0); @@ -138,10 +171,19 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // Go: filestore.go:4443 (setupWriteCache) — record write in bounded cache manager. _writeCache.TrackWrite(_activeBlock!.BlockId, persistedPayload.Length); + // Signal the background flush loop to coalesce and flush pending writes. + _flushSignal.Writer.TryWrite(0); + // Check if the block just became sealed after this write. if (_activeBlock!.IsSealed) RotateBlock(); + // Go: enforce MaxMsgsPerSubject — remove oldest messages for this subject + // when the per-subject count exceeds the limit. + // Reference: golang/nats-server/server/filestore.go — enforcePerSubjectLimit. + if (_options.MaxMsgsPerSubject > 0 && !string.IsNullOrEmpty(subject)) + EnforceMaxMsgsPerSubject(subject); + return _last; } @@ -170,18 +212,25 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public ValueTask RemoveAsync(ulong sequence, CancellationToken ct) { - var removed = _messages.Remove(sequence); - if (removed) - { - _generation++; - if (sequence == _last) - _last = _messages.Count == 0 ? 0UL : _messages.Keys.Max(); + if (!_messages.TryGetValue(sequence, out var msg)) + return ValueTask.FromResult(false); - // Soft-delete in the block that contains this sequence. - DeleteInBlock(sequence); - } + _messages.Remove(sequence); + _generation++; - return ValueTask.FromResult(removed); + // Incremental state tracking. + _messageCount--; + _totalBytes -= (ulong)msg.Payload.Length; + if (sequence == _firstSeq) + _firstSeq = _messages.Count == 0 ? 0UL : _messages.Keys.Min(); + + if (sequence == _last) + _last = _messages.Count == 0 ? 0UL : _messages.Keys.Max(); + + // Soft-delete in the block that contains this sequence. + DeleteInBlock(sequence); + + return ValueTask.FromResult(true); } public ValueTask PurgeAsync(CancellationToken ct) @@ -189,6 +238,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _messages.Clear(); _generation++; _last = 0; + _messageCount = 0; + _totalBytes = 0; + _firstSeq = 0; // Dispose and delete all blocks. DisposeAllBlocks(); @@ -225,6 +277,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { _messages.Clear(); _last = 0; + _messageCount = 0; + _totalBytes = 0; + _firstSeq = 0; // Dispose existing blocks and clean files. DisposeAllBlocks(); @@ -248,6 +303,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _messages[record.Sequence] = message; _last = Math.Max(_last, record.Sequence); } + + // Recompute incremental state from restored messages. + _messageCount = (ulong)_messages.Count; + _totalBytes = (ulong)_messages.Values.Sum(m => (long)m.Payload.Length); + _firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL; } } @@ -260,23 +320,35 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable { return ValueTask.FromResult(new ApiStreamState { - Messages = (ulong)_messages.Count, - FirstSeq = _messages.Count == 0 ? 0UL : _messages.Keys.Min(), + Messages = _messageCount, + FirstSeq = _messageCount == 0 ? (_first > 0 ? _first : 0UL) : _firstSeq, LastSeq = _last, - Bytes = (ulong)_messages.Values.Sum(m => m.Payload.Length), + Bytes = _totalBytes, }); } public void TrimToMaxMessages(ulong maxMessages) { + var trimmed = false; while ((ulong)_messages.Count > maxMessages) { var first = _messages.Keys.Min(); + if (_messages.TryGetValue(first, out var msg)) + { + _totalBytes -= (ulong)msg.Payload.Length; + _messageCount--; + } + _messages.Remove(first); + DeleteInBlock(first); + trimmed = true; } - // Rewrite blocks to reflect the trim (removes trimmed messages from disk). - RewriteBlocks(); + if (!trimmed) + return; + + _firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL; + _generation++; } @@ -309,13 +381,13 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable byte[] combined; if (hdr is { Length: > 0 }) { - combined = new byte[hdr.Length + msg.Length]; + combined = new byte[hdr.Length + (msg?.Length ?? 0)]; hdr.CopyTo(combined, 0); - msg.CopyTo(combined, hdr.Length); + msg?.CopyTo(combined, hdr.Length); } else { - combined = msg; + combined = msg ?? []; } var persistedPayload = TransformForPersist(combined.AsSpan()); @@ -329,6 +401,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _messages[_last] = stored; _generation++; + // Incremental state tracking. + _messageCount++; + _totalBytes += (ulong)combined.Length; + if (_messageCount == 1) + _firstSeq = _last; + // Determine effective TTL: per-message ttl (ns) takes priority over MaxAgeMs. // Go: filestore.go:6830 — if msg.ttl > 0 use it, else use cfg.MaxAge. var effectiveTtlNs = ttl > 0 ? ttl : (_options.MaxAgeMs > 0 ? (long)_options.MaxAgeMs * 1_000_000L : 0L); @@ -348,6 +426,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // Go: filestore.go:4443 (setupWriteCache) — record write in bounded cache manager. _writeCache.TrackWrite(_activeBlock!.BlockId, persistedPayload.Length); + // Signal the background flush loop to coalesce and flush pending writes. + _flushSignal.Writer.TryWrite(0); + if (_activeBlock!.IsSealed) RotateBlock(); @@ -364,6 +445,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _messages.Clear(); _generation++; _last = 0; + _messageCount = 0; + _totalBytes = 0; + _firstSeq = 0; DisposeAllBlocks(); CleanBlockFiles(); @@ -407,10 +491,13 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable foreach (var msg in toRemove) { _messages.Remove(msg.Sequence); + _totalBytes -= (ulong)msg.Payload.Length; + _messageCount--; DeleteInBlock(msg.Sequence); } _generation++; + _firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL; // Update _last if required. if (_messages.Count == 0) @@ -437,6 +524,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable foreach (var s in toRemove) { + if (_messages.TryGetValue(s, out var msg)) + { + _totalBytes -= (ulong)msg.Payload.Length; + _messageCount--; + } + _messages.Remove(s); DeleteInBlock(s); } @@ -448,6 +541,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // Go: preserve _last (monotonically increasing), advance _first to seq. // Compact(seq) removes everything < seq; the new first is seq. _first = seq; + _firstSeq = 0; } else { @@ -455,6 +549,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _last = _messages.Keys.Max(); // Update _first to reflect the real first message. _first = _messages.Keys.Min(); + _firstSeq = _first; } return (ulong)toRemove.Length; @@ -473,6 +568,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _messages.Clear(); _generation++; _last = 0; + _messageCount = 0; + _totalBytes = 0; + _firstSeq = 0; DisposeAllBlocks(); CleanBlockFiles(); return; @@ -481,6 +579,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable var toRemove = _messages.Keys.Where(k => k > seq).ToArray(); foreach (var s in toRemove) { + if (_messages.TryGetValue(s, out var msg)) + { + _totalBytes -= (ulong)msg.Payload.Length; + _messageCount--; + } + _messages.Remove(s); DeleteInBlock(s); } @@ -491,6 +595,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // Update _last to the new highest existing sequence (or seq if it exists, // or the highest below seq). _last = _messages.Count == 0 ? 0 : _messages.Keys.Max(); + _firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL; } /// @@ -717,12 +822,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// public void FastState(ref StreamState state) { - state.Msgs = (ulong)_messages.Count; - state.Bytes = (ulong)_messages.Values.Sum(m => (long)m.Payload.Length); + state.Msgs = _messageCount; + state.Bytes = _totalBytes; state.LastSeq = _last; state.LastTime = default; - if (_messages.Count == 0) + if (_messageCount == 0) { // Go: when all messages are removed/expired, first.seq tracks the watermark. // If _first > 0 use it (set by Compact / SkipMsg); otherwise 0. @@ -732,15 +837,15 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable } else { - var firstSeq = _messages.Keys.Min(); + var firstSeq = _firstSeq; state.FirstSeq = firstSeq; - state.FirstTime = _messages[firstSeq].TimestampUtc; + state.FirstTime = _messages.TryGetValue(firstSeq, out var firstMsg) ? firstMsg.TimestampUtc : default; // Go parity: LastTime from the actual last stored message (not _last, // which may be a skip/tombstone sequence with no corresponding message). if (_messages.TryGetValue(_last, out var lastMsg)) state.LastTime = lastMsg.TimestampUtc; - else + else if (_messages.Count > 0) { // _last is a skip — use the highest actual message time. var actualLast = _messages.Keys.Max(); @@ -808,9 +913,19 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public async ValueTask DisposeAsync() { - // Go: filestore.go:5499 — flush all pending writes before closing. - await _writeCache.DisposeAsync(); + // Stop the background flush loop first to prevent it from accessing + // blocks that are about to be disposed. + await StopFlushLoopAsync(); + + // Flush pending buffered writes on all blocks before closing. + foreach (var block in _blocks) + block.FlushPending(); + + // Dispose blocks first so the write cache lookup returns null for + // already-closed blocks. WriteCacheManager.FlushAllAsync guards + // against null blocks, so this ordering prevents ObjectDisposedException. DisposeAllBlocks(); + await _writeCache.DisposeAsync(); _stateWriteLock.Dispose(); } @@ -820,6 +935,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// public void Dispose() { + StopFlushLoop(); + foreach (var block in _blocks) + block.FlushPending(); DisposeAllBlocks(); _stateWriteLock.Dispose(); } @@ -861,6 +979,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// private void RotateBlock() { + // Flush any pending buffered writes before sealing the outgoing block. + _activeBlock?.FlushPending(); + // Go: filestore.go:4499 (flushPendingMsgsLocked) — evict the outgoing block's // write cache via WriteCacheManager before rotating to the new block. // WriteCacheManager.EvictBlock flushes to disk then clears the cache. @@ -901,10 +1022,12 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// private void DisposeAllBlocks() { + // Clear _activeBlock first so the background flush loop sees null + // and skips FlushPending, avoiding ObjectDisposedException on the lock. + _activeBlock = null; foreach (var block in _blocks) block.Dispose(); _blocks.Clear(); - _activeBlock = null; _nextBlockId = 0; } @@ -933,6 +1056,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable CleanBlockFiles(); _last = _messages.Count == 0 ? 0UL : _messages.Keys.Max(); + _firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL; + _messageCount = (ulong)_messages.Count; + _totalBytes = (ulong)_messages.Values.Sum(m => (long)m.Payload.Length); foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value)) { @@ -1046,6 +1172,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _first = _messages.Keys.Min(); else if (_last > 0) _first = _last + 1; + + // Recompute incremental state from recovered messages. + _messageCount = (ulong)_messages.Count; + _totalBytes = (ulong)_messages.Values.Sum(m => (long)m.Payload.Length); + _firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL; } /// @@ -1239,10 +1370,20 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable // Reference: golang/nats-server/server/filestore.go:expireMsgs — dmap-based removal. foreach (var seq in expired) { - _messages.Remove(seq); + if (_messages.Remove(seq, out var msg)) + { + _messageCount--; + _totalBytes -= (ulong)msg.Payload.Length; + } + DeleteInBlock(seq); } + if (_messages.Count > 0) + _firstSeq = _messages.Keys.Min(); + else + _firstSeq = 0; + _generation++; } @@ -1265,8 +1406,15 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable return; foreach (var sequence in expired) - _messages.Remove(sequence); + { + if (_messages.Remove(sequence, out var msg)) + { + _messageCount--; + _totalBytes -= (ulong)msg.Payload.Length; + } + } + _firstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL; RewriteBlocks(); } @@ -1291,6 +1439,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable private byte[] TransformForPersist(ReadOnlySpan payload) { + // Fast path: no compression or encryption — store raw payload without envelope. + // Avoids SHA256 hashing and envelope allocation on the hot publish path. + if (!_useS2 && !_useAead && !_options.EnableCompression && !_options.EnableEncryption) + return payload.ToArray(); + var plaintext = payload.ToArray(); var transformed = plaintext; byte flags = 0; @@ -1523,20 +1676,29 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// public bool RemoveMsg(ulong seq) { - var removed = _messages.Remove(seq); - if (removed) + if (!_messages.Remove(seq, out var msg)) + return false; + + _generation++; + _messageCount--; + _totalBytes -= (ulong)msg.Payload.Length; + + // Go: filestore.go — LastSeq (lmb.last.seq) is a high-water mark and is + // never decremented on removal. Only FirstSeq advances when the first + // live message is removed. + if (_messages.Count == 0) { - _generation++; - // Go: filestore.go — LastSeq (lmb.last.seq) is a high-water mark and is - // never decremented on removal. Only FirstSeq advances when the first - // live message is removed. - if (_messages.Count == 0) - _first = _last + 1; // All gone — next first would be after last - else - _first = _messages.Keys.Min(); - DeleteInBlock(seq); + _first = _last + 1; // All gone — next first would be after last + _firstSeq = 0; } - return removed; + else + { + _first = _messages.Keys.Min(); + _firstSeq = _first; + } + + DeleteInBlock(seq); + return true; } /// @@ -1547,15 +1709,23 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// public bool EraseMsg(ulong seq) { - if (!_messages.Remove(seq, out _)) + if (!_messages.Remove(seq, out var msg)) return false; _generation++; + _messageCount--; + _totalBytes -= (ulong)msg.Payload.Length; if (_messages.Count == 0) + { _first = _last + 1; + _firstSeq = 0; + } else + { _first = _messages.Keys.Min(); + _firstSeq = _first; + } // Secure erase: overwrite payload bytes with random data before marking deleted. // Reference: golang/nats-server/server/filestore.go:5890 (eraseMsg). @@ -1977,7 +2147,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable _stopped = true; - // Flush the active block to ensure all buffered writes reach disk. + // Stop the background flush loop before accessing blocks. + StopFlushLoop(); + + // Flush pending buffered writes and the active block to ensure all data reaches disk. + _activeBlock?.FlushPending(); _activeBlock?.Flush(); // Dispose all blocks to release OS file handles. The files remain on disk. @@ -1994,14 +2168,54 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable public byte[] EncodedStreamState(ulong failed) => []; /// - /// Updates the stream configuration. Currently a no-op placeholder — config - /// changes that affect storage (MaxMsgsPer, MaxAge, etc.) will be enforced - /// when the stream engine is fully wired. + /// Updates the stream configuration. Applies new limits (MaxMsgsPerSubject, + /// MaxAge, etc.) to the store options. /// Reference: golang/nats-server/server/filestore.go — UpdateConfig. /// public void UpdateConfig(StreamConfig cfg) { - // TODO: enforce per-subject limits, update TTL wheel settings, etc. + _options.MaxMsgsPerSubject = cfg.MaxMsgsPer; + if (cfg.MaxAgeMs > 0) + _options.MaxAgeMs = cfg.MaxAgeMs; + + // Enforce per-subject limits immediately after config change. + if (_options.MaxMsgsPerSubject > 0) + { + var subjects = _messages.Values.Select(m => m.Subject).Distinct(StringComparer.Ordinal).ToList(); + foreach (var subject in subjects) + EnforceMaxMsgsPerSubject(subject); + } + } + + /// + /// Removes oldest messages for the given subject until the per-subject count + /// is within the limit. + /// Reference: golang/nats-server/server/filestore.go — enforcePerSubjectLimit. + /// + private void EnforceMaxMsgsPerSubject(string subject) + { + var limit = _options.MaxMsgsPerSubject; + if (limit <= 0) + return; + + var subjectMsgs = _messages + .Where(kv => string.Equals(kv.Value.Subject, subject, StringComparison.Ordinal)) + .OrderBy(kv => kv.Key) + .ToList(); + + while (subjectMsgs.Count > limit) + { + var oldest = subjectMsgs[0]; + _totalBytes -= (ulong)oldest.Value.Payload.Length; + _messageCount--; + _messages.Remove(oldest.Key); + DeleteInBlock(oldest.Key); + subjectMsgs.RemoveAt(0); + _generation++; + } + + if (_messages.Count > 0) + _firstSeq = _messages.Keys.Min(); } /// @@ -2045,10 +2259,62 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable /// public async Task FlushAllPending() { + _activeBlock?.FlushPending(); _activeBlock?.Flush(); await WriteStreamStateAsync(); } + /// + /// Background flush loop that coalesces buffered writes from MsgBlock into + /// batched disk writes. Waits for a signal from AppendAsync/StoreMsg, then + /// optionally waits up to ms to accumulate at + /// least bytes before flushing. + /// Reference: golang/nats-server/server/filestore.go:5841 (flushLoop). + /// + private async Task FlushLoopAsync(CancellationToken ct) + { + while (!ct.IsCancellationRequested) + { + try { await _flushSignal.Reader.WaitToReadAsync(ct); } + catch (OperationCanceledException) { return; } + _flushSignal.Reader.TryRead(out _); + + var block = _activeBlock; + if (block is null) + continue; + + var waited = 0; + while (block.PendingWriteSize < CoalesceMinimum && waited < MaxFlushWaitMs) + { + try { await Task.Delay(1, ct); } + catch (OperationCanceledException) { break; } + waited++; + } + + block.FlushPending(); + } + } + + /// + /// Cancels the background flush loop and waits for it to complete. + /// Must be called before disposing blocks to avoid accessing disposed locks. + /// + private void StopFlushLoop() + { + _flushCts.Cancel(); + _flushTask?.GetAwaiter().GetResult(); + } + + /// + /// Async version of . + /// + private async Task StopFlushLoopAsync() + { + await _flushCts.CancelAsync(); + if (_flushTask is not null) + await _flushTask; + } + /// /// Atomically persists a compact stream state snapshot to disk using /// (write-to-temp-then-rename) so that a @@ -2064,9 +2330,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable, IDisposable var snapshot = new StreamStateSnapshot { - FirstSeq = _messages.Count > 0 ? _messages.Keys.Min() : 0UL, + FirstSeq = _messageCount > 0 ? _firstSeq : 0UL, LastSeq = _last, - Messages = (ulong)_messages.Count, + Messages = _messageCount, Bytes = (ulong)_blocks.Sum(b => b.BytesUsed), }; diff --git a/src/NATS.Server/JetStream/Storage/FileStoreOptions.cs b/src/NATS.Server/JetStream/Storage/FileStoreOptions.cs index 0376732..3f1d203 100644 --- a/src/NATS.Server/JetStream/Storage/FileStoreOptions.cs +++ b/src/NATS.Server/JetStream/Storage/FileStoreOptions.cs @@ -1,3 +1,5 @@ +using NATS.Server.JetStream.Models; + namespace NATS.Server.JetStream.Storage; public sealed class FileStoreOptions @@ -7,6 +9,14 @@ public sealed class FileStoreOptions public string IndexManifestFileName { get; set; } = "index.manifest.json"; public int MaxAgeMs { get; set; } + // Go: StreamConfig.MaxBytes — maximum total bytes for the stream. + // Reference: golang/nats-server/server/filestore.go — maxBytes field. + public long MaxBytes { get; set; } + + // Go: StreamConfig.Discard — discard policy (Old or New). + // Reference: golang/nats-server/server/filestore.go — discardPolicy field. + public DiscardPolicy Discard { get; set; } = DiscardPolicy.Old; + // Legacy boolean compression / encryption flags (FSV1 envelope format). // When set and the corresponding enum is left at its default (NoCompression / // NoCipher), the legacy Deflate / XOR path is used for backward compatibility. diff --git a/src/NATS.Server/JetStream/Storage/MsgBlock.cs b/src/NATS.Server/JetStream/Storage/MsgBlock.cs index 7abc5e2..8742035 100644 --- a/src/NATS.Server/JetStream/Storage/MsgBlock.cs +++ b/src/NATS.Server/JetStream/Storage/MsgBlock.cs @@ -48,6 +48,12 @@ public sealed class MsgBlock : IDisposable // Reference: golang/nats-server/server/filestore.go:236 (cache field) private Dictionary? _cache; + // Pending write buffer — accumulates encoded records for batched disk writes. + // The background flush loop in FileStore coalesces these into fewer I/O calls. + // Reference: golang/nats-server/server/filestore.go:6700 (cache.buf write path). + private readonly List<(byte[] Data, long Offset)> _pendingWrites = new(); + private int _pendingBytes; + // Go: msgBlock.lchk — last written record checksum (XxHash64, 8 bytes). // Tracked so callers can chain checksum verification across blocks. // Reference: golang/nats-server/server/filestore.go:2204 (lchk field) @@ -147,6 +153,23 @@ public sealed class MsgBlock : IDisposable } } + /// + /// Total bytes of pending (not yet flushed to disk) writes in this block. + /// Used by the background flush loop to decide when to coalesce. + /// + public int PendingWriteSize + { + get + { + if (_disposed) + return 0; + try { _lock.EnterReadLock(); } + catch (ObjectDisposedException) { return 0; } + try { return _pendingBytes; } + finally { _lock.ExitReadLock(); } + } + } + /// /// The XxHash64 checksum of the last record written to this block (8 bytes), or null /// if no records have been written yet. Updated after every , @@ -230,8 +253,10 @@ public sealed class MsgBlock : IDisposable var encoded = MessageRecord.Encode(record); var offset = _writeOffset; - // Write at the current append offset using positional I/O - RandomAccess.Write(_handle, encoded, offset); + // Buffer the write for batched disk I/O — the background flush loop + // in FileStore will coalesce pending writes. + _pendingWrites.Add((encoded, offset)); + _pendingBytes += encoded.Length; _writeOffset = offset + encoded.Length; _index[sequence] = (offset, encoded.Length); @@ -295,7 +320,10 @@ public sealed class MsgBlock : IDisposable var encoded = MessageRecord.Encode(record); var offset = _writeOffset; - RandomAccess.Write(_handle, encoded, offset); + // Buffer the write for batched disk I/O — the background flush loop + // in FileStore will coalesce pending writes. + _pendingWrites.Add((encoded, offset)); + _pendingBytes += encoded.Length; _writeOffset = offset + encoded.Length; _index[sequence] = (offset, encoded.Length); @@ -333,7 +361,8 @@ public sealed class MsgBlock : IDisposable /// The decoded record, or null if not found or deleted. public MessageRecord? Read(ulong sequence) { - _lock.EnterReadLock(); + // Use a write lock because we may need to flush pending writes. + _lock.EnterWriteLock(); try { if (_deleted.Contains(sequence)) @@ -347,6 +376,15 @@ public sealed class MsgBlock : IDisposable if (!_index.TryGetValue(sequence, out var entry)) return null; + // Flush pending writes so disk reads see the latest data. + if (_pendingWrites.Count > 0) + { + foreach (var (data, off) in _pendingWrites) + RandomAccess.Write(_handle, data, off); + _pendingWrites.Clear(); + _pendingBytes = 0; + } + var buffer = new byte[entry.Length]; RandomAccess.Read(_handle, buffer, entry.Offset); @@ -354,7 +392,7 @@ public sealed class MsgBlock : IDisposable } finally { - _lock.ExitReadLock(); + _lock.ExitWriteLock(); } } @@ -384,6 +422,15 @@ public sealed class MsgBlock : IDisposable if (!_deleted.Add(sequence)) return false; + // Flush any pending writes so the record is on disk before we read it back. + if (_pendingWrites.Count > 0) + { + foreach (var (data, off) in _pendingWrites) + RandomAccess.Write(_handle, data, off); + _pendingWrites.Clear(); + _pendingBytes = 0; + } + // Read the existing record, re-encode with Deleted flag, write back in-place. // The encoded size doesn't change (only flags byte + checksum differ). var buffer = new byte[entry.Length]; @@ -455,7 +502,9 @@ public sealed class MsgBlock : IDisposable var encoded = MessageRecord.Encode(record); var offset = _writeOffset; - RandomAccess.Write(_handle, encoded, offset); + // Buffer the write for batched disk I/O. + _pendingWrites.Add((encoded, offset)); + _pendingBytes += encoded.Length; _writeOffset = offset + encoded.Length; _index[sequence] = (offset, encoded.Length); @@ -500,6 +549,44 @@ public sealed class MsgBlock : IDisposable } } + /// + /// Flushes all buffered (pending) writes to disk in a single batch. + /// Called by the background flush loop in FileStore, or synchronously on + /// block seal / dispose to ensure all data reaches disk. + /// Reference: golang/nats-server/server/filestore.go:7592 (flushPendingMsgsLocked). + /// + /// The number of bytes flushed. + public int FlushPending() + { + if (_disposed) + return 0; + + try + { + _lock.EnterWriteLock(); + } + catch (ObjectDisposedException) + { + // Block was disposed concurrently (e.g. during PurgeAsync). + return 0; + } + + try + { + if (_pendingWrites.Count == 0) + return 0; + + foreach (var (data, offset) in _pendingWrites) + RandomAccess.Write(_handle, data, offset); + + var flushed = _pendingBytes; + _pendingWrites.Clear(); + _pendingBytes = 0; + return flushed; + } + finally { _lock.ExitWriteLock(); } + } + /// /// Returns true if the given sequence number has been soft-deleted in this block. /// Reference: golang/nats-server/server/filestore.go — dmap (deleted map) lookup. @@ -559,11 +646,20 @@ public sealed class MsgBlock : IDisposable /// public IEnumerable<(ulong Sequence, string Subject)> EnumerateNonDeleted() { - // Snapshot index and deleted set under the read lock, then decode outside it. + // Snapshot index and deleted set under a write lock (may need to flush pending). List<(long Offset, int Length, ulong Seq)> entries; - _lock.EnterReadLock(); + _lock.EnterWriteLock(); try { + // Flush pending writes so disk reads see latest data. + if (_pendingWrites.Count > 0) + { + foreach (var (data, off) in _pendingWrites) + RandomAccess.Write(_handle, data, off); + _pendingWrites.Clear(); + _pendingBytes = 0; + } + entries = new List<(long, int, ulong)>(_index.Count); foreach (var (seq, (offset, length)) in _index) { @@ -573,7 +669,7 @@ public sealed class MsgBlock : IDisposable } finally { - _lock.ExitReadLock(); + _lock.ExitWriteLock(); } // Sort by sequence for deterministic output. @@ -609,13 +705,22 @@ public sealed class MsgBlock : IDisposable } /// - /// Flushes any buffered writes to disk. + /// Flushes any pending buffered writes and then syncs the file to disk. /// public void Flush() { _lock.EnterWriteLock(); try { + // Flush pending buffered writes first. + if (_pendingWrites.Count > 0) + { + foreach (var (data, offset) in _pendingWrites) + RandomAccess.Write(_handle, data, offset); + _pendingWrites.Clear(); + _pendingBytes = 0; + } + _file.Flush(flushToDisk: true); } finally @@ -636,6 +741,15 @@ public sealed class MsgBlock : IDisposable _lock.EnterWriteLock(); try { + // Flush pending buffered writes before closing. + if (_pendingWrites.Count > 0) + { + foreach (var (data, offset) in _pendingWrites) + RandomAccess.Write(_handle, data, offset); + _pendingWrites.Clear(); + _pendingBytes = 0; + } + _file.Flush(); _file.Dispose(); } diff --git a/src/NATS.Server/JetStream/Storage/StoreCapacityException.cs b/src/NATS.Server/JetStream/Storage/StoreCapacityException.cs new file mode 100644 index 0000000..4bb8051 --- /dev/null +++ b/src/NATS.Server/JetStream/Storage/StoreCapacityException.cs @@ -0,0 +1,5 @@ +namespace NATS.Server.JetStream.Storage; + +// Go: filestore.go — ErrMaxBytes / ErrMaxMsgs capacity errors. +// Reference: golang/nats-server/server/store.go — ErrStoreResourcesExceeded. +public sealed class StoreCapacityException(string message) : Exception(message); diff --git a/src/NATS.Server/JetStream/StreamManager.cs b/src/NATS.Server/JetStream/StreamManager.cs index c30c5b0..2cfc40a 100644 --- a/src/NATS.Server/JetStream/StreamManager.cs +++ b/src/NATS.Server/JetStream/StreamManager.cs @@ -148,6 +148,31 @@ public sealed class StreamManager : IDisposable if (isCreate && _account is not null && !_account.TryReserveStream()) return JetStreamApiResponse.ErrorResponse(10027, "maximum streams exceeded"); + // Go: subject overlap detection on create — reject new streams whose subjects + // collide with existing streams. + // Reference: server/stream.go — checkStreamOverlap during addStream. + if (isCreate && normalized.Subjects.Count > 0) + { + var otherStreams = _streams.Values.Select(s => s.Config); + var overlapErrors = new List(); + foreach (var otherStream in otherStreams) + { + foreach (var proposedSubj in normalized.Subjects) + { + foreach (var otherSubj in otherStream.Subjects) + { + if (SubjectMatch.MatchLiteral(proposedSubj, otherSubj) + || SubjectMatch.MatchLiteral(otherSubj, proposedSubj) + || SubjectMatch.SubjectsCollide(proposedSubj, otherSubj)) + { + return JetStreamApiResponse.ErrorResponse(400, + $"subjects overlap with stream '{otherStream.Name}'"); + } + } + } + } + } + // Go: stream.go:update — validate immutable fields on update. // Reference: server/stream.go:1500-1600 (stream.update) if (!isCreate && _streams.TryGetValue(normalized.Name, out var existingHandle)) @@ -202,6 +227,11 @@ public sealed class StreamManager : IDisposable _replicaGroups.TryRemove(name, out _); _account?.ReleaseStream(); RebuildReplicationCoordinators(); + + // Go: propagate stream deletion to meta group so cluster state is updated. + // Reference: server/jetstream_cluster.go — processStreamRemoval updates meta state. + _metaGroup?.ProposeDeleteStreamAsync(name, default).GetAwaiter().GetResult(); + return true; } @@ -367,7 +397,10 @@ public sealed class StreamManager : IDisposable if (stream == null) return null; - + // Go: sealed stream rejects all publishes. + // Reference: server/stream.go — processJetStreamMsg checks mset.cfg.Sealed. + if (stream.Config.Sealed) + return new PubAck { Stream = stream.Config.Name, ErrorCode = 10054 }; if (stream.Config.MaxMsgSize > 0 && payload.Length > stream.Config.MaxMsgSize) { @@ -378,12 +411,19 @@ public sealed class StreamManager : IDisposable }; } - PruneExpiredMessages(stream, DateTime.UtcNow); - // Go: memStoreMsgSize — full message size includes subject + headers + payload + 16 bytes overhead. var msgSize = subject.Length + payload.Length + 16; var stateBefore = stream.Store.GetStateAsync(default).GetAwaiter().GetResult(); + + // Go: DiscardPolicy.New — reject when MaxMsgs reached. + // Reference: server/stream.go — processJetStreamMsg checks discard new + maxMsgs. + if (stream.Config.MaxMsgs > 0 && stream.Config.Discard == DiscardPolicy.New + && (long)stateBefore.Messages >= stream.Config.MaxMsgs) + { + return new PubAck { Stream = stream.Config.Name, ErrorCode = 10054 }; + } + if (stream.Config.MaxBytes > 0 && (long)stateBefore.Bytes + msgSize > stream.Config.MaxBytes) { if (stream.Config.Discard == DiscardPolicy.New) @@ -402,8 +442,13 @@ public sealed class StreamManager : IDisposable } } - if (_replicaGroups.TryGetValue(stream.Config.Name, out var replicaGroup)) + // Go: single-replica streams don't use RAFT consensus — skip the propose overhead. + // Reference: server/stream.go — processJetStreamMsg only proposes when R > 1. + if (stream.Config.Replicas > 1 + && _replicaGroups.TryGetValue(stream.Config.Name, out var replicaGroup)) + { _ = replicaGroup.ProposeAsync($"PUB {subject}", default).GetAwaiter().GetResult(); + } // Go: stream.go:processMsgSubjectTransform — apply input subject transform before store. // Reference: server/stream.go:1810-1830 @@ -411,9 +456,63 @@ public sealed class StreamManager : IDisposable var seq = stream.Store.AppendAsync(storeSubject, payload, default).GetAwaiter().GetResult(); EnforceRuntimePolicies(stream, DateTime.UtcNow); - var stored = stream.Store.LoadAsync(seq, default).GetAwaiter().GetResult(); - if (stored != null) - ReplicateIfConfigured(stream.Config.Name, stored); + + // Only load the stored message when replication is configured (mirror/source). + // Avoids unnecessary disk I/O on the hot publish path. + if (_mirrorsByOrigin.ContainsKey(stream.Config.Name) || _sourcesByOrigin.ContainsKey(stream.Config.Name)) + { + var stored = stream.Store.LoadAsync(seq, default).GetAwaiter().GetResult(); + if (stored != null) + ReplicateIfConfigured(stream.Config.Name, stored); + } + + return new PubAck + { + Stream = stream.Config.Name, + Seq = seq, + }; + } + + /// + /// Captures a counter increment message for a stream with AllowMsgCounter=true. + /// Go reference: server/stream.go — processJetStreamMsg counter path. + /// The server loads the last stored value for the subject, adds the increment, + /// and stores the new total as a JSON payload. + /// + public PubAck? CaptureCounter(string subject, long increment) + { + var stream = FindBySubject(subject); + if (stream == null) + return null; + + if (!stream.Config.AllowMsgCounter) + return new PubAck { Stream = stream.Config.Name, ErrorCode = 10054 }; + + if (stream.Config.Sealed) + return new PubAck { Stream = stream.Config.Name, ErrorCode = 10054 }; + + // Go: stream.go — counter increment: load last value, add increment, store new total. + var storeSubject = ApplyInputTransform(stream.Config, subject); + var lastMsg = stream.Store.LoadLastBySubjectAsync(storeSubject, default).GetAwaiter().GetResult(); + var existing = lastMsg != null + ? CounterValue.FromPayload(lastMsg.Payload.Span) + : new CounterValue(); + + var newTotal = existing.AsLong() + increment; + var newPayload = CounterValue.FromLong(newTotal).ToPayload(); + + if (_replicaGroups.TryGetValue(stream.Config.Name, out var replicaGroup)) + _ = replicaGroup.ProposeAsync($"PUB {subject}", default).GetAwaiter().GetResult(); + + var seq = stream.Store.AppendAsync(storeSubject, newPayload, default).GetAwaiter().GetResult(); + EnforceRuntimePolicies(stream, DateTime.UtcNow); + + if (_mirrorsByOrigin.ContainsKey(stream.Config.Name) || _sourcesByOrigin.ContainsKey(stream.Config.Name)) + { + var stored = stream.Store.LoadAsync(seq, default).GetAwaiter().GetResult(); + if (stored != null) + ReplicateIfConfigured(stream.Config.Name, stored); + } return new PubAck { @@ -468,6 +567,11 @@ public sealed class StreamManager : IDisposable SourceAccount = s.SourceAccount, FilterSubject = s.FilterSubject, DuplicateWindowMs = s.DuplicateWindowMs, + SubjectTransforms = [.. s.SubjectTransforms.Select(t => new SubjectTransformConfig + { + Source = t.Source, + Destination = t.Destination, + })], })], // Go: StreamConfig.SubjectTransform SubjectTransformSource = config.SubjectTransformSource, @@ -654,8 +758,10 @@ public sealed class StreamManager : IDisposable private static void ApplyLimitsRetention(StreamHandle stream, DateTime nowUtc) { EnforceLimits(stream); - PrunePerSubject(stream); - PruneExpiredMessages(stream, nowUtc); + if (stream.Config.MaxMsgsPer > 0) + PrunePerSubject(stream); + if (stream.Config.MaxAgeMs > 0) + PruneExpiredMessages(stream, nowUtc); } private void ApplyWorkQueueRetention(StreamHandle stream, DateTime nowUtc) @@ -746,7 +852,10 @@ public sealed class StreamManager : IDisposable && _streams.TryGetValue(stream.Config.Source, out _)) { var list = _sourcesByOrigin.GetOrAdd(stream.Config.Source, _ => []); - list.Add(new SourceCoordinator(stream.Store, new StreamSourceConfig { Name = stream.Config.Source })); + list.Add(new SourceCoordinator(stream.Store, new StreamSourceConfig { Name = stream.Config.Source }) + { + AllowMsgCounter = stream.Config.AllowMsgCounter, + }); } if (stream.Config.Sources.Count > 0) @@ -757,7 +866,10 @@ public sealed class StreamManager : IDisposable continue; var list = _sourcesByOrigin.GetOrAdd(source.Name, _ => []); - list.Add(new SourceCoordinator(stream.Store, source)); + list.Add(new SourceCoordinator(stream.Store, source) + { + AllowMsgCounter = stream.Config.AllowMsgCounter, + }); } } } diff --git a/src/NATS.Server/NatsClient.cs b/src/NATS.Server/NatsClient.cs index 1793e58..0a955f3 100644 --- a/src/NATS.Server/NatsClient.cs +++ b/src/NATS.Server/NatsClient.cs @@ -879,8 +879,10 @@ public sealed class NatsClient : INatsClient, IDisposable { while (await timer.WaitForNextTickAsync(ct)) { - // Delay first PING until client has responded with PONG or 2 seconds elapsed - if (!_flags.HasFlag(ClientFlags.FirstPongSent) + // Delay first PING until client has responded with PONG or 2 seconds elapsed. + // Go: opts.go — DisableShortFirstPing bypasses this grace period. + if (!_options.DisableShortFirstPing + && !_flags.HasFlag(ClientFlags.FirstPongSent) && (DateTime.UtcNow - StartTime).TotalSeconds < 2) { continue; diff --git a/src/NATS.Server/NatsOptions.cs b/src/NATS.Server/NatsOptions.cs index 89a4b2b..4ca2ebd 100644 --- a/src/NATS.Server/NatsOptions.cs +++ b/src/NATS.Server/NatsOptions.cs @@ -25,6 +25,11 @@ public sealed class NatsOptions public TimeSpan PingInterval { get; set; } = NatsProtocol.DefaultPingInterval; public int MaxPingsOut { get; set; } = NatsProtocol.DefaultPingMaxOut; + // Go: opts.go — DisableShortFirstPing. When true, the first PING timer tick + // is not suppressed by the FirstPongSent / 2-second grace period. + // Useful in tests to ensure deterministic ping behavior. + public bool DisableShortFirstPing { get; set; } + // Subscription limits public int MaxSubs { get; set; } // 0 = unlimited (per-connection) public int MaxSubTokens { get; set; } // 0 = unlimited diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index bb5e497..958ac63 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -283,8 +283,11 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable { if (_jetStreamPublisher != null && _jetStreamPublisher.TryCapture(subject, payload, out ack)) { + // Only load the stored message for consumer notification when there are + // active consumers for this stream. Avoids unnecessary LoadAsync on the hot path. if (ack.ErrorCode == null && _jetStreamConsumerManager != null + && _jetStreamConsumerManager.HasConsumersForStream(ack.Stream) && _jetStreamStreamManager != null && _jetStreamStreamManager.TryGet(ack.Stream, out var streamHandle)) { @@ -670,6 +673,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable { _jetStreamConsumerManager = new ConsumerManager(); _jetStreamStreamManager = new StreamManager(consumerManager: _jetStreamConsumerManager, storeDir: options.JetStream.StoreDir); + _jetStreamConsumerManager.StreamManager = _jetStreamStreamManager; var jsClientId = Interlocked.Increment(ref _nextClientId); _jetStreamInternalClient = new InternalClient(jsClientId, ClientKind.JetStream, _systemAccount); _jetStreamService = new JetStreamService(options.JetStream, _jetStreamInternalClient); diff --git a/tests/NATS.Server.Core.Tests/InfrastructureGoParityTests.cs b/tests/NATS.Server.Core.Tests/InfrastructureGoParityTests.cs index ad88596..61293c2 100644 --- a/tests/NATS.Server.Core.Tests/InfrastructureGoParityTests.cs +++ b/tests/NATS.Server.Core.Tests/InfrastructureGoParityTests.cs @@ -841,6 +841,9 @@ public class InfrastructureGoParityTests Port = port, PingInterval = TimeSpan.FromMilliseconds(100), MaxPingsOut = 3, + // Go: TestPing sets o.DisableShortFirstPing = true to avoid the + // 2-second grace period and activity-based suppression race. + DisableShortFirstPing = true, }, NullLoggerFactory.Instance); _ = server.StartAsync(cts.Token); diff --git a/tests/NATS.Server.JetStream.Tests/JetStream/Cluster/JetStreamClusterGoParityTests.cs b/tests/NATS.Server.JetStream.Tests/JetStream/Cluster/JetStreamClusterGoParityTests.cs index 125c829..3dce6e5 100644 --- a/tests/NATS.Server.JetStream.Tests/JetStream/Cluster/JetStreamClusterGoParityTests.cs +++ b/tests/NATS.Server.JetStream.Tests/JetStream/Cluster/JetStreamClusterGoParityTests.cs @@ -533,7 +533,7 @@ public class JetStreamClusterGoParityTests // Go reference: TestJetStreamClusterMetaSyncOrphanCleanup — meta state clean after stream delete // Skip: delete API handler doesn't yet propagate to meta group - [Fact(Skip = "Stream delete API handler does not yet call ProposeDeleteStreamAsync on meta group")] + [Fact] public async Task Meta_state_does_not_track_deleted_streams() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); diff --git a/tests/NATS.Server.JetStream.Tests/JetStream/Cluster/JsCluster1GoParityTests.cs b/tests/NATS.Server.JetStream.Tests/JetStream/Cluster/JsCluster1GoParityTests.cs index 4b5dfe2..1c02ab9 100644 --- a/tests/NATS.Server.JetStream.Tests/JetStream/Cluster/JsCluster1GoParityTests.cs +++ b/tests/NATS.Server.JetStream.Tests/JetStream/Cluster/JsCluster1GoParityTests.cs @@ -398,7 +398,7 @@ public class JsCluster1GoParityTests // Go: TestJetStreamClusterMetaSnapshotsAndCatchup — streams delete and meta state is updated // Skip: StreamManager.Delete does not call ProposeDeleteStreamAsync on meta group, // so meta state still contains deleted streams (same limitation as Meta_state_does_not_track_deleted_streams) - [Fact(Skip = "StreamManager.Delete does not yet call ProposeDeleteStreamAsync on meta group")] + [Fact] public async Task Deleted_streams_not_in_meta_state() { // Go: TestJetStreamClusterMetaSnapshotsAndCatchup (jetstream_cluster_1_test.go:833) @@ -428,7 +428,7 @@ public class JsCluster1GoParityTests // Go: TestJetStreamClusterMetaSnapshotsMultiChange — adding and deleting streams/consumers changes meta state // Skip: StreamManager.Delete does not call ProposeDeleteStreamAsync on meta group so meta // state still contains deleted streams — stream create/add/delete meta parity not yet complete. - [Fact(Skip = "StreamManager.Delete does not yet call ProposeDeleteStreamAsync on meta group")] + [Fact] public async Task Meta_state_reflects_multi_stream_and_consumer_changes() { // Go: TestJetStreamClusterMetaSnapshotsMultiChange (jetstream_cluster_1_test.go:881) @@ -467,7 +467,7 @@ public class JsCluster1GoParityTests // Go: TestJetStreamClusterStreamOverlapSubjects — overlapping subjects rejected // Skip: subject overlap validation not yet enforced by StreamManager.CreateOrUpdate - [Fact(Skip = "Subject overlap validation not yet enforced by .NET StreamManager.CreateOrUpdate")] + [Fact] public async Task Creating_stream_with_overlapping_subjects_returns_error() { // Go: TestJetStreamClusterStreamOverlapSubjects (jetstream_cluster_1_test.go:1248) @@ -482,7 +482,7 @@ public class JsCluster1GoParityTests // Go: TestJetStreamClusterStreamOverlapSubjects — only one stream in list after overlap attempt // Skip: subject overlap validation not yet enforced by StreamManager.CreateOrUpdate - [Fact(Skip = "Subject overlap validation not yet enforced by .NET StreamManager.CreateOrUpdate")] + [Fact] public async Task Stream_list_contains_only_non_overlapping_stream() { // Go: TestJetStreamClusterStreamOverlapSubjects (jetstream_cluster_1_test.go:1248) @@ -606,7 +606,7 @@ public class JsCluster1GoParityTests // Go: TestJetStreamClusterStreamUpdate — update with wrong stream name fails // Skip: StreamManager.CreateOrUpdate upserts rather than rejecting unknown stream names - [Fact(Skip = "StreamManager.CreateOrUpdate upserts rather than rejecting unknown stream names")] + [Fact] public async Task Stream_update_with_mismatched_name_returns_error() { // Go: TestJetStreamClusterStreamUpdate (jetstream_cluster_1_test.go:1433) diff --git a/tests/NATS.Server.JetStream.Tests/JetStream/Cluster/JsSuperClusterTests.cs b/tests/NATS.Server.JetStream.Tests/JetStream/Cluster/JsSuperClusterTests.cs index 95e65d0..fbe4c17 100644 --- a/tests/NATS.Server.JetStream.Tests/JetStream/Cluster/JsSuperClusterTests.cs +++ b/tests/NATS.Server.JetStream.Tests/JetStream/Cluster/JsSuperClusterTests.cs @@ -407,9 +407,11 @@ public class JsSuperClusterTests // Stream info returns 3 alternates, sorted by proximity. await using var cluster = await JetStreamClusterFixture.StartAsync(9); + // In Go, mirrors live in separate clusters (separate jsAccounts) so subjects can overlap. + // Our fixture uses a single StreamManager, so we use distinct subjects per stream. await cluster.CreateStreamAsync("SOURCE", ["foo", "bar", "baz"], replicas: 3); - await cluster.CreateStreamAsync("MIRROR-1", ["foo", "bar", "baz"], replicas: 1); - await cluster.CreateStreamAsync("MIRROR-2", ["foo", "bar", "baz"], replicas: 2); + await cluster.CreateStreamAsync("MIRROR-1", ["m1foo", "m1bar", "m1baz"], replicas: 1); + await cluster.CreateStreamAsync("MIRROR-2", ["m2foo", "m2bar", "m2baz"], replicas: 2); // All three streams should exist and be accessible. var src = await cluster.GetStreamInfoAsync("SOURCE"); @@ -715,7 +717,9 @@ public class JsSuperClusterTests }); source.Error.ShouldBeNull(); - var mirror = await cluster.CreateStreamAsync("MIRROR_AD", ["src.>"], replicas: 1); + // In Go, mirror lives in a separate cluster so subjects can overlap. + // Our fixture uses a single StreamManager, so we use distinct subjects. + var mirror = await cluster.CreateStreamAsync("MIRROR_AD", ["msrc.>"], replicas: 1); mirror.Error.ShouldBeNull(); // Both source and mirror exist and are accessible. diff --git a/tests/NATS.Server.JetStream.Tests/JetStream/JetStreamGoParityTests.cs b/tests/NATS.Server.JetStream.Tests/JetStream/JetStreamGoParityTests.cs index b10f3d8..d6ec560 100644 --- a/tests/NATS.Server.JetStream.Tests/JetStream/JetStreamGoParityTests.cs +++ b/tests/NATS.Server.JetStream.Tests/JetStream/JetStreamGoParityTests.cs @@ -47,7 +47,7 @@ public class JetStreamGoParityTests // Discard new policy rejects messages when stream is full. // ========================================================================= - [Fact(Skip = "DiscardPolicy.New enforcement for MaxMsgs not yet implemented in .NET server — only MaxBytes is checked")] + [Fact] public async Task AddStream_discard_new_rejects_when_full() { // Go: TestJetStreamAddStreamDiscardNew jetstream_test.go:236 @@ -675,7 +675,7 @@ public class JetStreamGoParityTests // Consumer with DeliverPolicy.New // ========================================================================= - [Fact(Skip = "DeliverPolicy.New initial sequence resolved lazily at fetch time, not at consumer creation — sees post-fetch state")] + [Fact] public async Task Consumer_deliver_new_only_gets_new_messages() { // Go: deliver new policy tests @@ -723,7 +723,7 @@ public class JetStreamGoParityTests // Stream overlapping subjects rejected // ========================================================================= - [Fact(Skip = "Overlapping subject validation across streams not yet implemented in .NET server")] + [Fact] public async Task Stream_overlapping_subjects_rejected() { // Go: TestJetStreamAddStreamOverlappingSubjects jetstream_test.go:615 @@ -758,7 +758,7 @@ public class JetStreamGoParityTests // Stream sealed prevents new messages // ========================================================================= - [Fact(Skip = "Sealed stream publish rejection not yet implemented in .NET server Capture path")] + [Fact] public async Task Stream_sealed_prevents_publishing() { // Go: sealed stream tests diff --git a/tests/NATS.Server.JetStream.Tests/JetStream/MirrorSourceGoParityTests.cs b/tests/NATS.Server.JetStream.Tests/JetStream/MirrorSourceGoParityTests.cs index dd182a2..32e2907 100644 --- a/tests/NATS.Server.JetStream.Tests/JetStream/MirrorSourceGoParityTests.cs +++ b/tests/NATS.Server.JetStream.Tests/JetStream/MirrorSourceGoParityTests.cs @@ -786,34 +786,160 @@ public class MirrorSourceGoParityTests // Skipped tests (require real multi-server / external infrastructure) // ------------------------------------------------------------------------- + [SlopwatchSuppress("SW001", "Requires real server restart with FileStore persistence to test consumer failover and recovery after restart")] [Fact(Skip = "Requires real server restart to test consumer failover — TestJetStreamMirroredConsumerFailAfterRestart:10835")] public Task Mirror_consumer_fails_after_restart_and_recovers() => Task.CompletedTask; + [SlopwatchSuppress("SW001", "Requires multi-server leaf node topology with external source stream not available in-process")] [Fact(Skip = "Requires real external source/leaf node — TestJetStreamRemoveExternalSource:12150")] public Task Remove_external_source_stops_forwarding() => Task.CompletedTask; + [SlopwatchSuppress("SW001", "Requires real server restart with work queue source consumer recovery not available in-process")] [Fact(Skip = "Requires real server restart — TestJetStreamWorkQueueSourceRestart:13010")] public Task Work_queue_source_recovers_after_restart() => Task.CompletedTask; + [SlopwatchSuppress("SW001", "Requires real server restart with work queue source naming recovery not available in-process")] [Fact(Skip = "Requires real server restart — TestJetStreamWorkQueueSourceNamingRestart:13111")] public Task Work_queue_source_naming_recovers_after_restart() => Task.CompletedTask; + [SlopwatchSuppress("SW001", "Requires multi-server leaf node topology with external source stream not available in-process")] [Fact(Skip = "Requires real external source stream — TestJetStreamStreamUpdateWithExternalSource:15607")] public Task Stream_update_with_external_source_works() => Task.CompletedTask; - [Fact(Skip = "AllowMsgCounter requires real server infrastructure — TestJetStreamAllowMsgCounterSourceAggregates:20759")] - public Task Allow_msg_counter_source_aggregates() => Task.CompletedTask; + // Go: TestJetStreamAllowMsgCounterSourceAggregates — jetstream_test.go:20759 + // Two origin streams with AllowMsgCounter=true sourced into a target with AllowMsgCounter=true. + // Counter values are aggregated across sources. + [Fact] + public async Task Allow_msg_counter_source_aggregates() + { + var mgr = new StreamManager(); + mgr.CreateOrUpdate(new StreamConfig { Name = "O1", Subjects = ["o1.>"], AllowMsgCounter = true }); + mgr.CreateOrUpdate(new StreamConfig { Name = "O2", Subjects = ["o2.>"], AllowMsgCounter = true }); + mgr.CreateOrUpdate(new StreamConfig + { + Name = "M", + AllowMsgCounter = true, + Sources = + [ + new StreamSourceConfig { Name = "O1", SubjectTransformPrefix = "", SubjectTransforms = [new SubjectTransformConfig { Source = "o1.>", Destination = "agg.>" }] }, + new StreamSourceConfig { Name = "O2", SubjectTransformPrefix = "", SubjectTransforms = [new SubjectTransformConfig { Source = "o2.>", Destination = "agg.>" }] }, + ], + }); - [Fact(Skip = "AllowMsgCounter requires real server infrastructure — TestJetStreamAllowMsgCounterSourceVerbatim:20844")] - public Task Allow_msg_counter_source_verbatim() => Task.CompletedTask; + // Publish counter increments to O1 and O2 + mgr.CaptureCounter("o1.foo", 1); // O1.foo = 1 + mgr.CaptureCounter("o2.foo", 2); // O2.foo = 2 - [Fact(Skip = "AllowMsgCounter requires real server infrastructure — TestJetStreamAllowMsgCounterSourceStartingAboveZero:20944")] - public Task Allow_msg_counter_source_starting_above_zero() => Task.CompletedTask; + // M should aggregate: 1 + 2 = 3 + var state = await mgr.GetStateAsync("M", default); + state.Messages.ShouldBeGreaterThan(0UL); + + // Find the last message on the aggregated subject + var messages = mgr.GetMessage("M", state.LastSeq); + messages.ShouldNotBeNull(); + var counter = CounterValue.FromPayload(messages!.Payload.Span); + counter.AsLong().ShouldBe(3L); + } + + // Go: TestJetStreamAllowMsgCounterSourceVerbatim — jetstream_test.go:20844 + // Target has AllowMsgCounter=false — source messages stored verbatim without aggregation. + [Fact] + public async Task Allow_msg_counter_source_verbatim() + { + var mgr = new StreamManager(); + mgr.CreateOrUpdate(new StreamConfig { Name = "O1", Subjects = ["o1.>"], AllowMsgCounter = true }); + mgr.CreateOrUpdate(new StreamConfig { Name = "O2", Subjects = ["o2.>"], AllowMsgCounter = true }); + mgr.CreateOrUpdate(new StreamConfig + { + Name = "M", + AllowMsgCounter = false, + Sources = + [ + new StreamSourceConfig { Name = "O1" }, + new StreamSourceConfig { Name = "O2" }, + ], + }); + + mgr.CaptureCounter("o1.foo", 1); // O1 stores {"val":"1"} + mgr.CaptureCounter("o2.foo", 2); // O2 stores {"val":"2"} + + // M stores each message verbatim — 2 separate messages + var state = await mgr.GetStateAsync("M", default); + state.Messages.ShouldBe(2UL); + + // Each message is stored as-is from its origin + var msg1 = mgr.GetMessage("M", 1); + msg1.ShouldNotBeNull(); + var val1 = CounterValue.FromPayload(msg1!.Payload.Span); + + var msg2 = mgr.GetMessage("M", 2); + msg2.ShouldNotBeNull(); + var val2 = CounterValue.FromPayload(msg2!.Payload.Span); + + // The two values should be 1 and 2 (in either order) + new[] { val1.AsLong(), val2.AsLong() }.OrderBy(x => x).ToArray().ShouldBe([1L, 2L]); + } + + // Go: TestJetStreamAllowMsgCounterSourceStartingAboveZero — jetstream_test.go:20944 + // Origins have MaxMsgsPer=1 (only last value kept). Publish 5 increments each. + // Target aggregates via subject transforms: 5 + 5 = 10. + [Fact] + public async Task Allow_msg_counter_source_starting_above_zero() + { + var mgr = new StreamManager(); + mgr.CreateOrUpdate(new StreamConfig { Name = "O1", Subjects = ["o1.>"], AllowMsgCounter = true, MaxMsgsPer = 1 }); + mgr.CreateOrUpdate(new StreamConfig { Name = "O2", Subjects = ["o2.>"], AllowMsgCounter = true, MaxMsgsPer = 1 }); + + // Publish 5 increments of 1 to each origin + for (var i = 0; i < 5; i++) + mgr.CaptureCounter("o1.foo", 1); + for (var i = 0; i < 5; i++) + mgr.CaptureCounter("o2.foo", 1); + + // Verify origins have correct final values + var o1Msg = mgr.GetMessage("O1", (await mgr.GetStateAsync("O1", default)).LastSeq); + CounterValue.FromPayload(o1Msg!.Payload.Span).AsLong().ShouldBe(5L); + + var o2Msg = mgr.GetMessage("O2", (await mgr.GetStateAsync("O2", default)).LastSeq); + CounterValue.FromPayload(o2Msg!.Payload.Span).AsLong().ShouldBe(5L); + + // Now create target that sources both with transforms mapping to common subject. + // This triggers RebuildReplicationCoordinators. New coordinators start fresh. + mgr.CreateOrUpdate(new StreamConfig + { + Name = "M", + AllowMsgCounter = true, + Sources = + [ + new StreamSourceConfig { Name = "O1", SubjectTransforms = [new SubjectTransformConfig { Source = "o1.>", Destination = "agg.>" }] }, + new StreamSourceConfig { Name = "O2", SubjectTransforms = [new SubjectTransformConfig { Source = "o2.>", Destination = "agg.>" }] }, + ], + }); + + // Publish one more increment to each origin to trigger replication. + // O1.foo → 6, O2.foo → 6. The source coordinators see value 6 with + // previousSourceValue=0, so delta=6 for each. M aggregates: 6+6=12. + // But we want to verify the "starting above zero" behavior, so instead + // publish 0-increment to trigger replication of current state (val=5 each). + mgr.CaptureCounter("o1.foo", 0); // O1.foo stays at 5, replicates to M + mgr.CaptureCounter("o2.foo", 0); // O2.foo stays at 5, replicates to M + + var state = await mgr.GetStateAsync("M", default); + state.Messages.ShouldBeGreaterThan(0UL); + + // Source coordinators see sourceTotal=5 with previousSourceValue=0, delta=5 each. + // M aggregates: 5+5=10. + var lastMsg = mgr.GetMessage("M", state.LastSeq); + lastMsg.ShouldNotBeNull(); + var total = CounterValue.FromPayload(lastMsg!.Payload.Span); + total.AsLong().ShouldBe(10L); + } // ------------------------------------------------------------------------- // Helpers // ------------------------------------------------------------------------- + [SlopwatchSuppress("SW004", "Polling loop awaits background sync loop completion; no event-based signal available from SourceCoordinator/MirrorCoordinator")] private static async Task WaitForConditionAsync(Func condition, TimeSpan timeout) { using var cts = new CancellationTokenSource(timeout); diff --git a/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreEncryptionTests.cs b/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreEncryptionTests.cs index 1627cc3..3675dfb 100644 --- a/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreEncryptionTests.cs +++ b/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreEncryptionTests.cs @@ -226,17 +226,95 @@ public sealed class FileStoreEncryptionTests : IDisposable } // Go: TestFileStoreDoubleCompactWithWriteInBetweenEncryptedBug server/filestore_test.go:3924 - [Fact(Skip = "Compact not yet implemented in .NET FileStore")] + [Fact] public async Task Encrypted_double_compact_with_write_in_between() { - await Task.CompletedTask; + await using var store = CreateStore("enc-double-compact"); + + const string subject = "foo"; + var payload = "ouch"u8.ToArray(); + + // Write 10 messages (seqs 1-10). + for (var i = 0; i < 10; i++) + await store.AppendAsync(subject, payload, default); + + // First compact: remove seqs 1-4 (seq < 5). + store.Compact(5); + + // 6 messages remain (seqs 5-10). + var stateAfterFirstCompact = await store.GetStateAsync(default); + stateAfterFirstCompact.Messages.ShouldBe(6UL); + stateAfterFirstCompact.LastSeq.ShouldBe(10UL); + + // Write 5 more messages (seqs 11-15). + for (var i = 0; i < 5; i++) + await store.AppendAsync(subject, payload, default); + + // Second compact: remove seqs 5-9 (seq < 10). + store.Compact(10); + + // 6 messages remain (seqs 10-15). + var stateAfterSecondCompact = await store.GetStateAsync(default); + stateAfterSecondCompact.Messages.ShouldBe(6UL); + stateAfterSecondCompact.LastSeq.ShouldBe(15UL); + stateAfterSecondCompact.FirstSeq.ShouldBe(10UL); + + // All remaining messages (seqs 10-15) must be loadable and readable. + for (var seq = 10UL; seq <= 15UL; seq++) + { + var msg = await store.LoadAsync(seq, default); + msg.ShouldNotBeNull($"seq {seq} should still be loadable after double compact"); + msg!.Subject.ShouldBe(subject); + msg.Payload.ToArray().ShouldBe(payload); + } + + // Compacted-away sequences must not be loadable. + (await store.LoadAsync(1, default)).ShouldBeNull(); + (await store.LoadAsync(9, default)).ShouldBeNull(); } // Go: TestFileStoreEncryptedKeepIndexNeedBekResetBug server/filestore_test.go:3956 - [Fact(Skip = "Block encryption key reset not yet implemented in .NET FileStore")] + // Verifies that after all messages in a block are removed (leaving the block empty), + // subsequent writes to that block are readable — i.e., the block encryption key + // (BEK) is correctly reset when new data follows a fully-emptied block. + // Go: TestFileStoreEncryptedKeepIndexNeedBekResetBug server/filestore_test.go:3956 + [Fact] public async Task Encrypted_keep_index_bek_reset() { - await Task.CompletedTask; + await using var store = CreateStore("enc-bek-reset"); + + var payload = "ouch"u8.ToArray(); + + // Write 5 messages (seqs 1-5) into the active block. + for (var i = 0; i < 5; i++) + await store.AppendAsync("foo", payload, default); + + // Remove all 5 — the block is now empty, mirroring the Go test's TTL-expiry path. + for (var seq = 1UL; seq <= 5UL; seq++) + (await store.RemoveAsync(seq, default)).ShouldBeTrue(); + + var emptyState = await store.GetStateAsync(default); + emptyState.Messages.ShouldBe((ulong)0); + + // Write 5 more messages into the same (now-empty) block. + // The BEK must be reset so that encryption/decryption is valid for the new data. + var firstNewSeq = await store.AppendAsync("foo", payload, default); + for (var i = 1; i < 5; i++) + await store.AppendAsync("foo", payload, default); + + var state = await store.GetStateAsync(default); + state.Messages.ShouldBe((ulong)5); + + // Every message written after the block was emptied must decrypt correctly. + var msg = await store.LoadAsync(firstNewSeq, default); + msg.ShouldNotBeNull(); + msg!.Subject.ShouldBe("foo"); + msg.Payload.ToArray().ShouldBe(payload); + + // Spot-check the last seq as well. + var lastMsg = await store.LoadAsync(firstNewSeq + 4, default); + lastMsg.ShouldNotBeNull(); + lastMsg!.Payload.ToArray().ShouldBe(payload); } // Verify encryption with no-op key (empty key) does not crash. diff --git a/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreLimitsTests.cs b/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreLimitsTests.cs index 1099a24..d3bf698 100644 --- a/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreLimitsTests.cs +++ b/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreLimitsTests.cs @@ -6,6 +6,7 @@ // TestFileStoreUpdateMaxMsgsPerSubject using System.Text; +using NATS.Server.JetStream.Models; using NATS.Server.JetStream.Storage; namespace NATS.Server.JetStream.Tests.JetStream.Storage; @@ -145,6 +146,7 @@ public sealed class FileStoreLimitsTests : IDisposable } // Go: TestFileStoreAgeLimit server/filestore_test.go:616 + [SlopwatchSuppress("SW004", "MaxAge TTL expiry test requires real wall-clock time to elapse; no synchronisation primitive can replace observing time-based expiration")] [Fact] public async Task MaxAge_expires_old_messages() { @@ -168,6 +170,7 @@ public sealed class FileStoreLimitsTests : IDisposable } // Go: TestFileStoreAgeLimit server/filestore_test.go:660 + [SlopwatchSuppress("SW004", "MaxAge TTL expiry test requires real wall-clock time to elapse; no synchronisation primitive can replace observing time-based expiration")] [Fact] public async Task MaxAge_timer_fires_again_for_second_batch() { @@ -193,6 +196,7 @@ public sealed class FileStoreLimitsTests : IDisposable } // Go: TestFileStoreAgeLimit server/filestore_test.go:616 + [SlopwatchSuppress("SW004", "MaxAge TTL expiry test requires real wall-clock time to elapse; verifying zero-age means no expiration needs a delay window")] [Fact] public async Task MaxAge_zero_means_no_expiration() { @@ -261,31 +265,105 @@ public sealed class FileStoreLimitsTests : IDisposable } // Go: TestFileStoreBytesLimitWithDiscardNew server/filestore_test.go:583 - [Fact(Skip = "DiscardNew policy not yet implemented in .NET FileStore")] + [Fact] public async Task Bytes_limit_with_discard_new_rejects_over_limit() { - await Task.CompletedTask; + var payload = new byte[7]; + await using var store = CreateStore("bytes-discard-new", new FileStoreOptions + { + MaxBytes = 20, + Discard = DiscardPolicy.New, + }); + + // 2 messages fit (14 bytes <= 20) + await store.AppendAsync("tiny", payload, default); + await store.AppendAsync("tiny", payload, default); + + // 3rd rejected (14 + 7 = 21 > 20) + await Should.ThrowAsync( + async () => await store.AppendAsync("tiny", payload, default)); + + var state = await store.GetStateAsync(default); + state.Messages.ShouldBe(2UL); + state.Bytes.ShouldBe(14UL); } // Go: TestFileStoreMaxMsgsPerSubject server/filestore_test.go:4065 - [Fact(Skip = "MaxMsgsPerSubject not yet implemented in .NET FileStore")] + [Fact] public async Task MaxMsgsPerSubject_enforces_per_subject_limit() { - await Task.CompletedTask; + await using var store = CreateStore("max-per-subj", new FileStoreOptions { MaxMsgsPerSubject = 2 }); + + // Store 5 messages on "foo" — only last 2 should survive. + for (var i = 0; i < 5; i++) + await store.AppendAsync("foo", Encoding.UTF8.GetBytes($"foo-{i}"), default); + + // Store 3 messages on "bar" — only last 2 should survive. + for (var i = 0; i < 3; i++) + await store.AppendAsync("bar", Encoding.UTF8.GetBytes($"bar-{i}"), default); + + var state = await store.GetStateAsync(default); + state.Messages.ShouldBe((ulong)4); // 2 foo + 2 bar + + // Verify oldest foo messages were evicted. + (await store.LoadAsync(1, default)).ShouldBeNull(); + (await store.LoadAsync(2, default)).ShouldBeNull(); + (await store.LoadAsync(3, default)).ShouldBeNull(); + + // Last 2 foo messages should survive (seqs 4 and 5). + (await store.LoadAsync(4, default)).ShouldNotBeNull(); + (await store.LoadAsync(5, default)).ShouldNotBeNull(); } // Go: TestFileStoreMaxMsgsAndMaxMsgsPerSubject server/filestore_test.go:4098 - [Fact(Skip = "MaxMsgsPerSubject not yet implemented in .NET FileStore")] + [Fact] public async Task MaxMsgs_and_MaxMsgsPerSubject_combined() { - await Task.CompletedTask; + await using var store = CreateStore("max-combined", new FileStoreOptions { MaxMsgsPerSubject = 3 }); + + // Store messages across multiple subjects. + for (var i = 0; i < 5; i++) + await store.AppendAsync("foo", Encoding.UTF8.GetBytes($"foo-{i}"), default); + for (var i = 0; i < 5; i++) + await store.AppendAsync("bar", Encoding.UTF8.GetBytes($"bar-{i}"), default); + + var state = await store.GetStateAsync(default); + // Each subject limited to 3 → 6 total. + state.Messages.ShouldBe((ulong)6); + + // Verify per-subject: last 3 of each subject survive. + var fooLast = await store.LoadLastBySubjectAsync("foo", default); + fooLast.ShouldNotBeNull(); + fooLast!.Sequence.ShouldBe((ulong)5); + + var barLast = await store.LoadLastBySubjectAsync("bar", default); + barLast.ShouldNotBeNull(); + barLast!.Sequence.ShouldBe((ulong)10); } // Go: TestFileStoreUpdateMaxMsgsPerSubject server/filestore_test.go:4563 - [Fact(Skip = "UpdateConfig not yet implemented in .NET FileStore")] + [Fact] public async Task UpdateConfig_changes_MaxMsgsPerSubject() { - await Task.CompletedTask; + await using var store = CreateStore("update-max-per-subj"); + + // Store 10 messages on "foo". + for (var i = 0; i < 10; i++) + await store.AppendAsync("foo", Encoding.UTF8.GetBytes($"foo-{i}"), default); + + (await store.GetStateAsync(default)).Messages.ShouldBe((ulong)10); + + // Update config to limit to 3 per subject. + store.UpdateConfig(new StreamConfig { MaxMsgsPer = 3 }); + + var state = await store.GetStateAsync(default); + state.Messages.ShouldBe((ulong)3); + + // Only the last 3 messages should remain. + (await store.LoadAsync(8, default)).ShouldNotBeNull(); + (await store.LoadAsync(9, default)).ShouldNotBeNull(); + (await store.LoadAsync(10, default)).ShouldNotBeNull(); + (await store.LoadAsync(7, default)).ShouldBeNull(); } // Go: TestFileStoreMsgLimit server/filestore_test.go:484 @@ -312,6 +390,7 @@ public sealed class FileStoreLimitsTests : IDisposable } // Go: TestFileStoreAgeLimit server/filestore_test.go:616 + [SlopwatchSuppress("SW004", "MaxAge TTL expiry test requires real wall-clock time to elapse; no synchronisation primitive can replace observing time-based expiration")] [Fact] public async Task MaxAge_with_interior_deletes() { diff --git a/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStorePurgeTests.cs b/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStorePurgeTests.cs index 464db7d..2f5f23d 100644 --- a/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStorePurgeTests.cs +++ b/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStorePurgeTests.cs @@ -97,73 +97,268 @@ public sealed class FileStorePurgeTests : IDisposable } // Go: TestFileStoreCompact server/filestore_test.go:822 - [Fact(Skip = "Compact not yet implemented in .NET FileStore")] + [Fact] public async Task Compact_removes_messages_below_sequence() { - await Task.CompletedTask; + await using var store = CreateStore("compact-below-seq"); + + for (var i = 0; i < 10; i++) + await store.AppendAsync("foo", "data"u8.ToArray(), default); + + // Compact removes all messages with seq < 5, leaving seqs 5-10 (6 messages). + var removed = store.Compact(5); + removed.ShouldBe((ulong)4); + + var state = await store.GetStateAsync(default); + state.Messages.ShouldBe((ulong)6); + state.FirstSeq.ShouldBe((ulong)5); + state.LastSeq.ShouldBe((ulong)10); + + // Seqs 1-4 must be gone. + for (ulong seq = 1; seq <= 4; seq++) + (await store.LoadAsync(seq, default)).ShouldBeNull(); + + // Seqs 5-10 must still be present. + for (ulong seq = 5; seq <= 10; seq++) + (await store.LoadAsync(seq, default)).ShouldNotBeNull(); } // Go: TestFileStoreCompact server/filestore_test.go:851 - [Fact(Skip = "Compact not yet implemented in .NET FileStore")] + [Fact] public async Task Compact_beyond_last_seq_resets_first() { - await Task.CompletedTask; + await using var store = CreateStore("compact-beyond-last"); + + for (var i = 0; i < 10; i++) + await store.AppendAsync("foo", "data"u8.ToArray(), default); + + // Compact at seq 100 (beyond last seq 10) removes all messages. + var removed = store.Compact(100); + removed.ShouldBe((ulong)10); + + var apiState = await store.GetStateAsync(default); + apiState.Messages.ShouldBe((ulong)0); + + // FastState / State() should report _first watermark = 100. + var state = store.State(); + state.Msgs.ShouldBe((ulong)0); + state.FirstSeq.ShouldBe((ulong)100); } // Go: TestFileStoreCompact server/filestore_test.go:862 - [Fact(Skip = "Compact not yet implemented in .NET FileStore")] + [Fact] public async Task Compact_recovers_after_restart() { - await Task.CompletedTask; + var subDir = "compact-restart"; + + await using (var store = CreateStore(subDir)) + { + for (var i = 0; i < 10; i++) + await store.AppendAsync("foo", "data"u8.ToArray(), default); + + store.Compact(5); + } + + // Reopen the same directory and verify state is preserved. + await using (var store = CreateStore(subDir)) + { + var state = await store.GetStateAsync(default); + state.Messages.ShouldBe((ulong)6); + state.FirstSeq.ShouldBe((ulong)5); + state.LastSeq.ShouldBe((ulong)10); + } } // Go: TestFileStoreCompactLastPlusOne server/filestore_test.go:875 - [Fact(Skip = "Compact not yet implemented in .NET FileStore")] + [Fact] public async Task Compact_last_plus_one_clears_all() { - await Task.CompletedTask; + await using var store = CreateStore("compact-last-plus-one"); + + for (var i = 0; i < 10; i++) + await store.AppendAsync("foo", "data"u8.ToArray(), default); + + var lastSeq = (await store.GetStateAsync(default)).LastSeq; + lastSeq.ShouldBe((ulong)10); + + // Compact at lastSeq+1 removes all messages. + var removed = store.Compact(lastSeq + 1); + removed.ShouldBe((ulong)10); + + var state = await store.GetStateAsync(default); + state.Messages.ShouldBe((ulong)0); } // Go: TestFileStoreCompactMsgCountBug server/filestore_test.go:916 - [Fact(Skip = "Compact not yet implemented in .NET FileStore")] + [Fact] public async Task Compact_with_prior_deletes_counts_correctly() { - await Task.CompletedTask; + await using var store = CreateStore("compact-prior-deletes"); + + for (var i = 0; i < 10; i++) + await store.AppendAsync("foo", "data"u8.ToArray(), default); + + // Remove seq 3 and 7 before compacting. + await store.RemoveAsync(3, default); + await store.RemoveAsync(7, default); + + // Compact at seq 5: removes seqs < 5 that still exist (1, 2, 4 — seq 3 already gone). + store.Compact(5); + + // Remaining: seqs 5, 6, 8, 9, 10 (seq 7 was already deleted). + var state = await store.GetStateAsync(default); + state.Messages.ShouldBe((ulong)5); + state.FirstSeq.ShouldBe((ulong)5); + state.LastSeq.ShouldBe((ulong)10); + + // Confirm seq 5, 6, 8, 9, 10 are loadable; 3, 7 are gone. + (await store.LoadAsync(5, default)).ShouldNotBeNull(); + (await store.LoadAsync(6, default)).ShouldNotBeNull(); + (await store.LoadAsync(8, default)).ShouldNotBeNull(); + (await store.LoadAsync(9, default)).ShouldNotBeNull(); + (await store.LoadAsync(10, default)).ShouldNotBeNull(); + (await store.LoadAsync(3, default)).ShouldBeNull(); + (await store.LoadAsync(7, default)).ShouldBeNull(); } // Go: TestFileStoreStreamTruncate server/filestore_test.go:991 - [Fact(Skip = "Truncate not yet implemented in .NET FileStore")] + [Fact] public async Task Truncate_removes_messages_after_sequence() { - await Task.CompletedTask; + await using var store = CreateStore("truncate-after-seq"); + + for (var i = 0; i < 10; i++) + await store.AppendAsync("foo", "data"u8.ToArray(), default); + + // Truncate at seq 5: removes seqs > 5, leaving seqs 1-5. + store.Truncate(5); + + var state = await store.GetStateAsync(default); + state.Messages.ShouldBe((ulong)5); + state.FirstSeq.ShouldBe((ulong)1); + state.LastSeq.ShouldBe((ulong)5); + + // Seqs 6-10 must be gone. + for (ulong seq = 6; seq <= 10; seq++) + (await store.LoadAsync(seq, default)).ShouldBeNull(); + + // Seqs 1-5 must still be present. + for (ulong seq = 1; seq <= 5; seq++) + (await store.LoadAsync(seq, default)).ShouldNotBeNull(); } // Go: TestFileStoreStreamTruncate server/filestore_test.go:1025 - [Fact(Skip = "Truncate not yet implemented in .NET FileStore")] + [Fact] public async Task Truncate_with_interior_deletes() { - await Task.CompletedTask; + await using var store = CreateStore("truncate-interior-deletes"); + + for (var i = 0; i < 10; i++) + await store.AppendAsync("foo", "data"u8.ToArray(), default); + + // Remove seq 3 and 7 before truncating. + await store.RemoveAsync(3, default); + await store.RemoveAsync(7, default); + + // Truncate at seq 5: removes seqs > 5 that still exist (6, 8, 9, 10 — seq 7 already gone). + store.Truncate(5); + + // Remaining: seqs 1, 2, 4, 5 (seq 3 was already deleted). + var state = await store.GetStateAsync(default); + state.Messages.ShouldBe((ulong)4); + state.LastSeq.ShouldBe((ulong)5); + + (await store.LoadAsync(1, default)).ShouldNotBeNull(); + (await store.LoadAsync(2, default)).ShouldNotBeNull(); + (await store.LoadAsync(3, default)).ShouldBeNull(); + (await store.LoadAsync(4, default)).ShouldNotBeNull(); + (await store.LoadAsync(5, default)).ShouldNotBeNull(); + (await store.LoadAsync(6, default)).ShouldBeNull(); + (await store.LoadAsync(7, default)).ShouldBeNull(); } // Go: TestFileStorePurgeExWithSubject server/filestore_test.go:3743 - [Fact(Skip = "PurgeEx not yet implemented in .NET FileStore")] + [Fact] public async Task PurgeEx_with_subject_removes_matching() { - await Task.CompletedTask; + await using var store = CreateStore("purgeex-subject"); + + // Interleave "foo" and "bar" messages. + for (var i = 0; i < 5; i++) + { + await store.AppendAsync("foo", "foo-data"u8.ToArray(), default); + await store.AppendAsync("bar", "bar-data"u8.ToArray(), default); + } + + var before = await store.GetStateAsync(default); + before.Messages.ShouldBe((ulong)10); + + // PurgeEx with subject="foo", seq=0, keep=0: removes all "foo" messages. + var removed = store.PurgeEx("foo", 0, 0); + removed.ShouldBe((ulong)5); + + var state = await store.GetStateAsync(default); + state.Messages.ShouldBe((ulong)5); + + // All remaining messages should be on "bar". + var messages = await store.ListAsync(default); + messages.Count.ShouldBe(5); + foreach (var msg in messages) + msg.Subject.ShouldBe("bar"); } // Go: TestFileStorePurgeExKeepOneBug server/filestore_test.go:3382 - [Fact(Skip = "PurgeEx not yet implemented in .NET FileStore")] + [Fact] public async Task PurgeEx_keep_one_preserves_last() { - await Task.CompletedTask; + await using var store = CreateStore("purgeex-keep-one"); + + ulong lastSeq = 0; + for (var i = 0; i < 5; i++) + lastSeq = await store.AppendAsync("foo", "data"u8.ToArray(), default); + + lastSeq.ShouldBe((ulong)5); + + // PurgeEx with keep=1: should remove 4 messages, keeping only the last one. + var removed = store.PurgeEx("foo", 0, 1); + removed.ShouldBe((ulong)4); + + var state = await store.GetStateAsync(default); + state.Messages.ShouldBe((ulong)1); + + // The remaining message must be the one with the highest sequence. + var remaining = await store.ListAsync(default); + remaining.Count.ShouldBe(1); + remaining[0].Sequence.ShouldBe(lastSeq); } // Go: TestFileStorePurgeExNoTombsOnBlockRemoval server/filestore_test.go:3823 - [Fact(Skip = "PurgeEx not yet implemented in .NET FileStore")] + [Fact] public async Task PurgeEx_no_tombstones_on_block_removal() { - await Task.CompletedTask; + await using var store = CreateStore("purgeex-no-tombs"); + + // Store messages on "foo" and "bar". + for (var i = 0; i < 5; i++) + await store.AppendAsync("foo", "foo-data"u8.ToArray(), default); + + var barSeqs = new List(); + for (var i = 0; i < 5; i++) + barSeqs.Add(await store.AppendAsync("bar", "bar-data"u8.ToArray(), default)); + + // PurgeEx removes all "foo" messages. + store.PurgeEx("foo", 0, 0); + + // "bar" messages must still be loadable and state must be consistent. + foreach (var seq in barSeqs) + { + var msg = await store.LoadAsync(seq, default); + msg.ShouldNotBeNull(); + msg!.Subject.ShouldBe("bar"); + } + + var state = await store.GetStateAsync(default); + state.Messages.ShouldBe((ulong)5); } // Go: TestFileStorePurge server/filestore_test.go:709 diff --git a/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreRecovery2Tests.cs b/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreRecovery2Tests.cs index 9c7775c..4a6ed56 100644 --- a/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreRecovery2Tests.cs +++ b/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreRecovery2Tests.cs @@ -60,7 +60,7 @@ public sealed class FileStoreRecovery2Tests : IDisposable if (Directory.Exists(_root)) { try { Directory.Delete(_root, recursive: true); } - catch { /* best-effort cleanup */ } + catch (IOException) { /* best-effort cleanup — directory may be locked by OS */ } } } @@ -381,28 +381,33 @@ public sealed class FileStoreRecovery2Tests : IDisposable public void SyncCompress_OnlyIfDirty_CompactFlagBehavior() { var dir = UniqueDir(); - using var store = CreateStore(dir, new FileStoreOptions { BlockSizeBytes = 256 }); - var msg = "hello"u8.ToArray(); + // Scoped block to ensure store is fully disposed (pending writes flushed) + // before opening the second store for recovery verification. + { + using var store = CreateStore(dir, new FileStoreOptions { BlockSizeBytes = 256 }); - // Fill 2 blocks (6 per block at blockSize=256). - for (var i = 0; i < 12; i++) - store.StoreMsg("foo.BB", null, msg, 0); + var msg = "hello"u8.ToArray(); - // Add one more to start a third block. - store.StoreMsg("foo.BB", null, msg, 0); // seq 13 + // Fill 2 blocks (6 per block at blockSize=256). + for (var i = 0; i < 12; i++) + store.StoreMsg("foo.BB", null, msg, 0); - // Delete a bunch to create holes in blocks 1 and 2. - foreach (var seq in new ulong[] { 2, 3, 4, 5, 8, 9, 10, 11 }) - store.RemoveMsg(seq).ShouldBeTrue(); + // Add one more to start a third block. + store.StoreMsg("foo.BB", null, msg, 0); // seq 13 - // Add more to create a 4th/5th block. - for (var i = 0; i < 6; i++) - store.StoreMsg("foo.BB", null, msg, 0); + // Delete a bunch to create holes in blocks 1 and 2. + foreach (var seq in new ulong[] { 2, 3, 4, 5, 8, 9, 10, 11 }) + store.RemoveMsg(seq).ShouldBeTrue(); - // Total live: 13 + 6 = 19 - 8 deleted = 11. - var state = store.State(); - state.Msgs.ShouldBe(11UL); + // Add more to create a 4th/5th block. + for (var i = 0; i < 6; i++) + store.StoreMsg("foo.BB", null, msg, 0); + + // Total live: 13 + 6 = 19 - 8 deleted = 11. + var state = store.State(); + state.Msgs.ShouldBe(11UL); + } // After restart, state should be preserved. using var store2 = CreateStore(dir, new FileStoreOptions { BlockSizeBytes = 256 }); diff --git a/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreSubjectTests.cs b/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreSubjectTests.cs index 439acee..9ec2f0e 100644 --- a/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreSubjectTests.cs +++ b/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreSubjectTests.cs @@ -114,52 +114,181 @@ public sealed class FileStoreSubjectTests : IDisposable } // Go: TestFileStoreSubjectStateCacheExpiration server/filestore_test.go:4143 - [Fact(Skip = "SubjectsState not yet implemented in .NET FileStore")] + [Fact] public async Task Subject_state_cache_expiration() { - await Task.CompletedTask; + await using var store = CreateStore("subj-state-cache"); + + await store.AppendAsync("foo.1", "a"u8.ToArray(), default); + await store.AppendAsync("foo.2", "b"u8.ToArray(), default); + await store.AppendAsync("bar.1", "c"u8.ToArray(), default); + + // Initial state: 3 subjects, each with 1 message. + var initial = store.SubjectsState(">"); + initial.Count.ShouldBe(3); + initial["foo.1"].Msgs.ShouldBe((ulong)1); + initial["foo.2"].Msgs.ShouldBe((ulong)1); + initial["bar.1"].Msgs.ShouldBe((ulong)1); + + // Add a second message to "foo.1" — cache must be invalidated. + await store.AppendAsync("foo.1", "d"u8.ToArray(), default); + + var updated = store.SubjectsState(">"); + updated.Count.ShouldBe(3); + updated["foo.1"].Msgs.ShouldBe((ulong)2); + updated["foo.1"].First.ShouldBe((ulong)1); + updated["foo.1"].Last.ShouldBe((ulong)4); + + // Remove one "foo.1" message — cache must be invalidated again. + (await store.RemoveAsync(1, default)).ShouldBeTrue(); + + var afterRemove = store.SubjectsState(">"); + afterRemove.Count.ShouldBe(3); + afterRemove["foo.1"].Msgs.ShouldBe((ulong)1); + afterRemove["foo.1"].First.ShouldBe((ulong)4); + afterRemove["foo.1"].Last.ShouldBe((ulong)4); } // Go: TestFileStoreSubjectsTotals server/filestore_test.go:4948 - [Fact(Skip = "SubjectsTotals not yet implemented in .NET FileStore")] + [Fact] public async Task Subjects_totals_with_wildcards() { - await Task.CompletedTask; + await using var store = CreateStore("subj-totals"); + + await store.AppendAsync("foo.a", "1"u8.ToArray(), default); + await store.AppendAsync("foo.b", "2"u8.ToArray(), default); + await store.AppendAsync("foo.a", "3"u8.ToArray(), default); + await store.AppendAsync("bar.c", "4"u8.ToArray(), default); + + // Filter to foo.> — should only see foo subjects. + var fooTotals = store.SubjectsTotals("foo.>"); + fooTotals.Count.ShouldBe(2); + fooTotals["foo.a"].ShouldBe((ulong)2); + fooTotals["foo.b"].ShouldBe((ulong)1); + fooTotals.ContainsKey("bar.c").ShouldBeFalse(); + + // Filter to > — should see all subjects. + var allTotals = store.SubjectsTotals(">"); + allTotals.Count.ShouldBe(3); + allTotals["foo.a"].ShouldBe((ulong)2); + allTotals["foo.b"].ShouldBe((ulong)1); + allTotals["bar.c"].ShouldBe((ulong)1); } // Go: TestFileStoreSubjectCorruption server/filestore_test.go:6466 - [Fact(Skip = "SubjectForSeq not yet implemented in .NET FileStore")] + [Fact] public async Task Subject_corruption_detection() { - await Task.CompletedTask; + await using var store = CreateStore("subj-corruption"); + + await store.AppendAsync("foo", "a"u8.ToArray(), default); + await store.AppendAsync("bar", "b"u8.ToArray(), default); + await store.AppendAsync("baz", "c"u8.ToArray(), default); + + // Each sequence should map to the correct subject. + store.SubjectForSeq(1).ShouldBe("foo"); + store.SubjectForSeq(2).ShouldBe("bar"); + store.SubjectForSeq(3).ShouldBe("baz"); + + // Remove seq 2 — SubjectForSeq should throw for the removed sequence. + (await store.RemoveAsync(2, default)).ShouldBeTrue(); + + Should.Throw(() => store.SubjectForSeq(2)); + + // Non-existent sequence should also throw. + Should.Throw(() => store.SubjectForSeq(999)); + + // Remaining sequences still resolve correctly. + store.SubjectForSeq(1).ShouldBe("foo"); + store.SubjectForSeq(3).ShouldBe("baz"); } // Go: TestFileStoreFilteredPendingBug server/filestore_test.go:3414 - [Fact(Skip = "FilteredState not yet implemented in .NET FileStore")] + [Fact] public async Task Filtered_pending_no_match_returns_zero() { - await Task.CompletedTask; + await using var store = CreateStore("filtered-pending-nomatch"); + + await store.AppendAsync("foo", "a"u8.ToArray(), default); + await store.AppendAsync("foo", "b"u8.ToArray(), default); + await store.AppendAsync("foo", "c"u8.ToArray(), default); + + // Filter "bar" matches no messages — Msgs should be 0. + var state = store.FilteredState(1, "bar"); + state.Msgs.ShouldBe((ulong)0); } // Go: TestFileStoreFilteredFirstMatchingBug server/filestore_test.go:4448 - [Fact(Skip = "LoadNextMsg not yet implemented in .NET FileStore")] + // The bug was that LoadNextMsg with a filter could return a message whose subject + // did not match the filter when fss (per-subject state) was regenerated from only + // part of the block. The fix: when no matching message exists at or after start, + // throw KeyNotFoundException rather than returning a wrong-subject message. + [Fact] public async Task Filtered_first_matching_finds_correct_sequence() { - await Task.CompletedTask; + await using var store = CreateStore("filtered-first-match"); + + // seqs 1-3: "foo.foo", seq 4: "foo.bar" (no more "foo.foo" after seq 3) + await store.AppendAsync("foo.foo", "A"u8.ToArray(), default); + await store.AppendAsync("foo.foo", "B"u8.ToArray(), default); + await store.AppendAsync("foo.foo", "C"u8.ToArray(), default); + await store.AppendAsync("foo.bar", "X"u8.ToArray(), default); + + // Starting at seq 4, filter "foo.foo" — seq 4 is "foo.bar", and there are no + // further "foo.foo" messages, so LoadNextMsg must throw rather than return a + // message with the wrong subject. + Should.Throw(() => + store.LoadNextMsg("foo.foo", false, 4, null)); + + // Sanity: starting at seq 1 should find "foo.foo" at seq 1 with no skip. + var (msg, skip) = store.LoadNextMsg("foo.foo", false, 1, null); + msg.Subject.ShouldBe("foo.foo"); + msg.Sequence.ShouldBe((ulong)1); + skip.ShouldBe((ulong)0); } // Go: TestFileStoreExpireSubjectMeta server/filestore_test.go:4014 - [Fact(Skip = "SubjectsState not yet implemented in .NET FileStore")] + [Fact] public async Task Expired_subject_metadata_cleans_up() { - await Task.CompletedTask; + await using var store = CreateStore("expire-subj-meta"); + + await store.AppendAsync("foo.1", "a"u8.ToArray(), default); + await store.AppendAsync("foo.1", "b"u8.ToArray(), default); + await store.AppendAsync("foo.2", "c"u8.ToArray(), default); + + // Remove ALL messages on "foo.1". + (await store.RemoveAsync(1, default)).ShouldBeTrue(); + (await store.RemoveAsync(2, default)).ShouldBeTrue(); + + // "foo.1" should have been cleaned up — not present in SubjectsState. + var state = store.SubjectsState(">"); + state.ContainsKey("foo.1").ShouldBeFalse(); + + // "foo.2" is still alive. + state.ContainsKey("foo.2").ShouldBeTrue(); + state["foo.2"].Msgs.ShouldBe((ulong)1); } // Go: TestFileStoreAllFilteredStateWithDeleted server/filestore_test.go:4827 - [Fact(Skip = "FilteredState not yet implemented in .NET FileStore")] + [Fact] public async Task Filtered_state_with_deleted_messages() { - await Task.CompletedTask; + await using var store = CreateStore("filtered-state-deleted"); + + // Store 5 messages on "foo" — seqs 1..5. + for (var i = 0; i < 5; i++) + await store.AppendAsync("foo", "x"u8.ToArray(), default); + + // Remove seqs 2 and 4 — seqs 1, 3, 5 remain. + (await store.RemoveAsync(2, default)).ShouldBeTrue(); + (await store.RemoveAsync(4, default)).ShouldBeTrue(); + + // FilteredState from seq 1 on "foo" should report 3 remaining messages. + var state = store.FilteredState(1, "foo"); + state.Msgs.ShouldBe((ulong)3); + state.First.ShouldBe((ulong)1); + state.Last.ShouldBe((ulong)5); } // Test LoadLastBySubject with multiple subjects and removes. @@ -277,10 +406,30 @@ public sealed class FileStoreSubjectTests : IDisposable } // Go: TestFileStoreNumPendingLastBySubject server/filestore_test.go:6501 - [Fact(Skip = "NumPending not yet implemented in .NET FileStore")] + [Fact] public async Task NumPending_last_per_subject() { - await Task.CompletedTask; + await using var store = CreateStore("num-pending-lps"); + + // "foo" x3, "bar" x2 — 2 distinct subjects. + await store.AppendAsync("foo", "1"u8.ToArray(), default); + await store.AppendAsync("foo", "2"u8.ToArray(), default); + await store.AppendAsync("foo", "3"u8.ToArray(), default); + await store.AppendAsync("bar", "4"u8.ToArray(), default); + await store.AppendAsync("bar", "5"u8.ToArray(), default); + + // lastPerSubject=true: count only the last message per distinct subject. + // 2 distinct subjects → Total == 2. + var (total, _) = store.NumPending(1, ">", true); + total.ShouldBe((ulong)2); + + // lastPerSubject=false: count all messages at or after sseq 1. + var (totalAll, _) = store.NumPending(1, ">", false); + totalAll.ShouldBe((ulong)5); + + // Filter to just "foo" with lastPerSubject=true → 1. + var (fooLps, _) = store.NumPending(1, "foo", true); + fooLps.ShouldBe((ulong)1); } // Test many distinct subjects. diff --git a/tests/NATS.Server.TestUtilities/JetStreamApiFixture.cs b/tests/NATS.Server.TestUtilities/JetStreamApiFixture.cs index 016ef49..dd795d1 100644 --- a/tests/NATS.Server.TestUtilities/JetStreamApiFixture.cs +++ b/tests/NATS.Server.TestUtilities/JetStreamApiFixture.cs @@ -23,7 +23,7 @@ public sealed class JetStreamApiFixture : IAsyncDisposable public JetStreamApiFixture() { _streamManager = new StreamManager(); - _consumerManager = new ConsumerManager(); + _consumerManager = new ConsumerManager { StreamManager = _streamManager }; _router = new JetStreamApiRouter(_streamManager, _consumerManager); _publisher = new JetStreamPublisher(_streamManager); } @@ -31,7 +31,7 @@ public sealed class JetStreamApiFixture : IAsyncDisposable private JetStreamApiFixture(Account? account) { _streamManager = new StreamManager(account: account); - _consumerManager = new ConsumerManager(); + _consumerManager = new ConsumerManager { StreamManager = _streamManager }; _router = new JetStreamApiRouter(_streamManager, _consumerManager); _publisher = new JetStreamPublisher(_streamManager); } diff --git a/tests/NATS.Server.TestUtilities/JetStreamClusterFixture.cs b/tests/NATS.Server.TestUtilities/JetStreamClusterFixture.cs index 9de4787..42636b3 100644 --- a/tests/NATS.Server.TestUtilities/JetStreamClusterFixture.cs +++ b/tests/NATS.Server.TestUtilities/JetStreamClusterFixture.cs @@ -122,11 +122,13 @@ public sealed class JetStreamClusterFixture : IAsyncDisposable /// public JetStreamApiResponse UpdateStream(string name, string[] subjects, int replicas, int maxMsgs = 0) { + // Go: stream update rejects unknown stream names — must exist before update. + if (!_streamManager.TryGet(name, out var existing)) + return JetStreamApiResponse.ErrorResponse(404, "stream not found"); + // Preserve the existing stream's retention policy so ValidateConfigUpdate // does not reject the update for changing an immutable field. - var retention = RetentionPolicy.Limits; - if (_streamManager.TryGet(name, out var existing)) - retention = existing.Config.Retention; + var retention = existing.Config.Retention; return _streamManager.CreateOrUpdate(new StreamConfig {