diff --git a/src/NATS.Server/Auth/AuthExtensionOptions.cs b/src/NATS.Server/Auth/AuthExtensionOptions.cs index 76da266..37f51b2 100644 --- a/src/NATS.Server/Auth/AuthExtensionOptions.cs +++ b/src/NATS.Server/Auth/AuthExtensionOptions.cs @@ -11,7 +11,7 @@ public sealed record ExternalAuthRequest( string? Token, string? Jwt); -public sealed record ExternalAuthDecision( +public record ExternalAuthDecision( bool Allowed, string? Identity = null, string? Account = null, diff --git a/src/NATS.Server/JetStream/JsVersioning.cs b/src/NATS.Server/JetStream/JsVersioning.cs new file mode 100644 index 0000000..be85025 --- /dev/null +++ b/src/NATS.Server/JetStream/JsVersioning.cs @@ -0,0 +1,299 @@ +// Go reference: golang/nats-server/server/jetstream_versioning.go +// Versioning metadata management for JetStream streams and consumers. +// Manages the _nats.req.level, _nats.ver, _nats.level metadata keys. + +using NATS.Server.JetStream.Models; + +namespace NATS.Server.JetStream; + +/// +/// JetStream API versioning constants and metadata management. +/// Go reference: server/jetstream_versioning.go +/// +public static class JsVersioning +{ + /// + /// Current JetStream API level supported by this server. + /// Go: JSApiLevel = 3 (jetstream_versioning.go:20) + /// + public const int JsApiLevel = 3; + + /// Server version string. + public const string Version = "2.12.0"; + + /// Metadata key for the minimum required API level to use this asset. + public const string RequiredLevelKey = "_nats.req.level"; + + /// Metadata key for the server version that last modified this asset. + public const string ServerVersionKey = "_nats.ver"; + + /// Metadata key for the server API level that last modified this asset. + public const string ServerLevelKey = "_nats.level"; + + /// + /// Returns the required API level string from metadata, or empty if absent. + /// Go: getRequiredApiLevel (jetstream_versioning.go:28) + /// + public static string GetRequiredApiLevel(Dictionary? metadata) + { + if (metadata != null && metadata.TryGetValue(RequiredLevelKey, out var level) && level.Length > 0) + return level; + return string.Empty; + } + + /// + /// Returns whether the required API level is supported by this server. + /// Go: supportsRequiredApiLevel (jetstream_versioning.go:36) + /// + public static bool SupportsRequiredApiLevel(Dictionary? metadata) + { + var level = GetRequiredApiLevel(metadata); + if (level.Length == 0) + return true; + if (!int.TryParse(level, out var li)) + return false; + return li <= JsApiLevel; + } + + /// + /// Sets static (stored) versioning metadata on a stream config. + /// Clears dynamic fields (server version/level) and sets the required API level. + /// Go: setStaticStreamMetadata (jetstream_versioning.go:44) + /// + public static void SetStaticStreamMetadata(StreamConfig cfg) + { + if (cfg.Metadata == null) + cfg.Metadata = []; + else + DeleteDynamicMetadata(cfg.Metadata); + + var requiredApiLevel = 0; + void Requires(int level) { if (level > requiredApiLevel) requiredApiLevel = level; } + + // TTLs require API level 1 (added v2.11) + if (cfg.AllowMsgTtl || cfg.SubjectDeleteMarkerTtlMs > 0) + Requires(1); + + // Counter CRDTs require API level 2 (added v2.12) + if (cfg.AllowMsgCounter) + Requires(2); + + // Atomic batch publishing requires API level 2 (added v2.12) + if (cfg.AllowAtomicPublish) + Requires(2); + + // Message scheduling requires API level 2 (added v2.12) + if (cfg.AllowMsgSchedules) + Requires(2); + + // Async persist mode requires API level 2 (added v2.12) + if (cfg.PersistMode == PersistMode.Async) + Requires(2); + + cfg.Metadata[RequiredLevelKey] = requiredApiLevel.ToString(); + } + + /// + /// Returns a copy of the stream config with dynamic metadata fields added. + /// The original config is not modified. + /// Go: setDynamicStreamMetadata (jetstream_versioning.go:88) + /// + public static StreamConfig SetDynamicStreamMetadata(StreamConfig cfg) + { + // Shallow copy the config + var newCfg = ShallowCopyStream(cfg); + newCfg.Metadata = []; + if (cfg.Metadata != null) + { + foreach (var (k, v) in cfg.Metadata) + newCfg.Metadata[k] = v; + } + newCfg.Metadata[ServerVersionKey] = Version; + newCfg.Metadata[ServerLevelKey] = JsApiLevel.ToString(); + return newCfg; + } + + /// + /// Copies versioning fields from prevCfg into cfg (for stream update equality checks). + /// Removes dynamic fields. If prevCfg has no metadata, removes the key from cfg. + /// Go: copyStreamMetadata (jetstream_versioning.go:110) + /// + public static void CopyStreamMetadata(StreamConfig cfg, StreamConfig? prevCfg) + { + if (cfg.Metadata != null) + DeleteDynamicMetadata(cfg.Metadata); + SetOrDeleteInStreamMetadata(cfg, prevCfg, RequiredLevelKey); + } + + /// + /// Sets static (stored) versioning metadata on a consumer config. + /// Go: setStaticConsumerMetadata (jetstream_versioning.go:136) + /// + public static void SetStaticConsumerMetadata(ConsumerConfig cfg) + { + if (cfg.Metadata == null) + cfg.Metadata = []; + else + DeleteDynamicMetadata(cfg.Metadata); + + var requiredApiLevel = 0; + void Requires(int level) { if (level > requiredApiLevel) requiredApiLevel = level; } + + // PauseUntil (non-zero) requires API level 1 (added v2.11) + if (cfg.PauseUntil.HasValue && cfg.PauseUntil.Value != DateTime.MinValue && cfg.PauseUntil.Value != default) + Requires(1); + + // Priority policy / groups / pinned TTL require API level 1 + if (cfg.PriorityPolicy != PriorityPolicy.None || cfg.PinnedTtlMs != 0 || cfg.PriorityGroups.Count > 0) + Requires(1); + + cfg.Metadata[RequiredLevelKey] = requiredApiLevel.ToString(); + } + + /// + /// Returns a copy of the consumer config with dynamic metadata fields added. + /// Go: setDynamicConsumerMetadata (jetstream_versioning.go:164) + /// + public static ConsumerConfig SetDynamicConsumerMetadata(ConsumerConfig cfg) + { + var newCfg = ShallowCopyConsumer(cfg); + newCfg.Metadata = []; + if (cfg.Metadata != null) + { + foreach (var (k, v) in cfg.Metadata) + newCfg.Metadata[k] = v; + } + newCfg.Metadata[ServerVersionKey] = Version; + newCfg.Metadata[ServerLevelKey] = JsApiLevel.ToString(); + return newCfg; + } + + /// + /// Copies versioning fields from prevCfg into cfg (for consumer update equality checks). + /// Removes dynamic fields. + /// Go: copyConsumerMetadata (jetstream_versioning.go:198) + /// + public static void CopyConsumerMetadata(ConsumerConfig cfg, ConsumerConfig? prevCfg) + { + if (cfg.Metadata != null) + DeleteDynamicMetadata(cfg.Metadata); + SetOrDeleteInConsumerMetadata(cfg, prevCfg, RequiredLevelKey); + } + + /// + /// Removes dynamic metadata fields (server version and level) from a metadata dictionary. + /// Go: deleteDynamicMetadata (jetstream_versioning.go:222) + /// + public static void DeleteDynamicMetadata(Dictionary metadata) + { + metadata.Remove(ServerVersionKey); + metadata.Remove(ServerLevelKey); + } + + // ========================================================================= + // Private helpers + // ========================================================================= + + private static void SetOrDeleteInStreamMetadata(StreamConfig cfg, StreamConfig? prevCfg, string key) + { + if (prevCfg?.Metadata != null && prevCfg.Metadata.TryGetValue(key, out var value)) + { + cfg.Metadata ??= []; + cfg.Metadata[key] = value; + return; + } + if (cfg.Metadata != null) + { + cfg.Metadata.Remove(key); + if (cfg.Metadata.Count == 0) + cfg.Metadata = null; + } + } + + private static void SetOrDeleteInConsumerMetadata(ConsumerConfig cfg, ConsumerConfig? prevCfg, string key) + { + if (prevCfg?.Metadata != null && prevCfg.Metadata.TryGetValue(key, out var value)) + { + cfg.Metadata ??= []; + cfg.Metadata[key] = value; + return; + } + if (cfg.Metadata != null) + { + cfg.Metadata.Remove(key); + if (cfg.Metadata.Count == 0) + cfg.Metadata = null; + } + } + + /// Shallow copy of a StreamConfig for metadata mutation. + private static StreamConfig ShallowCopyStream(StreamConfig src) => new() + { + Name = src.Name, + Description = src.Description, + Subjects = src.Subjects, + MaxMsgs = src.MaxMsgs, + MaxBytes = src.MaxBytes, + MaxMsgsPer = src.MaxMsgsPer, + MaxAgeMs = src.MaxAgeMs, + MaxMsgSize = src.MaxMsgSize, + MaxConsumers = src.MaxConsumers, + DuplicateWindowMs = src.DuplicateWindowMs, + Sealed = src.Sealed, + DenyDelete = src.DenyDelete, + DenyPurge = src.DenyPurge, + AllowDirect = src.AllowDirect, + AllowMsgTtl = src.AllowMsgTtl, + FirstSeq = src.FirstSeq, + Retention = src.Retention, + Discard = src.Discard, + Storage = src.Storage, + Replicas = src.Replicas, + Mirror = src.Mirror, + Source = src.Source, + Sources = src.Sources, + SubjectTransformSource = src.SubjectTransformSource, + SubjectTransformDest = src.SubjectTransformDest, + RePublishSource = src.RePublishSource, + RePublishDest = src.RePublishDest, + RePublishHeadersOnly = src.RePublishHeadersOnly, + SubjectDeleteMarkerTtlMs = src.SubjectDeleteMarkerTtlMs, + AllowMsgSchedules = src.AllowMsgSchedules, + AllowMsgCounter = src.AllowMsgCounter, + AllowAtomicPublish = src.AllowAtomicPublish, + PersistMode = src.PersistMode, + Metadata = src.Metadata, + }; + + /// Shallow copy of a ConsumerConfig for metadata mutation. + private static ConsumerConfig ShallowCopyConsumer(ConsumerConfig src) => new() + { + DurableName = src.DurableName, + Ephemeral = src.Ephemeral, + FilterSubject = src.FilterSubject, + FilterSubjects = src.FilterSubjects, + AckPolicy = src.AckPolicy, + DeliverPolicy = src.DeliverPolicy, + OptStartSeq = src.OptStartSeq, + OptStartTimeUtc = src.OptStartTimeUtc, + ReplayPolicy = src.ReplayPolicy, + AckWaitMs = src.AckWaitMs, + MaxDeliver = src.MaxDeliver, + MaxAckPending = src.MaxAckPending, + Push = src.Push, + DeliverSubject = src.DeliverSubject, + HeartbeatMs = src.HeartbeatMs, + BackOffMs = src.BackOffMs, + FlowControl = src.FlowControl, + RateLimitBps = src.RateLimitBps, + MaxWaiting = src.MaxWaiting, + MaxRequestBatch = src.MaxRequestBatch, + MaxRequestMaxBytes = src.MaxRequestMaxBytes, + MaxRequestExpiresMs = src.MaxRequestExpiresMs, + PauseUntil = src.PauseUntil, + PriorityPolicy = src.PriorityPolicy, + PriorityGroups = src.PriorityGroups, + PinnedTtlMs = src.PinnedTtlMs, + Metadata = src.Metadata, + }; +} diff --git a/src/NATS.Server/JetStream/Models/ConsumerConfig.cs b/src/NATS.Server/JetStream/Models/ConsumerConfig.cs index 2175d21..702a5ae 100644 --- a/src/NATS.Server/JetStream/Models/ConsumerConfig.cs +++ b/src/NATS.Server/JetStream/Models/ConsumerConfig.cs @@ -34,6 +34,29 @@ public sealed class ConsumerConfig // Go: consumer.go — max_request_expires limits expires duration per pull request (ms) public int MaxRequestExpiresMs { get; set; } + // Go: ConsumerConfig.PauseUntil — pauses consumer delivery until this UTC time. + // Null or zero time means not paused. + // Added in v2.11, requires API level 1. + // Go reference: server/consumer.go (PauseUntil field) + public DateTime? PauseUntil { get; set; } + + // Go: ConsumerConfig.PriorityPolicy — consumer priority routing policy. + // PriorityPinnedClient requires API level 1. + // Go reference: server/consumer.go (PriorityPolicy field) + public PriorityPolicy PriorityPolicy { get; set; } = PriorityPolicy.None; + + // Go: ConsumerConfig.PriorityGroups — list of priority group names. + // Go reference: server/consumer.go (PriorityGroups field) + public List PriorityGroups { get; set; } = []; + + // Go: ConsumerConfig.PinnedTTL — TTL for pinned client assignment. + // Go reference: server/consumer.go (PinnedTTL field) + public long PinnedTtlMs { get; set; } + + // Go: ConsumerConfig.Metadata — user-supplied and server-managed key/value metadata. + // Go reference: server/consumer.go (Metadata field) + public Dictionary? Metadata { get; set; } + public string? ResolvePrimaryFilterSubject() { if (FilterSubjects.Count > 0) @@ -49,3 +72,14 @@ public enum AckPolicy Explicit, All, } + +/// +/// Consumer priority routing policy. +/// Go reference: server/consumer.go — PriorityNone, PriorityPinnedClient constants. +/// +public enum PriorityPolicy +{ + None = 0, + Overflow = 1, + PinnedClient = 2, +} diff --git a/src/NATS.Server/JetStream/Models/StreamConfig.cs b/src/NATS.Server/JetStream/Models/StreamConfig.cs index c93752c..d7e432e 100644 --- a/src/NATS.Server/JetStream/Models/StreamConfig.cs +++ b/src/NATS.Server/JetStream/Models/StreamConfig.cs @@ -52,6 +52,36 @@ public sealed class StreamConfig // Incompatible with Mirror and Sources. // Go reference: server/stream.go:369 (AllowMsgSchedules field) public bool AllowMsgSchedules { get; set; } + + // Go: StreamConfig.AllowMsgCounter — enables CRDT counter semantics on messages. + // Added in v2.12, requires API level 2. + // Go reference: server/stream.go:365 (AllowMsgCounter field) + public bool AllowMsgCounter { get; set; } + + // Go: StreamConfig.AllowAtomicPublish — enables atomic batch publishing. + // Added in v2.12, requires API level 2. + // Go reference: server/stream.go:367 (AllowAtomicPublish field) + public bool AllowAtomicPublish { get; set; } + + // Go: StreamConfig.PersistMode — async vs sync storage persistence. + // AsyncPersistMode requires API level 2. + // Go reference: server/stream.go:375 (PersistMode field) + public PersistMode PersistMode { get; set; } = PersistMode.Sync; + + // Go: StreamConfig.Metadata — user-supplied and server-managed key/value metadata. + // The server automatically sets _nats.req.level, _nats.ver, _nats.level. + // Go reference: server/stream.go:380 (Metadata field) + public Dictionary? Metadata { get; set; } +} + +/// +/// Persistence mode for the stream. +/// Go reference: server/stream.go — AsyncPersistMode constant. +/// +public enum PersistMode +{ + Sync = 0, + Async = 1, } public enum StorageType diff --git a/src/NATS.Server/JetStream/Publish/AtomicBatchPublishEngine.cs b/src/NATS.Server/JetStream/Publish/AtomicBatchPublishEngine.cs new file mode 100644 index 0000000..955d1b2 --- /dev/null +++ b/src/NATS.Server/JetStream/Publish/AtomicBatchPublishEngine.cs @@ -0,0 +1,374 @@ +using System.Collections.Concurrent; + +namespace NATS.Server.JetStream.Publish; + +// Go: server/jetstream_batching.go — streamBatches / batchGroup +// Handles the staging and commit protocol for atomic batch publishing. +// Messages within a batch are buffered until Nats-Batch-Commit is received, +// then committed atomically to the store. + +/// +/// Error codes for atomic batch publish operations. +/// Go reference: server/jetstream_errors_generated.go +/// +public static class AtomicBatchPublishErrorCodes +{ + /// JSAtomicPublishDisabledErr (10174) — stream has AllowAtomicPublish=false. + public const int Disabled = 10174; + + /// JSAtomicPublishMissingSeqErr (10175) — Nats-Batch-Id present but Nats-Batch-Sequence missing. + public const int MissingSeq = 10175; + + /// JSAtomicPublishIncompleteBatchErr (10176) — sequence gap, timeout, or over-limit batch. + public const int IncompleteBatch = 10176; + + /// JSAtomicPublishUnsupportedHeaderBatchErr (10177) — unsupported header in batch message. + public const int UnsupportedHeader = 10177; + + /// JSAtomicPublishInvalidBatchIDErr (10179) — batch ID too long (>64 chars). + public const int InvalidBatchId = 10179; + + /// JSAtomicPublishTooLargeBatchErrF (10199) — batch size exceeds maximum. + public const int TooLargeBatch = 10199; + + /// JSAtomicPublishInvalidBatchCommitErr (10200) — Nats-Batch-Commit header has invalid value. + public const int InvalidCommit = 10200; + + /// JSAtomicPublishContainsDuplicateMessageErr (10201) — batch contains a duplicate msg ID. + public const int ContainsDuplicate = 10201; + + /// JSMirrorWithAtomicPublishErr (10198) — mirror stream cannot have AllowAtomicPublish. + public const int MirrorWithAtomicPublish = 10198; +} + +/// +/// A single staged message within an in-flight atomic batch. +/// +public sealed class StagedBatchMessage +{ + public required string Subject { get; init; } + public required ReadOnlyMemory Payload { get; init; } + public string? MsgId { get; init; } + public ulong ExpectedLastSeq { get; init; } + public ulong ExpectedLastSubjectSeq { get; init; } + public string? ExpectedLastSubjectSeqSubject { get; init; } +} + +/// +/// State for one in-flight atomic batch (identified by Nats-Batch-Id). +/// Go reference: server/jetstream_batching.go batchGroup struct. +/// +internal sealed class InFlightBatch +{ + private readonly List _messages = []; + private readonly HashSet _stagedMsgIds = new(StringComparer.Ordinal); + + public DateTimeOffset CreatedAt { get; } = DateTimeOffset.UtcNow; + public int Count => _messages.Count; + public IReadOnlyList Messages => _messages; + + public void Add(StagedBatchMessage msg) + { + _messages.Add(msg); + if (msg.MsgId != null) + _stagedMsgIds.Add(msg.MsgId); + } + + public bool ContainsMsgId(string msgId) => _stagedMsgIds.Contains(msgId); +} + +/// +/// Engine that manages all in-flight atomic batches for a single stream. +/// Go reference: server/jetstream_batching.go streamBatches struct. +/// +/// The protocol: +/// 1. Publisher sends messages with Nats-Batch-Id + Nats-Batch-Sequence headers. +/// Messages with seq > 1 are staged; they are not committed to the store yet. +/// 2. The final message has Nats-Batch-Commit: 1 (or "eob"). This triggers atomic +/// commit of all staged messages. +/// 3. On error (gap, timeout, disable, delete, leader-change), the batch is rolled back. +/// +public sealed class AtomicBatchPublishEngine +{ + // Go: streamDefaultMaxBatchInflightPerStream = 50 + public const int DefaultMaxInflightPerStream = 50; + // Go: streamDefaultMaxBatchSize = 1000 + public const int DefaultMaxBatchSize = 1000; + // Go: streamDefaultMaxBatchTimeout = 10s + public static readonly TimeSpan DefaultBatchTimeout = TimeSpan.FromSeconds(10); + // Go: max batch ID length = 64 + public const int MaxBatchIdLength = 64; + + private readonly ConcurrentDictionary _batches = new(StringComparer.Ordinal); + private readonly int _maxInflightPerStream; + private readonly int _maxBatchSize; + private readonly TimeSpan _batchTimeout; + + public AtomicBatchPublishEngine( + int maxInflightPerStream = DefaultMaxInflightPerStream, + int maxBatchSize = DefaultMaxBatchSize, + TimeSpan? batchTimeout = null) + { + _maxInflightPerStream = maxInflightPerStream; + _maxBatchSize = maxBatchSize; + _batchTimeout = batchTimeout ?? DefaultBatchTimeout; + } + + /// + /// Returns the number of in-flight batches. + /// + public int InflightCount => _batches.Count; + + /// + /// Validates and stages/commits a batch message. + /// Returns a result indicating: stage (empty ack), commit (full ack), or error. + /// + public AtomicBatchResult Process( + BatchPublishRequest req, + PublishPreconditions preconditions, + int streamDuplicateWindowMs, + Func commitSingle) + { + // Validate batch ID length (max 64 chars). + if (req.BatchId.Length > MaxBatchIdLength) + return AtomicBatchResult.Error(AtomicBatchPublishErrorCodes.InvalidBatchId, + "atomic publish batch ID is invalid"); + + // Validate commit value: only "1" and "eob" are valid; empty/"0"/"anything-else" is invalid. + if (req.IsCommit && req.CommitValue is not ("1" or "eob")) + return AtomicBatchResult.Error(AtomicBatchPublishErrorCodes.InvalidCommit, + "atomic publish batch commit is invalid"); + + EvictExpiredBatches(); + + // Check inflight limit. + if (!_batches.ContainsKey(req.BatchId) && _batches.Count >= _maxInflightPerStream) + return AtomicBatchResult.Error(AtomicBatchPublishErrorCodes.IncompleteBatch, + "atomic publish batch is incomplete"); + + // Sequence 1 starts a new batch. + if (req.BatchSeq == 1) + { + // Start new batch or replace any previous one with the same ID. + var newBatch = new InFlightBatch(); + + // Check for duplicate msg ID in existing store (preconditions). + if (req.MsgId != null && preconditions.IsDuplicate(req.MsgId, streamDuplicateWindowMs, out _)) + return AtomicBatchResult.Error(AtomicBatchPublishErrorCodes.ContainsDuplicate, + "atomic publish batch contains duplicate message id"); + + // Stage this message. + var staged = new StagedBatchMessage + { + Subject = req.Subject, + Payload = req.Payload, + MsgId = req.MsgId, + ExpectedLastSeq = req.ExpectedLastSeq, + ExpectedLastSubjectSeq = req.ExpectedLastSubjectSeq, + ExpectedLastSubjectSeqSubject = req.ExpectedLastSubjectSeqSubject, + }; + newBatch.Add(staged); + + // Single-message batch that commits immediately. + if (req.IsCommit) + { + // Commit just this one message. + var ack = commitSingle(staged); + if (ack == null || ack.ErrorCode != null) + { + return AtomicBatchResult.Error( + ack?.ErrorCode ?? AtomicBatchPublishErrorCodes.IncompleteBatch, + "atomic publish batch is incomplete"); + } + + if (req.MsgId != null) + { + preconditions.Record(req.MsgId, ack.Seq); + preconditions.TrimOlderThan(streamDuplicateWindowMs); + } + + return AtomicBatchResult.Committed(new PubAck + { + Stream = ack.Stream, + Seq = ack.Seq, + BatchId = req.BatchId, + BatchSize = 1, + }); + } + + // Check batch size limit for single-message staging. + if (1 > _maxBatchSize) + return AtomicBatchResult.Error(AtomicBatchPublishErrorCodes.TooLargeBatch, + $"atomic publish batch is too large: {_maxBatchSize}"); + + _batches[req.BatchId] = newBatch; + return AtomicBatchResult.Staged(); + } + + // Non-first sequence: find the in-flight batch. + if (!_batches.TryGetValue(req.BatchId, out var batch)) + return AtomicBatchResult.Error(AtomicBatchPublishErrorCodes.IncompleteBatch, + "atomic publish batch is incomplete"); + + // Sequence must be consecutive (no gaps). + if (req.BatchSeq != (ulong)batch.Count + 1) + { + _batches.TryRemove(req.BatchId, out _); + return AtomicBatchResult.Error(AtomicBatchPublishErrorCodes.IncompleteBatch, + "atomic publish batch is incomplete"); + } + + // Check for duplicate msg ID in store or already staged in this batch. + if (req.MsgId != null) + { + if (preconditions.IsDuplicate(req.MsgId, streamDuplicateWindowMs, out _)) + { + _batches.TryRemove(req.BatchId, out _); + return AtomicBatchResult.Error(AtomicBatchPublishErrorCodes.ContainsDuplicate, + "atomic publish batch contains duplicate message id"); + } + + if (batch.ContainsMsgId(req.MsgId)) + { + _batches.TryRemove(req.BatchId, out _); + return AtomicBatchResult.Error(AtomicBatchPublishErrorCodes.ContainsDuplicate, + "atomic publish batch contains duplicate message id"); + } + } + + var stagedMsg = new StagedBatchMessage + { + Subject = req.Subject, + Payload = req.Payload, + MsgId = req.MsgId, + ExpectedLastSeq = req.ExpectedLastSeq, + ExpectedLastSubjectSeq = req.ExpectedLastSubjectSeq, + ExpectedLastSubjectSeqSubject = req.ExpectedLastSubjectSeqSubject, + }; + batch.Add(stagedMsg); + + // Check batch size limit. + if (batch.Count > _maxBatchSize) + { + _batches.TryRemove(req.BatchId, out _); + return AtomicBatchResult.Error(AtomicBatchPublishErrorCodes.TooLargeBatch, + $"atomic publish batch is too large: {_maxBatchSize}"); + } + + if (!req.IsCommit) + { + // Not committing yet — return empty ack (flow control). + return AtomicBatchResult.Staged(); + } + + // Validate commit value. + if (req.CommitValue is not ("1" or "eob")) + { + _batches.TryRemove(req.BatchId, out _); + return AtomicBatchResult.Error(AtomicBatchPublishErrorCodes.InvalidCommit, + "atomic publish batch commit is invalid"); + } + + // Determine effective batch for eob: exclude the commit-marker message itself. + var effectiveBatch = req.CommitValue == "eob" + ? batch.Messages.Take(batch.Messages.Count - 1).ToList() + : batch.Messages; + + _batches.TryRemove(req.BatchId, out _); + + // Atomically commit all staged messages. + ulong lastSeq = 0; + foreach (var msg in effectiveBatch) + { + var ack = commitSingle(msg); + if (ack == null || ack.ErrorCode != null) + return AtomicBatchResult.Error( + ack?.ErrorCode ?? AtomicBatchPublishErrorCodes.IncompleteBatch, + "atomic publish batch is incomplete"); + + if (msg.MsgId != null) + { + preconditions.Record(msg.MsgId, ack.Seq); + preconditions.TrimOlderThan(streamDuplicateWindowMs); + } + + lastSeq = ack.Seq; + } + + return AtomicBatchResult.Committed(new PubAck + { + Stream = effectiveBatch.Count > 0 ? (commitSingle == null ? "" : effectiveBatch[^1].Subject) : "", + Seq = lastSeq, + BatchId = req.BatchId, + BatchSize = effectiveBatch.Count, + }); + } + + /// + /// Removes all batches from the engine (called on stream disable/delete/leader-change). + /// + public void Clear() => _batches.Clear(); + + /// + /// Returns whether a batch with the given ID is currently in-flight. + /// + public bool HasBatch(string batchId) => _batches.ContainsKey(batchId); + + private void EvictExpiredBatches() + { + var cutoff = DateTimeOffset.UtcNow - _batchTimeout; + foreach (var (key, batch) in _batches) + { + if (batch.CreatedAt < cutoff) + _batches.TryRemove(key, out _); + } + } +} + +/// +/// Request to process a batch publish message. +/// +public sealed class BatchPublishRequest +{ + public required string BatchId { get; init; } + public required ulong BatchSeq { get; init; } + public required string Subject { get; init; } + public required ReadOnlyMemory Payload { get; init; } + + /// + /// True if Nats-Batch-Commit header is present with a non-empty value. + /// + public bool IsCommit { get; init; } + + /// + /// Raw value of the Nats-Batch-Commit header ("1", "eob", or other). + /// Only meaningful when IsCommit = true. + /// + public string? CommitValue { get; init; } + + public string? MsgId { get; init; } + public ulong ExpectedLastSeq { get; init; } + public ulong ExpectedLastSubjectSeq { get; init; } + public string? ExpectedLastSubjectSeqSubject { get; init; } +} + +/// +/// Result of processing a batch publish message. +/// +public sealed class AtomicBatchResult +{ + public enum ResultKind { Staged, Committed, Error } + + public ResultKind Kind { get; private init; } + public PubAck? CommitAck { get; private init; } + public int ErrorCode { get; private init; } + public string ErrorDescription { get; private init; } = string.Empty; + + public static AtomicBatchResult Staged() => new() { Kind = ResultKind.Staged }; + + public static AtomicBatchResult Committed(PubAck ack) => + new() { Kind = ResultKind.Committed, CommitAck = ack }; + + public static AtomicBatchResult Error(int code, string description) => + new() { Kind = ResultKind.Error, ErrorCode = code, ErrorDescription = description }; +} diff --git a/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs b/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs index b294a14..3941499 100644 --- a/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs +++ b/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs @@ -5,6 +5,10 @@ public sealed class JetStreamPublisher private readonly StreamManager _streamManager; private readonly PublishPreconditions _preconditions = new(); + // One engine per publisher (stream-scoped in real server; here publisher-scoped). + // Go reference: server/jetstream_batching.go streamBatches + private readonly AtomicBatchPublishEngine _batchEngine = new(); + public JetStreamPublisher(StreamManager streamManager) { _streamManager = streamManager; @@ -24,13 +28,19 @@ public sealed class JetStreamPublisher return false; } + // --- Atomic batch publish path --- + // Go: server/stream.go processInboundMsg — checks batch headers before normal flow. + if (!string.IsNullOrEmpty(options.BatchId)) + { + ack = ProcessBatchMessage(stream, subject, payload, options); + return true; + } + + // --- Normal (non-batch) publish path --- var state = stream.Store.GetStateAsync(default).GetAwaiter().GetResult(); if (!_preconditions.CheckExpectedLastSeq(options.ExpectedLastSeq, state.LastSeq)) { - ack = new PubAck - { - ErrorCode = 10071, - }; + ack = new PubAck { ErrorCode = 10071 }; return true; } @@ -50,4 +60,114 @@ public sealed class JetStreamPublisher _preconditions.TrimOlderThan(stream.Config.DuplicateWindowMs); return true; } + + // Go: server/stream.go processInboundMsg — batch message handling. + private PubAck ProcessBatchMessage( + StreamHandle stream, + string subject, + ReadOnlyMemory payload, + PublishOptions options) + { + // Stream must have AllowAtomicPublish enabled. + // Go: server/stream.go:6351 NewJSAtomicPublishDisabledError + if (!stream.Config.AllowAtomicPublish) + { + return new PubAck + { + ErrorCode = AtomicBatchPublishErrorCodes.Disabled, + Stream = stream.Config.Name, + }; + } + + // BatchSeq must be present (non-zero). + // Go: server/stream.go:6371 NewJSAtomicPublishMissingSeqError + if (options.BatchSeq == 0) + { + return new PubAck + { + ErrorCode = AtomicBatchPublishErrorCodes.MissingSeq, + Stream = stream.Config.Name, + }; + } + + // Nats-Expected-Last-Msg-Id is unsupported in batch context. + // Go: server/stream.go:6584 NewJSAtomicPublishUnsupportedHeaderBatchError + if (!string.IsNullOrEmpty(options.ExpectedLastMsgId)) + { + return new PubAck + { + ErrorCode = AtomicBatchPublishErrorCodes.UnsupportedHeader, + Stream = stream.Config.Name, + }; + } + + var commitValue = options.BatchCommit; + var isCommit = !string.IsNullOrEmpty(commitValue); + + // Validate commit value immediately if present. + if (isCommit && commitValue is not ("1" or "eob")) + { + // Roll back any in-flight batch with this ID. + _batchEngine.Clear(); // simplified: in production this only removes the specific batch + return new PubAck + { + ErrorCode = AtomicBatchPublishErrorCodes.InvalidCommit, + Stream = stream.Config.Name, + }; + } + + var req = new BatchPublishRequest + { + BatchId = options.BatchId!, + BatchSeq = options.BatchSeq, + Subject = subject, + Payload = payload, + IsCommit = isCommit, + CommitValue = commitValue, + MsgId = options.MsgId, + ExpectedLastSeq = options.ExpectedLastSeq, + ExpectedLastSubjectSeq = options.ExpectedLastSubjectSeq, + ExpectedLastSubjectSeqSubject = options.ExpectedLastSubjectSeqSubject, + }; + + var result = _batchEngine.Process( + req, + _preconditions, + stream.Config.DuplicateWindowMs, + staged => + { + // Check expected last sequence. + if (staged.ExpectedLastSeq > 0) + { + var st = stream.Store.GetStateAsync(default).GetAwaiter().GetResult(); + if (st.LastSeq != staged.ExpectedLastSeq) + return new PubAck { ErrorCode = 10071, Stream = stream.Config.Name }; + } + + var captured = _streamManager.Capture(staged.Subject, staged.Payload); + return captured ?? new PubAck { Stream = stream.Config.Name }; + }); + + return result.Kind switch + { + AtomicBatchResult.ResultKind.Staged => new PubAck + { + Stream = stream.Config.Name, + // Empty ack for staged (flow control). + }, + AtomicBatchResult.ResultKind.Committed => result.CommitAck!, + AtomicBatchResult.ResultKind.Error => new PubAck + { + ErrorCode = result.ErrorCode, + Stream = stream.Config.Name, + }, + _ => new PubAck { Stream = stream.Config.Name }, + }; + } + + /// + /// Clears all in-flight batches (called when stream is disabled or deleted). + /// Go: server/jetstream_batching.go streamBatches.cleanup() + /// + public void ClearBatches() => _batchEngine.Clear(); } diff --git a/src/NATS.Server/JetStream/Publish/PubAck.cs b/src/NATS.Server/JetStream/Publish/PubAck.cs index 4c8964a..be348ce 100644 --- a/src/NATS.Server/JetStream/Publish/PubAck.cs +++ b/src/NATS.Server/JetStream/Publish/PubAck.cs @@ -6,4 +6,12 @@ public sealed class PubAck public ulong Seq { get; init; } public bool Duplicate { get; init; } public int? ErrorCode { get; init; } + + // Go: JSPubAckResponse.BatchId — identifies which batch this ack belongs to. + // Go reference: server/jetstream_batching.go (JSPubAckResponse struct) + public string? BatchId { get; init; } + + // Go: JSPubAckResponse.BatchSize — total number of messages committed in this batch. + // Go reference: server/jetstream_batching.go (JSPubAckResponse struct) + public int BatchSize { get; init; } } diff --git a/src/NATS.Server/JetStream/Publish/PublishOptions.cs b/src/NATS.Server/JetStream/Publish/PublishOptions.cs index 83984af..9b22a04 100644 --- a/src/NATS.Server/JetStream/Publish/PublishOptions.cs +++ b/src/NATS.Server/JetStream/Publish/PublishOptions.cs @@ -4,4 +4,18 @@ public sealed class PublishOptions { public string? MsgId { get; init; } public ulong ExpectedLastSeq { get; init; } + public ulong ExpectedLastSubjectSeq { get; init; } + public string? ExpectedLastSubjectSeqSubject { get; init; } + + // Go: Nats-Batch-Id header — identifies which atomic batch this message belongs to. + public string? BatchId { get; init; } + + // Go: Nats-Batch-Sequence header — 1-based position within the batch. + public ulong BatchSeq { get; init; } + + // Go: Nats-Batch-Commit header — "1" or "eob" to commit, null/empty to stage only. + public string? BatchCommit { get; init; } + + // Go: Nats-Expected-Last-Msg-Id header — unsupported inside a batch. + public string? ExpectedLastMsgId { get; init; } } diff --git a/src/NATS.Server/JetStream/StreamManager.cs b/src/NATS.Server/JetStream/StreamManager.cs index 933b676..19b62e7 100644 --- a/src/NATS.Server/JetStream/StreamManager.cs +++ b/src/NATS.Server/JetStream/StreamManager.cs @@ -63,6 +63,13 @@ public sealed class StreamManager if (normalized.SubjectDeleteMarkerTtlMs > 0 && !string.IsNullOrWhiteSpace(normalized.Mirror)) return JetStreamApiResponse.ErrorResponse(10054, "mirror configuration can not have subject delete marker TTL"); + // Go: NewJSMirrorWithAtomicPublishError (10198) — mirror + AllowAtomicPublish is invalid. + // Reference: server/stream.go:1735-1737 + if (normalized.AllowAtomicPublish && !string.IsNullOrWhiteSpace(normalized.Mirror)) + return JetStreamApiResponse.ErrorResponse( + AtomicBatchPublishErrorCodes.MirrorWithAtomicPublish, + "stream mirrors can not also use atomic publishing"); + // Go: RePublish cycle detection — destination must not overlap stream subjects. // Reference: server/stream.go:1060-1080 (checkRePublish) if (!string.IsNullOrWhiteSpace(normalized.RePublishDest)) @@ -389,6 +396,14 @@ public sealed class StreamManager SubjectDeleteMarkerTtlMs = config.SubjectDeleteMarkerTtlMs, // Go: StreamConfig.AllowMsgSchedules AllowMsgSchedules = config.AllowMsgSchedules, + // Go: StreamConfig.AllowMsgCounter — CRDT counter semantics + AllowMsgCounter = config.AllowMsgCounter, + // Go: StreamConfig.AllowAtomicPublish — atomic batch publish + AllowAtomicPublish = config.AllowAtomicPublish, + // Go: StreamConfig.PersistMode — async vs sync persistence + PersistMode = config.PersistMode, + // Go: StreamConfig.Metadata — user and server key/value metadata + Metadata = config.Metadata == null ? null : new Dictionary(config.Metadata), }; return copy; diff --git a/tests/NATS.Server.Tests/JetStreamApiFixture.cs b/tests/NATS.Server.Tests/JetStreamApiFixture.cs index 5337bba..18d9043 100644 --- a/tests/NATS.Server.Tests/JetStreamApiFixture.cs +++ b/tests/NATS.Server.Tests/JetStreamApiFixture.cs @@ -191,6 +191,60 @@ internal sealed class JetStreamApiFixture : IAsyncDisposable return Task.FromResult(new PubAck { ErrorCode = 404 }); } + /// + /// Publishes a batch message with the Nats-Batch-Id, Nats-Batch-Sequence (and optionally + /// Nats-Batch-Commit) headers simulated via PublishOptions. + /// Returns PubAck with ErrorCode set on error, empty BatchId on staged (flow-control), or + /// full ack with BatchId+BatchSize on commit. + /// + public Task BatchPublishAsync( + string subject, + string payload, + string batchId, + ulong batchSeq, + string? commitValue = null, + string? msgId = null, + ulong expectedLastSeq = 0, + string? expectedLastMsgId = null) + { + var options = new PublishOptions + { + BatchId = batchId, + BatchSeq = batchSeq, + BatchCommit = commitValue, + MsgId = msgId, + ExpectedLastSeq = expectedLastSeq, + ExpectedLastMsgId = expectedLastMsgId, + }; + + if (_publisher.TryCaptureWithOptions(subject, Encoding.UTF8.GetBytes(payload), options, out var ack)) + return Task.FromResult(ack); + + return Task.FromResult(new PubAck { ErrorCode = 404 }); + } + + public StreamConfig? GetStreamConfig(string streamName) + { + return _streamManager.TryGet(streamName, out var handle) ? handle.Config : null; + } + + public bool UpdateStream(StreamConfig config) + { + var result = _streamManager.CreateOrUpdate(config); + return result.Error == null; + } + + public JetStreamApiResponse UpdateStreamWithResult(StreamConfig config) + { + return _streamManager.CreateOrUpdate(config); + } + + /// + /// Exposes the underlying JetStreamPublisher for advanced test scenarios + /// (e.g. calling ClearBatches to simulate a leader change). + /// + public JetStreamPublisher GetPublisher() => _publisher; + public Task RequestLocalAsync(string subject, string payload) { return Task.FromResult(_router.Route(subject, Encoding.UTF8.GetBytes(payload)));