namespace NATS.Server.JetStream.Publish;

/// <summary>
/// Routes a published message to the stream whose subject matches and produces the
/// <see cref="PubAck"/> for it. Handles both the normal publish path (expected-last-sequence
/// check, message-id de-duplication, capture) and the atomic batch publish path.
/// </summary>
public sealed class JetStreamPublisher
{
    /// <summary>
    /// Error code placed in the ack when an expected-last-sequence precondition fails.
    /// NOTE(review): presumably mirrors the Go server's "wrong last sequence" API error
    /// (err_code 10071) — confirm against the server error table.
    /// </summary>
    private const int WrongLastSequenceErrorCode = 10071;

    private readonly StreamManager _streamManager;
    private readonly PublishPreconditions _preconditions = new();

    // One engine per publisher (stream-scoped in real server; here publisher-scoped).
    // Go reference: server/jetstream_batching.go streamBatches
    private readonly AtomicBatchPublishEngine _batchEngine = new();

    /// <summary>
    /// Creates a publisher that resolves streams and captures messages through
    /// <paramref name="streamManager"/>.
    /// </summary>
    /// <param name="streamManager">Stream registry used for subject lookup and capture.</param>
    public JetStreamPublisher(StreamManager streamManager)
    {
        _streamManager = streamManager;
    }

    /// <summary>
    /// Publishes with default options. See
    /// <see cref="TryCaptureWithOptions(string, ReadOnlyMemory{byte}, PublishOptions, out PubAck)"/>.
    /// </summary>
    public bool TryCapture(string subject, ReadOnlyMemory<byte> payload, out PubAck ack)
        => TryCaptureWithOptions(subject, payload, new PublishOptions(), out ack);

    /// <summary>
    /// Publishes with only a message id set (used for de-duplication). See
    /// <see cref="TryCaptureWithOptions(string, ReadOnlyMemory{byte}, PublishOptions, out PubAck)"/>.
    /// </summary>
    public bool TryCapture(string subject, ReadOnlyMemory<byte> payload, string? msgId, out PubAck ack)
        => TryCaptureWithOptions(subject, payload, new PublishOptions { MsgId = msgId }, out ack);

    /// <summary>
    /// Attempts to publish <paramref name="payload"/> on <paramref name="subject"/>.
    /// </summary>
    /// <param name="subject">Subject the message was published on.</param>
    /// <param name="payload">Message body.</param>
    /// <param name="options">Publish headers: message id, expected sequence, batch fields.</param>
    /// <param name="ack">
    /// The resulting acknowledgement. An empty <see cref="PubAck"/> when no stream matches.
    /// May carry an error code even when the method returns <c>true</c>.
    /// </param>
    /// <returns>
    /// <c>false</c> when no stream matches <paramref name="subject"/>; otherwise <c>true</c>,
    /// including when a precondition failed and <paramref name="ack"/> reports an error.
    /// </returns>
    public bool TryCaptureWithOptions(
        string subject,
        ReadOnlyMemory<byte> payload,
        PublishOptions options,
        out PubAck ack)
    {
        if (_streamManager.FindBySubject(subject) is not { } stream)
        {
            ack = new PubAck();
            return false;
        }

        // --- Atomic batch publish path ---
        // Go: server/stream.go processInboundMsg — checks batch headers before normal flow.
        if (!string.IsNullOrEmpty(options.BatchId))
        {
            ack = ProcessBatchMessage(stream, subject, payload, options);
            return true;
        }

        // --- Normal (non-batch) publish path ---
        // Use cached LastSeq property instead of GetStateAsync to avoid allocation.
        if (!_preconditions.CheckExpectedLastSeq(options.ExpectedLastSeq, stream.Store.LastSeq))
        {
            ack = new PubAck { ErrorCode = WrongLastSequenceErrorCode };
            return true;
        }

        if (_preconditions.IsDuplicate(options.MsgId, stream.Config.DuplicateWindowMs, out var existingSequence))
        {
            // NOTE(review): a duplicate is reported with the same code as a sequence
            // mismatch. Kept as-is because callers may depend on it; confirm whether a
            // dedicated duplicate indication is intended.
            ack = new PubAck
            {
                Seq = existingSequence,
                ErrorCode = WrongLastSequenceErrorCode,
            };
            return true;
        }

        // Pass resolved stream to avoid double FindBySubject lookup.
        var captured = _streamManager.Capture(stream, subject, payload);
        ack = captured ?? new PubAck();

        // Only remember the message id when Capture actually produced an ack. Recording
        // the placeholder ack's Seq would make a later retry look like a duplicate of a
        // message that was never captured.
        if (captured is not null)
        {
            _preconditions.Record(options.MsgId, ack.Seq);
        }

        _preconditions.TrimOlderThan(stream.Config.DuplicateWindowMs);
        return true;
    }

    /// <summary>
    /// Validates the batch headers of one message, then hands it to the batch engine to
    /// be staged or committed.
    /// Go: server/stream.go processInboundMsg — batch message handling.
    /// </summary>
    /// <param name="stream">Stream already resolved for <paramref name="subject"/>.</param>
    /// <param name="subject">Subject the message was published on.</param>
    /// <param name="payload">Message body.</param>
    /// <param name="options">Publish options; <c>BatchId</c> is non-empty on this path.</param>
    /// <returns>
    /// An error ack when validation or the engine fails, an ack carrying only the stream
    /// name when the message was staged, or the engine's commit ack on commit.
    /// </returns>
    private PubAck ProcessBatchMessage(
        StreamHandle stream,
        string subject,
        ReadOnlyMemory<byte> payload,
        PublishOptions options)
    {
        // Stream must have AllowAtomicPublish enabled.
        // Go: server/stream.go:6351 NewJSAtomicPublishDisabledError
        if (!stream.Config.AllowAtomicPublish)
        {
            return new PubAck
            {
                ErrorCode = AtomicBatchPublishErrorCodes.Disabled,
                Stream = stream.Config.Name,
            };
        }

        // BatchSeq must be present (non-zero).
        // Go: server/stream.go:6371 NewJSAtomicPublishMissingSeqError
        if (options.BatchSeq == 0)
        {
            return new PubAck
            {
                ErrorCode = AtomicBatchPublishErrorCodes.MissingSeq,
                Stream = stream.Config.Name,
            };
        }

        // Nats-Expected-Last-Msg-Id is unsupported in batch context.
        // Go: server/stream.go:6584 NewJSAtomicPublishUnsupportedHeaderBatchError
        if (!string.IsNullOrEmpty(options.ExpectedLastMsgId))
        {
            return new PubAck
            {
                ErrorCode = AtomicBatchPublishErrorCodes.UnsupportedHeader,
                Stream = stream.Config.Name,
            };
        }

        var commitValue = options.BatchCommit;
        var isCommit = !string.IsNullOrEmpty(commitValue);

        // Validate commit value immediately if present.
        if (isCommit && commitValue is not ("1" or "eob"))
        {
            // Roll back any in-flight batch with this ID.
            // NOTE(review): Clear() drops every in-flight batch on this publisher, not
            // just the offending one. Simplified: in production this only removes the
            // specific batch.
            _batchEngine.Clear();
            return new PubAck
            {
                ErrorCode = AtomicBatchPublishErrorCodes.InvalidCommit,
                Stream = stream.Config.Name,
            };
        }

        var req = new BatchPublishRequest
        {
            BatchId = options.BatchId!,
            BatchSeq = options.BatchSeq,
            Subject = subject,
            Payload = payload,
            IsCommit = isCommit,
            CommitValue = commitValue,
            MsgId = options.MsgId,
            ExpectedLastSeq = options.ExpectedLastSeq,
            ExpectedLastSubjectSeq = options.ExpectedLastSubjectSeq,
            ExpectedLastSubjectSeqSubject = options.ExpectedLastSubjectSeqSubject,
        };

        var result = _batchEngine.Process(
            req,
            _preconditions,
            stream.Config.DuplicateWindowMs,
            staged =>
            {
                // Check expected last sequence using cached property.
                // Zero means the publisher did not set an expectation.
                if (staged.ExpectedLastSeq > 0 && stream.Store.LastSeq != staged.ExpectedLastSeq)
                {
                    return new PubAck
                    {
                        ErrorCode = WrongLastSequenceErrorCode,
                        Stream = stream.Config.Name,
                    };
                }

                var captured = _streamManager.Capture(stream, staged.Subject, staged.Payload);
                return captured ?? new PubAck { Stream = stream.Config.Name };
            });

        return result.Kind switch
        {
            // Empty ack for staged (flow control).
            AtomicBatchResult.ResultKind.Staged => new PubAck
            {
                Stream = stream.Config.Name,
            },
            AtomicBatchResult.ResultKind.Committed => result.CommitAck!,
            AtomicBatchResult.ResultKind.Error => new PubAck
            {
                ErrorCode = result.ErrorCode,
                Stream = stream.Config.Name,
            },
            _ => new PubAck { Stream = stream.Config.Name },
        };
    }

    /// <summary>
    /// Clears all in-flight batches (called when stream is disabled or deleted).
    /// Go: server/jetstream_batching.go streamBatches.cleanup()
    /// </summary>
    public void ClearBatches() => _batchEngine.Clear();
}