Merge branch 'worktree-agent-aea55702'
# Conflicts: # dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/CheckHelper.cs # dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/ConfigHelper.cs # dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/IntegrationTestBase.cs # dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/NatsTestClient.cs # dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/TestCluster.cs # dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/TestServerHelper.cs # dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/TestSuperCluster.cs
This commit is contained in:
@@ -0,0 +1,593 @@
|
||||
// Copyright 2020-2025 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Mirrors server/jetstream_super_cluster_test.go (first 36 tests) and
|
||||
// server/jetstream_leafnode_test.go (first 3 tests) from the NATS server Go source.
|
||||
// All tests require a running JetStream super-cluster or leaf-node topology and
|
||||
// are deferred until the full server runtime is available.
|
||||
|
||||
using ZB.MOM.NatsNet.Server.IntegrationTests.Helpers;
|
||||
|
||||
namespace ZB.MOM.NatsNet.Server.IntegrationTests.JetStream;
|
||||
|
||||
/// <summary>
|
||||
/// Integration tests for JetStream super-cluster scenarios and leaf-node
|
||||
/// JetStream cross-domain interactions.
|
||||
/// Mirrors server/jetstream_super_cluster_test.go and
|
||||
/// server/jetstream_leafnode_test.go.
|
||||
/// All tests are deferred pending multi-server cluster infrastructure.
|
||||
/// </summary>
|
||||
[Collection("SuperClusterIntegration")]
|
||||
[Trait("Category", "Integration")]
|
||||
public sealed class JetStreamSuperClusterTests : IntegrationTestBase
|
||||
{
|
||||
// -------------------------------------------------------------------------
|
||||
// From server/jetstream_super_cluster_test.go
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that the JetStream meta-leader can step down with placement
|
||||
/// constraints (cluster, tags, preferred server) and that invalid placements
|
||||
/// return the expected error codes.
|
||||
/// Mirrors TestJetStreamSuperClusterMetaStepDown.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void MetaStepDown_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamTaggedSuperCluster — 3 clusters (C1/C2/C3),
|
||||
// each with 3 servers, tagged cloud:aws/gcp/az and node:1/2/3.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 3);
|
||||
sc.WaitOnLeader();
|
||||
|
||||
// Verify step-down to unknown cluster returns JSClusterNoPeersErrF (400).
|
||||
// Verify step-down to unknown preferred server returns 400.
|
||||
// Verify step-down with unknown tag returns 400.
|
||||
// Verify step-down when preferred server is already leader returns 400.
|
||||
// Verify successful placement by preferred server name.
|
||||
// Verify successful placement by cluster name.
|
||||
// Verify successful placement by single tag.
|
||||
// Verify successful placement by multiple tags (must match all).
|
||||
// Verify successful placement by cluster name and tag combination.
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a stream leader can step down with placement constraints
|
||||
/// and that non-participant clusters and other invalid placements return errors.
|
||||
/// Mirrors TestJetStreamSuperClusterStreamStepDown.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void StreamStepDown_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamTaggedSuperCluster; stream "foo" placed in C1
|
||||
// with cloud:aws tag, R3. Step-down tested for:
|
||||
// UnknownCluster, UnknownPreferredServer, UnknownTag, NonParticipantCluster,
|
||||
// PreferredServerAlreadyLeader, PlacementByPreferredServer,
|
||||
// PlacementByCluster, PlacementByTag, PlacementByMultipleTags, PlacementByClusterAndTag.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 3);
|
||||
sc.WaitOnLeader();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a consumer leader can step down with placement constraints
|
||||
/// and that invalid placements return the correct errors.
|
||||
/// Mirrors TestJetStreamSuperClusterConsumerStepDown.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void ConsumerStepDown_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamTaggedSuperCluster; stream "foo" + consumer
|
||||
// "consumer" in C1 with cloud:aws tag. Consumer step-down tested for
|
||||
// the same set of sub-tests as StreamStepDown.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 3);
|
||||
sc.WaitOnLeader();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that unique_tag placement (az tag) ensures replicas land on
|
||||
/// servers in different availability zones, and fails when no suitable
|
||||
/// diverse peers exist.
|
||||
/// Mirrors TestJetStreamSuperClusterUniquePlacementTag.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void UniquePlacementTag_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperClusterWithTemplateAndModHook — 5 servers
|
||||
// per cluster, 2 clusters. C1 servers all tagged az:same; C2 servers
|
||||
// alternating az:1 / az:2. Tests R1 and R2 placement with/without az tags.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(5, 2);
|
||||
sc.WaitOnLeader();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a super-cluster of 3×3 allows creating and publishing to
|
||||
/// replicated streams and that streams can be explicitly placed in a named cluster.
|
||||
/// Mirrors TestJetStreamSuperClusterBasics.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void Basics_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperCluster(t, 3, 3)
|
||||
// Creates stream "TEST" R3, publishes 10 messages, verifies state.
|
||||
// Creates stream "TEST2" placed explicitly in "C3" and verifies cluster name.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 3);
|
||||
sc.WaitOnLeader();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that push and pull consumers created in one cluster correctly
|
||||
/// receive messages whose stream is homed in another cluster (cross-cluster
|
||||
/// consumer interest via gateways).
|
||||
/// Mirrors TestJetStreamSuperClusterCrossClusterConsumerInterest.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void CrossClusterConsumerInterest_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperCluster(t, 3, 3)
|
||||
// Stream "foo" placed in C2, consumer connected from C1.
|
||||
// Pull and push delivery tested across gateway.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 3);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a stream's peers can be reassigned across the super-cluster
|
||||
/// when the current peer set is insufficient.
|
||||
/// Mirrors TestJetStreamSuperClusterPeerReassign.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void PeerReassign_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperCluster(t, 3, 3)
|
||||
// Stream "TEST" placed in C2, R3. Checks peer reassignment after
|
||||
// removing/replacing a server.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 3);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies interest-only gateway mode is triggered and cleared correctly
|
||||
/// when a super-cluster account has/lacks active subscribers.
|
||||
/// Mirrors TestJetStreamSuperClusterInterestOnlyMode.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void InterestOnlyMode_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperClusterWithTemplate with account template.
|
||||
// Checks gateway transitions into/out of interest-only mode.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 2);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that internal JetStream connection counts are not reported as
|
||||
/// active client connections in account connection queries.
|
||||
/// Mirrors TestJetStreamSuperClusterConnectionCount.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void ConnectionCount_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperClusterWithTemplate with accounts template.
|
||||
// Creates source streams and a mirror, verifies account NumConnections == 0.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 2);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a sourcing stream in C2 continues to replicate messages from
|
||||
/// a stream in C1 even when the gateway connection is broken mid-publish and
|
||||
/// subsequently reconnects.
|
||||
/// Mirrors TestJetStreamSuperClusterConsumersBrokenGateways.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void ConsumersBrokenGateways_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperCluster(t, 1, 2)
|
||||
// Stream "TEST" in C1, sourced by stream "S" in C2. Publishes 100 msgs,
|
||||
// breaks GW connection at ~50, waits for reconnect and verifies all 200 msgs present.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(1, 2);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a leaf-node cluster sharing the system account in the same
|
||||
/// JetStream domain does not become a meta-leader and can publish/consume messages.
|
||||
/// Mirrors TestJetStreamSuperClusterLeafNodesWithSharedSystemAccountAndSameDomain.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster with leaf nodes")]
|
||||
public void LeafNodesWithSharedSystemAccountAndSameDomain_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperCluster(t, 3, 2) + sc.createLeafNodes("LNC", 2).
|
||||
// Verifies meta-leader is always in supercluster, not the leaf cluster.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 2);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a leaf-node cluster with a different JetStream domain from
|
||||
/// the super-cluster behaves correctly regarding meta-leadership.
|
||||
/// Mirrors TestJetStreamSuperClusterLeafNodesWithSharedSystemAccountAndDifferentDomain.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster with leaf nodes")]
|
||||
public void LeafNodesWithSharedSystemAccountAndDifferentDomain_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperCluster(t, 3, 2) + sc.createLeafNodes("LNC", 2).
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 2);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies a single leaf node using the shared system account works correctly
|
||||
/// with the super-cluster's JetStream.
|
||||
/// Mirrors TestJetStreamSuperClusterSingleLeafNodeWithSharedSystemAccount.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster with leaf nodes")]
|
||||
public void SingleLeafNodeWithSharedSystemAccount_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperCluster(t, 3, 2) + single leaf node.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 2);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that pull-consumer GetNext requests are correctly rewritten when
|
||||
/// proxied through a gateway to a leaf-node cluster.
|
||||
/// Mirrors TestJetStreamSuperClusterGetNextRewrite.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster with leaf nodes")]
|
||||
public void GetNextRewrite_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperClusterWithTemplate accounts template, 2×2.
|
||||
// Leaf node attached to C1, client connects to C2; pull consumer GetNext
|
||||
// subject must be rewritten to correct account subject.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(2, 2);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that ephemeral source consumers are cleaned up once the sourcing
|
||||
/// stream is deleted, both for same-cluster and cross-cluster sources.
|
||||
/// Mirrors TestJetStreamSuperClusterEphemeralCleanup.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void EphemeralCleanup_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperCluster(t, 3, 2).
|
||||
// Tests "local" (same cluster) and "remote" (cross cluster) source streams.
|
||||
// After deleting the sourcing stream the direct consumer count on the origin
|
||||
// stream must drop to 0.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 2);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reproduces a race condition where GetNext requests could be lost when
|
||||
/// a gateway connection had no inbound side for a cluster.
|
||||
/// Mirrors TestJetStreamSuperClusterGetNextSubRace.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster with leaf nodes")]
|
||||
public void GetNextSubRace_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperClusterWithTemplate accounts template, 2×2.
|
||||
// Leaf attached to C1; one C1 server shut down; 100 messages published and
|
||||
// fetched to ensure no race on the GetNext subject delivery.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(2, 2);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that pull consumers work correctly across gateways and that
|
||||
/// message headers survive gateway hops.
|
||||
/// Mirrors TestJetStreamSuperClusterPullConsumerAndHeaders.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void PullConsumerAndHeaders_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperCluster(t, 3, 2).
|
||||
// Publishes messages with headers from C1, pull-consumes from C2.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 2);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that the statsz API reports the correct number of active servers
|
||||
/// as servers are shut down and restarted.
|
||||
/// Mirrors TestJetStreamSuperClusterStatszActiveServers.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void StatszActiveServers_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperCluster(t, 2, 2).
|
||||
// Checks Stats.ActiveServers == 4 initially, == 3 after one shutdown,
|
||||
// == 4 after restart.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(2, 2);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that only one direct consumer per origin stream is maintained
|
||||
/// after multiple leader changes on a sourcing/mirroring stream.
|
||||
/// Mirrors TestJetStreamSuperClusterSourceAndMirrorConsumersLeaderChange.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void SourceAndMirrorConsumersLeaderChange_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperCluster(t, 3, 2).
|
||||
// 10 origin streams in C1; sourcing stream "S" R2 in C2.
|
||||
// Two forced leader changes on "S". numDirectConsumers on a random origin
|
||||
// stream must equal 1.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 2);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that push consumers created in one cluster receive messages after
|
||||
/// the connection is re-established in a different cluster.
|
||||
/// Mirrors TestJetStreamSuperClusterPushConsumerInterest.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void PushConsumerInterest_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperCluster(t, 3, 2).
|
||||
// Tests non-queue and queue push consumers crossing the cluster boundary.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 2);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that overflow placement correctly spills streams to another cluster
|
||||
/// when the requested cluster is full.
|
||||
/// Mirrors TestJetStreamSuperClusterOverflowPlacement.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void OverflowPlacement_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperClusterWithTemplate max-bytes template, 3×3.
|
||||
// MaxBytes required opt-in; stream "foo" R2 in C2. Subsequent R3 placement
|
||||
// must overflow to other clusters.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 3);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that concurrent stream placements across the super-cluster do not
|
||||
/// conflict when both overflow from the same cluster simultaneously.
|
||||
/// Mirrors TestJetStreamSuperClusterConcurrentOverflow.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void ConcurrentOverflow_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperClusterWithTemplate max-bytes template, 3×3.
|
||||
// Two goroutines concurrently place R3 streams; both must succeed.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 3);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that stream placement using server tags correctly routes streams
|
||||
/// to clusters matching the requested tags (case-insensitive).
|
||||
/// Mirrors TestJetStreamSuperClusterStreamTagPlacement.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void StreamTagPlacement_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamTaggedSuperCluster (3 clusters with cloud and country tags).
|
||||
// cloud:aws → C1, country:jp → C3, cloud:gcp + country:uk → C2.
|
||||
// Case-insensitive matching verified.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 3);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that removed peers are reflected in stream and consumer listings
|
||||
/// and that streams can still be deleted after their peer set changes.
|
||||
/// Mirrors TestJetStreamSuperClusterRemovedPeersAndStreamsListAndDelete.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void RemovedPeersAndStreamsListAndDelete_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperCluster(t, 3, 3).
|
||||
// Removes a server; verifies STREAM.LIST and STREAM.DELETE still work.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 3);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reproduces a bug where push consumers with DeliverNew policy did not
|
||||
/// deliver messages when the first published sequence was not 1.
|
||||
/// Mirrors TestJetStreamSuperClusterConsumerDeliverNewBug.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void ConsumerDeliverNewBug_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperCluster(t, 3, 2).
|
||||
// Publishes a message, then creates a DeliverNew consumer; next publish
|
||||
// must be delivered.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 2);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that streams and their consumers can be moved between clusters
|
||||
/// and that data is not lost during the move.
|
||||
/// Mirrors TestJetStreamSuperClusterMovingStreamsAndConsumers.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void MovingStreamsAndConsumers_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperCluster(t, 3, 3).
|
||||
// Tests moving R1 and R3 streams across clusters with active consumers.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 3);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a mirror stream can be moved to a different cluster while
|
||||
/// maintaining data integrity.
|
||||
/// Mirrors TestJetStreamSuperClusterMovingStreamsWithMirror.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void MovingStreamsWithMirror_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperCluster(t, 3, 3).
|
||||
// Stream "TEST" moved; mirror "M" must track correctly.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 3);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a stream can be moved to another cluster and then moved back
|
||||
/// to the original cluster.
|
||||
/// Mirrors TestJetStreamSuperClusterMovingStreamAndMoveBack.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void MovingStreamAndMoveBack_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperCluster(t, 3, 3).
|
||||
// Stream moved from C1→C2, then back to C1; message count must be preserved.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 3);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a consumer's ack subject is correctly remapped when the
|
||||
/// stream owner imports the consumer subject from another account.
|
||||
/// Mirrors TestJetStreamSuperClusterImportConsumerStreamSubjectRemap.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void ImportConsumerStreamSubjectRemap_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperCluster(t, 3, 2) with account imports.
|
||||
// Consumer ack subject must survive gateway hops and subject remapping.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 2);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that system-level HA asset limits (MaxHaAssets) are enforced
|
||||
/// across the super-cluster.
|
||||
/// Mirrors TestJetStreamSuperClusterMaxHaAssets.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void MaxHaAssets_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperCluster(t, 3, 3) with MaxHaAssets system limit.
|
||||
// Adding replicated streams beyond the limit must return an error.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 3);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that stream alternates are correctly listed and used for direct-get
|
||||
/// requests when a stream has mirrors in multiple clusters.
|
||||
/// Mirrors TestJetStreamSuperClusterStreamAlternates.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void StreamAlternates_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperCluster(t, 3, 3).
|
||||
// Stream "TEST" with a mirror in another cluster; Alternates list checked.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 3);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a server with stale state on restart does not prevent
|
||||
/// consumer recovery on streams that have been moved.
|
||||
/// Mirrors TestJetStreamSuperClusterStateOnRestartPreventsConsumerRecovery.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void StateOnRestartPreventsConsumerRecovery_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperCluster(t, 3, 2).
|
||||
// Moves a stream; restarts a server that had stale entries; consumer
|
||||
// must still be accessible after recovery.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 2);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a direct-get on a mirror stream correctly uses a queue group
|
||||
/// to distribute load across replicas.
|
||||
/// Mirrors TestJetStreamSuperClusterStreamDirectGetMirrorQueueGroup.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void StreamDirectGetMirrorQueueGroup_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperCluster(t, 3, 3).
|
||||
// Mirror stream direct-get must be served by any replica via queue group.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 3);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a tag-induced stream move can be cancelled and the stream
|
||||
/// remains operational on its original cluster.
|
||||
/// Mirrors TestJetStreamSuperClusterTagInducedMoveCancel.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void TagInducedMoveCancel_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamTaggedSuperCluster.
|
||||
// Updates stream tags to trigger a move, then cancels it; stream must
|
||||
// still be in the original cluster.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 3);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a stream move can be initiated and then cancelled, leaving
|
||||
/// the stream in its original cluster with data intact.
|
||||
/// Mirrors TestJetStreamSuperClusterMoveCancel.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void MoveCancel_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperCluster(t, 3, 3).
|
||||
// Stream "TEST" move initiated; JSApiStreamUpdate with empty placement
|
||||
// cancels the move; stream stays on original cluster.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 3);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that two consecutive move requests for the same stream are handled
|
||||
/// correctly without data loss.
|
||||
/// Mirrors TestJetStreamSuperClusterDoubleStreamMove.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream super-cluster")]
|
||||
public void DoubleStreamMove_ShouldSucceed()
|
||||
{
|
||||
// Go source: createJetStreamSuperCluster(t, 3, 3).
|
||||
// Stream moved from C1 to C2 then to C3; message counts verified at each step.
|
||||
using var sc = TestSuperCluster.CreateJetStreamSuperCluster(3, 3);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// From server/jetstream_leafnode_test.go (first 3 tests)
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that when two servers with the same server name connect via leaf
|
||||
/// nodes to the same hub cluster with the same or different JetStream domains,
|
||||
/// their IDs are tracked correctly and do not collide.
|
||||
/// Mirrors TestJetStreamLeafNodeUniqueServerNameCrossJSDomain.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream leaf-node topology")]
|
||||
public void LeafNodeUniqueServerNameCrossJSDomain_ShouldSucceed()
|
||||
{
|
||||
// Go source: hub server + 2 leaf nodes both named "NOT-UNIQUE".
|
||||
// t.Run("same-domain"): leaf uses domain "hub" — sL.ID() expected in nodeToInfo.
|
||||
// t.Run("different-domain"): leaf uses domain "spoke" — sA.ID() expected.
|
||||
// Verified via $SYS server stats messages.
|
||||
using var cluster = TestCluster.CreateJetStreamCluster(1, "hub");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that JWT-based permissions and JetStream domain isolation work
|
||||
/// correctly when a leaf node connects to a hub with account-scoped credentials.
|
||||
/// Mirrors TestJetStreamLeafNodeJwtPermsAndJSDomains.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream leaf-node topology with JWT")]
|
||||
public void LeafNodeJwtPermsAndJSDomains_ShouldSucceed()
|
||||
{
|
||||
// Go source: hub with operator JWT, leaf with local accounts.
|
||||
// Sub/pub deny permissions set via JWT UserPermissionLimits.
|
||||
// Four sub-tests: sub-on-ln-pass, sub-on-ln-fail, pub-on-ln-pass, pub-on-ln-fail.
|
||||
using var cluster = TestCluster.CreateJetStreamCluster(1, "hub");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a leaf-node cluster can extend a system account from the hub
|
||||
/// cluster and that JetStream operations work bidirectionally through the
|
||||
/// leaf-node topology.
|
||||
/// Mirrors TestJetStreamLeafNodeClusterExtensionWithSystemAccount.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running JetStream leaf-node cluster topology")]
|
||||
public void LeafNodeClusterExtensionWithSystemAccount_ShouldSucceed()
|
||||
{
|
||||
// Go source: 2-server hub cluster (A+B) + 2-server leaf cluster (LA+LB).
|
||||
// System account shared; proxy used to control leaf connection timing.
|
||||
// Two topologies tested (same == true/false — whether LA connects to A or A+B).
|
||||
using var cluster = TestCluster.CreateJetStreamCluster(2, "hub");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,229 @@
|
||||
// Copyright 2019-2025 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Mirrors server/leafnode_test.go (first 14 tests) from the NATS server Go source.
|
||||
// All tests require a running NATS server with leaf-node support and are
|
||||
// deferred until the full server runtime is available.
|
||||
|
||||
using ZB.MOM.NatsNet.Server.IntegrationTests.Helpers;
|
||||
|
||||
namespace ZB.MOM.NatsNet.Server.IntegrationTests.LeafNode;
|
||||
|
||||
/// <summary>
|
||||
/// Integration tests for leaf-node connectivity, authentication, TLS, and
|
||||
/// loop detection scenarios.
|
||||
/// Mirrors server/leafnode_test.go.
|
||||
/// All tests are deferred pending leaf-node server infrastructure.
|
||||
/// </summary>
|
||||
[Collection("LeafNodeIntegration")]
|
||||
[Trait("Category", "Integration")]
|
||||
public sealed class LeafNodeTests : IntegrationTestBase
|
||||
{
|
||||
/// <summary>
|
||||
/// Verifies that when a leaf-node remote URL resolves to multiple IP addresses
|
||||
/// the server randomly cycles through them, ensuring all IPs are eventually used.
|
||||
/// Mirrors TestLeafNodeRandomIP.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running NATS server with leaf-node support")]
|
||||
public void RandomIP_ShouldSucceed()
|
||||
{
|
||||
// Go source: DefaultOptions with LeafNode.Remotes pointing to a hostname that
|
||||
// resolves to 3 IPs (127.0.0.1/2/3) via a custom DNS resolver.
|
||||
// ReconnectInterval = 50ms, dialTimeout = 15ms.
|
||||
// Verifies all three IPs appear in debug logs within 3 seconds.
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that leaf-node remote URL lists are randomised on startup when
|
||||
/// NoRandomize is false, and preserved in order when NoRandomize is true.
|
||||
/// Mirrors TestLeafNodeRandomRemotes.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running NATS server with leaf-node support")]
|
||||
public void RandomRemotes_ShouldSucceed()
|
||||
{
|
||||
// Go source: DefaultOptions with 2 RemoteLeafOpts — rem0 (NoRandomize=true)
|
||||
// and rem1 (NoRandomize=false), each with 16 URLs.
|
||||
// Asserts rem0 URLs are in original order; rem1 URLs are shuffled.
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a leaf node can connect to a server that requires mutual TLS
|
||||
/// (client and server certificates).
|
||||
/// Mirrors TestLeafNodeTLSWithCerts.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running NATS server with leaf-node TLS support")]
|
||||
public void TlsWithCerts_ShouldSucceed()
|
||||
{
|
||||
// Go source: s1 configured with leaf TLS (ca, cert, key) listening.
|
||||
// s2 leaf remote specifies client cert and key (tlsauth/* certs).
|
||||
// Verifies leaf node establishes TLS-authenticated connection.
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a leaf-node remote that does not provide a certificate to a
|
||||
/// server requiring mutual TLS fails to connect.
|
||||
/// Mirrors TestLeafNodeTLSRemoteWithNoCerts.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running NATS server with leaf-node TLS support")]
|
||||
public void TlsRemoteWithNoCerts_ShouldSucceed()
|
||||
{
|
||||
// Go source: s1 requires client cert (verify=true). s2 provides no cert.
|
||||
// Verifies that s2 fails to connect and reports TLS error in logs.
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that attempting to connect a leaf node with a local account that
|
||||
/// does not exist on the server produces a clear error, and that retries are
|
||||
/// observed after the account is removed mid-operation.
|
||||
/// Mirrors TestLeafNodeAccountNotFound.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running NATS server with leaf-node support")]
|
||||
public void AccountNotFound_ShouldSucceed()
|
||||
{
|
||||
// Go source:
|
||||
// 1. NewServer with LeafNode.Remotes specifying LocalAccount="foo" (missing) → error.
|
||||
// 2. Add account "foo", RunServer → leaf connects to sb.
|
||||
// 3. Delete account "foo" from sa; restart sb → expect "Unable to lookup account" error log.
|
||||
// 4. Verify gcid keeps incrementing (retries happen).
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a leaf node reconnects to an alternate server in the cluster
|
||||
/// after the primary server it was connected to shuts down, and that the leaf
|
||||
/// node password never appears in debug or trace logs.
|
||||
/// Mirrors TestLeafNodeBasicAuthFailover.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running NATS server cluster with leaf-node support")]
|
||||
public void BasicAuthFailover_ShouldSucceed()
|
||||
{
|
||||
// Go source: 2-server cluster (sb1+sb2) with leafnode auth user=foo/password=pwdfatal.
|
||||
// sa configured as leaf remote pointing to sb1 only.
|
||||
// sb1 shuts down; sa must reconnect to sb2.
|
||||
// All log messages checked to ensure "pwdfatal" never appears.
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that the RTT (round-trip time) reported by a leaf-node connection
|
||||
/// is non-zero and updated after PING/PONG exchanges.
|
||||
/// Mirrors TestLeafNodeRTT.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running NATS server with leaf-node support")]
|
||||
public void Rtt_ShouldSucceed()
|
||||
{
|
||||
// Go source: PingInterval=15ms on both servers. Leaf connects to sb.
|
||||
// After a short wait, checks that sa leaf RTT > 0.
|
||||
// Also verifies RTT is reported in CONNZ output.
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that configuring both a single-user credential and a Users array
|
||||
/// on the leaf-node listener returns a clear error, and that duplicate user
|
||||
/// names in the array also produce an error.
|
||||
/// Mirrors TestLeafNodeValidateAuthOptions.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running NATS server with leaf-node support")]
|
||||
public void ValidateAuthOptions_ShouldSucceed()
|
||||
{
|
||||
// Go source: DefaultOptions with both LeafNode.Username and LeafNode.Users set →
|
||||
// "can not have a single user/pass and a users array".
|
||||
// Then clears Username, adds duplicate "user" → "duplicate user".
|
||||
// These are options-validation errors; NewServer must return error before starting.
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that single-user leaf-node authorization correctly routes connections
|
||||
/// to the configured account, and that connections with the wrong credentials
|
||||
/// are rejected.
|
||||
/// Mirrors TestLeafNodeBasicAuthSingleton.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running NATS server with leaf-node support")]
|
||||
public void BasicAuthSingleton_ShouldSucceed()
|
||||
{
|
||||
// Go source: 4 sub-test combinations:
|
||||
// 1. No user spec, no creds → fail (no LN connection established).
|
||||
// 2. No user spec, creds=user2:user2 → succeeds, bound to ACC2.
|
||||
// 3. No user spec, unknown user → fail.
|
||||
// 4. user=ln/pass=pwd, creds=ln:pwd → succeeds, bound to ACC1.
|
||||
// Verifies pub/sub message routing to correct account.
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a leaf-node server with multiple authorised users can
|
||||
/// map different leaf connections to different accounts, and that each
|
||||
/// account's messages are isolated from the others.
|
||||
/// Mirrors TestLeafNodeBasicAuthMultiple.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running NATS server with leaf-node support")]
|
||||
public void BasicAuthMultiple_ShouldSucceed()
|
||||
{
|
||||
// Go source: s1 with users ln1→S1ACC1, ln2→S1ACC2, ln3 (no account).
|
||||
// s2 with 2 leaf remotes: ln1 bound to S2ACC1, ln2 to S2ACC2.
|
||||
// Verifies publish from S2ACC1 is received by S1ACC1 subscribers only,
|
||||
// and publish from S2ACC2 reaches only S1ACC2 subscribers.
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a self-loop leaf-node configuration (A→B and B→A) is detected
|
||||
/// and reported as an error by both standalone and clustered server combinations.
|
||||
/// Mirrors TestLeafNodeLoop.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running NATS server with leaf-node support")]
|
||||
public void Loop_ShouldSucceed()
|
||||
{
|
||||
// Go source: t.Run("standalone", ...) and t.Run("cluster", ...).
|
||||
// Server A on port 1234 pointing to B on 5678; B pointing back to A.
|
||||
// Within 5s, one of the loop-detected loggers must fire.
|
||||
// After B restarts without the return remote, A must connect successfully.
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a loop formed through a directed acyclic graph (C→A and C→B,
|
||||
/// where B→A) is detected: C receives the loop error and establishes zero connections.
|
||||
/// Mirrors TestLeafNodeLoopFromDAG.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running NATS server with leaf-node support")]
|
||||
public void LoopFromDAG_ShouldSucceed()
|
||||
{
|
||||
// Go source: A standalone, B→A, C→A and C→B.
|
||||
// Loop detected on C; C has 0 leaf connections.
|
||||
// After restarting C with only C→B, A has 1, B has 2, C has 1.
|
||||
// Uses CheckHelper.CheckLeafNodeConnectedCount.
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a pending write that is blocking does not prevent the server
|
||||
/// from closing a TLS leaf-node connection within a reasonable timeout.
|
||||
/// Mirrors TestLeafNodeCloseTLSConnection.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running NATS server with leaf-node TLS support")]
|
||||
public void CloseTlsConnection_ShouldSucceed()
|
||||
{
|
||||
// Go source: Server with TLSTimeout=100ms. TLS client performs raw TCP
|
||||
// dial, TLS handshake, sends CONNECT+PING, verifies leaf is established.
|
||||
// Fills the kernel write buffer to create a blocked write, then closes
|
||||
// the connection — must complete within 3 seconds without hanging.
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that the server name used in TLS SNI is saved and returned
|
||||
/// in varz/connz output for a leaf-node connection.
|
||||
/// Mirrors TestLeafNodeTLSSaveName.
|
||||
/// </summary>
|
||||
[Fact(Skip = "deferred: requires running NATS server with leaf-node TLS support")]
|
||||
public void TlsSaveName_ShouldSucceed()
|
||||
{
|
||||
// Go source: Leaf remote with TLSConfig containing ServerName.
|
||||
// After connection, checks that the leaf connection's TLS server name
|
||||
// is saved and visible in connz (RemoteAddr or TLS info).
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user