From cf9f40ab0cab3fd64f59f285180b6a42203a00bd Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 1 Mar 2026 02:21:30 -0500 Subject: [PATCH] batch35: implement and verify feature group A --- .../JetStream/JetStreamClusterTypes.cs | 409 ++++++++++++++++++ .../JetStream/NatsStream.ClusterRemaining.cs | 144 ++++++ .../NatsServer.JetStreamClusterRemaining.cs | 145 +++++++ porting.db | Bin 6791168 -> 6795264 bytes 4 files changed, 698 insertions(+) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.ClusterRemaining.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterRemaining.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs index c5d149e..55f0303 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs @@ -13,8 +13,11 @@ // // Adapted from server/jetstream_cluster.go in the NATS server Go source. +using System.Buffers.Binary; +using System.Text; using System.Text.Json; using System.Text.Json.Serialization; +using IronSnappy; namespace ZB.MOM.NatsNet.Server; @@ -29,6 +32,10 @@ namespace ZB.MOM.NatsNet.Server; /// internal sealed class JetStreamCluster { + private static readonly Exception ErrBadStreamMsg = new("jetstream cluster bad replicated stream msg"); + private const int CompressThreshold = 8192; + private const ulong MsgFlagFromSourceOrMirror = 1UL; + /// The meta-controller Raft node. public IRaftNode? Meta { get; set; } @@ -638,6 +645,408 @@ internal sealed class JetStreamCluster var text = System.Text.Encoding.ASCII.GetString(headers); return text.Contains(NatsHeaderConstants.JsResponseType, StringComparison.OrdinalIgnoreCase); } + + internal static byte[] EncodeDeleteRange(DeleteRange deleteRange) + { + var payload = JsonSerializer.SerializeToUtf8Bytes(deleteRange); + var encoded = new byte[payload.Length + 1]; + encoded[0] = (byte)EntryOp.DeleteRangeOp; + Buffer.BlockCopy(payload, 0, encoded, 1, payload.Length); + return encoded; + } + + internal static (DeleteRange? DeleteRange, Exception? Error) DecodeDeleteRange(ReadOnlySpan payload) + { + try + { + var deleteRange = JsonSerializer.Deserialize(payload); + return deleteRange == null + ? (null, new InvalidOperationException("delete range payload did not deserialize")) + : (deleteRange, null); + } + catch (Exception ex) + { + return (null, ex); + } + } + + internal RaftGroup? CreateGroupForConsumer(ConsumerConfig config, StreamAssignment streamAssignment) + { + var group = streamAssignment.Group; + if (group == null || group.Peers.Length == 0) + return null; + + var replicas = config.Replicas > 0 + ? config.Replicas + : Math.Max(1, streamAssignment.Config?.Replicas ?? 1); + if (replicas > group.Peers.Length) + return null; + + var peers = group.Peers.ToArray(); + var active = new List(peers.Length); + if (Server is NatsServer server) + { + foreach (var peer in peers) + { + if (server.GetNodeInfo(peer) is { Offline: false }) + active.Add(peer); + } + } + else + { + active.AddRange(peers); + } + + var quorum = replicas / 2 + 1; + if (active.Count < quorum) + return null; + + if (replicas > 0 && replicas < peers.Length) + { + if (active.Count < replicas) + return null; + + ShuffleInPlace(active); + peers = active.Take(replicas).ToArray(); + } + + var storage = config.MemoryStorage ? StorageType.MemoryStorage : streamAssignment.Config?.Storage ?? group.Storage; + return new RaftGroup + { + Name = GroupNameForConsumer(peers, storage), + Storage = storage, + Peers = peers, + Cluster = group.Cluster, + }; + } + + internal static byte[] EncodeAddConsumerAssignment(ConsumerAssignment assignment) + { + var serializable = CloneForProposal(assignment); + var payload = JsonSerializer.SerializeToUtf8Bytes(serializable); + return PrependOp((byte)EntryOp.AssignConsumerOp, payload); + } + + internal static byte[] EncodeDeleteConsumerAssignment(ConsumerAssignment assignment) + { + var serializable = CloneForProposal(assignment); + var payload = JsonSerializer.SerializeToUtf8Bytes(serializable); + return PrependOp((byte)EntryOp.RemoveConsumerOp, payload); + } + + internal static (ConsumerAssignment? Assignment, Exception? Error) DecodeConsumerAssignment(ReadOnlySpan payload) + { + try + { + var assignment = JsonSerializer.Deserialize(payload); + if (assignment == null) + return (null, new InvalidOperationException("consumer assignment payload did not deserialize")); + + var decodeError = DecodeConsumerAssignmentConfig(assignment); + return decodeError != null ? (null, decodeError) : (assignment, null); + } + catch (Exception ex) + { + return (null, ex); + } + } + + internal static Exception? DecodeConsumerAssignmentConfig(ConsumerAssignment assignment) + { + if (assignment.ConfigJson.ValueKind is JsonValueKind.Undefined or JsonValueKind.Null) + return new InvalidOperationException("consumer assignment config payload is missing"); + + Exception? strictError = null; + ConsumerConfig? config; + try + { + var strictOptions = new JsonSerializerOptions { UnmappedMemberHandling = JsonUnmappedMemberHandling.Disallow }; + config = assignment.ConfigJson.Deserialize(strictOptions); + } + catch (Exception ex) + { + strictError = ex; + config = null; + } + + if (config == null) + { + try + { + config = assignment.ConfigJson.Deserialize(); + } + catch (Exception ex) + { + return ex; + } + } + + assignment.Config = config ?? new ConsumerConfig(); + if (strictError != null || !JetStreamVersioning.SupportsRequiredApiLevel(assignment.Config.Metadata)) + assignment.Unsupported = NewUnsupportedConsumerAssignment(assignment, strictError); + + return null; + } + + internal static byte[] EncodeAddConsumerAssignmentCompressed(ConsumerAssignment assignment) + { + var serializable = CloneForProposal(assignment); + var payload = JsonSerializer.SerializeToUtf8Bytes(serializable); + var compressed = Snappy.Encode(payload); + return PrependOp((byte)EntryOp.AssignCompressedConsumerOp, compressed); + } + + internal static (ConsumerAssignment? Assignment, Exception? Error) DecodeConsumerAssignmentCompressed(ReadOnlySpan payload) + { + try + { + var decoded = Snappy.Decode(payload); + return DecodeConsumerAssignment(decoded); + } + catch (Exception ex) + { + return (null, ex); + } + } + + internal static (string Subject, string Reply, byte[]? Header, byte[]? Message, ulong Sequence, long Timestamp, bool Sourced, Exception? Error) + DecodeStreamMsg(ReadOnlySpan payload) + { + if (payload.Length < 26) + return (string.Empty, string.Empty, null, null, 0, 0, false, ErrBadStreamMsg); + + var index = 0; + var sequence = BinaryPrimitives.ReadUInt64LittleEndian(payload.Slice(index, 8)); + index += 8; + var timestamp = (long)BinaryPrimitives.ReadUInt64LittleEndian(payload.Slice(index, 8)); + index += 8; + + if (!TryReadUInt16(payload, ref index, out var subjectLength) || !TryReadString(payload, ref index, subjectLength, out var subject)) + return (string.Empty, string.Empty, null, null, 0, 0, false, ErrBadStreamMsg); + if (!TryReadUInt16(payload, ref index, out var replyLength) || !TryReadString(payload, ref index, replyLength, out var reply)) + return (string.Empty, string.Empty, null, null, 0, 0, false, ErrBadStreamMsg); + if (!TryReadUInt16(payload, ref index, out var headerLength) || !TryReadBytes(payload, ref index, headerLength, out var header)) + return (string.Empty, string.Empty, null, null, 0, 0, false, ErrBadStreamMsg); + if (!TryReadUInt32(payload, ref index, out var messageLength) || !TryReadBytes(payload, ref index, messageLength, out var message)) + return (string.Empty, string.Empty, null, null, 0, 0, false, ErrBadStreamMsg); + + var sourced = false; + if (index < payload.Length) + { + if (!TryReadUVarInt(payload.Slice(index), out var flags, out _)) + return (string.Empty, string.Empty, null, null, 0, 0, false, ErrBadStreamMsg); + sourced = (flags & MsgFlagFromSourceOrMirror) != 0; + } + + return (subject, reply, header.Length == 0 ? null : header, message.Length == 0 ? null : message, sequence, timestamp, sourced, null); + } + + internal static (string BatchId, ulong BatchSequence, EntryOp Operation, byte[]? Payload, Exception? Error) DecodeBatchMsg(ReadOnlySpan payload) + { + var index = 0; + if (!TryReadUInt16(payload, ref index, out var batchIdLength) || !TryReadString(payload, ref index, batchIdLength, out var batchId)) + return (string.Empty, 0, default, null, ErrBadStreamMsg); + if (!TryReadUVarInt(payload.Slice(index), out var sequence, out var consumed)) + return (string.Empty, 0, default, null, ErrBadStreamMsg); + + index += consumed; + if (index >= payload.Length) + return (string.Empty, 0, default, null, ErrBadStreamMsg); + + var operation = (EntryOp)payload[index]; + index++; + var data = payload.Slice(index).ToArray(); + return (batchId, sequence, operation, data, null); + } + + internal static byte[] EncodeStreamMsg(string subject, string reply, byte[]? header, byte[]? message, ulong sequence, long timestamp, bool sourced) + => EncodeStreamMsgAllowCompress(subject, reply, header, message, sequence, timestamp, sourced); + + internal static byte[] EncodeStreamMsgAllowCompress(string subject, string reply, byte[]? header, byte[]? message, ulong sequence, long timestamp, bool sourced) + => EncodeStreamMsgAllowCompressAndBatch(subject, reply, header, message, sequence, timestamp, sourced, string.Empty, 0, false); + + internal static byte[] EncodeStreamMsgAllowCompressAndBatch( + string subject, + string reply, + byte[]? header, + byte[]? message, + ulong sequence, + long timestamp, + bool sourced, + string batchId, + ulong batchSequence, + bool batchCommit) + { + subject ??= string.Empty; + reply ??= string.Empty; + header ??= []; + message ??= []; + batchId ??= string.Empty; + + var subjectLength = Math.Min(subject.Length, ushort.MaxValue); + var replyLength = Math.Min(reply.Length, ushort.MaxValue); + var headerLength = Math.Min(header.Length, ushort.MaxValue); + var messageLength = Math.Min(message.Length, int.MaxValue); + + var body = new List(1 + subjectLength + replyLength + headerLength + messageLength + 32); + var op = EntryOp.StreamMsgOp; + if (!string.IsNullOrEmpty(batchId)) + { + var batchIdLength = Math.Min(batchId.Length, ushort.MaxValue); + body.AddRange(BitConverter.GetBytes((ushort)batchIdLength)); + body.AddRange(Encoding.ASCII.GetBytes(batchId[..batchIdLength])); + AppendUVarInt(body, batchSequence); + op = batchCommit ? EntryOp.BatchCommitMsgOp : EntryOp.BatchMsgOp; + body.Add((byte)EntryOp.StreamMsgOp); + } + + body.AddRange(BitConverter.GetBytes(sequence)); + body.AddRange(BitConverter.GetBytes((ulong)timestamp)); + body.AddRange(BitConverter.GetBytes((ushort)subjectLength)); + body.AddRange(Encoding.ASCII.GetBytes(subject[..subjectLength])); + body.AddRange(BitConverter.GetBytes((ushort)replyLength)); + body.AddRange(Encoding.ASCII.GetBytes(reply[..replyLength])); + body.AddRange(BitConverter.GetBytes((ushort)headerLength)); + body.AddRange(header.AsSpan(0, headerLength).ToArray()); + body.AddRange(BitConverter.GetBytes((uint)messageLength)); + body.AddRange(message.AsSpan(0, messageLength).ToArray()); + + AppendUVarInt(body, sourced ? MsgFlagFromSourceOrMirror : 0); + var encoded = body.ToArray(); + + var shouldCompress = (subjectLength + replyLength + headerLength + messageLength) > CompressThreshold; + if (shouldCompress) + { + var opIndex = !string.IsNullOrEmpty(batchId) ? FindStreamOpOffset(encoded) : 0; + var compressed = Snappy.Encode(encoded.AsSpan(opIndex + 1)); + if (compressed.Length < encoded.Length - (opIndex + 1)) + { + var output = new byte[opIndex + 1 + compressed.Length]; + if (opIndex > 0) + Buffer.BlockCopy(encoded, 0, output, 0, opIndex); + output[opIndex] = (byte)EntryOp.CompressedStreamMsgOp; + Buffer.BlockCopy(compressed, 0, output, opIndex + 1, compressed.Length); + if (!string.IsNullOrEmpty(batchId)) + output[0] = (byte)op; + return output; + } + } + + if (!string.IsNullOrEmpty(batchId)) + { + encoded[0] = (byte)op; + return encoded; + } + + return PrependOp((byte)EntryOp.StreamMsgOp, encoded); + } + + private static byte[] PrependOp(byte op, byte[] payload) + { + var encoded = new byte[payload.Length + 1]; + encoded[0] = op; + Buffer.BlockCopy(payload, 0, encoded, 1, payload.Length); + return encoded; + } + + private static ConsumerAssignment CloneForProposal(ConsumerAssignment assignment) + { + var clone = assignment.CopyGroup(); + if (assignment.Config != null) + clone.ConfigJson = SerializeJsonElement(assignment.Config); + return clone; + } + + internal static JsonElement SerializeJsonElement(T value) + { + var bytes = JsonSerializer.SerializeToUtf8Bytes(value); + using var doc = JsonDocument.Parse(bytes); + return doc.RootElement.Clone(); + } + + private static bool TryReadUInt16(ReadOnlySpan payload, ref int index, out int value) + { + if (payload.Length - index < 2) + { + value = 0; + return false; + } + + value = BinaryPrimitives.ReadUInt16LittleEndian(payload.Slice(index, 2)); + index += 2; + return true; + } + + private static bool TryReadUInt32(ReadOnlySpan payload, ref int index, out int value) + { + if (payload.Length - index < 4) + { + value = 0; + return false; + } + + value = (int)BinaryPrimitives.ReadUInt32LittleEndian(payload.Slice(index, 4)); + index += 4; + return true; + } + + private static bool TryReadString(ReadOnlySpan payload, ref int index, int length, out string value) + { + if (payload.Length - index < length) + { + value = string.Empty; + return false; + } + + value = Encoding.ASCII.GetString(payload.Slice(index, length)); + index += length; + return true; + } + + private static bool TryReadBytes(ReadOnlySpan payload, ref int index, int length, out byte[] value) + { + if (payload.Length - index < length) + { + value = []; + return false; + } + + value = payload.Slice(index, length).ToArray(); + index += length; + return true; + } + + private static void AppendUVarInt(List buffer, ulong value) + { + while (value >= 0x80) + { + buffer.Add((byte)((value & 0x7F) | 0x80)); + value >>= 7; + } + + buffer.Add((byte)value); + } + + private static int FindStreamOpOffset(byte[] encoded) + { + var index = 1; + if (encoded.Length < 3) + return 0; + var batchIdLength = BinaryPrimitives.ReadUInt16LittleEndian(encoded.AsSpan(index, 2)); + index += 2 + batchIdLength; + if (!TryReadUVarInt(encoded.AsSpan(index), out _, out var consumed)) + return 0; + return index + consumed; + } + + private static void ShuffleInPlace(List items) + { + for (var i = items.Count - 1; i > 0; i--) + { + var j = Random.Shared.Next(i + 1); + (items[i], items[j]) = (items[j], items[i]); + } + } } // ============================================================================ diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.ClusterRemaining.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.ClusterRemaining.cs new file mode 100644 index 0000000..2138b0c --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.ClusterRemaining.cs @@ -0,0 +1,144 @@ +using System.Text.Json; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + internal bool SupportsBinarySnapshot() + { + _mu.EnterReadLock(); + try + { + return SupportsBinarySnapshotLocked(); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal bool SupportsBinarySnapshotLocked() + { + var raftNode = _node as IRaftNode; + var server = Account?.Server as NatsServer; + if (raftNode == null || server == null) + return false; + + var ourId = raftNode.ID(); + foreach (var peer in raftNode.Peers()) + { + if (string.Equals(peer.Id, ourId, StringComparison.Ordinal)) + continue; + + if (server.GetNodeInfo(peer.Id) is { BinarySnapshots: false }) + return false; + } + + return true; + } + + internal byte[]? StateSnapshot() + { + _mu.EnterReadLock(); + try + { + return StateSnapshotLocked(); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal byte[]? StateSnapshotLocked() + { + if (Store == null) + return null; + + if (SupportsBinarySnapshotLocked()) + { + var (encoded, error) = Store.EncodedStreamState(GetCLFS()); + return error == null ? encoded : null; + } + + var state = Store.State(); + var snapshot = new StreamSnapshot + { + Msgs = state.Msgs, + Bytes = state.Bytes, + FirstSeq = state.FirstSeq, + LastSeq = state.LastSeq, + Failed = GetCLFS(), + Deleted = state.Deleted, + }; + return JsonSerializer.SerializeToUtf8Bytes(snapshot); + } + + internal Exception? ProcessClusteredInboundMsg(string subject, string reply, byte[]? header, byte[]? message, object? msgTrace, bool sourced) + { + _ = msgTrace; + if (string.IsNullOrWhiteSpace(subject)) + return new ArgumentException("subject is required", nameof(subject)); + + _mu.EnterUpgradeableReadLock(); + try + { + var raftNode = _node as IRaftNode; + var canRespond = !Config.NoAck && !string.IsNullOrWhiteSpace(reply); + if (raftNode == null) + { + return ProcessJetStreamMsg(subject, reply, header, message, 0, 0, msgTrace, sourced, canRespond); + } + + if (!IsLeader()) + { + return new InvalidOperationException(JsApiErrors.NewJSClusterNotLeaderError().Description ?? "stream is not cluster leader"); + } + + if (Config.Sealed) + { + if (canRespond) + { + var response = new JSPubAckResponse + { + Stream = Name, + PubAckError = JsApiErrors.NewJSStreamSealedError(), + }; + _outq.SendMsg(reply, JsonSerializer.SerializeToUtf8Bytes(response)); + } + + return new InvalidOperationException(JsApiErrors.NewJSStreamSealedError().Description ?? "stream is sealed"); + } + + _mu.EnterWriteLock(); + try + { + if (_clseq == 0) + _clseq = (ulong)Math.Max(0, Interlocked.Read(ref LastSeq)) + _clfs; + _clseq++; + + var encoded = JetStreamCluster.EncodeStreamMsgAllowCompress( + subject, + reply, + header, + message, + _clseq, + DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L, + sourced); + + raftNode.Propose(encoded); + TrackReplicationTraffic(raftNode, encoded.Length, Math.Max(1, Config.Replicas)); + } + finally + { + _mu.ExitWriteLock(); + } + + return null; + } + finally + { + _mu.ExitUpgradeableReadLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterRemaining.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterRemaining.cs new file mode 100644 index 0000000..965fa1c --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterRemaining.cs @@ -0,0 +1,145 @@ +using System.Text.Json; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + internal void JsClusteredConsumerRequest( + ClientInfo clientInfo, + Account account, + string subject, + string reply, + byte[] requestMessage, + string stream, + ConsumerConfig config, + ConsumerAction action, + bool pedantic) + { + var (js, cluster) = GetJetStreamCluster(); + if (js == null || cluster == null) + return; + + var response = new JsApiConsumerCreateResponse + { + Type = JsApiSubjects.JsApiConsumerCreateResponseType, + }; + + var engine = new JetStreamEngine(js); + var (streamConfig, streamFound) = engine.ClusterStreamConfig(account.Name, stream); + if (!streamFound) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + return; + } + + var defaultsError = NatsConsumer.SetConsumerConfigDefaults(config, streamConfig, selectedLimits: null, pedantic); + if (defaultsError != null) + { + response.Error = defaultsError; + SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + return; + } + + var cfgError = NatsConsumer.CheckConsumerCfg(config, streamConfig, selectedLimits: null, isRecovering: false); + if (cfgError != null) + { + response.Error = cfgError; + SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + return; + } + + var streamAssignment = engine.StreamAssignmentOrInflight(account.Name, stream); + if (streamAssignment == null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + return; + } + + var explicitName = !string.IsNullOrWhiteSpace(config.Name) + ? config.Name + : config.Durable; + + var existing = !string.IsNullOrWhiteSpace(explicitName) + ? engine.ConsumerAssignmentOrInflight(account.Name, stream, explicitName) + : null; + + ConsumerAssignment assignment; + if (existing != null) + { + JetStreamVersioning.CopyConsumerMetadata(config, existing.Config); + if (action == ConsumerAction.Create) + { + var existingConfig = existing.Config ?? new ConsumerConfig(); + if (JsonSerializer.Serialize(existingConfig) != JsonSerializer.Serialize(config)) + { + response.Error = JsApiErrors.NewJSConsumerAlreadyExistsError(); + SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + return; + } + } + + assignment = existing.CopyGroup(); + assignment.Config = config; + assignment.ConfigJson = JetStreamCluster.SerializeJsonElement(config); + assignment.Subject = subject; + assignment.Reply = reply; + assignment.Client = clientInfo; + } + else + { + if (action == ConsumerAction.Update) + { + response.Error = JsApiErrors.NewJSConsumerDoesNotExistError(); + SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + return; + } + + JetStreamVersioning.SetStaticConsumerMetadata(config); + var group = cluster.CreateGroupForConsumer(config, streamAssignment); + if (group == null) + { + response.Error = JsApiErrors.NewJSInsufficientResourcesError(); + SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + return; + } + + group.SetPreferred(this); + group.Cluster = streamAssignment.Group?.Cluster; + + var name = explicitName; + if (!NatsConsumer.IsDurableConsumer(config)) + { + if (string.IsNullOrWhiteSpace(name)) + { + do + { + name = NatsConsumer.CreateConsumerName(); + } while (engine.ConsumerAssignmentOrInflight(account.Name, stream, name) != null); + } + } + + assignment = new ConsumerAssignment + { + Group = group, + Stream = stream, + Name = name, + Config = config, + ConfigJson = JetStreamCluster.SerializeJsonElement(config), + Subject = subject, + Reply = reply, + Client = clientInfo, + Created = DateTime.UtcNow, + }; + } + + if (cluster.Meta != null) + { + cluster.Meta.Propose(JetStreamCluster.EncodeAddConsumerAssignment(assignment)); + cluster.TrackInflightConsumerProposal(account.Name, stream, assignment, deleted: false); + } + + _ = requestMessage; + } +} diff --git a/porting.db b/porting.db index 01c3a0ee90689251fcff0a390c5479c62f193fa4..06cc962a0c973ca0d2d2e7c15491ae3d4f590290 100644 GIT binary patch delta 3157 zcma)+TTB#J9L8sMVfL`g?#?18!tQd>aw+AufKaHSR8Xr`yi{9lmD@sTBTz(R(H6nW zTI8w;|HQ_X<$hBjt!#~r(ZtZiG!^ZmNlQ%MOd13q8snuOLmact1N-oU@0)XeXU?3R z%!yl0^@(Yxx-Haqg5!i^?cv${#CSn=qRP1Bq$||`& zDiV4T^Q4PCNFPY48_6c6E~FSKxsa?<>O=~Z(q$yAl$;Y)DxU)Oi|!gO3CG<2Q>qGH zrMpJ;X~B_h=Y7>Ot$=}Ar`mDjp$g3Z_W`~@aNl^SInqnB-C7If$6It%(ZGVJqMn7( z%pWR2lb&fy&a~O+Vm%w7^AVP4I@rJxJ;w$c*mdG>vUqAe$yQ5ax&<$}eou288PBs| znrvW8Nq356lC4LG>adBL`PpV_ImMPsim;&2Pp8;U>b|MA&`d28=O-EI?r}rd>+!PF zY&)rP&G8-WOrXc7S%ilkdPbl;@EdSeaL*#4o_cQEUn%%9H?k?F!9W2emzV2Xk z^r&`W%KW_r`s*AHjEDY>nfVrzf%qY3jH_L$>% z)yjY$&;lLs2YUCbR)cCv!)my4-;JC*slZsklUQhsakmv3{}JkR0-s-EFSS=!+e?IT zv>dsOp{2@g6fH__SJ8syb_Go{e7gLmZCk2WE< zKC}V3^`c#rTMt^3+`7@ems=NFmE2rtMRM!Z31)7WTrXoVPi{_~&>@I9?x}KPY9IgH z?JJ57o^BP`V`!_W+AURsVr9HYQcQ?6y$B6v zrgXlSazae8d>8ExF|Bt@hKaxg0>Baw2!g;;unYu)ql#GY8E=*d3+XnOU;nj*bd@;4=W);dD79o9&iwi`6`a?Yxw zusQ2nU!Ki#*2aY|&+|EJl;;X-W~_G|kDppW7zhUuAQG$qE5SP;3Pgh#5DVS~ao{}= z4-!BkNCL^g22wyONCT@tI>-R;gG`VGJ^-u18juavf^{GVd;${OCtx?&13m?xfxTcK_#Auz_JaeU02G2EPz*|d-TnA!>GB`d z!CV(<+R9=X{~v#b4@A44#ecQ5(y8|2EGo;kpxl&FdtuGtYWpFkZB=|yN_0|VGg+(^a8C#5I!;m3EKdIm4|H!{e_fprS3(*c}SNYxX+rg&Y z5mVcuHEz)$w(~f=CCV*~d-(tvmCgXAJ?6b_Qre^5+oICGD%t6a9v*gXRNCaSlq68W z?PC4nFAi2PC|4Nqa`$-c^h$f!d)r@W4|#9vl=h(awpM8mcyIeD?SAiVrnLJcd-2~K nXRcA&WO5aI`Q?q}#tbutl7+b^ zbdEhbF#AP`lOZqz2?+*vjWdYI22N$;KNAwI#zbW*gY6F^@$v2LoU2Vf$@h28J?EZt zPft^*U)yx4UmHx&U*I^QzBj2*bKzWRp@+Br-8$e240trkfQP5TMvmjI?Wx=AH;f}) zRMT~&^J@AE>9m@@L^`IXFOd4wG=|ivrq7XD)N~DLpPH^B)u?F{sa#DXNITSY1<8lB z&Tsh4sEy^rjrvF*gP*a)JK)YL*oobqyi$Zl;D;;fS zXUPAl-bubpr-^D>m^;#Ox`lm6UfJQKsd^?+PbiC2sP)WfUo)lZ}T@VJVGgOD5_f#^~ElW}x)0@Cw35 zQPRb3%&n7r-OLl2xT~9OCI4#E?8;($SZbtWO%FOgFuBwZ7Cs?$VX5KSEPY9OAs_@9 z7$F8sUP$3ss^K~Q&H8)sOm&i^!=Oe4&PM z1td+$Zc3h!$LNnKc_MNjC7V`Hf4%86(C>HTjL4rL`>uS3YRhq3LlwBKQ`54EY6mSD zls_$Z%)6+!r{%@!K}*=e$2Lxr*{mscxOhzh-J6kLr|;v$M3TN{W}$d54yq