From 50a11f51822b0ba696cff6293d16d84aaddb1da6 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 11:58:16 -0500 Subject: [PATCH] test(batch8): resolve jetstream direct get up-to-time mapping --- .../JetStream/JetStreamEngineTests.cs | 58 ++++++++++++++++++ porting.db | Bin 6524928 -> 6524928 bytes 2 files changed, 58 insertions(+) diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.cs index 79a2e77..b466bc0 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.cs @@ -1,6 +1,7 @@ // Copyright 2025 The NATS Authors // Licensed under the Apache License, Version 2.0 +using System.Text; using Shouldly; using ZB.MOM.NatsNet.Server; @@ -108,4 +109,61 @@ public sealed class JetStreamEngineTests err.ErrCode.ShouldBe(JsApiErrors.StreamInvalidConfig.ErrCode); err.Description.ShouldBe("stream configuration for republish destination forms a cycle"); } + + [Fact] // T:1751 + public void JetStreamDirectGetUpToTime_ShouldSucceed() + { + const long unixEpochTicks = 621355968000000000L; + var baseTicks = DateTime.UnixEpoch.Ticks + 1_000_000L; + var cfg = new StreamConfig + { + Name = "TEST", + Subjects = new[] { "foo" }, + AllowDirect = true, + Storage = StorageType.MemoryStorage, + }; + + var ms = JetStreamMemStore.NewMemStore(cfg); + var timestamps = new List(10); + + for (var i = 0; i < 10; i++) + { + var ticks = baseTicks + i; + var ts = (ticks - unixEpochTicks) * 100L; + ms.StoreRawMsg("foo", null, Encoding.UTF8.GetBytes($"message {i + 1}"), (ulong)(i + 1), ts, 0, true); + timestamps.Add(new DateTime(ticks, DateTimeKind.Utc)); + } + + static string[] CheckResponses(IStreamStore store, DateTime upToTime) + { + var state = store.State(); + var upToSeq = store.GetSeqFromTime(upToTime); + if (upToSeq <= state.FirstSeq) + return Array.Empty(); + + upToSeq--; + if (upToSeq == 0) + upToSeq = state.LastSeq; + + var (seqs, err) = store.MultiLastSeqs(new[] { "foo" }, upToSeq, 1024); + err.ShouldBeNull(); + if (seqs is null || seqs.Length == 0) + return Array.Empty(); + + var messages = new List(seqs.Length); + foreach (var seq in seqs) + { + var sm = store.LoadMsg(seq, null); + sm.ShouldNotBeNull(); + messages.Add(Encoding.UTF8.GetString(sm!.Msg)); + } + + return messages.ToArray(); + } + + CheckResponses(ms, DateTime.UnixEpoch).ShouldBe(Array.Empty()); + CheckResponses(ms, new DateTime(2100, 1, 1, 0, 0, 0, DateTimeKind.Utc)).ShouldBe(new[] { "message 10" }); + CheckResponses(ms, timestamps[0]).ShouldBe(Array.Empty()); + CheckResponses(ms, timestamps[4]).ShouldBe(new[] { "message 4" }); + } } diff --git a/porting.db b/porting.db index 4484a1ed67b3903d7eece0d6e537a968e6068c91..81877262059e6eb436f25e5136907b6eb51b0957 100644 GIT binary patch delta 690 zcmWm8yH67V0LO8zl+p)}J1Rn{&?{O+w6t7#)G8|YRs==G7YOAz%7s3NJ%VE!6B-jk zj1IRr(Ep%L7ox$%#La;iiJOxHF1i^9^~>=2Ex+IJD{QP+7f{)V2%)FtNU0nVU_&+R zsDT5uaKZ&QJg7rG8qjE!BcfUmo|`ol@vHf*BL23%yb<5k3UAFEJv z37ccb`sRqIEX_C;V_P% z3rEq7V>pg5dJsVry-*NCANnzXLBz55ErF9bg&~~A8Jxv1MsN=2aRC=`372sNS8)xa zxQ;Q5;|6YG0=IA*cQA>&xQF|AfQOjEG#+6Fvq)mj>PV^ITU*^Oe^ujH!|VFp`gq-% z=d-8Nz3kd@`K{pW^gm}?G4L|vt_s@P$D50R*h*G6W(+NFTTRFr%~;IM%X(%xt!1>Vku>yNR-V)Lw#oT|6)jgt^S&h78bu% z#CGwae0nS zJkJY^Gr^0z#AaUR6(*VDRkrXN(`;oMud|&uc#|FMWEZ>Xv4_3v<1O}cfVVlwAr5nd zqrAg0j&p*Oyvq#l@jf4Liqm|^M|{i~&T@|Pe8Q((;3Aj!jL-Rk%Ut0q*SO9NX1U3i x+~PK0spO9TE8b)nl{1a=(Jxlb8ftiNem7g{-x{(vwSAy^wz6?URrdXi_8)J_x%dD8