// Copyright 2025 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Ported from: golang/nats-server/server/jetstream_batching_test.go // These tests exercise the JetStream atomic batch publish protocol via NATS headers. // Tests that require direct access to Go server internals (mset.batches, clMu, etc.) // are marked with [Fact(Skip = ...)] because those internal structures are not accessible // over the NATS protocol from an external client. using System.Text.Json.Nodes; using NATS.Client.Core; using Shouldly; namespace ZB.MOM.NatsNet.Server.IntegrationTests.JetStream; /// /// Integration tests ported from jetstream_batching_test.go (26 tests). /// Tests the JetStream atomic batch publish protocol using NATS headers. /// /// /// The Go source tests fall into two categories: /// (a) Tests exercising the server via NATS protocol (Nats-Batch-* headers): ported directly. /// (b) Tests accessing Go server internals (mset.batches, clMu, checkMsgHeadersPreClusteredProposal): /// skipped because those internal structures are not reachable from a .NET NATS client. /// [Trait("Category", "Integration")] public class JetStreamBatchingIntegrationTests : IAsyncLifetime { private NatsConnection? _nats; private Exception? _initFailure; public async Task InitializeAsync() { try { _nats = new NatsConnection(new NatsOpts { Url = "nats://localhost:4222" }); await _nats.ConnectAsync(); } catch (Exception ex) { _initFailure = ex; } } public async Task DisposeAsync() { if (_nats is not null) await _nats.DisposeAsync(); } private bool ServerUnavailable() => _initFailure != null; // ----------------------------------------------------------------------- // Helpers // ----------------------------------------------------------------------- private async Task CreateStreamAsync(string name, string[] subjects, bool allowAtomicPublish = false, string storage = "file", string retention = "limits") { var cfg = new JsonObject { ["name"] = name, ["subjects"] = new JsonArray(subjects.Select(s => JsonValue.Create(s)).ToArray()), ["storage"] = storage, ["retention"] = retention, ["allow_atomic_publish"] = allowAtomicPublish, }; var payload = System.Text.Encoding.UTF8.GetBytes(cfg.ToJsonString()); // NatsMsg is a struct — just await; a response being returned confirms the call succeeded. await _nats!.RequestAsync($"$JS.API.STREAM.CREATE.{name}", payload); } private async Task UpdateStreamAsync(string name, string[] subjects, bool allowAtomicPublish = false, string storage = "file", string retention = "limits") { var cfg = new JsonObject { ["name"] = name, ["subjects"] = new JsonArray(subjects.Select(s => JsonValue.Create(s)).ToArray()), ["storage"] = storage, ["retention"] = retention, ["allow_atomic_publish"] = allowAtomicPublish, }; var payload = System.Text.Encoding.UTF8.GetBytes(cfg.ToJsonString()); // NatsMsg is a struct — just await; a response being returned confirms the call succeeded. await _nats!.RequestAsync($"$JS.API.STREAM.UPDATE.{name}", payload); } // ----------------------------------------------------------------------- // TestJetStreamAtomicBatchPublish // Tests basic atomic batch publish flow: disabled, enabled, missing seq error. // ----------------------------------------------------------------------- [Fact] public async Task AtomicBatchPublish_ShouldSucceed() { if (ServerUnavailable()) return; var streamName = $"BATCHTEST_{Guid.NewGuid():N}"; await CreateStreamAsync(streamName, [$"bat.{streamName}.*"], allowAtomicPublish: false); // Publish with atomic publish disabled — expect error in pub ack. var hdrs = new NatsHeaders { ["Nats-Batch-Id"] = "uuid" }; var inbox = _nats!.NewInbox(); var sub = await _nats.SubscribeCoreAsync(inbox); await _nats.PublishAsync($"bat.{streamName}.0", Array.Empty(), headers: hdrs, replyTo: inbox); JsonObject? ack = null; using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3))) { await foreach (var reply in sub.Msgs.ReadAllAsync(cts.Token)) { if (reply.Data is { Length: > 0 }) ack = JsonNode.Parse(reply.Data)?.AsObject(); break; } } ack.ShouldNotBeNull("Expected a pub ack response"); ack["error"].ShouldNotBeNull("Expected error field when atomic publish is disabled"); // Enable atomic publish. await UpdateStreamAsync(streamName, [$"bat.{streamName}.*"], allowAtomicPublish: true); // Publish without batch sequence — expect missing seq error. var inbox2 = _nats.NewInbox(); var sub2 = await _nats.SubscribeCoreAsync(inbox2); var hdrs2 = new NatsHeaders { ["Nats-Batch-Id"] = "uuid" }; await _nats.PublishAsync($"bat.{streamName}.0", Array.Empty(), headers: hdrs2, replyTo: inbox2); JsonObject? ack2 = null; using (var cts2 = new CancellationTokenSource(TimeSpan.FromSeconds(3))) { await foreach (var reply in sub2.Msgs.ReadAllAsync(cts2.Token)) { if (reply.Data is { Length: > 0 }) ack2 = JsonNode.Parse(reply.Data)?.AsObject(); break; } } ack2.ShouldNotBeNull(); ack2["error"].ShouldNotBeNull("Expected error for missing sequence header"); } // ----------------------------------------------------------------------- // TestJetStreamAtomicBatchPublishEmptyAck // Non-commit messages return empty ack (flow control). Commit returns full pub ack. // ----------------------------------------------------------------------- [Fact] public async Task AtomicBatchPublishEmptyAck_ShouldReturnEmptyForNonCommit() { if (ServerUnavailable()) return; var streamName = $"BATCHEA_{Guid.NewGuid():N}"; await CreateStreamAsync(streamName, [$"ea.{streamName}.*"], allowAtomicPublish: true); var batchId = "uuid-ea"; const int batchSize = 5; for (int seq = 1; seq <= batchSize; seq++) { var subject = $"ea.{streamName}.{seq}"; var data = System.Text.Encoding.UTF8.GetBytes(subject); bool isCommit = seq == batchSize; var hdrs = new NatsHeaders { ["Nats-Batch-Id"] = batchId, ["Nats-Batch-Sequence"] = seq.ToString(), }; if (isCommit) hdrs["Nats-Batch-Commit"] = "1"; var inbox = _nats!.NewInbox(); var sub = await _nats.SubscribeCoreAsync(inbox); await _nats.PublishAsync(subject, data, headers: hdrs, replyTo: inbox); using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3)); await foreach (var reply in sub.Msgs.ReadAllAsync(cts.Token)) { if (!isCommit) { (reply.Data is null || reply.Data.Length == 0).ShouldBeTrue( "Expected empty ack for non-commit message"); } else { reply.Data.ShouldNotBeNull(); reply.Data.Length.ShouldBeGreaterThan(0, "Expected full pub ack for commit message"); var ack = JsonNode.Parse(reply.Data)?.AsObject(); ack.ShouldNotBeNull(); ack["error"].ShouldBeNull("Commit should not return error"); ((int?)ack["batch_size"]).ShouldBe(batchSize); } break; } } } // ----------------------------------------------------------------------- // TestJetStreamAtomicBatchPublishCommitEob // EOB commit excludes the EOB message itself; batchSize should equal seq count - 1. // ----------------------------------------------------------------------- [Fact] public async Task AtomicBatchPublishCommitEob_ShouldExcludeEobMessage() { if (ServerUnavailable()) return; var streamName = $"BATCHEOB_{Guid.NewGuid():N}"; await CreateStreamAsync(streamName, [$"eob.{streamName}"], allowAtomicPublish: true); var batchId = "uuid-eob"; var subject = $"eob.{streamName}"; // Seq 1 and 2: publish without commit, consume empty ack each time. for (int seq = 1; seq <= 2; seq++) { var hdrs = new NatsHeaders { ["Nats-Batch-Id"] = batchId, ["Nats-Batch-Sequence"] = seq.ToString(), }; var inbox = _nats!.NewInbox(); var sub = await _nats.SubscribeCoreAsync(inbox); await _nats.PublishAsync(subject, Array.Empty(), headers: hdrs, replyTo: inbox); using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3)); await foreach (var _ in sub.Msgs.ReadAllAsync(cts.Token)) break; } // Seq 3: publish with "eob" commit — this message itself is NOT stored. var hdrs3 = new NatsHeaders { ["Nats-Batch-Id"] = batchId, ["Nats-Batch-Sequence"] = "3", ["Nats-Batch-Commit"] = "eob", }; var inbox3 = _nats!.NewInbox(); var sub3 = await _nats.SubscribeCoreAsync(inbox3); await _nats.PublishAsync(subject, Array.Empty(), headers: hdrs3, replyTo: inbox3); JsonObject? ack = null; using (var cts3 = new CancellationTokenSource(TimeSpan.FromSeconds(5))) { await foreach (var reply in sub3.Msgs.ReadAllAsync(cts3.Token)) { if (reply.Data is { Length: > 0 }) ack = JsonNode.Parse(reply.Data)?.AsObject(); break; } } ack.ShouldNotBeNull("Expected pub ack from EOB commit"); ack["error"].ShouldBeNull("EOB commit should not return error"); ((int?)ack["batch_size"]).ShouldBe(2); ack["batch_id"]?.GetValue().ShouldBe(batchId); } // ----------------------------------------------------------------------- // TestJetStreamAtomicBatchPublishLimits // Batch ID length limit: max 64 chars. IDs longer than 64 are rejected. // ----------------------------------------------------------------------- [Fact] public async Task AtomicBatchPublishLimits_BatchIdTooLong_ShouldError() { if (ServerUnavailable()) return; var streamName = $"BATCHLIM_{Guid.NewGuid():N}"; await CreateStreamAsync(streamName, [$"lim.{streamName}"], allowAtomicPublish: true); // 64-char batch ID should succeed. var validId = new string('A', 64); var hdrsOk = new NatsHeaders { ["Nats-Batch-Id"] = validId, ["Nats-Batch-Sequence"] = "1", ["Nats-Batch-Commit"] = "1", }; var inboxOk = _nats!.NewInbox(); var subOk = await _nats.SubscribeCoreAsync(inboxOk); await _nats.PublishAsync($"lim.{streamName}", Array.Empty(), headers: hdrsOk, replyTo: inboxOk); JsonObject? ackOk = null; using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3))) { await foreach (var reply in subOk.Msgs.ReadAllAsync(cts.Token)) { if (reply.Data is { Length: > 0 }) ackOk = JsonNode.Parse(reply.Data)?.AsObject(); break; } } ackOk.ShouldNotBeNull("Expected pub ack for 64-char batch ID"); // 65-char batch ID should be rejected. var longId = new string('A', 65); var hdrsLong = new NatsHeaders { ["Nats-Batch-Id"] = longId, ["Nats-Batch-Sequence"] = "1", ["Nats-Batch-Commit"] = "1", }; var inboxLong = _nats.NewInbox(); var subLong = await _nats.SubscribeCoreAsync(inboxLong); await _nats.PublishAsync($"lim.{streamName}", Array.Empty(), headers: hdrsLong, replyTo: inboxLong); JsonObject? ackLong = null; using (var cts2 = new CancellationTokenSource(TimeSpan.FromSeconds(3))) { await foreach (var reply in subLong.Msgs.ReadAllAsync(cts2.Token)) { if (reply.Data is { Length: > 0 }) ackLong = JsonNode.Parse(reply.Data)?.AsObject(); break; } } ackLong.ShouldNotBeNull(); ackLong["error"].ShouldNotBeNull("65-char batch ID should be rejected by the server"); } // ----------------------------------------------------------------------- // TestJetStreamAtomicBatchPublishDedupeNotAllowed // Pre-existing dedup IDs must not be allowed in a batch. // ----------------------------------------------------------------------- [Fact] public async Task AtomicBatchPublishDedupeNotAllowed_PreExistingIdShouldError() { if (ServerUnavailable()) return; var streamName = $"BATCHDD_{Guid.NewGuid():N}"; await CreateStreamAsync(streamName, [$"dd.{streamName}"], allowAtomicPublish: true); // Publish a pre-existing message with dedup ID. var hdrsPre = new NatsHeaders { ["Nats-Msg-Id"] = "pre-existing" }; var inboxPre = _nats!.NewInbox(); var subPre = await _nats.SubscribeCoreAsync(inboxPre); await _nats.PublishAsync($"dd.{streamName}", Array.Empty(), headers: hdrsPre, replyTo: inboxPre); using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3))) { await foreach (var _ in subPre.Msgs.ReadAllAsync(cts.Token)) break; } // Publish a batch that includes the same dedup ID — should fail. var hdrsDup = new NatsHeaders { ["Nats-Msg-Id"] = "pre-existing", ["Nats-Batch-Id"] = "uuid", ["Nats-Batch-Sequence"] = "1", ["Nats-Batch-Commit"] = "1", }; var inboxDup = _nats.NewInbox(); var subDup = await _nats.SubscribeCoreAsync(inboxDup); await _nats.PublishAsync($"dd.{streamName}", Array.Empty(), headers: hdrsDup, replyTo: inboxDup); JsonObject? ackDup = null; using (var cts2 = new CancellationTokenSource(TimeSpan.FromSeconds(3))) { await foreach (var reply in subDup.Msgs.ReadAllAsync(cts2.Token)) { if (reply.Data is { Length: > 0 }) ackDup = JsonNode.Parse(reply.Data)?.AsObject(); break; } } ackDup.ShouldNotBeNull(); ackDup["error"].ShouldNotBeNull("Duplicate message ID in batch should return error"); } // ----------------------------------------------------------------------- // TestJetStreamAtomicBatchPublishSourceAndMirror // Requires cluster setup and direct stream inspection. Skipped. // ----------------------------------------------------------------------- [Fact(Skip = "Requires a running 3-node JetStream cluster with AllowAtomicPublish + mirror support")] public Task AtomicBatchPublishSourceAndMirror_BatchHeadersRemovedInMirror() => Task.CompletedTask; // ----------------------------------------------------------------------- // TestJetStreamAtomicBatchPublishCleanup (4 sub-tests) // All require direct access to Go server internals. Skipped. // ----------------------------------------------------------------------- [Fact(Skip = "Requires Go server internals (mset.batches, mset.batchApply) — not accessible via NATS protocol")] public Task AtomicBatchPublishCleanup_Disable_ShouldCleanupBatchState() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (mset.batches, JetStreamStepdownStream) — not accessible via NATS protocol")] public Task AtomicBatchPublishCleanup_StepDown_ShouldCleanupBatchState() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (mset.delete, mset.batchApply) — not accessible via NATS protocol")] public Task AtomicBatchPublishCleanup_Delete_ShouldCleanupBatchState() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (mset.batches, batchStagedDiff) — not accessible via NATS protocol")] public Task AtomicBatchPublishCleanup_Commit_ShouldCleanupBatchState() => Task.CompletedTask; // ----------------------------------------------------------------------- // TestJetStreamAtomicBatchPublishConfigOpts // Requires server config file creation. Skipped. // ----------------------------------------------------------------------- [Fact(Skip = "Requires direct server configuration (RunServerWithConfig, opts.JetStreamLimits) — not accessible via NATS protocol")] public Task AtomicBatchPublishConfigOpts_DefaultsAndOverrides_ShouldApply() => Task.CompletedTask; // ----------------------------------------------------------------------- // TestJetStreamAtomicBatchPublishDenyHeaders // Unsupported headers in a batch (e.g. Nats-Expected-Last-Msg-Id) should error. // ----------------------------------------------------------------------- [Fact] public async Task AtomicBatchPublishDenyHeaders_UnsupportedHeader_ShouldError() { if (ServerUnavailable()) return; var streamName = $"BATCHDH_{Guid.NewGuid():N}"; await CreateStreamAsync(streamName, [$"dh.{streamName}"], allowAtomicPublish: true); // Seq 1: publish with Nats-Expected-Last-Msg-Id (unsupported in batches). var hdrs1 = new NatsHeaders { ["Nats-Batch-Id"] = "uuid", ["Nats-Batch-Sequence"] = "1", ["Nats-Expected-Last-Msg-Id"] = "msgId", }; var inbox1 = _nats!.NewInbox(); var sub1 = await _nats.SubscribeCoreAsync(inbox1); await _nats.PublishAsync($"dh.{streamName}", Array.Empty(), headers: hdrs1, replyTo: inbox1); using (var cts1 = new CancellationTokenSource(TimeSpan.FromSeconds(3))) { await foreach (var _ in sub1.Msgs.ReadAllAsync(cts1.Token)) break; } // Seq 2: commit with "eob" — server should surface unsupported header error. var hdrs2 = new NatsHeaders { ["Nats-Batch-Id"] = "uuid", ["Nats-Batch-Sequence"] = "2", ["Nats-Batch-Commit"] = "eob", }; var inbox2 = _nats.NewInbox(); var sub2 = await _nats.SubscribeCoreAsync(inbox2); await _nats.PublishAsync($"dh.{streamName}", Array.Empty(), headers: hdrs2, replyTo: inbox2); JsonObject? ack = null; using (var cts2 = new CancellationTokenSource(TimeSpan.FromSeconds(3))) { await foreach (var reply in sub2.Msgs.ReadAllAsync(cts2.Token)) { if (reply.Data is { Length: > 0 }) ack = JsonNode.Parse(reply.Data)?.AsObject(); break; } } ack.ShouldNotBeNull("Expected pub ack from EOB commit with unsupported header"); ack["error"].ShouldNotBeNull("Expected error for unsupported batch header (Nats-Expected-Last-Msg-Id)"); } // ----------------------------------------------------------------------- // TestJetStreamAtomicBatchPublishStageAndCommit (26 sub-tests) // All require direct Go server internals (mset.clMu, checkMsgHeadersPreClusteredProposal, // batchStagedDiff). Skipped. // ----------------------------------------------------------------------- [Fact(Skip = "Requires Go server internals (mset.clMu, checkMsgHeadersPreClusteredProposal, batchStagedDiff) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_DedupeDistinct_ShouldSucceed() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (mset.storeMsgId, checkMsgHeadersPreClusteredProposal) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_Dedupe_ShouldDetectDuplicate() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (mset.clMu, batchStagedDiff) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_DedupeStaged_ShouldDetectInBatchDuplicate() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (mset.clusteredCounterTotal, checkMsgHeadersPreClusteredProposal) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_CounterSingle_ShouldAccumulate() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (mset.clusteredCounterTotal) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_CounterMultiple_ShouldAccumulate() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (mset.clusteredCounterTotal pre-init) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_CounterPreInit_ShouldAddToExisting() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (checkMsgHeadersPreClusteredProposal with schedule headers) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_MsgSchedulesDisabled_ShouldError() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (errMsgTTLDisabled path in batch staging) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_MsgSchedulesTtlDisabled_ShouldError() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (NewJSMessageSchedulesTTLInvalidError in staging) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_MsgSchedulesTtlInvalid_ShouldError() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (NewJSMessageSchedulesPatternInvalidError in staging) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_MsgSchedulesInvalidSchedule_ShouldError() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (NewJSMessageSchedulesTargetInvalidError in staging) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_MsgSchedulesTargetMismatch_ShouldError() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (schedule target literal check in staging) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_MsgSchedulesTargetMustBeLiteral_ShouldError() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (schedule target uniqueness check in staging) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_MsgSchedulesTargetMustBeUnique_ShouldError() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (rollup check in schedule staging) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_MsgSchedulesRollupDisabled_ShouldError() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (full schedule staging pipeline) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_MsgSchedules_ShouldCommitSuccessfully() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (mset.inflight, DiscardNew policy in staging) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_DiscardNew_ShouldTrackInflight() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (mset.inflight with ErrMaxMsgs) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_DiscardNewMaxMsgs_ShouldEnforceLimit() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (mset.inflight with ErrMaxBytes) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_DiscardNewMaxBytes_ShouldEnforceLimit() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (mset.inflight with DiscardNewPerSubject) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_DiscardNewMaxMsgsPerSubj_ShouldEnforceLimit() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (mset.inflight duplicate per-subject tracking) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_DiscardNewMaxMsgsPerSubjDuplicate_ShouldEnforceLimit() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (mset.inflight pre-init with DiscardNewPerSubject) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_DiscardNewMaxMsgsPerSubjInflight_ShouldEnforceLimit() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (mset.store pre-existing + DiscardNewPerSubject) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_DiscardNewMaxMsgsPerSubjPreExisting_ShouldEnforceLimit() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (JSExpectedLastSeq in batch staging pre-clustered proposal) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_ExpectLastSeq_ShouldSucceed() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (last seq check not allowed after first message in batch) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_ExpectLastSeqNotFirst_ShouldError() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (last seq mismatch on first batch message) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_ExpectLastSeqInvalidFirst_ShouldError() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (last seq mismatch for subsequent batch messages) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_ExpectLastSeqInvalid_ShouldError() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (mset.expectedPerSubjectSequence, expectedPerSubjectInProcess) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_ExpectPerSubjSimple_ShouldTrackSequences() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (in-batch per-subject sequence tracking) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_ExpectPerSubjRedundantInBatch_ShouldError() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (JSExpectedLastSubjSeqSubj per-batch tracking) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_ExpectPerSubjDupeInChange_ShouldSucceed() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (expectedPerSubjectInProcess once set for subject) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_ExpectPerSubjNotFirst_ShouldError() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (mset.expectedPerSubjectInProcess pre-init) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_ExpectPerSubjInProcess_ShouldError() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (mset.inflight pre-init + per-subject sequence check) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_ExpectPerSubjInflight_ShouldError() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (rollup deny purge check in batch staging) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_RollupDenyPurge_ShouldError() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (rollup value validation in batch staging) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_RollupInvalid_ShouldError() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (rollup all allowed as first item in batch) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_RollupAllFirst_ShouldSucceed() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (rollup all not allowed after first item) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_RollupAllNotFirst_ShouldError() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (rollup sub with unique subjects) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_RollupSubUnique_ShouldSucceed() => Task.CompletedTask; [Fact(Skip = "Requires Go server internals (rollup sub overlap check per batch) — not accessible via NATS protocol")] public Task AtomicBatchPublishStageAndCommit_RollupSubOverlap_ShouldError() => Task.CompletedTask; // ----------------------------------------------------------------------- // TestJetStreamAtomicBatchPublishHighLevelRollback // Requires direct access to Go server internals. Skipped. // ----------------------------------------------------------------------- [Fact(Skip = "Requires Go server internals (mset.ddarr, mset.ddmap, mset.inflight, expectedPerSubjectSequence) — not accessible via NATS protocol")] public Task AtomicBatchPublishHighLevelRollback_OnError_ShouldClearStagingState() => Task.CompletedTask; }