308 lines
9.2 KiB
C#
308 lines
9.2 KiB
C#
using NSubstitute;
|
|
using Shouldly;
|
|
|
|
namespace ZB.MOM.NatsNet.Server.Tests.JetStream;
|
|
|
|
public sealed class JetStreamBatchingCoreTests
|
|
{
|
|
/// <summary>
/// Checks that <c>GetBatchStoreDir</c> returns the hash of the batch id as the batch name and
/// a path of the form <c>{storeDir}/_streams/{stream}/batches/{batchName}</c>.
/// </summary>
[Fact]
public void GetBatchStoreDir_ValidInputs_ReturnsHashedBatchPath()
{
    // Arrange: a unique root so the expected path cannot collide with another run.
    var root = Path.Combine(Path.GetTempPath(), $"jsa-{Guid.NewGuid():N}");

    // Act
    var (hashedName, resolvedPath) = JetStreamBatching.GetBatchStoreDir(root, "ORDERS", "batch-A");

    // Assert: the name is the hash of the batch id...
    var expectedName = NatsServer.GetHash("batch-A");
    hashedName.ShouldBe(expectedName);

    // ...and the path is built from the returned name under the stream's "batches" folder.
    var expectedPath = Path.Combine(root, "_streams", "ORDERS", "batches", hashedName);
    resolvedPath.ShouldBe(expectedPath);
}
|
|
|
|
/// <summary>
/// Checks that <c>NewBatchStore</c> returns a <c>JetStreamFileStore</c> for a stream configured
/// with a single replica and file storage.
/// </summary>
/// <remarks>
/// The store is always stopped, and the unique temp root handed to <c>NewBatchStore</c> is removed
/// afterwards so repeated runs do not accumulate directories under the system temp folder.
/// </remarks>
[Fact]
public void NewBatchStore_FileSingleReplica_CreatesFileStore()
{
    var tempRoot = Path.Combine(Path.GetTempPath(), $"batch-store-{Guid.NewGuid():N}");
    var stream = new StreamConfig
    {
        Name = "ORDERS",
        Replicas = 1,
        Storage = StorageType.FileStorage,
    };

    try
    {
        var store = JetStreamBatching.NewBatchStore(tempRoot, stream, "batch-A");

        try
        {
            store.ShouldBeOfType<JetStreamFileStore>();
        }
        finally
        {
            // Stop before deleting so the directory is removed only after the store is shut down.
            store.Stop();
        }
    }
    finally
    {
        // Previously nothing removed tempRoot, leaving one directory behind per test run.
        if (Directory.Exists(tempRoot))
        {
            Directory.Delete(tempRoot, recursive: true);
        }
    }
}
|
|
|
|
/// <summary>
/// Checks that <c>NewBatchStore</c> returns a <c>JetStreamMemStore</c> for a stream configured
/// with three replicas, even though its storage type is file storage.
/// </summary>
/// <remarks>
/// NOTE(review): only the replicated configuration is exercised here; the memory-storage case
/// mentioned in the test name is not covered by this method.
/// </remarks>
[Fact]
public void NewBatchStore_MemoryOrReplicated_CreatesMemStore()
{
    // Arrange
    var root = Path.Combine(Path.GetTempPath(), $"batch-store-{Guid.NewGuid():N}");
    StreamConfig replicatedConfig = new()
    {
        Name = "ORDERS",
        Replicas = 3,
        Storage = StorageType.FileStorage,
    };

    // Act
    var batchStore = JetStreamBatching.NewBatchStore(root, replicatedConfig, "batch-A");

    // Assert, always stopping the store afterwards.
    try
    {
        batchStore.ShouldBeOfType<JetStreamMemStore>();
    }
    finally
    {
        batchStore.Stop();
    }
}
|
|
|
|
/// <summary>
/// When the batch timer's <c>Stop()</c> reports <c>true</c>, <c>ReadyForCommit</c> is expected to
/// return <c>true</c> and call <c>FlushAllPending</c> on the store exactly once.
/// </summary>
[Fact]
public void ReadyForCommit_TimerStopped_FlushesAndReturnsTrue()
{
    // Arrange: substitutes for the store and a timer whose Stop() returns true.
    var storeMock = Substitute.For<IStreamStore>();
    var timerMock = Substitute.For<IBatchTimer>();
    timerMock.Stop().Returns(true);

    var batchGroup = new BatchGroup { Store = storeMock, BatchTimer = timerMock };

    // Act
    var ready = batchGroup.ReadyForCommit();

    // Assert
    ready.ShouldBeTrue();
    storeMock.Received(1).FlushAllPending();
}
|
|
|
|
/// <summary>
/// When the batch timer's <c>Stop()</c> reports <c>false</c> (the "already expired" case per the
/// test name), <c>ReadyForCommit</c> is expected to return <c>false</c> without calling
/// <c>FlushAllPending</c> on the store.
/// </summary>
[Fact]
public void ReadyForCommit_TimerAlreadyExpired_DoesNotFlushAndReturnsFalse()
{
    // Arrange: substitutes for the store and a timer whose Stop() returns false.
    var storeMock = Substitute.For<IStreamStore>();
    var timerMock = Substitute.For<IBatchTimer>();
    timerMock.Stop().Returns(false);

    var batchGroup = new BatchGroup { Store = storeMock, BatchTimer = timerMock };

    // Act
    var ready = batchGroup.ReadyForCommit();

    // Assert
    ready.ShouldBeFalse();
    storeMock.DidNotReceive().FlushAllPending();
}
|
|
|
|
/// <summary>
/// Checks that <c>CleanupLocked</c> stops the timer once, calls <c>Delete(true)</c> on the store
/// once, removes the group from <c>Batching.Group</c>, and leaves the global in-flight batch
/// counter at zero.
/// </summary>
[Fact]
public void CleanupLocked_BatchPresent_StopsTimerDeletesStoreRemovesGroupAndDecrementsGlobalInflight()
{
    const string batchId = "batch-A";

    // Arrange: reset and then bump the global counter; it is asserted to be 0 after cleanup.
    JetStreamBatching.ResetGlobalInflightBatchesForTest();
    JetStreamBatching.IncrementGlobalInflightBatchesForTest();

    var storeMock = Substitute.For<IStreamStore>();
    var timerMock = Substitute.For<IBatchTimer>();
    timerMock.Stop().Returns(true);

    var batchGroup = new BatchGroup { Store = storeMock, BatchTimer = timerMock };

    var registry = new Batching();
    registry.Group[batchId] = batchGroup;

    // Act
    batchGroup.CleanupLocked(batchId, registry);

    // Assert
    timerMock.Received(1).Stop();
    storeMock.Received(1).Delete(true);

    var stillRegistered = registry.Group.ContainsKey(batchId);
    stillRegistered.ShouldBeFalse();

    JetStreamBatching.GlobalInflightBatches.ShouldBe(0);
}
|
|
|
|
/// <summary>
/// Checks that <c>StopLocked</c> stops the timer once, stops the store once, and leaves the
/// global in-flight batch counter at zero.
/// </summary>
[Fact]
public void StopLocked_BatchPresent_StopsTimerStopsStoreAndDecrementsGlobalInflight()
{
    // Arrange: reset and then bump the global counter; it is asserted to be 0 after the stop.
    JetStreamBatching.ResetGlobalInflightBatchesForTest();
    JetStreamBatching.IncrementGlobalInflightBatchesForTest();

    // The timer's Stop() is left unstubbed here, so the substitute returns its default value.
    var storeMock = Substitute.For<IStreamStore>();
    var timerMock = Substitute.For<IBatchTimer>();

    var batchGroup = new BatchGroup { Store = storeMock, BatchTimer = timerMock };

    // Act
    batchGroup.StopLocked();

    // Assert
    timerMock.Received(1).Stop();
    storeMock.Received(1).Stop();
    JetStreamBatching.GlobalInflightBatches.ShouldBe(0);
}
|
|
|
|
/// <summary>
/// Checks that <c>JetStreamBatching.Commit</c> copies a staged diff into the runtime state:
/// message ids, clustered counter totals, in-flight totals, and the expected-per-subject
/// bookkeeping (sequence lookup keyed by <c>ClSeq</c> and the in-process subject set).
/// </summary>
[Fact]
public void Commit_DiffContainsState_UpdatesRuntimeState()
{
    // Arrange: one entry in each staged collection, all for subject "foo" (msg id "id-1").
    var stagedMsgIds = new Dictionary<string, object?> { ["id-1"] = null };

    var stagedCounters = new Dictionary<string, MsgCounterRunningTotal>
    {
        ["foo"] = new MsgCounterRunningTotal { Ops = 2, Total = new System.Numerics.BigInteger(5) },
    };

    var stagedInflight = new Dictionary<string, InflightSubjectRunningTotal>
    {
        ["foo"] = new InflightSubjectRunningTotal { Bytes = 10, Ops = 1 },
    };

    var stagedExpected = new Dictionary<string, BatchExpectedPerSubject>
    {
        ["foo"] = new BatchExpectedPerSubject { SSeq = 3, ClSeq = 9 },
    };

    var stagedDiff = new BatchStagedDiff
    {
        MsgIds = stagedMsgIds,
        Counter = stagedCounters,
        Inflight = stagedInflight,
        ExpectedPerSubject = stagedExpected,
    };

    var runtime = new BatchRuntimeState();

    // Act
    JetStreamBatching.Commit(stagedDiff, runtime);

    // Assert
    var hasMsgId = runtime.MsgIds.ContainsKey("id-1");
    hasMsgId.ShouldBeTrue();

    runtime.ClusteredCounterTotal["foo"].Total.ShouldBe(new System.Numerics.BigInteger(5));
    runtime.Inflight["foo"].Ops.ShouldBe(1UL);

    // The staged ClSeq (9) is expected to map back to its subject.
    runtime.ExpectedPerSubjectSequence[9].ShouldBe("foo");

    var subjectInProcess = runtime.ExpectedPerSubjectInProcess.Contains("foo");
    subjectInProcess.ShouldBeTrue();
}
|
|
|
|
/// <summary>
/// Exercises the two batch-apply reset helpers in sequence:
/// <c>RejectBatchStateLocked</c> is expected to raise <c>Clfs</c> from 10 to 13 for a batch with
/// <c>Count = 3</c>, return the entry to its pool, and blank the apply state;
/// <c>ClearBatchStateLocked</c> is expected to blank <c>Id</c> and <c>Count</c> again.
/// </summary>
[Fact]
public void BatchApplyWrappers_ClearAndRejectPaths_UpdateState()
{
    // Arrange: a pending apply holding a single tracked entry.
    var pooledEntry = new TestCommittedEntry();
    var pending = new BatchApply
    {
        Id = "batch-A",
        Count = 3,
        EntryStart = 4,
        MaxApplied = 8,
        Entries = [pooledEntry],
    };
    var applyContext = new BatchApplyContext { Clfs = 10 };

    // Act + Assert: reject path.
    JetStreamBatching.RejectBatchStateLocked(pending, applyContext);

    applyContext.Clfs.ShouldBe(13UL);
    pooledEntry.Returned.ShouldBeTrue();
    pending.Id.ShouldBe(string.Empty);
    pending.Count.ShouldBe(0UL);

    // Act + Assert: clear path, after re-populating the fields it is expected to reset.
    pending.Id = "batch-B";
    pending.Count = 2;

    JetStreamBatching.ClearBatchStateLocked(pending);

    pending.Id.ShouldBe(string.Empty);
    pending.Count.ShouldBe(0UL);
}
|
|
|
|
/// <summary>
/// Checks that <c>CheckMsgHeadersPreClusteredProposal</c> reports <c>JsApiErrors.StreamNotMatch</c>
/// (plus a non-null error) when the expected-stream header says "OTHER" but the stream name
/// passed in is "ORDERS".
/// </summary>
[Fact]
public void CheckMsgHeadersPreClusteredProposal_ExpectedStreamMismatch_ReturnsNotMatchError()
{
    var memConfig = new StreamConfig { Name = "ORDERS", Storage = StorageType.MemoryStorage };
    var memStore = new JetStreamMemStore(memConfig);

    try
    {
        // Arrange: header expecting a different stream than the one being checked.
        var header = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsExpectedStream, "OTHER");
        var stagedDiff = new BatchStagedDiff();
        var checkContext = new BatchHeaderCheckContext
        {
            Store = memStore,
            ClSeq = 1,
            Clfs = 0,
            MaxPayload = 1024,
            StreamSubjects = ["ORDERS.>"],
        };

        // Act: only the API error and the error of the returned tuple matter here.
        var (_, _, _, apiFailure, failure) = JetStreamBatching.CheckMsgHeadersPreClusteredProposal(
            stagedDiff, checkContext, "ORDERS.created", header, [],
            sourced: false, name: "ORDERS",
            allowRollup: true, denyPurge: false, allowTtl: true,
            allowMsgCounter: false, allowMsgSchedules: false,
            discard: DiscardPolicy.DiscardOld, discardNewPer: false,
            maxMsgSize: -1, maxMsgs: -1, maxMsgsPer: -1, maxBytes: -1);

        // Assert
        apiFailure.ShouldNotBeNull();
        apiFailure!.ErrCode.ShouldBe(JsApiErrors.StreamNotMatch.ErrCode);
        failure.ShouldNotBeNull();
    }
    finally
    {
        // Always stop the in-memory store, even when an assertion fails.
        memStore.Stop();
    }
}
|
|
|
|
/// <summary>
/// Checks that <c>CheckMsgHeadersPreClusteredProposal</c> reports
/// <c>JsApiErrors.AtomicPublishContainsDuplicateMessage</c> (plus a non-null error) when the
/// message-id header carries "msg-1" and the staged diff already contains "msg-1".
/// </summary>
[Fact]
public void CheckMsgHeadersPreClusteredProposal_DuplicateMsgIdInBatch_ReturnsDuplicateError()
{
    var memConfig = new StreamConfig { Name = "ORDERS", Storage = StorageType.MemoryStorage };
    var memStore = new JetStreamMemStore(memConfig);

    try
    {
        // Arrange: the staged diff already holds the id that the header is about to present.
        var header = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsMsgId, "msg-1");
        var alreadyStaged = new Dictionary<string, object?> { ["msg-1"] = null };
        var stagedDiff = new BatchStagedDiff { MsgIds = alreadyStaged };
        var checkContext = new BatchHeaderCheckContext
        {
            Store = memStore,
            ClSeq = 1,
            MaxPayload = 1024,
        };

        // Act: only the API error and the error of the returned tuple matter here.
        var (_, _, _, apiFailure, failure) = JetStreamBatching.CheckMsgHeadersPreClusteredProposal(
            stagedDiff, checkContext, "ORDERS.created", header, [],
            sourced: false, name: "ORDERS",
            allowRollup: true, denyPurge: false, allowTtl: true,
            allowMsgCounter: false, allowMsgSchedules: false,
            discard: DiscardPolicy.DiscardOld, discardNewPer: false,
            maxMsgSize: -1, maxMsgs: -1, maxMsgsPer: -1, maxBytes: -1);

        // Assert
        apiFailure.ShouldNotBeNull();
        apiFailure!.ErrCode.ShouldBe(JsApiErrors.AtomicPublishContainsDuplicateMessage.ErrCode);
        failure.ShouldNotBeNull();
    }
    finally
    {
        // Always stop the in-memory store, even when an assertion fails.
        memStore.Stop();
    }
}
|
|
}
|
|
|
|
/// <summary>
/// Minimal <see cref="ICommittedEntry"/> test double that records whether
/// <see cref="ReturnToPool"/> has been invoked.
/// </summary>
internal sealed class TestCommittedEntry : ICommittedEntry
{
    private bool _returned;

    /// <summary>Gets a value that is <c>false</c> initially and <c>true</c> once <see cref="ReturnToPool"/> has run.</summary>
    public bool Returned
    {
        get { return _returned; }
    }

    /// <summary>Marks this entry as returned; nothing is actually pooled.</summary>
    public void ReturnToPool()
    {
        _returned = true;
    }
}
|