Files
natsdotnet/tests/NATS.Server.JetStream.Tests/JetStreamStreamReplicaGroupTests.cs
Joseph Doherty 78b4bc2486 refactor: extract NATS.Server.JetStream.Tests project
Move 225 JetStream-related test files from NATS.Server.Tests into a
dedicated NATS.Server.JetStream.Tests project. This includes root-level
JetStream*.cs files, storage test files (FileStore, MemStore,
StreamStoreContract), and the full JetStream/ subfolder tree (Api,
Cluster, Consumers, MirrorSource, Snapshots, Storage, Streams).

Updated all namespaces, added InternalsVisibleTo, registered in the
solution file, and added the JETSTREAM_INTEGRATION_MATRIX define.
2026-03-12 15:58:10 -04:00

72 lines
2.2 KiB
C#

using System.Text;
using NATS.Server.JetStream;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Publish;
namespace NATS.Server.JetStream.Tests;
/// <summary>
/// Tests for stream replica-group behaviour, driven through <see cref="JetStreamReplicaFixture"/>.
/// </summary>
public class JetStreamStreamReplicaGroupTests
{
    /// <summary>
    /// After a leader step-down is requested for the "ORDERS" stream, a publish to
    /// "orders.created" must still produce an ack that names the stream and carries a
    /// sequence greater than zero.
    /// </summary>
    [Fact]
    public async Task Leader_stepdown_preserves_stream_write_availability_after_new_election()
    {
        // Arrange: a fixture requested with three nodes and a stream requested with three replicas.
        await using var cluster = await JetStreamReplicaFixture.StartAsync(nodes: 3);
        await cluster.CreateStreamAsync("ORDERS", replicas: 3);

        // Act: step the leader down, then publish once.
        await cluster.StepDownStreamLeaderAsync("ORDERS");
        var pubAck = await cluster.PublishAndGetAckAsync("orders.created", "1");

        // Assert: the ack identifies the stream and reports a positive sequence.
        pubAck.Stream.ShouldBe("ORDERS");
        pubAck.Seq.ShouldBeGreaterThan(0UL);
    }
}
/// <summary>
/// Test fixture that wraps one <see cref="StreamManager"/> (created with its parameterless
/// constructor) together with a <see cref="JetStreamPublisher"/> built on top of it.
/// </summary>
internal sealed class JetStreamReplicaFixture : IAsyncDisposable
{
    private readonly StreamManager _streamManager;
    private readonly JetStreamPublisher _publisher;

    // Private: instances are obtained through StartAsync.
    private JetStreamReplicaFixture(StreamManager streamManager)
    {
        _streamManager = streamManager;
        _publisher = new JetStreamPublisher(streamManager);
    }

    /// <summary>
    /// Creates a fixture around a new <see cref="StreamManager"/>.
    /// </summary>
    /// <param name="nodes">Requested node count; explicitly discarded by this implementation.</param>
    /// <returns>An already-completed task holding the new fixture.</returns>
    public static Task<JetStreamReplicaFixture> StartAsync(int nodes)
    {
        _ = nodes;
        var fixture = new JetStreamReplicaFixture(new StreamManager());
        return Task.FromResult(fixture);
    }

    /// <summary>
    /// Creates or updates a stream whose subject list is fixed to "orders.*".
    /// </summary>
    /// <param name="name">Stream name placed in the configuration.</param>
    /// <param name="replicas">Replica count placed in the configuration.</param>
    /// <returns>A completed task when the manager reports no error.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown synchronously, carrying the error description, when the manager's response has an error.
    /// </exception>
    public Task CreateStreamAsync(string name, int replicas)
    {
        var config = new StreamConfig
        {
            Name = name,
            Subjects = ["orders.*"],
            Replicas = replicas,
        };

        var result = _streamManager.CreateOrUpdate(config);
        if (result.Error is null)
        {
            return Task.CompletedTask;
        }

        throw new InvalidOperationException(result.Error.Description);
    }

    /// <summary>
    /// Forwards the step-down request for <paramref name="stream"/> to the stream manager,
    /// passing the default value for its second argument.
    /// </summary>
    /// <param name="stream">Name of the stream whose leader should step down.</param>
    /// <returns>The task returned by the stream manager.</returns>
    public Task StepDownStreamLeaderAsync(string stream)
        => _streamManager.StepDownStreamLeaderAsync(stream, default);

    /// <summary>
    /// Offers a UTF-8 encoded payload to the publisher and returns the resulting ack.
    /// </summary>
    /// <param name="subject">Subject to publish on.</param>
    /// <param name="payload">Text payload; encoded as UTF-8 before capture.</param>
    /// <returns>An already-completed task holding the ack produced by the publisher.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown synchronously when the publisher does not capture the message.
    /// </exception>
    public Task<PubAck> PublishAndGetAckAsync(string subject, string payload)
    {
        var body = Encoding.UTF8.GetBytes(payload);
        if (!_publisher.TryCapture(subject, body, null, out var ack))
        {
            throw new InvalidOperationException("Publish did not match a stream.");
        }

        return Task.FromResult(ack);
    }

    /// <summary>
    /// Performs no cleanup and completes immediately.
    /// </summary>
    /// <returns>A completed <see cref="ValueTask"/>.</returns>
    public ValueTask DisposeAsync()
    {
        return ValueTask.CompletedTask;
    }
}