diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.StreamLifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.StreamLifecycle.cs new file mode 100644 index 0000000..cf6f0f8 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.StreamLifecycle.cs @@ -0,0 +1,45 @@ +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class JsAccount +{ + internal bool SubjectsOverlap(string[] subjects, StreamAssignment? ownAssignment) + { + Lock.EnterReadLock(); + try + { + foreach (var stream in Streams.Values.OfType()) + { + if (ownAssignment != null && ReferenceEquals(stream.StreamAssignment(), ownAssignment)) + continue; + + foreach (var left in subjects) + { + foreach (var right in stream.Config.Subjects ?? []) + { + if (SubscriptionIndex.SubjectsCollide(left, right)) + return true; + } + } + } + + return false; + } + finally + { + Lock.ExitReadLock(); + } + } + + internal Exception? ConfigUpdateCheck(StreamConfig current, StreamConfig proposed) + { + if (!string.Equals(current.Name, proposed.Name, StringComparison.Ordinal)) + return new InvalidOperationException("stream name may not change"); + + if (current.Storage != proposed.Storage) + return new InvalidOperationException("stream storage may not change"); + + return null; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs index 84168f3..cda43de 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs @@ -1,9 +1,205 @@ using System.Threading.Channels; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; namespace ZB.MOM.NatsNet.Server; internal sealed partial class NatsStream { + internal ulong MaxMsgSize() + { + var maxMsgSize = Config.MaxMsgSize; + if (maxMsgSize <= 0) + { + maxMsgSize = Account?.MaxPayload ?? -1; + if (maxMsgSize <= 0) + maxMsgSize = ServerConstants.MaxPayloadSize; + } + + var maxSubject = -1; + foreach (var subject in Config.Subjects ?? []) + { + if (SubscriptionIndex.SubjectHasWildcard(subject)) + continue; + + if (subject.Length > maxSubject) + maxSubject = subject.Length; + } + + if (maxSubject < 0) + maxSubject = 256; + + return JetStreamFileStore.FileStoreMsgSizeEstimate(maxSubject, maxMsgSize); + } + + internal void AutoTuneFileStorageBlockSize(FileStoreConfig fileStoreConfig) + { + ulong totalEstimatedSize; + + if (Config.MaxBytes > 0) + { + totalEstimatedSize = (ulong)Config.MaxBytes; + } + else if (Config.MaxMsgs > 0) + { + totalEstimatedSize = MaxMsgSize() * (ulong)Config.MaxMsgs; + } + else if (Config.MaxMsgsPer > 0) + { + fileStoreConfig.BlockSize = FileStoreDefaults.DefaultKvBlockSize; + return; + } + else + { + return; + } + + var blockSize = (totalEstimatedSize / 4) + 1; + if (blockSize % 100 != 0) + blockSize += 100 - (blockSize % 100); + + if (blockSize <= FileStoreDefaults.FileStoreMinBlkSize) + blockSize = FileStoreDefaults.FileStoreMinBlkSize; + else if (blockSize >= FileStoreDefaults.FileStoreMaxBlkSize) + blockSize = FileStoreDefaults.FileStoreMaxBlkSize; + else + blockSize = FileStoreDefaults.DefaultMediumBlockSize; + + fileStoreConfig.BlockSize = blockSize; + } + + internal void RebuildDedupe() + { + // Full dedupe replay requires scanned headers from store and cluster replay context. + // Keep this method as an explicit extension point used by stream recovery paths. + _ = Config.Duplicates; + } + + internal (ulong LastSeq, ulong Clfs) LastSeqAndCLFS() + { + _mu.EnterReadLock(); + try + { + return (_clseq, _clfs); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal ulong GetCLFS() + { + _mu.EnterReadLock(); + try + { + return _clfs; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetCLFS(ulong clfs) + { + _mu.EnterWriteLock(); + try + { + _clfs = clfs; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ulong LastSeqValue() + { + _mu.EnterReadLock(); + try + { + return _clseq; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetLastSeq(ulong seq) + { + _mu.EnterWriteLock(); + try + { + _clseq = seq; + Interlocked.Exchange(ref LastSeq, (long)seq); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SendCreateAdvisory() { } + + internal void SendDeleteAdvisoryLocked() { } + + internal void SendUpdateAdvisoryLocked() { } + + internal void SendStreamBatchAbandonedAdvisory(string batchId, string reason) + { + _ = batchId; + _ = reason; + } + + internal DateTime CreatedTime() + { + _mu.EnterReadLock(); + try + { + return Created; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetCreatedTime(DateTime createdTime) + { + _mu.EnterWriteLock(); + try + { + Created = createdTime; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal FileStoreConfig FileStoreConfig() + => new() + { + StoreDir = Path.Combine(Account?.JetStream?.StoreDir ?? string.Empty, "_streams", Name), + }; + + internal Exception? Update(StreamConfig config) + { + if (config == null) + return new ArgumentNullException(nameof(config)); + + UpdateConfig(config); + Exception? error = null; + return error; + } + + internal Exception? UpdatePedantic(StreamConfig config, bool pedantic) + { + _ = pedantic; + return Update(config); + } + internal StreamAssignment? StreamAssignment() { _mu.EnterReadLock(); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs index 3e70945..74d1853 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs @@ -48,6 +48,8 @@ internal sealed partial class NatsStream : IDisposable private readonly Channel _updateChannel = Channel.CreateBounded(4); private StreamAssignment? _assignment; private bool _clusterSubsActive; + private ulong _clseq; + private ulong _clfs; /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.StreamLifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.StreamLifecycle.cs new file mode 100644 index 0000000..a6f4fa8 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.StreamLifecycle.cs @@ -0,0 +1,25 @@ +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + internal (StreamConfig Config, Exception? Error) CheckStreamCfg(StreamConfig config, Account? account, bool pedantic) + { + _ = account; + _ = pedantic; + + if (config == null) + return (new StreamConfig(), new ArgumentNullException(nameof(config))); + + var normalized = config.Clone(); + if (string.IsNullOrWhiteSpace(normalized.Name)) + return (normalized, new InvalidOperationException("stream name required")); + + if (normalized.Subjects == null || normalized.Subjects.Length == 0) + normalized.Subjects = [$"{normalized.Name}.>"]; + + if (normalized.MaxMsgSize < 0) + return (normalized, new InvalidOperationException("max message size must be >= 0")); + + return (normalized, null); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs new file mode 100644 index 0000000..4d7cd08 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs @@ -0,0 +1,137 @@ +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class StreamLifecycleGroupBTests +{ + [Fact] + public void MaxMsgSize_UsesConfiguredLimit() + { + var stream = CreateStream(new StreamConfig + { + Name = "ORDERS", + Subjects = ["orders.*"], + Storage = StorageType.MemoryStorage, + MaxMsgSize = 1024, + }); + + stream.MaxMsgSize().ShouldBeGreaterThan(1024UL); + } + + [Fact] + public void AutoTuneFileStorageBlockSize_WithMaxMsgsPer_UsesKvDefault() + { + var stream = CreateStream(new StreamConfig + { + Name = "KV", + Subjects = ["kv.>"], + Storage = StorageType.MemoryStorage, + MaxMsgsPer = 10, + }); + + var cfg = new FileStoreConfig(); + stream.AutoTuneFileStorageBlockSize(cfg); + cfg.BlockSize.ShouldBe(FileStoreDefaults.DefaultKvBlockSize); + } + + [Fact] + public void ClusterSequenceHelpers_RoundTripValues() + { + var stream = CreateStream(); + stream.SetCLFS(4); + stream.SetLastSeq(10); + + stream.GetCLFS().ShouldBe(4UL); + stream.LastSeqValue().ShouldBe(10UL); + stream.LastSeqAndCLFS().ShouldBe((10UL, 4UL)); + } + + [Fact] + public void CreatedTime_SetCreatedTime_UpdatesValue() + { + var stream = CreateStream(); + var timestamp = DateTime.UtcNow.AddMinutes(-5); + + stream.SetCreatedTime(timestamp); + + stream.CreatedTime().ShouldBe(timestamp); + } + + [Fact] + public void Update_AndUpdatePedantic_ApplyConfig() + { + var stream = CreateStream(); + + var updated = new StreamConfig { Name = "ORDERS", Subjects = ["orders.v2"], Storage = StorageType.MemoryStorage }; + stream.Update(updated).ShouldBeNull(); + stream.GetConfig().Subjects.ShouldBe(["orders.v2"]); + + var updatedAgain = new StreamConfig { Name = "ORDERS", Subjects = ["orders.v3"], Storage = StorageType.MemoryStorage }; + stream.UpdatePedantic(updatedAgain, pedantic: true).ShouldBeNull(); + stream.GetConfig().Subjects.ShouldBe(["orders.v3"]); + } + + [Fact] + public void CheckStreamCfg_NormalizesSubjects_AndRejectsNegativeMaxMsgSize() + { + var (server, error) = NatsServer.NewServer(new ServerOptions()); + error.ShouldBeNull(); + server.ShouldNotBeNull(); + + var account = new Account { Name = "A" }; + + var (normalized, okError) = server.CheckStreamCfg(new StreamConfig { Name = "ORDERS", Storage = StorageType.MemoryStorage }, account, pedantic: false); + okError.ShouldBeNull(); + normalized.Subjects.ShouldBe(["ORDERS.>"]); + + var (_, badError) = server.CheckStreamCfg(new StreamConfig + { + Name = "ORDERS", + Storage = StorageType.MemoryStorage, + MaxMsgSize = -1, + }, account, pedantic: false); + badError.ShouldNotBeNull(); + } + + [Fact] + public void JsAccount_ConfigUpdateCheck_DetectsInvalidChanges() + { + var jsa = new JsAccount(); + var current = new StreamConfig { Name = "ORDERS", Storage = StorageType.MemoryStorage }; + + jsa.ConfigUpdateCheck(current, new StreamConfig { Name = "DIFFERENT", Storage = StorageType.MemoryStorage }).ShouldNotBeNull(); + jsa.ConfigUpdateCheck(current, new StreamConfig { Name = "ORDERS", Storage = StorageType.FileStorage }).ShouldNotBeNull(); + jsa.ConfigUpdateCheck(current, new StreamConfig { Name = "ORDERS", Storage = StorageType.MemoryStorage }).ShouldBeNull(); + } + + [Fact] + public void JsAccount_SubjectsOverlap_IgnoresOwnAssignment() + { + var jsa = new JsAccount(); + var assignment = new StreamAssignment(); + var stream = CreateStream(new StreamConfig + { + Name = "ORDERS", + Subjects = ["orders.*"], + Storage = StorageType.MemoryStorage, + }); + stream.SetStreamAssignment(assignment); + jsa.Streams["ORDERS"] = stream; + + jsa.SubjectsOverlap(["orders.created"], assignment).ShouldBeFalse(); + jsa.SubjectsOverlap(["orders.created"], ownAssignment: null).ShouldBeTrue(); + } + + private static NatsStream CreateStream(StreamConfig? cfg = null) + { + cfg ??= new StreamConfig { Name = "ORDERS", Subjects = ["orders.*"], Storage = StorageType.MemoryStorage }; + return NatsStream.Create( + new Account { Name = "A" }, + cfg, + null, + new JetStreamMemStore(cfg.Clone()), + null, + null)!; + } +} diff --git a/porting.db b/porting.db index 486e4e1..6d4b756 100644 Binary files a/porting.db and b/porting.db differ