using NSubstitute;
using Shouldly;

namespace ZB.MOM.NatsNet.Server.Tests.JetStream;

/// <summary>
/// Unit tests for the core JetStream batching helpers: batch store path/creation,
/// <c>BatchGroup</c> commit/cleanup/stop handling, staged-diff commit, batch-apply
/// state resets and pre-proposal header checks.
/// </summary>
/// <remarks>
/// NOTE(review): several generic call sites in this file carry no type arguments
/// (<c>Substitute.For()</c>, <c>ShouldBeOfType()</c>, <c>new Dictionary { ... }</c>).
/// They look like they were stripped before this file reached review; they are kept
/// verbatim here because the intended types cannot be confirmed from this file alone.
/// Restore them against the production types before compiling.
/// </remarks>
public sealed class JetStreamBatchingCoreTests
{
    /// <summary>
    /// The batch directory name is the hash of the batch id, nested under
    /// <c>_streams/&lt;stream&gt;/batches</c> inside the store directory.
    /// </summary>
    [Fact]
    public void GetBatchStoreDir_ValidInputs_ReturnsHashedBatchPath()
    {
        var storeDir = Path.Combine(Path.GetTempPath(), $"jsa-{Guid.NewGuid():N}");

        var (batchName, batchPath) = JetStreamBatching.GetBatchStoreDir(storeDir, "ORDERS", "batch-A");

        batchName.ShouldBe(NatsServer.GetHash("batch-A"));
        batchPath.ShouldBe(Path.Combine(storeDir, "_streams", "ORDERS", "batches", batchName));
    }

    /// <summary>
    /// File storage with a single replica is expected to yield the file-backed store type.
    /// </summary>
    [Fact]
    public void NewBatchStore_FileSingleReplica_CreatesFileStore()
    {
        var tempRoot = Path.Combine(Path.GetTempPath(), $"batch-store-{Guid.NewGuid():N}");
        var stream = new StreamConfig
        {
            Name = "ORDERS",
            Replicas = 1,
            Storage = StorageType.FileStorage,
        };

        try
        {
            var store = JetStreamBatching.NewBatchStore(tempRoot, stream, "batch-A");
            try
            {
                store.ShouldBeOfType();
            }
            finally
            {
                store.Stop();
            }
        }
        finally
        {
            // Remove whatever the store may have created under the unique temp root so
            // repeated test runs do not accumulate directories in the system temp folder.
            DeleteDirectoryBestEffort(tempRoot);
        }
    }

    /// <summary>
    /// A replicated stream (Replicas = 3) is expected to yield the in-memory store type
    /// even though file storage is configured.
    /// </summary>
    [Fact]
    public void NewBatchStore_MemoryOrReplicated_CreatesMemStore()
    {
        var tempRoot = Path.Combine(Path.GetTempPath(), $"batch-store-{Guid.NewGuid():N}");
        var stream = new StreamConfig
        {
            Name = "ORDERS",
            Replicas = 3,
            Storage = StorageType.FileStorage,
        };

        try
        {
            var store = JetStreamBatching.NewBatchStore(tempRoot, stream, "batch-A");
            try
            {
                store.ShouldBeOfType();
            }
            finally
            {
                store.Stop();
            }
        }
        finally
        {
            // Harmless when nothing was written: the helper only deletes an existing directory.
            DeleteDirectoryBestEffort(tempRoot);
        }
    }

    /// <summary>
    /// When the timer reports it was stopped (<c>Stop()</c> returns true), the group is
    /// ready for commit and the store is flushed exactly once.
    /// </summary>
    [Fact]
    public void ReadyForCommit_TimerStopped_FlushesAndReturnsTrue()
    {
        var store = Substitute.For();
        var timer = Substitute.For();
        timer.Stop().Returns(true);
        var group = new BatchGroup
        {
            Store = store,
            BatchTimer = timer,
        };

        group.ReadyForCommit().ShouldBeTrue();

        store.Received(1).FlushAllPending();
    }

    /// <summary>
    /// When <c>Stop()</c> returns false the group is not ready for commit and the store
    /// must not be flushed.
    /// </summary>
    [Fact]
    public void ReadyForCommit_TimerAlreadyExpired_DoesNotFlushAndReturnsFalse()
    {
        var store = Substitute.For();
        var timer = Substitute.For();
        timer.Stop().Returns(false);
        var group = new BatchGroup
        {
            Store = store,
            BatchTimer = timer,
        };

        group.ReadyForCommit().ShouldBeFalse();

        store.DidNotReceive().FlushAllPending();
    }

    /// <summary>
    /// Cleanup stops the timer, deletes the store, removes the group from the batching
    /// map and brings the global in-flight counter back to zero.
    /// </summary>
    [Fact]
    public void CleanupLocked_BatchPresent_StopsTimerDeletesStoreRemovesGroupAndDecrementsGlobalInflight()
    {
        // Start from a known counter value of exactly one in-flight batch.
        JetStreamBatching.ResetGlobalInflightBatchesForTest();
        JetStreamBatching.IncrementGlobalInflightBatchesForTest();
        var store = Substitute.For();
        var timer = Substitute.For();
        timer.Stop().Returns(true);
        var group = new BatchGroup
        {
            Store = store,
            BatchTimer = timer,
        };
        var batching = new Batching();
        batching.Group["batch-A"] = group;

        group.CleanupLocked("batch-A", batching);

        timer.Received(1).Stop();
        store.Received(1).Delete(true);
        batching.Group.ContainsKey("batch-A").ShouldBeFalse();
        JetStreamBatching.GlobalInflightBatches.ShouldBe(0);
    }

    /// <summary>
    /// Stopping a group stops both its timer and its store and brings the global
    /// in-flight counter back to zero.
    /// </summary>
    [Fact]
    public void StopLocked_BatchPresent_StopsTimerStopsStoreAndDecrementsGlobalInflight()
    {
        // Start from a known counter value of exactly one in-flight batch.
        JetStreamBatching.ResetGlobalInflightBatchesForTest();
        JetStreamBatching.IncrementGlobalInflightBatchesForTest();
        var store = Substitute.For();
        var timer = Substitute.For();
        var group = new BatchGroup
        {
            Store = store,
            BatchTimer = timer,
        };

        group.StopLocked();

        timer.Received(1).Stop();
        store.Received(1).Stop();
        JetStreamBatching.GlobalInflightBatches.ShouldBe(0);
    }

    /// <summary>
    /// Committing a staged diff copies message ids, counter totals, in-flight figures and
    /// expected-per-subject entries into the runtime state.
    /// </summary>
    [Fact]
    public void Commit_DiffContainsState_UpdatesRuntimeState()
    {
        var diff = new BatchStagedDiff
        {
            MsgIds = new Dictionary { ["id-1"] = null },
            Counter = new Dictionary
            {
                ["foo"] = new() { Ops = 2, Total = new System.Numerics.BigInteger(5) },
            },
            Inflight = new Dictionary
            {
                ["foo"] = new() { Bytes = 10, Ops = 1 },
            },
            ExpectedPerSubject = new Dictionary
            {
                ["foo"] = new() { SSeq = 3, ClSeq = 9 },
            },
        };
        var state = new BatchRuntimeState();

        JetStreamBatching.Commit(diff, state);

        state.MsgIds.ContainsKey("id-1").ShouldBeTrue();
        state.ClusteredCounterTotal["foo"].Total.ShouldBe(new System.Numerics.BigInteger(5));
        state.Inflight["foo"].Ops.ShouldBe((ulong)1);
        // The sequence map is looked up by the staged ClSeq (9) and yields the subject.
        state.ExpectedPerSubjectSequence[9].ShouldBe("foo");
        state.ExpectedPerSubjectInProcess.Contains("foo").ShouldBeTrue();
    }

    /// <summary>
    /// Rejecting a batch bumps the context failure counter, returns the entries to the
    /// pool and clears the apply state; clearing alone only resets the apply state.
    /// </summary>
    [Fact]
    public void BatchApplyWrappers_ClearAndRejectPaths_UpdateState()
    {
        var entry = new TestCommittedEntry();
        var apply = new BatchApply
        {
            Id = "batch-A",
            Count = 3,
            EntryStart = 4,
            MaxApplied = 8,
            Entries = [entry],
        };
        var context = new BatchApplyContext { Clfs = 10 };

        JetStreamBatching.RejectBatchStateLocked(apply, context);

        // 13 matches the initial Clfs (10) plus the batch Count (3).
        context.Clfs.ShouldBe((ulong)13);
        entry.Returned.ShouldBeTrue();
        apply.Id.ShouldBe(string.Empty);
        apply.Count.ShouldBe((ulong)0);

        // Re-arm the apply state and verify the plain clear path resets it as well.
        apply.Id = "batch-B";
        apply.Count = 2;

        JetStreamBatching.ClearBatchStateLocked(apply);

        apply.Id.ShouldBe(string.Empty);
        apply.Count.ShouldBe((ulong)0);
    }

    /// <summary>
    /// An expected-stream header naming a different stream ("OTHER" vs "ORDERS") yields
    /// the stream-not-match API error together with a non-null error.
    /// </summary>
    [Fact]
    public void CheckMsgHeadersPreClusteredProposal_ExpectedStreamMismatch_ReturnsNotMatchError()
    {
        var store = new JetStreamMemStore(new StreamConfig { Name = "ORDERS", Storage = StorageType.MemoryStorage });
        try
        {
            var hdr = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsExpectedStream, "OTHER");
            var diff = new BatchStagedDiff();
            var context = new BatchHeaderCheckContext
            {
                Store = store,
                ClSeq = 1,
                Clfs = 0,
                MaxPayload = 1024,
                StreamSubjects = ["ORDERS.>"],
            };

            var (_, _, _, apiError, error) = JetStreamBatching.CheckMsgHeadersPreClusteredProposal(
                diff,
                context,
                "ORDERS.created",
                hdr,
                [],
                sourced: false,
                name: "ORDERS",
                allowRollup: true,
                denyPurge: false,
                allowTtl: true,
                allowMsgCounter: false,
                allowMsgSchedules: false,
                discard: DiscardPolicy.DiscardOld,
                discardNewPer: false,
                maxMsgSize: -1,
                maxMsgs: -1,
                maxMsgsPer: -1,
                maxBytes: -1);

            apiError.ShouldNotBeNull();
            apiError!.ErrCode.ShouldBe(JsApiErrors.StreamNotMatch.ErrCode);
            error.ShouldNotBeNull();
        }
        finally
        {
            store.Stop();
        }
    }

    /// <summary>
    /// A message id that is already staged in the batch diff yields the
    /// atomic-publish duplicate-message API error together with a non-null error.
    /// </summary>
    [Fact]
    public void CheckMsgHeadersPreClusteredProposal_DuplicateMsgIdInBatch_ReturnsDuplicateError()
    {
        var store = new JetStreamMemStore(new StreamConfig { Name = "ORDERS", Storage = StorageType.MemoryStorage });
        try
        {
            var hdr = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsMsgId, "msg-1");
            // Pre-seed the staged diff with the same id that the header carries.
            var diff = new BatchStagedDiff
            {
                MsgIds = new Dictionary { ["msg-1"] = null },
            };
            var context = new BatchHeaderCheckContext
            {
                Store = store,
                ClSeq = 1,
                MaxPayload = 1024,
            };

            var (_, _, _, apiError, error) = JetStreamBatching.CheckMsgHeadersPreClusteredProposal(
                diff,
                context,
                "ORDERS.created",
                hdr,
                [],
                sourced: false,
                name: "ORDERS",
                allowRollup: true,
                denyPurge: false,
                allowTtl: true,
                allowMsgCounter: false,
                allowMsgSchedules: false,
                discard: DiscardPolicy.DiscardOld,
                discardNewPer: false,
                maxMsgSize: -1,
                maxMsgs: -1,
                maxMsgsPer: -1,
                maxBytes: -1);

            apiError.ShouldNotBeNull();
            apiError!.ErrCode.ShouldBe(JsApiErrors.AtomicPublishContainsDuplicateMessage.ErrCode);
            error.ShouldNotBeNull();
        }
        finally
        {
            store.Stop();
        }
    }

    /// <summary>
    /// Recursively deletes <paramref name="path"/> if it exists. I/O and access failures
    /// are deliberately ignored: this is test cleanup, and a file that is still locked
    /// must not turn an otherwise passing test into a failure.
    /// </summary>
    /// <param name="path">Directory to remove; may point at a directory that was never created.</param>
    private static void DeleteDirectoryBestEffort(string path)
    {
        try
        {
            if (Directory.Exists(path))
            {
                Directory.Delete(path, recursive: true);
            }
        }
        catch (Exception ex) when (ex is IOException or UnauthorizedAccessException)
        {
            // Best-effort only; the uniquely named temp directory is left for the OS to reclaim.
        }
    }
}

/// <summary>
/// Minimal <see cref="ICommittedEntry"/> test double that records whether
/// <see cref="ReturnToPool"/> has been called.
/// </summary>
internal sealed class TestCommittedEntry : ICommittedEntry
{
    /// <summary>True once <see cref="ReturnToPool"/> has been invoked.</summary>
    public bool Returned { get; private set; }

    /// <summary>Marks the entry as returned instead of pooling anything.</summary>
    public void ReturnToPool() => Returned = true;
}