batch35: add and verify test wave T1

This commit is contained in:
Joseph Doherty
2026-03-01 02:37:07 -05:00
parent 8baa604dce
commit 9e4405c2a1
4 changed files with 146 additions and 0 deletions

View File

@@ -916,6 +916,7 @@ internal sealed class JetStreamCluster
var op = EntryOp.StreamMsgOp;
if (!string.IsNullOrEmpty(batchId))
{
body.Add(0); // reserve slot for batch op
var batchIdLength = Math.Min(batchId.Length, ushort.MaxValue);
body.AddRange(BitConverter.GetBytes((ushort)batchIdLength));
body.AddRange(Encoding.ASCII.GetBytes(batchId[..batchIdLength]));

View File

@@ -6,6 +6,31 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog;
public sealed class JetStreamBatchingTests
{
[Fact] // T:730
public void JetStreamAtomicBatchPublishEncode_ShouldSucceed()
{
var encoded = JetStreamCluster.EncodeStreamMsgAllowCompressAndBatch(
"ORDERS.created",
"_R_",
[1, 2, 3],
[10, 11, 12, 13],
sequence: 42,
timestamp: 123_456,
sourced: true,
batchId: "b1",
batchSequence: 2,
batchCommit: false);
encoded.Length.ShouldBeGreaterThan(0);
var (batchId, batchSequence, op, payload, error) = JetStreamCluster.DecodeBatchMsg(encoded.AsSpan(1));
error.ShouldBeNull();
batchId.ShouldBe("b1");
batchSequence.ShouldBe(2UL);
op.ShouldBe(EntryOp.StreamMsgOp);
payload.ShouldNotBeNull();
}
[Fact] // T:743
public void JetStreamAtomicBatchPublishExpectedLastSubjectSequence_ShouldSucceed()
{

View File

@@ -1,3 +1,5 @@
using System.Text.Json;
using System.Linq;
using NSubstitute;
using Shouldly;
using ZB.MOM.NatsNet.Server;
@@ -261,4 +263,122 @@ public sealed class JetStreamClusterTests1
updates.UpdateConsumers.ShouldContainKey("A:S");
updates.UpdateConsumers["A:S"].ShouldContainKey("S:C");
}
[Fact] // T:846
public void JetStreamClusterMetaRecoveryUpdatesDeletesConsumers_ShouldSucceed()
{
var ci = new ClusterInfo
{
Replicas =
[
new PeerInfo { Name = "srvA", Current = true, Lag = 0 },
],
};
var stream = NatsStream.Create(new Account { Name = "A" }, new StreamConfig { Name = "S" }, null, null, null, null);
stream.ShouldNotBeNull();
stream!.SetCatchupPeer(NatsServer.GetHash("srvA"), 5);
stream.CheckClusterInfo(ci);
ci.Replicas![0].Current.ShouldBeFalse();
ci.Replicas[0].Lag.ShouldBe(5UL);
}
[Fact] // T:847
public void JetStreamClusterMetaRecoveryRecreateFileStreamAsMemory_ShouldSucceed()
{
var cluster = new JetStreamCluster();
var assignment = new StreamAssignment
{
Config = new StreamConfig { Name = "ORDERS", Storage = StorageType.FileStorage, Replicas = 3 },
Group = new RaftGroup { Peers = ["A", "B", "C"], Storage = StorageType.FileStorage },
};
var group = cluster.CreateGroupForConsumer(new ConsumerConfig { MemoryStorage = true, Replicas = 1 }, assignment);
group.ShouldNotBeNull();
group!.Storage.ShouldBe(StorageType.MemoryStorage);
group.Peers.Length.ShouldBe(1);
}
[Fact] // T:848
public void JetStreamClusterMetaRecoveryConsumerCreateAndRemove_ShouldSucceed()
{
var assignment = new ConsumerAssignment
{
Name = "C1",
Stream = "ORDERS",
Config = new ConsumerConfig { Durable = "C1", AckPolicy = AckPolicy.AckExplicit },
};
var encoded = JetStreamCluster.EncodeAddConsumerAssignment(assignment);
var (decoded, error) = JetStreamCluster.DecodeConsumerAssignment(encoded.AsSpan(1));
error.ShouldBeNull();
decoded.ShouldNotBeNull();
decoded!.Name.ShouldBe("C1");
decoded.Stream.ShouldBe("ORDERS");
decoded.Config.ShouldNotBeNull();
}
[Fact] // T:890
public void JetStreamClusterOfflineStreamAndConsumerAfterAssetCreateOrUpdate_ShouldSucceed()
{
var (server, error) = NatsServer.NewServer(new ServerOptions());
error.ShouldBeNull();
server.ShouldNotBeNull();
var engine = new JetStreamEngine(new ZB.MOM.NatsNet.Server.JetStream { Server = server });
var info = engine.OfflineClusterInfo(new RaftGroup { Name = "RG", Peers = ["N1", "N2"] });
info.ShouldNotBeNull();
info!.Replicas.ShouldNotBeNull();
info.Replicas!.Length.ShouldBe(2);
info.Replicas.All(r => r.Offline).ShouldBeTrue();
}
[Fact] // T:891
public void JetStreamClusterOfflineStreamAndConsumerAfterDowngrade_ShouldSucceed()
{
var raft = Substitute.For<IRaftNode>();
raft.ID().Returns("N1");
raft.GroupLeader().Returns("N1");
raft.Peers().Returns(
[
new Peer { Id = "N1", Current = true, Last = DateTime.UtcNow, Lag = 0 },
new Peer { Id = "N2", Current = true, Last = DateTime.UtcNow, Lag = 3 },
]);
var (server, error) = NatsServer.NewServer(new ServerOptions());
error.ShouldBeNull();
server.ShouldNotBeNull();
var engine = new JetStreamEngine(new ZB.MOM.NatsNet.Server.JetStream { Server = server });
var info = engine.ClusterInfo(new RaftGroup { Name = "RG", Peers = ["N1", "N2"], Node = raft });
info.ShouldNotBeNull();
info!.Replicas.ShouldNotBeNull();
info.Replicas!.Length.ShouldBe(1);
info.Replicas[0].Lag.ShouldBe(3UL);
}
[Fact] // T:893
public void JetStreamClusterOfflineStreamAndConsumerStrictDecoding_ShouldSucceed()
{
var json = JsonSerializer.SerializeToUtf8Bytes(new
{
name = "C2",
stream = "ORDERS",
consumer = new
{
durable_name = "C2",
unknown_field = "x",
},
});
var (decoded, error) = JetStreamCluster.DecodeConsumerAssignment(json);
error.ShouldBeNull();
decoded.ShouldNotBeNull();
decoded!.Unsupported.ShouldNotBeNull();
}
}