From 3c974fbe5523a9d0d826e99f2c86485b587727a4 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 22:59:08 -0500 Subject: [PATCH] batch36 task3 group-b primitives checkpoint --- .../JetStream/JsAccount.StreamLifecycle.cs | 45 ++++ .../JetStream/NatsStream.Lifecycle.cs | 196 ++++++++++++++++++ .../JetStream/NatsStream.cs | 2 + .../NatsServer.StreamLifecycle.cs | 25 +++ .../JetStream/StreamLifecycleGroupBTests.cs | 137 ++++++++++++ porting.db | Bin 6754304 -> 6754304 bytes 6 files changed, 405 insertions(+) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.StreamLifecycle.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.StreamLifecycle.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.StreamLifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.StreamLifecycle.cs new file mode 100644 index 0000000..cf6f0f8 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JsAccount.StreamLifecycle.cs @@ -0,0 +1,45 @@ +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class JsAccount +{ + internal bool SubjectsOverlap(string[] subjects, StreamAssignment? ownAssignment) + { + Lock.EnterReadLock(); + try + { + foreach (var stream in Streams.Values.OfType()) + { + if (ownAssignment != null && ReferenceEquals(stream.StreamAssignment(), ownAssignment)) + continue; + + foreach (var left in subjects) + { + foreach (var right in stream.Config.Subjects ?? []) + { + if (SubscriptionIndex.SubjectsCollide(left, right)) + return true; + } + } + } + + return false; + } + finally + { + Lock.ExitReadLock(); + } + } + + internal Exception? ConfigUpdateCheck(StreamConfig current, StreamConfig proposed) + { + if (!string.Equals(current.Name, proposed.Name, StringComparison.Ordinal)) + return new InvalidOperationException("stream name may not change"); + + if (current.Storage != proposed.Storage) + return new InvalidOperationException("stream storage may not change"); + + return null; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs index 84168f3..cda43de 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs @@ -1,9 +1,205 @@ using System.Threading.Channels; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; namespace ZB.MOM.NatsNet.Server; internal sealed partial class NatsStream { + internal ulong MaxMsgSize() + { + var maxMsgSize = Config.MaxMsgSize; + if (maxMsgSize <= 0) + { + maxMsgSize = Account?.MaxPayload ?? -1; + if (maxMsgSize <= 0) + maxMsgSize = ServerConstants.MaxPayloadSize; + } + + var maxSubject = -1; + foreach (var subject in Config.Subjects ?? []) + { + if (SubscriptionIndex.SubjectHasWildcard(subject)) + continue; + + if (subject.Length > maxSubject) + maxSubject = subject.Length; + } + + if (maxSubject < 0) + maxSubject = 256; + + return JetStreamFileStore.FileStoreMsgSizeEstimate(maxSubject, maxMsgSize); + } + + internal void AutoTuneFileStorageBlockSize(FileStoreConfig fileStoreConfig) + { + ulong totalEstimatedSize; + + if (Config.MaxBytes > 0) + { + totalEstimatedSize = (ulong)Config.MaxBytes; + } + else if (Config.MaxMsgs > 0) + { + totalEstimatedSize = MaxMsgSize() * (ulong)Config.MaxMsgs; + } + else if (Config.MaxMsgsPer > 0) + { + fileStoreConfig.BlockSize = FileStoreDefaults.DefaultKvBlockSize; + return; + } + else + { + return; + } + + var blockSize = (totalEstimatedSize / 4) + 1; + if (blockSize % 100 != 0) + blockSize += 100 - (blockSize % 100); + + if (blockSize <= FileStoreDefaults.FileStoreMinBlkSize) + blockSize = FileStoreDefaults.FileStoreMinBlkSize; + else if (blockSize >= FileStoreDefaults.FileStoreMaxBlkSize) + blockSize = FileStoreDefaults.FileStoreMaxBlkSize; + else + blockSize = FileStoreDefaults.DefaultMediumBlockSize; + + fileStoreConfig.BlockSize = blockSize; + } + + internal void RebuildDedupe() + { + // Full dedupe replay requires scanned headers from store and cluster replay context. + // Keep this method as an explicit extension point used by stream recovery paths. + _ = Config.Duplicates; + } + + internal (ulong LastSeq, ulong Clfs) LastSeqAndCLFS() + { + _mu.EnterReadLock(); + try + { + return (_clseq, _clfs); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal ulong GetCLFS() + { + _mu.EnterReadLock(); + try + { + return _clfs; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetCLFS(ulong clfs) + { + _mu.EnterWriteLock(); + try + { + _clfs = clfs; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ulong LastSeqValue() + { + _mu.EnterReadLock(); + try + { + return _clseq; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetLastSeq(ulong seq) + { + _mu.EnterWriteLock(); + try + { + _clseq = seq; + Interlocked.Exchange(ref LastSeq, (long)seq); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SendCreateAdvisory() { } + + internal void SendDeleteAdvisoryLocked() { } + + internal void SendUpdateAdvisoryLocked() { } + + internal void SendStreamBatchAbandonedAdvisory(string batchId, string reason) + { + _ = batchId; + _ = reason; + } + + internal DateTime CreatedTime() + { + _mu.EnterReadLock(); + try + { + return Created; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetCreatedTime(DateTime createdTime) + { + _mu.EnterWriteLock(); + try + { + Created = createdTime; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal FileStoreConfig FileStoreConfig() + => new() + { + StoreDir = Path.Combine(Account?.JetStream?.StoreDir ?? string.Empty, "_streams", Name), + }; + + internal Exception? Update(StreamConfig config) + { + if (config == null) + return new ArgumentNullException(nameof(config)); + + UpdateConfig(config); + Exception? error = null; + return error; + } + + internal Exception? UpdatePedantic(StreamConfig config, bool pedantic) + { + _ = pedantic; + return Update(config); + } + internal StreamAssignment? StreamAssignment() { _mu.EnterReadLock(); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs index 3e70945..74d1853 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs @@ -48,6 +48,8 @@ internal sealed partial class NatsStream : IDisposable private readonly Channel _updateChannel = Channel.CreateBounded(4); private StreamAssignment? _assignment; private bool _clusterSubsActive; + private ulong _clseq; + private ulong _clfs; /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.StreamLifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.StreamLifecycle.cs new file mode 100644 index 0000000..a6f4fa8 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.StreamLifecycle.cs @@ -0,0 +1,25 @@ +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + internal (StreamConfig Config, Exception? Error) CheckStreamCfg(StreamConfig config, Account? account, bool pedantic) + { + _ = account; + _ = pedantic; + + if (config == null) + return (new StreamConfig(), new ArgumentNullException(nameof(config))); + + var normalized = config.Clone(); + if (string.IsNullOrWhiteSpace(normalized.Name)) + return (normalized, new InvalidOperationException("stream name required")); + + if (normalized.Subjects == null || normalized.Subjects.Length == 0) + normalized.Subjects = [$"{normalized.Name}.>"]; + + if (normalized.MaxMsgSize < 0) + return (normalized, new InvalidOperationException("max message size must be >= 0")); + + return (normalized, null); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs new file mode 100644 index 0000000..4d7cd08 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs @@ -0,0 +1,137 @@ +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class StreamLifecycleGroupBTests +{ + [Fact] + public void MaxMsgSize_UsesConfiguredLimit() + { + var stream = CreateStream(new StreamConfig + { + Name = "ORDERS", + Subjects = ["orders.*"], + Storage = StorageType.MemoryStorage, + MaxMsgSize = 1024, + }); + + stream.MaxMsgSize().ShouldBeGreaterThan(1024UL); + } + + [Fact] + public void AutoTuneFileStorageBlockSize_WithMaxMsgsPer_UsesKvDefault() + { + var stream = CreateStream(new StreamConfig + { + Name = "KV", + Subjects = ["kv.>"], + Storage = StorageType.MemoryStorage, + MaxMsgsPer = 10, + }); + + var cfg = new FileStoreConfig(); + stream.AutoTuneFileStorageBlockSize(cfg); + cfg.BlockSize.ShouldBe(FileStoreDefaults.DefaultKvBlockSize); + } + + [Fact] + public void ClusterSequenceHelpers_RoundTripValues() + { + var stream = CreateStream(); + stream.SetCLFS(4); + stream.SetLastSeq(10); + + stream.GetCLFS().ShouldBe(4UL); + stream.LastSeqValue().ShouldBe(10UL); + stream.LastSeqAndCLFS().ShouldBe((10UL, 4UL)); + } + + [Fact] + public void CreatedTime_SetCreatedTime_UpdatesValue() + { + var stream = CreateStream(); + var timestamp = DateTime.UtcNow.AddMinutes(-5); + + stream.SetCreatedTime(timestamp); + + stream.CreatedTime().ShouldBe(timestamp); + } + + [Fact] + public void Update_AndUpdatePedantic_ApplyConfig() + { + var stream = CreateStream(); + + var updated = new StreamConfig { Name = "ORDERS", Subjects = ["orders.v2"], Storage = StorageType.MemoryStorage }; + stream.Update(updated).ShouldBeNull(); + stream.GetConfig().Subjects.ShouldBe(["orders.v2"]); + + var updatedAgain = new StreamConfig { Name = "ORDERS", Subjects = ["orders.v3"], Storage = StorageType.MemoryStorage }; + stream.UpdatePedantic(updatedAgain, pedantic: true).ShouldBeNull(); + stream.GetConfig().Subjects.ShouldBe(["orders.v3"]); + } + + [Fact] + public void CheckStreamCfg_NormalizesSubjects_AndRejectsNegativeMaxMsgSize() + { + var (server, error) = NatsServer.NewServer(new ServerOptions()); + error.ShouldBeNull(); + server.ShouldNotBeNull(); + + var account = new Account { Name = "A" }; + + var (normalized, okError) = server.CheckStreamCfg(new StreamConfig { Name = "ORDERS", Storage = StorageType.MemoryStorage }, account, pedantic: false); + okError.ShouldBeNull(); + normalized.Subjects.ShouldBe(["ORDERS.>"]); + + var (_, badError) = server.CheckStreamCfg(new StreamConfig + { + Name = "ORDERS", + Storage = StorageType.MemoryStorage, + MaxMsgSize = -1, + }, account, pedantic: false); + badError.ShouldNotBeNull(); + } + + [Fact] + public void JsAccount_ConfigUpdateCheck_DetectsInvalidChanges() + { + var jsa = new JsAccount(); + var current = new StreamConfig { Name = "ORDERS", Storage = StorageType.MemoryStorage }; + + jsa.ConfigUpdateCheck(current, new StreamConfig { Name = "DIFFERENT", Storage = StorageType.MemoryStorage }).ShouldNotBeNull(); + jsa.ConfigUpdateCheck(current, new StreamConfig { Name = "ORDERS", Storage = StorageType.FileStorage }).ShouldNotBeNull(); + jsa.ConfigUpdateCheck(current, new StreamConfig { Name = "ORDERS", Storage = StorageType.MemoryStorage }).ShouldBeNull(); + } + + [Fact] + public void JsAccount_SubjectsOverlap_IgnoresOwnAssignment() + { + var jsa = new JsAccount(); + var assignment = new StreamAssignment(); + var stream = CreateStream(new StreamConfig + { + Name = "ORDERS", + Subjects = ["orders.*"], + Storage = StorageType.MemoryStorage, + }); + stream.SetStreamAssignment(assignment); + jsa.Streams["ORDERS"] = stream; + + jsa.SubjectsOverlap(["orders.created"], assignment).ShouldBeFalse(); + jsa.SubjectsOverlap(["orders.created"], ownAssignment: null).ShouldBeTrue(); + } + + private static NatsStream CreateStream(StreamConfig? cfg = null) + { + cfg ??= new StreamConfig { Name = "ORDERS", Subjects = ["orders.*"], Storage = StorageType.MemoryStorage }; + return NatsStream.Create( + new Account { Name = "A" }, + cfg, + null, + new JetStreamMemStore(cfg.Clone()), + null, + null)!; + } +} diff --git a/porting.db b/porting.db index 486e4e1714a068b0502e6083eeaf8989664268f9..6d4b756427b758429191a76c2a2836fdc7b94621 100644 GIT binary patch delta 1414 zcmY+^T}&KR6bJA-_s-6;yR);ymanqAWdX~_7FGlVtW*#w+FAucYlVWlxVs}_8YG6; zrqyj@OH(T#u^hJcfi_K39!%O8dW}h`H9nb`Ha;2?uo@FIHfx_u)YLy{`IzMQaL$=~ zlDX$(k~8N-5+;1^S;n-t*#FN*W%bJJ(a2mxxp+(QO51H^+*6ENrdtGfES`uD565FR zB#M?3Bk|#Z#6UdeI!AIrDJjwl)@R+{Gp3038U2{Nzn~~pHm8_ki_#V}?<-tSO0CG8 zP*g`7^V4CwRU^MGHV@iV$6a=sc9)wMRc8qc(Z?RE#&*K>3%gA79;-#Z88Qzks-DE5 zU$>bl#ZT{etgUjk!t4=J4f21QHrdjwSN(=*(^^Dp)#|h|EuVsscu00c_yL8|5$=`q zQC_VEwN4g#@Ok4$e^(V&+{^7 zn_um42GJGyd4eZDrF(CvTPQea=+vJjf|M>04z*!>Z5@qgi63aPFi=X9*&7NsvPCN$ z`@kNcV2%i_oXA){adby2mjaW{@N?pr3O@=~`o%mn+%@RsZD<&`h`PqVarD zyH=*#`C^PZ{doaOX9*`A_le@Q>gb$LfC@Iyzz!MU04KOW2R9htk#jzCkLq*uvjslH z_*s6Mf5*S(*F=fK?=T!~js}`2?D3xob3-~8qe+G4e~dPhb|*SXBX-Nr^&`}5x9aKl zJJB{um74}#H>@Ff^{427jSiQae)8*p$y6)1lynjDj^IHLlry%)ldVq@F>(lJv6{$upJ(U zM%V#QKoc~>lh6WBK`ZQpHh3DIfnBg0o`pT|96S&0&;c(%Cv?G!@Dl8Wmth~g0^QI9 z`=J;5;8lpo_HgvGtD3>S;s%>K9}8=GQXkf16j@X{HEwV}4^hTU>`yZPFb`$$Yy9%^ z0@Bb@)aTj1ejVJsvf)CkMWruXmLX4Gik;vpV`8uL&c^x#WjL*nJhc>U;xy>QvW+69 x8%`@ds-&aPFO~Fb;)?p7!(fEJ%yIg#!#p^Z$={c!GWkjSDB3_A?Gkqr{{pfWvts}N delta 1630 zcmY+^TWl0n9LMpQbI$H|ySp>f(r#&YhqhZRrIfWGmLgCqV4;Fo-`OSUEAzVArhP|G7q?)1?>D`qav7}+5o-cZ%*Fi!R8CoYJu#E|F} zhs1udTQrMxqOPxwt)r$9mTib?5fOR9&F}Hs{5ro(O+&1_ztou3m$Lm7yJ=N2*A_B{ z;|Hi@IG!s%9F8CH^nYz+$&;=6G4&vpd2edXp4U;go!X1K?$jRCWv5<4jXBkZI`7nO z)G4QIRKHV6)Dfo=%2q`^;AGr)NU3$I6;ggO~3G-T{uNK-EU^+sGym3 z(1m^axNNia?d-H#!sFTIiN@K;`O(lbxplI;%R=#|ofXrU4klqgR#>qMV3i3HLw8*2$ z!UDdY)790!2pPFJwx@_Mnw<34B7T(Gi;GN}$m7{`IP`cKU85lm0$kt*4`hKCvcU&_ z2!IB<91R(3h46PO52VXr4^_tgmF`@YQmN`r%uk<33{8&AVEr!o!)?!{HwxK7y6CY3 zbV9c)=|q;T(a*ZQMD8tO2UO}#AM?dLLnlvqA1%z@@o7`<>1Gz8* z@*p3APyjQb5Q-oqMRj=NeY%_P#y?Jaz0^@a zPeL^;fQ9fBEP|(@1{OmtEP-cWDJ+BKPzTRKJv;|1U?r@A=b-^s!y0IW7ho-{gBM{v zyaY|K0bYiUun9K97T5~g;1y_wS7AHsfSs@lVzMb}oj;e=pgdrj!pmhtiP5RfW-V+J zYhbmkl7*SZuCkxJVc&0Fjg9&)d289A?*jXXCjL}|eNk3VDP|W?C}MO_)wrcoIv|7(`VyW1y$+xX8K;U-<0Rdj5ZhZAEnf|6(ZAbYi0`HQsKHA>Nz@*y_Sqr%&pabXbcqQIT270J))8@^)hhJKP3zw2u~-7nXUqs| n|F1{=AiycA=M>3Rx2zh$0>8_5|F(Krmv%M|38|e;-c9}o4u1Mj