From 82c2f4ed1cf4f3727e4492b7482a08d27cf62f93 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 22:52:06 -0500 Subject: [PATCH] batch36 task2 implement group-a lifecycle primitives --- .../Accounts/Account.StreamLifecycle.cs | 71 ++++++++ .../JetStream/NatsStream.Lifecycle.cs | 160 ++++++++++++++++++ .../JetStream/NatsStream.cs | 8 +- .../JetStream/StoreParity.cs | 40 +++++ .../JetStream/StoreTypes.cs | 50 ++++++ .../JetStream/NatsStreamTests.cs | 55 ++++++ .../JetStream/StoreTypesTests.cs | 41 +++++ porting.db | Bin 6754304 -> 6754304 bytes 8 files changed, 424 insertions(+), 1 deletion(-) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.StreamLifecycle.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.StreamLifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.StreamLifecycle.cs new file mode 100644 index 0000000..9f5b73e --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.StreamLifecycle.cs @@ -0,0 +1,71 @@ +using System.Text.Json; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class Account +{ + internal (NatsStream? Stream, Exception? Error) AddStream(StreamConfig config) => + AddStreamWithAssignment(config, null, null, pedantic: false); + + internal (NatsStream? Stream, Exception? Error) AddStreamWithStore(StreamConfig config, FileStoreConfig? fileStoreConfig) => + AddStreamWithAssignment(config, fileStoreConfig, null, pedantic: false); + + internal (NatsStream? Stream, Exception? Error) AddStreamPedantic(StreamConfig config, bool pedantic) => + AddStreamWithAssignment(config, null, null, pedantic); + + internal (NatsStream? Stream, Exception? Error) AddStreamWithAssignment( + StreamConfig config, + FileStoreConfig? fileStoreConfig, + StreamAssignment? assignment, + bool pedantic) + { + if (config == null) + return (null, new ArgumentNullException(nameof(config))); + + var (server, jsa, error) = CheckForJetStream(); + if (error != null || server == null || jsa == null) + return (null, error ?? new InvalidOperationException("jetstream not enabled for account")); + + if (string.IsNullOrWhiteSpace(config.Name)) + return (null, new InvalidOperationException("stream name is required")); + + var normalized = config.Clone(); + if (normalized.Subjects == null || normalized.Subjects.Length == 0) + normalized.Subjects = [$"{normalized.Name}.>"]; + + _ = pedantic; + _ = fileStoreConfig; + + jsa.Lock.EnterWriteLock(); + try + { + if (jsa.Streams.TryGetValue(normalized.Name, out var existingObject) && existingObject is NatsStream existing) + { + if (StreamConfigsEqual(existing.Config, normalized)) + { + if (assignment != null) + existing.SetStreamAssignment(assignment); + return (existing, null); + } + + return (null, new InvalidOperationException("stream name already in use")); + } + + IStreamStore store = new JetStreamMemStore(normalized.Clone()); + + var stream = NatsStream.Create(this, normalized, jsa, store, assignment, server); + if (stream == null) + return (null, new InvalidOperationException("stream creation failed")); + + jsa.Streams[normalized.Name] = stream; + return (stream, null); + } + finally + { + jsa.Lock.ExitWriteLock(); + } + } + + private static bool StreamConfigsEqual(StreamConfig left, StreamConfig right) + => string.Equals(JsonSerializer.Serialize(left), JsonSerializer.Serialize(right), StringComparison.Ordinal); +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs new file mode 100644 index 0000000..84168f3 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs @@ -0,0 +1,160 @@ +using System.Threading.Channels; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + internal StreamAssignment? StreamAssignment() + { + _mu.EnterReadLock(); + try + { + return _assignment; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetStreamAssignment(StreamAssignment? assignment) + { + _mu.EnterWriteLock(); + try + { + _assignment = assignment; + + if (assignment?.Group?.Node != null) + { + _node = assignment.Group.Node; + assignment.Group.Node.UpdateKnownPeers(assignment.Group.Peers); + } + + _updateChannel.Writer.TryWrite(true); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ChannelReader? MonitorQuitC() + { + _mu.EnterWriteLock(); + try + { + _monitorQuitChannel ??= Channel.CreateBounded(1); + return _monitorQuitChannel.Reader; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SignalMonitorQuit() + { + _mu.EnterWriteLock(); + try + { + if (_monitorQuitChannel != null) + { + _monitorQuitChannel.Writer.TryComplete(); + _monitorQuitChannel = null; + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ChannelReader UpdateC() => _updateChannel.Reader; + + internal bool IsLeaderNodeState() + { + _mu.EnterReadLock(); + try + { + if (_node is IRaftNode raftNode) + return raftNode.State() == RaftState.Leader; + + return true; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal bool IsLeaderInternal() + { + _mu.EnterReadLock(); + try + { + if (_assignment?.Group?.Node is IRaftNode node) + return node.Leader(); + + return true; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void StartClusterSubs() + { + _mu.EnterWriteLock(); + try + { + _clusterSubsActive = true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void StopClusterSubs() + { + _mu.EnterWriteLock(); + try + { + _clusterSubsActive = false; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool ClusterSubsActive() + { + _mu.EnterReadLock(); + try + { + return _clusterSubsActive; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal Account? AccountLocked(bool needLock) + { + if (needLock) + _mu.EnterReadLock(); + + try + { + return Account; + } + finally + { + if (needLock) + _mu.ExitReadLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs index f6f788b..3e70945 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs @@ -13,13 +13,15 @@ // // Adapted from server/stream.go in the NATS server Go source. +using System.Threading.Channels; + namespace ZB.MOM.NatsNet.Server; /// /// Represents a JetStream stream, managing message storage, replication, and lifecycle. /// Mirrors the stream struct in server/stream.go. /// -internal sealed class NatsStream : IDisposable +internal sealed partial class NatsStream : IDisposable { private readonly ReaderWriterLockSlim _mu = new(LockRecursionPolicy.SupportsRecursion); @@ -42,6 +44,10 @@ internal sealed class NatsStream : IDisposable private ulong _leaderTerm; private bool _sealed; private CancellationTokenSource? _quitCts; + private Channel? _monitorQuitChannel = Channel.CreateBounded(1); + private readonly Channel _updateChannel = Channel.CreateBounded(4); + private StreamAssignment? _assignment; + private bool _clusterSubsActive; /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreParity.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreParity.cs index 900d58a..7804a7d 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreParity.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreParity.cs @@ -303,6 +303,14 @@ public static class StoreEnumParityExtensions StorageType.FileStorage => "File", _ => "Unknown Storage Type", }; + + public static string String(this PersistModeType value) + => value switch + { + PersistModeType.DefaultPersistMode => "Default", + PersistModeType.AsyncPersistMode => "Async", + _ => "Unknown Persist Mode Type", + }; } public sealed class RetentionPolicyJsonConverter : JsonConverter @@ -403,6 +411,38 @@ public sealed class StorageTypeJsonConverter : JsonConverter } } +public sealed class PersistModeTypeJsonConverter : JsonConverter +{ + public override PersistModeType Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.String) + throw new JsonException("can not unmarshal token"); + + return reader.GetString() switch + { + "default" => PersistModeType.DefaultPersistMode, + "" => PersistModeType.DefaultPersistMode, + "async" => PersistModeType.AsyncPersistMode, + var value => throw new JsonException($"can not unmarshal \"{value}\""), + }; + } + + public override void Write(Utf8JsonWriter writer, PersistModeType value, JsonSerializerOptions options) + { + switch (value) + { + case PersistModeType.DefaultPersistMode: + writer.WriteStringValue("default"); + break; + case PersistModeType.AsyncPersistMode: + writer.WriteStringValue("async"); + break; + default: + throw new JsonException($"can not marshal {value}"); + } + } +} + public sealed class AckPolicyJsonConverter : JsonConverter { public override AckPolicy Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs index c89a8b8..03eb1d7 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs @@ -641,6 +641,7 @@ public enum StoreCompression : byte // --------------------------------------------------------------------------- /// Determines what persistence mode the stream uses. +[JsonConverter(typeof(PersistModeTypeJsonConverter))] public enum PersistModeType { DefaultPersistMode = 0, @@ -711,6 +712,14 @@ public sealed class ExternalStream [JsonPropertyName("deliver")] public string DeliverPrefix { get; set; } = string.Empty; + + public string Domain() + { + if (string.IsNullOrEmpty(ApiPrefix)) + return string.Empty; + + return SubscriptionIndex.TokenAt(ApiPrefix, 2); + } } // --------------------------------------------------------------------------- @@ -737,6 +746,47 @@ public sealed class StreamSource [JsonPropertyName("external")] public ExternalStream? External { get; set; } + + [JsonIgnore] + public string IndexName { get; private set; } = string.Empty; + + public string ComposeIName() + { + var name = Name; + if (External != null) + name = $"{name}:{NatsServer.GetHash(External.ApiPrefix)}"; + + var source = FilterSubject ?? string.Empty; + var destination = ">"; + + if (SubjectTransforms is null || SubjectTransforms.Length == 0) + { + if (string.IsNullOrEmpty(source)) + source = ">"; + if (string.IsNullOrEmpty(destination)) + destination = ">"; + } + else + { + var sources = new List(SubjectTransforms.Length); + var destinations = new List(SubjectTransforms.Length); + foreach (var transform in SubjectTransforms) + { + sources.Add(string.IsNullOrEmpty(transform.Source) ? ">" : transform.Source); + destinations.Add(string.IsNullOrEmpty(transform.Destination) ? ">" : transform.Destination); + } + + source = string.Join('\f', sources); + destination = string.Join('\f', destinations); + } + + return string.Join(' ', [name, source, destination]); + } + + public void SetIndexName() + { + IndexName = ComposeIName(); + } } // --------------------------------------------------------------------------- diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamTests.cs index 9843cc1..f17aa5b 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamTests.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0 using Shouldly; +using NSubstitute; using ZB.MOM.NatsNet.Server; namespace ZB.MOM.NatsNet.Server.Tests.JetStream; @@ -40,4 +41,58 @@ public sealed class NatsStreamTests stream.Delete(); stream.IsLeader().ShouldBeFalse(); } + + [Fact] + public void LifecyclePrimitives_AssignmentAndChannels_ShouldBehave() + { + var account = new Account { Name = "A" }; + var stream = NatsStream.Create( + account, + new StreamConfig { Name = "ORDERS", Subjects = ["orders.*"], Storage = StorageType.MemoryStorage }, + null, + new JetStreamMemStore(new StreamConfig { Name = "ORDERS", Subjects = ["orders.*"], Storage = StorageType.MemoryStorage }), + null, + null)!; + + stream.AccountLocked(true).ShouldBe(account); + stream.StreamAssignment().ShouldBeNull(); + + var assignment = new StreamAssignment { Sync = "sync.inbox" }; + stream.SetStreamAssignment(assignment); + stream.StreamAssignment().ShouldBe(assignment); + + stream.UpdateC().TryRead(out var updateSignal).ShouldBeTrue(); + updateSignal.ShouldBeTrue(); + + stream.StartClusterSubs(); + stream.ClusterSubsActive().ShouldBeTrue(); + stream.StopClusterSubs(); + stream.ClusterSubsActive().ShouldBeFalse(); + + var monitor = stream.MonitorQuitC(); + monitor.ShouldNotBeNull(); + stream.SignalMonitorQuit(); + stream.MonitorQuitC().ShouldNotBeNull(); + } + + [Fact] + public void IsLeaderInternal_WhenAssignedToRaftNode_UsesNodeLeaderState() + { + var account = new Account { Name = "A" }; + var stream = NatsStream.Create( + account, + new StreamConfig { Name = "ORDERS", Subjects = ["orders.*"], Storage = StorageType.MemoryStorage }, + null, + new JetStreamMemStore(new StreamConfig { Name = "ORDERS", Subjects = ["orders.*"], Storage = StorageType.MemoryStorage }), + null, + null)!; + + var raftNode = Substitute.For(); + raftNode.Leader().Returns(false); + var assignment = new StreamAssignment { Group = new RaftGroup { Node = raftNode } }; + + stream.SetStreamAssignment(assignment); + + stream.IsLeaderInternal().ShouldBeFalse(); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StoreTypesTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StoreTypesTests.cs index 9a369e0..4747d67 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StoreTypesTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StoreTypesTests.cs @@ -277,6 +277,47 @@ public class StoreTypesTests JsonSerializer.Deserialize("\"file\"").ShouldBe(StorageType.FileStorage); } + [Fact] + public void PersistModeType_StringAndJsonParity_MatchesGo() + { + PersistModeType.DefaultPersistMode.String().ShouldBe("Default"); + PersistModeType.AsyncPersistMode.String().ShouldBe("Async"); + ((PersistModeType)99).String().ShouldBe("Unknown Persist Mode Type"); + + JsonSerializer.Serialize(PersistModeType.DefaultPersistMode).ShouldBe("\"default\""); + JsonSerializer.Serialize(PersistModeType.AsyncPersistMode).ShouldBe("\"async\""); + JsonSerializer.Deserialize("\"default\"").ShouldBe(PersistModeType.DefaultPersistMode); + JsonSerializer.Deserialize("\"async\"").ShouldBe(PersistModeType.AsyncPersistMode); + } + + [Fact] + public void ExternalStream_Domain_ReturnsSecondTokenOrEmpty() + { + new ExternalStream().Domain().ShouldBe(string.Empty); + new ExternalStream { ApiPrefix = "$JS.D1.API" }.Domain().ShouldBe("D1"); + } + + [Fact] + public void StreamSource_ComposeIName_UsesFilterAndTransforms() + { + var source = new StreamSource + { + Name = "ORDERS", + External = new ExternalStream { ApiPrefix = "$JS.EU.API" }, + SubjectTransforms = + [ + new SubjectTransformConfig { Source = "foo.*", Destination = "bar.*" }, + new SubjectTransformConfig { Source = string.Empty, Destination = "baz.>" }, + ], + }; + + source.ComposeIName().ShouldContain("ORDERS:"); + source.ComposeIName().ShouldContain("foo.*\f>"); + source.ComposeIName().ShouldContain("bar.*\fbaz.>"); + source.SetIndexName(); + source.IndexName.ShouldBe(source.ComposeIName()); + } + [Fact] public void AckPolicy_JsonParity_MatchesGo() { diff --git a/porting.db b/porting.db index 2af9deeec4f20394a38e40a2a4d8d718b09e46d2..486e4e1714a068b0502e6083eeaf8989664268f9 100644 GIT binary patch delta 2228 zcmcK5Z%h++7zgmX-|zbO+G}B`AU!C>3Zg)fp+%j7)`DRDi-2sRgkH>)#^|=VY*AyFFO1V<@t(@qo_$YpB|oy(Qa<+8y6qAmx+2%Ad3F#Rs)jIe%Fy6HX>XQU$WNq#95 zl6)$LVQKWhUMUpx2PGeW3FJ|BK4Icvbd*gY#vStWQI>RS)Za6H?m8noc99+y%wCcy zU-1%7b&z3HgD;mee4@F@2uk`m_Ed(GSi7dwdN5s7(g{pMN;-z=jFOIGIOMFs2G6eTd1eqz^DTm2?PGmXf+LS(Ma;X?jbYGq%*hFlO#crS$;A zaHr*w010z5+V;!726FBz*iuPF7^pGukZqy%Q_rDTPkD(1Ud18i7L)o?47 z+G2X1rc&8D)$?-^&7Q6RUrg&@?0vlmM~i8JQYe&f7Sm_{S$1-`7o%sms0+G}X+)Sv zpn5Q6ifTCRqH~oM9B|b|f1PTX+}*eBf(ov$rSqaKx~VOu_vxUkS4@ILH_e0YL284} zWE}rJ_l!1YpBB146_Vhpo34V#mt2z<%m|q!u~!0WOK9cPCA!rlXduz1%X%?wTq$jY z!6uplBh_{ltn9^Uhe~7bg?z7+mas5(!z#iq6{nu+DJzTZf&FDP0Amjm#8c&TY0OSt z<+KJ4{gM(rZiXA>G%cq1S2?{1kK^&iU2Qk2!CX!`c(9(P#I*1p+8)yq@zA8{YWR+a zA`YpL8flOg>5v}r$bgJUknea*3vY9VPUe}cdf;mc=;mGM`pYhzLEPMd3Vz==P;u<6p!9O31}9Yjpm?4l!WG@WMn}i zN6Rs7 zw%U&7qYRXZ7N9J&5G_L4CQ9gPT6`<%kOOX>5qGf0~T7lj|E72-cgo@E> zvUX0!!uMHOfpszg<&8f`~4=xy{4+JS1(PP9vA ztG(-j*Y{AkdZqb7zEu#yiX zgJ=i{;z@>Kl;_4+JL6j&a^0|RP#xwbyr}LxH&Guk|3jS2_maa1UZka_YVtNADP zl+Ek{*RW571;aiITMxppFIn!hncp>rxrh%*=edZ#DBZ03ficwzmxZ~;Hp7TvA%9r^ zP`^S4+Q(XtW=Nw^`&E$EDSj?HtnA-hVwc!oqh~}St`ob&e81#vZ3{^)h7Q==RG)J& zxTm>6+9Nf#NSu|9r@XgJgXyrS@hLbh$r6uiW47r`KPg_!P^2mn(A$@ laR?_4QT>V`_Wy5)ov-goEa^(?uY$aL{#o*OPyDMA{{;r}#_#|D delta 1619 zcmY+^YfM{Z7zgm4^Pb-5>1jb=zr8dG$wP! zIXcnJU7Qajm~{OD-)PL>?1K?zmT0Dl$cKFxF`$VbMzTrFqREVxKdsw(n*5sl&vV|( z^X8lricGQ)#3B`(_w4lw<$47-7~wfa;v$#F9$1!@ZHrvcQmD~r3e8OB{efU$csStK zj^j!yJo}Nd!WHv6H?6F2xx8sk`FDlmLwfx^&AY-K4PERD)KDbG*DKw9yo*!5^6`T9 zO!A!KRaLsIN|#jWqAFcbrH@o;OqD)VrSs-9NiB(CRdH06&Z*J|s&rPB-dCj&RSK!n zduC>RcB10xjiRzRz=s94UsFP#+_z>bZw&F*W+?5XPNv8a(M07o=AeZWI*IyhY=EkF zh;~}YV>)_hW6pTN&@y37)o{COY2OVgof>84q5fN1nYznm6V1xF?b2;#R@P)T%RQZ- zlGcASrYk}|i{)=zlsoZ!;^S z&`o9}XDe%=r8)WkeOa8cj>cPA&c-;27FyX)N!9sxBpuypV@2_mSmA_`My|-&l;vVo zH2kZvfI^$m+?%c^chpT7Xyj8Vo911tfxiCB9`DOc&UUsnvD@SA>`+p}KfI0=vI;O$ z{M6Dq*lW~3WR$5-E9hv?73^-h17{kk(M!s52Xk@M++|KKcXmEG^3$CxNW(K3@@H(1=s@(& z8Q7%^l~_0N*|t-G;lbcwz@HM^_&z*MGsptk864p<|#7F$JhF{`u^K<;P zVp#Q^;;vcwS82f)xQXX3AW~>!;s?L@%5^jr^;^gn_1mZof80@}|GqD(3kxwn1g3;o zurb%HpVnB8Nnc7i=C{N}vD7r0@^ebPkqo~ZJo;(Hp3jZyg%SItm#XdsGn89@1{<>e E13VP|4gdfE