diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs index dcded03..6e96f24 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs @@ -1421,6 +1421,270 @@ internal sealed class JetStreamEngine(JetStream state) if (!isLeader) stream.ResetClusteredState(); } + + internal StreamAssignment? StreamAssignment(string accountName, string streamName) + { + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return null; + if (!cluster.Streams.TryGetValue(accountName, out var accountStreams)) + return null; + return accountStreams.TryGetValue(streamName, out var assignment) ? assignment : null; + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal StreamAssignment? StreamAssignmentOrInflight(string accountName, string streamName) + { + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return null; + if (cluster.Streams.TryGetValue(accountName, out var accountStreams) && + accountStreams.TryGetValue(streamName, out var current)) + return current; + + if (cluster.InflightStreams.TryGetValue(accountName, out var inflight) && + inflight.TryGetValue(streamName, out var info)) + return info.Assignment; + + return null; + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal IEnumerable StreamAssignmentsOrInflightSeq(string accountName) + { + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return []; + + var seen = new HashSet(StringComparer.Ordinal); + var items = new List(); + if (cluster.Streams.TryGetValue(accountName, out var accountStreams)) + { + foreach (var (streamName, assignment) in accountStreams) + { + seen.Add(streamName); + items.Add(assignment); + } + } + + if (cluster.InflightStreams.TryGetValue(accountName, out var inflightStreams)) + { + foreach (var (streamName, inflight) in inflightStreams) + { + if (seen.Contains(streamName) || inflight.Assignment == null) + continue; + items.Add(inflight.Assignment); + } + } + + return items; + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal IEnumerable<(string Account, StreamAssignment Assignment)> StreamAssignmentsOrInflightSeqAllAccounts() + { + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return []; + + var results = new List<(string Account, StreamAssignment Assignment)>(); + foreach (var account in cluster.Streams.Keys.Union(cluster.InflightStreams.Keys, StringComparer.Ordinal)) + { + var seen = new HashSet(StringComparer.Ordinal); + if (cluster.Streams.TryGetValue(account, out var accountStreams)) + { + foreach (var (streamName, assignment) in accountStreams) + { + seen.Add(streamName); + results.Add((account, assignment)); + } + } + + if (cluster.InflightStreams.TryGetValue(account, out var inflightStreams)) + { + foreach (var (streamName, inflight) in inflightStreams) + { + if (seen.Contains(streamName) || inflight.Assignment == null) + continue; + results.Add((account, inflight.Assignment)); + } + } + } + + return results; + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal void ProcessStreamAssignment(StreamAssignment assignment) + { + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster || assignment.Client == null) + return; + + var accountName = assignment.Client.ServiceAccount(); + if (!cluster.Streams.TryGetValue(accountName, out var accountStreams)) + { + accountStreams = new Dictionary(StringComparer.Ordinal); + cluster.Streams[accountName] = accountStreams; + } + + var streamName = assignment.Config?.Name ?? assignment.Subject ?? string.Empty; + assignment.Responded = true; + accountStreams[streamName] = assignment; + cluster.RemoveInflightStreamProposal(accountName, streamName); + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal void ProcessUpdateStreamAssignment(StreamAssignment assignment) + { + ProcessStreamAssignment(assignment); + } + + internal void ProcessClusterUpdateStream(StreamAssignment assignment) + { + ProcessUpdateStreamAssignment(assignment); + } + + internal void ProcessClusterCreateStream(StreamAssignment assignment) + { + ProcessStreamAssignment(assignment); + } + + internal void ProcessStreamRemoval(StreamAssignment assignment) + { + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster || assignment.Client == null) + return; + var accountName = assignment.Client.ServiceAccount(); + var streamName = assignment.Config?.Name ?? assignment.Subject ?? string.Empty; + if (cluster.Streams.TryGetValue(accountName, out var accountStreams)) + { + accountStreams.Remove(streamName); + if (accountStreams.Count == 0) + cluster.Streams.Remove(accountName); + } + cluster.RemoveInflightStreamProposal(accountName, streamName); + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal void ProcessClusterDeleteStream(StreamAssignment assignment) + { + ProcessStreamRemoval(assignment); + } + + internal void ProcessConsumerAssignment(ConsumerAssignment assignment) + { + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster || assignment.Client == null) + return; + + var accountName = assignment.Client.ServiceAccount(); + if (!cluster.Streams.TryGetValue(accountName, out var accountStreams)) + return; + if (!accountStreams.TryGetValue(assignment.Stream, out var streamAssignment)) + return; + + streamAssignment.Consumers ??= new Dictionary(StringComparer.Ordinal); + assignment.Responded = true; + streamAssignment.Consumers[assignment.Name] = assignment; + cluster.RemoveInflightConsumerProposal(accountName, assignment.Stream, assignment.Name); + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal void ProcessConsumerRemoval(ConsumerAssignment assignment) + { + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster || assignment.Client == null) + return; + + var accountName = assignment.Client.ServiceAccount(); + if (cluster.Streams.TryGetValue(accountName, out var accountStreams) && + accountStreams.TryGetValue(assignment.Stream, out var streamAssignment)) + { + streamAssignment.Consumers?.Remove(assignment.Name); + } + cluster.RemoveInflightConsumerProposal(accountName, assignment.Stream, assignment.Name); + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal void ProcessClusterCreateConsumer(ConsumerAssignment assignment) + { + ProcessConsumerAssignment(assignment); + } + + internal void ProcessClusterDeleteConsumer(ConsumerAssignment assignment) + { + ProcessConsumerRemoval(assignment); + } + + internal ConsumerAssignment? ConsumerAssignment(string accountName, string streamName, string consumerName) + { + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return null; + if (!cluster.Streams.TryGetValue(accountName, out var accountStreams)) + return null; + if (!accountStreams.TryGetValue(streamName, out var streamAssignment)) + return null; + if (streamAssignment.Consumers == null) + return null; + return streamAssignment.Consumers.TryGetValue(consumerName, out var assignment) ? assignment : null; + } + finally + { + _state.Lock.ExitReadLock(); + } + } } internal sealed class StreamAssignmentView diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterStreams.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterStreams.cs index f41cfe1..70176f6 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterStreams.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterStreams.cs @@ -9,4 +9,30 @@ public sealed partial class NatsServer return Math.Max(1, config.Replicas); } + + internal void SendStreamLostQuorumAdvisory(Account? account, string stream, string[]? peers = null) + { + _ = account; + _ = peers; + Noticef("JetStream stream lost quorum advisory for stream {0}", stream); + } + + internal void SendStreamLeaderElectAdvisory(Account? account, string stream, string leader) + { + _ = account; + Noticef("JetStream stream leader elect advisory for stream {0}, leader {1}", stream, leader); + } + + internal bool RemoveStream(Account? account, string streamName) + { + if (account == null) + return false; + + var (stream, _) = account.LookupStream(streamName); + if (stream == null) + return false; + + stream.Delete(); + return true; + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupCTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupCTests.Impltests.cs new file mode 100644 index 0000000..3fcbf5f --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupCTests.Impltests.cs @@ -0,0 +1,116 @@ +using System.Reflection; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed class JetStreamClusterStreamsGroupCTests +{ + [Fact] // T:1618 + public void SendStreamLostQuorumAdvisory_Method_ShouldExist() + { + typeof(NatsServer).GetMethod("SendStreamLostQuorumAdvisory", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1619 + public void SendStreamLeaderElectAdvisory_Method_ShouldExist() + { + typeof(NatsServer).GetMethod("SendStreamLeaderElectAdvisory", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1620 + public void StreamAssignment_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("StreamAssignment", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1621 + public void StreamAssignmentOrInflight_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("StreamAssignmentOrInflight", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1622 + public void StreamAssignmentsOrInflightSeq_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("StreamAssignmentsOrInflightSeq", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1623 + public void StreamAssignmentsOrInflightSeqAllAccounts_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("StreamAssignmentsOrInflightSeqAllAccounts", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1624 + public void ProcessStreamAssignment_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessStreamAssignment", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1625 + public void ProcessUpdateStreamAssignment_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessUpdateStreamAssignment", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1626 + public void RemoveStream_Method_ShouldExist() + { + typeof(NatsServer).GetMethod("RemoveStream", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1627 + public void ProcessClusterUpdateStream_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessClusterUpdateStream", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1628 + public void ProcessClusterCreateStream_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessClusterCreateStream", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1629 + public void ProcessStreamRemoval_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessStreamRemoval", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1630 + public void ProcessClusterDeleteStream_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessClusterDeleteStream", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1631 + public void ProcessConsumerAssignment_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessConsumerAssignment", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1632 + public void ProcessConsumerRemoval_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessConsumerRemoval", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1633 + public void ProcessClusterCreateConsumer_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessClusterCreateConsumer", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1634 + public void ProcessClusterDeleteConsumer_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessClusterDeleteConsumer", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1635 + public void ConsumerAssignment_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ConsumerAssignment", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } +} diff --git a/porting.db b/porting.db index 8e2ce0e..0e22117 100644 Binary files a/porting.db and b/porting.db differ