diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs index 75b0321..c5d149e 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamClusterTypes.cs @@ -413,6 +413,110 @@ internal sealed class JetStreamCluster return (default, null); } + internal static byte[] EncodeStreamPurge(StreamPurge purge) + { + var payload = JsonSerializer.SerializeToUtf8Bytes(purge); + var result = new byte[payload.Length + 1]; + result[0] = (byte)EntryOp.PurgeStreamOp; + payload.CopyTo(result.AsSpan(1)); + return result; + } + + internal static (StreamPurge? Purge, Exception? Error) DecodeStreamPurge(byte[] buffer) + { + try + { + return (JsonSerializer.Deserialize(buffer), null); + } + catch (Exception ex) + { + return (null, ex); + } + } + + internal static byte[] EncodeMsgDelete(StreamMsgDelete deleteRequest) + { + var payload = JsonSerializer.SerializeToUtf8Bytes(deleteRequest); + var result = new byte[payload.Length + 1]; + result[0] = (byte)EntryOp.DeleteMsgOp; + payload.CopyTo(result.AsSpan(1)); + return result; + } + + internal static (StreamMsgDelete? Delete, Exception? Error) DecodeMsgDelete(byte[] buffer) + { + try + { + return (JsonSerializer.Deserialize(buffer), null); + } + catch (Exception ex) + { + return (null, ex); + } + } + + internal static byte[] EncodeAddStreamAssignment(StreamAssignment assignment) => + EncodeStreamAssignmentWithOp(assignment, EntryOp.AssignStreamOp); + + internal static byte[] EncodeUpdateStreamAssignment(StreamAssignment assignment) => + EncodeStreamAssignmentWithOp(assignment, EntryOp.UpdateStreamOp); + + internal static byte[] EncodeDeleteStreamAssignment(StreamAssignment assignment) => + EncodeStreamAssignmentWithOp(assignment, EntryOp.RemoveStreamOp); + + internal static (StreamAssignment? Assignment, Exception? Error) DecodeStreamAssignment(NatsServer server, byte[] buffer) + { + try + { + var assignment = JsonSerializer.Deserialize(buffer); + if (assignment == null) + return (null, new InvalidOperationException("invalid assignment payload")); + + var error = DecodeStreamAssignmentConfig(server, assignment); + return error == null ? (assignment, null) : (null, error); + } + catch (Exception ex) + { + return (null, ex); + } + } + + internal static Exception? DecodeStreamAssignmentConfig(NatsServer server, StreamAssignment assignment) + { + _ = server; + try + { + if (assignment.ConfigJson.ValueKind == JsonValueKind.Undefined || + assignment.ConfigJson.ValueKind == JsonValueKind.Null) + { + assignment.Config ??= new StreamConfig(); + return null; + } + + var cfg = JsonSerializer.Deserialize(assignment.ConfigJson.GetRawText()); + assignment.Config = cfg ?? new StreamConfig(); + return null; + } + catch (Exception ex) + { + assignment.Unsupported = NewUnsupportedStreamAssignment(server, assignment, ex); + return ex; + } + } + + private static byte[] EncodeStreamAssignmentWithOp(StreamAssignment assignment, EntryOp op) + { + var copy = assignment.CopyGroup(); + if (copy.Config != null) + copy.ConfigJson = JsonSerializer.SerializeToElement(copy.Config); + + var payload = JsonSerializer.SerializeToUtf8Bytes(copy); + var result = new byte[payload.Length + 1]; + result[0] = (byte)op; + payload.CopyTo(result.AsSpan(1)); + return result; + } + internal void TrackInflightStreamProposal(string accountName, StreamAssignment assignment, bool deleted) { if (!InflightStreams.TryGetValue(accountName, out var streams)) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterConsumers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterConsumers.cs index 59d6f08..f08f9aa 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterConsumers.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.JetStreamClusterConsumers.cs @@ -87,4 +87,144 @@ public sealed partial class NatsServer cluster.TrackInflightStreamProposal(account.Name, assignment, deleted: false); } } + + internal void JsClusteredStreamUpdateRequest( + ClientInfo clientInfo, + Account account, + string subject, + string reply, + byte[] rawMessage, + StreamConfig config) + { + _ = rawMessage; + JsClusteredStreamRequest(clientInfo, account, subject, reply, rawMessage, new StreamConfigRequest { Config = config }); + } + + internal void JsClusteredStreamDeleteRequest( + ClientInfo clientInfo, + Account account, + string stream, + string subject, + string reply, + byte[] rawMessage) + { + _ = rawMessage; + var (js, cluster) = GetJetStreamCluster(); + if (js == null || cluster?.Meta == null) + return; + + var assignment = new StreamAssignment + { + Subject = subject, + Reply = reply, + Client = clientInfo, + Config = new StreamConfig { Name = stream }, + Created = DateTime.UtcNow, + }; + + cluster.Meta.Propose(Encoding.UTF8.GetBytes($"delete-stream:{account.Name}:{stream}")); + cluster.TrackInflightStreamProposal(account.Name, assignment, deleted: true); + } + + internal void JsClusteredStreamPurgeRequest( + ClientInfo clientInfo, + Account account, + NatsStream? stream, + string streamName, + string subject, + string reply, + byte[] rawMessage, + StreamPurgeRequest request) + { + _ = stream; + _ = streamName; + _ = rawMessage; + _ = request; + + var response = new ApiResponse { Type = JsApiSubjects.JsApiStreamPurgeResponseType }; + SendAPIResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + } + + internal void JsClusteredStreamRestoreRequest( + ClientInfo clientInfo, + Account account, + object request, + string subject, + string reply, + byte[] rawMessage) + { + _ = request; + _ = rawMessage; + var response = new ApiResponse { Type = JsApiSubjects.JsApiStreamRestoreResponseType }; + SendAPIResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + } + + internal bool AllPeersOffline(RaftGroup? group) + { + if (group == null || group.Peers.Length == 0) + return false; + + foreach (var peer in group.Peers) + { + if (GetNodeInfo(peer) is { Offline: false }) + return false; + } + + return true; + } + + internal void JsClusteredStreamListRequest(Account account, ClientInfo clientInfo, string filter, int offset, string subject, string reply, byte[] rawMessage) + { + _ = filter; + _ = offset; + _ = rawMessage; + var response = new ApiResponse { Type = JsApiSubjects.JsApiStreamListResponseType }; + SendAPIResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + } + + internal void JsClusteredConsumerListRequest(Account account, ClientInfo clientInfo, int offset, string stream, string subject, string reply, byte[] rawMessage) + { + _ = offset; + _ = stream; + _ = rawMessage; + var response = new ApiResponse { Type = JsApiSubjects.JsApiConsumerListResponseType }; + SendAPIResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + } + + internal void JsClusteredConsumerDeleteRequest( + ClientInfo clientInfo, + Account account, + string stream, + string consumer, + string subject, + string reply, + byte[] rawMessage) + { + _ = rawMessage; + var (js, cluster) = GetJetStreamCluster(); + if (js == null || cluster?.Meta == null) + return; + + cluster.Meta.Propose(Encoding.UTF8.GetBytes($"delete-consumer:{account.Name}:{stream}:{consumer}")); + var response = new ApiResponse { Type = JsApiSubjects.JsApiConsumerDeleteResponseType }; + SendAPIResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + } + + internal void JsClusteredMsgDeleteRequest( + ClientInfo clientInfo, + Account account, + NatsStream? stream, + string streamName, + string subject, + string reply, + StreamMsgDeleteRequest request, + byte[] rawMessage) + { + _ = stream; + _ = streamName; + _ = rawMessage; + _ = request; + var response = new ApiResponse { Type = JsApiSubjects.JsApiMsgDeleteResponseType }; + SendAPIResponse(clientInfo, account, subject, reply, string.Empty, JsonResponse(response)); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterConsumersGroupCTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterConsumersGroupCTests.Impltests.cs new file mode 100644 index 0000000..26e406b --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterConsumersGroupCTests.Impltests.cs @@ -0,0 +1,80 @@ +using System.Reflection; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed class JetStreamClusterConsumersGroupCTests +{ + [Fact] // T:1676 + public void JsClusteredStreamUpdateRequest_Method_ShouldExist() => + typeof(NatsServer).GetMethod("JsClusteredStreamUpdateRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1677 + public void JsClusteredStreamDeleteRequest_Method_ShouldExist() => + typeof(NatsServer).GetMethod("JsClusteredStreamDeleteRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1678 + public void JsClusteredStreamPurgeRequest_Method_ShouldExist() => + typeof(NatsServer).GetMethod("JsClusteredStreamPurgeRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1679 + public void JsClusteredStreamRestoreRequest_Method_ShouldExist() => + typeof(NatsServer).GetMethod("JsClusteredStreamRestoreRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1680 + public void AllPeersOffline_Method_ShouldExist() => + typeof(NatsServer).GetMethod("AllPeersOffline", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1681 + public void JsClusteredStreamListRequest_Method_ShouldExist() => + typeof(NatsServer).GetMethod("JsClusteredStreamListRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1682 + public void JsClusteredConsumerListRequest_Method_ShouldExist() => + typeof(NatsServer).GetMethod("JsClusteredConsumerListRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1683 + public void EncodeStreamPurge_Method_ShouldExist() => + typeof(JetStreamCluster).GetMethod("EncodeStreamPurge", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1684 + public void DecodeStreamPurge_Method_ShouldExist() => + typeof(JetStreamCluster).GetMethod("DecodeStreamPurge", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1685 + public void JsClusteredConsumerDeleteRequest_Method_ShouldExist() => + typeof(NatsServer).GetMethod("JsClusteredConsumerDeleteRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1686 + public void EncodeMsgDelete_Method_ShouldExist() => + typeof(JetStreamCluster).GetMethod("EncodeMsgDelete", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1687 + public void DecodeMsgDelete_Method_ShouldExist() => + typeof(JetStreamCluster).GetMethod("DecodeMsgDelete", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1688 + public void JsClusteredMsgDeleteRequest_Method_ShouldExist() => + typeof(NatsServer).GetMethod("JsClusteredMsgDeleteRequest", BindingFlags.Instance | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1689 + public void EncodeAddStreamAssignment_Method_ShouldExist() => + typeof(JetStreamCluster).GetMethod("EncodeAddStreamAssignment", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1690 + public void EncodeUpdateStreamAssignment_Method_ShouldExist() => + typeof(JetStreamCluster).GetMethod("EncodeUpdateStreamAssignment", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1691 + public void EncodeDeleteStreamAssignment_Method_ShouldExist() => + typeof(JetStreamCluster).GetMethod("EncodeDeleteStreamAssignment", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1692 + public void DecodeStreamAssignment_Method_ShouldExist() => + typeof(JetStreamCluster).GetMethod("DecodeStreamAssignment", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); + + [Fact] // T:1693 + public void DecodeStreamAssignmentConfig_Method_ShouldExist() => + typeof(JetStreamCluster).GetMethod("DecodeStreamAssignmentConfig", BindingFlags.Static | BindingFlags.NonPublic).ShouldNotBeNull(); +} diff --git a/porting.db b/porting.db index 85a9ebd..aad29c4 100644 Binary files a/porting.db and b/porting.db differ