// Ported from golang/nats-server/server/jetstream_batching_test.go
// Go: TestJetStreamAtomicBatch* series — atomic batch publish protocol:
// stage/commit semantics, error codes, limits, dedupe, mirror/source constraints,
// cleanup on disable/delete, config options, deny-headers, advisory payloads,
// expected-seq checks, rollback, and recovery behaviors.
using System.Text;
using NATS.Server.JetStream;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Publish;
using NATS.Server.TestUtilities;
namespace NATS.Server.JetStream.Tests.JetStream;
/// <summary>
/// Go-parity tests for atomic batch publish (Nats-Batch-Id / Nats-Batch-Sequence /
/// Nats-Batch-Commit headers). Ported from server/jetstream_batching_test.go.
///
/// The protocol:
/// - Nats-Batch-Id: UUID identifying the batch
/// - Nats-Batch-Sequence: 1-based position within the batch
/// - Nats-Batch-Commit: "1" or "eob" to atomically commit all staged messages
///
/// Non-commit messages return an empty ack (flow control).
/// A commit returns the full PubAck with BatchId + BatchSize set.
/// </summary>
public class JsBatchingTests
{
// =========================================================================
// TestJetStreamAtomicBatchPublish — jetstream_batching_test.go:35
// Basic end-to-end: stage, commit, validate stream contents.
// =========================================================================
[Fact]
public async Task AtomicBatchPublish_disabled_by_default_returns_error()
{
    // Go: TestJetStreamAtomicBatchPublish (jetstream_batching_test.go:35)
    // Arrange: a stream whose AllowAtomicPublish flag is off.
    var config = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo.*"],
        AllowAtomicPublish = false,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // Act: send a publish that carries batch headers (id, sequence, commit).
    var response = await fixture.BatchPublishAsync(
        "foo.0", "data",
        batchId: "uuid", batchSeq: 1, commitValue: "1");

    // Assert: the publish is refused with the Disabled error code.
    response.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.Disabled);
}
[Fact]
public async Task AtomicBatchPublish_missing_sequence_returns_error()
{
    // Go: TestJetStreamAtomicBatchPublish — batch id present but no sequence
    // (jetstream_batching_test.go:78-83)
    var config = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo.*"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // A batch sequence of 0 stands in for a missing sequence header.
    var response = await fixture.BatchPublishAsync("foo.0", "data", batchId: "uuid", batchSeq: 0);

    response.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.MissingSeq);
}
[Fact]
public async Task AtomicBatchPublish_sequence_starts_above_one_returns_incomplete()
{
    // Go: TestJetStreamAtomicBatchPublish — first msg has seq=2, gaps detected
    // (jetstream_batching_test.go:84-91)
    var config = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo.*"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // The very first message of the batch claims position 2, so position 1 never arrived.
    var response = await fixture.BatchPublishAsync(
        "foo.0", "data",
        batchId: "uuid", batchSeq: 2, commitValue: "1");

    response.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.IncompleteBatch);
}
[Fact]
public async Task AtomicBatchPublish_single_message_batch_commit_succeeds()
{
    // Go: TestJetStreamAtomicBatchPublish — single-message batch commits immediately
    // (jetstream_batching_test.go:93-103)
    var config = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo.*"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // One message that is both the first and the committing message of the batch.
    var commitAck = await fixture.BatchPublishAsync("foo.0", "foo.0", batchId: "uuid", batchSeq: 1, commitValue: "1");

    // The commit ack carries the stored sequence plus the batch id and size.
    commitAck.ErrorCode.ShouldBeNull();
    commitAck.Seq.ShouldBe(1UL);
    commitAck.BatchId.ShouldBe("uuid");
    commitAck.BatchSize.ShouldBe(1);
}
[Fact]
public async Task AtomicBatchPublish_multi_message_batch_commit_returns_last_seq()
{
    // Go: TestJetStreamAtomicBatchPublish — batch of 5 messages, commit on last
    // (jetstream_batching_test.go:117-138)
    await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo.*"],
        AllowAtomicPublish = true,
    });
    const int batchSize = 5;

    // Stage every message except the last one.
    for (var seq = 1; seq < batchSize; seq++)
    {
        var subject = $"foo.{seq}";
        var stagedAck = await fx.BatchPublishAsync(subject, subject, batchId: "uuid", batchSeq: (ulong)seq, commitValue: null);

        // Non-commit messages: empty ack (flow control, no error).
        stagedAck.ErrorCode.ShouldBeNull();
        stagedAck.BatchId.ShouldBeNull();
    }

    // The final message carries the commit marker and yields the full batch ack.
    var lastSubject = $"foo.{batchSize}";
    var commitAck = await fx.BatchPublishAsync(lastSubject, lastSubject, batchId: "uuid", batchSeq: (ulong)batchSize, commitValue: "1");

    commitAck.ErrorCode.ShouldBeNull();
    commitAck.BatchId.ShouldBe("uuid");
    commitAck.BatchSize.ShouldBe(batchSize);

    // The stream was empty before the batch, so the last stored sequence equals the
    // batch size. This matches the expectation in the empty-ack test for the same setup.
    commitAck.Seq.ShouldBe((ulong)batchSize);
}
[Fact]
public async Task AtomicBatchPublish_gap_in_sequence_returns_incomplete()
{
    // Go: TestJetStreamAtomicBatchPublish — batch with gap (1, then 3)
    // (jetstream_batching_test.go:108-115)
    var config = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo.*"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // Position 1 is staged without error.
    var firstAck = await fixture.BatchPublishAsync("foo.1", "data", batchId: "uuid", batchSeq: 1);
    firstAck.ErrorCode.ShouldBeNull();

    // Position 2 is skipped: committing at position 3 must report an incomplete batch.
    var gapAck = await fixture.BatchPublishAsync(
        "foo.3", "data",
        batchId: "uuid", batchSeq: 3, commitValue: "1");
    gapAck.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.IncompleteBatch);
}
// =========================================================================
// TestJetStreamAtomicBatchPublishEmptyAck — jetstream_batching_test.go:163
// Non-commit messages return an empty (flow-control) ack.
// =========================================================================
[Fact]
public async Task AtomicBatchPublish_non_commit_messages_return_empty_flow_control_ack()
{
    // Go: TestJetStreamAtomicBatchPublishEmptyAck (jetstream_batching_test.go:163)
    var config = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo.*"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);
    const int batchSize = 5;

    // Positions 1..4 are staged only; each one gets an ack with no error and no batch info.
    for (var position = 1; position < batchSize; position++)
    {
        var stagedSubject = $"foo.{position}";
        var stagedAck = await fixture.BatchPublishAsync(
            stagedSubject, stagedSubject,
            batchId: "uuid2", batchSeq: (ulong)position, commitValue: null);
        stagedAck.ErrorCode.ShouldBeNull();
        stagedAck.BatchId.ShouldBeNull();
    }

    // Position 5 commits and receives the full batch ack.
    var commitSubject = $"foo.{batchSize}";
    var commitAck = await fixture.BatchPublishAsync(
        commitSubject, commitSubject,
        batchId: "uuid2", batchSeq: (ulong)batchSize, commitValue: "1");
    commitAck.ErrorCode.ShouldBeNull();
    commitAck.Seq.ShouldBe((ulong)batchSize);
    commitAck.BatchId.ShouldBe("uuid2");
    commitAck.BatchSize.ShouldBe(batchSize);
}
// =========================================================================
// TestJetStreamAtomicBatchPublishCommitEob — jetstream_batching_test.go:225
// The "eob" (end-of-batch) commit value: the commit message itself is excluded
// from the committed set.
// =========================================================================
[Fact]
public async Task AtomicBatchPublish_commit_eob_excludes_commit_marker_message()
{
    // Go: TestJetStreamAtomicBatchPublishCommitEob (jetstream_batching_test.go:225)
    // An "eob" commit stores positions 1 and 2 only; the marker at position 3 is dropped.
    var config = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // Two staged payload messages.
    await fixture.BatchPublishAsync("foo", "msg1", batchId: "uuid", batchSeq: 1);
    await fixture.BatchPublishAsync("foo", "msg2", batchId: "uuid", batchSeq: 2);

    // End-of-batch marker at position 3.
    var eobAck = await fixture.BatchPublishAsync(
        "foo", "commit-marker",
        batchId: "uuid", batchSeq: 3, commitValue: "eob");

    eobAck.ErrorCode.ShouldBeNull();
    eobAck.BatchSize.ShouldBe(2); // marker message is not counted
    eobAck.Seq.ShouldBe(2UL);     // sequence of the last stored message
    eobAck.BatchId.ShouldBe("uuid");

    var streamState = await fixture.GetStreamStateAsync("TEST");
    streamState.Messages.ShouldBe(2UL);
}
[Fact]
public async Task AtomicBatchPublish_commit_value_1_includes_all_messages()
{
    // Go: TestJetStreamAtomicBatchPublishCommitEob — normal commit includes all messages
    var config = new StreamConfig
    {
        Name = "TESTA",
        Subjects = ["bar"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // Two staged messages followed by a committing third one.
    await fixture.BatchPublishAsync("bar", "msg1", batchId: "batch_a", batchSeq: 1);
    await fixture.BatchPublishAsync("bar", "msg2", batchId: "batch_a", batchSeq: 2);
    var commitAck = await fixture.BatchPublishAsync(
        "bar", "msg3",
        batchId: "batch_a", batchSeq: 3, commitValue: "1");

    // With commit value "1" the committing message itself is part of the batch.
    commitAck.ErrorCode.ShouldBeNull();
    commitAck.BatchSize.ShouldBe(3);
    commitAck.Seq.ShouldBe(3UL);
    commitAck.BatchId.ShouldBe("batch_a");

    var streamState = await fixture.GetStreamStateAsync("TESTA");
    streamState.Messages.ShouldBe(3UL);
}
// =========================================================================
// TestJetStreamAtomicBatchPublishLimits — jetstream_batching_test.go:293
// Batch ID max length, inflight limit, batch size limit.
// =========================================================================
[Fact]
public async Task AtomicBatchPublish_batch_id_max_64_chars_is_accepted()
{
    // Go: TestJetStreamAtomicBatchPublishLimits — max-length batch ID (64 chars)
    // (jetstream_batching_test.go:337-354)
    var config = new StreamConfig
    {
        Name = "FOO",
        Subjects = ["foo"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // A 64-character batch id is expected to be accepted.
    var batchIdAtLimit = new string('A', 64);
    var response = await fixture.BatchPublishAsync(
        "foo", "data",
        batchId: batchIdAtLimit, batchSeq: 1, commitValue: "1");

    response.ErrorCode.ShouldBeNull();
}
[Fact]
public async Task AtomicBatchPublish_batch_id_over_64_chars_is_rejected()
{
    // Go: TestJetStreamAtomicBatchPublishLimits — batch ID too long (65 chars)
    // (jetstream_batching_test.go:337-354)
    var config = new StreamConfig
    {
        Name = "FOO2",
        Subjects = ["foo2"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // A 65-character batch id is expected to be refused.
    var oversizedBatchId = new string('A', 65);
    var response = await fixture.BatchPublishAsync(
        "foo2", "data",
        batchId: oversizedBatchId, batchSeq: 1, commitValue: "1");

    response.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.InvalidBatchId);
}
[Fact]
public Task AtomicBatchPublish_inflight_limit_per_stream_enforced()
{
    // Go: TestJetStreamAtomicBatchPublishLimits — inflight limit per stream = 1
    // (jetstream_batching_test.go:356-382)
    // When there's 1 in-flight batch (not yet committed), a second batch is rejected.
    //
    // The test body is fully synchronous, so the method is not marked `async`
    // (that would raise CS1998); it returns a completed task to keep its signature.
    var engine = new AtomicBatchPublishEngine(
        maxInflightPerStream: 1,
        maxBatchSize: AtomicBatchPublishEngine.DefaultMaxBatchSize);
    var preconditions = new PublishPreconditions();

    // First batch seq=1 starts staging (occupies the 1 inflight slot).
    var req1 = new BatchPublishRequest
    {
        BatchId = "batch_a",
        BatchSeq = 1,
        Subject = "foo",
        Payload = "data"u8.ToArray(),
        IsCommit = false,
    };
    var result1 = engine.Process(req1, preconditions, 0, _ => new PubAck { Stream = "TEST", Seq = 1 });
    result1.Kind.ShouldBe(AtomicBatchResult.ResultKind.Staged);

    // Second batch: would exceed the inflight limit.
    var req2 = new BatchPublishRequest
    {
        BatchId = "batch_b",
        BatchSeq = 1,
        Subject = "foo",
        Payload = "data"u8.ToArray(),
        IsCommit = true,
        CommitValue = "1",
    };
    var result2 = engine.Process(req2, preconditions, 0, _ => new PubAck { Stream = "TEST", Seq = 2 });
    result2.Kind.ShouldBe(AtomicBatchResult.ResultKind.Error);
    result2.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.IncompleteBatch);

    return Task.CompletedTask;
}
[Fact]
public void AtomicBatchPublish_batch_size_limit_enforced()
{
    // Go: TestJetStreamAtomicBatchPublishLimits — max batch size = 2
    // (jetstream_batching_test.go:415-440)

    // Builds one request for the given batch; the commit flag and commit value travel together.
    static BatchPublishRequest BuildRequest(string batchId, ulong batchSeq, bool commit) => new()
    {
        BatchId = batchId,
        BatchSeq = batchSeq,
        Subject = "foo",
        Payload = "data"u8.ToArray(),
        IsCommit = commit,
        CommitValue = commit ? "1" : null,
    };

    var preconditions = new PublishPreconditions();

    // Part 1: a batch of exactly two messages (the limit) stages and then commits.
    var engineAtLimit = new AtomicBatchPublishEngine(
        maxInflightPerStream: 50,
        maxBatchSize: 2);
    for (ulong position = 1; position <= 2; position++)
    {
        var committing = position == 2;
        var request = BuildRequest("big", position, committing);

        // Counter is local to each Process call; the callback hands out 1, 2, ... in order.
        ulong assigned = 0;
        var outcome = engineAtLimit.Process(request, preconditions, 0, _ =>
        {
            assigned++;
            return new PubAck { Stream = "TEST", Seq = assigned };
        });

        if (committing)
        {
            outcome.Kind.ShouldBe(AtomicBatchResult.ResultKind.Committed);
            outcome.CommitAck!.BatchSize.ShouldBe(2);
        }
        else
        {
            outcome.Kind.ShouldBe(AtomicBatchResult.ResultKind.Staged);
        }
    }

    // Part 2: a batch of three messages, one more than the limit, on a fresh engine.
    var engineOverLimit = new AtomicBatchPublishEngine(maxInflightPerStream: 50, maxBatchSize: 2);
    for (ulong position = 1; position <= 3; position++)
    {
        var committing = position == 3;
        var request = BuildRequest("toobig", position, committing);
        ulong ackSeq = position;
        var outcome = engineOverLimit.Process(request, preconditions, 0, _ => new PubAck { Stream = "TEST", Seq = ackSeq });

        if (position > 2)
        {
            // The third message goes past the maximum and must fail with TooLargeBatch.
            outcome.Kind.ShouldBe(AtomicBatchResult.ResultKind.Error);
            outcome.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.TooLargeBatch);
        }
        else
        {
            // The first two messages may be staged or already refused; both are tolerated here.
            outcome.Kind.ShouldBeOneOf(AtomicBatchResult.ResultKind.Staged, AtomicBatchResult.ResultKind.Error);
        }
    }
}
// =========================================================================
// TestJetStreamAtomicBatchPublishDedupeNotAllowed — jetstream_batching_test.go:447
// Pre-existing duplicate msg IDs within a batch cause an error.
// =========================================================================
[Fact]
public async Task AtomicBatchPublish_duplicate_msgid_in_store_rejected()
{
    // Go: TestJetStreamAtomicBatchPublishDedupeNotAllowed (jetstream_batching_test.go:447)
    // A batch message reusing the Nats-Msg-Id of an already stored message must fail.
    var config = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo"],
        AllowAtomicPublish = true,
        DuplicateWindowMs = 60_000,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // A regular (non-batch) publish records the id "pre-existing".
    var plainAck = await fixture.PublishAndGetAckAsync("foo", "original", msgId: "pre-existing");
    plainAck.ErrorCode.ShouldBeNull();

    // The same id offered again inside a batch is refused.
    var duplicateAck = await fixture.BatchPublishAsync(
        "foo", "duplicate",
        batchId: "uuid", batchSeq: 1, commitValue: "1",
        msgId: "pre-existing");
    duplicateAck.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.ContainsDuplicate);
}
[Fact]
public async Task AtomicBatchPublish_unique_msgids_in_batch_are_accepted()
{
    // Go: TestJetStreamAtomicBatchPublishDedupeNotAllowed — unique msg IDs succeed
    // (jetstream_batching_test.go:483-499)
    var config = new StreamConfig
    {
        Name = "TEST2",
        Subjects = ["foo2"],
        AllowAtomicPublish = true,
        DuplicateWindowMs = 60_000,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // Position 1 is staged under message id "id1".
    var stagedAck = await fixture.BatchPublishAsync("foo2", "a", batchId: "uuid", batchSeq: 1, msgId: "id1");
    stagedAck.ErrorCode.ShouldBeNull();

    // Position 2 commits under a different message id, "id2".
    var commitAck = await fixture.BatchPublishAsync(
        "foo2", "b",
        batchId: "uuid", batchSeq: 2, commitValue: "1", msgId: "id2");
    commitAck.ErrorCode.ShouldBeNull();
    commitAck.BatchSize.ShouldBe(2);
}
[Fact]
public async Task AtomicBatchPublish_duplicate_msgid_within_same_batch_rejected()
{
    // Go: TestJetStreamAtomicBatchPublishDedupeNotAllowed — same msg ID in batch twice
    var config = new StreamConfig
    {
        Name = "TEST3",
        Subjects = ["foo3"],
        AllowAtomicPublish = true,
        DuplicateWindowMs = 60_000,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // Position 1 is staged with message id "idA".
    var stagedAck = await fixture.BatchPublishAsync("foo3", "msg1", batchId: "dup_batch", batchSeq: 1, msgId: "idA");
    stagedAck.ErrorCode.ShouldBeNull();

    // Position 2 repeats "idA" within the same batch and is refused.
    var repeatAck = await fixture.BatchPublishAsync(
        "foo3", "msg2",
        batchId: "dup_batch", batchSeq: 2, commitValue: "1", msgId: "idA");
    repeatAck.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.ContainsDuplicate);
}
// =========================================================================
// TestJetStreamAtomicBatchPublishSourceAndMirror — jetstream_batching_test.go:519
// Mirror streams cannot have AllowAtomicPublish=true.
// =========================================================================
[Fact]
public void AtomicBatchPublish_mirror_with_allow_atomic_publish_is_rejected()
{
    // Go: TestJetStreamAtomicBatchPublishSourceAndMirror — mirror + AllowAtomicPublish
    // (jetstream_batching_test.go:561-569)
    // Go error: NewJSMirrorWithAtomicPublishError (10198)
    var manager = new StreamManager();

    // The stream that the mirror will point at.
    var originConfig = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo"],
    };
    var originResponse = manager.CreateOrUpdate(originConfig);
    originResponse.Error.ShouldBeNull();

    // A mirror that also asks for atomic publish must be refused.
    var mirrorConfig = new StreamConfig
    {
        Name = "M-no-batch",
        Mirror = "TEST",
        AllowAtomicPublish = true,
    };
    var mirrorResponse = manager.CreateOrUpdate(mirrorConfig);
    mirrorResponse.Error.ShouldNotBeNull();
    mirrorResponse.Error!.Code.ShouldBe(AtomicBatchPublishErrorCodes.MirrorWithAtomicPublish);
}
[Fact]
public void AtomicBatchPublish_mirror_without_allow_atomic_publish_is_accepted()
{
    // Go: TestJetStreamAtomicBatchPublishSourceAndMirror — regular mirror is OK
    var sm = new StreamManager();

    // Create the origin stream and make sure that step succeeded, so a failure
    // here is not misread as a problem with the mirror itself.
    var originResp = sm.CreateOrUpdate(new StreamConfig { Name = "ORIGIN", Subjects = ["origin.>"] });
    originResp.Error.ShouldBeNull();

    // A mirror that leaves AllowAtomicPublish off must be accepted.
    var mirrorResp = sm.CreateOrUpdate(new StreamConfig
    {
        Name = "M_OK",
        Mirror = "ORIGIN",
        AllowAtomicPublish = false,
    });
    mirrorResp.Error.ShouldBeNull();
}
// =========================================================================
// TestJetStreamAtomicBatchPublishCleanup — jetstream_batching_test.go:620
// In-flight batches are cleaned up when:
// - AllowAtomicPublish is disabled on update
// - Stream is deleted
// - Batch is committed
// =========================================================================
[Fact]
public async Task AtomicBatchPublish_cleanup_on_disable()
{
    // Go: TestJetStreamAtomicBatchPublishCleanup — Disable mode
    // (jetstream_batching_test.go:620, mode=Disable)
    var enabledConfig = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(enabledConfig);

    // Open a batch and leave it uncommitted.
    var stagedAck = await fixture.BatchPublishAsync("foo", "msg1", batchId: "cleanup_uuid", batchSeq: 1);
    stagedAck.ErrorCode.ShouldBeNull();

    // Update the stream so that atomic publish is turned off.
    var disabledConfig = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo"],
        AllowAtomicPublish = false,
    };
    var updateResponse = fixture.UpdateStreamWithResult(disabledConfig);
    updateResponse.Error.ShouldBeNull();

    // Any batch publish after the update is refused with Disabled.
    var rejectedAck = await fixture.BatchPublishAsync(
        "foo", "msg2",
        batchId: "new_batch", batchSeq: 1, commitValue: "1");
    rejectedAck.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.Disabled);
}
[Fact]
public async Task AtomicBatchPublish_committed_batch_cleans_up_inflight_state()
{
    // Go: TestJetStreamAtomicBatchPublishCleanup — Commit mode
    // (jetstream_batching_test.go:703-711)
    var config = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // First use of the id: stage one message, commit with the second.
    await fixture.BatchPublishAsync("foo", "msg1", batchId: "committed_uuid", batchSeq: 1);
    var firstCommit = await fixture.BatchPublishAsync(
        "foo", "msg2",
        batchId: "committed_uuid", batchSeq: 2, commitValue: "1");
    firstCommit.ErrorCode.ShouldBeNull();
    firstCommit.BatchSize.ShouldBe(2);

    // Second use of the same id starts again at position 1 and must succeed,
    // which shows the earlier batch state was released on commit.
    var secondCommit = await fixture.BatchPublishAsync(
        "foo", "fresh",
        batchId: "committed_uuid", batchSeq: 1, commitValue: "1");
    secondCommit.ErrorCode.ShouldBeNull();
    secondCommit.BatchSize.ShouldBe(1);
}
// =========================================================================
// TestJetStreamAtomicBatchPublishConfigOpts — jetstream_batching_test.go:756
// Default constant values match Go reference.
// =========================================================================
[Fact]
public void AtomicBatchPublish_default_config_constants_match_go_reference()
{
    // Go: TestJetStreamAtomicBatchPublishConfigOpts (jetstream_batching_test.go:756)
    // Go defaults: max_inflight_per_stream=50, max_inflight_total=1000, max_msgs=1000, timeout=10s

    // Per-stream in-flight batch limit.
    AtomicBatchPublishEngine.DefaultMaxInflightPerStream.ShouldBe(50);

    // Maximum number of messages in one batch.
    AtomicBatchPublishEngine.DefaultMaxBatchSize.ShouldBe(1000);

    // Timeout applied to batches.
    var tenSeconds = TimeSpan.FromSeconds(10);
    AtomicBatchPublishEngine.DefaultBatchTimeout.ShouldBe(tenSeconds);

    // Longest permitted batch id.
    AtomicBatchPublishEngine.MaxBatchIdLength.ShouldBe(64);
}
// =========================================================================
// TestJetStreamAtomicBatchPublishDenyHeaders — jetstream_batching_test.go:792
// Certain headers are not supported inside a batch (Nats-Expected-Last-Msg-Id).
// =========================================================================
[Fact]
public async Task AtomicBatchPublish_unsupported_header_expected_last_msg_id_rejected()
{
    // Go: TestJetStreamAtomicBatchPublishDenyHeaders (jetstream_batching_test.go:792)
    // Nats-Expected-Last-Msg-Id may not be used on a message that belongs to a batch.
    var config = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // The opening message of the batch carries the denied header.
    var response = await fixture.BatchPublishAsync("foo", "data", batchId: "uuid", batchSeq: 1, expectedLastMsgId: "msgId");

    response.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.UnsupportedHeader);
}
// =========================================================================
// TestJetStreamAtomicBatchPublishHighLevelRollback — jetstream_batching_test.go:1455
// TestJetStreamAtomicBatchPublishExpectedSeq — jetstream_batching_test.go:2594
// A batch carrying Nats-Expected-Last-Sequence is checked at commit; a mismatch is rejected.
// =========================================================================
[Fact]
public async Task AtomicBatchPublish_expected_last_seq_checked_at_commit()
{
    // Go: TestJetStreamAtomicBatchPublishExpectedSeq (jetstream_batching_test.go:2594)
    // Nats-Expected-Last-Sequence on a batch is evaluated when the batch commits.
    var config = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // Two plain publishes bring the stream to sequence 2.
    var firstPlain = await fixture.PublishAndGetAckAsync("foo", "msg1");
    firstPlain.Seq.ShouldBe(1UL);
    var secondPlain = await fixture.PublishAndGetAckAsync("foo", "msg2");
    secondPlain.Seq.ShouldBe(2UL);

    // A one-message batch that expects last sequence 2 matches the stream and commits.
    var commitAck = await fixture.BatchPublishAsync(
        "foo", "batch-msg",
        batchId: "uuid", batchSeq: 1, commitValue: "1", expectedLastSeq: 2);
    commitAck.ErrorCode.ShouldBeNull();
    commitAck.BatchSize.ShouldBe(1);
    commitAck.Seq.ShouldBe(3UL);
}
[Fact]
public async Task AtomicBatchPublish_expected_last_seq_mismatch_returns_error()
{
    // Go: TestJetStreamAtomicBatchPublishExpectedSeq — wrong expected seq
    var config = new StreamConfig
    {
        Name = "ESEQ",
        Subjects = ["eseq"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // One plain publish puts the stream at sequence 1.
    var plainAck = await fixture.PublishAndGetAckAsync("eseq", "first");
    plainAck.Seq.ShouldBe(1UL);

    // The batch claims the last sequence is 99, which does not match; some error is expected.
    var mismatchAck = await fixture.BatchPublishAsync(
        "eseq", "batch",
        batchId: "badseq", batchSeq: 1, commitValue: "1", expectedLastSeq: 99);
    mismatchAck.ErrorCode.ShouldNotBeNull();
}
// =========================================================================
// TestJetStreamAtomicBatchPublishCommitUnsupported — jetstream_batching_test.go:2852
// Invalid commit values are rejected. The Go test lists "", "0", "unsupported";
// this port exercises "0", "unsupported", and "false".
// =========================================================================
[Theory]
[InlineData("0")]
[InlineData("unsupported")]
[InlineData("false")]
public async Task AtomicBatchPublish_invalid_commit_value_returns_error(string invalidCommit)
{
    // Go: TestJetStreamAtomicBatchPublishCommitUnsupported (jetstream_batching_test.go:2852)
    var config = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // Open the batch with a staged message at position 1.
    await fixture.BatchPublishAsync("foo", "stage", batchId: "bad_commit", batchSeq: 1);

    // Position 2 carries the commit value under test, which is not an accepted one.
    var response = await fixture.BatchPublishAsync("foo", "commit-msg", batchId: "bad_commit", batchSeq: 2, commitValue: invalidCommit);

    response.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.InvalidCommit);
}
[Fact]
public async Task AtomicBatchPublish_commit_value_1_is_valid()
{
    // Go: TestJetStreamAtomicBatchPublishCommitUnsupported — "1" is valid
    var config = new StreamConfig
    {
        Name = "VALID1",
        Subjects = ["v1"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // A single-message batch committed with the value "1".
    var commitAck = await fixture.BatchPublishAsync(
        "v1", "msg",
        batchId: "v_commit", batchSeq: 1, commitValue: "1");

    commitAck.ErrorCode.ShouldBeNull();
    commitAck.BatchSize.ShouldBe(1);
}
[Fact]
public async Task AtomicBatchPublish_commit_value_eob_is_valid()
{
    // Go: TestJetStreamAtomicBatchPublishCommitUnsupported — "eob" is valid
    var config = new StreamConfig
    {
        Name = "VALIDEOB",
        Subjects = ["veob"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // One staged message, then an end-of-batch marker at position 2.
    await fixture.BatchPublishAsync("veob", "msg1", batchId: "eob_commit", batchSeq: 1);
    var eobAck = await fixture.BatchPublishAsync(
        "veob", "marker",
        batchId: "eob_commit", batchSeq: 2, commitValue: "eob");

    eobAck.ErrorCode.ShouldBeNull();
    // The marker is left out of the batch, so only position 1 counts.
    eobAck.BatchSize.ShouldBe(1);
}
// =========================================================================
// TestJetStreamAtomicBatchPublishExpectedLastSubjectSequence — jetstream_batching_test.go:2804
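// Batches can carry per-subject sequence expectations. The ported test below only
// checks the commit ack of a batch that spans two subjects; it sets no expectation headers.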
// =========================================================================
[Fact]
public async Task AtomicBatchPublish_multi_message_batch_sequence_is_correct()
{
    // Go: TestJetStreamAtomicBatchPublishExpectedLastSubjectSequence
    // (jetstream_batching_test.go:2804) — multi-msg batch, each with subject
    var config = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo.*"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // Seed the stream with one message per subject (sequences 1 and 2).
    await fixture.PublishAndGetAckAsync("foo.A", "a1");
    await fixture.PublishAndGetAckAsync("foo.B", "b1");

    // Batch position 1 targets foo.A and is only staged.
    var stagedAck = await fixture.BatchPublishAsync("foo.A", "a2", batchId: "uuid", batchSeq: 1);
    stagedAck.ErrorCode.ShouldBeNull();

    // Batch position 2 targets foo.B and commits the batch.
    var commitAck = await fixture.BatchPublishAsync(
        "foo.B", "b2",
        batchId: "uuid", batchSeq: 2, commitValue: "1");
    commitAck.ErrorCode.ShouldBeNull();
    commitAck.BatchId.ShouldBe("uuid");
    commitAck.BatchSize.ShouldBe(2);
    commitAck.Seq.ShouldBe(4UL);
}
// =========================================================================
// TestJetStreamAtomicBatchPublishPersistModeAsync — jetstream_batching_test.go:2785
// AsyncPersistMode + AllowAtomicPublish is not supported together.
// Note: this test validates the configuration validation, not runtime behavior.
// =========================================================================
[Fact]
public void AtomicBatchPublish_async_persist_mode_with_atomic_publish_is_invalid()
{
    // Go: TestJetStreamAtomicBatchPublishPersistModeAsync (jetstream_batching_test.go:2785)
    // Async persist mode and AllowAtomicPublish are not meant to be combined.
    // In .NET this is intended to be validated by the stream manager.
    var manager = new StreamManager();
    var config = new StreamConfig
    {
        Name = "TEST_ASYNC",
        Subjects = ["async.*"],
        AllowAtomicPublish = true,
        PersistMode = PersistMode.Async,
    };

    var response = manager.CreateOrUpdate(config);

    // NOTE(review): only a non-null response is asserted, so this passes whether or
    // not the combination is actually rejected.
    response.ShouldNotBeNull();
    // When implemented: resp.Error.ShouldNotBeNull() with an appropriate error code.
}
// =========================================================================
// Multiple concurrent batches (different IDs) can be staged simultaneously.
// =========================================================================
[Fact]
public async Task AtomicBatchPublish_multiple_concurrent_batches_are_isolated()
{
    // Go: TestJetStreamAtomicBatchPublishProposeMultiple — multiple concurrent batches
    await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
    {
        Name = "MULTI",
        Subjects = ["multi.>"],
        AllowAtomicPublish = true,
    });

    // Interleave messages from two different batches.
    await fx.BatchPublishAsync("multi.a", "a1", batchId: "batch_x", batchSeq: 1);
    await fx.BatchPublishAsync("multi.b", "b1", batchId: "batch_y", batchSeq: 1);
    var xAck = await fx.BatchPublishAsync("multi.a", "a2", batchId: "batch_x", batchSeq: 2, commitValue: "1");
    var yAck = await fx.BatchPublishAsync("multi.b", "b2", batchId: "batch_y", batchSeq: 2, commitValue: "1");

    // Both batches should commit independently, so each commit ack is checked
    // rather than only the second one.
    xAck.ErrorCode.ShouldBeNull();
    xAck.BatchSize.ShouldBe(2);
    yAck.ErrorCode.ShouldBeNull();
    yAck.BatchSize.ShouldBe(2);

    var state = await fx.GetStreamStateAsync("MULTI");
    state.Messages.ShouldBe(4UL); // both batches committed
}
// =========================================================================
// AtomicBatchPublishEngine unit tests — direct engine tests.
// =========================================================================
[Fact]
public void BatchEngine_staged_messages_not_committed_until_commit_received()
{
    // Go: TestJetStreamAtomicBatchPublishStageAndCommit — staging is separate from commit
    var engine = new AtomicBatchPublishEngine();
    var preconditions = new PublishPreconditions();

    // Subjects handed to the commit callback, in call order. The element type must be
    // spelled out: a bare `new List()` does not compile.
    var committed = new List<string>();

    for (ulong seq = 1; seq <= 3; seq++)
    {
        var isCommit = seq == 3;
        var req = new BatchPublishRequest
        {
            BatchId = "stage_test",
            BatchSeq = seq,
            Subject = "foo",
            Payload = Encoding.UTF8.GetBytes($"msg-{seq}"),
            IsCommit = isCommit,
            CommitValue = isCommit ? "1" : null,
        };
        var result = engine.Process(req, preconditions, 0, staged =>
        {
            committed.Add(staged.Subject);
            return new PubAck { Stream = "TEST", Seq = (ulong)committed.Count };
        });

        if (!isCommit)
        {
            // No commit callback yet.
            committed.ShouldBeEmpty();
            result.Kind.ShouldBe(AtomicBatchResult.ResultKind.Staged);
        }
        else
        {
            // On commit: all 3 messages are committed atomically.
            committed.Count.ShouldBe(3);
            result.Kind.ShouldBe(AtomicBatchResult.ResultKind.Committed);
            result.CommitAck!.BatchSize.ShouldBe(3);
        }
    }
}
[Fact]
public void BatchEngine_timeout_evicts_inflight_batch()
{
    // Go: TestJetStreamAtomicBatchPublishLimits — batch timeout evicts stale batch
    // (jetstream_batching_test.go:384-413)

    // One in-flight slot and a 1 ms batch timeout, so a staged batch goes stale almost at once.
    var shortTimeout = TimeSpan.FromMilliseconds(1);
    var engine = new AtomicBatchPublishEngine(
        maxInflightPerStream: 1,
        maxBatchSize: 1000,
        batchTimeout: shortTimeout);
    var preconditions = new PublishPreconditions();

    // Stage a message so the single slot is taken.
    var stagedRequest = new BatchPublishRequest
    {
        BatchId = "timeout_batch",
        BatchSeq = 1,
        Subject = "foo",
        Payload = "data"u8.ToArray(),
        IsCommit = false,
    };
    var stagedOutcome = engine.Process(stagedRequest, preconditions, 0, _ => new PubAck { Stream = "T", Seq = 1 });
    stagedOutcome.Kind.ShouldBe(AtomicBatchResult.ResultKind.Staged);

    // Let the timeout elapse.
    Thread.Sleep(10);

    // A different batch must now be able to commit, which requires the stale batch
    // to have released the slot.
    var freshRequest = new BatchPublishRequest
    {
        BatchId = "new_after_timeout",
        BatchSeq = 1,
        Subject = "foo",
        Payload = "data"u8.ToArray(),
        IsCommit = true,
        CommitValue = "1",
    };
    var freshOutcome = engine.Process(freshRequest, preconditions, 0, _ => new PubAck { Stream = "T", Seq = 2 });
    freshOutcome.Kind.ShouldBe(AtomicBatchResult.ResultKind.Committed);
}
[Fact]
public void BatchEngine_clear_removes_all_inflight_batches()
{
    // Go: TestJetStreamAtomicBatchPublishCleanup — Clear removes all batches
    var engine = new AtomicBatchPublishEngine();
    var preconditions = new PublishPreconditions();

    // Stage one message so that exactly one batch is in flight.
    var stagedRequest = new BatchPublishRequest
    {
        BatchId = "to_clear",
        BatchSeq = 1,
        Subject = "foo",
        Payload = "data"u8.ToArray(),
        IsCommit = false,
    };
    engine.Process(stagedRequest, preconditions, 0, _ => null);
    engine.InflightCount.ShouldBe(1);

    // Clear() drops the in-flight batch.
    engine.Clear();
    engine.InflightCount.ShouldBe(0);
}
[Fact]
public void BatchEngine_has_batch_returns_false_after_commit()
{
    // Go: TestJetStreamAtomicBatchPublishCleanup — batch is removed after commit
    var engine = new AtomicBatchPublishEngine();
    var preconditions = new PublishPreconditions();

    // A single-message batch that commits immediately.
    var commitRequest = new BatchPublishRequest
    {
        BatchId = "committed",
        BatchSeq = 1,
        Subject = "foo",
        Payload = "data"u8.ToArray(),
        IsCommit = true,
        CommitValue = "1",
    };
    engine.Process(commitRequest, preconditions, 0, _ => new PubAck { Stream = "T", Seq = 1 });

    // Once committed, the engine no longer reports the batch id as present.
    var stillTracked = engine.HasBatch("committed");
    stillTracked.ShouldBeFalse();
}
// =========================================================================
// TestJetStreamAtomicBatchPublishPartiallyAppliedBatchOnRecovery
// — jetstream_batching_test.go:2232
// Partial batch (timed out or never committed) leaves no messages in the stream.
// =========================================================================
[Fact]
public async Task AtomicBatchPublish_uncommitted_batch_does_not_persist_messages()
{
    // Go: TestJetStreamAtomicBatchPublishPartiallyAppliedBatchOnRecovery
    // An uncommitted batch must not leave partial messages in the stream.
    await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
    {
        Name = "RECOVERY",
        Subjects = ["rec.>"],
        AllowAtomicPublish = true,
    });

    // Stage 3 messages but never commit. Each staged ack is checked for the absence
    // of an error code: if staging were rejected, the stream would trivially stay
    // empty and the final assertion would pass without proving anything.
    var staged1 = await fx.BatchPublishAsync("rec.1", "msg1", batchId: "partial", batchSeq: 1);
    staged1.ErrorCode.ShouldBeNull();
    var staged2 = await fx.BatchPublishAsync("rec.2", "msg2", batchId: "partial", batchSeq: 2);
    staged2.ErrorCode.ShouldBeNull();
    var staged3 = await fx.BatchPublishAsync("rec.3", "msg3", batchId: "partial", batchSeq: 3);
    staged3.ErrorCode.ShouldBeNull();

    // Stream should still have 0 messages (nothing committed).
    var state = await fx.GetStreamStateAsync("RECOVERY");
    state.Messages.ShouldBe(0UL);
}
// =========================================================================
// TestJetStreamRollupIsolatedRead — jetstream_batching_test.go:2350
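// Rollup combined with batch publish: a normal publish carrying a rollup header
// on a stream with AllowAtomicPublish enabled works correctly.
// The .NET test below sends no rollup header; it only checks that normal and
// batch publishes coexist on the same stream.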
// =========================================================================
[Fact]
public async Task AtomicBatchPublish_normal_publish_and_batch_publish_coexist()
{
    // Go: TestJetStreamRollupIsolatedRead — both normal and batch publishes on same stream
    var streamConfig = new StreamConfig
    {
        Name = "COEXIST",
        Subjects = ["co.>"],
        AllowAtomicPublish = true,
    };
    await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(streamConfig);

    // A plain (non-batch) publish takes the first stream sequence.
    var plainAck = await fx.PublishAndGetAckAsync("co.normal", "data");
    plainAck.Seq.ShouldBe(1UL);

    // A two-message batch: the first message is staged, the second commits.
    await fx.BatchPublishAsync("co.batch", "b1", batchId: "mix", batchSeq: 1);
    var commitAck = await fx.BatchPublishAsync(
        "co.batch", "b2", batchId: "mix", batchSeq: 2, commitValue: "1");
    commitAck.ErrorCode.ShouldBeNull();
    commitAck.BatchSize.ShouldBe(2);

    // 1 normal + 2 batch
    var streamState = await fx.GetStreamStateAsync("COEXIST");
    streamState.Messages.ShouldBe(3UL);
}
// =========================================================================
// TestJetStreamAtomicBatchPublishAdvisories — jetstream_batching_test.go:2481
// Advisories: gap detection and too-large batch trigger advisory events.
// In .NET, advisories are represented as error codes on the PubAck.
// =========================================================================
[Fact]
public async Task AtomicBatchPublish_gap_detected_returns_incomplete_error()
{
    // Go: TestJetStreamAtomicBatchPublishAdvisories — gap triggers advisory + error
    // (jetstream_batching_test.go:2481)
    var streamConfig = new StreamConfig
    {
        Name = "ADV",
        Subjects = ["adv.>"],
        AllowAtomicPublish = true,
    };
    await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(streamConfig);

    // Stage batch sequence 1 ...
    await fx.BatchPublishAsync("adv.1", "msg1", batchId: "adv_uuid", batchSeq: 1);

    // ... then try to commit at sequence 3, skipping sequence 2.
    var ackAfterGap = await fx.BatchPublishAsync(
        "adv.3", "msg3", batchId: "adv_uuid", batchSeq: 3, commitValue: "1");
    ackAfterGap.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.IncompleteBatch);
}
// =========================================================================
// TestJetStreamAtomicBatchPublishRejectPartialBatchOnLeaderChange
// — jetstream_batching_test.go:2717
// Partial batch (no commit) is discarded on leader change or engine reset.
// In .NET: simulated via Clear().
// =========================================================================
[Fact]
public async Task AtomicBatchPublish_partial_batch_after_reset_returns_incomplete()
{
    // Go: TestJetStreamAtomicBatchPublishRejectPartialBatchOnLeaderChange
    // (jetstream_batching_test.go:2717)
    // A leader change is simulated by clearing the publisher's batches; the
    // partially staged batch must be gone afterwards.
    var streamConfig = new StreamConfig
    {
        Name = "LEADER",
        Subjects = ["ldr.>"],
        AllowAtomicPublish = true,
    };
    await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(streamConfig);

    // Two staged messages, no commit.
    await fx.BatchPublishAsync("ldr.1", "m1", batchId: "ldr_batch", batchSeq: 1);
    await fx.BatchPublishAsync("ldr.2", "m2", batchId: "ldr_batch", batchSeq: 2);

    // Simulate leader-change / reset.
    var publisher = fx.GetPublisher();
    publisher.ClearBatches();

    // Committing at seq 3 now targets a batch id that is no longer tracked, so the
    // publish is expected to be reported as an incomplete batch.
    var ackAfterReset = await fx.BatchPublishAsync(
        "ldr.3", "commit", batchId: "ldr_batch", batchSeq: 3, commitValue: "1");
    ackAfterReset.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.IncompleteBatch);
}
// =========================================================================
// TestJetStreamAtomicBatchPublishContinuousBatchesStillMoveAppliedUp
// — jetstream_batching_test.go:2113
// Multiple consecutive batches are all committed and each is independent.
// =========================================================================
[Fact]
public async Task AtomicBatchPublish_continuous_batches_each_commit_independently()
{
    // Go: TestJetStreamAtomicBatchPublishContinuousBatchesStillMoveAppliedUp
    // (jetstream_batching_test.go:2113)
    var streamConfig = new StreamConfig
    {
        Name = "CONT",
        Subjects = ["cont"],
        AllowAtomicPublish = true,
    };
    await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(streamConfig);

    // Batch "batch1": two staged messages followed by a committing third.
    await fx.BatchPublishAsync("cont", "a1", batchId: "batch1", batchSeq: 1);
    await fx.BatchPublishAsync("cont", "a2", batchId: "batch1", batchSeq: 2);
    var firstCommit = await fx.BatchPublishAsync(
        "cont", "a3", batchId: "batch1", batchSeq: 3, commitValue: "1");
    firstCommit.ErrorCode.ShouldBeNull();
    firstCommit.BatchSize.ShouldBe(3);

    // Batch "batch2": one staged message followed by a committing second.
    await fx.BatchPublishAsync("cont", "b1", batchId: "batch2", batchSeq: 1);
    var secondCommit = await fx.BatchPublishAsync(
        "cont", "b2", batchId: "batch2", batchSeq: 2, commitValue: "1");
    secondCommit.ErrorCode.ShouldBeNull();
    secondCommit.BatchSize.ShouldBe(2);

    // 3 messages from the first batch + 2 from the second.
    var streamState = await fx.GetStreamStateAsync("CONT");
    streamState.Messages.ShouldBe(5UL);
}
// =========================================================================
// TestJetStreamAtomicBatchPublishSingleServerRecovery — jetstream_batching_test.go:1578
// After a batch is partially committed, on recovery the rest is applied.
// In .NET we test the in-memory analogue: staged messages are buffered, not written
// to the store until commit.
// =========================================================================
[Fact]
public async Task AtomicBatchPublish_staged_messages_are_buffered_until_commit()
{
    // Go: TestJetStreamAtomicBatchPublishSingleServerRecovery (jetstream_batching_test.go:1578)
    var streamConfig = new StreamConfig
    {
        Name = "STAGED",
        Subjects = ["staged"],
        AllowAtomicPublish = true,
    };
    await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(streamConfig);

    // Two staged messages: the stream must not report them yet.
    await fx.BatchPublishAsync("staged", "msg1", batchId: "staged_batch", batchSeq: 1);
    await fx.BatchPublishAsync("staged", "msg2", batchId: "staged_batch", batchSeq: 2);
    var stateBeforeCommit = await fx.GetStreamStateAsync("STAGED");
    stateBeforeCommit.Messages.ShouldBe(0UL);

    // The third message carries the commit marker and closes the batch.
    var commitAck = await fx.BatchPublishAsync(
        "staged", "msg3", batchId: "staged_batch", batchSeq: 3, commitValue: "1");
    commitAck.ErrorCode.ShouldBeNull();
    commitAck.BatchSize.ShouldBe(3);

    // After the commit the stream reports all three messages.
    var stateAfterCommit = await fx.GetStreamStateAsync("STAGED");
    stateAfterCommit.Messages.ShouldBe(3UL);
}
// =========================================================================
// TestJetStreamAtomicBatchPublishEncode — jetstream_batching_test.go:1750
// Verify that error code constants have the expected numeric values from the Go source.
// =========================================================================
[Fact]
public void AtomicBatchPublish_error_code_values_match_go_reference()
{
    // Go: jetstream_errors_generated.go — error identifiers
    // Assertions are listed in ascending numeric order of the expected code.
    AtomicBatchPublishErrorCodes.Disabled.ShouldBe(10174);
    AtomicBatchPublishErrorCodes.MissingSeq.ShouldBe(10175);
    AtomicBatchPublishErrorCodes.IncompleteBatch.ShouldBe(10176);
    AtomicBatchPublishErrorCodes.UnsupportedHeader.ShouldBe(10177);
    AtomicBatchPublishErrorCodes.InvalidBatchId.ShouldBe(10179);

    AtomicBatchPublishErrorCodes.MirrorWithAtomicPublish.ShouldBe(10198);
    AtomicBatchPublishErrorCodes.TooLargeBatch.ShouldBe(10199);
    AtomicBatchPublishErrorCodes.InvalidCommit.ShouldBe(10200);
    AtomicBatchPublishErrorCodes.ContainsDuplicate.ShouldBe(10201);
}
// =========================================================================
// TestJetStreamAtomicBatchPublishExpectedPerSubject — jetstream_batching_test.go:1500
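// A batch can carry per-subject sequence expectations on the first message.
// The .NET tests below cover only the stream-config prerequisite: AllowAtomicPublish
// is stored when set and defaults to false.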
// =========================================================================
[Fact]
public async Task AtomicBatchPublish_stream_config_allows_atomic_publish_flag()
{
    // Go: TestJetStreamAtomicBatchPublishExpectedPerSubject — stream config check
    // A stream created with AllowAtomicPublish = true must report the flag as true
    // when its config is read back from the fixture.
    var requestedConfig = new StreamConfig
    {
        Name = "CFG_CHECK",
        Subjects = ["chk.>"],
        AllowAtomicPublish = true,
    };
    await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(requestedConfig);

    var storedConfig = fx.GetStreamConfig("CFG_CHECK");
    storedConfig.ShouldNotBeNull();
    storedConfig!.AllowAtomicPublish.ShouldBeTrue();
}
[Fact]
public async Task AtomicBatchPublish_stream_config_defaults_to_false()
{
    // Go: TestJetStreamAtomicBatchPublish — AllowAtomicPublish defaults to false
    // The flag is deliberately left unset here so the default value is observed.
    var requestedConfig = new StreamConfig
    {
        Name = "CFG_DEFAULT",
        Subjects = ["def.>"],
    };
    await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(requestedConfig);

    var storedConfig = fx.GetStreamConfig("CFG_DEFAULT");
    storedConfig.ShouldNotBeNull();
    storedConfig!.AllowAtomicPublish.ShouldBeFalse();
}
// =========================================================================
// TestJetStreamAtomicBatchPublishProposeOnePartialBatch — jetstream_batching_test.go:1955
// A partial batch (missing one sequence entry) is treated as incomplete.
// =========================================================================
[Fact]
public async Task AtomicBatchPublish_partial_batch_missing_middle_message_fails()
{
    // Go: TestJetStreamAtomicBatchPublishProposeOnePartialBatch (jetstream_batching_test.go:1955)
    // Sequence 2 is never sent (only 1 and 3), so the commit must be rejected.
    var streamConfig = new StreamConfig
    {
        Name = "PARTIAL",
        Subjects = ["partial"],
        AllowAtomicPublish = true,
    };
    await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(streamConfig);

    // First message of the batch is staged normally.
    await fx.BatchPublishAsync("partial", "m1", batchId: "partial_batch", batchSeq: 1);

    // The commit arrives at seq 3, leaving a hole at seq 2.
    var ackAfterGap = await fx.BatchPublishAsync(
        "partial", "m3", batchId: "partial_batch", batchSeq: 3, commitValue: "1");
    ackAfterGap.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.IncompleteBatch);
}
}