From d6efba6f8ab1caa0a0a80a0b0fce6f2125b3ecd1 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 22:47:53 -0500 Subject: [PATCH] batch33 task2 implement group A cluster stream features --- .../JetStream/JetStreamClusterTypes.cs | 85 +++ .../JetStream/JetStreamEngine.cs | 485 ++++++++++++++++++ ...reamClusterStreamsGroupATests.Impltests.cs | 175 +++++++ porting.db | Bin 6754304 -> 6758400 bytes 4 files changed, 745 insertions(+) create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupATests.Impltests.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs index d563da5..312dd86 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs @@ -452,6 +452,9 @@ internal sealed class RaftGroup /// Internal Raft node — not serialized. [JsonIgnore] public IRaftNode? Node { get; set; } + + internal bool IsMember(string id) => + Peers.Contains(id, StringComparer.Ordinal); } // ============================================================================ @@ -482,6 +485,49 @@ internal sealed class StreamAssignment [JsonIgnore] public bool Resetting { get; set; } [JsonIgnore] public Exception? Error { get; set; } [JsonIgnore] public UnsupportedStreamAssignment? Unsupported { get; set; } + + internal StreamAssignment CopyGroup() + { + var clone = new StreamAssignment + { + Client = Client, + Created = Created, + ConfigJson = ConfigJson, + Config = Config, + Group = Group == null + ? null + : new RaftGroup + { + Name = Group.Name, + Peers = [.. Group.Peers], + Storage = Group.Storage, + Cluster = Group.Cluster, + Preferred = Group.Preferred, + ScaleUp = Group.ScaleUp, + Node = Group.Node, + }, + Sync = Sync, + Subject = Subject, + Reply = Reply, + Restore = Restore, + Consumers = Consumers, + Responded = Responded, + Recovering = Recovering, + Reassigning = Reassigning, + Resetting = Resetting, + Error = Error, + Unsupported = Unsupported, + }; + return clone; + } + + internal bool MissingPeers() => + Group != null && + Config != null && + Group.Peers.Length < Math.Max(1, Config.Replicas); + + internal string RecoveryKey() => + $"{Client?.ServiceAccount() ?? string.Empty}:{Config?.Name ?? Subject ?? string.Empty}"; } // ============================================================================ @@ -551,6 +597,45 @@ internal sealed class ConsumerAssignment [JsonIgnore] public bool Recovering { get; set; } [JsonIgnore] public Exception? Error { get; set; } [JsonIgnore] public UnsupportedConsumerAssignment? Unsupported { get; set; } + + internal ConsumerAssignment CopyGroup() + { + var clone = new ConsumerAssignment + { + Client = Client, + Created = Created, + Name = Name, + Stream = Stream, + ConfigJson = ConfigJson, + Config = Config, + Group = Group == null + ? null + : new RaftGroup + { + Name = Group.Name, + Peers = [.. Group.Peers], + Storage = Group.Storage, + Cluster = Group.Cluster, + Preferred = Group.Preferred, + ScaleUp = Group.ScaleUp, + Node = Group.Node, + }, + Subject = Subject, + Reply = Reply, + State = State, + Responded = Responded, + Recovering = Recovering, + Error = Error, + Unsupported = Unsupported, + }; + return clone; + } + + internal string StreamRecoveryKey() => + $"{Client?.ServiceAccount() ?? string.Empty}:{Stream}"; + + internal string RecoveryKey() => + $"{Client?.ServiceAccount() ?? string.Empty}:{Stream}:{Name}"; } // ============================================================================ diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs index 6237f85..414a42d 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs @@ -727,6 +727,491 @@ internal sealed class JetStreamEngine(JetStream state) !string.IsNullOrWhiteSpace(value) && value.Length >= 10 && value.StartsWith("A", StringComparison.Ordinal); + + // ------------------------------------------------------------------------- + // JetStream cluster stream methods (Batch 33 Group A) + // ------------------------------------------------------------------------- + + internal void MonitorCluster() + { + var meta = GetMetaGroup(); + if (meta == null) + return; + + SetMetaRecovering(); + try + { + CheckClusterSize(); + var (_, _, _, error) = MetaSnapshot(); + if (error != null) + Server()?.Warnf("JetStream meta snapshot failed in MonitorCluster: {0}", error.Message); + } + finally + { + ClearMetaRecovering(); + ClusterStoppedC()?.Writer.TryWrite(true); + } + } + + internal void CheckClusterSize() + { + var server = Server(); + var meta = GetMetaGroup(); + if (server == null || meta == null) + return; + + var activePeers = server.ActivePeers(); + if (activePeers.Count == 0) + return; + + var jsPeers = 0; + foreach (var peer in activePeers) + { + var info = server.GetNodeInfo(peer); + if (info?.Js == true) + jsPeers++; + } + + if (jsPeers > 0 && jsPeers < meta.ClusterSize()) + meta.AdjustClusterSize(jsPeers); + } + + internal (StreamConfig Config, bool Ok) ClusterStreamConfig(string accountName, string streamName) + { + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return (new StreamConfig(), false); + if (!cluster.Streams.TryGetValue(accountName, out var accountStreams)) + return (new StreamConfig(), false); + if (!accountStreams.TryGetValue(streamName, out var assignment)) + return (new StreamConfig(), false); + if (assignment.Config == null) + return (new StreamConfig(), false); + + return (assignment.Config.Clone(), true); + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal (byte[] Snapshot, int Streams, int Consumers, Exception? Error) MetaSnapshot() + { + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return ([], 0, 0, null); + + return EncodeMetaSnapshot(cluster.Streams); + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal Exception? ApplyMetaSnapshot(byte[] buffer, RecoveryUpdates? updates, bool isRecovering) + { + var (decoded, error) = DecodeMetaSnapshot(buffer); + if (error != null || decoded == null) + return error; + + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return null; + + foreach (var accountStreams in decoded.Values) + { + foreach (var assignment in accountStreams.Values) + { + SetStreamAssignmentRecovering(assignment); + if (assignment.Consumers == null) + continue; + + foreach (var consumer in assignment.Consumers.Values) + { + SetConsumerAssignmentRecovering(consumer); + } + } + } + + cluster.Streams = decoded; + if (!isRecovering && updates != null) + { + updates.AddStreams.Clear(); + updates.UpdateStreams.Clear(); + updates.RemoveStreams.Clear(); + updates.UpdateConsumers.Clear(); + updates.RemoveConsumers.Clear(); + } + return null; + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal (Dictionary>? Streams, Exception? Error) DecodeMetaSnapshot(byte[] buffer) + { + if (buffer.Length == 0) + { + return ( + new Dictionary>(StringComparer.Ordinal), + null + ); + } + + try + { + var writeable = System.Text.Json.JsonSerializer.Deserialize>(buffer) ?? []; + var streams = new Dictionary>(StringComparer.Ordinal); + foreach (var item in writeable) + { + var accountName = item.Client?.ServiceAccount() ?? string.Empty; + if (!streams.TryGetValue(accountName, out var accountStreams)) + { + accountStreams = new Dictionary(StringComparer.Ordinal); + streams[accountName] = accountStreams; + } + + var assignment = new StreamAssignment + { + Client = item.Client, + Created = item.Created, + ConfigJson = item.ConfigJson, + Group = item.Group, + Sync = item.Sync, + Config = item.ConfigJson.ValueKind == System.Text.Json.JsonValueKind.Undefined + ? null + : System.Text.Json.JsonSerializer.Deserialize(item.ConfigJson.GetRawText()), + Consumers = new Dictionary(StringComparer.Ordinal), + }; + + if (item.Consumers.Count > 0) + { + foreach (var consumer in item.Consumers) + { + var streamName = !string.IsNullOrWhiteSpace(consumer.Stream) + ? consumer.Stream + : assignment.Config?.Name ?? string.Empty; + var assignmentConsumer = new ConsumerAssignment + { + Client = consumer.Client, + Created = consumer.Created, + Name = consumer.Name, + Stream = streamName, + ConfigJson = consumer.ConfigJson, + Group = consumer.Group, + State = consumer.State, + Config = consumer.ConfigJson.ValueKind == System.Text.Json.JsonValueKind.Undefined + ? null + : System.Text.Json.JsonSerializer.Deserialize(consumer.ConfigJson.GetRawText()), + }; + assignment.Consumers[assignmentConsumer.Name] = assignmentConsumer; + } + } + + var streamKey = assignment.Config?.Name ?? string.Empty; + accountStreams[streamKey] = assignment; + } + + return (streams, null); + } + catch (Exception ex) + { + return (null, ex); + } + } + + internal (byte[] Snapshot, int Streams, int Consumers, Exception? Error) EncodeMetaSnapshot( + Dictionary> streams) + { + try + { + var streamCount = 0; + var consumerCount = 0; + var writeable = new List(); + foreach (var accountStreams in streams.Values) + { + foreach (var assignment in accountStreams.Values) + { + streamCount++; + var configJson = assignment.ConfigJson.ValueKind == System.Text.Json.JsonValueKind.Undefined && + assignment.Config != null + ? System.Text.Json.JsonSerializer.SerializeToElement(assignment.Config) + : assignment.ConfigJson; + var ws = new WriteableStreamAssignment + { + Client = assignment.Client, + Created = assignment.Created, + ConfigJson = configJson, + Group = assignment.Group, + Sync = assignment.Sync, + }; + + if (assignment.Consumers != null) + { + foreach (var consumer in assignment.Consumers.Values) + { + consumerCount++; + var consumerConfigJson = + consumer.ConfigJson.ValueKind == System.Text.Json.JsonValueKind.Undefined && + consumer.Config != null + ? System.Text.Json.JsonSerializer.SerializeToElement(consumer.Config) + : consumer.ConfigJson; + ws.Consumers.Add(new WriteableConsumerAssignment + { + Client = consumer.Client, + Created = consumer.Created, + Name = consumer.Name, + Stream = consumer.Stream, + ConfigJson = consumerConfigJson, + Group = consumer.Group, + State = consumer.State, + }); + } + } + + writeable.Add(ws); + } + } + + if (writeable.Count == 0) + return ([], 0, 0, null); + + var snapshot = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(writeable); + return (snapshot, streamCount, consumerCount, null); + } + catch (Exception ex) + { + return ([], 0, 0, ex); + } + } + + internal Exception? CollectStreamAndConsumerChanges( + IRaftNodeCheckpoint checkpoint, + Dictionary> streams) + { + try + { + foreach (var (appendEntry, error) in checkpoint.AppendEntriesSeq()) + { + if (error != null) + return error; + + foreach (var entry in appendEntry.Entries) + { + if (entry.Type != EntryType.EntryNormal || entry.Data.Length == 0) + continue; + + var op = (EntryOp)entry.Data[0]; + var payload = entry.Data.Length > 1 ? entry.Data.AsSpan(1).ToArray() : []; + switch (op) + { + case EntryOp.AssignStreamOp: + case EntryOp.UpdateStreamOp: + { + var sa = System.Text.Json.JsonSerializer.Deserialize(payload); + if (sa?.Client == null || sa.Config == null) + break; + var account = sa.Client.ServiceAccount(); + if (!streams.TryGetValue(account, out var accountStreams)) + { + accountStreams = new Dictionary(StringComparer.Ordinal); + streams[account] = accountStreams; + } + + accountStreams[sa.Config.Name] = sa; + break; + } + case EntryOp.RemoveStreamOp: + { + var sa = System.Text.Json.JsonSerializer.Deserialize(payload); + if (sa?.Client == null || sa.Config == null) + break; + if (streams.TryGetValue(sa.Client.ServiceAccount(), out var accountStreams)) + { + accountStreams.Remove(sa.Config.Name); + } + break; + } + case EntryOp.AssignConsumerOp: + case EntryOp.AssignCompressedConsumerOp: + { + var ca = System.Text.Json.JsonSerializer.Deserialize(payload); + if (ca?.Client == null || string.IsNullOrWhiteSpace(ca.Stream)) + break; + var account = ca.Client.ServiceAccount(); + if (!streams.TryGetValue(account, out var accountStreams)) + break; + if (!accountStreams.TryGetValue(ca.Stream, out var streamAssignment)) + break; + streamAssignment.Consumers ??= new Dictionary(StringComparer.Ordinal); + streamAssignment.Consumers[ca.Name] = ca; + break; + } + case EntryOp.RemoveConsumerOp: + { + var ca = System.Text.Json.JsonSerializer.Deserialize(payload); + if (ca?.Client == null || string.IsNullOrWhiteSpace(ca.Stream)) + break; + var account = ca.Client.ServiceAccount(); + if (!streams.TryGetValue(account, out var accountStreams)) + break; + if (!accountStreams.TryGetValue(ca.Stream, out var streamAssignment)) + break; + streamAssignment.Consumers?.Remove(ca.Name); + break; + } + } + } + } + + return null; + } + catch (Exception ex) + { + return ex; + } + } + + internal void SetStreamAssignmentRecovering(StreamAssignment assignment) + { + assignment.Responded = true; + assignment.Recovering = true; + assignment.Restore = null; + if (assignment.Group != null) + { + assignment.Group.Preferred = string.Empty; + assignment.Group.ScaleUp = false; + } + } + + internal void SetConsumerAssignmentRecovering(ConsumerAssignment assignment) + { + assignment.Responded = true; + assignment.Recovering = true; + if (assignment.Group != null) + { + assignment.Group.Preferred = string.Empty; + assignment.Group.ScaleUp = false; + } + } + + internal void ProcessAddPeer(string peer) + { + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return; + if (!cluster.IsLeader()) + return; + + foreach (var accountStreams in cluster.Streams.Values) + { + foreach (var assignment in accountStreams.Values) + { + if (!assignment.MissingPeers()) + continue; + + var copy = assignment.CopyGroup(); + copy.Group!.Peers = [.. copy.Group.Peers, peer]; + assignment.Group = copy.Group; + + if (assignment.Consumers == null) + continue; + + foreach (var consumer in assignment.Consumers.Values) + { + if (consumer.Config?.Durable is { Length: > 0 } || (consumer.Group?.Peers.Length ?? 0) > 1) + consumer.Group!.Peers = assignment.Group.Peers; + } + } + } + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal void ProcessRemovePeer(string peer) + { + _state.Lock.EnterWriteLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return; + + foreach (var accountStreams in cluster.Streams.Values) + { + foreach (var assignment in accountStreams.Values) + { + if (assignment.Group == null || !assignment.Group.IsMember(peer)) + continue; + + RemovePeerFromStreamLocked(assignment, peer); + } + } + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal bool RemovePeerFromStream(StreamAssignment assignment, string peer) + { + _state.Lock.EnterWriteLock(); + try + { + return RemovePeerFromStreamLocked(assignment, peer); + } + finally + { + _state.Lock.ExitWriteLock(); + } + } + + internal bool RemovePeerFromStreamLocked(StreamAssignment assignment, string peer) + { + if (assignment.Group == null || !assignment.Group.IsMember(peer)) + return false; + + assignment.Group.Peers = [.. assignment.Group.Peers.Where(p => !string.Equals(p, peer, StringComparison.Ordinal))]; + assignment.Group.Preferred = string.Empty; + + if (assignment.Consumers == null) + return true; + + foreach (var consumer in assignment.Consumers.Values) + { + if (consumer.Group == null) + continue; + consumer.Group.Peers = [.. consumer.Group.Peers.Where(p => !string.Equals(p, peer, StringComparison.Ordinal))]; + consumer.Group.Preferred = string.Empty; + } + + return true; + } + + internal bool HasPeerEntries(IEnumerable entries) + { + foreach (var entry in entries) + { + if (entry.Type == EntryType.EntryAddPeer || entry.Type == EntryType.EntryRemovePeer) + return true; + } + return false; + } } internal sealed class StreamAssignmentView diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupATests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupATests.Impltests.cs new file mode 100644 index 0000000..2b7c040 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterStreamsGroupATests.Impltests.cs @@ -0,0 +1,175 @@ +using System.Reflection; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed class JetStreamClusterStreamsGroupATests +{ + [Fact] // T:1578 + public void MonitorCluster_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("MonitorCluster", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1579 + public void CheckClusterSize_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("CheckClusterSize", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1580 + public void ClusterStreamConfig_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ClusterStreamConfig", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1581 + public void MetaSnapshot_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("MetaSnapshot", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1582 + public void ApplyMetaSnapshot_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ApplyMetaSnapshot", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1583 + public void DecodeMetaSnapshot_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("DecodeMetaSnapshot", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1584 + public void EncodeMetaSnapshot_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("EncodeMetaSnapshot", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1585 + public void CollectStreamAndConsumerChanges_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("CollectStreamAndConsumerChanges", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1586 + public void SetStreamAssignmentRecovering_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("SetStreamAssignmentRecovering", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1587 + public void SetConsumerAssignmentRecovering_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("SetConsumerAssignmentRecovering", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1588 + public void CopyGroup_StreamAssignment_ShouldCloneGroupPeers() + { + var sa = new StreamAssignment + { + Group = new RaftGroup { Name = "RG", Peers = ["A", "B"] }, + Config = new StreamConfig { Name = "S", Replicas = 3 }, + }; + + var method = typeof(StreamAssignment).GetMethod("CopyGroup", BindingFlags.Instance | BindingFlags.NonPublic); + method.ShouldNotBeNull(); + var copy = method!.Invoke(sa, []) as StreamAssignment; + copy.ShouldNotBeNull(); + copy!.Group.ShouldNotBeNull(); + ReferenceEquals(copy.Group, sa.Group).ShouldBeFalse(); + copy.Group.Peers.SequenceEqual(sa.Group!.Peers).ShouldBeTrue(); + } + + [Fact] // T:1589 + public void CopyGroup_ConsumerAssignment_ShouldCloneGroupPeers() + { + var ca = new ConsumerAssignment + { + Name = "C", + Stream = "S", + Group = new RaftGroup { Name = "RG", Peers = ["A", "B"] }, + }; + + var method = typeof(ConsumerAssignment).GetMethod("CopyGroup", BindingFlags.Instance | BindingFlags.NonPublic); + method.ShouldNotBeNull(); + var copy = method!.Invoke(ca, []) as ConsumerAssignment; + copy.ShouldNotBeNull(); + copy!.Group.ShouldNotBeNull(); + ReferenceEquals(copy.Group, ca.Group).ShouldBeFalse(); + copy.Group.Peers.SequenceEqual(ca.Group!.Peers).ShouldBeTrue(); + } + + [Fact] // T:1590 + public void MissingPeers_StreamAssignment_ShouldReflectReplicaGap() + { + var sa = new StreamAssignment + { + Config = new StreamConfig { Name = "S", Replicas = 3 }, + Group = new RaftGroup { Peers = ["A", "B"] }, + }; + var method = typeof(StreamAssignment).GetMethod("MissingPeers", BindingFlags.Instance | BindingFlags.NonPublic); + method.ShouldNotBeNull(); + ((bool)method!.Invoke(sa, [])!).ShouldBeTrue(); + } + + [Fact] // T:1591 + public void ProcessAddPeer_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessAddPeer", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1592 + public void ProcessRemovePeer_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("ProcessRemovePeer", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1593 + public void RemovePeerFromStream_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("RemovePeerFromStream", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1594 + public void RemovePeerFromStreamLocked_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("RemovePeerFromStreamLocked", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1595 + public void HasPeerEntries_Method_ShouldExist() + { + typeof(JetStreamEngine).GetMethod("HasPeerEntries", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + } + + [Fact] // T:1596 + public void RecoveryKey_StreamAssignment_ShouldUseAccountAndName() + { + var sa = new StreamAssignment + { + Client = new ClientInfo { Account = "A", ServiceName = "SA" }, + Config = new StreamConfig { Name = "ORDERS" }, + }; + var method = typeof(StreamAssignment).GetMethod("RecoveryKey", BindingFlags.Instance | BindingFlags.NonPublic); + method.ShouldNotBeNull(); + method!.Invoke(sa, [])!.ToString().ShouldBe("SA:ORDERS"); + } + + [Fact] // T:1597 + public void StreamRecoveryKey_ConsumerAssignment_ShouldUseAccountAndStream() + { + var ca = new ConsumerAssignment + { + Client = new ClientInfo { Account = "A", ServiceName = "SA" }, + Stream = "ORDERS", + Name = "C1", + }; + var method = typeof(ConsumerAssignment).GetMethod("StreamRecoveryKey", BindingFlags.Instance | BindingFlags.NonPublic); + method.ShouldNotBeNull(); + method!.Invoke(ca, [])!.ToString().ShouldBe("SA:ORDERS"); + } +} diff --git a/porting.db b/porting.db index 20bfae6d0a49a237595334b97e0af7a8512c2e06..ea4ceaefba83ab20cb81819f920fd1d97b928846 100644 GIT binary patch delta 3424 zcmb7`YfMvT7{_}~Z*NPX1pyH$6cj}$w6t7GDN{D4Zr+&pJB7AjMNt74<6J~%#uwsr zi9A_Cj5wDN+%n8ux@>O4Wid;(WNKVCFHu3o+jM~7+!wb$2Xor)e2{+lCIA0<-t#}t z`#$HSZ8_E*b(4k7h zoFmux9}e$f@dT@qcPey>Y@PP7FmekD3PT({_Ka2=w{ zhdU@bD_o1{EO3pY%Y*ZZE*Ea4=*)VTK|Wu!IVd^lP|5sAGaEg=!a+6RJ&6B~beX<$&5HsA8xFK@~xH1yu;OR!{{{ zm4dQE%@vdl$^}(eqsoW02^XzUS%R`ar3xw!Dqc{zP~)we1D8JKtcGRR%&16>zN|YO<<`#97}P`R zSmkd@jY2Ee^E0>&vOW5IN;h#UX+a-*hmNO3r&4dZR!xRJrlrw7mfij@W6FYJr@e&C z{j8oYHe{yK`PG?9`u+iySRXMp{oK!tW9wpsb^Cm6rLi)uYz>(>&16cr#r%}fnlgiq zaT*nM-(oXEGOyoaAJB(aQeRY$-eK|d*=?2!}%S#NcGtRc8=oeltKr3dhn1rLaLTO*$>pe zTPy5WL#9W}9#VBbV!zYbO=`@ki^Y)VaoD|GkJ(5_^{2mBPRPA`Zn9t7d*!NtYLM;X z#@8K+OViWIp^!m4sDFrUrllQIj5PQi=3%4Vt^hPKrDy@lR!L30Eu8SNCHzpGMEZdfDxpEG++YhAOlPT(?KSf0cL_MFbiaZ z9AE~yAP-o8732dOu!90n2#P>47~4Y$aDr0c0%hP8FdNJP<=|ED8mIuTgSlWHm=E3n zZ-ND2Ay@{v5-K{Sm!;oP&4R(5er0j%Pz1|fqH}Y@$8vV6?{79A3 ziR9R2#=@|R1!2xk=7iW;E39D`EnzM%%;knTbC}BsbJ?xE9$%o>Q(MvIoGM!`({GG9 z6O$ahQ+HmM6ZO8fPwUbU8)l2tUFr-~hbmEdNU2pEkUy92;)nSy+yLj6U6Cz6T%MS? zkMHnlWJcb&B%$73>#J_w+30Ih(J7_RMn~#ADJ5I|O-=rWI@4BP^QLXyCY#k(lxwx; zTJud-yR*RVv=$=5D~%|k@_%abg$&1pGCa}@x^TdgM=i(TxdzsjNs-+qB zNHg5h3|lZ`VrNn%%^5ZMUVZv(p{73 zcgKvxG(_KvuF+9?&y;ThRudPan~cx3Ui!$g;3Ho?$|7WJmS#{V7ul;hmgAS^lm>fw e`6$OSHc2y_k!I9MGfE=OsFh|of*BbvALW0xCx1Ht delta 1566 zcmY+>X-ph-7{~FQ+3T4-hm}$U7M5};pd4~+Ib5ttwUtV3l~$2%7nVp<6H1~n@hE6g zd_kI4Qyw+M(AuV?ns`;GF(fRB^`ru=sag``EJ0c=S6fXq{yIyC?2FHf@Bf+OKmTbB zb?IBXyYz?RjVEPOUet$sjNYV|#;(5vNl$V{!i zMI~tU4Qh4kXO*8n5_41NW9ys_zp`1e4T*IE4;RH2DDQ7|R$1ETMnuqq0gPY*vobeA zZbO2j$L_b~SZ+$cQ^-7KI&S>c@SA>ATo$v1&Ft~Gc5vQbG{~=v(aW4)WOH-KXCY1) zr-IHty@Tt9aS58rxx@4KA=Qyw_&du^Eis2K}l)}~T zN)E}}QS8E-+FlL6Dyf&u{O>fK=JWHoU%hzhGI`X8xMrx8T^l7guWq?-hSJn0@xly+ zB0to^|IE_5@HA?cdilZy>*~GP;X&pfbe4}D4u71)iE~sK9_*f@e*Vl7qwd1VTdq)1 zc<|O0y2vekE;oCcB$V?SA-I(|FfymEcXBBiP8&o9#6)t}K>nqX$z((NeeEb9TwxTQ>DTA(U(kCN;W4wwyX$z3&^cd2oW(to4T G7yCaIYGAGa