diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.StreamLifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.StreamLifecycle.cs new file mode 100644 index 0000000..9f5b73e --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.StreamLifecycle.cs @@ -0,0 +1,71 @@ +using System.Text.Json; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class Account +{ + internal (NatsStream? Stream, Exception? Error) AddStream(StreamConfig config) => + AddStreamWithAssignment(config, null, null, pedantic: false); + + internal (NatsStream? Stream, Exception? Error) AddStreamWithStore(StreamConfig config, FileStoreConfig? fileStoreConfig) => + AddStreamWithAssignment(config, fileStoreConfig, null, pedantic: false); + + internal (NatsStream? Stream, Exception? Error) AddStreamPedantic(StreamConfig config, bool pedantic) => + AddStreamWithAssignment(config, null, null, pedantic); + + internal (NatsStream? Stream, Exception? Error) AddStreamWithAssignment( + StreamConfig config, + FileStoreConfig? fileStoreConfig, + StreamAssignment? assignment, + bool pedantic) + { + if (config == null) + return (null, new ArgumentNullException(nameof(config))); + + var (server, jsa, error) = CheckForJetStream(); + if (error != null || server == null || jsa == null) + return (null, error ?? new InvalidOperationException("jetstream not enabled for account")); + + if (string.IsNullOrWhiteSpace(config.Name)) + return (null, new InvalidOperationException("stream name is required")); + + var normalized = config.Clone(); + if (normalized.Subjects == null || normalized.Subjects.Length == 0) + normalized.Subjects = [$"{normalized.Name}.>"]; + + _ = pedantic; + _ = fileStoreConfig; + + jsa.Lock.EnterWriteLock(); + try + { + if (jsa.Streams.TryGetValue(normalized.Name, out var existingObject) && existingObject is NatsStream existing) + { + if (StreamConfigsEqual(existing.Config, normalized)) + { + if (assignment != null) + existing.SetStreamAssignment(assignment); + return (existing, null); + } + + return (null, new InvalidOperationException("stream name already in use")); + } + + IStreamStore store = new JetStreamMemStore(normalized.Clone()); + + var stream = NatsStream.Create(this, normalized, jsa, store, assignment, server); + if (stream == null) + return (null, new InvalidOperationException("stream creation failed")); + + jsa.Streams[normalized.Name] = stream; + return (stream, null); + } + finally + { + jsa.Lock.ExitWriteLock(); + } + } + + private static bool StreamConfigsEqual(StreamConfig left, StreamConfig right) + => string.Equals(JsonSerializer.Serialize(left), JsonSerializer.Serialize(right), StringComparison.Ordinal); +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs new file mode 100644 index 0000000..84168f3 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs @@ -0,0 +1,160 @@ +using System.Threading.Channels; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + internal StreamAssignment? StreamAssignment() + { + _mu.EnterReadLock(); + try + { + return _assignment; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetStreamAssignment(StreamAssignment? assignment) + { + _mu.EnterWriteLock(); + try + { + _assignment = assignment; + + if (assignment?.Group?.Node != null) + { + _node = assignment.Group.Node; + assignment.Group.Node.UpdateKnownPeers(assignment.Group.Peers); + } + + _updateChannel.Writer.TryWrite(true); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ChannelReader? MonitorQuitC() + { + _mu.EnterWriteLock(); + try + { + _monitorQuitChannel ??= Channel.CreateBounded(1); + return _monitorQuitChannel.Reader; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SignalMonitorQuit() + { + _mu.EnterWriteLock(); + try + { + if (_monitorQuitChannel != null) + { + _monitorQuitChannel.Writer.TryComplete(); + _monitorQuitChannel = null; + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ChannelReader UpdateC() => _updateChannel.Reader; + + internal bool IsLeaderNodeState() + { + _mu.EnterReadLock(); + try + { + if (_node is IRaftNode raftNode) + return raftNode.State() == RaftState.Leader; + + return true; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal bool IsLeaderInternal() + { + _mu.EnterReadLock(); + try + { + if (_assignment?.Group?.Node is IRaftNode node) + return node.Leader(); + + return true; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void StartClusterSubs() + { + _mu.EnterWriteLock(); + try + { + _clusterSubsActive = true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void StopClusterSubs() + { + _mu.EnterWriteLock(); + try + { + _clusterSubsActive = false; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool ClusterSubsActive() + { + _mu.EnterReadLock(); + try + { + return _clusterSubsActive; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal Account? AccountLocked(bool needLock) + { + if (needLock) + _mu.EnterReadLock(); + + try + { + return Account; + } + finally + { + if (needLock) + _mu.ExitReadLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs index f6f788b..3e70945 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs @@ -13,13 +13,15 @@ // // Adapted from server/stream.go in the NATS server Go source. +using System.Threading.Channels; + namespace ZB.MOM.NatsNet.Server; /// /// Represents a JetStream stream, managing message storage, replication, and lifecycle. /// Mirrors the stream struct in server/stream.go. /// -internal sealed class NatsStream : IDisposable +internal sealed partial class NatsStream : IDisposable { private readonly ReaderWriterLockSlim _mu = new(LockRecursionPolicy.SupportsRecursion); @@ -42,6 +44,10 @@ internal sealed class NatsStream : IDisposable private ulong _leaderTerm; private bool _sealed; private CancellationTokenSource? _quitCts; + private Channel? _monitorQuitChannel = Channel.CreateBounded(1); + private readonly Channel _updateChannel = Channel.CreateBounded(4); + private StreamAssignment? _assignment; + private bool _clusterSubsActive; /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreParity.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreParity.cs index 900d58a..7804a7d 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreParity.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreParity.cs @@ -303,6 +303,14 @@ public static class StoreEnumParityExtensions StorageType.FileStorage => "File", _ => "Unknown Storage Type", }; + + public static string String(this PersistModeType value) + => value switch + { + PersistModeType.DefaultPersistMode => "Default", + PersistModeType.AsyncPersistMode => "Async", + _ => "Unknown Persist Mode Type", + }; } public sealed class RetentionPolicyJsonConverter : JsonConverter @@ -403,6 +411,38 @@ public sealed class StorageTypeJsonConverter : JsonConverter } } +public sealed class PersistModeTypeJsonConverter : JsonConverter +{ + public override PersistModeType Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType != JsonTokenType.String) + throw new JsonException("can not unmarshal token"); + + return reader.GetString() switch + { + "default" => PersistModeType.DefaultPersistMode, + "" => PersistModeType.DefaultPersistMode, + "async" => PersistModeType.AsyncPersistMode, + var value => throw new JsonException($"can not unmarshal \"{value}\""), + }; + } + + public override void Write(Utf8JsonWriter writer, PersistModeType value, JsonSerializerOptions options) + { + switch (value) + { + case PersistModeType.DefaultPersistMode: + writer.WriteStringValue("default"); + break; + case PersistModeType.AsyncPersistMode: + writer.WriteStringValue("async"); + break; + default: + throw new JsonException($"can not marshal {value}"); + } + } +} + public sealed class AckPolicyJsonConverter : JsonConverter { public override AckPolicy Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs index c89a8b8..03eb1d7 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StoreTypes.cs @@ -641,6 +641,7 @@ public enum StoreCompression : byte // --------------------------------------------------------------------------- /// Determines what persistence mode the stream uses. +[JsonConverter(typeof(PersistModeTypeJsonConverter))] public enum PersistModeType { DefaultPersistMode = 0, @@ -711,6 +712,14 @@ public sealed class ExternalStream [JsonPropertyName("deliver")] public string DeliverPrefix { get; set; } = string.Empty; + + public string Domain() + { + if (string.IsNullOrEmpty(ApiPrefix)) + return string.Empty; + + return SubscriptionIndex.TokenAt(ApiPrefix, 2); + } } // --------------------------------------------------------------------------- @@ -737,6 +746,47 @@ public sealed class StreamSource [JsonPropertyName("external")] public ExternalStream? External { get; set; } + + [JsonIgnore] + public string IndexName { get; private set; } = string.Empty; + + public string ComposeIName() + { + var name = Name; + if (External != null) + name = $"{name}:{NatsServer.GetHash(External.ApiPrefix)}"; + + var source = FilterSubject ?? string.Empty; + var destination = ">"; + + if (SubjectTransforms is null || SubjectTransforms.Length == 0) + { + if (string.IsNullOrEmpty(source)) + source = ">"; + if (string.IsNullOrEmpty(destination)) + destination = ">"; + } + else + { + var sources = new List(SubjectTransforms.Length); + var destinations = new List(SubjectTransforms.Length); + foreach (var transform in SubjectTransforms) + { + sources.Add(string.IsNullOrEmpty(transform.Source) ? ">" : transform.Source); + destinations.Add(string.IsNullOrEmpty(transform.Destination) ? ">" : transform.Destination); + } + + source = string.Join('\f', sources); + destination = string.Join('\f', destinations); + } + + return string.Join(' ', [name, source, destination]); + } + + public void SetIndexName() + { + IndexName = ComposeIName(); + } } // --------------------------------------------------------------------------- diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamTests.cs index 9843cc1..f17aa5b 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamTests.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0 using Shouldly; +using NSubstitute; using ZB.MOM.NatsNet.Server; namespace ZB.MOM.NatsNet.Server.Tests.JetStream; @@ -40,4 +41,58 @@ public sealed class NatsStreamTests stream.Delete(); stream.IsLeader().ShouldBeFalse(); } + + [Fact] + public void LifecyclePrimitives_AssignmentAndChannels_ShouldBehave() + { + var account = new Account { Name = "A" }; + var stream = NatsStream.Create( + account, + new StreamConfig { Name = "ORDERS", Subjects = ["orders.*"], Storage = StorageType.MemoryStorage }, + null, + new JetStreamMemStore(new StreamConfig { Name = "ORDERS", Subjects = ["orders.*"], Storage = StorageType.MemoryStorage }), + null, + null)!; + + stream.AccountLocked(true).ShouldBe(account); + stream.StreamAssignment().ShouldBeNull(); + + var assignment = new StreamAssignment { Sync = "sync.inbox" }; + stream.SetStreamAssignment(assignment); + stream.StreamAssignment().ShouldBe(assignment); + + stream.UpdateC().TryRead(out var updateSignal).ShouldBeTrue(); + updateSignal.ShouldBeTrue(); + + stream.StartClusterSubs(); + stream.ClusterSubsActive().ShouldBeTrue(); + stream.StopClusterSubs(); + stream.ClusterSubsActive().ShouldBeFalse(); + + var monitor = stream.MonitorQuitC(); + monitor.ShouldNotBeNull(); + stream.SignalMonitorQuit(); + stream.MonitorQuitC().ShouldNotBeNull(); + } + + [Fact] + public void IsLeaderInternal_WhenAssignedToRaftNode_UsesNodeLeaderState() + { + var account = new Account { Name = "A" }; + var stream = NatsStream.Create( + account, + new StreamConfig { Name = "ORDERS", Subjects = ["orders.*"], Storage = StorageType.MemoryStorage }, + null, + new JetStreamMemStore(new StreamConfig { Name = "ORDERS", Subjects = ["orders.*"], Storage = StorageType.MemoryStorage }), + null, + null)!; + + var raftNode = Substitute.For(); + raftNode.Leader().Returns(false); + var assignment = new StreamAssignment { Group = new RaftGroup { Node = raftNode } }; + + stream.SetStreamAssignment(assignment); + + stream.IsLeaderInternal().ShouldBeFalse(); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StoreTypesTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StoreTypesTests.cs index 9a369e0..4747d67 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StoreTypesTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StoreTypesTests.cs @@ -277,6 +277,47 @@ public class StoreTypesTests JsonSerializer.Deserialize("\"file\"").ShouldBe(StorageType.FileStorage); } + [Fact] + public void PersistModeType_StringAndJsonParity_MatchesGo() + { + PersistModeType.DefaultPersistMode.String().ShouldBe("Default"); + PersistModeType.AsyncPersistMode.String().ShouldBe("Async"); + ((PersistModeType)99).String().ShouldBe("Unknown Persist Mode Type"); + + JsonSerializer.Serialize(PersistModeType.DefaultPersistMode).ShouldBe("\"default\""); + JsonSerializer.Serialize(PersistModeType.AsyncPersistMode).ShouldBe("\"async\""); + JsonSerializer.Deserialize("\"default\"").ShouldBe(PersistModeType.DefaultPersistMode); + JsonSerializer.Deserialize("\"async\"").ShouldBe(PersistModeType.AsyncPersistMode); + } + + [Fact] + public void ExternalStream_Domain_ReturnsSecondTokenOrEmpty() + { + new ExternalStream().Domain().ShouldBe(string.Empty); + new ExternalStream { ApiPrefix = "$JS.D1.API" }.Domain().ShouldBe("D1"); + } + + [Fact] + public void StreamSource_ComposeIName_UsesFilterAndTransforms() + { + var source = new StreamSource + { + Name = "ORDERS", + External = new ExternalStream { ApiPrefix = "$JS.EU.API" }, + SubjectTransforms = + [ + new SubjectTransformConfig { Source = "foo.*", Destination = "bar.*" }, + new SubjectTransformConfig { Source = string.Empty, Destination = "baz.>" }, + ], + }; + + source.ComposeIName().ShouldContain("ORDERS:"); + source.ComposeIName().ShouldContain("foo.*\f>"); + source.ComposeIName().ShouldContain("bar.*\fbaz.>"); + source.SetIndexName(); + source.IndexName.ShouldBe(source.ComposeIName()); + } + [Fact] public void AckPolicy_JsonParity_MatchesGo() { diff --git a/porting.db b/porting.db index 2af9dee..486e4e1 100644 Binary files a/porting.db and b/porting.db differ