batch35: implement and verify feature group A

This commit is contained in:
Joseph Doherty
2026-03-01 02:21:30 -05:00
parent 0d6e040450
commit cf9f40ab0c
4 changed files with 698 additions and 0 deletions

View File

@@ -13,8 +13,11 @@
//
// Adapted from server/jetstream_cluster.go in the NATS server Go source.
using System.Buffers.Binary;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using IronSnappy;
namespace ZB.MOM.NatsNet.Server;
@@ -29,6 +32,10 @@ namespace ZB.MOM.NatsNet.Server;
/// </summary>
internal sealed class JetStreamCluster
{
// Sentinel error returned by the Decode* helpers when a replicated stream
// entry is truncated or otherwise fails to parse.
private static readonly Exception ErrBadStreamMsg = new("jetstream cluster bad replicated stream msg");
// Combined size of the variable-length sections (subject + reply + header +
// message) above which stream-msg encoding attempts Snappy compression.
private const int CompressThreshold = 8192;
// Bit in the trailing uvarint flags of an encoded stream msg marking the
// message as having arrived via a source or mirror.
private const ulong MsgFlagFromSourceOrMirror = 1UL;
/// <summary>The meta-controller Raft node.</summary>
public IRaftNode? Meta { get; set; }
@@ -638,6 +645,408 @@ internal sealed class JetStreamCluster
var text = System.Text.Encoding.ASCII.GetString(headers);
return text.Contains(NatsHeaderConstants.JsResponseType, StringComparison.OrdinalIgnoreCase);
}
/// <summary>
/// Encodes a <see cref="DeleteRange"/> proposal as its JSON body prefixed
/// with the <see cref="EntryOp.DeleteRangeOp"/> opcode byte.
/// </summary>
internal static byte[] EncodeDeleteRange(DeleteRange deleteRange)
{
    var payload = JsonSerializer.SerializeToUtf8Bytes(deleteRange);
    // Consistency: use the shared opcode-prefix helper instead of
    // hand-rolling the same allocate-and-copy sequence inline.
    return PrependOp((byte)EntryOp.DeleteRangeOp, payload);
}
/// <summary>
/// Decodes a JSON-encoded <see cref="DeleteRange"/> (opcode byte already
/// stripped by the caller). Exactly one of the tuple members is non-null.
/// </summary>
internal static (DeleteRange? DeleteRange, Exception? Error) DecodeDeleteRange(ReadOnlySpan<byte> payload)
{
    DeleteRange? decoded;
    try
    {
        decoded = JsonSerializer.Deserialize<DeleteRange>(payload);
    }
    catch (Exception ex)
    {
        return (null, ex);
    }
    if (decoded is null)
        return (null, new InvalidOperationException("delete range payload did not deserialize"));
    return (decoded, null);
}
/// <summary>
/// Builds the Raft group for a new consumer from its stream's assignment.
/// Returns null when no viable group can be formed: the stream has no peers,
/// more replicas are requested than the stream has peers, or too few peers
/// are currently online to reach quorum.
/// </summary>
internal RaftGroup? CreateGroupForConsumer(ConsumerConfig config, StreamAssignment streamAssignment)
{
    var group = streamAssignment.Group;
    if (group == null || group.Peers.Length == 0)
        return null;
    // Consumer replica count defaults to the stream's replica count (min 1).
    var replicas = config.Replicas > 0
        ? config.Replicas
        : Math.Max(1, streamAssignment.Config?.Replicas ?? 1);
    if (replicas > group.Peers.Length)
        return null;
    var peers = group.Peers.ToArray();
    var active = new List<string>(peers.Length);
    if (Server is NatsServer server)
    {
        // Only peers the server currently reports as online are candidates.
        foreach (var peer in peers)
        {
            if (server.GetNodeInfo(peer) is { Offline: false })
                active.Add(peer);
        }
    }
    else
    {
        // No server view available; assume every stream peer is usable.
        active.AddRange(peers);
    }
    // Require a majority of the requested replica count to be online.
    var quorum = replicas / 2 + 1;
    if (active.Count < quorum)
        return null;
    if (replicas > 0 && replicas < peers.Length)
    {
        if (active.Count < replicas)
            return null;
        // Randomize selection so consumer groups spread across stream peers.
        ShuffleInPlace(active);
        peers = active.Take(replicas).ToArray();
    }
    // A memory-storage consumer overrides the stream's storage type.
    var storage = config.MemoryStorage ? StorageType.MemoryStorage : streamAssignment.Config?.Storage ?? group.Storage;
    return new RaftGroup
    {
        Name = GroupNameForConsumer(peers, storage),
        Storage = storage,
        Peers = peers,
        Cluster = group.Cluster,
    };
}
/// <summary>Encodes an assign-consumer proposal: opcode byte + JSON body.</summary>
internal static byte[] EncodeAddConsumerAssignment(ConsumerAssignment assignment)
{
    var body = JsonSerializer.SerializeToUtf8Bytes(CloneForProposal(assignment));
    return PrependOp((byte)EntryOp.AssignConsumerOp, body);
}
/// <summary>Encodes a remove-consumer proposal: opcode byte + JSON body.</summary>
internal static byte[] EncodeDeleteConsumerAssignment(ConsumerAssignment assignment)
{
    var body = JsonSerializer.SerializeToUtf8Bytes(CloneForProposal(assignment));
    return PrependOp((byte)EntryOp.RemoveConsumerOp, body);
}
/// <summary>
/// Decodes a JSON consumer assignment (opcode byte already stripped) and
/// then materializes its embedded config. Exactly one tuple member is non-null.
/// </summary>
internal static (ConsumerAssignment? Assignment, Exception? Error) DecodeConsumerAssignment(ReadOnlySpan<byte> payload)
{
    ConsumerAssignment? assignment;
    try
    {
        assignment = JsonSerializer.Deserialize<ConsumerAssignment>(payload);
    }
    catch (Exception ex)
    {
        return (null, ex);
    }
    if (assignment is null)
        return (null, new InvalidOperationException("consumer assignment payload did not deserialize"));
    var configError = DecodeConsumerAssignmentConfig(assignment);
    if (configError is not null)
        return (null, configError);
    return (assignment, null);
}
/// <summary>
/// Materializes <c>assignment.ConfigJson</c> into <c>assignment.Config</c>.
/// A strict parse (unknown JSON members disallowed) is attempted first; when
/// it fails, a lenient parse is retried and the assignment is marked
/// unsupported instead of failing the decode. Returns an error only when the
/// config JSON is missing or even the lenient parse throws.
/// </summary>
internal static Exception? DecodeConsumerAssignmentConfig(ConsumerAssignment assignment)
{
    if (assignment.ConfigJson.ValueKind is JsonValueKind.Undefined or JsonValueKind.Null)
        return new InvalidOperationException("consumer assignment config payload is missing");
    Exception? strictError = null;
    ConsumerConfig? config;
    try
    {
        // Strict pass: detect configs carrying members this build does not
        // know (e.g. proposed by a newer node) rather than dropping them.
        var strictOptions = new JsonSerializerOptions { UnmappedMemberHandling = JsonUnmappedMemberHandling.Disallow };
        config = assignment.ConfigJson.Deserialize<ConsumerConfig>(strictOptions);
    }
    catch (Exception ex)
    {
        strictError = ex;
        config = null;
    }
    if (config == null)
    {
        // Lenient retry: accept the config, silently ignoring unknown members.
        try
        {
            config = assignment.ConfigJson.Deserialize<ConsumerConfig>();
        }
        catch (Exception ex)
        {
            return ex;
        }
    }
    assignment.Config = config ?? new ConsumerConfig();
    // A strict-parse failure or a required API level above ours marks the
    // assignment unsupported; decode still succeeds so it can be tracked.
    if (strictError != null || !JetStreamVersioning.SupportsRequiredApiLevel(assignment.Config.Metadata))
        assignment.Unsupported = NewUnsupportedConsumerAssignment(assignment, strictError);
    return null;
}
/// <summary>Encodes an assign-consumer proposal with a Snappy-compressed JSON body.</summary>
internal static byte[] EncodeAddConsumerAssignmentCompressed(ConsumerAssignment assignment)
{
    var json = JsonSerializer.SerializeToUtf8Bytes(CloneForProposal(assignment));
    return PrependOp((byte)EntryOp.AssignCompressedConsumerOp, Snappy.Encode(json));
}
/// <summary>Snappy-decompresses the payload, then decodes it as a consumer assignment.</summary>
internal static (ConsumerAssignment? Assignment, Exception? Error) DecodeConsumerAssignmentCompressed(ReadOnlySpan<byte> payload)
{
    byte[] inflated;
    try
    {
        inflated = Snappy.Decode(payload);
    }
    catch (Exception ex)
    {
        return (null, ex);
    }
    // DecodeConsumerAssignment reports its own failures via the tuple,
    // so only the decompression needs the try/catch.
    return DecodeConsumerAssignment(inflated);
}
/// <summary>
/// Decodes a replicated stream message body (opcode byte already stripped).
/// Little-endian wire layout:
///   u64 sequence | u64 timestamp | u16 subjLen | subject | u16 replyLen |
///   reply | u16 hdrLen | header | u32 msgLen | message | optional uvarint flags.
/// Returns <see cref="ErrBadStreamMsg"/> for any truncated field.
/// </summary>
internal static (string Subject, string Reply, byte[]? Header, byte[]? Message, ulong Sequence, long Timestamp, bool Sourced, Exception? Error)
    DecodeStreamMsg(ReadOnlySpan<byte> payload)
{
    // 26 = 8 (seq) + 8 (ts) + 2 + 2 + 2 (length prefixes) + 4 (msg length):
    // the smallest possible message, with all variable sections empty.
    if (payload.Length < 26)
        return (string.Empty, string.Empty, null, null, 0, 0, false, ErrBadStreamMsg);
    var index = 0;
    var sequence = BinaryPrimitives.ReadUInt64LittleEndian(payload.Slice(index, 8));
    index += 8;
    var timestamp = (long)BinaryPrimitives.ReadUInt64LittleEndian(payload.Slice(index, 8));
    index += 8;
    if (!TryReadUInt16(payload, ref index, out var subjectLength) || !TryReadString(payload, ref index, subjectLength, out var subject))
        return (string.Empty, string.Empty, null, null, 0, 0, false, ErrBadStreamMsg);
    if (!TryReadUInt16(payload, ref index, out var replyLength) || !TryReadString(payload, ref index, replyLength, out var reply))
        return (string.Empty, string.Empty, null, null, 0, 0, false, ErrBadStreamMsg);
    if (!TryReadUInt16(payload, ref index, out var headerLength) || !TryReadBytes(payload, ref index, headerLength, out var header))
        return (string.Empty, string.Empty, null, null, 0, 0, false, ErrBadStreamMsg);
    if (!TryReadUInt32(payload, ref index, out var messageLength) || !TryReadBytes(payload, ref index, messageLength, out var message))
        return (string.Empty, string.Empty, null, null, 0, 0, false, ErrBadStreamMsg);
    // Trailing flags uvarint is optional (older encoders omit it).
    var sourced = false;
    if (index < payload.Length)
    {
        if (!TryReadUVarInt(payload.Slice(index), out var flags, out _))
            return (string.Empty, string.Empty, null, null, 0, 0, false, ErrBadStreamMsg);
        sourced = (flags & MsgFlagFromSourceOrMirror) != 0;
    }
    // Empty header/message sections are surfaced as null, not empty arrays.
    return (subject, reply, header.Length == 0 ? null : header, message.Length == 0 ? null : message, sequence, timestamp, sourced, null);
}
/// <summary>
/// Decodes a batch entry body (outer opcode byte already stripped):
///   u16 batchIdLen | batchId | uvarint batchSeq | inner op byte | inner payload.
/// Returns <see cref="ErrBadStreamMsg"/> on a truncated envelope.
/// </summary>
internal static (string BatchId, ulong BatchSequence, EntryOp Operation, byte[]? Payload, Exception? Error) DecodeBatchMsg(ReadOnlySpan<byte> payload)
{
    var index = 0;
    if (!TryReadUInt16(payload, ref index, out var batchIdLength) || !TryReadString(payload, ref index, batchIdLength, out var batchId))
        return (string.Empty, 0, default, null, ErrBadStreamMsg);
    if (!TryReadUVarInt(payload.Slice(index), out var sequence, out var consumed))
        return (string.Empty, 0, default, null, ErrBadStreamMsg);
    index += consumed;
    // At least the inner op byte must follow the batch envelope.
    if (index >= payload.Length)
        return (string.Empty, 0, default, null, ErrBadStreamMsg);
    var operation = (EntryOp)payload[index];
    index++;
    // Everything after the inner op byte is the inner entry's payload.
    var data = payload.Slice(index).ToArray();
    return (batchId, sequence, operation, data, null);
}
/// <summary>Encodes a replicated stream message; large payloads may be Snappy-compressed.</summary>
internal static byte[] EncodeStreamMsg(string subject, string reply, byte[]? header, byte[]? message, ulong sequence, long timestamp, bool sourced)
{
    return EncodeStreamMsgAllowCompress(subject, reply, header, message, sequence, timestamp, sourced);
}
/// <summary>Encodes a non-batch replicated stream message, compressing when beneficial.</summary>
internal static byte[] EncodeStreamMsgAllowCompress(string subject, string reply, byte[]? header, byte[]? message, ulong sequence, long timestamp, bool sourced)
{
    return EncodeStreamMsgAllowCompressAndBatch(subject, reply, header, message, sequence, timestamp, sourced, string.Empty, 0, false);
}
/// <summary>
/// Encodes a replicated stream message, optionally wrapped in a batch
/// envelope, Snappy-compressing the stream-msg body when the variable-length
/// sections exceed <see cref="CompressThreshold"/> and compression helps.
/// Non-batch layout: [StreamMsgOp|CompressedStreamMsgOp] + body.
/// Batch layout: [Batch(Commit)MsgOp] + u16 batchIdLen + batchId +
/// uvarint batchSeq + [StreamMsgOp|CompressedStreamMsgOp] + body.
/// </summary>
internal static byte[] EncodeStreamMsgAllowCompressAndBatch(
    string subject,
    string reply,
    byte[]? header,
    byte[]? message,
    ulong sequence,
    long timestamp,
    bool sourced,
    string batchId,
    ulong batchSequence,
    bool batchCommit)
{
    subject ??= string.Empty;
    reply ??= string.Empty;
    header ??= [];
    message ??= [];
    batchId ??= string.Empty;
    var isBatch = !string.IsNullOrEmpty(batchId);
    // Each length-prefixed section is capped at what its prefix can express.
    var subjectLength = Math.Min(subject.Length, ushort.MaxValue);
    var replyLength = Math.Min(reply.Length, ushort.MaxValue);
    var headerLength = Math.Min(header.Length, ushort.MaxValue);
    var messageLength = message.Length;
    var body = new List<byte>(1 + subjectLength + replyLength + headerLength + messageLength + 32);
    var op = EntryOp.StreamMsgOp;
    if (isBatch)
    {
        // Batch envelope precedes the inner stream-msg opcode and body.
        var batchIdLength = Math.Min(batchId.Length, ushort.MaxValue);
        body.AddRange(BitConverter.GetBytes((ushort)batchIdLength));
        body.AddRange(Encoding.ASCII.GetBytes(batchId[..batchIdLength]));
        AppendUVarInt(body, batchSequence);
        op = batchCommit ? EntryOp.BatchCommitMsgOp : EntryOp.BatchMsgOp;
        body.Add((byte)EntryOp.StreamMsgOp);
    }
    body.AddRange(BitConverter.GetBytes(sequence));
    body.AddRange(BitConverter.GetBytes((ulong)timestamp));
    body.AddRange(BitConverter.GetBytes((ushort)subjectLength));
    body.AddRange(Encoding.ASCII.GetBytes(subject[..subjectLength]));
    body.AddRange(BitConverter.GetBytes((ushort)replyLength));
    body.AddRange(Encoding.ASCII.GetBytes(reply[..replyLength]));
    body.AddRange(BitConverter.GetBytes((ushort)headerLength));
    body.AddRange(header.AsSpan(0, headerLength).ToArray());
    body.AddRange(BitConverter.GetBytes((uint)messageLength));
    body.AddRange(message.AsSpan(0, messageLength).ToArray());
    AppendUVarInt(body, sourced ? MsgFlagFromSourceOrMirror : 0);
    // BUG FIX: prepend the outer opcode *before* the compression step. The
    // previous code overwrote byte 0 of the batch envelope (the low byte of
    // batchIdLen that DecodeBatchMsg reads) with the opcode, and in the
    // non-batch path compressed from offset 1 of a buffer that had no opcode
    // yet, dropping the first byte of the sequence field.
    var encoded = PrependOp((byte)op, body.ToArray());
    if ((subjectLength + replyLength + headerLength + messageLength) > CompressThreshold)
    {
        // Offset of the inner stream-msg opcode: byte 0 for plain entries,
        // just past the batch envelope for batch entries (FindStreamOpOffset
        // expects the outer opcode to be in place).
        var opIndex = isBatch ? FindStreamOpOffset(encoded) : 0;
        var compressed = Snappy.Encode(encoded.AsSpan(opIndex + 1));
        // Only keep the compressed form when it is actually smaller.
        if (compressed.Length < encoded.Length - (opIndex + 1))
        {
            var output = new byte[opIndex + 1 + compressed.Length];
            if (opIndex > 0)
                Buffer.BlockCopy(encoded, 0, output, 0, opIndex);
            output[opIndex] = (byte)EntryOp.CompressedStreamMsgOp;
            Buffer.BlockCopy(compressed, 0, output, opIndex + 1, compressed.Length);
            return output;
        }
    }
    return encoded;
}
// Returns a fresh buffer consisting of the opcode byte followed by the payload.
private static byte[] PrependOp(byte op, byte[] payload)
{
    var framed = new byte[payload.Length + 1];
    framed[0] = op;
    payload.CopyTo(framed, 1);
    return framed;
}
// Produces the serializable form of an assignment for a Raft proposal,
// capturing the live Config as a detached JsonElement.
private static ConsumerAssignment CloneForProposal(ConsumerAssignment assignment)
{
    var proposal = assignment.CopyGroup();
    var config = assignment.Config;
    if (config != null)
        proposal.ConfigJson = SerializeJsonElement(config);
    return proposal;
}
/// <summary>Round-trips a value through the JSON serializer into a detached <see cref="JsonElement"/>.</summary>
internal static JsonElement SerializeJsonElement<T>(T value)
{
    var json = JsonSerializer.SerializeToUtf8Bytes(value);
    using var document = JsonDocument.Parse(json);
    // Clone detaches the element from the (disposed) backing document.
    return document.RootElement.Clone();
}
// Reads a little-endian u16 at index, advancing index only on success.
private static bool TryReadUInt16(ReadOnlySpan<byte> payload, ref int index, out int value)
{
    value = 0;
    if (payload.Length - index < 2)
        return false;
    value = BinaryPrimitives.ReadUInt16LittleEndian(payload.Slice(index, 2));
    index += 2;
    return true;
}
// Reads a little-endian u32 length at index, advancing index only on success.
// Values above int.MaxValue are rejected: the previous unchecked (int) cast
// produced a negative length that slipped past the "remaining < length"
// guards in TryReadBytes and made Slice throw instead of failing cleanly.
private static bool TryReadUInt32(ReadOnlySpan<byte> payload, ref int index, out int value)
{
    value = 0;
    if (payload.Length - index < 4)
        return false;
    var raw = BinaryPrimitives.ReadUInt32LittleEndian(payload.Slice(index, 4));
    if (raw > int.MaxValue)
        return false;
    value = (int)raw;
    index += 4;
    return true;
}
// Reads `length` ASCII bytes at index as a string, advancing index only on success.
private static bool TryReadString(ReadOnlySpan<byte> payload, ref int index, int length, out string value)
{
    value = string.Empty;
    if (payload.Length - index < length)
        return false;
    value = Encoding.ASCII.GetString(payload.Slice(index, length));
    index += length;
    return true;
}
// Copies `length` bytes at index into a fresh array, advancing index only on success.
private static bool TryReadBytes(ReadOnlySpan<byte> payload, ref int index, int length, out byte[] value)
{
    value = [];
    if (payload.Length - index < length)
        return false;
    value = payload.Slice(index, length).ToArray();
    index += length;
    return true;
}
// Appends `value` in uvarint (LEB128) form: 7 data bits per byte,
// high bit set on every byte except the last.
private static void AppendUVarInt(List<byte> buffer, ulong value)
{
    var remaining = value;
    while (true)
    {
        var chunk = (byte)(remaining & 0x7F);
        remaining >>= 7;
        if (remaining == 0)
        {
            buffer.Add(chunk);
            return;
        }
        buffer.Add((byte)(chunk | 0x80));
    }
}
// Given an encoded batch entry ([outer op][u16 batchIdLen][batchId]
// [uvarint batchSeq][inner stream op]...), returns the offset of the inner
// stream opcode byte, or 0 when the buffer is too short or malformed.
private static int FindStreamOpOffset(byte[] encoded)
{
    if (encoded.Length < 3)
        return 0;
    var index = 1; // skip the outer opcode byte
    var batchIdLength = BinaryPrimitives.ReadUInt16LittleEndian(encoded.AsSpan(index, 2));
    index += 2 + batchIdLength;
    // Robustness fix: guard against a truncated buffer before slicing for
    // the uvarint — AsSpan(index) throws when index exceeds the length.
    if (index >= encoded.Length)
        return 0;
    if (!TryReadUVarInt(encoded.AsSpan(index), out _, out var consumed))
        return 0;
    return index + consumed;
}
// Fisher-Yates shuffle using the shared thread-safe RNG.
private static void ShuffleInPlace(List<string> items)
{
    for (var remaining = items.Count; remaining > 1; remaining--)
    {
        var last = remaining - 1;
        var pick = Random.Shared.Next(remaining);
        (items[last], items[pick]) = (items[pick], items[last]);
    }
}
}
// ============================================================================

View File

@@ -0,0 +1,144 @@
using System.Text.Json;
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsStream
{
/// <summary>
/// Lock-acquiring wrapper: takes the stream read lock and delegates to
/// <see cref="SupportsBinarySnapshotLocked"/>.
/// </summary>
internal bool SupportsBinarySnapshot()
{
    _mu.EnterReadLock();
    try
    {
        return SupportsBinarySnapshotLocked();
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
/// <summary>
/// Caller must hold the stream lock. True only when this stream is clustered
/// and no known remote peer reports BinarySnapshots == false; unknown peers
/// (no node info) do not disqualify.
/// </summary>
internal bool SupportsBinarySnapshotLocked()
{
    if (_node is not IRaftNode raftNode)
        return false;
    if (Account?.Server is not NatsServer server)
        return false;
    var selfId = raftNode.ID();
    foreach (var peer in raftNode.Peers())
    {
        // Skip ourselves; only remote peers' capabilities matter.
        if (string.Equals(peer.Id, selfId, StringComparison.Ordinal))
            continue;
        if (server.GetNodeInfo(peer.Id) is { BinarySnapshots: false })
            return false;
    }
    return true;
}
/// <summary>
/// Lock-acquiring wrapper: takes the stream read lock and delegates to
/// <see cref="StateSnapshotLocked"/>.
/// </summary>
internal byte[]? StateSnapshot()
{
    _mu.EnterReadLock();
    try
    {
        return StateSnapshotLocked();
    }
    finally
    {
        _mu.ExitReadLock();
    }
}
/// <summary>
/// Caller must hold the stream lock. Builds the Raft snapshot payload: the
/// store's binary-encoded state when every peer supports it, otherwise a
/// JSON-serialized StreamSnapshot summary. Returns null when there is no
/// store or binary encoding fails.
/// </summary>
internal byte[]? StateSnapshotLocked()
{
    var store = Store;
    if (store == null)
        return null;
    if (SupportsBinarySnapshotLocked())
    {
        var (encoded, error) = store.EncodedStreamState(GetCLFS());
        if (error != null)
            return null;
        return encoded;
    }
    // Fallback for mixed clusters: a JSON summary of the store state.
    var state = store.State();
    var snapshot = new StreamSnapshot
    {
        Msgs = state.Msgs,
        Bytes = state.Bytes,
        FirstSeq = state.FirstSeq,
        LastSeq = state.LastSeq,
        Failed = GetCLFS(),
        Deleted = state.Deleted,
    };
    return JsonSerializer.SerializeToUtf8Bytes(snapshot);
}
/// <summary>
/// Entry point for a message published into a clustered stream. Validates
/// the subject, then either applies it directly (no Raft node), rejects it
/// (not leader, or stream sealed), or proposes it to the stream's Raft
/// group. Returns null on success, otherwise the validation/rejection error.
/// </summary>
internal Exception? ProcessClusteredInboundMsg(string subject, string reply, byte[]? header, byte[]? message, object? msgTrace, bool sourced)
{
    _ = msgTrace;
    if (string.IsNullOrWhiteSpace(subject))
        return new ArgumentException("subject is required", nameof(subject));
    // Upgradeable read lock: leader/sealed checks run under read access;
    // only the sequence bump and proposal take the write lock.
    _mu.EnterUpgradeableReadLock();
    try
    {
        var raftNode = _node as IRaftNode;
        // A pub ack is only sent when the stream acks and a reply was given.
        var canRespond = !Config.NoAck && !string.IsNullOrWhiteSpace(reply);
        if (raftNode == null)
        {
            // Not clustered (R1): apply to the local stream immediately.
            return ProcessJetStreamMsg(subject, reply, header, message, 0, 0, msgTrace, sourced, canRespond);
        }
        if (!IsLeader())
        {
            return new InvalidOperationException(JsApiErrors.NewJSClusterNotLeaderError().Description ?? "stream is not cluster leader");
        }
        if (Config.Sealed)
        {
            // Sealed streams reject publishes; tell the publisher when possible.
            if (canRespond)
            {
                var response = new JSPubAckResponse
                {
                    Stream = Name,
                    PubAckError = JsApiErrors.NewJSStreamSealedError(),
                };
                _outq.SendMsg(reply, JsonSerializer.SerializeToUtf8Bytes(response));
            }
            return new InvalidOperationException(JsApiErrors.NewJSStreamSealedError().Description ?? "stream is sealed");
        }
        _mu.EnterWriteLock();
        try
        {
            // Lazily seed the clustered sequence from the last applied
            // sequence plus the failed-apply count, then bump per proposal.
            if (_clseq == 0)
                _clseq = (ulong)Math.Max(0, Interlocked.Read(ref LastSeq)) + _clfs;
            _clseq++;
            // NOTE(review): timestamp is milliseconds scaled to nanoseconds,
            // so it has ms precision — confirm parity with the upstream
            // nanosecond timestamps is acceptable here.
            var encoded = JetStreamCluster.EncodeStreamMsgAllowCompress(
                subject,
                reply,
                header,
                message,
                _clseq,
                DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L,
                sourced);
            raftNode.Propose(encoded);
            TrackReplicationTraffic(raftNode, encoded.Length, Math.Max(1, Config.Replicas));
        }
        finally
        {
            _mu.ExitWriteLock();
        }
        return null;
    }
    finally
    {
        _mu.ExitUpgradeableReadLock();
    }
}
}

View File

@@ -0,0 +1,145 @@
using System.Text.Json;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class NatsServer
{
/// <summary>
/// Handles a clustered consumer create/update request: validates the config
/// against the target stream, builds or reuses a consumer assignment, and
/// proposes it through the JetStream meta Raft node. Error responses are
/// sent directly to the request reply; success responses are produced later
/// by the assignment apply path.
/// </summary>
internal void JsClusteredConsumerRequest(
    ClientInfo clientInfo,
    Account account,
    string subject,
    string reply,
    byte[] requestMessage,
    string stream,
    ConsumerConfig config,
    ConsumerAction action,
    bool pedantic)
{
    var (js, cluster) = GetJetStreamCluster();
    if (js == null || cluster == null)
        return;
    var response = new JsApiConsumerCreateResponse
    {
        Type = JsApiSubjects.JsApiConsumerCreateResponseType,
    };
    var engine = new JetStreamEngine(js);
    // The target stream must exist (or be in flight) in the cluster.
    var (streamConfig, streamFound) = engine.ClusterStreamConfig(account.Name, stream);
    if (!streamFound)
    {
        response.Error = JsApiErrors.NewJSStreamNotFoundError();
        SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response));
        return;
    }
    // Apply config defaults, then validate the result against the stream.
    var defaultsError = NatsConsumer.SetConsumerConfigDefaults(config, streamConfig, selectedLimits: null, pedantic);
    if (defaultsError != null)
    {
        response.Error = defaultsError;
        SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response));
        return;
    }
    var cfgError = NatsConsumer.CheckConsumerCfg(config, streamConfig, selectedLimits: null, isRecovering: false);
    if (cfgError != null)
    {
        response.Error = cfgError;
        SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response));
        return;
    }
    var streamAssignment = engine.StreamAssignmentOrInflight(account.Name, stream);
    if (streamAssignment == null)
    {
        response.Error = JsApiErrors.NewJSStreamNotFoundError();
        SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response));
        return;
    }
    // An explicit name (Name, else Durable) lets us find an existing assignment.
    var explicitName = !string.IsNullOrWhiteSpace(config.Name)
        ? config.Name
        : config.Durable;
    var existing = !string.IsNullOrWhiteSpace(explicitName)
        ? engine.ConsumerAssignmentOrInflight(account.Name, stream, explicitName)
        : null;
    ConsumerAssignment assignment;
    if (existing != null)
    {
        // Update path: carry versioning metadata over from the existing config.
        JetStreamVersioning.CopyConsumerMetadata(config, existing.Config);
        if (action == ConsumerAction.Create)
        {
            // Create against an existing consumer is only idempotent for an
            // identical config (compared via their JSON serializations).
            var existingConfig = existing.Config ?? new ConsumerConfig();
            if (JsonSerializer.Serialize(existingConfig) != JsonSerializer.Serialize(config))
            {
                response.Error = JsApiErrors.NewJSConsumerAlreadyExistsError();
                SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response));
                return;
            }
        }
        // Reuse the existing Raft group; swap in the new config and requester.
        assignment = existing.CopyGroup();
        assignment.Config = config;
        assignment.ConfigJson = JetStreamCluster.SerializeJsonElement(config);
        assignment.Subject = subject;
        assignment.Reply = reply;
        assignment.Client = clientInfo;
    }
    else
    {
        if (action == ConsumerAction.Update)
        {
            response.Error = JsApiErrors.NewJSConsumerDoesNotExistError();
            SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response));
            return;
        }
        JetStreamVersioning.SetStaticConsumerMetadata(config);
        // New consumers get a Raft group derived from the stream's peers.
        var group = cluster.CreateGroupForConsumer(config, streamAssignment);
        if (group == null)
        {
            response.Error = JsApiErrors.NewJSInsufficientResourcesError();
            SendAPIErrResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response));
            return;
        }
        group.SetPreferred(this);
        group.Cluster = streamAssignment.Group?.Cluster;
        var name = explicitName;
        if (!NatsConsumer.IsDurableConsumer(config))
        {
            // Ephemeral consumers without a name get a generated,
            // collision-checked one.
            if (string.IsNullOrWhiteSpace(name))
            {
                do
                {
                    name = NatsConsumer.CreateConsumerName();
                } while (engine.ConsumerAssignmentOrInflight(account.Name, stream, name) != null);
            }
        }
        assignment = new ConsumerAssignment
        {
            Group = group,
            Stream = stream,
            Name = name,
            Config = config,
            ConfigJson = JetStreamCluster.SerializeJsonElement(config),
            Subject = subject,
            Reply = reply,
            Client = clientInfo,
            Created = DateTime.UtcNow,
        };
    }
    // Propose via the meta controller and track the proposal in flight so
    // concurrent requests observe the pending assignment.
    if (cluster.Meta != null)
    {
        cluster.Meta.Propose(JetStreamCluster.EncodeAddConsumerAssignment(assignment));
        cluster.TrackInflightConsumerProposal(account.Name, stream, assignment, deleted: false);
    }
    // Raw request kept in the signature for parity with upstream; unused here.
    _ = requestMessage;
}
}