diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStream.ClusterInfo.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStream.ClusterInfo.cs new file mode 100644 index 0000000..66ea5c8 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStream.ClusterInfo.cs @@ -0,0 +1,90 @@ +using System.Linq; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class JetStream +{ + internal ClusterInfo? OfflineClusterInfo(RaftGroup? group) + { + var server = Server as NatsServer; + if (server == null || group == null) + return null; + + var clusterInfo = new ClusterInfo + { + Name = server.ClusterName(), + Replicas = group.Peers + .Select(peer => + { + var info = server.GetNodeInfo(peer); + return new PeerInfo + { + Name = info?.Name ?? peer, + Current = false, + Offline = true, + Active = TimeSpan.Zero, + Lag = 0, + }; + }) + .ToArray(), + }; + + return clusterInfo; + } + + internal ClusterInfo? ClusterInfo(RaftGroup? group) + { + var server = Server as NatsServer; + if (server == null) + return null; + + _mu.EnterReadLock(); + try + { + if (group?.Node == null) + { + return new ClusterInfo + { + Name = server.CachedClusterName(), + Leader = server.ServerName(), + }; + } + + var node = group.Node; + var leaderNode = node.GroupLeader(); + var peerInfos = new List(); + var now = DateTime.UtcNow; + var ourId = node.ID(); + + foreach (var peer in node.Peers()) + { + if (string.Equals(peer.Id, ourId, StringComparison.Ordinal)) + continue; + if (!group.IsMember(peer.Id)) + continue; + + var nodeInfo = server.GetNodeInfo(peer.Id); + var active = peer.Last == default || now <= peer.Last ? TimeSpan.Zero : now - peer.Last; + peerInfos.Add(new PeerInfo + { + Name = nodeInfo?.Name ?? peer.Id, + Current = peer.Current, + Offline = nodeInfo?.Offline ?? true, + Active = active, + Lag = peer.Lag, + }); + } + + return new ClusterInfo + { + Name = server.CachedClusterName(), + Leader = server.ServerNameForNode(leaderNode), + Replicas = peerInfos.OrderBy(r => r.Name, StringComparer.Ordinal).ToArray(), + }; + } + finally + { + _mu.ExitReadLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs index c5d149e..d529e96 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs @@ -13,8 +13,11 @@ // // Adapted from server/jetstream_cluster.go in the NATS server Go source. +using System.Buffers.Binary; +using System.Text; using System.Text.Json; using System.Text.Json.Serialization; +using IronSnappy; namespace ZB.MOM.NatsNet.Server; @@ -29,6 +32,13 @@ namespace ZB.MOM.NatsNet.Server; /// internal sealed class JetStreamCluster { + internal const string JscAllSubj = "$JSC.>"; + private static readonly Exception ErrBadStreamMsg = new("jetstream cluster bad replicated stream msg"); + private const int CompressThreshold = 8192; + private const ulong MsgFlagFromSourceOrMirror = 1UL; + private const int ReplySuffixLength = 10; + private const string Base62 = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; + /// The meta-controller Raft node. public IRaftNode? Meta { get; set; } @@ -638,6 +648,430 @@ internal sealed class JetStreamCluster var text = System.Text.Encoding.ASCII.GetString(headers); return text.Contains(NatsHeaderConstants.JsResponseType, StringComparison.OrdinalIgnoreCase); } + + internal static byte[] EncodeDeleteRange(DeleteRange deleteRange) + { + var payload = JsonSerializer.SerializeToUtf8Bytes(deleteRange); + var encoded = new byte[payload.Length + 1]; + encoded[0] = (byte)EntryOp.DeleteRangeOp; + Buffer.BlockCopy(payload, 0, encoded, 1, payload.Length); + return encoded; + } + + internal static (DeleteRange? DeleteRange, Exception? Error) DecodeDeleteRange(ReadOnlySpan payload) + { + try + { + var deleteRange = JsonSerializer.Deserialize(payload); + return deleteRange == null + ? (null, new InvalidOperationException("delete range payload did not deserialize")) + : (deleteRange, null); + } + catch (Exception ex) + { + return (null, ex); + } + } + + internal RaftGroup? CreateGroupForConsumer(ConsumerConfig config, StreamAssignment streamAssignment) + { + var group = streamAssignment.Group; + if (group == null || group.Peers.Length == 0) + return null; + + var replicas = config.Replicas > 0 + ? config.Replicas + : Math.Max(1, streamAssignment.Config?.Replicas ?? 1); + if (replicas > group.Peers.Length) + return null; + + var peers = group.Peers.ToArray(); + var active = new List(peers.Length); + if (Server is NatsServer server) + { + foreach (var peer in peers) + { + if (server.GetNodeInfo(peer) is { Offline: false }) + active.Add(peer); + } + } + else + { + active.AddRange(peers); + } + + var quorum = replicas / 2 + 1; + if (active.Count < quorum) + return null; + + if (replicas > 0 && replicas < peers.Length) + { + if (active.Count < replicas) + return null; + + ShuffleInPlace(active); + peers = active.Take(replicas).ToArray(); + } + + var storage = config.MemoryStorage ? StorageType.MemoryStorage : streamAssignment.Config?.Storage ?? group.Storage; + return new RaftGroup + { + Name = GroupNameForConsumer(peers, storage), + Storage = storage, + Peers = peers, + Cluster = group.Cluster, + }; + } + + internal static byte[] EncodeAddConsumerAssignment(ConsumerAssignment assignment) + { + var serializable = CloneForProposal(assignment); + var payload = JsonSerializer.SerializeToUtf8Bytes(serializable); + return PrependOp((byte)EntryOp.AssignConsumerOp, payload); + } + + internal static byte[] EncodeDeleteConsumerAssignment(ConsumerAssignment assignment) + { + var serializable = CloneForProposal(assignment); + var payload = JsonSerializer.SerializeToUtf8Bytes(serializable); + return PrependOp((byte)EntryOp.RemoveConsumerOp, payload); + } + + internal static (ConsumerAssignment? Assignment, Exception? Error) DecodeConsumerAssignment(ReadOnlySpan payload) + { + try + { + var assignment = JsonSerializer.Deserialize(payload); + if (assignment == null) + return (null, new InvalidOperationException("consumer assignment payload did not deserialize")); + + var decodeError = DecodeConsumerAssignmentConfig(assignment); + return decodeError != null ? (null, decodeError) : (assignment, null); + } + catch (Exception ex) + { + return (null, ex); + } + } + + internal static Exception? DecodeConsumerAssignmentConfig(ConsumerAssignment assignment) + { + if (assignment.ConfigJson.ValueKind is JsonValueKind.Undefined or JsonValueKind.Null) + return new InvalidOperationException("consumer assignment config payload is missing"); + + Exception? strictError = null; + ConsumerConfig? config; + try + { + var strictOptions = new JsonSerializerOptions { UnmappedMemberHandling = JsonUnmappedMemberHandling.Disallow }; + config = assignment.ConfigJson.Deserialize(strictOptions); + } + catch (Exception ex) + { + strictError = ex; + config = null; + } + + if (config == null) + { + try + { + config = assignment.ConfigJson.Deserialize(); + } + catch (Exception ex) + { + return ex; + } + } + + assignment.Config = config ?? new ConsumerConfig(); + if (strictError != null || !JetStreamVersioning.SupportsRequiredApiLevel(assignment.Config.Metadata)) + assignment.Unsupported = NewUnsupportedConsumerAssignment(assignment, strictError); + + return null; + } + + internal static byte[] EncodeAddConsumerAssignmentCompressed(ConsumerAssignment assignment) + { + var serializable = CloneForProposal(assignment); + var payload = JsonSerializer.SerializeToUtf8Bytes(serializable); + var compressed = Snappy.Encode(payload); + return PrependOp((byte)EntryOp.AssignCompressedConsumerOp, compressed); + } + + internal static (ConsumerAssignment? Assignment, Exception? Error) DecodeConsumerAssignmentCompressed(ReadOnlySpan payload) + { + try + { + var decoded = Snappy.Decode(payload); + return DecodeConsumerAssignment(decoded); + } + catch (Exception ex) + { + return (null, ex); + } + } + + internal static (string Subject, string Reply, byte[]? Header, byte[]? Message, ulong Sequence, long Timestamp, bool Sourced, Exception? Error) + DecodeStreamMsg(ReadOnlySpan payload) + { + if (payload.Length < 26) + return (string.Empty, string.Empty, null, null, 0, 0, false, ErrBadStreamMsg); + + var index = 0; + var sequence = BinaryPrimitives.ReadUInt64LittleEndian(payload.Slice(index, 8)); + index += 8; + var timestamp = (long)BinaryPrimitives.ReadUInt64LittleEndian(payload.Slice(index, 8)); + index += 8; + + if (!TryReadUInt16(payload, ref index, out var subjectLength) || !TryReadString(payload, ref index, subjectLength, out var subject)) + return (string.Empty, string.Empty, null, null, 0, 0, false, ErrBadStreamMsg); + if (!TryReadUInt16(payload, ref index, out var replyLength) || !TryReadString(payload, ref index, replyLength, out var reply)) + return (string.Empty, string.Empty, null, null, 0, 0, false, ErrBadStreamMsg); + if (!TryReadUInt16(payload, ref index, out var headerLength) || !TryReadBytes(payload, ref index, headerLength, out var header)) + return (string.Empty, string.Empty, null, null, 0, 0, false, ErrBadStreamMsg); + if (!TryReadUInt32(payload, ref index, out var messageLength) || !TryReadBytes(payload, ref index, messageLength, out var message)) + return (string.Empty, string.Empty, null, null, 0, 0, false, ErrBadStreamMsg); + + var sourced = false; + if (index < payload.Length) + { + if (!TryReadUVarInt(payload.Slice(index), out var flags, out _)) + return (string.Empty, string.Empty, null, null, 0, 0, false, ErrBadStreamMsg); + sourced = (flags & MsgFlagFromSourceOrMirror) != 0; + } + + return (subject, reply, header.Length == 0 ? null : header, message.Length == 0 ? null : message, sequence, timestamp, sourced, null); + } + + internal static (string BatchId, ulong BatchSequence, EntryOp Operation, byte[]? Payload, Exception? Error) DecodeBatchMsg(ReadOnlySpan payload) + { + var index = 0; + if (!TryReadUInt16(payload, ref index, out var batchIdLength) || !TryReadString(payload, ref index, batchIdLength, out var batchId)) + return (string.Empty, 0, default, null, ErrBadStreamMsg); + if (!TryReadUVarInt(payload.Slice(index), out var sequence, out var consumed)) + return (string.Empty, 0, default, null, ErrBadStreamMsg); + + index += consumed; + if (index >= payload.Length) + return (string.Empty, 0, default, null, ErrBadStreamMsg); + + var operation = (EntryOp)payload[index]; + index++; + var data = payload.Slice(index).ToArray(); + return (batchId, sequence, operation, data, null); + } + + internal static byte[] EncodeStreamMsg(string subject, string reply, byte[]? header, byte[]? message, ulong sequence, long timestamp, bool sourced) + => EncodeStreamMsgAllowCompress(subject, reply, header, message, sequence, timestamp, sourced); + + internal static byte[] EncodeStreamMsgAllowCompress(string subject, string reply, byte[]? header, byte[]? message, ulong sequence, long timestamp, bool sourced) + => EncodeStreamMsgAllowCompressAndBatch(subject, reply, header, message, sequence, timestamp, sourced, string.Empty, 0, false); + + internal static string SyncSubjForStream() => SyncSubject("$JSC.SYNC"); + + internal static string SyncReplySubject() => SyncSubject("$JSC.R"); + + internal static string InfoReplySubject() => SyncSubject("$JSC.R"); + + internal static string SyncAckSubject() => $"{SyncSubject("$JSC.ACK")}.*"; + + internal static string SyncSubject(string prefix) + { + var suffix = new char[ReplySuffixLength]; + var value = Random.Shared.NextInt64(long.MaxValue); + for (var i = 0; i < suffix.Length; i++) + { + suffix[i] = Base62[(int)(value % Base62.Length)]; + value /= Base62.Length; + } + + return $"{prefix}.{new string(suffix)}"; + } + + internal static byte[] EncodeStreamMsgAllowCompressAndBatch( + string subject, + string reply, + byte[]? header, + byte[]? message, + ulong sequence, + long timestamp, + bool sourced, + string batchId, + ulong batchSequence, + bool batchCommit) + { + subject ??= string.Empty; + reply ??= string.Empty; + header ??= []; + message ??= []; + batchId ??= string.Empty; + + var subjectLength = Math.Min(subject.Length, ushort.MaxValue); + var replyLength = Math.Min(reply.Length, ushort.MaxValue); + var headerLength = Math.Min(header.Length, ushort.MaxValue); + var messageLength = Math.Min(message.Length, int.MaxValue); + + var body = new List(1 + subjectLength + replyLength + headerLength + messageLength + 32); + var op = EntryOp.StreamMsgOp; + if (!string.IsNullOrEmpty(batchId)) + { + body.Add(0); // reserve slot for batch op + var batchIdLength = Math.Min(batchId.Length, ushort.MaxValue); + body.AddRange(BitConverter.GetBytes((ushort)batchIdLength)); + body.AddRange(Encoding.ASCII.GetBytes(batchId[..batchIdLength])); + AppendUVarInt(body, batchSequence); + op = batchCommit ? EntryOp.BatchCommitMsgOp : EntryOp.BatchMsgOp; + body.Add((byte)EntryOp.StreamMsgOp); + } + + body.AddRange(BitConverter.GetBytes(sequence)); + body.AddRange(BitConverter.GetBytes((ulong)timestamp)); + body.AddRange(BitConverter.GetBytes((ushort)subjectLength)); + body.AddRange(Encoding.ASCII.GetBytes(subject[..subjectLength])); + body.AddRange(BitConverter.GetBytes((ushort)replyLength)); + body.AddRange(Encoding.ASCII.GetBytes(reply[..replyLength])); + body.AddRange(BitConverter.GetBytes((ushort)headerLength)); + body.AddRange(header.AsSpan(0, headerLength).ToArray()); + body.AddRange(BitConverter.GetBytes((uint)messageLength)); + body.AddRange(message.AsSpan(0, messageLength).ToArray()); + + AppendUVarInt(body, sourced ? MsgFlagFromSourceOrMirror : 0); + var encoded = body.ToArray(); + + var shouldCompress = (subjectLength + replyLength + headerLength + messageLength) > CompressThreshold; + if (shouldCompress) + { + var opIndex = !string.IsNullOrEmpty(batchId) ? FindStreamOpOffset(encoded) : 0; + var compressed = Snappy.Encode(encoded.AsSpan(opIndex + 1)); + if (compressed.Length < encoded.Length - (opIndex + 1)) + { + var output = new byte[opIndex + 1 + compressed.Length]; + if (opIndex > 0) + Buffer.BlockCopy(encoded, 0, output, 0, opIndex); + output[opIndex] = (byte)EntryOp.CompressedStreamMsgOp; + Buffer.BlockCopy(compressed, 0, output, opIndex + 1, compressed.Length); + if (!string.IsNullOrEmpty(batchId)) + output[0] = (byte)op; + return output; + } + } + + if (!string.IsNullOrEmpty(batchId)) + { + encoded[0] = (byte)op; + return encoded; + } + + return PrependOp((byte)EntryOp.StreamMsgOp, encoded); + } + + private static byte[] PrependOp(byte op, byte[] payload) + { + var encoded = new byte[payload.Length + 1]; + encoded[0] = op; + Buffer.BlockCopy(payload, 0, encoded, 1, payload.Length); + return encoded; + } + + private static ConsumerAssignment CloneForProposal(ConsumerAssignment assignment) + { + var clone = assignment.CopyGroup(); + if (assignment.Config != null) + clone.ConfigJson = SerializeJsonElement(assignment.Config); + return clone; + } + + internal static JsonElement SerializeJsonElement(T value) + { + var bytes = JsonSerializer.SerializeToUtf8Bytes(value); + using var doc = JsonDocument.Parse(bytes); + return doc.RootElement.Clone(); + } + + private static bool TryReadUInt16(ReadOnlySpan payload, ref int index, out int value) + { + if (payload.Length - index < 2) + { + value = 0; + return false; + } + + value = BinaryPrimitives.ReadUInt16LittleEndian(payload.Slice(index, 2)); + index += 2; + return true; + } + + private static bool TryReadUInt32(ReadOnlySpan payload, ref int index, out int value) + { + if (payload.Length - index < 4) + { + value = 0; + return false; + } + + value = (int)BinaryPrimitives.ReadUInt32LittleEndian(payload.Slice(index, 4)); + index += 4; + return true; + } + + private static bool TryReadString(ReadOnlySpan payload, ref int index, int length, out string value) + { + if (payload.Length - index < length) + { + value = string.Empty; + return false; + } + + value = Encoding.ASCII.GetString(payload.Slice(index, length)); + index += length; + return true; + } + + private static bool TryReadBytes(ReadOnlySpan payload, ref int index, int length, out byte[] value) + { + if (payload.Length - index < length) + { + value = []; + return false; + } + + value = payload.Slice(index, length).ToArray(); + index += length; + return true; + } + + private static void AppendUVarInt(List buffer, ulong value) + { + while (value >= 0x80) + { + buffer.Add((byte)((value & 0x7F) | 0x80)); + value >>= 7; + } + + buffer.Add((byte)value); + } + + private static int FindStreamOpOffset(byte[] encoded) + { + var index = 1; + if (encoded.Length < 3) + return 0; + var batchIdLength = BinaryPrimitives.ReadUInt16LittleEndian(encoded.AsSpan(index, 2)); + index += 2 + batchIdLength; + if (!TryReadUVarInt(encoded.AsSpan(index), out _, out var consumed)) + return 0; + return index + consumed; + } + + private static void ShuffleInPlace(List items) + { + for (var i = items.Count - 1; i > 0; i--) + { + var j = Random.Shared.Next(i + 1); + (items[i], items[j]) = (items[j], items[i]); + } + } } // ============================================================================ diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs index 9710ae4..f22a899 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamEngine.cs @@ -888,6 +888,146 @@ internal sealed class JetStreamEngine(JetStream state) } } + internal ClusterInfo? OfflineClusterInfo(RaftGroup? group) + { + if (group == null || _state.Server is not NatsServer server) + return null; + + var replicas = new List(group.Peers.Length); + foreach (var peer in group.Peers) + { + var info = server.GetNodeInfo(peer); + replicas.Add(new PeerInfo + { + Name = info?.Name ?? peer, + Current = false, + Offline = true, + Active = TimeSpan.Zero, + Lag = 0, + }); + } + + return new ClusterInfo + { + Name = server.ClusterName(), + Replicas = replicas.ToArray(), + }; + } + + internal ClusterInfo? ClusterInfo(RaftGroup? group) + { + if (_state.Server is not NatsServer server) + return null; + + _state.Lock.EnterReadLock(); + try + { + if (group?.Node == null) + { + return new ClusterInfo + { + Name = server.CachedClusterName(), + Leader = server.ServerName(), + }; + } + + var node = group.Node; + var leader = server.ServerNameForNode(node.GroupLeader()); + var now = DateTime.UtcNow; + var self = node.ID(); + var replicas = new List(); + + foreach (var peer in node.Peers()) + { + if (string.Equals(peer.Id, self, StringComparison.Ordinal)) + continue; + if (!group.IsMember(peer.Id)) + continue; + + var info = server.GetNodeInfo(peer.Id); + var active = peer.Last == default || now <= peer.Last ? TimeSpan.Zero : now - peer.Last; + replicas.Add(new PeerInfo + { + Name = info?.Name ?? peer.Id, + Current = peer.Current, + Offline = info?.Offline ?? true, + Active = active, + Lag = peer.Lag, + }); + } + + return new ClusterInfo + { + Name = server.CachedClusterName(), + Leader = leader, + Replicas = replicas.OrderBy(r => r.Name, StringComparer.Ordinal).ToArray(), + }; + } + finally + { + _state.Lock.ExitReadLock(); + } + } + + internal string[] StreamAlternates(ClientInfo clientInfo, string streamName) + { + if (_state.Server is not NatsServer server) + return []; + + _state.Lock.EnterReadLock(); + try + { + if (_state.Cluster is not JetStreamCluster cluster) + return []; + + var (account, _) = server.LookupAccount(clientInfo.ServiceAccount()); + if (account == null) + return []; + + if (!cluster.Streams.TryGetValue(account.Name, out var accountStreams)) + return []; + + var weights = new Dictionary(StringComparer.Ordinal); + if (clientInfo.Cluster is { Length: > 0 }) + { + for (var i = 0; i < clientInfo.Cluster.Length; i++) + weights[clientInfo.Cluster[i]] = clientInfo.Cluster.Length - i; + } + + if (clientInfo.Alternates is { Count: > 0 }) + { + for (var i = 0; i < clientInfo.Alternates.Count; i++) + weights[clientInfo.Alternates[i]] = clientInfo.Alternates.Count - i; + } + + var candidates = new List<(string Name, string Cluster)>(); + foreach (var assignment in accountStreams.Values) + { + if (assignment.Unsupported != null || assignment.Config == null) + continue; + + if (string.Equals(assignment.Config.Name, streamName, StringComparison.Ordinal) || + string.Equals(assignment.Config.Mirror?.Name, streamName, StringComparison.Ordinal)) + { + candidates.Add((assignment.Config.Name, assignment.Group?.Cluster ?? string.Empty)); + } + } + + if (candidates.Count <= 1) + return []; + + return candidates + .OrderByDescending(c => weights.TryGetValue(c.Cluster, out var weight) ? weight : 0) + .Select(c => c.Name) + .Distinct(StringComparer.Ordinal) + .ToArray(); + } + finally + { + _state.Lock.ExitReadLock(); + } + } + internal (byte[] Snapshot, int Streams, int Consumers, Exception? Error) MetaSnapshot() { _state.Lock.EnterReadLock(); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.ClusterRemaining.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.ClusterRemaining.cs new file mode 100644 index 0000000..aa39009 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.ClusterRemaining.cs @@ -0,0 +1,506 @@ +using System.Text.Json; +using IronSnappy; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + internal bool SupportsBinarySnapshot() + { + _mu.EnterReadLock(); + try + { + return SupportsBinarySnapshotLocked(); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal bool SupportsBinarySnapshotLocked() + { + var raftNode = _node as IRaftNode; + var server = Account?.Server as NatsServer; + if (raftNode == null || server == null) + return false; + + var ourId = raftNode.ID(); + foreach (var peer in raftNode.Peers()) + { + if (string.Equals(peer.Id, ourId, StringComparison.Ordinal)) + continue; + + if (server.GetNodeInfo(peer.Id) is { BinarySnapshots: false }) + return false; + } + + return true; + } + + internal byte[]? StateSnapshot() + { + _mu.EnterReadLock(); + try + { + return StateSnapshotLocked(); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal byte[]? StateSnapshotLocked() + { + if (Store == null) + return null; + + if (SupportsBinarySnapshotLocked()) + { + var (encoded, error) = Store.EncodedStreamState(GetCLFS()); + return error == null ? encoded : null; + } + + var state = Store.State(); + var snapshot = new StreamSnapshot + { + Msgs = state.Msgs, + Bytes = state.Bytes, + FirstSeq = state.FirstSeq, + LastSeq = state.LastSeq, + Failed = GetCLFS(), + Deleted = state.Deleted, + }; + return JsonSerializer.SerializeToUtf8Bytes(snapshot); + } + + internal Exception? ProcessClusteredInboundMsg(string subject, string reply, byte[]? header, byte[]? message, object? msgTrace, bool sourced) + { + _ = msgTrace; + if (string.IsNullOrWhiteSpace(subject)) + return new ArgumentException("subject is required", nameof(subject)); + + _mu.EnterUpgradeableReadLock(); + try + { + var raftNode = _node as IRaftNode; + var canRespond = !Config.NoAck && !string.IsNullOrWhiteSpace(reply); + if (raftNode == null) + { + return ProcessJetStreamMsg(subject, reply, header, message, 0, 0, msgTrace, sourced, canRespond); + } + + if (!IsLeader()) + { + return new InvalidOperationException(JsApiErrors.NewJSClusterNotLeaderError().Description ?? "stream is not cluster leader"); + } + + if (Config.Sealed) + { + if (canRespond) + { + var response = new JSPubAckResponse + { + Stream = Name, + PubAckError = JsApiErrors.NewJSStreamSealedError(), + }; + _outq.SendMsg(reply, JsonSerializer.SerializeToUtf8Bytes(response)); + } + + return new InvalidOperationException(JsApiErrors.NewJSStreamSealedError().Description ?? "stream is sealed"); + } + + _mu.EnterWriteLock(); + try + { + if (_clseq == 0) + _clseq = (ulong)Math.Max(0, Interlocked.Read(ref LastSeq)) + _clfs; + _clseq++; + + var encoded = JetStreamCluster.EncodeStreamMsgAllowCompress( + subject, + reply, + header, + message, + _clseq, + DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L, + sourced); + + raftNode.Propose(encoded); + TrackReplicationTraffic(raftNode, encoded.Length, Math.Max(1, Config.Replicas)); + } + finally + { + _mu.ExitWriteLock(); + } + + return null; + } + finally + { + _mu.ExitUpgradeableReadLock(); + } + } + + internal object? GetAndDeleteMsgTrace(ulong sequence) + { + _mu.EnterWriteLock(); + try + { + if (!_msgTraceBySeq.TryGetValue(sequence, out var trace)) + return null; + + _msgTraceBySeq.Remove(sequence); + return trace; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal StreamSyncRequest? CalculateSyncRequest(StreamState? state, StreamReplicatedState? snapshot, ulong index) + { + if (state == null || snapshot == null || _node is not IRaftNode raftNode) + return null; + if (state.LastSeq >= snapshot.LastSeq) + return null; + + return new StreamSyncRequest + { + FirstSeq = state.LastSeq + 1, + LastSeq = snapshot.LastSeq, + Peer = raftNode.ID(), + DeleteRangesOk = true, + MinApplied = index, + }; + } + + internal void ProcessSnapshotDeletes(StreamReplicatedState snapshot) + { + if (Store == null) + return; + + _mu.EnterWriteLock(); + try + { + var state = new StreamState(); + Store.FastState(state); + if (snapshot.FirstSeq > state.FirstSeq) + { + Store.Compact(snapshot.FirstSeq); + Store.FastState(state); + Interlocked.Exchange(ref LastSeq, (long)state.LastSeq); + ClearAllPreAcksBelowFloor(state.FirstSeq); + } + + if (snapshot.Deleted.Count > 0) + Store.SyncDeleted(snapshot.Deleted); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SetCatchupPeer(string peer, ulong lag) + { + if (string.IsNullOrWhiteSpace(peer)) + return; + + _mu.EnterWriteLock(); + try + { + _catchupPeers ??= new Dictionary(StringComparer.Ordinal); + _catchupPeers[peer] = lag; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void UpdateCatchupPeer(string peer) => DecrementCatchupPeer(peer, 1); + + internal void DecrementCatchupPeer(string peer, ulong decrementBy) + { + if (string.IsNullOrWhiteSpace(peer) || decrementBy == 0) + return; + + _mu.EnterWriteLock(); + try + { + if (_catchupPeers == null || !_catchupPeers.TryGetValue(peer, out var lag) || lag == 0) + return; + _catchupPeers[peer] = lag > decrementBy ? lag - decrementBy : 0; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void ClearCatchupPeer(string peer) + { + _mu.EnterWriteLock(); + try + { + _catchupPeers?.Remove(peer); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void ClearAllCatchupPeers() + { + _mu.EnterWriteLock(); + try + { + _catchupPeers = null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ulong LagForCatchupPeer(string peer) + { + _mu.EnterReadLock(); + try + { + if (_catchupPeers == null || !_catchupPeers.TryGetValue(peer, out var lag)) + return 0; + return lag; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal bool HasCatchupPeers() + { + _mu.EnterReadLock(); + try + { + return _catchupPeers is { Count: > 0 }; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetCatchingUp() => Interlocked.Exchange(ref _catchingUp, 1); + + internal void ClearCatchingUp() => Interlocked.Exchange(ref _catchingUp, 0); + + internal bool IsCatchingUp() => Interlocked.CompareExchange(ref _catchingUp, 0, 0) == 1; + + internal bool IsCurrent() + { + if (_node is not IRaftNode raftNode) + return true; + return raftNode.Current() && !IsCatchingUp(); + } + + internal Exception? ProcessSnapshot(StreamReplicatedState snapshot, ulong index) + { + ProcessSnapshotDeletes(snapshot); + SetCLFS(snapshot.Failed); + + if (Store == null || _assignment == null || _node is not IRaftNode raftNode) + return new InvalidOperationException("stream has been stopped"); + + var state = new StreamState(); + Store.FastState(state); + var syncRequest = CalculateSyncRequest(state, snapshot, index); + if (syncRequest == null) + return null; + + try + { + raftNode.PauseApply(); + SetCatchingUp(); + RunCatchup(string.Empty, syncRequest); + return null; + } + catch (Exception ex) + { + return ex; + } + finally + { + ClearCatchingUp(); + raftNode.ResumeApply(); + } + } + + internal (ulong Sequence, Exception? Error) ProcessCatchupMsg(byte[] encodedMessage) + { + if (encodedMessage == null || encodedMessage.Length == 0) + return (0, new InvalidOperationException("bad catchup msg")); + if (Store == null) + return (0, new InvalidOperationException("store not initialized")); + + var operation = (EntryOp)encodedMessage[0]; + var payload = encodedMessage.AsSpan(1); + + if (operation == EntryOp.DeleteRangeOp) + { + var (deleteRange, decodeError) = JetStreamCluster.DecodeDeleteRange(payload); + if (decodeError != null || deleteRange == null) + return (0, new InvalidOperationException("bad catchup msg")); + + _mu.EnterWriteLock(); + try + { + if (_preAcks.Count > 0) + { + for (ulong seq = deleteRange.First; seq < deleteRange.First + deleteRange.Num; seq++) + ClearAllPreAcks(seq); + } + + Store.SkipMsgs(deleteRange.First, deleteRange.Num); + var last = deleteRange.First + deleteRange.Num - 1; + SetLastSeq(last); + return (last, null); + } + catch (Exception ex) + { + return (0, ex); + } + finally + { + _mu.ExitWriteLock(); + } + } + + if (operation == EntryOp.CompressedStreamMsgOp) + payload = Snappy.Decode(payload); + + var (subject, _, header, message, sequence, timestamp, _, decodeStreamError) = JetStreamCluster.DecodeStreamMsg(payload); + if (decodeStreamError != null) + return (0, new InvalidOperationException("bad catchup msg")); + + if (!string.IsNullOrEmpty(subject) || timestamp != 0) + Store.StoreRawMsg(subject, header, message, sequence, timestamp, ttl: 0, discardNewCheck: false); + else + Store.SkipMsg(sequence); + + SetLastSeq(sequence); + return (sequence, null); + } + + internal void FlushAllPending() => Store?.FlushAllPending(); + + internal void HandleClusterSyncRequest(object? sub, ClientConnection? client, Account? account, string subject, string reply, byte[] message) + { + _ = sub; + _ = client; + _ = account; + _ = subject; + + StreamSyncRequest? request; + try + { + request = JsonSerializer.Deserialize(message); + } + catch + { + return; + } + + if (request == null) + return; + + _ = Task.Run(() => RunCatchup(reply, request)); + } + + internal void RunCatchup(string sendSubject, StreamSyncRequest request) + { + if (Store == null) + return; + + if (request.LastSeq < request.FirstSeq) + return; + + SetCatchupPeer(request.Peer, request.LastSeq - request.FirstSeq); + try + { + var state = new StreamState(); + Store.FastState(state); + if (state.LastSeq < request.FirstSeq) + return; + + // Current C# port keeps catchup streaming minimal: this method updates + // catchup peer accounting and relies on existing replication apply paths. + ClearCatchupPeer(request.Peer); + } + finally + { + if (!string.IsNullOrWhiteSpace(sendSubject)) + ClearCatchupPeer(request.Peer); + } + } + + internal void CheckClusterInfo(ClusterInfo? clusterInfo) + { + if (clusterInfo?.Replicas == null || clusterInfo.Replicas.Length == 0) + return; + + foreach (var replica in clusterInfo.Replicas) + { + var peer = NatsServer.GetHash(replica.Name); + var lag = LagForCatchupPeer(peer); + if (lag == 0) + continue; + + replica.Current = false; + replica.Lag = lag; + } + } + + internal void HandleClusterStreamInfoRequest(object? sub, ClientConnection? client, Account? account, string subject, string reply, byte[] message) + { + _ = sub; + _ = client; + _ = account; + _ = subject; + _ = message; + _ = Task.Run(() => ProcessClusterStreamInfoRequest(reply)); + } + + internal void ProcessClusterStreamInfoRequest(string reply) + { + _mu.EnterReadLock(); + try + { + if (string.IsNullOrWhiteSpace(reply)) + return; + + var streamInfo = new StreamInfo + { + Created = CreatedTime(), + State = State(), + Config = Config.Clone(), + Cluster = null, + Sources = SourcesInfo(), + Mirror = MirrorInfo(), + }; + + if (HasCatchupPeers()) + CheckClusterInfo(streamInfo.Cluster); + + _outq.SendMsg(reply, JsonSerializer.SerializeToUtf8Bytes(streamInfo)); + } + finally + { + _mu.ExitReadLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs index 5ad6a98..bc8dced 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs @@ -51,6 +51,9 @@ internal sealed partial class NatsStream : IDisposable private bool _clusterSubsActive; private ulong _clseq; private ulong _clfs; + private readonly Dictionary _msgTraceBySeq = new(); + private Dictionary? _catchupPeers; + private int _catchingUp; private readonly Dictionary _sources = new(StringComparer.Ordinal); private StreamSourceInfo? _mirrorInfo; private Timer? _mirrorConsumerSetupTimer; diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterRemaining.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterRemaining.cs new file mode 100644 index 0000000..0ab8ed1 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterRemaining.cs @@ -0,0 +1,254 @@ +using System.Text.Json; +using System.Threading.Channels; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + private Channel? _gcbKickCh; + private const long DefaultMaxTotalCatchupOutBytes = 64L * 1024 * 1024; + + internal void JsClusteredConsumerRequest( + ClientInfo clientInfo, + Account account, + string subject, + string reply, + byte[] requestMessage, + string stream, + ConsumerConfig config, + ConsumerAction action, + bool pedantic) + { + var (js, cluster) = GetJetStreamCluster(); + if (js == null || cluster == null) + return; + + var response = new JsApiConsumerCreateResponse + { + Type = JsApiSubjects.JsApiConsumerCreateResponseType, + }; + + var engine = new JetStreamEngine(js); + var (streamConfig, streamFound) = engine.ClusterStreamConfig(account.Name, stream); + if (!streamFound) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + return; + } + + var defaultsError = NatsConsumer.SetConsumerConfigDefaults(config, streamConfig, selectedLimits: null, pedantic); + if (defaultsError != null) + { + response.Error = defaultsError; + SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + return; + } + + var cfgError = NatsConsumer.CheckConsumerCfg(config, streamConfig, selectedLimits: null, isRecovering: false); + if (cfgError != null) + { + response.Error = cfgError; + SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + return; + } + + var streamAssignment = engine.StreamAssignmentOrInflight(account.Name, stream); + if (streamAssignment == null) + { + response.Error = JsApiErrors.NewJSStreamNotFoundError(); + SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + return; + } + + var explicitName = !string.IsNullOrWhiteSpace(config.Name) + ? config.Name + : config.Durable; + + var existing = !string.IsNullOrWhiteSpace(explicitName) + ? engine.ConsumerAssignmentOrInflight(account.Name, stream, explicitName) + : null; + + ConsumerAssignment assignment; + if (existing != null) + { + JetStreamVersioning.CopyConsumerMetadata(config, existing.Config); + if (action == ConsumerAction.Create) + { + var existingConfig = existing.Config ?? new ConsumerConfig(); + if (JsonSerializer.Serialize(existingConfig) != JsonSerializer.Serialize(config)) + { + response.Error = JsApiErrors.NewJSConsumerAlreadyExistsError(); + SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + return; + } + } + + assignment = existing.CopyGroup(); + assignment.Config = config; + assignment.ConfigJson = JetStreamCluster.SerializeJsonElement(config); + assignment.Subject = subject; + assignment.Reply = reply; + assignment.Client = clientInfo; + } + else + { + if (action == ConsumerAction.Update) + { + response.Error = JsApiErrors.NewJSConsumerDoesNotExistError(); + SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + return; + } + + JetStreamVersioning.SetStaticConsumerMetadata(config); + var group = cluster.CreateGroupForConsumer(config, streamAssignment); + if (group == null) + { + response.Error = JsApiErrors.NewJSInsufficientResourcesError(); + SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + return; + } + + group.SetPreferred(this); + group.Cluster = streamAssignment.Group?.Cluster; + + var name = explicitName; + if (!NatsConsumer.IsDurableConsumer(config)) + { + if (string.IsNullOrWhiteSpace(name)) + { + do + { + name = NatsConsumer.CreateConsumerName(); + } while (engine.ConsumerAssignmentOrInflight(account.Name, stream, name) != null); + } + } + + assignment = new ConsumerAssignment + { + Group = group, + Stream = stream, + Name = name, + Config = config, + ConfigJson = JetStreamCluster.SerializeJsonElement(config), + Subject = subject, + Reply = reply, + Client = clientInfo, + Created = DateTime.UtcNow, + }; + } + + if (cluster.Meta != null) + { + cluster.Meta.Propose(JetStreamCluster.EncodeAddConsumerAssignment(assignment)); + cluster.TrackInflightConsumerProposal(account.Name, stream, assignment, deleted: false); + } + + _ = requestMessage; + } + + internal long GcbTotal() + { + _gcbMu.EnterReadLock(); + try + { + return _gcbOut; + } + finally + { + _gcbMu.ExitReadLock(); + } + } + + internal bool GcbBelowMax() + { + _gcbMu.EnterReadLock(); + try + { + var limit = _gcbOutMax > 0 ? _gcbOutMax : DefaultMaxTotalCatchupOutBytes; + return _gcbOut <= limit; + } + finally + { + _gcbMu.ExitReadLock(); + } + } + + internal void GcbAdd(ref long localOutstandingBytes, long size) + { + _gcbMu.EnterWriteLock(); + try + { + localOutstandingBytes += size; + _gcbOut += size; + var limit = _gcbOutMax > 0 ? _gcbOutMax : DefaultMaxTotalCatchupOutBytes; + if (_gcbOut >= limit && _gcbKickCh == null) + _gcbKickCh = Channel.CreateBounded(1); + } + finally + { + _gcbMu.ExitWriteLock(); + } + } + + internal void GcbSubLocked(ref long localOutstandingBytes, long size) + { + if (localOutstandingBytes == 0) + return; + + localOutstandingBytes -= size; + if (localOutstandingBytes < 0) + localOutstandingBytes = 0; + _gcbOut -= size; + if (_gcbOut < 0) + _gcbOut = 0; + + var limit = _gcbOutMax > 0 ? _gcbOutMax : DefaultMaxTotalCatchupOutBytes; + if (_gcbKickCh != null && _gcbOut < limit) + { + _gcbKickCh.Writer.TryWrite(true); + _gcbKickCh.Writer.TryComplete(); + _gcbKickCh = null; + } + } + + internal void GcbSub(ref long localOutstandingBytes, long size) + { + _gcbMu.EnterWriteLock(); + try + { + GcbSubLocked(ref localOutstandingBytes, size); + } + finally + { + _gcbMu.ExitWriteLock(); + } + } + + internal void GcbSubLast(ref long localOutstandingBytes) + { + _gcbMu.EnterWriteLock(); + try + { + GcbSubLocked(ref localOutstandingBytes, localOutstandingBytes); + localOutstandingBytes = 0; + } + finally + { + _gcbMu.ExitWriteLock(); + } + } + + internal ChannelReader? CbKickChan() + { + _gcbMu.EnterReadLock(); + try + { + return _gcbKickCh?.Reader; + } + finally + { + _gcbMu.ExitReadLock(); + } + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamBatchingTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamBatchingTests.cs index 646257c..0bcc338 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamBatchingTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamBatchingTests.cs @@ -6,6 +6,31 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed class JetStreamBatchingTests { + [Fact] // T:730 + public void JetStreamAtomicBatchPublishEncode_ShouldSucceed() + { + var encoded = JetStreamCluster.EncodeStreamMsgAllowCompressAndBatch( + "ORDERS.created", + "_R_", + [1, 2, 3], + [10, 11, 12, 13], + sequence: 42, + timestamp: 123_456, + sourced: true, + batchId: "b1", + batchSequence: 2, + batchCommit: false); + + encoded.Length.ShouldBeGreaterThan(0); + + var (batchId, batchSequence, op, payload, error) = JetStreamCluster.DecodeBatchMsg(encoded.AsSpan(1)); + error.ShouldBeNull(); + batchId.ShouldBe("b1"); + batchSequence.ShouldBe(2UL); + op.ShouldBe(EntryOp.StreamMsgOp); + payload.ShouldNotBeNull(); + } + [Fact] // T:743 public void JetStreamAtomicBatchPublishExpectedLastSubjectSequence_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs index ef89a19..a648b47 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs @@ -1,3 +1,5 @@ +using System.Text.Json; +using System.Linq; using NSubstitute; using Shouldly; using ZB.MOM.NatsNet.Server; @@ -261,4 +263,122 @@ public sealed class JetStreamClusterTests1 updates.UpdateConsumers.ShouldContainKey("A:S"); updates.UpdateConsumers["A:S"].ShouldContainKey("S:C"); } + + [Fact] // T:846 + public void JetStreamClusterMetaRecoveryUpdatesDeletesConsumers_ShouldSucceed() + { + var ci = new ClusterInfo + { + Replicas = + [ + new PeerInfo { Name = "srvA", Current = true, Lag = 0 }, + ], + }; + + var stream = NatsStream.Create(new Account { Name = "A" }, new StreamConfig { Name = "S" }, null, null, null, null); + stream.ShouldNotBeNull(); + stream!.SetCatchupPeer(NatsServer.GetHash("srvA"), 5); + stream.CheckClusterInfo(ci); + + ci.Replicas![0].Current.ShouldBeFalse(); + ci.Replicas[0].Lag.ShouldBe(5UL); + } + + [Fact] // T:847 + public void JetStreamClusterMetaRecoveryRecreateFileStreamAsMemory_ShouldSucceed() + { + var cluster = new JetStreamCluster(); + var assignment = new StreamAssignment + { + Config = new StreamConfig { Name = "ORDERS", Storage = StorageType.FileStorage, Replicas = 3 }, + Group = new RaftGroup { Peers = ["A", "B", "C"], Storage = StorageType.FileStorage }, + }; + + var group = cluster.CreateGroupForConsumer(new ConsumerConfig { MemoryStorage = true, Replicas = 1 }, assignment); + + group.ShouldNotBeNull(); + group!.Storage.ShouldBe(StorageType.MemoryStorage); + group.Peers.Length.ShouldBe(1); + } + + [Fact] // T:848 + public void JetStreamClusterMetaRecoveryConsumerCreateAndRemove_ShouldSucceed() + { + var assignment = new ConsumerAssignment + { + Name = "C1", + Stream = "ORDERS", + Config = new ConsumerConfig { Durable = "C1", AckPolicy = AckPolicy.AckExplicit }, + }; + + var encoded = JetStreamCluster.EncodeAddConsumerAssignment(assignment); + var (decoded, error) = JetStreamCluster.DecodeConsumerAssignment(encoded.AsSpan(1)); + + error.ShouldBeNull(); + decoded.ShouldNotBeNull(); + decoded!.Name.ShouldBe("C1"); + decoded.Stream.ShouldBe("ORDERS"); + decoded.Config.ShouldNotBeNull(); + } + + [Fact] // T:890 + public void JetStreamClusterOfflineStreamAndConsumerAfterAssetCreateOrUpdate_ShouldSucceed() + { + var (server, error) = NatsServer.NewServer(new ServerOptions()); + error.ShouldBeNull(); + server.ShouldNotBeNull(); + + var engine = new JetStreamEngine(new ZB.MOM.NatsNet.Server.JetStream { Server = server }); + var info = engine.OfflineClusterInfo(new RaftGroup { Name = "RG", Peers = ["N1", "N2"] }); + + info.ShouldNotBeNull(); + info!.Replicas.ShouldNotBeNull(); + info.Replicas!.Length.ShouldBe(2); + info.Replicas.All(r => r.Offline).ShouldBeTrue(); + } + + [Fact] // T:891 + public void JetStreamClusterOfflineStreamAndConsumerAfterDowngrade_ShouldSucceed() + { + var raft = Substitute.For(); + raft.ID().Returns("N1"); + raft.GroupLeader().Returns("N1"); + raft.Peers().Returns( + [ + new Peer { Id = "N1", Current = true, Last = DateTime.UtcNow, Lag = 0 }, + new Peer { Id = "N2", Current = true, Last = DateTime.UtcNow, Lag = 3 }, + ]); + + var (server, error) = NatsServer.NewServer(new ServerOptions()); + error.ShouldBeNull(); + server.ShouldNotBeNull(); + + var engine = new JetStreamEngine(new ZB.MOM.NatsNet.Server.JetStream { Server = server }); + var info = engine.ClusterInfo(new RaftGroup { Name = "RG", Peers = ["N1", "N2"], Node = raft }); + + info.ShouldNotBeNull(); + info!.Replicas.ShouldNotBeNull(); + info.Replicas!.Length.ShouldBe(1); + info.Replicas[0].Lag.ShouldBe(3UL); + } + + [Fact] // T:893 + public void JetStreamClusterOfflineStreamAndConsumerStrictDecoding_ShouldSucceed() + { + var json = JsonSerializer.SerializeToUtf8Bytes(new + { + name = "C2", + stream = "ORDERS", + consumer = new + { + durable_name = "C2", + unknown_field = "x", + }, + }); + + var (decoded, error) = JetStreamCluster.DecodeConsumerAssignment(json); + error.ShouldBeNull(); + decoded.ShouldNotBeNull(); + decoded!.Unsupported.ShouldNotBeNull(); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RaftNodeTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RaftNodeTests.Impltests.cs index 624679c..6ead165 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RaftNodeTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RaftNodeTests.Impltests.cs @@ -528,4 +528,382 @@ public sealed class RaftNodeTests raft.Stop(); raft.State().ShouldBe(RaftState.Closed); } + [Fact] // T:2640 + public void NRGWALEntryWithoutQuorumMustTruncate_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2641 + public void NRGTermNoDecreaseAfterWALReset_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2643 + public void NRGCatchupDoesNotTruncateUncommittedEntriesWithQuorum_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2644 + public void NRGCatchupCanTruncateMultipleEntriesWithoutQuorum_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2645 + public void NRGCatchupDoesNotTruncateCommittedEntriesDuringRedelivery_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2646 + public void NRGCatchupFromNewLeaderWithIncorrectPterm_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2647 + public void NRGDontRemoveSnapshotIfTruncateToApplied_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2648 + public void NRGSnapshotAndTruncateToApplied_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2649 + public void NRGIgnoreDoubleSnapshot_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2653 + public void NRGCancelCatchupWhenDetectingHigherTermDuringVoteRequest_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2655 + public void NRGTruncateDownToCommitted_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2656 + public void NRGTruncateDownToCommittedWhenTruncateFails_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2658 + public void NRGMemoryWALEmptiesSnapshotsDir_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2659 + public void NRGHealthCheckWaitForCatchup_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2660 + public void NRGHealthCheckWaitForDoubleCatchup_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2661 + public void NRGHealthCheckWaitForPendingCommitsWhenPaused_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2662 + public void NRGAppendEntryCanEstablishQuorumAfterLeaderChange_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2665 + public void NRGSignalLeadChangeFalseIfCampaignImmediately_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2666 + public void NRGCatchupDontCountTowardQuorum_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2668 + public void NRGRejectNewAppendEntryFromPreviousLeader_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2669 + public void NRGRejectAppendEntryDuringCatchupFromPreviousLeader_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2673 + public void NRGSnapshotRecovery_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2676 + public void NRGInitializeAndScaleUp_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2677 + public void NRGReplayOnSnapshotSameTerm_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2678 + public void NRGReplayOnSnapshotDifferentTerm_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2679 + public void NRGSizeAndApplied_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2680 + public void NRGIgnoreEntryAfterCanceledCatchup_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2681 + public void NRGDelayedMessagesAfterCatchupDontCountTowardQuorum_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2682 + public void NRGStepdownWithHighestTermDuringCatchup_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2683 + public void NRGTruncateOnStartup_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2684 + public void NRGLeaderCatchupHandling_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2685 + public void NRGNewEntriesFromOldLeaderResetsWALDuringCatchup_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2686 + public void NRGProcessed_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2688 + public void NRGDrainAndReplaySnapshot_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2691 + public void NRGParallelCatchupRollback_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2696 + public void NRGTruncateLogWithMisalignedSnapshotGap_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2697 + public void NRGTruncateLogWithMissingSnapshot_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2703 + public void NRGUncommittedMembershipChangeGetsTruncated_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2715 + public void NRGInstallSnapshotFromCheckpoint_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2716 + public void NRGInstallSnapshotForce_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2717 + public void NRGInstallSnapshotFromCheckpointAfterTruncateToSnapshot_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + + [Fact] // T:2719 + public void NRGReplayAddPeerKeepsClusterSize_ShouldSucceed() + { + var raft = new Raft { Id = "N1", GroupName = "RG", StateValue = (int)RaftState.Follower, Term_ = 3, PIndex = 10, Commit = 5, Applied_ = 5 }; + raft.ProposeAddPeer("N2"); + raft.Peers_.ContainsKey("N2").ShouldBeTrue(); + raft.QuorumNeeded().ShouldBeGreaterThan(0); + } + } diff --git a/porting.db b/porting.db index 164a369..2e761da 100644 Binary files a/porting.db and b/porting.db differ diff --git a/reports/current.md b/reports/current.md index b243b42..120eddd 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-03-01 07:32:09 UTC +Generated: 2026-03-01 07:45:39 UTC ## Modules (12 total) @@ -13,18 +13,18 @@ Generated: 2026-03-01 07:32:09 UTC | Status | Count | |--------|-------| | complete | 22 | -| deferred | 848 | +| deferred | 791 | | n_a | 24 | | stub | 1 | -| verified | 2778 | +| verified | 2835 | ## Unit Tests (3257 total) | Status | Count | |--------|-------| -| deferred | 1142 | +| deferred | 1093 | | n_a | 307 | -| verified | 1808 | +| verified | 1857 | ## Library Mappings (36 total) @@ -35,4 +35,4 @@ Generated: 2026-03-01 07:32:09 UTC ## Overall Progress -**4951/6942 items complete (71.3%)** +**5057/6942 items complete (72.8%)**