Move 225 JetStream-related test files from NATS.Server.Tests into a dedicated NATS.Server.JetStream.Tests project. This includes root-level JetStream*.cs files, storage test files (FileStore, MemStore, StreamStoreContract), and the full JetStream/ subfolder tree (Api, Cluster, Consumers, MirrorSource, Snapshots, Storage, Streams). Updated all namespaces, added InternalsVisibleTo, registered in the solution file, and added the JETSTREAM_INTEGRATION_MATRIX define.
1262 lines
51 KiB
C#
1262 lines
51 KiB
C#
// Ported from golang/nats-server/server/jetstream_batching_test.go
|
|
// Go: TestJetStreamAtomicBatch* series — atomic batch publish protocol:
|
|
// stage/commit semantics, error codes, limits, dedupe, mirror/source constraints,
|
|
// cleanup on disable/delete, config options, deny-headers, advisory payloads,
|
|
// expected-seq checks, rollback, and recovery behaviors.
|
|
|
|
using System.Text;
|
|
using NATS.Server.JetStream;
|
|
using NATS.Server.JetStream.Models;
|
|
using NATS.Server.JetStream.Publish;
|
|
using NATS.Server.TestUtilities;
|
|
|
|
namespace NATS.Server.JetStream.Tests.JetStream;
|
|
|
|
/// <summary>
|
|
/// Go-parity tests for atomic batch publish (Nats-Batch-Id / Nats-Batch-Sequence /
|
|
/// Nats-Batch-Commit headers). Ported from server/jetstream_batching_test.go.
|
|
///
|
|
/// The protocol:
|
|
/// - Nats-Batch-Id: UUID identifying the batch
|
|
/// - Nats-Batch-Sequence: 1-based position within the batch
|
|
/// - Nats-Batch-Commit: "1" or "eob" to atomically commit all staged messages
|
|
///
|
|
/// Non-commit messages return an empty ack (flow control).
|
|
/// A commit returns the full PubAck with BatchId + BatchSize set.
|
|
/// </summary>
|
|
public class JsBatchingTests
|
|
{
|
|
// =========================================================================
|
|
// TestJetStreamAtomicBatchPublish — jetstream_batching_test.go:35
|
|
// Basic end-to-end: stage, commit, validate stream contents.
|
|
// =========================================================================
|
|
|
|
[Fact]
public async Task AtomicBatchPublish_disabled_by_default_returns_error()
{
    // Go: TestJetStreamAtomicBatchPublish (jetstream_batching_test.go:35)
    // A publish carrying Nats-Batch-Id against a stream where AllowAtomicPublish
    // is off must be refused with the Disabled error code.
    var config = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo.*"],
        AllowAtomicPublish = false,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    var response = await fixture.BatchPublishAsync("foo.0", "data", batchId: "uuid", batchSeq: 1, commitValue: "1");

    response.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.Disabled);
}
|
|
|
|
[Fact]
public async Task AtomicBatchPublish_missing_sequence_returns_error()
{
    // Go: TestJetStreamAtomicBatchPublish — batch id present but no sequence
    // (jetstream_batching_test.go:78-83)
    var config = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo.*"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // A BatchSeq of 0 models an absent Nats-Batch-Sequence header.
    var response = await fixture.BatchPublishAsync("foo.0", "data", batchId: "uuid", batchSeq: 0);

    response.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.MissingSeq);
}
|
|
|
|
[Fact]
public async Task AtomicBatchPublish_sequence_starts_above_one_returns_incomplete()
{
    // Go: TestJetStreamAtomicBatchPublish — first msg has seq=2, gaps detected
    // (jetstream_batching_test.go:84-91)
    var config = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo.*"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // Opening a batch at seq 2 (no seq 1 was ever staged) makes the batch incomplete.
    var response = await fixture.BatchPublishAsync("foo.0", "data", batchId: "uuid", batchSeq: 2, commitValue: "1");

    response.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.IncompleteBatch);
}
|
|
|
|
[Fact]
public async Task AtomicBatchPublish_single_message_batch_commit_succeeds()
{
    // Go: TestJetStreamAtomicBatchPublish — single-message batch commits immediately
    // (jetstream_batching_test.go:93-103)
    var config = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo.*"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // Seq 1 with an immediate commit: a one-message batch.
    var response = await fixture.BatchPublishAsync(
        "foo.0", "foo.0",
        batchId: "uuid", batchSeq: 1, commitValue: "1");

    response.ErrorCode.ShouldBeNull();
    response.Seq.ShouldBe(1UL);
    response.BatchId.ShouldBe("uuid");
    response.BatchSize.ShouldBe(1);
}
|
|
|
|
[Fact]
public async Task AtomicBatchPublish_multi_message_batch_commit_returns_last_seq()
{
    // Go: TestJetStreamAtomicBatchPublish — batch of 5 messages, commit on last
    // (jetstream_batching_test.go:117-138)
    var config = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo.*"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    const int batchSize = 5;
    PubAck commitAck = null!;

    for (var seq = 1; seq <= batchSize; seq++)
    {
        var subject = $"foo.{seq}";
        var isLast = seq == batchSize;
        var ack = await fixture.BatchPublishAsync(subject, subject, batchId: "uuid", batchSeq: (ulong)seq, commitValue: isLast ? "1" : null);

        if (isLast)
        {
            commitAck = ack;
        }
        else
        {
            // Staged (non-commit) messages: empty flow-control ack, no error, no batch info.
            ack.ErrorCode.ShouldBeNull();
            ack.BatchId.ShouldBeNull();
        }
    }

    commitAck.ErrorCode.ShouldBeNull();
    commitAck.BatchId.ShouldBe("uuid");
    commitAck.BatchSize.ShouldBe(batchSize);
    commitAck.Seq.ShouldBeGreaterThan(0UL);
}
|
|
|
|
[Fact]
public async Task AtomicBatchPublish_gap_in_sequence_returns_incomplete()
{
    // Go: TestJetStreamAtomicBatchPublish — batch with gap (1, then 3)
    // (jetstream_batching_test.go:108-115)
    var config = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo.*"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // Seq 1 stages cleanly.
    var firstAck = await fixture.BatchPublishAsync("foo.1", "data", batchId: "uuid", batchSeq: 1);
    firstAck.ErrorCode.ShouldBeNull();

    // Jumping straight to seq 3 leaves a hole at seq 2 — the batch is incomplete.
    var thirdAck = await fixture.BatchPublishAsync("foo.3", "data", batchId: "uuid", batchSeq: 3, commitValue: "1");
    thirdAck.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.IncompleteBatch);
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamAtomicBatchPublishEmptyAck — jetstream_batching_test.go:163
|
|
// Non-commit messages return an empty (flow-control) ack.
|
|
// =========================================================================
|
|
|
|
[Fact]
public async Task AtomicBatchPublish_non_commit_messages_return_empty_flow_control_ack()
{
    // Go: TestJetStreamAtomicBatchPublishEmptyAck (jetstream_batching_test.go:163)
    var config = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo.*"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    const int batchSize = 5;

    for (var seq = 1; seq <= batchSize; seq++)
    {
        var subject = $"foo.{seq}";
        var isLast = seq == batchSize;
        var ack = await fixture.BatchPublishAsync(subject, subject, batchId: "uuid2", batchSeq: (ulong)seq, commitValue: isLast ? "1" : null);

        if (isLast)
        {
            // Only the commit carries the full batch ack.
            ack.ErrorCode.ShouldBeNull();
            ack.Seq.ShouldBe((ulong)batchSize);
            ack.BatchId.ShouldBe("uuid2");
            ack.BatchSize.ShouldBe(batchSize);
        }
        else
        {
            // Staged messages get an empty flow-control ack without batch info.
            ack.ErrorCode.ShouldBeNull();
            ack.BatchId.ShouldBeNull();
        }
    }
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamAtomicBatchPublishCommitEob — jetstream_batching_test.go:225
|
|
// The "eob" (end-of-batch) commit value: the commit message itself is excluded
|
|
// from the committed set.
|
|
// =========================================================================
|
|
|
|
[Fact]
public async Task AtomicBatchPublish_commit_eob_excludes_commit_marker_message()
{
    // Go: TestJetStreamAtomicBatchPublishCommitEob (jetstream_batching_test.go:225)
    // With "eob", the commit message (seq=3) is NOT stored — only seqs 1,2 are.
    var config = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    await fixture.BatchPublishAsync("foo", "msg1", batchId: "uuid", batchSeq: 1);
    await fixture.BatchPublishAsync("foo", "msg2", batchId: "uuid", batchSeq: 2);

    // The "eob" commit marker itself is dropped from the committed set.
    var commitAck = await fixture.BatchPublishAsync("foo", "commit-marker", batchId: "uuid", batchSeq: 3, commitValue: "eob");

    commitAck.ErrorCode.ShouldBeNull();
    commitAck.BatchSize.ShouldBe(2); // marker excluded
    commitAck.Seq.ShouldBe(2UL);     // last stored message is seq 2
    commitAck.BatchId.ShouldBe("uuid");

    var state = await fixture.GetStreamStateAsync("TEST");
    state.Messages.ShouldBe(2UL);
}
|
|
|
|
[Fact]
public async Task AtomicBatchPublish_commit_value_1_includes_all_messages()
{
    // Go: TestJetStreamAtomicBatchPublishCommitEob — normal commit includes all messages
    var config = new StreamConfig
    {
        Name = "TESTA",
        Subjects = ["bar"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    await fixture.BatchPublishAsync("bar", "msg1", batchId: "batch_a", batchSeq: 1);
    await fixture.BatchPublishAsync("bar", "msg2", batchId: "batch_a", batchSeq: 2);

    // A "1" commit stores the commit message itself as well.
    var commitAck = await fixture.BatchPublishAsync("bar", "msg3", batchId: "batch_a", batchSeq: 3, commitValue: "1");

    commitAck.ErrorCode.ShouldBeNull();
    commitAck.BatchSize.ShouldBe(3); // all 3 messages committed
    commitAck.Seq.ShouldBe(3UL);
    commitAck.BatchId.ShouldBe("batch_a");

    var state = await fixture.GetStreamStateAsync("TESTA");
    state.Messages.ShouldBe(3UL);
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamAtomicBatchPublishLimits — jetstream_batching_test.go:293
|
|
// Batch ID max length, inflight limit, batch size limit.
|
|
// =========================================================================
|
|
|
|
[Fact]
public async Task AtomicBatchPublish_batch_id_max_64_chars_is_accepted()
{
    // Go: TestJetStreamAtomicBatchPublishLimits — max-length batch ID (64 chars)
    // (jetstream_batching_test.go:337-354)
    var config = new StreamConfig
    {
        Name = "FOO",
        Subjects = ["foo"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // Exactly at the 64-char limit — accepted.
    var maxLengthId = new string('A', 64);
    var response = await fixture.BatchPublishAsync("foo", "data", batchId: maxLengthId, batchSeq: 1, commitValue: "1");
    response.ErrorCode.ShouldBeNull();
}
|
|
|
|
[Fact]
public async Task AtomicBatchPublish_batch_id_over_64_chars_is_rejected()
{
    // Go: TestJetStreamAtomicBatchPublishLimits — batch ID too long (65 chars)
    // (jetstream_batching_test.go:337-354)
    var config = new StreamConfig
    {
        Name = "FOO2",
        Subjects = ["foo2"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // One character over the 64-char limit — rejected.
    var oversizedId = new string('A', 65);
    var response = await fixture.BatchPublishAsync("foo2", "data", batchId: oversizedId, batchSeq: 1, commitValue: "1");
    response.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.InvalidBatchId);
}
|
|
|
|
[Fact]
public void AtomicBatchPublish_inflight_limit_per_stream_enforced()
{
    // Go: TestJetStreamAtomicBatchPublishLimits — inflight limit per stream = 1
    // (jetstream_batching_test.go:356-382)
    // When there's 1 in-flight batch (not yet committed), a second batch is rejected.
    //
    // Fix: this test drives AtomicBatchPublishEngine directly and contains no awaits,
    // but was declared `async Task` (compiler warning CS1998: async method lacks
    // await). Declared `void` to match the sibling synchronous engine test
    // AtomicBatchPublish_batch_size_limit_enforced.
    var engine = new AtomicBatchPublishEngine(
        maxInflightPerStream: 1,
        maxBatchSize: AtomicBatchPublishEngine.DefaultMaxBatchSize);
    var preconditions = new PublishPreconditions();

    // First batch seq=1 starts staging (occupies the 1 inflight slot).
    var req1 = new BatchPublishRequest
    {
        BatchId = "batch_a",
        BatchSeq = 1,
        Subject = "foo",
        Payload = "data"u8.ToArray(),
        IsCommit = false,
    };
    var result1 = engine.Process(req1, preconditions, 0, _ => new PubAck { Stream = "TEST", Seq = 1 });
    result1.Kind.ShouldBe(AtomicBatchResult.ResultKind.Staged);

    // Second batch: would exceed the inflight limit, so it is never tracked and
    // its commit reports an incomplete batch.
    var req2 = new BatchPublishRequest
    {
        BatchId = "batch_b",
        BatchSeq = 1,
        Subject = "foo",
        Payload = "data"u8.ToArray(),
        IsCommit = true,
        CommitValue = "1",
    };
    var result2 = engine.Process(req2, preconditions, 0, _ => new PubAck { Stream = "TEST", Seq = 2 });
    result2.Kind.ShouldBe(AtomicBatchResult.ResultKind.Error);
    result2.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.IncompleteBatch);
}
|
|
|
|
[Fact]
public void AtomicBatchPublish_batch_size_limit_enforced()
{
    // Go: TestJetStreamAtomicBatchPublishLimits — max batch size = 2
    // (jetstream_batching_test.go:415-440)
    var atLimitEngine = new AtomicBatchPublishEngine(
        maxInflightPerStream: 50,
        maxBatchSize: 2);
    var preconditions = new PublishPreconditions();

    // A batch of exactly 2 messages is at the limit: the commit must succeed.
    for (ulong seq = 1; seq <= 2; seq++)
    {
        var committing = seq == 2;
        var request = new BatchPublishRequest
        {
            BatchId = "big",
            BatchSeq = seq,
            Subject = "foo",
            Payload = "data"u8.ToArray(),
            IsCommit = committing,
            CommitValue = committing ? "1" : null,
        };
        ulong capturedSeq = 0;
        var outcome = atLimitEngine.Process(request, preconditions, 0, msg =>
        {
            capturedSeq++;
            return new PubAck { Stream = "TEST", Seq = capturedSeq };
        });

        if (committing)
        {
            outcome.Kind.ShouldBe(AtomicBatchResult.ResultKind.Committed);
            outcome.CommitAck!.BatchSize.ShouldBe(2);
        }
        else
        {
            outcome.Kind.ShouldBe(AtomicBatchResult.ResultKind.Staged);
        }
    }

    // A batch of 3 is one over the max and must error on the final message.
    var overLimitEngine = new AtomicBatchPublishEngine(maxInflightPerStream: 50, maxBatchSize: 2);
    for (ulong seq = 1; seq <= 3; seq++)
    {
        var committing = seq == 3;
        var request = new BatchPublishRequest
        {
            BatchId = "toobig",
            BatchSeq = seq,
            Subject = "foo",
            Payload = "data"u8.ToArray(),
            IsCommit = committing,
            CommitValue = committing ? "1" : null,
        };
        ulong nextSeq = seq;
        var outcome = overLimitEngine.Process(request, preconditions, 0, _ => new PubAck { Stream = "TEST", Seq = nextSeq });

        if (seq > 2)
        {
            // The third message exceeds the max; must error.
            outcome.Kind.ShouldBe(AtomicBatchResult.ResultKind.Error);
            outcome.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.TooLargeBatch);
        }
        else
        {
            outcome.Kind.ShouldBeOneOf(AtomicBatchResult.ResultKind.Staged, AtomicBatchResult.ResultKind.Error);
        }
    }
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamAtomicBatchPublishDedupeNotAllowed — jetstream_batching_test.go:447
|
|
// Pre-existing duplicate msg IDs within a batch cause an error.
|
|
// =========================================================================
|
|
|
|
[Fact]
public async Task AtomicBatchPublish_duplicate_msgid_in_store_rejected()
{
    // Go: TestJetStreamAtomicBatchPublishDedupeNotAllowed (jetstream_batching_test.go:447)
    // A batch message whose Nats-Msg-Id matches an already-stored message ID must fail.
    var config = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo"],
        AllowAtomicPublish = true,
        DuplicateWindowMs = 60_000,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // Store a message with msg ID "pre-existing" via the normal (non-batch) path.
    var normalAck = await fixture.PublishAndGetAckAsync("foo", "original", msgId: "pre-existing");
    normalAck.ErrorCode.ShouldBeNull();

    // Re-using "pre-existing" inside a batch must be rejected as a duplicate.
    var batchAck = await fixture.BatchPublishAsync(
        "foo", "duplicate",
        batchId: "uuid", batchSeq: 1, commitValue: "1",
        msgId: "pre-existing");

    batchAck.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.ContainsDuplicate);
}
|
|
|
|
[Fact]
public async Task AtomicBatchPublish_unique_msgids_in_batch_are_accepted()
{
    // Go: TestJetStreamAtomicBatchPublishDedupeNotAllowed — unique msg IDs succeed
    // (jetstream_batching_test.go:483-499)
    var config = new StreamConfig
    {
        Name = "TEST2",
        Subjects = ["foo2"],
        AllowAtomicPublish = true,
        DuplicateWindowMs = 60_000,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // Stage with one msg ID, commit with a different one — both accepted.
    var stageAck = await fixture.BatchPublishAsync("foo2", "a", batchId: "uuid", batchSeq: 1, msgId: "id1");
    stageAck.ErrorCode.ShouldBeNull();

    var commitAck = await fixture.BatchPublishAsync("foo2", "b", batchId: "uuid", batchSeq: 2, commitValue: "1", msgId: "id2");
    commitAck.ErrorCode.ShouldBeNull();
    commitAck.BatchSize.ShouldBe(2);
}
|
|
|
|
[Fact]
public async Task AtomicBatchPublish_duplicate_msgid_within_same_batch_rejected()
{
    // Go: TestJetStreamAtomicBatchPublishDedupeNotAllowed — same msg ID in batch twice
    var config = new StreamConfig
    {
        Name = "TEST3",
        Subjects = ["foo3"],
        AllowAtomicPublish = true,
        DuplicateWindowMs = 60_000,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // First message stages fine with idA.
    var stageAck = await fixture.BatchPublishAsync("foo3", "msg1", batchId: "dup_batch", batchSeq: 1, msgId: "idA");
    stageAck.ErrorCode.ShouldBeNull();

    // Reusing idA within the same batch is a duplicate — rejected.
    var commitAck = await fixture.BatchPublishAsync("foo3", "msg2", batchId: "dup_batch", batchSeq: 2, commitValue: "1", msgId: "idA");
    commitAck.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.ContainsDuplicate);
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamAtomicBatchPublishSourceAndMirror — jetstream_batching_test.go:519
|
|
// Mirror streams cannot have AllowAtomicPublish=true.
|
|
// =========================================================================
|
|
|
|
[Fact]
public void AtomicBatchPublish_mirror_with_allow_atomic_publish_is_rejected()
{
    // Go: TestJetStreamAtomicBatchPublishSourceAndMirror — mirror + AllowAtomicPublish
    // (jetstream_batching_test.go:561-569)
    // Go error: NewJSMirrorWithAtomicPublishError (10198)
    var streamManager = new StreamManager();

    // The origin stream must exist before a mirror can point at it.
    var originResponse = streamManager.CreateOrUpdate(new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo"],
    });
    originResponse.Error.ShouldBeNull();

    // A mirror stream may not enable atomic publish.
    var mirrorResponse = streamManager.CreateOrUpdate(new StreamConfig
    {
        Name = "M-no-batch",
        Mirror = "TEST",
        AllowAtomicPublish = true,
    });

    mirrorResponse.Error.ShouldNotBeNull();
    mirrorResponse.Error!.Code.ShouldBe(AtomicBatchPublishErrorCodes.MirrorWithAtomicPublish);
}
|
|
|
|
[Fact]
public void AtomicBatchPublish_mirror_without_allow_atomic_publish_is_accepted()
{
    // Go: TestJetStreamAtomicBatchPublishSourceAndMirror — regular mirror is OK
    var streamManager = new StreamManager();
    streamManager.CreateOrUpdate(new StreamConfig { Name = "ORIGIN", Subjects = ["origin.>"] });

    // A plain mirror (atomic publish off) passes validation.
    var mirrorResponse = streamManager.CreateOrUpdate(new StreamConfig
    {
        Name = "M_OK",
        Mirror = "ORIGIN",
        AllowAtomicPublish = false,
    });

    mirrorResponse.Error.ShouldBeNull();
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamAtomicBatchPublishCleanup — jetstream_batching_test.go:620
|
|
// In-flight batches are cleaned up when:
|
|
// - AllowAtomicPublish is disabled on update
|
|
// - Stream is deleted
|
|
// - Batch is committed
|
|
// =========================================================================
|
|
|
|
[Fact]
public async Task AtomicBatchPublish_cleanup_on_disable()
{
    // Go: TestJetStreamAtomicBatchPublishCleanup — Disable mode
    // (jetstream_batching_test.go:620, mode=Disable)
    var config = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // Open a batch and leave it uncommitted.
    var stagedAck = await fixture.BatchPublishAsync("foo", "msg1", batchId: "cleanup_uuid", batchSeq: 1);
    stagedAck.ErrorCode.ShouldBeNull();

    // Turn AllowAtomicPublish off via a stream update.
    var updateResponse = fixture.UpdateStreamWithResult(new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo"],
        AllowAtomicPublish = false,
    });
    updateResponse.Error.ShouldBeNull();

    // Any new batch message after the disable must fail with Disabled.
    var postDisableAck = await fixture.BatchPublishAsync("foo", "msg2", batchId: "new_batch", batchSeq: 1, commitValue: "1");
    postDisableAck.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.Disabled);
}
|
|
|
|
[Fact]
public async Task AtomicBatchPublish_committed_batch_cleans_up_inflight_state()
{
    // Go: TestJetStreamAtomicBatchPublishCleanup — Commit mode
    // (jetstream_batching_test.go:703-711)
    var config = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // Stage one message, then commit on the second.
    await fixture.BatchPublishAsync("foo", "msg1", batchId: "committed_uuid", batchSeq: 1);
    var commitAck = await fixture.BatchPublishAsync("foo", "msg2", batchId: "committed_uuid", batchSeq: 2, commitValue: "1");
    commitAck.ErrorCode.ShouldBeNull();
    commitAck.BatchSize.ShouldBe(2);

    // The commit must clean the inflight entry: the same batch ID is reusable.
    var reusedIdAck = await fixture.BatchPublishAsync("foo", "fresh", batchId: "committed_uuid", batchSeq: 1, commitValue: "1");
    reusedIdAck.ErrorCode.ShouldBeNull();
    reusedIdAck.BatchSize.ShouldBe(1);
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamAtomicBatchPublishConfigOpts — jetstream_batching_test.go:756
|
|
// Default constant values match Go reference.
|
|
// =========================================================================
|
|
|
|
[Fact]
public void AtomicBatchPublish_default_config_constants_match_go_reference()
{
    // Go: TestJetStreamAtomicBatchPublishConfigOpts (jetstream_batching_test.go:756)
    // Go defaults: max_inflight_per_stream=50, max_msgs=1000, timeout=10s,
    // batch ID capped at 64 chars.
    AtomicBatchPublishEngine.MaxBatchIdLength.ShouldBe(64);
    AtomicBatchPublishEngine.DefaultMaxInflightPerStream.ShouldBe(50);
    AtomicBatchPublishEngine.DefaultMaxBatchSize.ShouldBe(1000);
    AtomicBatchPublishEngine.DefaultBatchTimeout.ShouldBe(TimeSpan.FromSeconds(10));
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamAtomicBatchPublishDenyHeaders — jetstream_batching_test.go:792
|
|
// Certain headers are not supported inside a batch (Nats-Expected-Last-Msg-Id).
|
|
// =========================================================================
|
|
|
|
[Fact]
public async Task AtomicBatchPublish_unsupported_header_expected_last_msg_id_rejected()
{
    // Go: TestJetStreamAtomicBatchPublishDenyHeaders (jetstream_batching_test.go:792)
    // Nats-Expected-Last-Msg-Id is not supported within a batch context.
    var config = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // Setting Nats-Expected-Last-Msg-Id on a staged batch message is denied.
    var response = await fixture.BatchPublishAsync(
        "foo", "data",
        batchId: "uuid", batchSeq: 1,
        expectedLastMsgId: "msgId");

    response.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.UnsupportedHeader);
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamAtomicBatchPublishHighLevelRollback — jetstream_batching_test.go:1455
|
|
// Publishing with expected-last-subject-sequence that doesn't match rolls back.
|
|
// =========================================================================
|
|
|
|
[Fact]
public async Task AtomicBatchPublish_expected_last_seq_checked_at_commit()
{
    // Go: TestJetStreamAtomicBatchPublishExpectedSeq (jetstream_batching_test.go:2594)
    // A batch with Nats-Expected-Last-Sequence checks the sequence at commit time.
    var config = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // Two ordinary publishes put the stream at sequence 2.
    var firstAck = await fixture.PublishAndGetAckAsync("foo", "msg1");
    firstAck.Seq.ShouldBe(1UL);
    var secondAck = await fixture.PublishAndGetAckAsync("foo", "msg2");
    secondAck.Seq.ShouldBe(2UL);

    // Batch expecting last-seq 2 matches the actual stream state and commits.
    var batchAck = await fixture.BatchPublishAsync(
        "foo", "batch-msg",
        batchId: "uuid", batchSeq: 1, commitValue: "1",
        expectedLastSeq: 2);

    batchAck.ErrorCode.ShouldBeNull();
    batchAck.BatchSize.ShouldBe(1);
    batchAck.Seq.ShouldBe(3UL);
}
|
|
|
|
[Fact]
public async Task AtomicBatchPublish_expected_last_seq_mismatch_returns_error()
{
    // Go: TestJetStreamAtomicBatchPublishExpectedSeq — wrong expected seq
    var config = new StreamConfig
    {
        Name = "ESEQ",
        Subjects = ["eseq"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    var seedAck = await fixture.PublishAndGetAckAsync("eseq", "first");
    seedAck.Seq.ShouldBe(1UL);

    // Expecting last-seq 99 while the stream sits at 1 must fail the commit.
    var batchAck = await fixture.BatchPublishAsync(
        "eseq", "batch",
        batchId: "badseq", batchSeq: 1, commitValue: "1",
        expectedLastSeq: 99);

    batchAck.ErrorCode.ShouldNotBeNull();
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamAtomicBatchPublishCommitUnsupported — jetstream_batching_test.go:2852
|
|
// Invalid commit values ("", "0", "unsupported") are rejected.
|
|
// =========================================================================
|
|
|
|
[Theory]
[InlineData("0")]
[InlineData("unsupported")]
[InlineData("false")]
public async Task AtomicBatchPublish_invalid_commit_value_returns_error(string invalidCommit)
{
    // Go: TestJetStreamAtomicBatchPublishCommitUnsupported (jetstream_batching_test.go:2852)
    // Only "1" and "eob" are valid commit values; anything else is rejected.
    var config = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // Stage seq 1 so the batch exists when the bad commit arrives.
    await fixture.BatchPublishAsync("foo", "stage", batchId: "bad_commit", batchSeq: 1);

    var response = await fixture.BatchPublishAsync(
        "foo", "commit-msg",
        batchId: "bad_commit", batchSeq: 2,
        commitValue: invalidCommit);

    response.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.InvalidCommit);
}
|
|
|
|
[Fact]
public async Task AtomicBatchPublish_commit_value_1_is_valid()
{
    // Go: TestJetStreamAtomicBatchPublishCommitUnsupported — "1" is valid
    var config = new StreamConfig
    {
        Name = "VALID1",
        Subjects = ["v1"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    var response = await fixture.BatchPublishAsync("v1", "msg", batchId: "v_commit", batchSeq: 1, commitValue: "1");

    response.ErrorCode.ShouldBeNull();
    response.BatchSize.ShouldBe(1);
}
|
|
|
|
[Fact]
public async Task AtomicBatchPublish_commit_value_eob_is_valid()
{
    // Go: TestJetStreamAtomicBatchPublishCommitUnsupported — "eob" is valid
    var config = new StreamConfig
    {
        Name = "VALIDEOB",
        Subjects = ["veob"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    await fixture.BatchPublishAsync("veob", "msg1", batchId: "eob_commit", batchSeq: 1);
    var commitAck = await fixture.BatchPublishAsync("veob", "marker", batchId: "eob_commit", batchSeq: 2, commitValue: "eob");

    commitAck.ErrorCode.ShouldBeNull();
    // "eob" drops the commit marker itself — only seq 1 is committed.
    commitAck.BatchSize.ShouldBe(1);
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamAtomicBatchPublishExpectedLastSubjectSequence — jetstream_batching_test.go:2804
|
|
// Batches can carry per-subject sequence expectations.
|
|
// =========================================================================
|
|
|
|
[Fact]
public async Task AtomicBatchPublish_multi_message_batch_sequence_is_correct()
{
    // Go: TestJetStreamAtomicBatchPublishExpectedLastSubjectSequence
    // (jetstream_batching_test.go:2804) — multi-msg batch, each with subject
    var config = new StreamConfig
    {
        Name = "TEST",
        Subjects = ["foo.*"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // Seed the stream with two ordinary messages (seqs 1 and 2).
    await fixture.PublishAndGetAckAsync("foo.A", "a1"); // seq 1
    await fixture.PublishAndGetAckAsync("foo.B", "b1"); // seq 2

    // Batch spans two subjects: stage on foo.A, commit on foo.B.
    var stageAck = await fixture.BatchPublishAsync("foo.A", "a2", batchId: "uuid", batchSeq: 1);
    stageAck.ErrorCode.ShouldBeNull();

    var commitAck = await fixture.BatchPublishAsync("foo.B", "b2", batchId: "uuid", batchSeq: 2, commitValue: "1");
    commitAck.ErrorCode.ShouldBeNull();
    commitAck.BatchId.ShouldBe("uuid");
    commitAck.BatchSize.ShouldBe(2);
    commitAck.Seq.ShouldBe(4UL);
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamAtomicBatchPublishPersistModeAsync — jetstream_batching_test.go:2785
|
|
// AsyncPersistMode + AllowAtomicPublish is not supported together.
|
|
// Note: this test validates the configuration validation, not runtime behavior.
|
|
// =========================================================================
|
|
|
|
[Fact]
public void AtomicBatchPublish_async_persist_mode_with_atomic_publish_is_invalid()
{
// Go: TestJetStreamAtomicBatchPublishPersistModeAsync (jetstream_batching_test.go:2785)
// AsyncPersistMode is incompatible with AllowAtomicPublish.
// In .NET we validate this at the stream manager level.
var sm = new StreamManager();
var resp = sm.CreateOrUpdate(new StreamConfig
{
Name = "TEST_ASYNC",
Subjects = ["async.*"],
AllowAtomicPublish = true,
PersistMode = PersistMode.Async,
});
// Should be rejected — async persist mode is incompatible with atomic publish.
// The stream manager enforces this via the validation layer.
// If validation isn't implemented yet, we at least verify the flag round-trips.
// NOTE(review): this only asserts a non-null response, so the test passes whether
// or not the combination is rejected — it cannot catch a validation regression.
// Once the validation layer lands, tighten to assert resp.Error is non-null with
// the Go-parity error code (Go: NewJSPedanticError / persist-mode incompatibility).
resp.ShouldNotBeNull();
// When implemented: resp.Error.ShouldNotBeNull() with an appropriate error code.
}
|
|
|
|
// =========================================================================
|
|
// Multiple concurrent batches (different IDs) can be staged simultaneously.
|
|
// =========================================================================
|
|
|
|
[Fact]
public async Task AtomicBatchPublish_multiple_concurrent_batches_are_isolated()
{
    // Go: TestJetStreamAtomicBatchPublishProposeMultiple — multiple concurrent batches
    var config = new StreamConfig
    {
        Name = "MULTI",
        Subjects = ["multi.>"],
        AllowAtomicPublish = true,
    };
    await using var fixture = await JetStreamApiFixture.StartWithStreamConfigAsync(config);

    // Interleave two batches with distinct IDs; each must track its own state.
    await fixture.BatchPublishAsync("multi.a", "a1", batchId: "batch_x", batchSeq: 1);
    await fixture.BatchPublishAsync("multi.b", "b1", batchId: "batch_y", batchSeq: 1);
    await fixture.BatchPublishAsync("multi.a", "a2", batchId: "batch_x", batchSeq: 2, commitValue: "1");
    var secondCommitAck = await fixture.BatchPublishAsync("multi.b", "b2", batchId: "batch_y", batchSeq: 2, commitValue: "1");

    // The second batch commits independently of the first.
    secondCommitAck.ErrorCode.ShouldBeNull();
    secondCommitAck.BatchSize.ShouldBe(2);

    var state = await fixture.GetStreamStateAsync("MULTI");
    state.Messages.ShouldBe(4UL); // both batches committed
}
|
|
|
|
// =========================================================================
|
|
// AtomicBatchPublishEngine unit tests — direct engine tests.
|
|
// =========================================================================
|
|
|
|
[Fact]
|
|
public void BatchEngine_staged_messages_not_committed_until_commit_received()
|
|
{
|
|
// Go: TestJetStreamAtomicBatchPublishStageAndCommit — staging is separate from commit
|
|
var engine = new AtomicBatchPublishEngine();
|
|
var preconditions = new PublishPreconditions();
|
|
|
|
var committed = new List<string>();
|
|
|
|
for (ulong seq = 1; seq <= 3; seq++)
|
|
{
|
|
var isCommit = seq == 3;
|
|
var req = new BatchPublishRequest
|
|
{
|
|
BatchId = "stage_test",
|
|
BatchSeq = seq,
|
|
Subject = "foo",
|
|
Payload = Encoding.UTF8.GetBytes($"msg-{seq}"),
|
|
IsCommit = isCommit,
|
|
CommitValue = isCommit ? "1" : null,
|
|
};
|
|
|
|
var result = engine.Process(req, preconditions, 0, staged =>
|
|
{
|
|
committed.Add(staged.Subject);
|
|
return new PubAck { Stream = "TEST", Seq = (ulong)committed.Count };
|
|
});
|
|
|
|
if (!isCommit)
|
|
{
|
|
// No commit callback yet.
|
|
committed.ShouldBeEmpty();
|
|
result.Kind.ShouldBe(AtomicBatchResult.ResultKind.Staged);
|
|
}
|
|
else
|
|
{
|
|
// On commit: all 3 messages are committed atomically.
|
|
committed.Count.ShouldBe(3);
|
|
result.Kind.ShouldBe(AtomicBatchResult.ResultKind.Committed);
|
|
result.CommitAck!.BatchSize.ShouldBe(3);
|
|
}
|
|
}
|
|
}
|
|
|
|
[Fact]
|
|
public void BatchEngine_timeout_evicts_inflight_batch()
|
|
{
|
|
// Go: TestJetStreamAtomicBatchPublishLimits — batch timeout evicts stale batch
|
|
// (jetstream_batching_test.go:384-413)
|
|
var engine = new AtomicBatchPublishEngine(
|
|
maxInflightPerStream: 1,
|
|
maxBatchSize: 1000,
|
|
batchTimeout: TimeSpan.FromMilliseconds(1)); // very short timeout
|
|
var preconditions = new PublishPreconditions();
|
|
|
|
// Stage a message.
|
|
var req = new BatchPublishRequest
|
|
{
|
|
BatchId = "timeout_batch",
|
|
BatchSeq = 1,
|
|
Subject = "foo",
|
|
Payload = "data"u8.ToArray(),
|
|
IsCommit = false,
|
|
};
|
|
var result1 = engine.Process(req, preconditions, 0, _ => new PubAck { Stream = "T", Seq = 1 });
|
|
result1.Kind.ShouldBe(AtomicBatchResult.ResultKind.Staged);
|
|
|
|
// Wait for batch to expire.
|
|
Thread.Sleep(10);
|
|
|
|
// After timeout, inflight count should be 0 (evicted on next call).
|
|
// Attempting a new batch should succeed (slot freed).
|
|
var req2 = new BatchPublishRequest
|
|
{
|
|
BatchId = "new_after_timeout",
|
|
BatchSeq = 1,
|
|
Subject = "foo",
|
|
Payload = "data"u8.ToArray(),
|
|
IsCommit = true,
|
|
CommitValue = "1",
|
|
};
|
|
var result2 = engine.Process(req2, preconditions, 0, _ => new PubAck { Stream = "T", Seq = 2 });
|
|
result2.Kind.ShouldBe(AtomicBatchResult.ResultKind.Committed);
|
|
}
|
|
|
|
[Fact]
|
|
public void BatchEngine_clear_removes_all_inflight_batches()
|
|
{
|
|
// Go: TestJetStreamAtomicBatchPublishCleanup — Clear removes all batches
|
|
var engine = new AtomicBatchPublishEngine();
|
|
var preconditions = new PublishPreconditions();
|
|
|
|
// Stage a batch.
|
|
engine.Process(
|
|
new BatchPublishRequest
|
|
{
|
|
BatchId = "to_clear",
|
|
BatchSeq = 1,
|
|
Subject = "foo",
|
|
Payload = "data"u8.ToArray(),
|
|
IsCommit = false,
|
|
},
|
|
preconditions, 0, _ => null);
|
|
|
|
engine.InflightCount.ShouldBe(1);
|
|
|
|
engine.Clear();
|
|
|
|
engine.InflightCount.ShouldBe(0);
|
|
}
|
|
|
|
[Fact]
|
|
public void BatchEngine_has_batch_returns_false_after_commit()
|
|
{
|
|
// Go: TestJetStreamAtomicBatchPublishCleanup — batch is removed after commit
|
|
var engine = new AtomicBatchPublishEngine();
|
|
var preconditions = new PublishPreconditions();
|
|
|
|
engine.Process(
|
|
new BatchPublishRequest
|
|
{
|
|
BatchId = "committed",
|
|
BatchSeq = 1,
|
|
Subject = "foo",
|
|
Payload = "data"u8.ToArray(),
|
|
IsCommit = true,
|
|
CommitValue = "1",
|
|
},
|
|
preconditions, 0, _ => new PubAck { Stream = "T", Seq = 1 });
|
|
|
|
engine.HasBatch("committed").ShouldBeFalse();
|
|
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamAtomicBatchPublishPartiallyAppliedBatchOnRecovery
|
|
// — jetstream_batching_test.go:2232
|
|
// Partial batch (timed out or never committed) leaves no messages in the stream.
|
|
// =========================================================================
|
|
|
|
[Fact]
|
|
public async Task AtomicBatchPublish_uncommitted_batch_does_not_persist_messages()
|
|
{
|
|
// Go: TestJetStreamAtomicBatchPublishPartiallyAppliedBatchOnRecovery
|
|
// An uncommitted batch must not leave partial messages in the stream.
|
|
await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
|
|
{
|
|
Name = "RECOVERY",
|
|
Subjects = ["rec.>"],
|
|
AllowAtomicPublish = true,
|
|
});
|
|
|
|
// Stage 3 messages but never commit.
|
|
await fx.BatchPublishAsync("rec.1", "msg1", batchId: "partial", batchSeq: 1);
|
|
await fx.BatchPublishAsync("rec.2", "msg2", batchId: "partial", batchSeq: 2);
|
|
await fx.BatchPublishAsync("rec.3", "msg3", batchId: "partial", batchSeq: 3);
|
|
|
|
// Stream should still have 0 messages (nothing committed).
|
|
var state = await fx.GetStreamStateAsync("RECOVERY");
|
|
state.Messages.ShouldBe(0UL);
|
|
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamRollupIsolatedRead — jetstream_batching_test.go:2350
|
|
// Rollup combined with batch publish: rollup header in normal publish
|
|
// with AllowAtomicPublish stream works correctly.
|
|
// =========================================================================
|
|
|
|
[Fact]
|
|
public async Task AtomicBatchPublish_normal_publish_and_batch_publish_coexist()
|
|
{
|
|
// Go: TestJetStreamRollupIsolatedRead — both normal and batch publishes on same stream
|
|
await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
|
|
{
|
|
Name = "COEXIST",
|
|
Subjects = ["co.>"],
|
|
AllowAtomicPublish = true,
|
|
});
|
|
|
|
// Normal publish.
|
|
var normalAck = await fx.PublishAndGetAckAsync("co.normal", "data");
|
|
normalAck.Seq.ShouldBe(1UL);
|
|
|
|
// Batch publish.
|
|
await fx.BatchPublishAsync("co.batch", "b1", batchId: "mix", batchSeq: 1);
|
|
var batchAck = await fx.BatchPublishAsync("co.batch", "b2", batchId: "mix", batchSeq: 2, commitValue: "1");
|
|
|
|
batchAck.ErrorCode.ShouldBeNull();
|
|
batchAck.BatchSize.ShouldBe(2);
|
|
|
|
var state = await fx.GetStreamStateAsync("COEXIST");
|
|
state.Messages.ShouldBe(3UL); // 1 normal + 2 batch
|
|
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamAtomicBatchPublishAdvisories — jetstream_batching_test.go:2481
|
|
// Advisories: gap detection and too-large batch trigger advisory events.
|
|
// In .NET, advisories are represented as error codes on the PubAck.
|
|
// =========================================================================
|
|
|
|
[Fact]
|
|
public async Task AtomicBatchPublish_gap_detected_returns_incomplete_error()
|
|
{
|
|
// Go: TestJetStreamAtomicBatchPublishAdvisories — gap triggers advisory + error
|
|
// (jetstream_batching_test.go:2481)
|
|
await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
|
|
{
|
|
Name = "ADV",
|
|
Subjects = ["adv.>"],
|
|
AllowAtomicPublish = true,
|
|
});
|
|
|
|
// Publish seq 1, then jump to seq 3 (gap).
|
|
await fx.BatchPublishAsync("adv.1", "msg1", batchId: "adv_uuid", batchSeq: 1);
|
|
var gapAck = await fx.BatchPublishAsync("adv.3", "msg3", batchId: "adv_uuid", batchSeq: 3, commitValue: "1");
|
|
|
|
gapAck.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.IncompleteBatch);
|
|
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamAtomicBatchPublishRejectPartialBatchOnLeaderChange
|
|
// — jetstream_batching_test.go:2717
|
|
// Partial batch (no commit) is discarded on leader change or engine reset.
|
|
// In .NET: simulated via Clear().
|
|
// =========================================================================
|
|
|
|
[Fact]
|
|
public async Task AtomicBatchPublish_partial_batch_after_reset_returns_incomplete()
|
|
{
|
|
// Go: TestJetStreamAtomicBatchPublishRejectPartialBatchOnLeaderChange
|
|
// (jetstream_batching_test.go:2717)
|
|
// After a leader change (simulated via Clear), partial in-flight batches are discarded.
|
|
await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
|
|
{
|
|
Name = "LEADER",
|
|
Subjects = ["ldr.>"],
|
|
AllowAtomicPublish = true,
|
|
});
|
|
|
|
// Stage 2 messages but don't commit.
|
|
await fx.BatchPublishAsync("ldr.1", "m1", batchId: "ldr_batch", batchSeq: 1);
|
|
await fx.BatchPublishAsync("ldr.2", "m2", batchId: "ldr_batch", batchSeq: 2);
|
|
|
|
// Simulate leader-change / reset.
|
|
fx.GetPublisher().ClearBatches();
|
|
|
|
// Attempting to commit the old batch now references a batch that no longer exists.
|
|
// Seq 3 on a non-existent batch = incomplete (no seq 1 start found).
|
|
var commitAck = await fx.BatchPublishAsync("ldr.3", "commit", batchId: "ldr_batch", batchSeq: 3, commitValue: "1");
|
|
commitAck.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.IncompleteBatch);
|
|
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamAtomicBatchPublishContinuousBatchesStillMoveAppliedUp
|
|
// — jetstream_batching_test.go:2113
|
|
// Multiple consecutive batches are all committed and each is independent.
|
|
// =========================================================================
|
|
|
|
[Fact]
|
|
public async Task AtomicBatchPublish_continuous_batches_each_commit_independently()
|
|
{
|
|
// Go: TestJetStreamAtomicBatchPublishContinuousBatchesStillMoveAppliedUp
|
|
// (jetstream_batching_test.go:2113)
|
|
await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
|
|
{
|
|
Name = "CONT",
|
|
Subjects = ["cont"],
|
|
AllowAtomicPublish = true,
|
|
});
|
|
|
|
// First batch: 3 messages, commits.
|
|
await fx.BatchPublishAsync("cont", "a1", batchId: "batch1", batchSeq: 1);
|
|
await fx.BatchPublishAsync("cont", "a2", batchId: "batch1", batchSeq: 2);
|
|
var ack1 = await fx.BatchPublishAsync("cont", "a3", batchId: "batch1", batchSeq: 3, commitValue: "1");
|
|
ack1.ErrorCode.ShouldBeNull();
|
|
ack1.BatchSize.ShouldBe(3);
|
|
|
|
// Second batch: 2 messages, commits.
|
|
await fx.BatchPublishAsync("cont", "b1", batchId: "batch2", batchSeq: 1);
|
|
var ack2 = await fx.BatchPublishAsync("cont", "b2", batchId: "batch2", batchSeq: 2, commitValue: "1");
|
|
ack2.ErrorCode.ShouldBeNull();
|
|
ack2.BatchSize.ShouldBe(2);
|
|
|
|
var state = await fx.GetStreamStateAsync("CONT");
|
|
state.Messages.ShouldBe(5UL);
|
|
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamAtomicBatchPublishSingleServerRecovery — jetstream_batching_test.go:1578
|
|
// After a batch is partially committed, on recovery the rest is applied.
|
|
// In .NET we test the in-memory analogue: staged messages are buffered, not written
|
|
// to the store until commit.
|
|
// =========================================================================
|
|
|
|
[Fact]
|
|
public async Task AtomicBatchPublish_staged_messages_are_buffered_until_commit()
|
|
{
|
|
// Go: TestJetStreamAtomicBatchPublishSingleServerRecovery (jetstream_batching_test.go:1578)
|
|
await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
|
|
{
|
|
Name = "STAGED",
|
|
Subjects = ["staged"],
|
|
AllowAtomicPublish = true,
|
|
});
|
|
|
|
// Stage 2 messages.
|
|
await fx.BatchPublishAsync("staged", "msg1", batchId: "staged_batch", batchSeq: 1);
|
|
await fx.BatchPublishAsync("staged", "msg2", batchId: "staged_batch", batchSeq: 2);
|
|
|
|
// Stream should have 0 messages — nothing committed yet.
|
|
var before = await fx.GetStreamStateAsync("STAGED");
|
|
before.Messages.ShouldBe(0UL);
|
|
|
|
// Commit.
|
|
var ack = await fx.BatchPublishAsync("staged", "msg3", batchId: "staged_batch", batchSeq: 3, commitValue: "1");
|
|
ack.ErrorCode.ShouldBeNull();
|
|
ack.BatchSize.ShouldBe(3);
|
|
|
|
// All 3 messages now committed.
|
|
var after = await fx.GetStreamStateAsync("STAGED");
|
|
after.Messages.ShouldBe(3UL);
|
|
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamAtomicBatchPublishEncode — jetstream_batching_test.go:1750
|
|
// Verify that error code constants have the expected numeric values from the Go source.
|
|
// =========================================================================
|
|
|
|
[Fact]
|
|
public void AtomicBatchPublish_error_code_values_match_go_reference()
|
|
{
|
|
// Go: jetstream_errors_generated.go — error identifiers
|
|
AtomicBatchPublishErrorCodes.Disabled.ShouldBe(10174);
|
|
AtomicBatchPublishErrorCodes.MissingSeq.ShouldBe(10175);
|
|
AtomicBatchPublishErrorCodes.IncompleteBatch.ShouldBe(10176);
|
|
AtomicBatchPublishErrorCodes.UnsupportedHeader.ShouldBe(10177);
|
|
AtomicBatchPublishErrorCodes.InvalidBatchId.ShouldBe(10179);
|
|
AtomicBatchPublishErrorCodes.TooLargeBatch.ShouldBe(10199);
|
|
AtomicBatchPublishErrorCodes.InvalidCommit.ShouldBe(10200);
|
|
AtomicBatchPublishErrorCodes.ContainsDuplicate.ShouldBe(10201);
|
|
AtomicBatchPublishErrorCodes.MirrorWithAtomicPublish.ShouldBe(10198);
|
|
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamAtomicBatchPublishExpectedPerSubject — jetstream_batching_test.go:1500
|
|
// A batch can carry per-subject sequence expectations on the first message.
|
|
// =========================================================================
|
|
|
|
[Fact]
|
|
public async Task AtomicBatchPublish_stream_config_allows_atomic_publish_flag()
|
|
{
|
|
// Go: TestJetStreamAtomicBatchPublishExpectedPerSubject — stream config check
|
|
// AllowAtomicPublish is properly stored in stream config.
|
|
await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
|
|
{
|
|
Name = "CFG_CHECK",
|
|
Subjects = ["chk.>"],
|
|
AllowAtomicPublish = true,
|
|
});
|
|
|
|
var config = fx.GetStreamConfig("CFG_CHECK");
|
|
config.ShouldNotBeNull();
|
|
config!.AllowAtomicPublish.ShouldBeTrue();
|
|
}
|
|
|
|
[Fact]
|
|
public async Task AtomicBatchPublish_stream_config_defaults_to_false()
|
|
{
|
|
// Go: TestJetStreamAtomicBatchPublish — AllowAtomicPublish defaults to false
|
|
await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
|
|
{
|
|
Name = "CFG_DEFAULT",
|
|
Subjects = ["def.>"],
|
|
});
|
|
|
|
var config = fx.GetStreamConfig("CFG_DEFAULT");
|
|
config.ShouldNotBeNull();
|
|
config!.AllowAtomicPublish.ShouldBeFalse();
|
|
}
|
|
|
|
// =========================================================================
|
|
// TestJetStreamAtomicBatchPublishProposeOnePartialBatch — jetstream_batching_test.go:1955
|
|
// A partial batch (missing one sequence entry) is treated as incomplete.
|
|
// =========================================================================
|
|
|
|
[Fact]
|
|
public async Task AtomicBatchPublish_partial_batch_missing_middle_message_fails()
|
|
{
|
|
// Go: TestJetStreamAtomicBatchPublishProposeOnePartialBatch (jetstream_batching_test.go:1955)
|
|
// A batch that skips sequence 2 (1, 3) is incomplete.
|
|
await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
|
|
{
|
|
Name = "PARTIAL",
|
|
Subjects = ["partial"],
|
|
AllowAtomicPublish = true,
|
|
});
|
|
|
|
// Stage seq 1.
|
|
await fx.BatchPublishAsync("partial", "m1", batchId: "partial_batch", batchSeq: 1);
|
|
|
|
// Jump to seq 3 — gap, so should be incomplete.
|
|
var gapAck = await fx.BatchPublishAsync("partial", "m3", batchId: "partial_batch", batchSeq: 3, commitValue: "1");
|
|
gapAck.ErrorCode.ShouldBe(AtomicBatchPublishErrorCodes.IncompleteBatch);
|
|
}
|
|
}
|