feat(batch34): implement and verify group C cluster consumer features

This commit is contained in:
Joseph Doherty
2026-02-28 23:40:41 -05:00
parent 9a42b93b4b
commit 91627ecefb
4 changed files with 324 additions and 0 deletions

View File

@@ -413,6 +413,110 @@ internal sealed class JetStreamCluster
return (default, null);
}
internal static byte[] EncodeStreamPurge(StreamPurge purge)
{
var payload = JsonSerializer.SerializeToUtf8Bytes(purge);
var result = new byte[payload.Length + 1];
result[0] = (byte)EntryOp.PurgeStreamOp;
payload.CopyTo(result.AsSpan(1));
return result;
}
internal static (StreamPurge? Purge, Exception? Error) DecodeStreamPurge(byte[] buffer)
{
try
{
return (JsonSerializer.Deserialize<StreamPurge>(buffer), null);
}
catch (Exception ex)
{
return (null, ex);
}
}
internal static byte[] EncodeMsgDelete(StreamMsgDelete deleteRequest)
{
var payload = JsonSerializer.SerializeToUtf8Bytes(deleteRequest);
var result = new byte[payload.Length + 1];
result[0] = (byte)EntryOp.DeleteMsgOp;
payload.CopyTo(result.AsSpan(1));
return result;
}
internal static (StreamMsgDelete? Delete, Exception? Error) DecodeMsgDelete(byte[] buffer)
{
try
{
return (JsonSerializer.Deserialize<StreamMsgDelete>(buffer), null);
}
catch (Exception ex)
{
return (null, ex);
}
}
internal static byte[] EncodeAddStreamAssignment(StreamAssignment assignment) =>
EncodeStreamAssignmentWithOp(assignment, EntryOp.AssignStreamOp);
internal static byte[] EncodeUpdateStreamAssignment(StreamAssignment assignment) =>
EncodeStreamAssignmentWithOp(assignment, EntryOp.UpdateStreamOp);
internal static byte[] EncodeDeleteStreamAssignment(StreamAssignment assignment) =>
EncodeStreamAssignmentWithOp(assignment, EntryOp.RemoveStreamOp);
internal static (StreamAssignment? Assignment, Exception? Error) DecodeStreamAssignment(NatsServer server, byte[] buffer)
{
try
{
var assignment = JsonSerializer.Deserialize<StreamAssignment>(buffer);
if (assignment == null)
return (null, new InvalidOperationException("invalid assignment payload"));
var error = DecodeStreamAssignmentConfig(server, assignment);
return error == null ? (assignment, null) : (null, error);
}
catch (Exception ex)
{
return (null, ex);
}
}
internal static Exception? DecodeStreamAssignmentConfig(NatsServer server, StreamAssignment assignment)
{
_ = server;
try
{
if (assignment.ConfigJson.ValueKind == JsonValueKind.Undefined ||
assignment.ConfigJson.ValueKind == JsonValueKind.Null)
{
assignment.Config ??= new StreamConfig();
return null;
}
var cfg = JsonSerializer.Deserialize<StreamConfig>(assignment.ConfigJson.GetRawText());
assignment.Config = cfg ?? new StreamConfig();
return null;
}
catch (Exception ex)
{
assignment.Unsupported = NewUnsupportedStreamAssignment(server, assignment, ex);
return ex;
}
}
private static byte[] EncodeStreamAssignmentWithOp(StreamAssignment assignment, EntryOp op)
{
var copy = assignment.CopyGroup();
if (copy.Config != null)
copy.ConfigJson = JsonSerializer.SerializeToElement(copy.Config);
var payload = JsonSerializer.SerializeToUtf8Bytes(copy);
var result = new byte[payload.Length + 1];
result[0] = (byte)op;
payload.CopyTo(result.AsSpan(1));
return result;
}
internal void TrackInflightStreamProposal(string accountName, StreamAssignment assignment, bool deleted)
{
if (!InflightStreams.TryGetValue(accountName, out var streams))

View File

@@ -87,4 +87,144 @@ public sealed partial class NatsServer
cluster.TrackInflightStreamProposal(account.Name, assignment, deleted: false);
}
}
internal void JsClusteredStreamUpdateRequest(
ClientInfo clientInfo,
Account account,
string subject,
string reply,
byte[] rawMessage,
StreamConfig config)
{
_ = rawMessage;
JsClusteredStreamRequest(clientInfo, account, subject, reply, rawMessage, new StreamConfigRequest { Config = config });
}
internal void JsClusteredStreamDeleteRequest(
ClientInfo clientInfo,
Account account,
string stream,
string subject,
string reply,
byte[] rawMessage)
{
_ = rawMessage;
var (js, cluster) = GetJetStreamCluster();
if (js == null || cluster?.Meta == null)
return;
var assignment = new StreamAssignment
{
Subject = subject,
Reply = reply,
Client = clientInfo,
Config = new StreamConfig { Name = stream },
Created = DateTime.UtcNow,
};
cluster.Meta.Propose(Encoding.UTF8.GetBytes($"delete-stream:{account.Name}:{stream}"));
cluster.TrackInflightStreamProposal(account.Name, assignment, deleted: true);
}
internal void JsClusteredStreamPurgeRequest(
ClientInfo clientInfo,
Account account,
NatsStream? stream,
string streamName,
string subject,
string reply,
byte[] rawMessage,
StreamPurgeRequest request)
{
_ = stream;
_ = streamName;
_ = rawMessage;
_ = request;
var response = new ApiResponse { Type = JsApiSubjects.JsApiStreamPurgeResponseType };
SendAPIResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response));
}
internal void JsClusteredStreamRestoreRequest(
ClientInfo clientInfo,
Account account,
object request,
string subject,
string reply,
byte[] rawMessage)
{
_ = request;
_ = rawMessage;
var response = new ApiResponse { Type = JsApiSubjects.JsApiStreamRestoreResponseType };
SendAPIResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response));
}
internal bool AllPeersOffline(RaftGroup? group)
{
if (group == null || group.Peers.Length == 0)
return false;
foreach (var peer in group.Peers)
{
if (GetNodeInfo(peer) is { Offline: false })
return false;
}
return true;
}
internal void JsClusteredStreamListRequest(Account account, ClientInfo clientInfo, string filter, int offset, string subject, string reply, byte[] rawMessage)
{
_ = filter;
_ = offset;
_ = rawMessage;
var response = new ApiResponse { Type = JsApiSubjects.JsApiStreamListResponseType };
SendAPIResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response));
}
internal void JsClusteredConsumerListRequest(Account account, ClientInfo clientInfo, int offset, string stream, string subject, string reply, byte[] rawMessage)
{
_ = offset;
_ = stream;
_ = rawMessage;
var response = new ApiResponse { Type = JsApiSubjects.JsApiConsumerListResponseType };
SendAPIResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response));
}
internal void JsClusteredConsumerDeleteRequest(
ClientInfo clientInfo,
Account account,
string stream,
string consumer,
string subject,
string reply,
byte[] rawMessage)
{
_ = rawMessage;
var (js, cluster) = GetJetStreamCluster();
if (js == null || cluster?.Meta == null)
return;
cluster.Meta.Propose(Encoding.UTF8.GetBytes($"delete-consumer:{account.Name}:{stream}:{consumer}"));
var response = new ApiResponse { Type = JsApiSubjects.JsApiConsumerDeleteResponseType };
SendAPIResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response));
}
internal void JsClusteredMsgDeleteRequest(
ClientInfo clientInfo,
Account account,
NatsStream? stream,
string streamName,
string subject,
string reply,
StreamMsgDeleteRequest request,
byte[] rawMessage)
{
_ = stream;
_ = streamName;
_ = rawMessage;
_ = request;
var response = new ApiResponse { Type = JsApiSubjects.JsApiMsgDeleteResponseType };
SendAPIResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response));
}
}

View File

@@ -0,0 +1,80 @@
using System.Reflection;
using Shouldly;
using ZB.MOM.NatsNet.Server;
namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
public sealed class JetStreamClusterConsumersGroupCTests
{
[Fact] // T:1676
public void JsClusteredStreamUpdateRequest_Method_ShouldExist() =>
typeof(NatsServer).GetMethod("JsClusteredStreamUpdateRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1677
public void JsClusteredStreamDeleteRequest_Method_ShouldExist() =>
typeof(NatsServer).GetMethod("JsClusteredStreamDeleteRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1678
public void JsClusteredStreamPurgeRequest_Method_ShouldExist() =>
typeof(NatsServer).GetMethod("JsClusteredStreamPurgeRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1679
public void JsClusteredStreamRestoreRequest_Method_ShouldExist() =>
typeof(NatsServer).GetMethod("JsClusteredStreamRestoreRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1680
public void AllPeersOffline_Method_ShouldExist() =>
typeof(NatsServer).GetMethod("AllPeersOffline", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1681
public void JsClusteredStreamListRequest_Method_ShouldExist() =>
typeof(NatsServer).GetMethod("JsClusteredStreamListRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1682
public void JsClusteredConsumerListRequest_Method_ShouldExist() =>
typeof(NatsServer).GetMethod("JsClusteredConsumerListRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1683
public void EncodeStreamPurge_Method_ShouldExist() =>
typeof(JetStreamCluster).GetMethod("EncodeStreamPurge", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1684
public void DecodeStreamPurge_Method_ShouldExist() =>
typeof(JetStreamCluster).GetMethod("DecodeStreamPurge", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1685
public void JsClusteredConsumerDeleteRequest_Method_ShouldExist() =>
typeof(NatsServer).GetMethod("JsClusteredConsumerDeleteRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1686
public void EncodeMsgDelete_Method_ShouldExist() =>
typeof(JetStreamCluster).GetMethod("EncodeMsgDelete", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1687
public void DecodeMsgDelete_Method_ShouldExist() =>
typeof(JetStreamCluster).GetMethod("DecodeMsgDelete", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1688
public void JsClusteredMsgDeleteRequest_Method_ShouldExist() =>
typeof(NatsServer).GetMethod("JsClusteredMsgDeleteRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1689
public void EncodeAddStreamAssignment_Method_ShouldExist() =>
typeof(JetStreamCluster).GetMethod("EncodeAddStreamAssignment", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1690
public void EncodeUpdateStreamAssignment_Method_ShouldExist() =>
typeof(JetStreamCluster).GetMethod("EncodeUpdateStreamAssignment", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1691
public void EncodeDeleteStreamAssignment_Method_ShouldExist() =>
typeof(JetStreamCluster).GetMethod("EncodeDeleteStreamAssignment", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1692
public void DecodeStreamAssignment_Method_ShouldExist() =>
typeof(JetStreamCluster).GetMethod("DecodeStreamAssignment", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull();
[Fact] // T:1693
public void DecodeStreamAssignmentConfig_Method_ShouldExist() =>
typeof(JetStreamCluster).GetMethod("DecodeStreamAssignmentConfig", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull();
}

Binary file not shown.