From 02d3b610a1a59ae2af3585320b5d536291f916e8 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 1 Mar 2026 01:45:35 -0500 Subject: [PATCH] feat(batch29): implement jetstream batching group-a lifecycle/store methods --- .../JetStream/JetStreamBatching.cs | 232 ++++++++++++++++-- .../JetStream/JetStreamBatchingCoreTests.cs | 146 +++++++++++ porting.db | Bin 6787072 -> 6791168 bytes 3 files changed, 356 insertions(+), 22 deletions(-) create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamBatchingCoreTests.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamBatching.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamBatching.cs index 53ca9e7..465338d 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamBatching.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamBatching.cs @@ -15,9 +15,140 @@ namespace ZB.MOM.NatsNet.Server; -// --------------------------------------------------------------------------- -// Batching types -// --------------------------------------------------------------------------- +internal static class JetStreamBatching +{ + internal static readonly TimeSpan StreamDefaultMaxBatchTimeout = TimeSpan.FromSeconds(10); + internal const string BatchTimeout = "timeout"; + + private static int _globalInflightBatches; + + internal static int GlobalInflightBatches => Volatile.Read(ref _globalInflightBatches); + + internal static (string BatchName, string StoreDir) GetBatchStoreDir(string storeRootDir, string streamName, string batchId) + { + if (string.IsNullOrWhiteSpace(storeRootDir)) + throw new ArgumentException("store root directory is required", nameof(storeRootDir)); + if (string.IsNullOrWhiteSpace(streamName)) + throw new ArgumentException("stream name is required", nameof(streamName)); + if (string.IsNullOrWhiteSpace(batchId)) + throw new ArgumentException("batch ID is required", nameof(batchId)); + + var batchName = NatsServer.GetHash(batchId); + var batchPath = Path.Combine(storeRootDir, "_streams", streamName, "batches", batchName); + return (batchName, batchPath); + } + + internal static IStreamStore NewBatchStore(string storeRootDir, StreamConfig streamConfig, string batchId) + { + ArgumentNullException.ThrowIfNull(streamConfig); + + if (streamConfig.Replicas == 1 && streamConfig.Storage == StorageType.FileStorage) + { + var (batchName, storeDir) = GetBatchStoreDir(storeRootDir, streamConfig.Name, batchId); + Directory.CreateDirectory(storeDir); + + var cfg = new FileStoreConfig + { + AsyncFlush = true, + BlockSize = FileStoreDefaults.DefaultLargeBlockSize, + StoreDir = storeDir, + }; + + var fsInfo = new FileStreamInfo + { + Created = DateTime.UtcNow, + Config = new StreamConfig + { + Name = batchName, + Storage = StorageType.FileStorage, + }, + }; + + return new JetStreamFileStore(cfg, fsInfo); + } + + return new JetStreamMemStore(new StreamConfig + { + Name = string.Empty, + Storage = StorageType.MemoryStorage, + }); + } + + internal static void IncrementGlobalInflightBatches() => Interlocked.Increment(ref _globalInflightBatches); + + internal static void DecrementGlobalInflightBatches() => Interlocked.Decrement(ref _globalInflightBatches); + + internal static void ResetGlobalInflightBatchesForTest() => Interlocked.Exchange(ref _globalInflightBatches, 0); + + internal static void IncrementGlobalInflightBatchesForTest() => IncrementGlobalInflightBatches(); + + internal static BatchGroup NewBatchGroup( + Batching batches, + string storeRootDir, + StreamConfig streamConfig, + string batchId, + Action? abandonedAdvisory = null, + TimeSpan? maxBatchTimeout = null) + { + ArgumentNullException.ThrowIfNull(batches); + return batches.NewBatchGroup(storeRootDir, streamConfig, batchId, abandonedAdvisory, maxBatchTimeout); + } + + internal static bool ReadyForCommit(BatchGroup batchGroup) + { + ArgumentNullException.ThrowIfNull(batchGroup); + return batchGroup.ReadyForCommit(); + } + + internal static void Cleanup(BatchGroup batchGroup, string batchId, Batching batches) + { + ArgumentNullException.ThrowIfNull(batchGroup); + batchGroup.Cleanup(batchId, batches); + } + + internal static void CleanupLocked(BatchGroup batchGroup, string batchId, Batching batches) + { + ArgumentNullException.ThrowIfNull(batchGroup); + batchGroup.CleanupLocked(batchId, batches); + } + + internal static void StopLocked(BatchGroup batchGroup) + { + ArgumentNullException.ThrowIfNull(batchGroup); + batchGroup.StopLocked(); + } +} + +internal interface IBatchTimer +{ + bool Stop(); +} + +internal sealed class SystemBatchTimer : IBatchTimer, IDisposable +{ + private readonly Timer _timer; + private int _stopped; + + public SystemBatchTimer(TimeSpan timeout, Action callback) + { + ArgumentNullException.ThrowIfNull(callback); + + _timer = new Timer(_ => + { + if (Interlocked.Exchange(ref _stopped, 1) == 0) + callback(); + }, null, timeout, Timeout.InfiniteTimeSpan); + } + + public bool Stop() + { + var firstStop = Interlocked.Exchange(ref _stopped, 1) == 0; + _timer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); + return firstStop; + } + + public void Dispose() => _timer.Dispose(); +} /// /// Tracks in-progress atomic publish batch groups for a stream. @@ -28,8 +159,25 @@ internal sealed class Batching private readonly Lock _mu = new(); private readonly Dictionary _group = new(StringComparer.Ordinal); - public Lock Mu => _mu; + public Lock Mu => _mu; public Dictionary Group => _group; + + internal BatchGroup NewBatchGroup(string storeRootDir, StreamConfig streamConfig, string batchId, Action? abandonedAdvisory = null, TimeSpan? maxBatchTimeout = null) + { + var store = JetStreamBatching.NewBatchStore(storeRootDir, streamConfig, batchId); + var group = new BatchGroup { Store = store }; + + var timeout = maxBatchTimeout.GetValueOrDefault(JetStreamBatching.StreamDefaultMaxBatchTimeout); + group.BatchTimer = new SystemBatchTimer(timeout, () => + { + group.Cleanup(batchId, this); + abandonedAdvisory?.Invoke(batchId, JetStreamBatching.BatchTimeout); + }); + + _group[batchId] = group; + JetStreamBatching.IncrementGlobalInflightBatches(); + return group; + } } /// @@ -39,13 +187,13 @@ internal sealed class Batching internal sealed class BatchGroup { /// Last proposed stream sequence for this batch. - public ulong LastSeq { get; set; } + public ulong LastSeq { get; set; } /// Temporary backing store for the batch's messages. - public object? Store { get; set; } // IStreamStore — session 20 + public IStreamStore? Store { get; set; } /// Timer that abandons the batch after the configured timeout. - public Timer? BatchTimer { get; set; } + public IBatchTimer? BatchTimer { get; set; } /// /// Stops the cleanup timer and flushes pending writes so the batch is @@ -54,8 +202,49 @@ internal sealed class BatchGroup /// public bool ReadyForCommit() { - // Stub — full implementation requires IStreamStore.FlushAllPending (session 20). - return BatchTimer?.Change(Timeout.Infinite, Timeout.Infinite) != null; + if (BatchTimer?.Stop() != true) + return false; + + Store?.FlushAllPending(); + return true; + } + + /// + /// Deletes underlying resources associated with the batch and unregisters it from the stream's batches. + /// Mirrors batchGroup.cleanup. + /// + public void Cleanup(string batchId, Batching batches) + { + ArgumentNullException.ThrowIfNull(batches); + lock (batches.Mu) + { + CleanupLocked(batchId, batches); + } + } + + /// + /// Deletes underlying resources associated with the batch and unregisters it from the stream's batches. + /// Mirrors batchGroup.cleanupLocked. + /// + public void CleanupLocked(string batchId, Batching batches) + { + ArgumentNullException.ThrowIfNull(batches); + + JetStreamBatching.DecrementGlobalInflightBatches(); + _ = BatchTimer?.Stop(); + Store?.Delete(true); + _ = batches.Group.Remove(batchId); + } + + /// + /// Stops underlying resources associated with the batch. + /// Mirrors batchGroup.stopLocked. + /// + public void StopLocked() + { + JetStreamBatching.DecrementGlobalInflightBatches(); + _ = BatchTimer?.Stop(); + Store?.Stop(); } } @@ -66,13 +255,13 @@ internal sealed class BatchGroup internal sealed class BatchStagedDiff { /// Message IDs seen in this batch, for duplicate detection. - public Dictionary? MsgIds { get; set; } + public Dictionary? MsgIds { get; set; } /// Running counter totals, keyed by subject. - public Dictionary? Counter { get; set; } // map[string]*msgCounterRunningTotal + public Dictionary? Counter { get; set; } /// Inflight subject byte/op totals for DiscardNew checks. - public Dictionary? Inflight { get; set; } // map[string]*inflightSubjectRunningTotal + public Dictionary? Inflight { get; set; } /// Expected-last-seq-per-subject checks staged in this batch. public Dictionary? ExpectedPerSubject { get; set; } @@ -85,7 +274,7 @@ internal sealed class BatchStagedDiff internal sealed class BatchExpectedPerSubject { /// Stream sequence of the last message on this subject at proposal time. - public ulong SSeq { get; set; } + public ulong SSeq { get; set; } /// Clustered proposal sequence at which this check was computed. public ulong ClSeq { get; set; } @@ -100,32 +289,31 @@ internal sealed class BatchApply private readonly Lock _mu = new(); /// ID of the current batch. - public string Id { get; set; } = string.Empty; + public string Id { get; set; } = string.Empty; /// Number of entries expected in the batch (for consistency checks). - public ulong Count { get; set; } + public ulong Count { get; set; } /// Raft committed entries that make up this batch. - public List? Entries { get; set; } // []*CommittedEntry — session 20+ + public List? Entries { get; set; } /// Index within an entry indicating the first message of the batch. - public int EntryStart { get; set; } + public int EntryStart { get; set; } /// Applied value before the entry containing the first batch message. - public ulong MaxApplied { get; set; } + public ulong MaxApplied { get; set; } public Lock Mu => _mu; /// /// Clears in-memory apply-batch state. /// Mirrors batchApply.clearBatchStateLocked. - /// Lock should be held. /// public void ClearBatchStateLocked() { - Id = string.Empty; - Count = 0; - Entries = null; + Id = string.Empty; + Count = 0; + Entries = null; EntryStart = 0; MaxApplied = 0; } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamBatchingCoreTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamBatchingCoreTests.cs new file mode 100644 index 0000000..53055a2 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamBatchingCoreTests.cs @@ -0,0 +1,146 @@ +using NSubstitute; +using Shouldly; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class JetStreamBatchingCoreTests +{ + [Fact] + public void GetBatchStoreDir_ValidInputs_ReturnsHashedBatchPath() + { + var storeDir = Path.Combine(Path.GetTempPath(), $"jsa-{Guid.NewGuid():N}"); + var (batchName, batchPath) = JetStreamBatching.GetBatchStoreDir(storeDir, "ORDERS", "batch-A"); + + batchName.ShouldBe(NatsServer.GetHash("batch-A")); + batchPath.ShouldBe(Path.Combine(storeDir, "_streams", "ORDERS", "batches", batchName)); + } + + [Fact] + public void NewBatchStore_FileSingleReplica_CreatesFileStore() + { + var tempRoot = Path.Combine(Path.GetTempPath(), $"batch-store-{Guid.NewGuid():N}"); + var stream = new StreamConfig + { + Name = "ORDERS", + Replicas = 1, + Storage = StorageType.FileStorage, + }; + + var store = JetStreamBatching.NewBatchStore(tempRoot, stream, "batch-A"); + + try + { + store.ShouldBeOfType(); + } + finally + { + store.Stop(); + } + } + + [Fact] + public void NewBatchStore_MemoryOrReplicated_CreatesMemStore() + { + var tempRoot = Path.Combine(Path.GetTempPath(), $"batch-store-{Guid.NewGuid():N}"); + var stream = new StreamConfig + { + Name = "ORDERS", + Replicas = 3, + Storage = StorageType.FileStorage, + }; + + var store = JetStreamBatching.NewBatchStore(tempRoot, stream, "batch-A"); + + try + { + store.ShouldBeOfType(); + } + finally + { + store.Stop(); + } + } + + [Fact] + public void ReadyForCommit_TimerStopped_FlushesAndReturnsTrue() + { + var store = Substitute.For(); + var timer = Substitute.For(); + timer.Stop().Returns(true); + + var group = new BatchGroup + { + Store = store, + BatchTimer = timer, + }; + + group.ReadyForCommit().ShouldBeTrue(); + store.Received(1).FlushAllPending(); + } + + [Fact] + public void ReadyForCommit_TimerAlreadyExpired_DoesNotFlushAndReturnsFalse() + { + var store = Substitute.For(); + var timer = Substitute.For(); + timer.Stop().Returns(false); + + var group = new BatchGroup + { + Store = store, + BatchTimer = timer, + }; + + group.ReadyForCommit().ShouldBeFalse(); + store.DidNotReceive().FlushAllPending(); + } + + [Fact] + public void CleanupLocked_BatchPresent_StopsTimerDeletesStoreRemovesGroupAndDecrementsGlobalInflight() + { + JetStreamBatching.ResetGlobalInflightBatchesForTest(); + JetStreamBatching.IncrementGlobalInflightBatchesForTest(); + + var store = Substitute.For(); + var timer = Substitute.For(); + timer.Stop().Returns(true); + + var group = new BatchGroup + { + Store = store, + BatchTimer = timer, + }; + + var batching = new Batching(); + batching.Group["batch-A"] = group; + + group.CleanupLocked("batch-A", batching); + + timer.Received(1).Stop(); + store.Received(1).Delete(true); + batching.Group.ContainsKey("batch-A").ShouldBeFalse(); + JetStreamBatching.GlobalInflightBatches.ShouldBe(0); + } + + [Fact] + public void StopLocked_BatchPresent_StopsTimerStopsStoreAndDecrementsGlobalInflight() + { + JetStreamBatching.ResetGlobalInflightBatchesForTest(); + JetStreamBatching.IncrementGlobalInflightBatchesForTest(); + + var store = Substitute.For(); + var timer = Substitute.For(); + + var group = new BatchGroup + { + Store = store, + BatchTimer = timer, + }; + + group.StopLocked(); + + timer.Received(1).Stop(); + store.Received(1).Stop(); + JetStreamBatching.GlobalInflightBatches.ShouldBe(0); + } +} diff --git a/porting.db b/porting.db index 6eec31ed9f3997368680f35fcf105d24a45f0923..c655db92a772dbad78f8ac5f41af930a9853a37d 100644 GIT binary patch delta 2743 zcmb`|ZA@EL7zglsZ+mZh>1inp7z~QH8{?&;r7bVxWiP4$Aj&4>|dv^)G1f$Si_W=U(nrT zOuo?Q_=p^p`FZ7wdA5SzW7Jr<)Q3w9!!)O&1R`51x{IhR72QGfpxB~Pxx~&EYK7g* z2U9%P%N7VthI^!zl&AkGBP{om)NJ#o!VUWNh?+t!5}ok!>MFJ+N9qdP$&oty>f}hBd3kcAK3tF- zEnt+B9L;|=&S?kris1%65{@h67Kd3IZqT`=;tFN*HfPBZkT@1UdUkr20}W__2LW`- z^sK&ncbU+`zo}i&v~yOrn>l9MKz=dwgf;F z;wHL$lhgAJjjgU0`cJj^5_Mk?l4(M()rcF@(?8aTU#FfX^n06FmX=oT*~DEey*Q(_ z(B&#IkIJ=TRa&0DQ!DPJ-fuYzJ?{`psKqXpr>DE@;z4@qs^-6^zq5-qY3a8eVuYUh z5Vi5}*0h9jOcFoUot<%rkOKxFU<8vgSy)X7hV2H81BDD~oyN*R(eEMSW2Y2LnO8HjAsGg;mi!H5ca9DAPq| zUMO3YSy~k(=(&=-nzIIr8S)?>)2GJe`T~b9(%Rr!xtBZ>{$Nz1v3yVY zp*ATZN29^9A!#Km1%ly7R2rAV()fsHOd1b|q9I>s#O-u9*`4)vr%Q4+wW`Oj5&e&3 z>Q4{!qqCFx^69N3hx3x{-CI{0Jk0jNjP0#~!=-d^Abs`CjTzb>%x?8;d{|U>zq>H> zVDFb`Y`|x$2&%szqw-jCYrUSRZ`j=;4TVD!<5Gt-Do2My{s_HrZ*X13%8u5zv^F?n zYvLT+FtnZ*9$mY-@Vg)|~&JHI1vjcjfw-q5Y!S zh~HB!Niex>rU~OmB%xSu?W@u$PP6M1zBtwy58Z#=`BkT#y81j#$#Y()o^zKg)2rT> z-K*XiAIsD_L9MTN8q<0^TQVH8EH9bP_-LlybnY!V?UdhNyR-h`_wR_0WV~fMe(g`| QpSpCsuS0@*_$A-><}Lpj%%m)X>6B%-NS+h~ zGB9G)i<5U_8!N<*SH~*&ScT|#pes|#3qk8Rcebx5(VtKr@u^Q@P;L?gp=o~cDOz;C zWzkCJTL#TD-=3iTz3ed$e-&pIS+RKkfh@I)ZdF(JtIedB#K<1fNU9t^)iT~AN^h;~ z%DAo>PK#ISOmroZ$wa68<-hhh*LadhDVe<-*Na%GDtQLh}DL43PGi8vLn<+PNv6&ttxy>|%WH-~JhkcSYW_#$!9VG)?eJS;btRcE2%u{W7 z0sByPMgj|1K?VW}&(7GkHF`9`*{t5O|D@Bl-?nn)JgKtYvAmJCie170dsQR0b7L8u zRM>z)lPsQ_^90hWo#8r$xp7uh|>(lAR$C}lr+l2`UoA-%$TNfX$ham3G~x_(n+!-h`Sgx+oN zS(kp@YE_>LCG4x)wVP-2b(u)orJF=w&(dYOZOHD`!37IoA-G`?DHA