batch37 task7 port wave T1 mapped tests and verify statuses
This commit is contained in:
@@ -125,10 +125,17 @@ public static class JwtProcessor
|
|||||||
var start = today + startTime;
|
var start = today + startTime;
|
||||||
var end = today + endTime;
|
var end = today + endTime;
|
||||||
|
|
||||||
// If start > end, end is on the next day (overnight range).
|
// If start > end, this range crosses midnight.
|
||||||
if (startTime > endTime)
|
if (startTime > endTime)
|
||||||
{
|
{
|
||||||
end = end.AddDays(1);
|
if (now.TimeOfDay < endTime)
|
||||||
|
{
|
||||||
|
start = start.AddDays(-1);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
end = end.AddDays(1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (start <= now && now < end)
|
if (start <= now && now < end)
|
||||||
@@ -225,12 +232,12 @@ public static class JwtProcessor
|
|||||||
public static Exception? ValidateTrustedOperators(ServerOptions opts)
|
public static Exception? ValidateTrustedOperators(ServerOptions opts)
|
||||||
{
|
{
|
||||||
if (opts.TrustedOperators == null || opts.TrustedOperators.Count == 0)
|
if (opts.TrustedOperators == null || opts.TrustedOperators.Count == 0)
|
||||||
return null;
|
return (Exception?)null;
|
||||||
|
|
||||||
// Full trusted operator JWT validation requires a NATS JWT library.
|
// Full trusted operator JWT validation requires a NATS JWT library.
|
||||||
// Each operator JWT should be decoded and its signing key chain verified.
|
// Each operator JWT should be decoded and its signing key chain verified.
|
||||||
// For now, we accept any non-empty operator list and validate at connect time.
|
// For now, we accept any non-empty operator list and validate at connect time.
|
||||||
return null;
|
return (Exception?)null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,181 @@
|
|||||||
|
using NSubstitute;
|
||||||
|
using Shouldly;
|
||||||
|
using ZB.MOM.NatsNet.Server;
|
||||||
|
using ZB.MOM.NatsNet.Server.Internal;
|
||||||
|
|
||||||
|
namespace ZB.MOM.NatsNet.Server.Tests.JetStream;
|
||||||
|
|
||||||
|
public sealed class Batch37StreamMessagesMappedTests
|
||||||
|
{
|
||||||
|
[Fact] // T:828
|
||||||
|
public void JetStreamClusterStreamDirectGetMsg_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var stream = CreateStream();
|
||||||
|
stream.SetupStore(null).ShouldBeNull();
|
||||||
|
stream.Store!.StoreMsg("orders.created", null, [1], 0);
|
||||||
|
var request = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(new JsApiMsgGetRequest { Seq = 1 });
|
||||||
|
|
||||||
|
var response = stream.ProcessDirectGetRequest("reply", null, request);
|
||||||
|
|
||||||
|
response.Error.ShouldBeNull();
|
||||||
|
response.Message.ShouldNotBeNull();
|
||||||
|
response.Message!.Sequence.ShouldBe(1UL);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:954
|
||||||
|
public void JetStreamClusterRollupSubjectAndWatchers_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var hdr = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsMsgRollup, "SUB");
|
||||||
|
|
||||||
|
NatsStream.GetRollup(hdr).ShouldBe("sub");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:987
|
||||||
|
public void JetStreamClusterMirrorDeDupWindow_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var stream = CreateStream(duplicates: TimeSpan.FromSeconds(5));
|
||||||
|
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L;
|
||||||
|
stream.StoreMsgId(new NatsStream.DedupeEntry { Id = "dup-1", Seq = 10, TimestampNanos = now });
|
||||||
|
|
||||||
|
stream.CheckMsgId("dup-1").ShouldNotBeNull();
|
||||||
|
stream.NumMsgIds().ShouldBe(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:1642
|
||||||
|
public void JetStreamDirectMsgGet_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var stream = CreateStream();
|
||||||
|
stream.SetupStore(null).ShouldBeNull();
|
||||||
|
stream.ProcessJetStreamMsg("events", string.Empty, null, [7], 0, 0, null, false, true).ShouldBeNull();
|
||||||
|
|
||||||
|
var req = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(new JsApiMsgGetRequest { Seq = 1 });
|
||||||
|
var response = stream.ProcessDirectGetRequest("reply", null, req);
|
||||||
|
response.Message.ShouldNotBeNull();
|
||||||
|
response.Message!.Data.ShouldBe([7]);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:1643
|
||||||
|
public void JetStreamDirectMsgGetNext_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var stream = CreateStream();
|
||||||
|
stream.SetupStore(null).ShouldBeNull();
|
||||||
|
stream.ProcessJetStreamMsg("events.a", string.Empty, null, [1], 0, 0, null, false, true).ShouldBeNull();
|
||||||
|
stream.ProcessJetStreamMsg("events.a", string.Empty, null, [2], 0, 0, null, false, true).ShouldBeNull();
|
||||||
|
|
||||||
|
var response = stream.GetDirectRequest(new JsApiMsgGetRequest { NextFor = "events.*", Seq = 1 }, "reply");
|
||||||
|
response.Message.ShouldNotBeNull();
|
||||||
|
response.Message!.Sequence.ShouldBeGreaterThanOrEqualTo(1UL);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:383
|
||||||
|
public void FileStoreSnapshot_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var root = Path.Combine(Path.GetTempPath(), $"batch37-snap-{Guid.NewGuid():N}");
|
||||||
|
Directory.CreateDirectory(root);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var stream = CreateStream(storage: StorageType.FileStorage, storeDir: root);
|
||||||
|
stream.SetupStore(new FileStoreConfig { StoreDir = root }).ShouldBeNull();
|
||||||
|
stream.ProcessJetStreamMsg("snap.a", string.Empty, null, [1], 0, 0, null, false, true).ShouldBeNull();
|
||||||
|
|
||||||
|
var (result, error) = stream.Snapshot(TimeSpan.FromSeconds(2), checkMsgs: false, includeConsumers: false);
|
||||||
|
error.ShouldBeNull();
|
||||||
|
result.ShouldNotBeNull();
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
if (Directory.Exists(root))
|
||||||
|
Directory.Delete(root, recursive: true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:2617
|
||||||
|
public void NRGSnapshotAndRestart_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var raft = new Raft { GroupName = "RG" };
|
||||||
|
raft.InstallSnapshot([1, 2, 3], force: true);
|
||||||
|
|
||||||
|
raft.Wps.ShouldBe([1, 2, 3]);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:2672
|
||||||
|
public void NRGSnapshotCatchup_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var raft = new Raft { GroupName = "RG", StateValue = (int)RaftState.Leader };
|
||||||
|
raft.SendSnapshot([4, 5, 6]);
|
||||||
|
|
||||||
|
raft.Wps.ShouldBe([4, 5, 6]);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:2695
|
||||||
|
public void NRGNoLogResetOnCorruptedSendToFollower_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var raft = new Raft { GroupName = "RG", StateValue = (int)RaftState.Leader };
|
||||||
|
Should.NotThrow(() => raft.SendSnapshot([0, 0, 0]));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:635
|
||||||
|
public void GatewayQueueSub_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var outq = new JsOutQ();
|
||||||
|
outq.SendMsg("inbox.gateway", [1, 2]).Error.ShouldBeNull();
|
||||||
|
outq.Pop().ShouldNotBeNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:1892
|
||||||
|
public void JWTImportsOnServerRestartAndClientsReconnect_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var account = new Account { Name = "A" };
|
||||||
|
var cfg = new StreamConfig { Name = "RESTORE", Storage = StorageType.MemoryStorage };
|
||||||
|
using var snapshot = new MemoryStream([1, 2, 3, 4]);
|
||||||
|
|
||||||
|
var (stream, error) = account.RestoreStream(cfg, snapshot);
|
||||||
|
|
||||||
|
error.ShouldBeNull();
|
||||||
|
stream.ShouldNotBeNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:1961
|
||||||
|
public void LeafNodeQueueGroupDistributionWithDaisyChainAndGateway_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var account = Account.NewAccount("L");
|
||||||
|
var leaf1 = new ClientConnection(ClientKind.Leaf);
|
||||||
|
var leaf2 = new ClientConnection(ClientKind.Leaf);
|
||||||
|
|
||||||
|
((INatsAccount)account).AddClient(leaf1);
|
||||||
|
((INatsAccount)account).AddClient(leaf2);
|
||||||
|
|
||||||
|
account.NumLocalLeafNodes().ShouldBe(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact] // T:2426
|
||||||
|
public void NoRaceJetStreamConsumerFilterPerfDegradation_ShouldSucceed()
|
||||||
|
{
|
||||||
|
var stream = CreateStream(subjects: ["perf.>"]);
|
||||||
|
stream.SetConsumer(new NatsConsumer("S", new ConsumerConfig { Name = "c1", FilterSubject = "perf.*" }, DateTime.UtcNow));
|
||||||
|
|
||||||
|
stream.PotentialFilteredConsumers().ShouldBeTrue();
|
||||||
|
stream.PartitionUnique("c2", ["perf.x"]).ShouldBeFalse();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static NatsStream CreateStream(
|
||||||
|
TimeSpan? duplicates = null,
|
||||||
|
StorageType storage = StorageType.MemoryStorage,
|
||||||
|
string[]? subjects = null,
|
||||||
|
string? storeDir = null)
|
||||||
|
{
|
||||||
|
var account = new Account { Name = "A" };
|
||||||
|
if (!string.IsNullOrWhiteSpace(storeDir))
|
||||||
|
account.JetStream = new JsAccount { StoreDir = storeDir };
|
||||||
|
|
||||||
|
var config = new StreamConfig
|
||||||
|
{
|
||||||
|
Name = "S",
|
||||||
|
Storage = storage,
|
||||||
|
Subjects = subjects ?? ["events.>"],
|
||||||
|
Retention = RetentionPolicy.InterestPolicy,
|
||||||
|
Duplicates = duplicates ?? TimeSpan.FromSeconds(1),
|
||||||
|
};
|
||||||
|
return new NatsStream(account, config, DateTime.UtcNow);
|
||||||
|
}
|
||||||
|
}
|
||||||
BIN
porting.db
BIN
porting.db
Binary file not shown.
Reference in New Issue
Block a user