feat(batch29): implement jetstream batching group-a lifecycle/store methods

This commit is contained in:
Joseph Doherty
2026-03-01 01:45:35 -05:00
parent 6cf904cc7d
commit 02d3b610a1
3 changed files with 356 additions and 22 deletions

View File

@@ -15,9 +15,140 @@
namespace ZB.MOM.NatsNet.Server;
// ---------------------------------------------------------------------------
// Batching types
// ---------------------------------------------------------------------------
internal static class JetStreamBatching
{
internal static readonly TimeSpan StreamDefaultMaxBatchTimeout = TimeSpan.FromSeconds(10);
internal const string BatchTimeout = "timeout";
private static int _globalInflightBatches;
internal static int GlobalInflightBatches => Volatile.Read(ref _globalInflightBatches);
internal static (string BatchName, string StoreDir) GetBatchStoreDir(string storeRootDir, string streamName, string batchId)
{
if (string.IsNullOrWhiteSpace(storeRootDir))
throw new ArgumentException("store root directory is required", nameof(storeRootDir));
if (string.IsNullOrWhiteSpace(streamName))
throw new ArgumentException("stream name is required", nameof(streamName));
if (string.IsNullOrWhiteSpace(batchId))
throw new ArgumentException("batch ID is required", nameof(batchId));
var batchName = NatsServer.GetHash(batchId);
var batchPath = Path.Combine(storeRootDir, "_streams", streamName, "batches", batchName);
return (batchName, batchPath);
}
internal static IStreamStore NewBatchStore(string storeRootDir, StreamConfig streamConfig, string batchId)
{
ArgumentNullException.ThrowIfNull(streamConfig);
if (streamConfig.Replicas == 1 && streamConfig.Storage == StorageType.FileStorage)
{
var (batchName, storeDir) = GetBatchStoreDir(storeRootDir, streamConfig.Name, batchId);
Directory.CreateDirectory(storeDir);
var cfg = new FileStoreConfig
{
AsyncFlush = true,
BlockSize = FileStoreDefaults.DefaultLargeBlockSize,
StoreDir = storeDir,
};
var fsInfo = new FileStreamInfo
{
Created = DateTime.UtcNow,
Config = new StreamConfig
{
Name = batchName,
Storage = StorageType.FileStorage,
},
};
return new JetStreamFileStore(cfg, fsInfo);
}
return new JetStreamMemStore(new StreamConfig
{
Name = string.Empty,
Storage = StorageType.MemoryStorage,
});
}
internal static void IncrementGlobalInflightBatches() => Interlocked.Increment(ref _globalInflightBatches);
internal static void DecrementGlobalInflightBatches() => Interlocked.Decrement(ref _globalInflightBatches);
internal static void ResetGlobalInflightBatchesForTest() => Interlocked.Exchange(ref _globalInflightBatches, 0);
internal static void IncrementGlobalInflightBatchesForTest() => IncrementGlobalInflightBatches();
internal static BatchGroup NewBatchGroup(
Batching batches,
string storeRootDir,
StreamConfig streamConfig,
string batchId,
Action<string, string>? abandonedAdvisory = null,
TimeSpan? maxBatchTimeout = null)
{
ArgumentNullException.ThrowIfNull(batches);
return batches.NewBatchGroup(storeRootDir, streamConfig, batchId, abandonedAdvisory, maxBatchTimeout);
}
internal static bool ReadyForCommit(BatchGroup batchGroup)
{
ArgumentNullException.ThrowIfNull(batchGroup);
return batchGroup.ReadyForCommit();
}
internal static void Cleanup(BatchGroup batchGroup, string batchId, Batching batches)
{
ArgumentNullException.ThrowIfNull(batchGroup);
batchGroup.Cleanup(batchId, batches);
}
internal static void CleanupLocked(BatchGroup batchGroup, string batchId, Batching batches)
{
ArgumentNullException.ThrowIfNull(batchGroup);
batchGroup.CleanupLocked(batchId, batches);
}
internal static void StopLocked(BatchGroup batchGroup)
{
ArgumentNullException.ThrowIfNull(batchGroup);
batchGroup.StopLocked();
}
}
internal interface IBatchTimer
{
bool Stop();
}
internal sealed class SystemBatchTimer : IBatchTimer, IDisposable
{
private readonly Timer _timer;
private int _stopped;
public SystemBatchTimer(TimeSpan timeout, Action callback)
{
ArgumentNullException.ThrowIfNull(callback);
_timer = new Timer(_ =>
{
if (Interlocked.Exchange(ref _stopped, 1) == 0)
callback();
}, null, timeout, Timeout.InfiniteTimeSpan);
}
public bool Stop()
{
var firstStop = Interlocked.Exchange(ref _stopped, 1) == 0;
_timer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
return firstStop;
}
public void Dispose() => _timer.Dispose();
}
/// <summary>
/// Tracks in-progress atomic publish batch groups for a stream.
@@ -28,8 +159,25 @@ internal sealed class Batching
private readonly Lock _mu = new();
private readonly Dictionary<string, BatchGroup> _group = new(StringComparer.Ordinal);
public Lock Mu => _mu;
public Lock Mu => _mu;
public Dictionary<string, BatchGroup> Group => _group;
internal BatchGroup NewBatchGroup(string storeRootDir, StreamConfig streamConfig, string batchId, Action<string, string>? abandonedAdvisory = null, TimeSpan? maxBatchTimeout = null)
{
var store = JetStreamBatching.NewBatchStore(storeRootDir, streamConfig, batchId);
var group = new BatchGroup { Store = store };
var timeout = maxBatchTimeout.GetValueOrDefault(JetStreamBatching.StreamDefaultMaxBatchTimeout);
group.BatchTimer = new SystemBatchTimer(timeout, () =>
{
group.Cleanup(batchId, this);
abandonedAdvisory?.Invoke(batchId, JetStreamBatching.BatchTimeout);
});
_group[batchId] = group;
JetStreamBatching.IncrementGlobalInflightBatches();
return group;
}
}
/// <summary>
@@ -39,13 +187,13 @@ internal sealed class Batching
internal sealed class BatchGroup
{
/// <summary>Last proposed stream sequence for this batch.</summary>
public ulong LastSeq { get; set; }
public ulong LastSeq { get; set; }
/// <summary>Temporary backing store for the batch's messages.</summary>
public object? Store { get; set; } // IStreamStore — session 20
public IStreamStore? Store { get; set; }
/// <summary>Timer that abandons the batch after the configured timeout.</summary>
public Timer? BatchTimer { get; set; }
public IBatchTimer? BatchTimer { get; set; }
/// <summary>
/// Stops the cleanup timer and flushes pending writes so the batch is
@@ -54,8 +202,49 @@ internal sealed class BatchGroup
/// </summary>
public bool ReadyForCommit()
{
// Stub — full implementation requires IStreamStore.FlushAllPending (session 20).
return BatchTimer?.Change(Timeout.Infinite, Timeout.Infinite) != null;
if (BatchTimer?.Stop() != true)
return false;
Store?.FlushAllPending();
return true;
}
/// <summary>
/// Deletes underlying resources associated with the batch and unregisters it from the stream's batches.
/// Mirrors <c>batchGroup.cleanup</c>.
/// </summary>
public void Cleanup(string batchId, Batching batches)
{
ArgumentNullException.ThrowIfNull(batches);
lock (batches.Mu)
{
CleanupLocked(batchId, batches);
}
}
/// <summary>
/// Deletes underlying resources associated with the batch and unregisters it from the stream's batches.
/// Mirrors <c>batchGroup.cleanupLocked</c>.
/// </summary>
public void CleanupLocked(string batchId, Batching batches)
{
ArgumentNullException.ThrowIfNull(batches);
JetStreamBatching.DecrementGlobalInflightBatches();
_ = BatchTimer?.Stop();
Store?.Delete(true);
_ = batches.Group.Remove(batchId);
}
/// <summary>
/// Stops underlying resources associated with the batch.
/// Mirrors <c>batchGroup.stopLocked</c>.
/// </summary>
public void StopLocked()
{
JetStreamBatching.DecrementGlobalInflightBatches();
_ = BatchTimer?.Stop();
Store?.Stop();
}
}
@@ -66,13 +255,13 @@ internal sealed class BatchGroup
internal sealed class BatchStagedDiff
{
/// <summary>Message IDs seen in this batch, for duplicate detection.</summary>
public Dictionary<string, object?>? MsgIds { get; set; }
public Dictionary<string, object?>? MsgIds { get; set; }
/// <summary>Running counter totals, keyed by subject.</summary>
public Dictionary<string, object?>? Counter { get; set; } // map[string]*msgCounterRunningTotal
public Dictionary<string, object?>? Counter { get; set; }
/// <summary>Inflight subject byte/op totals for DiscardNew checks.</summary>
public Dictionary<string, object?>? Inflight { get; set; } // map[string]*inflightSubjectRunningTotal
public Dictionary<string, object?>? Inflight { get; set; }
/// <summary>Expected-last-seq-per-subject checks staged in this batch.</summary>
public Dictionary<string, BatchExpectedPerSubject>? ExpectedPerSubject { get; set; }
@@ -85,7 +274,7 @@ internal sealed class BatchStagedDiff
internal sealed class BatchExpectedPerSubject
{
/// <summary>Stream sequence of the last message on this subject at proposal time.</summary>
public ulong SSeq { get; set; }
public ulong SSeq { get; set; }
/// <summary>Clustered proposal sequence at which this check was computed.</summary>
public ulong ClSeq { get; set; }
@@ -100,32 +289,31 @@ internal sealed class BatchApply
private readonly Lock _mu = new();
/// <summary>ID of the current batch.</summary>
public string Id { get; set; } = string.Empty;
public string Id { get; set; } = string.Empty;
/// <summary>Number of entries expected in the batch (for consistency checks).</summary>
public ulong Count { get; set; }
public ulong Count { get; set; }
/// <summary>Raft committed entries that make up this batch.</summary>
public List<object?>? Entries { get; set; } // []*CommittedEntry — session 20+
public List<object?>? Entries { get; set; }
/// <summary>Index within an entry indicating the first message of the batch.</summary>
public int EntryStart { get; set; }
public int EntryStart { get; set; }
/// <summary>Applied value before the entry containing the first batch message.</summary>
public ulong MaxApplied { get; set; }
public ulong MaxApplied { get; set; }
public Lock Mu => _mu;
/// <summary>
/// Clears in-memory apply-batch state.
/// Mirrors <c>batchApply.clearBatchStateLocked</c>.
/// Lock should be held.
/// </summary>
public void ClearBatchStateLocked()
{
Id = string.Empty;
Count = 0;
Entries = null;
Id = string.Empty;
Count = 0;
Entries = null;
EntryStart = 0;
MaxApplied = 0;
}

View File

@@ -0,0 +1,146 @@
using NSubstitute;
using Shouldly;
namespace ZB.MOM.NatsNet.Server.Tests.JetStream;
public sealed class JetStreamBatchingCoreTests
{
[Fact]
public void GetBatchStoreDir_ValidInputs_ReturnsHashedBatchPath()
{
var storeDir = Path.Combine(Path.GetTempPath(), $"jsa-{Guid.NewGuid():N}");
var (batchName, batchPath) = JetStreamBatching.GetBatchStoreDir(storeDir, "ORDERS", "batch-A");
batchName.ShouldBe(NatsServer.GetHash("batch-A"));
batchPath.ShouldBe(Path.Combine(storeDir, "_streams", "ORDERS", "batches", batchName));
}
[Fact]
public void NewBatchStore_FileSingleReplica_CreatesFileStore()
{
var tempRoot = Path.Combine(Path.GetTempPath(), $"batch-store-{Guid.NewGuid():N}");
var stream = new StreamConfig
{
Name = "ORDERS",
Replicas = 1,
Storage = StorageType.FileStorage,
};
var store = JetStreamBatching.NewBatchStore(tempRoot, stream, "batch-A");
try
{
store.ShouldBeOfType<JetStreamFileStore>();
}
finally
{
store.Stop();
}
}
[Fact]
public void NewBatchStore_MemoryOrReplicated_CreatesMemStore()
{
var tempRoot = Path.Combine(Path.GetTempPath(), $"batch-store-{Guid.NewGuid():N}");
var stream = new StreamConfig
{
Name = "ORDERS",
Replicas = 3,
Storage = StorageType.FileStorage,
};
var store = JetStreamBatching.NewBatchStore(tempRoot, stream, "batch-A");
try
{
store.ShouldBeOfType<JetStreamMemStore>();
}
finally
{
store.Stop();
}
}
[Fact]
public void ReadyForCommit_TimerStopped_FlushesAndReturnsTrue()
{
var store = Substitute.For<IStreamStore>();
var timer = Substitute.For<IBatchTimer>();
timer.Stop().Returns(true);
var group = new BatchGroup
{
Store = store,
BatchTimer = timer,
};
group.ReadyForCommit().ShouldBeTrue();
store.Received(1).FlushAllPending();
}
[Fact]
public void ReadyForCommit_TimerAlreadyExpired_DoesNotFlushAndReturnsFalse()
{
var store = Substitute.For<IStreamStore>();
var timer = Substitute.For<IBatchTimer>();
timer.Stop().Returns(false);
var group = new BatchGroup
{
Store = store,
BatchTimer = timer,
};
group.ReadyForCommit().ShouldBeFalse();
store.DidNotReceive().FlushAllPending();
}
[Fact]
public void CleanupLocked_BatchPresent_StopsTimerDeletesStoreRemovesGroupAndDecrementsGlobalInflight()
{
JetStreamBatching.ResetGlobalInflightBatchesForTest();
JetStreamBatching.IncrementGlobalInflightBatchesForTest();
var store = Substitute.For<IStreamStore>();
var timer = Substitute.For<IBatchTimer>();
timer.Stop().Returns(true);
var group = new BatchGroup
{
Store = store,
BatchTimer = timer,
};
var batching = new Batching();
batching.Group["batch-A"] = group;
group.CleanupLocked("batch-A", batching);
timer.Received(1).Stop();
store.Received(1).Delete(true);
batching.Group.ContainsKey("batch-A").ShouldBeFalse();
JetStreamBatching.GlobalInflightBatches.ShouldBe(0);
}
[Fact]
public void StopLocked_BatchPresent_StopsTimerStopsStoreAndDecrementsGlobalInflight()
{
JetStreamBatching.ResetGlobalInflightBatchesForTest();
JetStreamBatching.IncrementGlobalInflightBatchesForTest();
var store = Substitute.For<IStreamStore>();
var timer = Substitute.For<IBatchTimer>();
var group = new BatchGroup
{
Store = store,
BatchTimer = timer,
};
group.StopLocked();
timer.Received(1).Stop();
store.Received(1).Stop();
JetStreamBatching.GlobalInflightBatches.ShouldBe(0);
}
}

Binary file not shown.