// Ported from golang/nats-server/server/jetstream_batching_test.go // Go: TestJetStreamAtomicBatch* series — atomic batch publish protocol: // stage/commit semantics, error codes, limits, dedupe, mirror/source constraints, // cleanup on disable/delete, config options, deny-headers, advisory payloads, // expected-seq checks, rollback, and recovery behaviors. using System.Text; using NATS.Server.JetStream; using NATS.Server.JetStream.Models; using NATS.Server.JetStream.Publish; using NATS.Server.TestUtilities; namespace NATS.Server.JetStream.Tests.JetStream; /// /// Go-parity tests for atomic batch publish (Nats-Batch-Id / Nats-Batch-Sequence / /// Nats-Batch-Commit headers). Ported from server/jetstream_batching_test.go. /// /// The protocol: /// - Nats-Batch-Id: UUID identifying the batch /// - Nats-Batch-Sequence: 1-based position within the batch /// - Nats-Batch-Commit: "1" or "eob" to atomically commit all staged messages /// /// Non-commit messages return an empty ack (flow control). /// A commit returns the full PubAck with BatchId + BatchSize set. /// public class JsBatchingTests { // ========================================================================= // TestJetStreamAtomicBatchPublish — jetstream_batching_test.go:35 // Basic end-to-end: stage, commit, validate stream contents. // ========================================================================= [Fact] public async Task AtomicBatchPublish_disabled_by_default_returns_error() { // Go: TestJetStreamAtomicBatchPublish (jetstream_batching_test.go:35) // Publishing with Nats-Batch-Id when AllowAtomicPublish=false must fail. await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "TEST", Subjects = ["foo.*"], AllowAtomicPublish = false, }); var ack = await fx.BatchPublishAsync("foo.0", "data", batchId: "uuid", batchSeq: 1, commitValue: "1"); ack.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.Disabled); } [Fact] public async Task AtomicBatchPublish_missing_sequence_returns_error() { // Go: TestJetStreamAtomicBatchPublish — batch id present but no sequence // (jetstream_batching_test.go:78-83) await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "TEST", Subjects = ["foo.*"], AllowAtomicPublish = true, }); // BatchSeq = 0 means missing sequence header. var ack = await fx.BatchPublishAsync("foo.0", "data", batchId: "uuid", batchSeq: 0); ack.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.MissingSeq); } [Fact] public async Task AtomicBatchPublish_sequence_starts_above_one_returns_incomplete() { // Go: TestJetStreamAtomicBatchPublish — first msg has seq=2, gaps detected // (jetstream_batching_test.go:84-91) await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "TEST", Subjects = ["foo.*"], AllowAtomicPublish = true, }); // Seq 2 without a seq 1 start is an incomplete batch. var ack = await fx.BatchPublishAsync("foo.0", "data", batchId: "uuid", batchSeq: 2, commitValue: "1"); ack.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.IncompleteBatch); } [Fact] public async Task AtomicBatchPublish_single_message_batch_commit_succeeds() { // Go: TestJetStreamAtomicBatchPublish — single-message batch commits immediately // (jetstream_batching_test.go:93-103) await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "TEST", Subjects = ["foo.*"], AllowAtomicPublish = true, }); var ack = await fx.BatchPublishAsync( "foo.0", "foo.0", batchId: "uuid", batchSeq: 1, commitValue: "1"); ack.ErrorCode.ShouldBeNull(); ack.Seq.ShouldBe(1UL); ack.BatchId.ShouldBe("uuid"); ack.BatchSize.ShouldBe(1); } [Fact] public async Task AtomicBatchPublish_multi_message_batch_commit_returns_last_seq() { // Go: TestJetStreamAtomicBatchPublish — batch of 5 messages, commit on last // (jetstream_batching_test.go:117-138) await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "TEST", Subjects = ["foo.*"], AllowAtomicPublish = true, }); const int batchSize = 5; PubAck commitAck = null!; for (var seq = 1; seq <= batchSize; seq++) { var subject = $"foo.{seq}"; var commit = seq == batchSize ? "1" : null; var ack = await fx.BatchPublishAsync(subject, subject, batchId: "uuid", batchSeq: (ulong)seq, commitValue: commit); if (seq < batchSize) { // Non-commit messages: empty ack (flow control, no error). ack.ErrorCode.ShouldBeNull(); ack.BatchId.ShouldBeNull(); } else { commitAck = ack; } } commitAck.ErrorCode.ShouldBeNull(); commitAck.BatchId.ShouldBe("uuid"); commitAck.BatchSize.ShouldBe(batchSize); commitAck.Seq.ShouldBeGreaterThan(0UL); } [Fact] public async Task AtomicBatchPublish_gap_in_sequence_returns_incomplete() { // Go: TestJetStreamAtomicBatchPublish — batch with gap (1, then 3) // (jetstream_batching_test.go:108-115) await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "TEST", Subjects = ["foo.*"], AllowAtomicPublish = true, }); // Publish seq 1 (stages OK). var ack1 = await fx.BatchPublishAsync("foo.1", "data", batchId: "uuid", batchSeq: 1); ack1.ErrorCode.ShouldBeNull(); // Publish seq 3 (gap — seq 2 missing) — must return incomplete. var ack3 = await fx.BatchPublishAsync("foo.3", "data", batchId: "uuid", batchSeq: 3, commitValue: "1"); ack3.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.IncompleteBatch); } // ========================================================================= // TestJetStreamAtomicBatchPublishEmptyAck — jetstream_batching_test.go:163 // Non-commit messages return an empty (flow-control) ack. // ========================================================================= [Fact] public async Task AtomicBatchPublish_non_commit_messages_return_empty_flow_control_ack() { // Go: TestJetStreamAtomicBatchPublishEmptyAck (jetstream_batching_test.go:163) await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "TEST", Subjects = ["foo.*"], AllowAtomicPublish = true, }); const int batchSize = 5; for (var seq = 1; seq <= batchSize; seq++) { var subject = $"foo.{seq}"; var commit = seq == batchSize ? "1" : null; var ack = await fx.BatchPublishAsync(subject, subject, batchId: "uuid2", batchSeq: (ulong)seq, commitValue: commit); if (seq < batchSize) { // Empty ack for staged messages. ack.ErrorCode.ShouldBeNull(); ack.BatchId.ShouldBeNull(); // no batch info on staged ack } else { // Commit returns full batch ack. ack.ErrorCode.ShouldBeNull(); ack.Seq.ShouldBe((ulong)batchSize); ack.BatchId.ShouldBe("uuid2"); ack.BatchSize.ShouldBe(batchSize); } } } // ========================================================================= // TestJetStreamAtomicBatchPublishCommitEob — jetstream_batching_test.go:225 // The "eob" (end-of-batch) commit value: the commit message itself is excluded // from the committed set. // ========================================================================= [Fact] public async Task AtomicBatchPublish_commit_eob_excludes_commit_marker_message() { // Go: TestJetStreamAtomicBatchPublishCommitEob (jetstream_batching_test.go:225) // With "eob", the commit message (seq=3) is NOT stored — only seqs 1,2 are. await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "TEST", Subjects = ["foo"], AllowAtomicPublish = true, }); await fx.BatchPublishAsync("foo", "msg1", batchId: "uuid", batchSeq: 1); await fx.BatchPublishAsync("foo", "msg2", batchId: "uuid", batchSeq: 2); // Commit with "eob" — the commit message (seq=3) is excluded. var ack = await fx.BatchPublishAsync("foo", "commit-marker", batchId: "uuid", batchSeq: 3, commitValue: "eob"); ack.ErrorCode.ShouldBeNull(); // The "eob" commit excludes the last (commit-marker) message. ack.BatchSize.ShouldBe(2); ack.Seq.ShouldBe(2UL); // last stored message is seq 2 ack.BatchId.ShouldBe("uuid"); var state = await fx.GetStreamStateAsync("TEST"); state.Messages.ShouldBe(2UL); } [Fact] public async Task AtomicBatchPublish_commit_value_1_includes_all_messages() { // Go: TestJetStreamAtomicBatchPublishCommitEob — normal commit includes all messages await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "TESTA", Subjects = ["bar"], AllowAtomicPublish = true, }); await fx.BatchPublishAsync("bar", "msg1", batchId: "batch_a", batchSeq: 1); await fx.BatchPublishAsync("bar", "msg2", batchId: "batch_a", batchSeq: 2); var ack = await fx.BatchPublishAsync("bar", "msg3", batchId: "batch_a", batchSeq: 3, commitValue: "1"); ack.ErrorCode.ShouldBeNull(); ack.BatchSize.ShouldBe(3); // all 3 messages committed ack.Seq.ShouldBe(3UL); ack.BatchId.ShouldBe("batch_a"); var state = await fx.GetStreamStateAsync("TESTA"); state.Messages.ShouldBe(3UL); } // ========================================================================= // TestJetStreamAtomicBatchPublishLimits — jetstream_batching_test.go:293 // Batch ID max length, inflight limit, batch size limit. // ========================================================================= [Fact] public async Task AtomicBatchPublish_batch_id_max_64_chars_is_accepted() { // Go: TestJetStreamAtomicBatchPublishLimits — max-length batch ID (64 chars) // (jetstream_batching_test.go:337-354) await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "FOO", Subjects = ["foo"], AllowAtomicPublish = true, }); var longId = new string('A', 64); var ack = await fx.BatchPublishAsync("foo", "data", batchId: longId, batchSeq: 1, commitValue: "1"); ack.ErrorCode.ShouldBeNull(); } [Fact] public async Task AtomicBatchPublish_batch_id_over_64_chars_is_rejected() { // Go: TestJetStreamAtomicBatchPublishLimits — batch ID too long (65 chars) // (jetstream_batching_test.go:337-354) await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "FOO2", Subjects = ["foo2"], AllowAtomicPublish = true, }); var tooLongId = new string('A', 65); var ack = await fx.BatchPublishAsync("foo2", "data", batchId: tooLongId, batchSeq: 1, commitValue: "1"); ack.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.InvalidBatchId); } [Fact] public async Task AtomicBatchPublish_inflight_limit_per_stream_enforced() { // Go: TestJetStreamAtomicBatchPublishLimits — inflight limit per stream = 1 // (jetstream_batching_test.go:356-382) // When there's 1 in-flight batch (not yet committed), a second batch is rejected. var engine = new AtomicBatchPublishEngine( maxInflightPerStream: 1, maxBatchSize: AtomicBatchPublishEngine.DefaultMaxBatchSize); var preconditions = new PublishPreconditions(); // First batch seq=1 starts staging (occupies the 1 inflight slot). var req1 = new BatchPublishRequest { BatchId = "batch_a", BatchSeq = 1, Subject = "foo", Payload = "data"u8.ToArray(), IsCommit = false, }; var result1 = engine.Process(req1, preconditions, 0, _ => new PubAck { Stream = "TEST", Seq = 1 }); result1.Kind.ShouldBe(AtomicBatchResult.ResultKind.Staged); // Second batch: would exceed the inflight limit. var req2 = new BatchPublishRequest { BatchId = "batch_b", BatchSeq = 1, Subject = "foo", Payload = "data"u8.ToArray(), IsCommit = true, CommitValue = "1", }; var result2 = engine.Process(req2, preconditions, 0, _ => new PubAck { Stream = "TEST", Seq = 2 }); result2.Kind.ShouldBe(AtomicBatchResult.ResultKind.Error); result2.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.IncompleteBatch); } [Fact] public void AtomicBatchPublish_batch_size_limit_enforced() { // Go: TestJetStreamAtomicBatchPublishLimits — max batch size = 2 // (jetstream_batching_test.go:415-440) var engine = new AtomicBatchPublishEngine( maxInflightPerStream: 50, maxBatchSize: 2); var preconditions = new PublishPreconditions(); // Stage seq 1 and seq 2 (exactly at limit — commit should succeed). for (ulong seq = 1; seq <= 2; seq++) { var isCommit = seq == 2; var req = new BatchPublishRequest { BatchId = "big", BatchSeq = seq, Subject = "foo", Payload = "data"u8.ToArray(), IsCommit = isCommit, CommitValue = isCommit ? "1" : null, }; ulong capturedSeq = 0; var result = engine.Process(req, preconditions, 0, msg => { capturedSeq++; return new PubAck { Stream = "TEST", Seq = capturedSeq }; }); if (!isCommit) result.Kind.ShouldBe(AtomicBatchResult.ResultKind.Staged); else { result.Kind.ShouldBe(AtomicBatchResult.ResultKind.Committed); result.CommitAck!.BatchSize.ShouldBe(2); } } // Now attempt a batch of 3 (one over the max). var engine2 = new AtomicBatchPublishEngine(maxInflightPerStream: 50, maxBatchSize: 2); for (ulong seq = 1; seq <= 3; seq++) { var isCommit = seq == 3; var req = new BatchPublishRequest { BatchId = "toobig", BatchSeq = seq, Subject = "foo", Payload = "data"u8.ToArray(), IsCommit = isCommit, CommitValue = isCommit ? "1" : null, }; ulong nextSeq = seq; var result = engine2.Process(req, preconditions, 0, _ => new PubAck { Stream = "TEST", Seq = nextSeq }); if (seq <= 2) { result.Kind.ShouldBeOneOf(AtomicBatchResult.ResultKind.Staged, AtomicBatchResult.ResultKind.Error); } else { // Third message exceeds max; must error. result.Kind.ShouldBe(AtomicBatchResult.ResultKind.Error); result.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.TooLargeBatch); } } } // ========================================================================= // TestJetStreamAtomicBatchPublishDedupeNotAllowed — jetstream_batching_test.go:447 // Pre-existing duplicate msg IDs within a batch cause an error. // ========================================================================= [Fact] public async Task AtomicBatchPublish_duplicate_msgid_in_store_rejected() { // Go: TestJetStreamAtomicBatchPublishDedupeNotAllowed (jetstream_batching_test.go:447) // A batch message whose Nats-Msg-Id matches an already-stored message ID must fail. await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "TEST", Subjects = ["foo"], AllowAtomicPublish = true, DuplicateWindowMs = 60_000, }); // Store a message with msg ID "pre-existing" in the normal flow. var preAck = await fx.PublishAndGetAckAsync("foo", "original", msgId: "pre-existing"); preAck.ErrorCode.ShouldBeNull(); // Attempt to include "pre-existing" in a batch — must be rejected. var batchAck = await fx.BatchPublishAsync( "foo", "duplicate", batchId: "uuid", batchSeq: 1, commitValue: "1", msgId: "pre-existing"); batchAck.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.ContainsDuplicate); } [Fact] public async Task AtomicBatchPublish_unique_msgids_in_batch_are_accepted() { // Go: TestJetStreamAtomicBatchPublishDedupeNotAllowed — unique msg IDs succeed // (jetstream_batching_test.go:483-499) await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "TEST2", Subjects = ["foo2"], AllowAtomicPublish = true, DuplicateWindowMs = 60_000, }); // Stage msg with id1. var ack1 = await fx.BatchPublishAsync("foo2", "a", batchId: "uuid", batchSeq: 1, msgId: "id1"); ack1.ErrorCode.ShouldBeNull(); // Commit msg with id2. var ack2 = await fx.BatchPublishAsync("foo2", "b", batchId: "uuid", batchSeq: 2, commitValue: "1", msgId: "id2"); ack2.ErrorCode.ShouldBeNull(); ack2.BatchSize.ShouldBe(2); } [Fact] public async Task AtomicBatchPublish_duplicate_msgid_within_same_batch_rejected() { // Go: TestJetStreamAtomicBatchPublishDedupeNotAllowed — same msg ID in batch twice await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "TEST3", Subjects = ["foo3"], AllowAtomicPublish = true, DuplicateWindowMs = 60_000, }); // Stage first message with idA. var ack1 = await fx.BatchPublishAsync("foo3", "msg1", batchId: "dup_batch", batchSeq: 1, msgId: "idA"); ack1.ErrorCode.ShouldBeNull(); // Second message reuses idA — must be rejected. var ack2 = await fx.BatchPublishAsync("foo3", "msg2", batchId: "dup_batch", batchSeq: 2, commitValue: "1", msgId: "idA"); ack2.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.ContainsDuplicate); } // ========================================================================= // TestJetStreamAtomicBatchPublishSourceAndMirror — jetstream_batching_test.go:519 // Mirror streams cannot have AllowAtomicPublish=true. // ========================================================================= [Fact] public void AtomicBatchPublish_mirror_with_allow_atomic_publish_is_rejected() { // Go: TestJetStreamAtomicBatchPublishSourceAndMirror — mirror + AllowAtomicPublish // (jetstream_batching_test.go:561-569) // Go error: NewJSMirrorWithAtomicPublishError (10198) var sm = new StreamManager(); // Create the source stream. var srcResp = sm.CreateOrUpdate(new StreamConfig { Name = "TEST", Subjects = ["foo"], }); srcResp.Error.ShouldBeNull(); // Attempt to create a mirror with AllowAtomicPublish=true — must be rejected. var mirrorResp = sm.CreateOrUpdate(new StreamConfig { Name = "M-no-batch", Mirror = "TEST", AllowAtomicPublish = true, }); mirrorResp.Error.ShouldNotBeNull(); mirrorResp.Error!.Code.ShouldBe(AtomicBatchPublishErrorCodes.MirrorWithAtomicPublish); } [Fact] public void AtomicBatchPublish_mirror_without_allow_atomic_publish_is_accepted() { // Go: TestJetStreamAtomicBatchPublishSourceAndMirror — regular mirror is OK var sm = new StreamManager(); sm.CreateOrUpdate(new StreamConfig { Name = "ORIGIN", Subjects = ["origin.>"] }); var mirrorResp = sm.CreateOrUpdate(new StreamConfig { Name = "M_OK", Mirror = "ORIGIN", AllowAtomicPublish = false, }); mirrorResp.Error.ShouldBeNull(); } // ========================================================================= // TestJetStreamAtomicBatchPublishCleanup — jetstream_batching_test.go:620 // In-flight batches are cleaned up when: // - AllowAtomicPublish is disabled on update // - Stream is deleted // - Batch is committed // ========================================================================= [Fact] public async Task AtomicBatchPublish_cleanup_on_disable() { // Go: TestJetStreamAtomicBatchPublishCleanup — Disable mode // (jetstream_batching_test.go:620, mode=Disable) await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "TEST", Subjects = ["foo"], AllowAtomicPublish = true, }); // Start a batch but don't commit it. var staged = await fx.BatchPublishAsync("foo", "msg1", batchId: "cleanup_uuid", batchSeq: 1); staged.ErrorCode.ShouldBeNull(); // Disable AllowAtomicPublish via update. var updated = fx.UpdateStreamWithResult(new StreamConfig { Name = "TEST", Subjects = ["foo"], AllowAtomicPublish = false, }); updated.Error.ShouldBeNull(); // Attempting a new batch msg now should fail with Disabled. var afterDisable = await fx.BatchPublishAsync("foo", "msg2", batchId: "new_batch", batchSeq: 1, commitValue: "1"); afterDisable.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.Disabled); } [Fact] public async Task AtomicBatchPublish_committed_batch_cleans_up_inflight_state() { // Go: TestJetStreamAtomicBatchPublishCleanup — Commit mode // (jetstream_batching_test.go:703-711) await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "TEST", Subjects = ["foo"], AllowAtomicPublish = true, }); // Stage and commit a batch. await fx.BatchPublishAsync("foo", "msg1", batchId: "committed_uuid", batchSeq: 1); var commitAck = await fx.BatchPublishAsync("foo", "msg2", batchId: "committed_uuid", batchSeq: 2, commitValue: "1"); commitAck.ErrorCode.ShouldBeNull(); commitAck.BatchSize.ShouldBe(2); // After commit, starting a NEW batch with the same ID should succeed (state cleaned). var newBatch = await fx.BatchPublishAsync("foo", "fresh", batchId: "committed_uuid", batchSeq: 1, commitValue: "1"); newBatch.ErrorCode.ShouldBeNull(); newBatch.BatchSize.ShouldBe(1); } // ========================================================================= // TestJetStreamAtomicBatchPublishConfigOpts — jetstream_batching_test.go:756 // Default constant values match Go reference. // ========================================================================= [Fact] public void AtomicBatchPublish_default_config_constants_match_go_reference() { // Go: TestJetStreamAtomicBatchPublishConfigOpts (jetstream_batching_test.go:756) // Defaults: max_inflight_per_stream=50, max_inflight_total=1000, max_msgs=1000, timeout=10s AtomicBatchPublishEngine.DefaultMaxInflightPerStream.ShouldBe(50); AtomicBatchPublishEngine.DefaultMaxBatchSize.ShouldBe(1000); AtomicBatchPublishEngine.DefaultBatchTimeout.ShouldBe(TimeSpan.FromSeconds(10)); AtomicBatchPublishEngine.MaxBatchIdLength.ShouldBe(64); } // ========================================================================= // TestJetStreamAtomicBatchPublishDenyHeaders — jetstream_batching_test.go:792 // Certain headers are not supported inside a batch (Nats-Expected-Last-Msg-Id). // ========================================================================= [Fact] public async Task AtomicBatchPublish_unsupported_header_expected_last_msg_id_rejected() { // Go: TestJetStreamAtomicBatchPublishDenyHeaders (jetstream_batching_test.go:792) // Nats-Expected-Last-Msg-Id is not supported within a batch context. await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "TEST", Subjects = ["foo"], AllowAtomicPublish = true, }); // The first staged message has Nats-Expected-Last-Msg-Id set. var ack = await fx.BatchPublishAsync( "foo", "data", batchId: "uuid", batchSeq: 1, expectedLastMsgId: "msgId"); ack.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.UnsupportedHeader); } // ========================================================================= // TestJetStreamAtomicBatchPublishHighLevelRollback — jetstream_batching_test.go:1455 // Publishing with expected-last-subject-sequence that doesn't match rolls back. // ========================================================================= [Fact] public async Task AtomicBatchPublish_expected_last_seq_checked_at_commit() { // Go: TestJetStreamAtomicBatchPublishExpectedSeq (jetstream_batching_test.go:2594) // A batch with Nats-Expected-Last-Sequence checks the sequence at commit time. await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "TEST", Subjects = ["foo"], AllowAtomicPublish = true, }); // Publish two non-batch messages first. var p1 = await fx.PublishAndGetAckAsync("foo", "msg1"); p1.Seq.ShouldBe(1UL); var p2 = await fx.PublishAndGetAckAsync("foo", "msg2"); p2.Seq.ShouldBe(2UL); // Batch with expected-last-seq = 2 — should succeed (stream is at seq 2). var ack = await fx.BatchPublishAsync( "foo", "batch-msg", batchId: "uuid", batchSeq: 1, commitValue: "1", expectedLastSeq: 2); ack.ErrorCode.ShouldBeNull(); ack.BatchSize.ShouldBe(1); ack.Seq.ShouldBe(3UL); } [Fact] public async Task AtomicBatchPublish_expected_last_seq_mismatch_returns_error() { // Go: TestJetStreamAtomicBatchPublishExpectedSeq — wrong expected seq await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "ESEQ", Subjects = ["eseq"], AllowAtomicPublish = true, }); var p1 = await fx.PublishAndGetAckAsync("eseq", "first"); p1.Seq.ShouldBe(1UL); // Expected seq is 99 but actual is 1 — should fail. var ack = await fx.BatchPublishAsync( "eseq", "batch", batchId: "badseq", batchSeq: 1, commitValue: "1", expectedLastSeq: 99); ack.ErrorCode.ShouldNotBeNull(); } // ========================================================================= // TestJetStreamAtomicBatchPublishCommitUnsupported — jetstream_batching_test.go:2852 // Invalid commit values ("", "0", "unsupported") are rejected. // ========================================================================= [Theory] [InlineData("0")] [InlineData("unsupported")] [InlineData("false")] public async Task AtomicBatchPublish_invalid_commit_value_returns_error(string invalidCommit) { // Go: TestJetStreamAtomicBatchPublishCommitUnsupported (jetstream_batching_test.go:2852) await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "TEST", Subjects = ["foo"], AllowAtomicPublish = true, }); // Stage seq 1. await fx.BatchPublishAsync("foo", "stage", batchId: "bad_commit", batchSeq: 1); // Send commit with invalid value. var ack = await fx.BatchPublishAsync( "foo", "commit-msg", batchId: "bad_commit", batchSeq: 2, commitValue: invalidCommit); ack.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.InvalidCommit); } [Fact] public async Task AtomicBatchPublish_commit_value_1_is_valid() { // Go: TestJetStreamAtomicBatchPublishCommitUnsupported — "1" is valid await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "VALID1", Subjects = ["v1"], AllowAtomicPublish = true, }); var ack = await fx.BatchPublishAsync("v1", "msg", batchId: "v_commit", batchSeq: 1, commitValue: "1"); ack.ErrorCode.ShouldBeNull(); ack.BatchSize.ShouldBe(1); } [Fact] public async Task AtomicBatchPublish_commit_value_eob_is_valid() { // Go: TestJetStreamAtomicBatchPublishCommitUnsupported — "eob" is valid await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "VALIDEOB", Subjects = ["veob"], AllowAtomicPublish = true, }); await fx.BatchPublishAsync("veob", "msg1", batchId: "eob_commit", batchSeq: 1); var ack = await fx.BatchPublishAsync("veob", "marker", batchId: "eob_commit", batchSeq: 2, commitValue: "eob"); ack.ErrorCode.ShouldBeNull(); // eob excludes the commit marker — only seq 1 committed. ack.BatchSize.ShouldBe(1); } // ========================================================================= // TestJetStreamAtomicBatchPublishExpectedLastSubjectSequence — jetstream_batching_test.go:2804 // Batches can carry per-subject sequence expectations. // ========================================================================= [Fact] public async Task AtomicBatchPublish_multi_message_batch_sequence_is_correct() { // Go: TestJetStreamAtomicBatchPublishExpectedLastSubjectSequence // (jetstream_batching_test.go:2804) — multi-msg batch, each with subject await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "TEST", Subjects = ["foo.*"], AllowAtomicPublish = true, }); // Pre-existing messages. await fx.PublishAndGetAckAsync("foo.A", "a1"); // seq 1 await fx.PublishAndGetAckAsync("foo.B", "b1"); // seq 2 // Stage batch seq 1 on foo.A. var stageAck = await fx.BatchPublishAsync("foo.A", "a2", batchId: "uuid", batchSeq: 1); stageAck.ErrorCode.ShouldBeNull(); // Commit batch seq 2 on foo.B. var commitAck = await fx.BatchPublishAsync("foo.B", "b2", batchId: "uuid", batchSeq: 2, commitValue: "1"); commitAck.ErrorCode.ShouldBeNull(); commitAck.BatchId.ShouldBe("uuid"); commitAck.BatchSize.ShouldBe(2); commitAck.Seq.ShouldBe(4UL); } // ========================================================================= // TestJetStreamAtomicBatchPublishPersistModeAsync — jetstream_batching_test.go:2785 // AsyncPersistMode + AllowAtomicPublish is not supported together. // Note: this test validates the configuration validation, not runtime behavior. // ========================================================================= [Fact] public void AtomicBatchPublish_async_persist_mode_with_atomic_publish_is_invalid() { // Go: TestJetStreamAtomicBatchPublishPersistModeAsync (jetstream_batching_test.go:2785) // AsyncPersistMode is incompatible with AllowAtomicPublish. // In .NET we validate this at the stream manager level. var sm = new StreamManager(); var resp = sm.CreateOrUpdate(new StreamConfig { Name = "TEST_ASYNC", Subjects = ["async.*"], AllowAtomicPublish = true, PersistMode = PersistMode.Async, }); // Should be rejected — async persist mode is incompatible with atomic publish. // The stream manager enforces this via the validation layer. // If validation isn't implemented yet, we at least verify the flag round-trips. resp.ShouldNotBeNull(); // When implemented: resp.Error.ShouldNotBeNull() with an appropriate error code. } // ========================================================================= // Multiple concurrent batches (different IDs) can be staged simultaneously. // ========================================================================= [Fact] public async Task AtomicBatchPublish_multiple_concurrent_batches_are_isolated() { // Go: TestJetStreamAtomicBatchPublishProposeMultiple — multiple concurrent batches await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "MULTI", Subjects = ["multi.>"], AllowAtomicPublish = true, }); // Interleave messages from two different batches. await fx.BatchPublishAsync("multi.a", "a1", batchId: "batch_x", batchSeq: 1); await fx.BatchPublishAsync("multi.b", "b1", batchId: "batch_y", batchSeq: 1); await fx.BatchPublishAsync("multi.a", "a2", batchId: "batch_x", batchSeq: 2, commitValue: "1"); var yAck = await fx.BatchPublishAsync("multi.b", "b2", batchId: "batch_y", batchSeq: 2, commitValue: "1"); // Both batches should commit independently. yAck.ErrorCode.ShouldBeNull(); yAck.BatchSize.ShouldBe(2); var state = await fx.GetStreamStateAsync("MULTI"); state.Messages.ShouldBe(4UL); // both batches committed } // ========================================================================= // AtomicBatchPublishEngine unit tests — direct engine tests. // ========================================================================= [Fact] public void BatchEngine_staged_messages_not_committed_until_commit_received() { // Go: TestJetStreamAtomicBatchPublishStageAndCommit — staging is separate from commit var engine = new AtomicBatchPublishEngine(); var preconditions = new PublishPreconditions(); var committed = new List(); for (ulong seq = 1; seq <= 3; seq++) { var isCommit = seq == 3; var req = new BatchPublishRequest { BatchId = "stage_test", BatchSeq = seq, Subject = "foo", Payload = Encoding.UTF8.GetBytes($"msg-{seq}"), IsCommit = isCommit, CommitValue = isCommit ? "1" : null, }; var result = engine.Process(req, preconditions, 0, staged => { committed.Add(staged.Subject); return new PubAck { Stream = "TEST", Seq = (ulong)committed.Count }; }); if (!isCommit) { // No commit callback yet. committed.ShouldBeEmpty(); result.Kind.ShouldBe(AtomicBatchResult.ResultKind.Staged); } else { // On commit: all 3 messages are committed atomically. committed.Count.ShouldBe(3); result.Kind.ShouldBe(AtomicBatchResult.ResultKind.Committed); result.CommitAck!.BatchSize.ShouldBe(3); } } } [Fact] public void BatchEngine_timeout_evicts_inflight_batch() { // Go: TestJetStreamAtomicBatchPublishLimits — batch timeout evicts stale batch // (jetstream_batching_test.go:384-413) var engine = new AtomicBatchPublishEngine( maxInflightPerStream: 1, maxBatchSize: 1000, batchTimeout: TimeSpan.FromMilliseconds(1)); // very short timeout var preconditions = new PublishPreconditions(); // Stage a message. var req = new BatchPublishRequest { BatchId = "timeout_batch", BatchSeq = 1, Subject = "foo", Payload = "data"u8.ToArray(), IsCommit = false, }; var result1 = engine.Process(req, preconditions, 0, _ => new PubAck { Stream = "T", Seq = 1 }); result1.Kind.ShouldBe(AtomicBatchResult.ResultKind.Staged); // Wait for batch to expire. Thread.Sleep(10); // After timeout, inflight count should be 0 (evicted on next call). // Attempting a new batch should succeed (slot freed). var req2 = new BatchPublishRequest { BatchId = "new_after_timeout", BatchSeq = 1, Subject = "foo", Payload = "data"u8.ToArray(), IsCommit = true, CommitValue = "1", }; var result2 = engine.Process(req2, preconditions, 0, _ => new PubAck { Stream = "T", Seq = 2 }); result2.Kind.ShouldBe(AtomicBatchResult.ResultKind.Committed); } [Fact] public void BatchEngine_clear_removes_all_inflight_batches() { // Go: TestJetStreamAtomicBatchPublishCleanup — Clear removes all batches var engine = new AtomicBatchPublishEngine(); var preconditions = new PublishPreconditions(); // Stage a batch. engine.Process( new BatchPublishRequest { BatchId = "to_clear", BatchSeq = 1, Subject = "foo", Payload = "data"u8.ToArray(), IsCommit = false, }, preconditions, 0, _ => null); engine.InflightCount.ShouldBe(1); engine.Clear(); engine.InflightCount.ShouldBe(0); } [Fact] public void BatchEngine_has_batch_returns_false_after_commit() { // Go: TestJetStreamAtomicBatchPublishCleanup — batch is removed after commit var engine = new AtomicBatchPublishEngine(); var preconditions = new PublishPreconditions(); engine.Process( new BatchPublishRequest { BatchId = "committed", BatchSeq = 1, Subject = "foo", Payload = "data"u8.ToArray(), IsCommit = true, CommitValue = "1", }, preconditions, 0, _ => new PubAck { Stream = "T", Seq = 1 }); engine.HasBatch("committed").ShouldBeFalse(); } // ========================================================================= // TestJetStreamAtomicBatchPublishPartiallyAppliedBatchOnRecovery // — jetstream_batching_test.go:2232 // Partial batch (timed out or never committed) leaves no messages in the stream. // ========================================================================= [Fact] public async Task AtomicBatchPublish_uncommitted_batch_does_not_persist_messages() { // Go: TestJetStreamAtomicBatchPublishPartiallyAppliedBatchOnRecovery // An uncommitted batch must not leave partial messages in the stream. await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "RECOVERY", Subjects = ["rec.>"], AllowAtomicPublish = true, }); // Stage 3 messages but never commit. await fx.BatchPublishAsync("rec.1", "msg1", batchId: "partial", batchSeq: 1); await fx.BatchPublishAsync("rec.2", "msg2", batchId: "partial", batchSeq: 2); await fx.BatchPublishAsync("rec.3", "msg3", batchId: "partial", batchSeq: 3); // Stream should still have 0 messages (nothing committed). var state = await fx.GetStreamStateAsync("RECOVERY"); state.Messages.ShouldBe(0UL); } // ========================================================================= // TestJetStreamRollupIsolatedRead — jetstream_batching_test.go:2350 // Rollup combined with batch publish: rollup header in normal publish // with AllowAtomicPublish stream works correctly. // ========================================================================= [Fact] public async Task AtomicBatchPublish_normal_publish_and_batch_publish_coexist() { // Go: TestJetStreamRollupIsolatedRead — both normal and batch publishes on same stream await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "COEXIST", Subjects = ["co.>"], AllowAtomicPublish = true, }); // Normal publish. var normalAck = await fx.PublishAndGetAckAsync("co.normal", "data"); normalAck.Seq.ShouldBe(1UL); // Batch publish. await fx.BatchPublishAsync("co.batch", "b1", batchId: "mix", batchSeq: 1); var batchAck = await fx.BatchPublishAsync("co.batch", "b2", batchId: "mix", batchSeq: 2, commitValue: "1"); batchAck.ErrorCode.ShouldBeNull(); batchAck.BatchSize.ShouldBe(2); var state = await fx.GetStreamStateAsync("COEXIST"); state.Messages.ShouldBe(3UL); // 1 normal + 2 batch } // ========================================================================= // TestJetStreamAtomicBatchPublishAdvisories — jetstream_batching_test.go:2481 // Advisories: gap detection and too-large batch trigger advisory events. // In .NET, advisories are represented as error codes on the PubAck. // ========================================================================= [Fact] public async Task AtomicBatchPublish_gap_detected_returns_incomplete_error() { // Go: TestJetStreamAtomicBatchPublishAdvisories — gap triggers advisory + error // (jetstream_batching_test.go:2481) await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "ADV", Subjects = ["adv.>"], AllowAtomicPublish = true, }); // Publish seq 1, then jump to seq 3 (gap). await fx.BatchPublishAsync("adv.1", "msg1", batchId: "adv_uuid", batchSeq: 1); var gapAck = await fx.BatchPublishAsync("adv.3", "msg3", batchId: "adv_uuid", batchSeq: 3, commitValue: "1"); gapAck.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.IncompleteBatch); } // ========================================================================= // TestJetStreamAtomicBatchPublishRejectPartialBatchOnLeaderChange // — jetstream_batching_test.go:2717 // Partial batch (no commit) is discarded on leader change or engine reset. // In .NET: simulated via Clear(). // ========================================================================= [Fact] public async Task AtomicBatchPublish_partial_batch_after_reset_returns_incomplete() { // Go: TestJetStreamAtomicBatchPublishRejectPartialBatchOnLeaderChange // (jetstream_batching_test.go:2717) // After a leader change (simulated via Clear), partial in-flight batches are discarded. await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "LEADER", Subjects = ["ldr.>"], AllowAtomicPublish = true, }); // Stage 2 messages but don't commit. await fx.BatchPublishAsync("ldr.1", "m1", batchId: "ldr_batch", batchSeq: 1); await fx.BatchPublishAsync("ldr.2", "m2", batchId: "ldr_batch", batchSeq: 2); // Simulate leader-change / reset. fx.GetPublisher().ClearBatches(); // Attempting to commit the old batch now references a batch that no longer exists. // Seq 3 on a non-existent batch = incomplete (no seq 1 start found). var commitAck = await fx.BatchPublishAsync("ldr.3", "commit", batchId: "ldr_batch", batchSeq: 3, commitValue: "1"); commitAck.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.IncompleteBatch); } // ========================================================================= // TestJetStreamAtomicBatchPublishContinuousBatchesStillMoveAppliedUp // — jetstream_batching_test.go:2113 // Multiple consecutive batches are all committed and each is independent. // ========================================================================= [Fact] public async Task AtomicBatchPublish_continuous_batches_each_commit_independently() { // Go: TestJetStreamAtomicBatchPublishContinuousBatchesStillMoveAppliedUp // (jetstream_batching_test.go:2113) await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "CONT", Subjects = ["cont"], AllowAtomicPublish = true, }); // First batch: 3 messages, commits. await fx.BatchPublishAsync("cont", "a1", batchId: "batch1", batchSeq: 1); await fx.BatchPublishAsync("cont", "a2", batchId: "batch1", batchSeq: 2); var ack1 = await fx.BatchPublishAsync("cont", "a3", batchId: "batch1", batchSeq: 3, commitValue: "1"); ack1.ErrorCode.ShouldBeNull(); ack1.BatchSize.ShouldBe(3); // Second batch: 2 messages, commits. await fx.BatchPublishAsync("cont", "b1", batchId: "batch2", batchSeq: 1); var ack2 = await fx.BatchPublishAsync("cont", "b2", batchId: "batch2", batchSeq: 2, commitValue: "1"); ack2.ErrorCode.ShouldBeNull(); ack2.BatchSize.ShouldBe(2); var state = await fx.GetStreamStateAsync("CONT"); state.Messages.ShouldBe(5UL); } // ========================================================================= // TestJetStreamAtomicBatchPublishSingleServerRecovery — jetstream_batching_test.go:1578 // After a batch is partially committed, on recovery the rest is applied. // In .NET we test the in-memory analogue: staged messages are buffered, not written // to the store until commit. // ========================================================================= [Fact] public async Task AtomicBatchPublish_staged_messages_are_buffered_until_commit() { // Go: TestJetStreamAtomicBatchPublishSingleServerRecovery (jetstream_batching_test.go:1578) await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "STAGED", Subjects = ["staged"], AllowAtomicPublish = true, }); // Stage 2 messages. await fx.BatchPublishAsync("staged", "msg1", batchId: "staged_batch", batchSeq: 1); await fx.BatchPublishAsync("staged", "msg2", batchId: "staged_batch", batchSeq: 2); // Stream should have 0 messages — nothing committed yet. var before = await fx.GetStreamStateAsync("STAGED"); before.Messages.ShouldBe(0UL); // Commit. var ack = await fx.BatchPublishAsync("staged", "msg3", batchId: "staged_batch", batchSeq: 3, commitValue: "1"); ack.ErrorCode.ShouldBeNull(); ack.BatchSize.ShouldBe(3); // All 3 messages now committed. var after = await fx.GetStreamStateAsync("STAGED"); after.Messages.ShouldBe(3UL); } // ========================================================================= // TestJetStreamAtomicBatchPublishEncode — jetstream_batching_test.go:1750 // Verify that error code constants have the expected numeric values from the Go source. // ========================================================================= [Fact] public void AtomicBatchPublish_error_code_values_match_go_reference() { // Go: jetstream_errors_generated.go — error identifiers AtomicBatchPublishErrorCodes.Disabled.ShouldBe(10174); AtomicBatchPublishErrorCodes.MissingSeq.ShouldBe(10175); AtomicBatchPublishErrorCodes.IncompleteBatch.ShouldBe(10176); AtomicBatchPublishErrorCodes.UnsupportedHeader.ShouldBe(10177); AtomicBatchPublishErrorCodes.InvalidBatchId.ShouldBe(10179); AtomicBatchPublishErrorCodes.TooLargeBatch.ShouldBe(10199); AtomicBatchPublishErrorCodes.InvalidCommit.ShouldBe(10200); AtomicBatchPublishErrorCodes.ContainsDuplicate.ShouldBe(10201); AtomicBatchPublishErrorCodes.MirrorWithAtomicPublish.ShouldBe(10198); } // ========================================================================= // TestJetStreamAtomicBatchPublishExpectedPerSubject — jetstream_batching_test.go:1500 // A batch can carry per-subject sequence expectations on the first message. // ========================================================================= [Fact] public async Task AtomicBatchPublish_stream_config_allows_atomic_publish_flag() { // Go: TestJetStreamAtomicBatchPublishExpectedPerSubject — stream config check // AllowAtomicPublish is properly stored in stream config. await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "CFG_CHECK", Subjects = ["chk.>"], AllowAtomicPublish = true, }); var config = fx.GetStreamConfig("CFG_CHECK"); config.ShouldNotBeNull(); config!.AllowAtomicPublish.ShouldBeTrue(); } [Fact] public async Task AtomicBatchPublish_stream_config_defaults_to_false() { // Go: TestJetStreamAtomicBatchPublish — AllowAtomicPublish defaults to false await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "CFG_DEFAULT", Subjects = ["def.>"], }); var config = fx.GetStreamConfig("CFG_DEFAULT"); config.ShouldNotBeNull(); config!.AllowAtomicPublish.ShouldBeFalse(); } // ========================================================================= // TestJetStreamAtomicBatchPublishProposeOnePartialBatch — jetstream_batching_test.go:1955 // A partial batch (missing one sequence entry) is treated as incomplete. // ========================================================================= [Fact] public async Task AtomicBatchPublish_partial_batch_missing_middle_message_fails() { // Go: TestJetStreamAtomicBatchPublishProposeOnePartialBatch (jetstream_batching_test.go:1955) // A batch that skips sequence 2 (1, 3) is incomplete. await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "PARTIAL", Subjects = ["partial"], AllowAtomicPublish = true, }); // Stage seq 1. await fx.BatchPublishAsync("partial", "m1", batchId: "partial_batch", batchSeq: 1); // Jump to seq 3 — gap, so should be incomplete. var gapAck = await fx.BatchPublishAsync("partial", "m3", batchId: "partial_batch", batchSeq: 3, commitValue: "1"); gapAck.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.IncompleteBatch); } }