test(batch8): resolve jetstream direct get up-to-time mapping
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
// Copyright 2025 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0
|
||||
|
||||
using System.Text;
|
||||
using Shouldly;
|
||||
using ZB.MOM.NatsNet.Server;
|
||||
|
||||
@@ -108,4 +109,61 @@ public sealed class JetStreamEngineTests
|
||||
err.ErrCode.ShouldBe(JsApiErrors.StreamInvalidConfig.ErrCode);
|
||||
err.Description.ShouldBe("stream configuration for republish destination forms a cycle");
|
||||
}
|
||||
|
||||
[Fact] // T:1751
|
||||
public void JetStreamDirectGetUpToTime_ShouldSucceed()
|
||||
{
|
||||
const long unixEpochTicks = 621355968000000000L;
|
||||
var baseTicks = DateTime.UnixEpoch.Ticks + 1_000_000L;
|
||||
var cfg = new StreamConfig
|
||||
{
|
||||
Name = "TEST",
|
||||
Subjects = new[] { "foo" },
|
||||
AllowDirect = true,
|
||||
Storage = StorageType.MemoryStorage,
|
||||
};
|
||||
|
||||
var ms = JetStreamMemStore.NewMemStore(cfg);
|
||||
var timestamps = new List<DateTime>(10);
|
||||
|
||||
for (var i = 0; i < 10; i++)
|
||||
{
|
||||
var ticks = baseTicks + i;
|
||||
var ts = (ticks - unixEpochTicks) * 100L;
|
||||
ms.StoreRawMsg("foo", null, Encoding.UTF8.GetBytes($"message {i + 1}"), (ulong)(i + 1), ts, 0, true);
|
||||
timestamps.Add(new DateTime(ticks, DateTimeKind.Utc));
|
||||
}
|
||||
|
||||
static string[] CheckResponses(IStreamStore store, DateTime upToTime)
|
||||
{
|
||||
var state = store.State();
|
||||
var upToSeq = store.GetSeqFromTime(upToTime);
|
||||
if (upToSeq <= state.FirstSeq)
|
||||
return Array.Empty<string>();
|
||||
|
||||
upToSeq--;
|
||||
if (upToSeq == 0)
|
||||
upToSeq = state.LastSeq;
|
||||
|
||||
var (seqs, err) = store.MultiLastSeqs(new[] { "foo" }, upToSeq, 1024);
|
||||
err.ShouldBeNull();
|
||||
if (seqs is null || seqs.Length == 0)
|
||||
return Array.Empty<string>();
|
||||
|
||||
var messages = new List<string>(seqs.Length);
|
||||
foreach (var seq in seqs)
|
||||
{
|
||||
var sm = store.LoadMsg(seq, null);
|
||||
sm.ShouldNotBeNull();
|
||||
messages.Add(Encoding.UTF8.GetString(sm!.Msg));
|
||||
}
|
||||
|
||||
return messages.ToArray();
|
||||
}
|
||||
|
||||
CheckResponses(ms, DateTime.UnixEpoch).ShouldBe(Array.Empty<string>());
|
||||
CheckResponses(ms, new DateTime(2100, 1, 1, 0, 0, 0, DateTimeKind.Utc)).ShouldBe(new[] { "message 10" });
|
||||
CheckResponses(ms, timestamps[0]).ShouldBe(Array.Empty<string>());
|
||||
CheckResponses(ms, timestamps[4]).ShouldBe(new[] { "message 4" });
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user