diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs
index c5d149e..55f0303 100644
--- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs
@@ -13,8 +13,11 @@
 //
 // Adapted from server/jetstream_cluster.go in the NATS server Go source.
 
+using System.Buffers.Binary;
+using System.Text;
 using System.Text.Json;
 using System.Text.Json.Serialization;
+using IronSnappy;
 
 namespace ZB.MOM.NatsNet.Server;
 
@@ -29,6 +32,21 @@ namespace ZB.MOM.NatsNet.Server;
 /// </summary>
 internal sealed class JetStreamCluster
 {
+    /// <summary>Shared sentinel returned by the stream/batch decoders when a replicated entry is malformed.</summary>
+    private static readonly Exception ErrBadStreamMsg = new("jetstream cluster bad replicated stream msg");
+
+    /// <summary>Combined subject/reply/header/message size above which a stream entry is Snappy-compressed.</summary>
+    private const int CompressThreshold = 8192;
+
+    /// <summary>Bit of the trailing flags varint that marks a stream message as sourced.</summary>
+    private const ulong MsgFlagFromSourceOrMirror = 1UL;
+
+    /// <summary>Options that reject unknown JSON members; cached because serializer options are costly to build per call.</summary>
+    private static readonly JsonSerializerOptions StrictConsumerConfigOptions = new()
+    {
+        UnmappedMemberHandling = JsonUnmappedMemberHandling.Disallow,
+    };
+
     /// <summary>The meta-controller Raft node.</summary>
     public IRaftNode? Meta { get; set; }
 
@@ -638,6 +656,448 @@ internal sealed class JetStreamCluster
         var text = System.Text.Encoding.ASCII.GetString(headers);
         return text.Contains(NatsHeaderConstants.JsResponseType, StringComparison.OrdinalIgnoreCase);
     }
+
+    /// <summary>Encodes a delete-range entry: the <see cref="EntryOp.DeleteRangeOp"/> byte followed by the JSON body.</summary>
+    internal static byte[] EncodeDeleteRange(DeleteRange deleteRange)
+        => PrependOp((byte)EntryOp.DeleteRangeOp, JsonSerializer.SerializeToUtf8Bytes(deleteRange));
+
+    /// <summary>Decodes a delete-range JSON body (without the leading op byte); failures are returned, not thrown.</summary>
+    internal static (DeleteRange? DeleteRange, Exception? Error) DecodeDeleteRange(ReadOnlySpan<byte> payload)
+    {
+        try
+        {
+            var deleteRange = JsonSerializer.Deserialize<DeleteRange>(payload);
+            return deleteRange == null
+                ? (null, new InvalidOperationException("delete range payload did not deserialize"))
+                : (deleteRange, null);
+        }
+        catch (Exception ex)
+        {
+            return (null, ex);
+        }
+    }
+
+    /// <summary>
+    /// Builds the Raft group for a new consumer out of the peers of its stream's group.
+    /// Returns null when the stream has no peers, more replicas are requested than peers exist,
+    /// or too few peers are online to form a quorum.
+    /// </summary>
+    internal RaftGroup? CreateGroupForConsumer(ConsumerConfig config, StreamAssignment streamAssignment)
+    {
+        var group = streamAssignment.Group;
+        if (group == null || group.Peers.Length == 0)
+            return null;
+
+        // Fall back to the stream's replica count; the result is always at least one.
+        var replicas = config.Replicas > 0
+            ? config.Replicas
+            : Math.Max(1, streamAssignment.Config?.Replicas ?? 1);
+        if (replicas > group.Peers.Length)
+            return null;
+
+        var peers = group.Peers.ToArray();
+        var active = new List<string>(peers.Length);
+        if (Server is NatsServer server)
+        {
+            foreach (var peer in peers)
+            {
+                if (server.GetNodeInfo(peer) is { Offline: false })
+                    active.Add(peer);
+            }
+        }
+        else
+        {
+            // No server to consult for node state, so every peer is treated as active.
+            active.AddRange(peers);
+        }
+
+        var quorum = replicas / 2 + 1;
+        if (active.Count < quorum)
+            return null;
+
+        // Fewer replicas than stream peers: use a random subset of the active peers.
+        if (replicas < peers.Length)
+        {
+            if (active.Count < replicas)
+                return null;
+
+            ShuffleInPlace(active);
+            peers = active.Take(replicas).ToArray();
+        }
+
+        var storage = config.MemoryStorage ? StorageType.MemoryStorage : streamAssignment.Config?.Storage ?? group.Storage;
+        return new RaftGroup
+        {
+            Name = GroupNameForConsumer(peers, storage),
+            Storage = storage,
+            Peers = peers,
+            Cluster = group.Cluster,
+        };
+    }
+
+    /// <summary>Encodes a consumer assignment as an <see cref="EntryOp.AssignConsumerOp"/> entry with a JSON body.</summary>
+    internal static byte[] EncodeAddConsumerAssignment(ConsumerAssignment assignment)
+    {
+        var serializable = CloneForProposal(assignment);
+        var payload = JsonSerializer.SerializeToUtf8Bytes(serializable);
+        return PrependOp((byte)EntryOp.AssignConsumerOp, payload);
+    }
+
+    /// <summary>Encodes a consumer assignment as an <see cref="EntryOp.RemoveConsumerOp"/> entry with a JSON body.</summary>
+    internal static byte[] EncodeDeleteConsumerAssignment(ConsumerAssignment assignment)
+    {
+        var serializable = CloneForProposal(assignment);
+        var payload = JsonSerializer.SerializeToUtf8Bytes(serializable);
+        return PrependOp((byte)EntryOp.RemoveConsumerOp, payload);
+    }
+
+    /// <summary>Decodes a consumer assignment JSON body and its embedded config; failures are returned, not thrown.</summary>
+    internal static (ConsumerAssignment? Assignment, Exception? Error) DecodeConsumerAssignment(ReadOnlySpan<byte> payload)
+    {
+        try
+        {
+            var assignment = JsonSerializer.Deserialize<ConsumerAssignment>(payload);
+            if (assignment == null)
+                return (null, new InvalidOperationException("consumer assignment payload did not deserialize"));
+
+            var decodeError = DecodeConsumerAssignmentConfig(assignment);
+            return decodeError != null ? (null, decodeError) : (assignment, null);
+        }
+        catch (Exception ex)
+        {
+            return (null, ex);
+        }
+    }
+
+    /// <summary>
+    /// Materializes <c>assignment.Config</c> from <c>assignment.ConfigJson</c>. A strict parse that rejects
+    /// unknown members is tried first; if it fails, a lenient parse is used and the assignment is marked unsupported.
+    /// Returns an error only when the config is missing or even the lenient parse fails.
+    /// </summary>
+    internal static Exception? DecodeConsumerAssignmentConfig(ConsumerAssignment assignment)
+    {
+        if (assignment.ConfigJson.ValueKind is JsonValueKind.Undefined or JsonValueKind.Null)
+            return new InvalidOperationException("consumer assignment config payload is missing");
+
+        Exception? strictError = null;
+        ConsumerConfig? config;
+        try
+        {
+            config = assignment.ConfigJson.Deserialize<ConsumerConfig>(StrictConsumerConfigOptions);
+        }
+        catch (Exception ex)
+        {
+            strictError = ex;
+            config = null;
+        }
+
+        if (config == null)
+        {
+            try
+            {
+                config = assignment.ConfigJson.Deserialize<ConsumerConfig>();
+            }
+            catch (Exception ex)
+            {
+                return ex;
+            }
+        }
+
+        assignment.Config = config ?? new ConsumerConfig();
+        if (strictError != null || !JetStreamVersioning.SupportsRequiredApiLevel(assignment.Config.Metadata))
+            assignment.Unsupported = NewUnsupportedConsumerAssignment(assignment, strictError);
+
+        return null;
+    }
+
+    /// <summary>Like <see cref="EncodeAddConsumerAssignment"/>, but Snappy-compresses the JSON body.</summary>
+    internal static byte[] EncodeAddConsumerAssignmentCompressed(ConsumerAssignment assignment)
+    {
+        var serializable = CloneForProposal(assignment);
+        var payload = JsonSerializer.SerializeToUtf8Bytes(serializable);
+        var compressed = Snappy.Encode(payload);
+        return PrependOp((byte)EntryOp.AssignCompressedConsumerOp, compressed);
+    }
+
+    /// <summary>Snappy-decompresses <paramref name="payload"/> and decodes it as a consumer assignment.</summary>
+    internal static (ConsumerAssignment? Assignment, Exception? Error) DecodeConsumerAssignmentCompressed(ReadOnlySpan<byte> payload)
+    {
+        try
+        {
+            var decoded = Snappy.Decode(payload);
+            return DecodeConsumerAssignment(decoded);
+        }
+        catch (Exception ex)
+        {
+            return (null, ex);
+        }
+    }
+
+    /// <summary>
+    /// Decodes an uncompressed stream message body (the bytes after the op byte). Layout, little-endian:
+    /// sequence (8), timestamp (8), subject length (2) + subject, reply length (2) + reply,
+    /// header length (2) + header, message length (4) + message, then an optional flags varint.
+    /// Empty header/message come back as null. Malformed input yields <see cref="ErrBadStreamMsg"/>.
+    /// </summary>
+    internal static (string Subject, string Reply, byte[]? Header, byte[]? Message, ulong Sequence, long Timestamp, bool Sourced, Exception? Error)
+        DecodeStreamMsg(ReadOnlySpan<byte> payload)
+    {
+        // 8 + 8 + 2 + 2 + 2 + 4: the smallest body has every variable-length field empty.
+        if (payload.Length < 26)
+            return (string.Empty, string.Empty, null, null, 0, 0, false, ErrBadStreamMsg);
+
+        var index = 0;
+        var sequence = BinaryPrimitives.ReadUInt64LittleEndian(payload.Slice(index, 8));
+        index += 8;
+        var timestamp = (long)BinaryPrimitives.ReadUInt64LittleEndian(payload.Slice(index, 8));
+        index += 8;
+
+        if (!TryReadUInt16(payload, ref index, out var subjectLength) || !TryReadString(payload, ref index, subjectLength, out var subject))
+            return (string.Empty, string.Empty, null, null, 0, 0, false, ErrBadStreamMsg);
+        if (!TryReadUInt16(payload, ref index, out var replyLength) || !TryReadString(payload, ref index, replyLength, out var reply))
+            return (string.Empty, string.Empty, null, null, 0, 0, false, ErrBadStreamMsg);
+        if (!TryReadUInt16(payload, ref index, out var headerLength) || !TryReadBytes(payload, ref index, headerLength, out var header))
+            return (string.Empty, string.Empty, null, null, 0, 0, false, ErrBadStreamMsg);
+        if (!TryReadUInt32(payload, ref index, out var messageLength) || !TryReadBytes(payload, ref index, messageLength, out var message))
+            return (string.Empty, string.Empty, null, null, 0, 0, false, ErrBadStreamMsg);
+
+        var sourced = false;
+        if (index < payload.Length)
+        {
+            if (!TryReadUVarInt(payload.Slice(index), out var flags, out _))
+                return (string.Empty, string.Empty, null, null, 0, 0, false, ErrBadStreamMsg);
+            sourced = (flags & MsgFlagFromSourceOrMirror) != 0;
+        }
+
+        return (subject, reply, header.Length == 0 ? null : header, message.Length == 0 ? null : message, sequence, timestamp, sourced, null);
+    }
+
+    /// <summary>
+    /// Decodes a batch entry body (the bytes after the batch op byte): batch id length (2) + batch id,
+    /// batch sequence varint, then the inner op byte; the remaining bytes are returned as the inner payload.
+    /// </summary>
+    internal static (string BatchId, ulong BatchSequence, EntryOp Operation, byte[]? Payload, Exception? Error) DecodeBatchMsg(ReadOnlySpan<byte> payload)
+    {
+        var index = 0;
+        if (!TryReadUInt16(payload, ref index, out var batchIdLength) || !TryReadString(payload, ref index, batchIdLength, out var batchId))
+            return (string.Empty, 0, default, null, ErrBadStreamMsg);
+        if (!TryReadUVarInt(payload.Slice(index), out var sequence, out var consumed))
+            return (string.Empty, 0, default, null, ErrBadStreamMsg);
+
+        index += consumed;
+        if (index >= payload.Length)
+            return (string.Empty, 0, default, null, ErrBadStreamMsg);
+
+        var operation = (EntryOp)payload[index];
+        index++;
+        var data = payload.Slice(index).ToArray();
+        return (batchId, sequence, operation, data, null);
+    }
+
+    /// <summary>Encodes a stream message entry; equivalent to <see cref="EncodeStreamMsgAllowCompress"/>.</summary>
+    internal static byte[] EncodeStreamMsg(string subject, string reply, byte[]? header, byte[]? message, ulong sequence, long timestamp, bool sourced)
+        => EncodeStreamMsgAllowCompress(subject, reply, header, message, sequence, timestamp, sourced);
+
+    /// <summary>Encodes a non-batched stream message entry, compressing it when it is large enough.</summary>
+    internal static byte[] EncodeStreamMsgAllowCompress(string subject, string reply, byte[]? header, byte[]? message, ulong sequence, long timestamp, bool sourced)
+        => EncodeStreamMsgAllowCompressAndBatch(subject, reply, header, message, sequence, timestamp, sourced, string.Empty, 0, false);
+
+    /// <summary>
+    /// Encodes a replicated stream message. The entry starts with an op byte; batched messages
+    /// (non-empty <paramref name="batchId"/>) then carry the batch id length (2) + id, the batch sequence
+    /// varint and an inner op byte. The body that follows is the layout read by <see cref="DecodeStreamMsg"/>.
+    /// When subject + reply + header + message exceed <see cref="CompressThreshold"/> and Snappy makes the
+    /// body smaller, the body is stored compressed under <see cref="EntryOp.CompressedStreamMsgOp"/>.
+    /// Subject, reply, header and batch id are clipped to <see cref="ushort.MaxValue"/> elements.
+    /// </summary>
+    internal static byte[] EncodeStreamMsgAllowCompressAndBatch(
+        string subject,
+        string reply,
+        byte[]? header,
+        byte[]? message,
+        ulong sequence,
+        long timestamp,
+        bool sourced,
+        string batchId,
+        ulong batchSequence,
+        bool batchCommit)
+    {
+        subject ??= string.Empty;
+        reply ??= string.Empty;
+        header ??= [];
+        message ??= [];
+        batchId ??= string.Empty;
+
+        var subjectLength = Math.Min(subject.Length, ushort.MaxValue);
+        var replyLength = Math.Min(reply.Length, ushort.MaxValue);
+        var headerLength = Math.Min(header.Length, ushort.MaxValue);
+        var messageLength = message.Length;
+        var isBatch = batchId.Length > 0;
+
+        var body = new List<byte>(1 + subjectLength + replyLength + headerLength + messageLength + 32);
+
+        // The entry always starts with its op byte. Writing it first keeps every later offset stable;
+        // patching byte 0 afterwards would overwrite batch or sequence data.
+        var leadingOp = !isBatch
+            ? EntryOp.StreamMsgOp
+            : batchCommit ? EntryOp.BatchCommitMsgOp : EntryOp.BatchMsgOp;
+        body.Add((byte)leadingOp);
+
+        // Index of the op byte that immediately precedes the stream message body.
+        var opIndex = 0;
+        if (isBatch)
+        {
+            var batchIdLength = Math.Min(batchId.Length, ushort.MaxValue);
+            AppendLittleEndian(body, (ulong)batchIdLength, sizeof(ushort));
+            body.AddRange(Encoding.ASCII.GetBytes(batchId[..batchIdLength]));
+            AppendUVarInt(body, batchSequence);
+            opIndex = body.Count;
+            body.Add((byte)EntryOp.StreamMsgOp);
+        }
+
+        AppendLittleEndian(body, sequence, sizeof(ulong));
+        AppendLittleEndian(body, (ulong)timestamp, sizeof(ulong));
+        AppendLittleEndian(body, (ulong)subjectLength, sizeof(ushort));
+        body.AddRange(Encoding.ASCII.GetBytes(subject[..subjectLength]));
+        AppendLittleEndian(body, (ulong)replyLength, sizeof(ushort));
+        body.AddRange(Encoding.ASCII.GetBytes(reply[..replyLength]));
+        AppendLittleEndian(body, (ulong)headerLength, sizeof(ushort));
+        body.AddRange(new ArraySegment<byte>(header, 0, headerLength));
+        AppendLittleEndian(body, (ulong)messageLength, sizeof(uint));
+        body.AddRange(message);
+        AppendUVarInt(body, sourced ? MsgFlagFromSourceOrMirror : 0);
+
+        var encoded = body.ToArray();
+        var bodyStart = opIndex + 1;
+        if ((long)subjectLength + replyLength + headerLength + messageLength > CompressThreshold)
+        {
+            var compressed = Snappy.Encode(encoded.AsSpan(bodyStart));
+
+            // Only keep the compressed form when it is actually smaller.
+            if (compressed.Length < encoded.Length - bodyStart)
+            {
+                var output = new byte[bodyStart + compressed.Length];
+                Buffer.BlockCopy(encoded, 0, output, 0, opIndex);
+                output[opIndex] = (byte)EntryOp.CompressedStreamMsgOp;
+                Buffer.BlockCopy(compressed, 0, output, bodyStart, compressed.Length);
+                return output;
+            }
+        }
+
+        return encoded;
+    }
+
+    /// <summary>Returns a new array holding <paramref name="op"/> followed by <paramref name="payload"/>.</summary>
+    private static byte[] PrependOp(byte op, byte[] payload)
+    {
+        var encoded = new byte[payload.Length + 1];
+        encoded[0] = op;
+        Buffer.BlockCopy(payload, 0, encoded, 1, payload.Length);
+        return encoded;
+    }
+
+    /// <summary>Copies the assignment via <c>CopyGroup</c> and refreshes its <c>ConfigJson</c> from <c>Config</c> when one is set.</summary>
+    private static ConsumerAssignment CloneForProposal(ConsumerAssignment assignment)
+    {
+        var clone = assignment.CopyGroup();
+        if (assignment.Config != null)
+            clone.ConfigJson = SerializeJsonElement(assignment.Config);
+        return clone;
+    }
+
+    /// <summary>Serializes <paramref name="value"/> straight into a standalone <see cref="JsonElement"/>.</summary>
+    internal static JsonElement SerializeJsonElement<T>(T value)
+        => JsonSerializer.SerializeToElement(value);
+
+    /// <summary>Reads a little-endian UInt16 at <paramref name="index"/> and advances past it; false if fewer than 2 bytes remain.</summary>
+    private static bool TryReadUInt16(ReadOnlySpan<byte> payload, ref int index, out int value)
+    {
+        if (payload.Length - index < 2)
+        {
+            value = 0;
+            return false;
+        }
+
+        value = BinaryPrimitives.ReadUInt16LittleEndian(payload.Slice(index, 2));
+        index += 2;
+        return true;
+    }
+
+    /// <summary>
+    /// Reads a little-endian UInt32 length at <paramref name="index"/> and advances past it. Fails when fewer
+    /// than 4 bytes remain or the value exceeds <see cref="int.MaxValue"/>, which could not be a valid length.
+    /// </summary>
+    private static bool TryReadUInt32(ReadOnlySpan<byte> payload, ref int index, out int value)
+    {
+        value = 0;
+        if (payload.Length - index < 4)
+            return false;
+
+        var raw = BinaryPrimitives.ReadUInt32LittleEndian(payload.Slice(index, 4));
+        if (raw > int.MaxValue)
+            return false;
+
+        value = (int)raw;
+        index += 4;
+        return true;
+    }
+
+    /// <summary>Reads <paramref name="length"/> bytes as an ASCII string and advances; false if the length is negative or exceeds what remains.</summary>
+    private static bool TryReadString(ReadOnlySpan<byte> payload, ref int index, int length, out string value)
+    {
+        if (length < 0 || payload.Length - index < length)
+        {
+            value = string.Empty;
+            return false;
+        }
+
+        value = Encoding.ASCII.GetString(payload.Slice(index, length));
+        index += length;
+        return true;
+    }
+
+    /// <summary>Copies <paramref name="length"/> bytes into a new array and advances; false if the length is negative or exceeds what remains.</summary>
+    private static bool TryReadBytes(ReadOnlySpan<byte> payload, ref int index, int length, out byte[] value)
+    {
+        if (length < 0 || payload.Length - index < length)
+        {
+            value = [];
+            return false;
+        }
+
+        value = payload.Slice(index, length).ToArray();
+        index += length;
+        return true;
+    }
+
+    /// <summary>Appends the low <paramref name="byteCount"/> bytes of <paramref name="value"/> in little-endian order, independent of host endianness.</summary>
+    private static void AppendLittleEndian(List<byte> buffer, ulong value, int byteCount)
+    {
+        for (var i = 0; i < byteCount; i++)
+            buffer.Add((byte)(value >> (8 * i)));
+    }
+
+    /// <summary>Appends <paramref name="value"/> as an unsigned varint: 7 bits per byte, low bits first, high bit set while more bytes follow.</summary>
+    private static void AppendUVarInt(List<byte> buffer, ulong value)
+    {
+        while (value >= 0x80)
+        {
+            buffer.Add((byte)((value & 0x7F) | 0x80));
+            value >>= 7;
+        }
+
+        buffer.Add((byte)value);
+    }
+
+    /// <summary>Shuffles <paramref name="items"/> in place (Fisher-Yates) using <see cref="Random.Shared"/>.</summary>
+    private static void ShuffleInPlace<T>(List<T> items)
+    {
+        for (var i = items.Count - 1; i > 0; i--)
+        {
+            var j = Random.Shared.Next(i + 1);
+            (items[i], items[j]) = (items[j], items[i]);
+        }
+    }
 }
 
 // ============================================================================
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.ClusterRemaining.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.ClusterRemaining.cs
new file mode 100644
index 0000000..2138b0c
--- /dev/null
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.ClusterRemaining.cs
@@ -0,0 +1,160 @@
+using System.Text.Json;
+
+namespace ZB.MOM.NatsNet.Server;
+
+internal sealed partial class NatsStream
+{
+    /// <summary>Takes the stream read lock and evaluates <see cref="SupportsBinarySnapshotLocked"/>.</summary>
+    internal bool SupportsBinarySnapshot()
+    {
+        _mu.EnterReadLock();
+        try
+        {
+            return SupportsBinarySnapshotLocked();
+        }
+        finally
+        {
+            _mu.ExitReadLock();
+        }
+    }
+
+    /// <summary>
+    /// True when the stream has both a Raft node and a server and no other peer reports
+    /// <c>BinarySnapshots: false</c>. Peers without node info do not block. Does no locking itself.
+    /// </summary>
+    internal bool SupportsBinarySnapshotLocked()
+    {
+        var raftNode = _node as IRaftNode;
+        var server = Account?.Server as NatsServer;
+        if (raftNode == null || server == null)
+            return false;
+
+        var ourId = raftNode.ID();
+        foreach (var peer in raftNode.Peers())
+        {
+            if (string.Equals(peer.Id, ourId, StringComparison.Ordinal))
+                continue;
+
+            if (server.GetNodeInfo(peer.Id) is { BinarySnapshots: false })
+                return false;
+        }
+
+        return true;
+    }
+
+    /// <summary>Takes the stream read lock and returns <see cref="StateSnapshotLocked"/>.</summary>
+    internal byte[]? StateSnapshot()
+    {
+        _mu.EnterReadLock();
+        try
+        {
+            return StateSnapshotLocked();
+        }
+        finally
+        {
+            _mu.ExitReadLock();
+        }
+    }
+
+    /// <summary>
+    /// Snapshots the stream state: the store's encoded binary state when every peer supports it,
+    /// otherwise a JSON <c>StreamSnapshot</c>. Returns null without a store or if binary encoding fails.
+    /// </summary>
+    internal byte[]? StateSnapshotLocked()
+    {
+        if (Store == null)
+            return null;
+
+        if (SupportsBinarySnapshotLocked())
+        {
+            var (encoded, error) = Store.EncodedStreamState(GetCLFS());
+            return error == null ? encoded : null;
+        }
+
+        var state = Store.State();
+        var snapshot = new StreamSnapshot
+        {
+            Msgs = state.Msgs,
+            Bytes = state.Bytes,
+            FirstSeq = state.FirstSeq,
+            LastSeq = state.LastSeq,
+            Failed = GetCLFS(),
+            Deleted = state.Deleted,
+        };
+        return JsonSerializer.SerializeToUtf8Bytes(snapshot);
+    }
+
+    /// <summary>
+    /// Handles an inbound message for a possibly clustered stream. Without a Raft node the message is
+    /// processed directly; otherwise this node must be leader and the stream unsealed, and the message
+    /// is encoded and proposed to the Raft group. Returns null on success or the reason for rejection.
+    /// </summary>
+    internal Exception? ProcessClusteredInboundMsg(string subject, string reply, byte[]? header, byte[]? message, object? msgTrace, bool sourced)
+    {
+        if (string.IsNullOrWhiteSpace(subject))
+            return new ArgumentException("subject is required", nameof(subject));
+
+        _mu.EnterUpgradeableReadLock();
+        try
+        {
+            var raftNode = _node as IRaftNode;
+            var canRespond = !Config.NoAck && !string.IsNullOrWhiteSpace(reply);
+            if (raftNode == null)
+            {
+                return ProcessJetStreamMsg(subject, reply, header, message, 0, 0, msgTrace, sourced, canRespond);
+            }
+
+            if (!IsLeader())
+            {
+                return new InvalidOperationException(JsApiErrors.NewJSClusterNotLeaderError().Description ?? "stream is not cluster leader");
+            }
+
+            if (Config.Sealed)
+            {
+                if (canRespond)
+                {
+                    var response = new JSPubAckResponse
+                    {
+                        Stream = Name,
+                        PubAckError = JsApiErrors.NewJSStreamSealedError(),
+                    };
+                    _outq.SendMsg(reply, JsonSerializer.SerializeToUtf8Bytes(response));
+                }
+
+                return new InvalidOperationException(JsApiErrors.NewJSStreamSealedError().Description ?? "stream is sealed");
+            }
+
+            _mu.EnterWriteLock();
+            try
+            {
+                // Clustered sequence not initialised yet: seed it from LastSeq plus _clfs.
+                if (_clseq == 0)
+                    _clseq = (ulong)Math.Max(0, Interlocked.Read(ref LastSeq)) + _clfs;
+                _clseq++;
+
+                var encoded = JetStreamCluster.EncodeStreamMsgAllowCompress(
+                    subject,
+                    reply,
+                    header,
+                    message,
+                    _clseq,
+                    // Nanoseconds since the Unix epoch, at millisecond resolution.
+                    DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L,
+                    sourced);
+
+                raftNode.Propose(encoded);
+                TrackReplicationTraffic(raftNode, encoded.Length, Math.Max(1, Config.Replicas));
+            }
+            finally
+            {
+                _mu.ExitWriteLock();
+            }
+
+            return null;
+        }
+        finally
+        {
+            _mu.ExitUpgradeableReadLock();
+        }
+    }
+}
diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterRemaining.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterRemaining.cs
new file mode 100644
index 0000000..965fa1c
--- /dev/null
+++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterRemaining.cs
@@ -0,0 +1,154 @@
+using System.Text.Json;
+
+namespace ZB.MOM.NatsNet.Server;
+
+public sealed partial class NatsServer
+{
+    /// <summary>
+    /// Handles a clustered consumer create/update request. Validates the config against the stream,
+    /// reuses the existing (or in-flight) assignment's group or creates a new one, then proposes the
+    /// assignment to the meta group. Validation failures are answered with an API error response.
+    /// </summary>
+    internal void JsClusteredConsumerRequest(
+        ClientInfo clientInfo,
+        Account account,
+        string subject,
+        string reply,
+        byte[] requestMessage,
+        string stream,
+        ConsumerConfig config,
+        ConsumerAction action,
+        bool pedantic)
+    {
+        var (js, cluster) = GetJetStreamCluster();
+        if (js == null || cluster == null)
+            return;
+
+        var response = new JsApiConsumerCreateResponse
+        {
+            Type = JsApiSubjects.JsApiConsumerCreateResponseType,
+        };
+
+        var engine = new JetStreamEngine(js);
+        var (streamConfig, streamFound) = engine.ClusterStreamConfig(account.Name, stream);
+        if (!streamFound)
+        {
+            response.Error = JsApiErrors.NewJSStreamNotFoundError();
+            SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response));
+            return;
+        }
+
+        var defaultsError = NatsConsumer.SetConsumerConfigDefaults(config, streamConfig, selectedLimits: null, pedantic);
+        if (defaultsError != null)
+        {
+            response.Error = defaultsError;
+            SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response));
+            return;
+        }
+
+        var cfgError = NatsConsumer.CheckConsumerCfg(config, streamConfig, selectedLimits: null, isRecovering: false);
+        if (cfgError != null)
+        {
+            response.Error = cfgError;
+            SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response));
+            return;
+        }
+
+        var streamAssignment = engine.StreamAssignmentOrInflight(account.Name, stream);
+        if (streamAssignment == null)
+        {
+            response.Error = JsApiErrors.NewJSStreamNotFoundError();
+            SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response));
+            return;
+        }
+
+        var explicitName = !string.IsNullOrWhiteSpace(config.Name)
+            ? config.Name
+            : config.Durable;
+
+        var existing = !string.IsNullOrWhiteSpace(explicitName)
+            ? engine.ConsumerAssignmentOrInflight(account.Name, stream, explicitName)
+            : null;
+
+        ConsumerAssignment assignment;
+        if (existing != null)
+        {
+            JetStreamVersioning.CopyConsumerMetadata(config, existing.Config);
+            // A Create for an existing consumer is only accepted when the config is unchanged.
+            if (action == ConsumerAction.Create)
+            {
+                var existingConfig = existing.Config ?? new ConsumerConfig();
+                if (JsonSerializer.Serialize(existingConfig) != JsonSerializer.Serialize(config))
+                {
+                    response.Error = JsApiErrors.NewJSConsumerAlreadyExistsError();
+                    SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response));
+                    return;
+                }
+            }
+
+            assignment = existing.CopyGroup();
+            assignment.Config = config;
+            assignment.ConfigJson = JetStreamCluster.SerializeJsonElement(config);
+            assignment.Subject = subject;
+            assignment.Reply = reply;
+            assignment.Client = clientInfo;
+        }
+        else
+        {
+            // An Update needs an existing consumer to act on.
+            if (action == ConsumerAction.Update)
+            {
+                response.Error = JsApiErrors.NewJSConsumerDoesNotExistError();
+                SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response));
+                return;
+            }
+
+            JetStreamVersioning.SetStaticConsumerMetadata(config);
+            var group = cluster.CreateGroupForConsumer(config, streamAssignment);
+            if (group == null)
+            {
+                response.Error = JsApiErrors.NewJSInsufficientResourcesError();
+                SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response));
+                return;
+            }
+
+            group.SetPreferred(this);
+            group.Cluster = streamAssignment.Group?.Cluster;
+
+            var name = explicitName;
+            // Unnamed non-durable consumers get a generated name that is not already assigned or in flight.
+            if (!NatsConsumer.IsDurableConsumer(config))
+            {
+                if (string.IsNullOrWhiteSpace(name))
+                {
+                    do
+                    {
+                        name = NatsConsumer.CreateConsumerName();
+                    } while (engine.ConsumerAssignmentOrInflight(account.Name, stream, name) != null);
+                }
+            }
+
+            assignment = new ConsumerAssignment
+            {
+                Group = group,
+                Stream = stream,
+                Name = name,
+                Config = config,
+                ConfigJson = JetStreamCluster.SerializeJsonElement(config),
+                Subject = subject,
+                Reply = reply,
+                Client = clientInfo,
+                Created = DateTime.UtcNow,
+            };
+        }
+
+        if (cluster.Meta != null)
+        {
+            cluster.Meta.Propose(JetStreamCluster.EncodeAddConsumerAssignment(assignment));
+            cluster.TrackInflightConsumerProposal(account.Name, stream, assignment, deleted: false);
+        }
+
+        // requestMessage is not used by this method.
+        _ = requestMessage;
+    }
+}
diff --git a/porting.db b/porting.db
index 01c3a0e..06cc962 100644
Binary files a/porting.db and b/porting.db differ