diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.cs index 79a2e77..b466bc0 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.cs @@ -1,6 +1,7 @@ // Copyright 2025 The NATS Authors // Licensed under the Apache License, Version 2.0 +using System.Text; using Shouldly; using ZB.MOM.NatsNet.Server; @@ -108,4 +109,61 @@ public sealed class JetStreamEngineTests err.ErrCode.ShouldBe(JsApiErrors.StreamInvalidConfig.ErrCode); err.Description.ShouldBe("stream configuration for republish destination forms a cycle"); } + + [Fact] // T:1751 + public void JetStreamDirectGetUpToTime_ShouldSucceed() + { + const long unixEpochTicks = 621355968000000000L; + var baseTicks = DateTime.UnixEpoch.Ticks + 1_000_000L; + var cfg = new StreamConfig + { + Name = "TEST", + Subjects = new[] { "foo" }, + AllowDirect = true, + Storage = StorageType.MemoryStorage, + }; + + var ms = JetStreamMemStore.NewMemStore(cfg); + var timestamps = new List(10); + + for (var i = 0; i < 10; i++) + { + var ticks = baseTicks + i; + var ts = (ticks - unixEpochTicks) * 100L; + ms.StoreRawMsg("foo", null, Encoding.UTF8.GetBytes($"message {i + 1}"), (ulong)(i + 1), ts, 0, true); + timestamps.Add(new DateTime(ticks, DateTimeKind.Utc)); + } + + static string[] CheckResponses(IStreamStore store, DateTime upToTime) + { + var state = store.State(); + var upToSeq = store.GetSeqFromTime(upToTime); + if (upToSeq <= state.FirstSeq) + return Array.Empty(); + + upToSeq--; + if (upToSeq == 0) + upToSeq = state.LastSeq; + + var (seqs, err) = store.MultiLastSeqs(new[] { "foo" }, upToSeq, 1024); + err.ShouldBeNull(); + if (seqs is null || seqs.Length == 0) + return Array.Empty(); + + var messages = new List(seqs.Length); + foreach (var seq in seqs) + { + var sm = store.LoadMsg(seq, null); + sm.ShouldNotBeNull(); + messages.Add(Encoding.UTF8.GetString(sm!.Msg)); + } + + return messages.ToArray(); + } + + CheckResponses(ms, DateTime.UnixEpoch).ShouldBe(Array.Empty()); + CheckResponses(ms, new DateTime(2100, 1, 1, 0, 0, 0, DateTimeKind.Utc)).ShouldBe(new[] { "message 10" }); + CheckResponses(ms, timestamps[0]).ShouldBe(Array.Empty()); + CheckResponses(ms, timestamps[4]).ShouldBe(new[] { "message 4" }); + } } diff --git a/porting.db b/porting.db index 4484a1e..8187726 100644 Binary files a/porting.db and b/porting.db differ