Files
natsdotnet/tests/NATS.Server.TestUtilities/JetStreamApiFixture.cs
Joseph Doherty 4de691c9c5 perf: add FileStore buffered writes, O(1) state tracking, and eliminate redundant per-publish work
Implement Go-parity background flush loop (coalesce 16KB/8ms) in MsgBlock/FileStore,
replace O(n) GetStateAsync with incremental counters, skip PruneExpired/LoadAsync/
PrunePerSubject when not needed, and bypass RAFT for single-replica streams. Fix counter
tracking bugs in RemoveMsg/EraseMsg/TTL expiry and ObjectDisposedException races in
flush loop disposal. FileStore optimizations verified with 3112/3112 JetStream tests
passing; async publish benchmark remains at ~174 msg/s due to E2E protocol path bottleneck.
2026-03-13 03:11:11 -04:00

377 lines
14 KiB
C#

using System.Text;
using System.Text.Json;
using NATS.Server.Auth;
using NATS.Server.JetStream;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Consumers;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Publish;
namespace NATS.Server.TestUtilities;
public sealed class JetStreamApiFixture : IAsyncDisposable
{
// Static managers and router used by the static RequestAsync entry point. Because they are
// static, every caller of RequestAsync goes through the same instances.
private static readonly StreamManager SharedStreamManager = new();
// NOTE(review): unlike the per-instance consumer manager built in the constructors, this one
// is created without assigning its StreamManager property — confirm that is intended.
private static readonly ConsumerManager SharedConsumerManager = new();
private static readonly JetStreamApiRouter SharedRouter = new(SharedStreamManager, SharedConsumerManager);
// Per-fixture components; each is assigned exactly once, in the constructors.
private readonly StreamManager _streamManager;
private readonly ConsumerManager _consumerManager;
private readonly JetStreamApiRouter _router;
private readonly JetStreamPublisher _publisher;
/// <summary>
/// Creates a fixture with its own stream manager, consumer manager, API router and publisher.
/// The consumer manager, router and publisher are all wired to the same stream manager.
/// </summary>
public JetStreamApiFixture()
{
    var streams = new StreamManager();
    var consumers = new ConsumerManager { StreamManager = streams };

    _streamManager = streams;
    _consumerManager = consumers;
    _router = new JetStreamApiRouter(streams, consumers);
    _publisher = new JetStreamPublisher(streams);
}
/// <summary>
/// Creates a fixture whose stream manager is constructed with the given account; the other
/// components are wired exactly as in the public constructor.
/// </summary>
/// <param name="account">Account handed to the stream manager; may be <c>null</c>.</param>
private JetStreamApiFixture(Account? account)
{
    var streams = new StreamManager(account: account);
    var consumers = new ConsumerManager { StreamManager = streams };

    _streamManager = streams;
    _consumerManager = consumers;
    _router = new JetStreamApiRouter(streams, consumers);
    _publisher = new JetStreamPublisher(streams);
}
/// <summary>
/// Routes a JetStream API request through the static, shared router rather than through a
/// fixture instance.
/// </summary>
/// <param name="subject">API subject to route.</param>
/// <param name="payload">Request body; encoded as UTF-8 before routing.</param>
/// <returns>An already-completed task holding the router's response.</returns>
public static Task<JetStreamApiResponse> RequestAsync(string subject, string payload)
{
    var body = Encoding.UTF8.GetBytes(payload);
    var response = SharedRouter.Route(subject, body);
    return Task.FromResult(response);
}
/// <summary>
/// Creates a new fixture and issues a stream-create request for a stream with a single subject.
/// </summary>
/// <param name="streamName">Stream name; used both in the API subject and in the request body.</param>
/// <param name="subject">The only subject placed in the stream's <c>subjects</c> array.</param>
/// <param name="maxMsgs">Value sent as <c>max_msgs</c>.</param>
/// <returns>The new fixture. The create response is discarded, so a rejected create is not reported here.</returns>
public static async Task<JetStreamApiFixture> StartWithStreamAsync(string streamName, string subject, int maxMsgs = 0)
{
    var fixture = new JetStreamApiFixture();

    // Serialize instead of interpolating so names or subjects containing quotes or
    // backslashes still produce valid JSON (same approach as CreateStreamAsync).
    var payload = JsonSerializer.Serialize(new
    {
        name = streamName,
        subjects = new[] { subject },
        max_msgs = maxMsgs,
    });

    _ = await fixture.RequestLocalAsync($"$JS.API.STREAM.CREATE.{streamName}", payload);
    return fixture;
}
/// <summary>
/// Creates a new fixture and applies <paramref name="config"/> directly to its stream manager,
/// bypassing the API router.
/// </summary>
/// <param name="config">Stream configuration passed to <c>CreateOrUpdate</c>.</param>
/// <returns>An already-completed task holding the fixture; the create/update result is discarded.</returns>
public static Task<JetStreamApiFixture> StartWithStreamConfigAsync(StreamConfig config)
{
    JetStreamApiFixture fixture = new();
    var streams = fixture._streamManager;
    _ = streams.CreateOrUpdate(config);
    return Task.FromResult(fixture);
}
/// <summary>
/// Creates a new fixture and sends <paramref name="json"/> as a stream-create request.
/// </summary>
/// <param name="json">Raw request body, forwarded unchanged.</param>
/// <returns>The new fixture; the create response is discarded.</returns>
public static async Task<JetStreamApiFixture> StartWithStreamJsonAsync(string json)
{
    // The subject always ends in ".S", whatever name the JSON carries.
    // NOTE(review): presumably the router takes the stream name from the payload — confirm.
    const string createSubject = "$JS.API.STREAM.CREATE.S";

    JetStreamApiFixture fixture = new();
    _ = await fixture.RequestLocalAsync(createSubject, json);
    return fixture;
}
/// <summary>
/// Starts a fixture with stream "ORDERS" (subject "orders.*") and requests a consumer named
/// "PULL" filtered on "orders.created", using the default consumer options.
/// </summary>
/// <returns>The prepared fixture; the consumer-create response is discarded.</returns>
public static async Task<JetStreamApiFixture> StartWithPullConsumerAsync()
{
    var ordersFixture = await StartWithStreamAsync("ORDERS", "orders.*");
    _ = await ordersFixture.CreateConsumerAsync(
        stream: "ORDERS",
        durableName: "PULL",
        filterSubject: "orders.created");
    return ordersFixture;
}
/// <summary>
/// Starts a fixture with stream "ORDERS" (subject "orders.*") and requests a push consumer
/// named "PUSH" filtered on "orders.created" with a 25 ms heartbeat.
/// </summary>
/// <returns>The prepared fixture; the consumer-create response is discarded.</returns>
public static async Task<JetStreamApiFixture> StartWithPushConsumerAsync()
{
    var ordersFixture = await StartWithStreamAsync("ORDERS", "orders.*");
    _ = await ordersFixture.CreateConsumerAsync(
        stream: "ORDERS",
        durableName: "PUSH",
        filterSubject: "orders.created",
        push: true,
        heartbeatMs: 25);
    return ordersFixture;
}
/// <summary>
/// Starts a fixture with stream "ORDERS" (subject "orders.*") and requests a consumer named
/// "PULL" filtered on "orders.created" that uses explicit acknowledgements.
/// </summary>
/// <param name="ackWaitMs">Ack-wait value, in milliseconds, sent with the consumer request.</param>
/// <returns>The prepared fixture; the consumer-create response is discarded.</returns>
public static async Task<JetStreamApiFixture> StartWithAckExplicitConsumerAsync(int ackWaitMs)
{
    var ordersFixture = await StartWithStreamAsync("ORDERS", "orders.*");
    _ = await ordersFixture.CreateConsumerAsync(
        stream: "ORDERS",
        durableName: "PULL",
        filterSubject: "orders.created",
        ackPolicy: AckPolicy.Explicit,
        ackWaitMs: ackWaitMs);
    return ordersFixture;
}
/// <summary>
/// Starts a fixture with stream "ORDERS" (subject "orders.*") and requests a consumer named
/// "ACKALL" filtered on "orders.created" with the <see cref="AckPolicy.All"/> policy.
/// </summary>
/// <returns>The prepared fixture; the consumer-create response is discarded.</returns>
public static async Task<JetStreamApiFixture> StartWithAckAllConsumerAsync()
{
    var ordersFixture = await StartWithStreamAsync("ORDERS", "orders.*");
    _ = await ordersFixture.CreateConsumerAsync(
        stream: "ORDERS",
        durableName: "ACKALL",
        filterSubject: "orders.created",
        ackPolicy: AckPolicy.All);
    return ordersFixture;
}
/// <summary>
/// Starts a fixture with stream "ORDERS" (subject "orders.*") and then registers a second
/// stream, "ORDERS_MIRROR", whose configuration names "ORDERS" as its mirror.
/// </summary>
/// <returns>The prepared fixture; the result of creating the mirror stream is discarded.</returns>
public static async Task<JetStreamApiFixture> StartWithMirrorSetupAsync()
{
    var ordersFixture = await StartWithStreamAsync("ORDERS", "orders.*");

    var mirrorConfig = new StreamConfig
    {
        Name = "ORDERS_MIRROR",
        Subjects = ["orders.mirror.*"],
        Mirror = "ORDERS",
    };
    _ = ordersFixture._streamManager.CreateOrUpdate(mirrorConfig);

    return ordersFixture;
}
/// <summary>
/// Starts a fixture with stream "ORDERS" on the full-wildcard subject and requests a consumer
/// named "CF" that has no single filter subject but a filter-subject list of "orders.*".
/// </summary>
/// <returns>The prepared fixture; the consumer-create response is discarded.</returns>
public static async Task<JetStreamApiFixture> StartWithMultiFilterConsumerAsync()
{
    var ordersFixture = await StartWithStreamAsync("ORDERS", ">");
    _ = await ordersFixture.CreateConsumerAsync(
        stream: "ORDERS",
        durableName: "CF",
        filterSubject: null,
        filterSubjects: ["orders.*"]);
    return ordersFixture;
}
/// <summary>
/// Starts a fixture with stream "ORDERS" (subject "orders.*"), publishes one message
/// ("orders.created" with payload "1"), and only then requests consumer "RO" with the
/// original replay policy and explicit acknowledgements.
/// </summary>
/// <returns>The prepared fixture; the publish ack and consumer-create response are discarded.</returns>
public static async Task<JetStreamApiFixture> StartWithReplayOriginalConsumerAsync()
{
    var ordersFixture = await StartWithStreamAsync("ORDERS", "orders.*");

    // The message is published before the consumer exists.
    _ = await ordersFixture.PublishAndGetAckAsync("orders.created", "1");

    _ = await ordersFixture.CreateConsumerAsync(
        stream: "ORDERS",
        durableName: "RO",
        filterSubject: "orders.*",
        replayPolicy: ReplayPolicy.Original,
        ackPolicy: AckPolicy.Explicit);
    return ordersFixture;
}
/// <summary>
/// Creates a fixture with two streams, "SRC1" (subject "a.&gt;") and "SRC2" (subject "b.&gt;"),
/// followed by an "AGG" stream (subject "agg.&gt;") whose configuration lists both as sources.
/// </summary>
/// <returns>An already-completed task holding the fixture; all create results are discarded.</returns>
public static Task<JetStreamApiFixture> StartWithMultipleSourcesAsync()
{
    JetStreamApiFixture fixture = new();
    var streams = fixture._streamManager;

    // The source streams are registered first, in order, before the aggregate that names them.
    foreach (var (name, subject) in new[] { ("SRC1", "a.>"), ("SRC2", "b.>") })
    {
        _ = streams.CreateOrUpdate(new StreamConfig
        {
            Name = name,
            Subjects = [subject],
        });
    }

    var aggregateConfig = new StreamConfig
    {
        Name = "AGG",
        Subjects = ["agg.>"],
        Sources =
        [
            new StreamSourceConfig { Name = "SRC1" },
            new StreamSourceConfig { Name = "SRC2" },
        ],
    };
    _ = streams.CreateOrUpdate(aggregateConfig);

    return Task.FromResult(fixture);
}
/// <summary>
/// Creates a fixture bound to an account named "JWT-LIMITED" whose JetStream stream limit is
/// <paramref name="maxStreams"/> and whose tier is "jwt-tier". No streams are created.
/// </summary>
/// <param name="maxStreams">Value assigned to the account's <c>MaxJetStreamStreams</c>.</param>
/// <returns>An already-completed task holding the fixture.</returns>
public static Task<JetStreamApiFixture> StartJwtLimitedAccountAsync(int maxStreams)
{
    Account limitedAccount = new("JWT-LIMITED")
    {
        MaxJetStreamStreams = maxStreams,
        JetStreamTier = "jwt-tier",
    };

    var fixture = new JetStreamApiFixture(limitedAccount);
    return Task.FromResult(fixture);
}
/// <summary>
/// Publishes a message through the fixture's publisher and, when the publish succeeded,
/// hands the stored message to the consumer manager.
/// </summary>
/// <remarks>
/// A call with exactly three positional string arguments binds to the
/// (streamName, subject, payload) overload, not to this one; pass <paramref name="msgId"/>
/// by name to reach this overload.
/// </remarks>
/// <param name="subject">Subject to publish on.</param>
/// <param name="payload">Message body; encoded as UTF-8.</param>
/// <param name="msgId">Optional message id forwarded to the publisher.</param>
/// <param name="expectError">
/// When <c>true</c>, a publish that the publisher does not capture yields a
/// <see cref="PubAck"/> with <c>ErrorCode = 404</c> instead of an exception.
/// </param>
/// <returns>The publisher's ack, or the synthetic 404 ack described above.</returns>
/// <exception cref="InvalidOperationException">
/// The publisher did not capture the message and <paramref name="expectError"/> is <c>false</c>.
/// Thrown synchronously from the call, not through the returned task.
/// </exception>
public Task<PubAck> PublishAndGetAckAsync(string subject, string payload, string? msgId = null, bool expectError = false)
{
    if (!_publisher.TryCapture(subject, Encoding.UTF8.GetBytes(payload), msgId, out var ack))
    {
        if (!expectError)
        {
            throw new InvalidOperationException($"No stream matched subject '{subject}'.");
        }

        return Task.FromResult(new PubAck { ErrorCode = 404 });
    }

    return NotifyConsumersAsync(ack);
}

/// <summary>
/// For an ack without an error code, loads the message at the acked sequence from its stream's
/// store and passes it to the consumer manager; then returns the ack unchanged.
/// </summary>
private async Task<PubAck> NotifyConsumersAsync(PubAck ack)
{
    if (ack.ErrorCode == null && _streamManager.TryGet(ack.Stream, out var streamHandle))
    {
        // Await the load instead of blocking on GetAwaiter().GetResult(): blocking on a
        // not-yet-completed operation ties up the calling thread and is not a supported way
        // to consume a ValueTask.
        var stored = await streamHandle.Store.LoadAsync(ack.Seq, default);
        if (stored != null)
        {
            _consumerManager.OnPublished(ack.Stream, stored);
        }
    }

    return ack;
}
/// <summary>
/// Convenience overload that accepts a stream name for readability at the call site and
/// forwards only <paramref name="subject"/> and <paramref name="payload"/>.
/// </summary>
/// <param name="streamName">Not used by this method.</param>
/// <param name="subject">Subject to publish on.</param>
/// <param name="payload">Message body.</param>
/// <returns>The task returned by the subject/payload overload.</returns>
public Task<PubAck> PublishAndGetAckAsync(string streamName, string subject, string payload)
{
    _ = streamName;
    return PublishAndGetAckAsync(subject, payload, msgId: null, expectError: false);
}
/// <summary>
/// Publishes a message with an expected-last-sequence publish option.
/// </summary>
/// <param name="subject">Subject to publish on.</param>
/// <param name="payload">Message body; encoded as UTF-8.</param>
/// <param name="expectedLastSeq">Value assigned to <c>PublishOptions.ExpectedLastSeq</c>.</param>
/// <returns>
/// An already-completed task with the publisher's ack, or a <see cref="PubAck"/> with
/// <c>ErrorCode = 404</c> when the publisher does not capture the message.
/// </returns>
public Task<PubAck> PublishWithExpectedLastSeqAsync(string subject, string payload, ulong expectedLastSeq)
{
    var options = new PublishOptions { ExpectedLastSeq = expectedLastSeq };
    var body = Encoding.UTF8.GetBytes(payload);

    if (!_publisher.TryCaptureWithOptions(subject, body, options, out var ack))
    {
        return Task.FromResult(new PubAck { ErrorCode = 404 });
    }

    return Task.FromResult(ack);
}
/// <summary>
/// Publishes one message of a batch. The batch id, batch sequence and optional commit value
/// (standing in for the Nats-Batch-Id, Nats-Batch-Sequence and Nats-Batch-Commit headers)
/// are carried in <see cref="PublishOptions"/> together with the optional message id and
/// expected-last values.
/// </summary>
/// <remarks>
/// Per this helper's earlier documentation (not verifiable from this file alone), the ack has
/// ErrorCode set on error, an empty BatchId when the message is only staged (flow-control),
/// and BatchId plus BatchSize on commit.
/// </remarks>
/// <returns>
/// An already-completed task with the publisher's ack, or a <see cref="PubAck"/> with
/// <c>ErrorCode = 404</c> when the publisher does not capture the message.
/// </returns>
public Task<PubAck> BatchPublishAsync(
    string subject,
    string payload,
    string batchId,
    ulong batchSeq,
    string? commitValue = null,
    string? msgId = null,
    ulong expectedLastSeq = 0,
    string? expectedLastMsgId = null)
{
    var batchOptions = new PublishOptions
    {
        BatchId = batchId,
        BatchSeq = batchSeq,
        BatchCommit = commitValue,
        MsgId = msgId,
        ExpectedLastSeq = expectedLastSeq,
        ExpectedLastMsgId = expectedLastMsgId,
    };
    var body = Encoding.UTF8.GetBytes(payload);

    if (!_publisher.TryCaptureWithOptions(subject, body, batchOptions, out var ack))
    {
        return Task.FromResult(new PubAck { ErrorCode = 404 });
    }

    return Task.FromResult(ack);
}
/// <summary>
/// Looks up a stream in the fixture's stream manager and returns its configuration.
/// </summary>
/// <param name="streamName">Name of the stream to look up.</param>
/// <returns>The stream's config, or <c>null</c> when the lookup fails.</returns>
public StreamConfig? GetStreamConfig(string streamName)
{
    if (_streamManager.TryGet(streamName, out var handle))
    {
        return handle.Config;
    }

    return null;
}
/// <summary>
/// Applies <paramref name="config"/> through the stream manager's <c>CreateOrUpdate</c>.
/// </summary>
/// <param name="config">Stream configuration to apply.</param>
/// <returns><c>true</c> when the resulting response carries no error; otherwise <c>false</c>.</returns>
public bool UpdateStream(StreamConfig config)
    => _streamManager.CreateOrUpdate(config).Error == null;
/// <summary>
/// Applies <paramref name="config"/> through the stream manager's <c>CreateOrUpdate</c> and
/// exposes the full response, including any error.
/// </summary>
/// <param name="config">Stream configuration to apply.</param>
/// <returns>The response returned by the stream manager.</returns>
public JetStreamApiResponse UpdateStreamWithResult(StreamConfig config)
    => _streamManager.CreateOrUpdate(config);
/// <summary>
/// Returns the fixture's own <see cref="JetStreamPublisher"/> so that tests can drive it
/// directly in advanced scenarios (for example, calling ClearBatches to simulate a leader change).
/// </summary>
/// <returns>The publisher instance created in the fixture's constructor.</returns>
public JetStreamPublisher GetPublisher()
{
    return _publisher;
}
/// <summary>
/// Routes a JetStream API request through this fixture's own router.
/// </summary>
/// <param name="subject">API subject to route.</param>
/// <param name="payload">Request body; encoded as UTF-8 before routing.</param>
/// <returns>An already-completed task holding the router's response.</returns>
public Task<JetStreamApiResponse> RequestLocalAsync(string subject, string payload)
{
    var body = Encoding.UTF8.GetBytes(payload);
    var response = _router.Route(subject, body);
    return Task.FromResult(response);
}
/// <summary>
/// Sends a stream-create request whose JSON body contains only <c>name</c> and <c>subjects</c>.
/// </summary>
/// <param name="streamName">Stream name; used in both the API subject and the body.</param>
/// <param name="subjects">Subjects serialized into the body's <c>subjects</c> array.</param>
/// <returns>The task produced by routing the request through this fixture.</returns>
public Task<JetStreamApiResponse> CreateStreamAsync(string streamName, IReadOnlyList<string> subjects)
{
    var request = new
    {
        name = streamName,
        subjects,
    };
    var json = JsonSerializer.Serialize(request);
    var createSubject = $"$JS.API.STREAM.CREATE.{streamName}";

    return RequestLocalAsync(createSubject, json);
}
/// <summary>
/// Asks the stream manager for the state of the named stream.
/// </summary>
/// <param name="streamName">Name of the stream to query.</param>
/// <returns>The manager's pending result converted to a <see cref="Task{TResult}"/>.</returns>
public Task<ApiStreamState> GetStreamStateAsync(string streamName)
{
    var pendingState = _streamManager.GetStateAsync(streamName, default);
    return pendingState.AsTask();
}
/// <summary>
/// Returns the store backend type that the stream manager reports for the named stream.
/// </summary>
/// <param name="streamName">Name of the stream to query.</param>
/// <returns>An already-completed task holding the backend type string.</returns>
public Task<string> GetStreamBackendTypeAsync(string streamName)
{
    var backendType = _streamManager.GetStoreBackendType(streamName);
    return Task.FromResult(backendType);
}
/// <summary>
/// Builds a consumer-create JSON request from the arguments and routes it through this
/// fixture on subject <c>$JS.API.CONSUMER.CREATE.{stream}.{durableName}</c>.
/// </summary>
/// <param name="stream">Stream name; used in the API subject.</param>
/// <param name="durableName">Sent as <c>durable_name</c> and used in the API subject.</param>
/// <param name="filterSubject">Sent as <c>filter_subject</c>; may be <c>null</c>.</param>
/// <param name="push">Sent as <c>push</c>.</param>
/// <param name="heartbeatMs">Sent as <c>heartbeat_ms</c>.</param>
/// <param name="ackPolicy">Sent as <c>ack_policy</c>, using the lower-cased enum member name.</param>
/// <param name="ackWaitMs">Sent as <c>ack_wait_ms</c>.</param>
/// <param name="maxAckPending">Sent as <c>max_ack_pending</c>.</param>
/// <param name="filterSubjects">Sent as <c>filter_subjects</c>; may be <c>null</c>.</param>
/// <param name="replayPolicy">Sent as <c>replay_policy</c>: "original" for Original, otherwise "instant".</param>
/// <param name="deliverPolicy">
/// Sent as <c>deliver_policy</c>: "last" for Last, "new" for New, and "all" for every other value.
/// </param>
/// <param name="ephemeral">Sent as <c>ephemeral</c>.</param>
/// <returns>The task produced by routing the request.</returns>
public Task<JetStreamApiResponse> CreateConsumerAsync(
    string stream,
    string durableName,
    string? filterSubject,
    bool push = false,
    int heartbeatMs = 0,
    AckPolicy ackPolicy = AckPolicy.None,
    int ackWaitMs = 30_000,
    int maxAckPending = 0,
    IReadOnlyList<string>? filterSubjects = null,
    ReplayPolicy replayPolicy = ReplayPolicy.Instant,
    DeliverPolicy deliverPolicy = DeliverPolicy.All,
    bool ephemeral = false)
{
    var ackPolicyName = ackPolicy.ToString().ToLowerInvariant();

    var replayPolicyName = replayPolicy switch
    {
        ReplayPolicy.Original => "original",
        _ => "instant",
    };

    // Only Last and New have dedicated wire names; anything else is sent as "all".
    string deliverPolicyName;
    if (deliverPolicy == DeliverPolicy.Last)
    {
        deliverPolicyName = "last";
    }
    else if (deliverPolicy == DeliverPolicy.New)
    {
        deliverPolicyName = "new";
    }
    else
    {
        deliverPolicyName = "all";
    }

    var request = new
    {
        durable_name = durableName,
        filter_subject = filterSubject,
        filter_subjects = filterSubjects,
        push,
        heartbeat_ms = heartbeatMs,
        ack_policy = ackPolicyName,
        ack_wait_ms = ackWaitMs,
        max_ack_pending = maxAckPending,
        replay_policy = replayPolicyName,
        deliver_policy = deliverPolicyName,
        ephemeral,
    };

    var json = JsonSerializer.Serialize(request);
    return RequestLocalAsync($"$JS.API.CONSUMER.CREATE.{stream}.{durableName}", json);
}
/// <summary>
/// Sends a consumer-info request (with an empty JSON object as body) through this fixture.
/// </summary>
/// <param name="stream">Stream name used in the API subject.</param>
/// <param name="durableName">Consumer name used in the API subject.</param>
/// <returns>The <c>ConsumerInfo</c> carried by the response.</returns>
/// <exception cref="InvalidOperationException">The response has no consumer info.</exception>
public async Task<JetStreamConsumerInfo> GetConsumerInfoAsync(string stream, string durableName)
{
    var infoSubject = $"$JS.API.CONSUMER.INFO.{stream}.{durableName}";
    var response = await RequestLocalAsync(infoSubject, "{}");

    if (response.ConsumerInfo is { } info)
    {
        return info;
    }

    throw new InvalidOperationException("Consumer not found.");
}
/// <summary>
/// Fetches up to <paramref name="batch"/> messages for a consumer via the consumer manager.
/// </summary>
/// <param name="stream">Stream the consumer belongs to.</param>
/// <param name="durableName">Consumer name.</param>
/// <param name="batch">Batch size passed to the consumer manager.</param>
/// <returns>The manager's pending fetch converted to a <see cref="Task{TResult}"/>.</returns>
public Task<PullFetchBatch> FetchAsync(string stream, string durableName, int batch)
{
    var pendingFetch = _consumerManager.FetchAsync(stream, durableName, batch, _streamManager, default);
    return pendingFetch.AsTask();
}
/// <summary>
/// Fetches for a consumer using a <see cref="PullFetchRequest"/> with <c>NoWait</c> set.
/// </summary>
/// <param name="stream">Stream the consumer belongs to.</param>
/// <param name="durableName">Consumer name.</param>
/// <param name="batch">Value assigned to the request's <c>Batch</c>.</param>
/// <returns>The manager's pending fetch converted to a <see cref="Task{TResult}"/>.</returns>
public Task<PullFetchBatch> FetchWithNoWaitAsync(string stream, string durableName, int batch)
{
    var noWaitRequest = new PullFetchRequest
    {
        Batch = batch,
        NoWait = true,
    };

    var pendingFetch = _consumerManager.FetchAsync(stream, durableName, noWaitRequest, _streamManager, default);
    return pendingFetch.AsTask();
}
/// <summary>
/// Awaits <c>PollHelper.YieldForAsync</c> with <paramref name="delayMs"/> and then performs a
/// regular fetch.
/// </summary>
/// <param name="stream">Stream the consumer belongs to.</param>
/// <param name="durableName">Consumer name.</param>
/// <param name="delayMs">Value passed to <c>PollHelper.YieldForAsync</c> before fetching.</param>
/// <param name="batch">Batch size for the fetch.</param>
/// <returns>The fetched batch.</returns>
public async Task<PullFetchBatch> FetchAfterDelayAsync(string stream, string durableName, int delayMs, int batch)
{
    await PollHelper.YieldForAsync(delayMs);

    var fetched = await FetchAsync(stream, durableName, batch);
    return fetched;
}
/// <summary>
/// Reads one push frame for a consumer from the consumer manager.
/// </summary>
/// <param name="stream">Stream name; defaults to "ORDERS".</param>
/// <param name="durableName">Consumer name; defaults to "PUSH".</param>
/// <returns>An already-completed task holding the frame.</returns>
/// <exception cref="InvalidOperationException">
/// The consumer manager returned no frame. Thrown synchronously from the call, not through
/// the returned task.
/// </exception>
public Task<PushFrame> ReadPushFrameAsync(string stream = "ORDERS", string durableName = "PUSH")
{
    var frame = _consumerManager.ReadPushFrame(stream, durableName)
        ?? throw new InvalidOperationException("No push frame available.");

    return Task.FromResult(frame);
}
/// <summary>
/// Polls, via <c>PollHelper.WaitUntilAsync</c>, until the named stream reports at least one
/// message, checking every 25 ms with a 2000 ms timeout.
/// </summary>
/// <param name="streamName">Stream whose state is polled.</param>
public async Task WaitForMirrorSyncAsync(string streamName)
{
    await PollHelper.WaitUntilAsync(
        async () =>
        {
            var state = await GetStreamStateAsync(streamName);
            return state.Messages > 0;
        },
        timeoutMs: 2000,
        intervalMs: 25);
}
/// <summary>
/// Publishes every payload to <paramref name="subject"/> one at a time, in list order,
/// awaiting each publish before starting the next. The acks are discarded.
/// </summary>
/// <param name="subject">Subject to publish on.</param>
/// <param name="payloads">Message bodies to publish.</param>
public async Task PublishManyAsync(string subject, IReadOnlyList<string> payloads)
{
    for (var index = 0; index < payloads.Count; index++)
    {
        _ = await PublishAndGetAckAsync(subject, payloads[index]);
    }
}
/// <summary>
/// Publishes a message on <paramref name="subject"/>; the source stream name is accepted for
/// readability at the call site only.
/// </summary>
/// <param name="sourceStream">Not used by this method.</param>
/// <param name="subject">Subject to publish on.</param>
/// <param name="payload">Message body.</param>
/// <returns>The publish task, exposed without its ack.</returns>
public Task PublishToSourceAsync(string sourceStream, string subject, string payload)
{
    // Deliberately ignored: only subject and payload are forwarded.
    _ = sourceStream;

    Task<PubAck> publishTask = PublishAndGetAckAsync(subject, payload);
    return publishTask;
}
/// <summary>
/// Calls the consumer manager's <c>AckAll</c> for the given consumer and sequence.
/// </summary>
/// <param name="stream">Stream the consumer belongs to.</param>
/// <param name="durableName">Consumer name.</param>
/// <param name="sequence">Sequence passed to <c>AckAll</c>.</param>
/// <returns>An already-completed task; the ack itself runs synchronously.</returns>
public Task AckAllAsync(string stream, string durableName, ulong sequence)
{
    var consumers = _consumerManager;
    consumers.AckAll(stream, durableName, sequence);

    return Task.CompletedTask;
}
/// <summary>
/// Returns the pending count that the consumer manager reports for a consumer.
/// </summary>
/// <param name="stream">Stream the consumer belongs to.</param>
/// <param name="durableName">Consumer name.</param>
/// <returns>An already-completed task holding the count.</returns>
public Task<int> GetPendingCountAsync(string stream, string durableName)
{
    var pendingCount = _consumerManager.GetPendingCount(stream, durableName);
    return Task.FromResult(pendingCount);
}
/// <summary>
/// Completes immediately without releasing anything.
/// </summary>
/// <remarks>
/// NOTE(review): the stream manager and its stores are not disposed here — confirm that none
/// of them hold resources that need cleanup.
/// </remarks>
/// <returns>An already-completed <see cref="ValueTask"/>.</returns>
public ValueTask DisposeAsync()
{
    return ValueTask.CompletedTask;
}
}