// Source: natsnet/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamBatchingCoreTests.cs
// (308 lines, 9.2 KiB, C#)

using NSubstitute;
using Shouldly;
namespace ZB.MOM.NatsNet.Server.Tests.JetStream;
public sealed class JetStreamBatchingCoreTests
{
/// <summary>
/// Verifies that <c>GetBatchStoreDir</c> returns the hash of the batch id as the batch name and
/// a batch path of <c>{storeDir}/_streams/{stream}/batches/{batchName}</c>.
/// </summary>
[Fact]
public void GetBatchStoreDir_ValidInputs_ReturnsHashedBatchPath()
{
    const string streamName = "ORDERS";
    const string batchId = "batch-A";
    var root = Path.Combine(Path.GetTempPath(), $"jsa-{Guid.NewGuid():N}");

    var (actualName, actualPath) = JetStreamBatching.GetBatchStoreDir(root, streamName, batchId);

    var expectedName = NatsServer.GetHash(batchId);
    actualName.ShouldBe(expectedName);

    // The expected path is built from the name that was actually returned, as in the original check.
    var expectedPath = Path.Combine(root, "_streams", streamName, "batches", actualName);
    actualPath.ShouldBe(expectedPath);
}
/// <summary>
/// Verifies that a stream configured with file storage and a single replica gets a
/// <see cref="JetStreamFileStore"/> as its batch store.
/// </summary>
/// <remarks>
/// The store is created under a unique directory in the system temp folder. That directory is
/// removed afterwards (best effort) so repeated test runs do not accumulate leftovers on disk.
/// </remarks>
[Fact]
public void NewBatchStore_FileSingleReplica_CreatesFileStore()
{
    var tempRoot = Path.Combine(Path.GetTempPath(), $"batch-store-{Guid.NewGuid():N}");
    var stream = new StreamConfig
    {
        Name = "ORDERS",
        Replicas = 1,
        Storage = StorageType.FileStorage,
    };

    try
    {
        // Created inside the outer try so the temp directory is cleaned up even if creation throws.
        var store = JetStreamBatching.NewBatchStore(tempRoot, stream, "batch-A");
        try
        {
            store.ShouldBeOfType<JetStreamFileStore>();
        }
        finally
        {
            store.Stop();
        }
    }
    finally
    {
        try
        {
            if (Directory.Exists(tempRoot))
            {
                Directory.Delete(tempRoot, recursive: true);
            }
        }
        catch (Exception ex) when (ex is IOException or UnauthorizedAccessException)
        {
            // Best-effort cleanup: a lingering file handle must not turn a passing test into a failure
            // or mask the real assertion failure thrown above.
        }
    }
}
/// <summary>
/// Verifies that a batch store is a <see cref="JetStreamMemStore"/> both for a replicated stream
/// (three replicas, file storage) and for a single-replica stream that uses memory storage.
/// </summary>
/// <remarks>
/// The test name covers "memory or replicated", so both configurations are exercised. Each assertion
/// carries the configuration in its message so a failure identifies the offending case.
/// </remarks>
[Fact]
public void NewBatchStore_MemoryOrReplicated_CreatesMemStore()
{
    StreamConfig[] configs =
    [
        // Replicated: file storage is requested, but more than one replica.
        new StreamConfig { Name = "ORDERS", Replicas = 3, Storage = StorageType.FileStorage },
        // Memory: single replica with memory storage.
        new StreamConfig { Name = "ORDERS", Replicas = 1, Storage = StorageType.MemoryStorage },
    ];

    foreach (var stream in configs)
    {
        var tempRoot = Path.Combine(Path.GetTempPath(), $"batch-store-{Guid.NewGuid():N}");
        var store = JetStreamBatching.NewBatchStore(tempRoot, stream, "batch-A");
        try
        {
            store.ShouldBeOfType<JetStreamMemStore>($"storage={stream.Storage}, replicas={stream.Replicas}");
        }
        finally
        {
            store.Stop();
        }
    }
}
/// <summary>
/// When the batch timer's <c>Stop()</c> returns <c>true</c>, <c>ReadyForCommit</c> returns
/// <c>true</c> and <c>FlushAllPending</c> is called on the store exactly once.
/// </summary>
[Fact]
public void ReadyForCommit_TimerStopped_FlushesAndReturnsTrue()
{
    // Arrange
    var storeMock = Substitute.For<IStreamStore>();
    var timerMock = Substitute.For<IBatchTimer>();
    timerMock.Stop().Returns(true);
    var sut = new BatchGroup { Store = storeMock, BatchTimer = timerMock };

    // Act
    var ready = sut.ReadyForCommit();

    // Assert
    ready.ShouldBeTrue();
    storeMock.Received(1).FlushAllPending();
}
/// <summary>
/// When the batch timer's <c>Stop()</c> returns <c>false</c>, <c>ReadyForCommit</c> returns
/// <c>false</c> and <c>FlushAllPending</c> is never called on the store.
/// </summary>
[Fact]
public void ReadyForCommit_TimerAlreadyExpired_DoesNotFlushAndReturnsFalse()
{
    // Arrange
    var storeMock = Substitute.For<IStreamStore>();
    var timerMock = Substitute.For<IBatchTimer>();
    timerMock.Stop().Returns(false);
    var sut = new BatchGroup { Store = storeMock, BatchTimer = timerMock };

    // Act
    var ready = sut.ReadyForCommit();

    // Assert
    ready.ShouldBeFalse();
    storeMock.DidNotReceive().FlushAllPending();
}
/// <summary>
/// Verifies that <c>CleanupLocked</c> stops the timer once, calls <c>Delete(true)</c> on the store
/// once, removes the batch id from <c>Batching.Group</c>, and leaves the global in-flight counter at 0.
/// </summary>
[Fact]
public void CleanupLocked_BatchPresent_StopsTimerDeletesStoreRemovesGroupAndDecrementsGlobalInflight()
{
    const string batchId = "batch-A";

    // Reset, then increment the global counter so the final "0" assertion can only hold
    // if the call under test brings it back down.
    JetStreamBatching.ResetGlobalInflightBatchesForTest();
    JetStreamBatching.IncrementGlobalInflightBatchesForTest();

    var storeMock = Substitute.For<IStreamStore>();
    var timerMock = Substitute.For<IBatchTimer>();
    timerMock.Stop().Returns(true);
    var sut = new BatchGroup { Store = storeMock, BatchTimer = timerMock };
    var registry = new Batching();
    registry.Group[batchId] = sut;

    sut.CleanupLocked(batchId, registry);

    timerMock.Received(1).Stop();
    storeMock.Received(1).Delete(true);
    registry.Group.ContainsKey(batchId).ShouldBeFalse();
    JetStreamBatching.GlobalInflightBatches.ShouldBe(0);
}
/// <summary>
/// Verifies that <c>StopLocked</c> calls <c>Stop()</c> once on the timer and once on the store,
/// and leaves the global in-flight counter at 0.
/// </summary>
[Fact]
public void StopLocked_BatchPresent_StopsTimerStopsStoreAndDecrementsGlobalInflight()
{
    // Reset, then increment the global counter so the final "0" assertion can only hold
    // if the call under test brings it back down.
    JetStreamBatching.ResetGlobalInflightBatchesForTest();
    JetStreamBatching.IncrementGlobalInflightBatchesForTest();

    var storeMock = Substitute.For<IStreamStore>();
    var timerMock = Substitute.For<IBatchTimer>();
    var sut = new BatchGroup { Store = storeMock, BatchTimer = timerMock };

    sut.StopLocked();

    timerMock.Received(1).Stop();
    storeMock.Received(1).Stop();
    JetStreamBatching.GlobalInflightBatches.ShouldBe(0);
}
/// <summary>
/// Verifies that <c>JetStreamBatching.Commit</c> transfers a staged diff into the runtime state:
/// message ids, counter totals, in-flight totals, and the expected-per-subject bookkeeping
/// (sequence-to-subject map and the in-process subject set).
/// </summary>
[Fact]
public void Commit_DiffContainsState_UpdatesRuntimeState()
{
    const string subject = "foo";
    const string msgId = "id-1";

    // Arrange: one entry in every staged collection.
    var stagedMsgIds = new Dictionary<string, object?> { { msgId, null } };
    var stagedCounters = new Dictionary<string, MsgCounterRunningTotal>
    {
        { subject, new() { Ops = 2, Total = new System.Numerics.BigInteger(5) } },
    };
    var stagedInflight = new Dictionary<string, InflightSubjectRunningTotal>
    {
        { subject, new() { Bytes = 10, Ops = 1 } },
    };
    var stagedExpected = new Dictionary<string, BatchExpectedPerSubject>
    {
        { subject, new() { SSeq = 3, ClSeq = 9 } },
    };
    var staged = new BatchStagedDiff
    {
        MsgIds = stagedMsgIds,
        Counter = stagedCounters,
        Inflight = stagedInflight,
        ExpectedPerSubject = stagedExpected,
    };
    var runtime = new BatchRuntimeState();

    // Act
    JetStreamBatching.Commit(staged, runtime);

    // Assert
    runtime.MsgIds.ContainsKey(msgId).ShouldBeTrue();
    runtime.ClusteredCounterTotal[subject].Total.ShouldBe(new System.Numerics.BigInteger(5));
    runtime.Inflight[subject].Ops.ShouldBe(1UL);
    // The sequence map is keyed by the staged ClSeq (9) and holds the subject.
    runtime.ExpectedPerSubjectSequence[9].ShouldBe(subject);
    runtime.ExpectedPerSubjectInProcess.Contains(subject).ShouldBeTrue();
}
/// <summary>
/// Exercises both batch-apply reset paths. <c>RejectBatchStateLocked</c> must raise
/// <c>Clfs</c> from 10 to 13 for a batch with <c>Count</c> 3, return the entry to its pool,
/// and reset <c>Id</c>/<c>Count</c>. <c>ClearBatchStateLocked</c> must reset
/// <c>Id</c>/<c>Count</c> as well.
/// </summary>
[Fact]
public void BatchApplyWrappers_ClearAndRejectPaths_UpdateState()
{
    var pooledEntry = new TestCommittedEntry();
    var batchApply = new BatchApply
    {
        Id = "batch-A",
        Count = 3,
        EntryStart = 4,
        MaxApplied = 8,
        Entries = [pooledEntry],
    };
    var applyContext = new BatchApplyContext { Clfs = 10 };

    // Reject path.
    JetStreamBatching.RejectBatchStateLocked(batchApply, applyContext);

    applyContext.Clfs.ShouldBe(13UL);
    pooledEntry.Returned.ShouldBeTrue();
    batchApply.Id.ShouldBe(string.Empty);
    batchApply.Count.ShouldBe(0UL);

    // Clear path: re-populate the identifying fields, then clear them again.
    batchApply.Id = "batch-B";
    batchApply.Count = 2;

    JetStreamBatching.ClearBatchStateLocked(batchApply);

    batchApply.Id.ShouldBe(string.Empty);
    batchApply.Count.ShouldBe(0UL);
}
/// <summary>
/// Verifies that a header carrying an expected-stream value of "OTHER", checked against the
/// stream name "ORDERS", produces the <c>StreamNotMatch</c> API error and a non-null error.
/// </summary>
[Fact]
public void CheckMsgHeadersPreClusteredProposal_ExpectedStreamMismatch_ReturnsNotMatchError()
{
    var memStore = new JetStreamMemStore(new StreamConfig { Name = "ORDERS", Storage = StorageType.MemoryStorage });
    try
    {
        var headers = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsExpectedStream, "OTHER");
        var staged = new BatchStagedDiff();
        var checkContext = new BatchHeaderCheckContext
        {
            Store = memStore,
            ClSeq = 1,
            Clfs = 0,
            MaxPayload = 1024,
            StreamSubjects = ["ORDERS.>"],
        };

        // Only the last two elements of the result (API error and error) matter for this test.
        var (_, _, _, apiErr, err) = JetStreamBatching.CheckMsgHeadersPreClusteredProposal(
            staged,
            checkContext,
            "ORDERS.created",
            headers,
            [],
            sourced: false,
            name: "ORDERS",
            allowRollup: true,
            denyPurge: false,
            allowTtl: true,
            allowMsgCounter: false,
            allowMsgSchedules: false,
            discard: DiscardPolicy.DiscardOld,
            discardNewPer: false,
            maxMsgSize: -1,
            maxMsgs: -1,
            maxMsgsPer: -1,
            maxBytes: -1);

        apiErr.ShouldNotBeNull();
        apiErr!.ErrCode.ShouldBe(JsApiErrors.StreamNotMatch.ErrCode);
        err.ShouldNotBeNull();
    }
    finally
    {
        memStore.Stop();
    }
}
/// <summary>
/// Verifies that a message whose message-id header ("msg-1") is already present in the staged
/// diff's <c>MsgIds</c> produces the <c>AtomicPublishContainsDuplicateMessage</c> API error
/// and a non-null error.
/// </summary>
[Fact]
public void CheckMsgHeadersPreClusteredProposal_DuplicateMsgIdInBatch_ReturnsDuplicateError()
{
    var memStore = new JetStreamMemStore(new StreamConfig { Name = "ORDERS", Storage = StorageType.MemoryStorage });
    try
    {
        var headers = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsMsgId, "msg-1");

        // The staged diff already holds the same id the header carries.
        var alreadyStaged = new Dictionary<string, object?> { { "msg-1", null } };
        var staged = new BatchStagedDiff { MsgIds = alreadyStaged };
        var checkContext = new BatchHeaderCheckContext
        {
            Store = memStore,
            ClSeq = 1,
            MaxPayload = 1024,
        };

        // Only the last two elements of the result (API error and error) matter for this test.
        var (_, _, _, apiErr, err) = JetStreamBatching.CheckMsgHeadersPreClusteredProposal(
            staged,
            checkContext,
            "ORDERS.created",
            headers,
            [],
            sourced: false,
            name: "ORDERS",
            allowRollup: true,
            denyPurge: false,
            allowTtl: true,
            allowMsgCounter: false,
            allowMsgSchedules: false,
            discard: DiscardPolicy.DiscardOld,
            discardNewPer: false,
            maxMsgSize: -1,
            maxMsgs: -1,
            maxMsgsPer: -1,
            maxBytes: -1);

        apiErr.ShouldNotBeNull();
        apiErr!.ErrCode.ShouldBe(JsApiErrors.AtomicPublishContainsDuplicateMessage.ErrCode);
        err.ShouldNotBeNull();
    }
    finally
    {
        memStore.Stop();
    }
}
}
/// <summary>
/// Minimal <see cref="ICommittedEntry"/> test double that records whether
/// <see cref="ReturnToPool"/> has been invoked.
/// </summary>
internal sealed class TestCommittedEntry : ICommittedEntry
{
    private bool _returned;

    /// <summary>Gets a value indicating whether <see cref="ReturnToPool"/> has been called.</summary>
    public bool Returned => _returned;

    /// <summary>Marks this entry as returned; no actual pooling takes place.</summary>
    public void ReturnToPool()
    {
        _returned = true;
    }
}