diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApiTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApiTypes.cs index 7234cbc..ba3cdac 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApiTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/JetStreamApiTypes.cs @@ -21,8 +21,24 @@ namespace ZB.MOM.NatsNet.Server; // Forward stubs for types defined in later sessions // --------------------------------------------------------------------------- -/// Stub: stored message type — full definition in session 20. -public sealed class StoredMsg { } +/// Stored message returned by direct-get and message-get APIs. +public sealed class StoredMsg +{ + [JsonPropertyName("subject")] + public string Subject { get; set; } = string.Empty; + + [JsonPropertyName("seq")] + public ulong Sequence { get; set; } + + [JsonPropertyName("hdrs")] + public byte[]? Header { get; set; } + + [JsonPropertyName("data")] + public byte[]? Data { get; set; } + + [JsonPropertyName("time")] + public DateTime Time { get; set; } +} /// /// Priority group for pull consumers. diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.DirectGet.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.DirectGet.cs new file mode 100644 index 0000000..a99ca01 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.DirectGet.cs @@ -0,0 +1,246 @@ +using System.Text; +using System.Text.Json; +using System.Linq; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + internal static (string Ttl, bool Ok) GetMessageScheduleTTL(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return (string.Empty, true); + + var ttl = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsScheduleTtl, hdr); + if (ttl == null || ttl.Length == 0) + return (string.Empty, true); + + var ttlValue = Encoding.ASCII.GetString(ttl); + var (_, err) = ParseMessageTTL(ttlValue); + return err == null ? (ttlValue, true) : (string.Empty, false); + } + + internal static string GetMessageScheduleTarget(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return string.Empty; + + var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsScheduleTarget, hdr); + return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value); + } + + internal static string GetMessageScheduleSource(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return string.Empty; + + var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsScheduleSource, hdr); + return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value); + } + + internal static string GetBatchId(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return string.Empty; + + var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsBatchId, hdr); + return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value); + } + + internal static (ulong Seq, bool Exists) GetBatchSequence(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return (0, false); + + var value = NatsMessageHeaders.SliceHeader(NatsHeaderConstants.JsBatchSeq, hdr); + if (value is null || value.Value.Length == 0) + return (0, false); + + var parsed = ServerUtilities.ParseInt64(value.Value.Span); + return parsed < 0 ? (0, false) : ((ulong)parsed, true); + } + + public bool IsClustered() + { + _mu.EnterReadLock(); + try + { + return IsClusteredInternal(); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal bool IsClusteredInternal() => _node is IRaftNode; + + internal void QueueInbound(IpQueue inbound, string subject, string? reply, byte[]? hdr, byte[]? msg, object? sourceInfo, object? trace) + { + _ = sourceInfo; + _ = trace; + + var inboundMsg = InMsg.Rent(); + inboundMsg.Subject = subject; + inboundMsg.Reply = reply; + inboundMsg.Hdr = hdr; + inboundMsg.Msg = msg; + + var (_, error) = inbound.Push(inboundMsg); + if (error != null) + inboundMsg.ReturnToPool(); + } + + internal JsApiMsgGetResponse ProcessDirectGetRequest(string reply, byte[]? hdr, byte[]? msg) + { + var response = new JsApiMsgGetResponse(); + if (string.IsNullOrWhiteSpace(reply)) + return response; + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiLevelHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + return response; + } + + if (msg == null || msg.Length == 0) + { + response.Error = JsApiErrors.NewJSBadRequestError(); + return response; + } + + JsApiMsgGetRequest? request; + try + { + request = JsonSerializer.Deserialize(msg); + } + catch (JsonException) + { + response.Error = JsApiErrors.NewJSBadRequestError(); + return response; + } + + if (request == null) + { + response.Error = JsApiErrors.NewJSBadRequestError(); + return response; + } + + return GetDirectRequest(request, reply); + } + + internal JsApiMsgGetResponse ProcessDirectGetLastBySubjectRequest(string subject, string reply, byte[]? hdr, byte[]? msg) + { + var response = new JsApiMsgGetResponse(); + if (string.IsNullOrWhiteSpace(reply)) + return response; + + if (JetStreamVersioning.ErrorOnRequiredApiLevel(GetRequiredApiLevelHeader(hdr))) + { + response.Error = JsApiErrors.NewJSRequiredApiLevelError(); + return response; + } + + var request = msg == null || msg.Length == 0 + ? new JsApiMsgGetRequest() + : JsonSerializer.Deserialize(msg) ?? new JsApiMsgGetRequest(); + + var key = ExtractDirectGetLastBySubjectKey(subject); + if (string.IsNullOrEmpty(key)) + { + response.Error = JsApiErrors.NewJSBadRequestError(); + return response; + } + + request.LastFor = key; + return GetDirectRequest(request, reply); + } + + internal JsApiMsgGetResponse GetDirectMulti(JsApiMsgGetRequest request, string reply) + { + _ = reply; + + if (Store == null) + return new JsApiMsgGetResponse { Error = JsApiErrors.NewJSNoMessageFoundError() }; + + var filters = request.MultiLastFor ?? []; + if (filters.Length == 0) + return new JsApiMsgGetResponse { Error = JsApiErrors.NewJSNoMessageFoundError() }; + + var (seqs, error) = Store.MultiLastSeqs(filters, request.UpToSeq, 1024); + if (error != null || seqs.Length == 0) + return new JsApiMsgGetResponse { Error = JsApiErrors.NewJSNoMessageFoundError() }; + + var firstSeq = seqs[0]; + var loaded = Store.LoadMsg(firstSeq, new StoreMsg()); + if (loaded == null) + return new JsApiMsgGetResponse { Error = JsApiErrors.NewJSNoMessageFoundError() }; + + return new JsApiMsgGetResponse { Message = ToStoredMsg(loaded) }; + } + + internal JsApiMsgGetResponse GetDirectRequest(JsApiMsgGetRequest request, string reply) + { + _ = reply; + if (request.MultiLastFor is { Length: > 0 }) + return GetDirectMulti(request, reply); + + if (Store == null) + return new JsApiMsgGetResponse { Error = JsApiErrors.NewJSNoMessageFoundError() }; + + StoreMsg? loaded = null; + if (request.Seq > 0) + { + loaded = Store.LoadMsg(request.Seq, new StoreMsg()); + } + else if (!string.IsNullOrWhiteSpace(request.LastFor)) + { + loaded = Store.LoadLastMsg(request.LastFor!, new StoreMsg()); + } + else if (!string.IsNullOrWhiteSpace(request.NextFor)) + { + var (sm, _) = Store.LoadNextMsg(request.NextFor!, SubscriptionIndex.SubjectHasWildcard(request.NextFor!), request.Seq, new StoreMsg()); + loaded = sm; + } + else if (request.StartTime.HasValue) + { + var seq = Store.GetSeqFromTime(request.StartTime.Value); + if (seq > 0) + loaded = Store.LoadMsg(seq, new StoreMsg()); + } + + if (loaded == null) + return new JsApiMsgGetResponse { Error = JsApiErrors.NewJSNoMessageFoundError() }; + + return new JsApiMsgGetResponse { Message = ToStoredMsg(loaded) }; + } + + private static StoredMsg ToStoredMsg(StoreMsg loaded) => new() + { + Subject = loaded.Subject, + Sequence = loaded.Seq, + Header = loaded.Hdr, + Data = loaded.Msg, + Time = DateTimeOffset.FromUnixTimeMilliseconds(loaded.Ts / 1_000_000L).UtcDateTime, + }; + + private static string? GetRequiredApiLevelHeader(byte[]? hdr) + { + if (hdr == null || hdr.Length == 0) + return null; + + var value = NatsMessageHeaders.GetHeader(JsApiSubjects.JsRequiredApiLevel, hdr); + return value == null || value.Length == 0 ? null : Encoding.ASCII.GetString(value); + } + + private static string ExtractDirectGetLastBySubjectKey(string subject) + { + if (string.IsNullOrWhiteSpace(subject)) + return string.Empty; + + var parts = subject.Split('.'); + return parts.Length <= 5 ? string.Empty : string.Join('.', parts.Skip(5)); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.MessagePipeline.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.MessagePipeline.cs new file mode 100644 index 0000000..3c68827 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.MessagePipeline.cs @@ -0,0 +1,98 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + internal Exception? ProcessInboundJetStreamMsg(InMsg? msg) + { + if (msg == null) + return new ArgumentNullException(nameof(msg)); + + try + { + return ProcessJetStreamMsg( + msg.Subject, + msg.Reply ?? string.Empty, + msg.Hdr, + msg.Msg, + lseq: 0, + ts: 0, + msgTrace: null, + sourced: false, + canRespond: true); + } + finally + { + msg.ReturnToPool(); + } + } + + internal Exception? ProcessJetStreamMsg( + string subject, + string reply, + byte[]? hdr, + byte[]? msg, + ulong lseq, + long ts, + object? msgTrace, + bool sourced, + bool canRespond) + { + _ = reply; + _ = msgTrace; + _ = sourced; + _ = canRespond; + + if (string.IsNullOrWhiteSpace(subject)) + return new ArgumentException("subject is required", nameof(subject)); + + if (Store == null) + return new InvalidOperationException("store not initialized"); + + var batchId = GetBatchId(hdr); + if (!string.IsNullOrEmpty(batchId)) + return ProcessJetStreamBatchMsg(batchId, subject, reply, hdr, msg, msgTrace); + + try + { + var (seq, _) = Store.StoreMsg(subject, hdr, msg, ttl: 0); + if (lseq > 0) + seq = lseq; + + if (ts == 0) + ts = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L; + + Interlocked.Exchange(ref LastSeq, (long)seq); + return null; + } + catch (Exception ex) + { + return ex; + } + } + + internal Exception? ProcessJetStreamBatchMsg(string batchId, string subject, string reply, byte[]? hdr, byte[]? msg, object? msgTrace) + { + _ = reply; + _ = msgTrace; + + if (string.IsNullOrWhiteSpace(batchId)) + return new InvalidOperationException(JsApiErrors.NewJSAtomicPublishInvalidBatchIDError().ToString()); + + var (_, exists) = GetBatchSequence(hdr); + if (!exists) + return new InvalidOperationException(JsApiErrors.NewJSAtomicPublishMissingSeqError().ToString()); + + if (Store == null) + return new InvalidOperationException("store not initialized"); + + try + { + Store.StoreMsg(subject, hdr, msg, ttl: 0); + return null; + } + catch (Exception ex) + { + return ex; + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.MessageCarriers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.MessageCarriers.cs new file mode 100644 index 0000000..112ea30 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.MessageCarriers.cs @@ -0,0 +1,20 @@ +using System.Collections.Concurrent; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class InMsg +{ + private static readonly ConcurrentBag Pool = new(); + + internal static InMsg Rent() => Pool.TryTake(out var msg) ? msg : new InMsg(); + + internal void ReturnToPool() + { + Subject = string.Empty; + Reply = null; + Hdr = null; + Msg = null; + Client = null; + Pool.Add(this); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs index c63e5dd..4bd6ecb 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs @@ -227,7 +227,7 @@ public sealed class JsStreamPubMsg /// A JetStream publish message with sync tracking. /// Mirrors jsPubMsg in server/stream.go. /// -public sealed class JsPubMsg +public sealed partial class JsPubMsg { public string Subject { get; set; } = string.Empty; public string? Reply { get; set; } @@ -245,7 +245,7 @@ public sealed class JsPubMsg /// An inbound message to be processed by the JetStream layer. /// Mirrors inMsg in server/stream.go. /// -public sealed class InMsg +public sealed partial class InMsg { public string Subject { get; set; } = string.Empty; public string? Reply { get; set; } @@ -277,7 +277,7 @@ public sealed class InMsg /// A cached/clustered message for replication. /// Mirrors cMsg in server/stream.go. /// -public sealed class CMsg +public sealed partial class CMsg { public string Subject { get; set; } = string.Empty; public byte[]? Msg { get; set; } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamDirectGetPipelineTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamDirectGetPipelineTests.cs new file mode 100644 index 0000000..8771204 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamDirectGetPipelineTests.cs @@ -0,0 +1,152 @@ +using System.Text; +using System.Text.Json; +using NSubstitute; +using Shouldly; +using ZB.MOM.NatsNet.Server; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class NatsStreamDirectGetPipelineTests +{ + [Fact] + public void GetMessageScheduleTTL_InvalidValue_ReturnsNotOk() + { + var hdr = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsScheduleTtl, "not-a-ttl"); + + var (ttl, ok) = NatsStream.GetMessageScheduleTTL(hdr); + + ok.ShouldBeFalse(); + ttl.ShouldBe(string.Empty); + } + + [Fact] + public void GetBatchSequence_ValidHeader_ReturnsSequence() + { + var hdr = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsBatchSeq, "9"); + + var (seq, ok) = NatsStream.GetBatchSequence(hdr); + + ok.ShouldBeTrue(); + seq.ShouldBe(9UL); + } + + [Fact] + public void IsClustered_WithAssignmentNode_ReturnsTrue() + { + var stream = CreateStream(); + var node = Substitute.For(); + var assignment = new StreamAssignment + { + Group = new RaftGroup + { + Node = node, + Peers = ["N1"], + }, + }; + + stream.SetStreamAssignment(assignment); + + stream.IsClustered().ShouldBeTrue(); + stream.IsClusteredInternal().ShouldBeTrue(); + } + + [Fact] + public void InMsgReturnToPool_WithValues_ClearsState() + { + var msg = new InMsg + { + Subject = "foo", + Reply = "bar", + Hdr = [1, 2], + Msg = [3, 4], + Client = new object(), + }; + + msg.ReturnToPool(); + + msg.Subject.ShouldBe(string.Empty); + msg.Reply.ShouldBeNull(); + msg.Hdr.ShouldBeNull(); + msg.Msg.ShouldBeNull(); + } + + [Fact] + public void QueueInbound_WithQueue_PushesMessage() + { + var stream = CreateStream(); + var queue = new IpQueue("inbound"); + + stream.QueueInbound(queue, "orders.created", null, null, [1], null, null); + var popped = queue.Pop(); + + popped.ShouldNotBeNull(); + popped!.Length.ShouldBe(1); + popped[0].Subject.ShouldBe("orders.created"); + } + + [Fact] + public void ProcessDirectGetRequest_SeqRequest_ReturnsStoredMessage() + { + var stream = CreateStream(); + stream.SetupStore(null).ShouldBeNull(); + stream.Store!.StoreMsg("orders", null, [7, 8], ttl: 0); + + var request = JsonSerializer.SerializeToUtf8Bytes(new JsApiMsgGetRequest { Seq = 1 }); + var response = stream.ProcessDirectGetRequest("reply.inbox", null, request); + + response.Error.ShouldBeNull(); + response.Message.ShouldNotBeNull(); + response.Message!.Sequence.ShouldBe(1UL); + response.Message.Subject.ShouldBe("orders"); + } + + [Fact] + public void ProcessDirectGetLastBySubjectRequest_InvalidSubject_ReturnsBadRequest() + { + var stream = CreateStream(); + + var response = stream.ProcessDirectGetLastBySubjectRequest("$JS.API.DIRECT.GET.STREAM", "reply", null, null); + + response.Error.ShouldNotBeNull(); + response.Error!.ErrCode.ShouldBe(JsApiErrors.BadRequest.ErrCode); + } + + [Fact] + public void ProcessJetStreamMsg_WithMemoryStore_StoresMessage() + { + var stream = CreateStream(); + stream.SetupStore(null).ShouldBeNull(); + + var error = stream.ProcessJetStreamMsg("events", string.Empty, null, Encoding.ASCII.GetBytes("m1"), 0, 0, null, false, true); + var stored = stream.Store!.LoadMsg(1, new StoreMsg()); + + error.ShouldBeNull(); + stored.ShouldNotBeNull(); + stored!.Subject.ShouldBe("events"); + } + + [Fact] + public void ProcessJetStreamBatchMsg_MissingSequence_ReturnsApiError() + { + var stream = CreateStream(); + stream.SetupStore(null).ShouldBeNull(); + + var error = stream.ProcessJetStreamBatchMsg("batch-1", "events", string.Empty, null, [1], null); + + error.ShouldNotBeNull(); + error.ShouldBeOfType(); + } + + private static NatsStream CreateStream() + { + var account = new Account { Name = "A" }; + var config = new StreamConfig + { + Name = "S", + Storage = StorageType.MemoryStorage, + Subjects = ["events.>"], + }; + return new NatsStream(account, config, DateTime.UtcNow); + } +} diff --git a/porting.db b/porting.db index 3d4b277..24ba797 100644 Binary files a/porting.db and b/porting.db differ