diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamBatching.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamBatching.cs index 53ca9e7..465338d 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamBatching.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamBatching.cs @@ -15,9 +15,140 @@ namespace ZB.MOM.NatsNet.Server; -// --------------------------------------------------------------------------- -// Batching types -// --------------------------------------------------------------------------- +internal static class JetStreamBatching +{ + internal static readonly TimeSpan StreamDefaultMaxBatchTimeout = TimeSpan.FromSeconds(10); + internal const string BatchTimeout = "timeout"; + + private static int _globalInflightBatches; + + internal static int GlobalInflightBatches => Volatile.Read(ref _globalInflightBatches); + + internal static (string BatchName, string StoreDir) GetBatchStoreDir(string storeRootDir, string streamName, string batchId) + { + if (string.IsNullOrWhiteSpace(storeRootDir)) + throw new ArgumentException("store root directory is required", nameof(storeRootDir)); + if (string.IsNullOrWhiteSpace(streamName)) + throw new ArgumentException("stream name is required", nameof(streamName)); + if (string.IsNullOrWhiteSpace(batchId)) + throw new ArgumentException("batch ID is required", nameof(batchId)); + + var batchName = NatsServer.GetHash(batchId); + var batchPath = Path.Combine(storeRootDir, "_streams", streamName, "batches", batchName); + return (batchName, batchPath); + } + + internal static IStreamStore NewBatchStore(string storeRootDir, StreamConfig streamConfig, string batchId) + { + ArgumentNullException.ThrowIfNull(streamConfig); + + if (streamConfig.Replicas == 1 && streamConfig.Storage == StorageType.FileStorage) + { + var (batchName, storeDir) = GetBatchStoreDir(storeRootDir, streamConfig.Name, batchId); + Directory.CreateDirectory(storeDir); + + var cfg = new FileStoreConfig + { + AsyncFlush = true, + BlockSize = FileStoreDefaults.DefaultLargeBlockSize, + StoreDir = storeDir, + }; + + var fsInfo = new FileStreamInfo + { + Created = DateTime.UtcNow, + Config = new StreamConfig + { + Name = batchName, + Storage = StorageType.FileStorage, + }, + }; + + return new JetStreamFileStore(cfg, fsInfo); + } + + return new JetStreamMemStore(new StreamConfig + { + Name = string.Empty, + Storage = StorageType.MemoryStorage, + }); + } + + internal static void IncrementGlobalInflightBatches() => Interlocked.Increment(ref _globalInflightBatches); + + internal static void DecrementGlobalInflightBatches() => Interlocked.Decrement(ref _globalInflightBatches); + + internal static void ResetGlobalInflightBatchesForTest() => Interlocked.Exchange(ref _globalInflightBatches, 0); + + internal static void IncrementGlobalInflightBatchesForTest() => IncrementGlobalInflightBatches(); + + internal static BatchGroup NewBatchGroup( + Batching batches, + string storeRootDir, + StreamConfig streamConfig, + string batchId, + Action? abandonedAdvisory = null, + TimeSpan? maxBatchTimeout = null) + { + ArgumentNullException.ThrowIfNull(batches); + return batches.NewBatchGroup(storeRootDir, streamConfig, batchId, abandonedAdvisory, maxBatchTimeout); + } + + internal static bool ReadyForCommit(BatchGroup batchGroup) + { + ArgumentNullException.ThrowIfNull(batchGroup); + return batchGroup.ReadyForCommit(); + } + + internal static void Cleanup(BatchGroup batchGroup, string batchId, Batching batches) + { + ArgumentNullException.ThrowIfNull(batchGroup); + batchGroup.Cleanup(batchId, batches); + } + + internal static void CleanupLocked(BatchGroup batchGroup, string batchId, Batching batches) + { + ArgumentNullException.ThrowIfNull(batchGroup); + batchGroup.CleanupLocked(batchId, batches); + } + + internal static void StopLocked(BatchGroup batchGroup) + { + ArgumentNullException.ThrowIfNull(batchGroup); + batchGroup.StopLocked(); + } +} + +internal interface IBatchTimer +{ + bool Stop(); +} + +internal sealed class SystemBatchTimer : IBatchTimer, IDisposable +{ + private readonly Timer _timer; + private int _stopped; + + public SystemBatchTimer(TimeSpan timeout, Action callback) + { + ArgumentNullException.ThrowIfNull(callback); + + _timer = new Timer(_ => + { + if (Interlocked.Exchange(ref _stopped, 1) == 0) + callback(); + }, null, timeout, Timeout.InfiniteTimeSpan); + } + + public bool Stop() + { + var firstStop = Interlocked.Exchange(ref _stopped, 1) == 0; + _timer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); + return firstStop; + } + + public void Dispose() => _timer.Dispose(); +} /// /// Tracks in-progress atomic publish batch groups for a stream. @@ -28,8 +159,25 @@ internal sealed class Batching private readonly Lock _mu = new(); private readonly Dictionary _group = new(StringComparer.Ordinal); - public Lock Mu => _mu; + public Lock Mu => _mu; public Dictionary Group => _group; + + internal BatchGroup NewBatchGroup(string storeRootDir, StreamConfig streamConfig, string batchId, Action? abandonedAdvisory = null, TimeSpan? maxBatchTimeout = null) + { + var store = JetStreamBatching.NewBatchStore(storeRootDir, streamConfig, batchId); + var group = new BatchGroup { Store = store }; + + var timeout = maxBatchTimeout.GetValueOrDefault(JetStreamBatching.StreamDefaultMaxBatchTimeout); + group.BatchTimer = new SystemBatchTimer(timeout, () => + { + group.Cleanup(batchId, this); + abandonedAdvisory?.Invoke(batchId, JetStreamBatching.BatchTimeout); + }); + + _group[batchId] = group; + JetStreamBatching.IncrementGlobalInflightBatches(); + return group; + } } /// @@ -39,13 +187,13 @@ internal sealed class Batching internal sealed class BatchGroup { /// Last proposed stream sequence for this batch. - public ulong LastSeq { get; set; } + public ulong LastSeq { get; set; } /// Temporary backing store for the batch's messages. - public object? Store { get; set; } // IStreamStore — session 20 + public IStreamStore? Store { get; set; } /// Timer that abandons the batch after the configured timeout. - public Timer? BatchTimer { get; set; } + public IBatchTimer? BatchTimer { get; set; } /// /// Stops the cleanup timer and flushes pending writes so the batch is @@ -54,8 +202,49 @@ internal sealed class BatchGroup /// public bool ReadyForCommit() { - // Stub — full implementation requires IStreamStore.FlushAllPending (session 20). - return BatchTimer?.Change(Timeout.Infinite, Timeout.Infinite) != null; + if (BatchTimer?.Stop() != true) + return false; + + Store?.FlushAllPending(); + return true; + } + + /// + /// Deletes underlying resources associated with the batch and unregisters it from the stream's batches. + /// Mirrors batchGroup.cleanup. + /// + public void Cleanup(string batchId, Batching batches) + { + ArgumentNullException.ThrowIfNull(batches); + lock (batches.Mu) + { + CleanupLocked(batchId, batches); + } + } + + /// + /// Deletes underlying resources associated with the batch and unregisters it from the stream's batches. + /// Mirrors batchGroup.cleanupLocked. + /// + public void CleanupLocked(string batchId, Batching batches) + { + ArgumentNullException.ThrowIfNull(batches); + + JetStreamBatching.DecrementGlobalInflightBatches(); + _ = BatchTimer?.Stop(); + Store?.Delete(true); + _ = batches.Group.Remove(batchId); + } + + /// + /// Stops underlying resources associated with the batch. + /// Mirrors batchGroup.stopLocked. + /// + public void StopLocked() + { + JetStreamBatching.DecrementGlobalInflightBatches(); + _ = BatchTimer?.Stop(); + Store?.Stop(); } } @@ -66,13 +255,13 @@ internal sealed class BatchGroup internal sealed class BatchStagedDiff { /// Message IDs seen in this batch, for duplicate detection. - public Dictionary? MsgIds { get; set; } + public Dictionary? MsgIds { get; set; } /// Running counter totals, keyed by subject. - public Dictionary? Counter { get; set; } // map[string]*msgCounterRunningTotal + public Dictionary? Counter { get; set; } /// Inflight subject byte/op totals for DiscardNew checks. - public Dictionary? Inflight { get; set; } // map[string]*inflightSubjectRunningTotal + public Dictionary? Inflight { get; set; } /// Expected-last-seq-per-subject checks staged in this batch. public Dictionary? ExpectedPerSubject { get; set; } @@ -85,7 +274,7 @@ internal sealed class BatchStagedDiff internal sealed class BatchExpectedPerSubject { /// Stream sequence of the last message on this subject at proposal time. - public ulong SSeq { get; set; } + public ulong SSeq { get; set; } /// Clustered proposal sequence at which this check was computed. public ulong ClSeq { get; set; } @@ -100,32 +289,31 @@ internal sealed class BatchApply private readonly Lock _mu = new(); /// ID of the current batch. - public string Id { get; set; } = string.Empty; + public string Id { get; set; } = string.Empty; /// Number of entries expected in the batch (for consistency checks). - public ulong Count { get; set; } + public ulong Count { get; set; } /// Raft committed entries that make up this batch. - public List? Entries { get; set; } // []*CommittedEntry — session 20+ + public List? Entries { get; set; } /// Index within an entry indicating the first message of the batch. - public int EntryStart { get; set; } + public int EntryStart { get; set; } /// Applied value before the entry containing the first batch message. - public ulong MaxApplied { get; set; } + public ulong MaxApplied { get; set; } public Lock Mu => _mu; /// /// Clears in-memory apply-batch state. /// Mirrors batchApply.clearBatchStateLocked. - /// Lock should be held. /// public void ClearBatchStateLocked() { - Id = string.Empty; - Count = 0; - Entries = null; + Id = string.Empty; + Count = 0; + Entries = null; EntryStart = 0; MaxApplied = 0; } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamBatchingCoreTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamBatchingCoreTests.cs new file mode 100644 index 0000000..53055a2 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamBatchingCoreTests.cs @@ -0,0 +1,146 @@ +using NSubstitute; +using Shouldly; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class JetStreamBatchingCoreTests +{ + [Fact] + public void GetBatchStoreDir_ValidInputs_ReturnsHashedBatchPath() + { + var storeDir = Path.Combine(Path.GetTempPath(), $"jsa-{Guid.NewGuid():N}"); + var (batchName, batchPath) = JetStreamBatching.GetBatchStoreDir(storeDir, "ORDERS", "batch-A"); + + batchName.ShouldBe(NatsServer.GetHash("batch-A")); + batchPath.ShouldBe(Path.Combine(storeDir, "_streams", "ORDERS", "batches", batchName)); + } + + [Fact] + public void NewBatchStore_FileSingleReplica_CreatesFileStore() + { + var tempRoot = Path.Combine(Path.GetTempPath(), $"batch-store-{Guid.NewGuid():N}"); + var stream = new StreamConfig + { + Name = "ORDERS", + Replicas = 1, + Storage = StorageType.FileStorage, + }; + + var store = JetStreamBatching.NewBatchStore(tempRoot, stream, "batch-A"); + + try + { + store.ShouldBeOfType(); + } + finally + { + store.Stop(); + } + } + + [Fact] + public void NewBatchStore_MemoryOrReplicated_CreatesMemStore() + { + var tempRoot = Path.Combine(Path.GetTempPath(), $"batch-store-{Guid.NewGuid():N}"); + var stream = new StreamConfig + { + Name = "ORDERS", + Replicas = 3, + Storage = StorageType.FileStorage, + }; + + var store = JetStreamBatching.NewBatchStore(tempRoot, stream, "batch-A"); + + try + { + store.ShouldBeOfType(); + } + finally + { + store.Stop(); + } + } + + [Fact] + public void ReadyForCommit_TimerStopped_FlushesAndReturnsTrue() + { + var store = Substitute.For(); + var timer = Substitute.For(); + timer.Stop().Returns(true); + + var group = new BatchGroup + { + Store = store, + BatchTimer = timer, + }; + + group.ReadyForCommit().ShouldBeTrue(); + store.Received(1).FlushAllPending(); + } + + [Fact] + public void ReadyForCommit_TimerAlreadyExpired_DoesNotFlushAndReturnsFalse() + { + var store = Substitute.For(); + var timer = Substitute.For(); + timer.Stop().Returns(false); + + var group = new BatchGroup + { + Store = store, + BatchTimer = timer, + }; + + group.ReadyForCommit().ShouldBeFalse(); + store.DidNotReceive().FlushAllPending(); + } + + [Fact] + public void CleanupLocked_BatchPresent_StopsTimerDeletesStoreRemovesGroupAndDecrementsGlobalInflight() + { + JetStreamBatching.ResetGlobalInflightBatchesForTest(); + JetStreamBatching.IncrementGlobalInflightBatchesForTest(); + + var store = Substitute.For(); + var timer = Substitute.For(); + timer.Stop().Returns(true); + + var group = new BatchGroup + { + Store = store, + BatchTimer = timer, + }; + + var batching = new Batching(); + batching.Group["batch-A"] = group; + + group.CleanupLocked("batch-A", batching); + + timer.Received(1).Stop(); + store.Received(1).Delete(true); + batching.Group.ContainsKey("batch-A").ShouldBeFalse(); + JetStreamBatching.GlobalInflightBatches.ShouldBe(0); + } + + [Fact] + public void StopLocked_BatchPresent_StopsTimerStopsStoreAndDecrementsGlobalInflight() + { + JetStreamBatching.ResetGlobalInflightBatchesForTest(); + JetStreamBatching.IncrementGlobalInflightBatchesForTest(); + + var store = Substitute.For(); + var timer = Substitute.For(); + + var group = new BatchGroup + { + Store = store, + BatchTimer = timer, + }; + + group.StopLocked(); + + timer.Received(1).Stop(); + store.Received(1).Stop(); + JetStreamBatching.GlobalInflightBatches.ShouldBe(0); + } +} diff --git a/porting.db b/porting.db index 6eec31e..c655db9 100644 Binary files a/porting.db and b/porting.db differ