using System.Text;
using System.Text.Json;
using NATS.Server.Auth;
using NATS.Server.JetStream;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Consumers;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Publish;

namespace NATS.Server.Tests;

/// <summary>
/// In-process test fixture that wires a <see cref="StreamManager"/>, a <see cref="ConsumerManager"/>,
/// a <see cref="JetStreamApiRouter"/> and a <see cref="JetStreamPublisher"/> together and exposes
/// helpers for driving them directly, without a network connection.
/// </summary>
/// <remarks>
/// Each instance owns its own managers; only the static <see cref="RequestAsync"/> helper uses the
/// process-wide shared instances.
/// NOTE(review): several helpers are declared as plain <c>Task</c> yet return values that callers
/// consume; generic type arguments appear to be missing from this copy of the file - confirm
/// against the authoritative source before relying on the declared return types.
/// </remarks>
internal sealed class JetStreamApiFixture : IAsyncDisposable
{
    // Shared (static) wiring, used by the static RequestAsync helper.
    private static readonly StreamManager SharedStreamManager = new();
    private static readonly ConsumerManager SharedConsumerManager = new();
    private static readonly JetStreamApiRouter SharedRouter = new(SharedStreamManager, SharedConsumerManager);

    // Per-fixture wiring; every instance helper below goes through these.
    private readonly StreamManager _streamManager;
    private readonly ConsumerManager _consumerManager;
    private readonly JetStreamApiRouter _router;
    private readonly JetStreamPublisher _publisher;

    /// <summary>
    /// Creates a fixture with fresh, default-constructed managers, router and publisher.
    /// </summary>
    public JetStreamApiFixture()
    {
        _streamManager = new StreamManager();
        _consumerManager = new ConsumerManager();
        _router = new JetStreamApiRouter(_streamManager, _consumerManager);
        _publisher = new JetStreamPublisher(_streamManager);
    }

    /// <summary>
    /// Creates a fixture whose stream manager is constructed with the given account.
    /// </summary>
    /// <param name="account">Account passed to the <see cref="StreamManager"/> constructor.</param>
    private JetStreamApiFixture(Account? account)
    {
        _streamManager = new StreamManager(account: account);
        _consumerManager = new ConsumerManager();
        _router = new JetStreamApiRouter(_streamManager, _consumerManager);
        _publisher = new JetStreamPublisher(_streamManager);
    }

    /// <summary>
    /// Routes a request through the shared static router (not through any fixture instance).
    /// </summary>
    /// <param name="subject">API subject to route.</param>
    /// <param name="payload">Request body; encoded as UTF-8 before routing.</param>
    /// <returns>A completed task wrapping the router's result.</returns>
    public static Task RequestAsync(string subject, string payload)
    {
        return Task.FromResult(SharedRouter.Route(subject, Encoding.UTF8.GetBytes(payload)));
    }

    /// <summary>
    /// Creates a fixture and issues a <c>$JS.API.STREAM.CREATE.{streamName}</c> request with a JSON
    /// payload carrying <c>name</c>, a single-element <c>subjects</c> array and <c>max_msgs</c>.
    /// </summary>
    /// <remarks>
    /// The payload is built by string interpolation, so <paramref name="streamName"/> and
    /// <paramref name="subject"/> are inserted without JSON escaping. The create response is discarded.
    /// </remarks>
    /// <param name="streamName">Stream name, used in both the subject and the payload.</param>
    /// <param name="subject">The single subject placed in the <c>subjects</c> array.</param>
    /// <param name="maxMsgs">Value written to <c>max_msgs</c>.</param>
    /// <returns>The new fixture.</returns>
    public static async Task StartWithStreamAsync(string streamName, string subject, int maxMsgs = 0)
    {
        var fixture = new JetStreamApiFixture();
        var payload = $"{{\"name\":\"{streamName}\",\"subjects\":[\"{subject}\"],\"max_msgs\":{maxMsgs}}}";
        _ = await fixture.RequestLocalAsync($"$JS.API.STREAM.CREATE.{streamName}", payload);
        return fixture;
    }

    /// <summary>
    /// Creates a fixture and registers <paramref name="config"/> directly with its stream manager,
    /// bypassing the API router. The result of the registration is discarded.
    /// </summary>
    /// <param name="config">Stream configuration passed to <c>CreateOrUpdate</c>.</param>
    /// <returns>A completed task wrapping the new fixture.</returns>
    public static Task StartWithStreamConfigAsync(StreamConfig config)
    {
        var fixture = new JetStreamApiFixture();
        _ = fixture._streamManager.CreateOrUpdate(config);
        return Task.FromResult(fixture);
    }

    /// <summary>
    /// Creates a fixture and sends <paramref name="json"/> verbatim to the fixed subject
    /// <c>$JS.API.STREAM.CREATE.S</c>. The create response is discarded.
    /// </summary>
    /// <param name="json">Raw request payload.</param>
    /// <returns>The new fixture.</returns>
    public static async Task StartWithStreamJsonAsync(string json)
    {
        var fixture = new JetStreamApiFixture();
        _ = await fixture.RequestLocalAsync("$JS.API.STREAM.CREATE.S", json);
        return fixture;
    }

    /// <summary>
    /// Creates stream <c>ORDERS</c> on <c>orders.*</c> plus a consumer named <c>PULL</c>
    /// filtered to <c>orders.created</c>, using the default consumer options.
    /// </summary>
    /// <returns>The new fixture.</returns>
    public static async Task StartWithPullConsumerAsync()
    {
        var fixture = await StartWithStreamAsync("ORDERS", "orders.*");
        _ = await fixture.CreateConsumerAsync("ORDERS", "PULL", "orders.created");
        return fixture;
    }

    /// <summary>
    /// Creates stream <c>ORDERS</c> on <c>orders.*</c> plus a consumer named <c>PUSH</c>
    /// filtered to <c>orders.created</c>, with <c>push</c> enabled and a heartbeat value of 25.
    /// </summary>
    /// <returns>The new fixture.</returns>
    public static async Task StartWithPushConsumerAsync()
    {
        var fixture = await StartWithStreamAsync("ORDERS", "orders.*");
        _ = await fixture.CreateConsumerAsync("ORDERS", "PUSH", "orders.created", push: true, heartbeatMs: 25);
        return fixture;
    }

    /// <summary>
    /// Creates stream <c>ORDERS</c> on <c>orders.*</c> plus a consumer named <c>PULL</c>
    /// filtered to <c>orders.created</c>, with <see cref="AckPolicy.Explicit"/> and the given ack wait.
    /// </summary>
    /// <param name="ackWaitMs">Value forwarded as the consumer's <c>ack_wait_ms</c>.</param>
    /// <returns>The new fixture.</returns>
    public static async Task StartWithAckExplicitConsumerAsync(int ackWaitMs)
    {
        var fixture = await StartWithStreamAsync("ORDERS", "orders.*");
        _ = await fixture.CreateConsumerAsync("ORDERS", "PULL", "orders.created", ackPolicy: AckPolicy.Explicit, ackWaitMs: ackWaitMs);
        return fixture;
    }

    /// <summary>
    /// Creates stream <c>ORDERS</c> on <c>orders.*</c> plus a consumer named <c>ACKALL</c>
    /// filtered to <c>orders.created</c>, with <see cref="AckPolicy.All"/>.
    /// </summary>
    /// <returns>The new fixture.</returns>
    public static async Task StartWithAckAllConsumerAsync()
    {
        var fixture = await StartWithStreamAsync("ORDERS", "orders.*");
        _ = await fixture.CreateConsumerAsync("ORDERS", "ACKALL", "orders.created", ackPolicy: AckPolicy.All);
        return fixture;
    }

    /// <summary>
    /// Creates stream <c>ORDERS</c> on <c>orders.*</c> and then registers a second stream
    /// <c>ORDERS_MIRROR</c> (subjects <c>orders.mirror.*</c>) whose <c>Mirror</c> is <c>ORDERS</c>.
    /// </summary>
    /// <returns>The new fixture.</returns>
    public static async Task StartWithMirrorSetupAsync()
    {
        var fixture = await StartWithStreamAsync("ORDERS", "orders.*");
        _ = fixture._streamManager.CreateOrUpdate(new StreamConfig
        {
            Name = "ORDERS_MIRROR",
            Subjects = ["orders.mirror.*"],
            Mirror = "ORDERS",
        });
        return fixture;
    }

    /// <summary>
    /// Creates stream <c>ORDERS</c> on subject <c>&gt;</c> plus a consumer named <c>CF</c> with no
    /// single filter subject and a filter-subject list containing <c>orders.*</c>.
    /// </summary>
    /// <returns>The new fixture.</returns>
    public static async Task StartWithMultiFilterConsumerAsync()
    {
        var fixture = await StartWithStreamAsync("ORDERS", ">");
        _ = await fixture.CreateConsumerAsync("ORDERS", "CF", null, filterSubjects: ["orders.*"]);
        return fixture;
    }

    /// <summary>
    /// Creates stream <c>ORDERS</c> on <c>orders.*</c>, publishes one message (<c>orders.created</c>,
    /// payload <c>1</c>) and only then creates consumer <c>RO</c> with
    /// <see cref="ReplayPolicy.Original"/> and <see cref="AckPolicy.Explicit"/>.
    /// </summary>
    /// <returns>The new fixture.</returns>
    public static async Task StartWithReplayOriginalConsumerAsync()
    {
        var fixture = await StartWithStreamAsync("ORDERS", "orders.*");
        _ = await fixture.PublishAndGetAckAsync("orders.created", "1");
        _ = await fixture.CreateConsumerAsync("ORDERS", "RO", "orders.*", replayPolicy: ReplayPolicy.Original, ackPolicy: AckPolicy.Explicit);
        return fixture;
    }

    /// <summary>
    /// Creates a fixture with streams <c>SRC1</c> (<c>a.&gt;</c>), <c>SRC2</c> (<c>b.&gt;</c>) and
    /// <c>AGG</c> (<c>agg.&gt;</c>), where <c>AGG</c> lists <c>SRC1</c> and <c>SRC2</c> as sources.
    /// All three are registered directly with the stream manager.
    /// </summary>
    /// <returns>A completed task wrapping the new fixture.</returns>
    public static Task StartWithMultipleSourcesAsync()
    {
        var fixture = new JetStreamApiFixture();
        _ = fixture._streamManager.CreateOrUpdate(new StreamConfig
        {
            Name = "SRC1",
            Subjects = ["a.>"],
        });
        _ = fixture._streamManager.CreateOrUpdate(new StreamConfig
        {
            Name = "SRC2",
            Subjects = ["b.>"],
        });
        _ = fixture._streamManager.CreateOrUpdate(new StreamConfig
        {
            Name = "AGG",
            Subjects = ["agg.>"],
            Sources =
            [
                new StreamSourceConfig { Name = "SRC1" },
                new StreamSourceConfig { Name = "SRC2" },
            ],
        });
        return Task.FromResult(fixture);
    }

    /// <summary>
    /// Creates a fixture bound to an account named <c>JWT-LIMITED</c> with
    /// <c>JetStreamTier</c> set to <c>jwt-tier</c> and the given stream limit.
    /// </summary>
    /// <param name="maxStreams">Value assigned to the account's <c>MaxJetStreamStreams</c>.</param>
    /// <returns>A completed task wrapping the new fixture.</returns>
    public static Task StartJwtLimitedAccountAsync(int maxStreams)
    {
        var account = new Account("JWT-LIMITED")
        {
            MaxJetStreamStreams = maxStreams,
            JetStreamTier = "jwt-tier",
        };
        return Task.FromResult(new JetStreamApiFixture(account));
    }

    /// <summary>
    /// Publishes a message through the fixture's publisher and, when the ack carries no error and
    /// its stream can be resolved, loads the stored message and hands it to the consumer manager.
    /// </summary>
    /// <remarks>
    /// The capture itself, the 404 fallback and the "no stream matched" exception all happen
    /// synchronously before a task is returned. The store load is awaited rather than blocked on,
    /// so a failure while loading surfaces through the returned task.
    /// </remarks>
    /// <param name="subject">Subject to publish on.</param>
    /// <param name="payload">Message body; encoded as UTF-8.</param>
    /// <param name="msgId">Optional message id forwarded to the publisher.</param>
    /// <param name="expectError">
    /// When <c>true</c> and the publisher does not capture the message, a <see cref="PubAck"/> with
    /// <c>ErrorCode = 404</c> is returned instead of throwing.
    /// </param>
    /// <returns>A task producing the publish ack.</returns>
    /// <exception cref="InvalidOperationException">
    /// The publisher did not capture the message and <paramref name="expectError"/> is <c>false</c>.
    /// </exception>
    public Task PublishAndGetAckAsync(string subject, string payload, string? msgId = null, bool expectError = false)
    {
        if (!_publisher.TryCapture(subject, Encoding.UTF8.GetBytes(payload), msgId, out var ack))
        {
            if (expectError)
                return Task.FromResult(new PubAck { ErrorCode = 404 });

            throw new InvalidOperationException($"No stream matched subject '{subject}'.");
        }

        // Nothing to fan out when the ack reports an error or its stream cannot be resolved.
        if (ack.ErrorCode != null || !_streamManager.TryGet(ack.Stream, out var streamHandle))
            return Task.FromResult(ack);

        return NotifyConsumersAsync();

        // Awaits the store instead of blocking on GetAwaiter().GetResult(), then notifies consumers.
        async Task<PubAck> NotifyConsumersAsync()
        {
            var stored = await streamHandle.Store.LoadAsync(ack.Seq, default);
            if (stored != null)
                _consumerManager.OnPublished(ack.Stream, stored);

            return ack;
        }
    }

    /// <summary>
    /// Overload that accepts a stream name for call-site readability; <paramref name="streamName"/>
    /// is not used and the call is forwarded to the subject/payload overload.
    /// </summary>
    /// <param name="streamName">Ignored.</param>
    /// <param name="subject">Subject to publish on.</param>
    /// <param name="payload">Message body.</param>
    /// <returns>A task producing the publish ack.</returns>
    public Task PublishAndGetAckAsync(string streamName, string subject, string payload)
    {
        return PublishAndGetAckAsync(subject, payload);
    }

    /// <summary>
    /// Publishes with <c>ExpectedLastSeq</c> set in the publish options. Unlike
    /// <c>PublishAndGetAckAsync</c>, this path does not notify the consumer manager.
    /// </summary>
    /// <param name="subject">Subject to publish on.</param>
    /// <param name="payload">Message body; encoded as UTF-8.</param>
    /// <param name="expectedLastSeq">Value assigned to <c>PublishOptions.ExpectedLastSeq</c>.</param>
    /// <returns>
    /// A completed task with the publisher's ack, or a <see cref="PubAck"/> with
    /// <c>ErrorCode = 404</c> when the publisher does not capture the message.
    /// </returns>
    public Task PublishWithExpectedLastSeqAsync(string subject, string payload, ulong expectedLastSeq)
    {
        if (_publisher.TryCaptureWithOptions(subject, Encoding.UTF8.GetBytes(payload), new PublishOptions { ExpectedLastSeq = expectedLastSeq }, out var ack))
        {
            return Task.FromResult(ack);
        }

        return Task.FromResult(new PubAck { ErrorCode = 404 });
    }

    /// <summary>
    /// Publishes a batch message with the Nats-Batch-Id, Nats-Batch-Sequence (and optionally
    /// Nats-Batch-Commit) headers simulated via PublishOptions.
    /// </summary>
    /// <param name="subject">Subject to publish on.</param>
    /// <param name="payload">Message body; encoded as UTF-8.</param>
    /// <param name="batchId">Value assigned to <c>PublishOptions.BatchId</c>.</param>
    /// <param name="batchSeq">Value assigned to <c>PublishOptions.BatchSeq</c>.</param>
    /// <param name="commitValue">Value assigned to <c>PublishOptions.BatchCommit</c>.</param>
    /// <param name="msgId">Value assigned to <c>PublishOptions.MsgId</c>.</param>
    /// <param name="expectedLastSeq">Value assigned to <c>PublishOptions.ExpectedLastSeq</c>.</param>
    /// <param name="expectedLastMsgId">Value assigned to <c>PublishOptions.ExpectedLastMsgId</c>.</param>
    /// <returns>
    /// PubAck with ErrorCode set on error, empty BatchId on staged (flow-control), or
    /// full ack with BatchId+BatchSize on commit. When the publisher does not capture the message
    /// at all, a <see cref="PubAck"/> with <c>ErrorCode = 404</c> is returned.
    /// </returns>
    public Task BatchPublishAsync(
        string subject,
        string payload,
        string batchId,
        ulong batchSeq,
        string? commitValue = null,
        string? msgId = null,
        ulong expectedLastSeq = 0,
        string? expectedLastMsgId = null)
    {
        var options = new PublishOptions
        {
            BatchId = batchId,
            BatchSeq = batchSeq,
            BatchCommit = commitValue,
            MsgId = msgId,
            ExpectedLastSeq = expectedLastSeq,
            ExpectedLastMsgId = expectedLastMsgId,
        };

        if (_publisher.TryCaptureWithOptions(subject, Encoding.UTF8.GetBytes(payload), options, out var ack))
            return Task.FromResult(ack);

        return Task.FromResult(new PubAck { ErrorCode = 404 });
    }

    /// <summary>
    /// Looks up a stream by name and returns its configuration.
    /// </summary>
    /// <param name="streamName">Name of the stream to look up.</param>
    /// <returns>The stream's config, or <c>null</c> when the stream manager has no such stream.</returns>
    public StreamConfig? GetStreamConfig(string streamName)
    {
        return _streamManager.TryGet(streamName, out var handle) ? handle.Config : null;
    }

    /// <summary>
    /// Applies <paramref name="config"/> via <c>CreateOrUpdate</c> and reports success.
    /// </summary>
    /// <param name="config">Stream configuration to apply.</param>
    /// <returns><c>true</c> when the result's <c>Error</c> is <c>null</c>.</returns>
    public bool UpdateStream(StreamConfig config)
    {
        var result = _streamManager.CreateOrUpdate(config);
        return result.Error == null;
    }

    /// <summary>
    /// Applies <paramref name="config"/> via <c>CreateOrUpdate</c> and returns the raw response.
    /// </summary>
    /// <param name="config">Stream configuration to apply.</param>
    /// <returns>The response produced by the stream manager.</returns>
    public JetStreamApiResponse UpdateStreamWithResult(StreamConfig config)
    {
        return _streamManager.CreateOrUpdate(config);
    }

    /// <summary>
    /// Exposes the underlying JetStreamPublisher for advanced test scenarios
    /// (e.g. calling ClearBatches to simulate a leader change).
    /// </summary>
    public JetStreamPublisher GetPublisher() => _publisher;

    /// <summary>
    /// Routes a request through this fixture's own router.
    /// </summary>
    /// <param name="subject">API subject to route.</param>
    /// <param name="payload">Request body; encoded as UTF-8 before routing.</param>
    /// <returns>A completed task wrapping the router's result.</returns>
    public Task RequestLocalAsync(string subject, string payload)
    {
        return Task.FromResult(_router.Route(subject, Encoding.UTF8.GetBytes(payload)));
    }

    /// <summary>
    /// Sends a <c>$JS.API.STREAM.CREATE.{streamName}</c> request whose payload is the JSON
    /// serialization of <c>name</c> and <c>subjects</c>.
    /// </summary>
    /// <param name="streamName">Stream name, used in both the subject and the payload.</param>
    /// <param name="subjects">Subjects serialized into the payload.</param>
    /// <returns>The task returned by <see cref="RequestLocalAsync"/>.</returns>
    public Task CreateStreamAsync(string streamName, IReadOnlyList subjects)
    {
        var payload = JsonSerializer.Serialize(new
        {
            name = streamName,
            subjects,
        });

        return RequestLocalAsync($"$JS.API.STREAM.CREATE.{streamName}", payload);
    }

    /// <summary>
    /// Asks the stream manager for the state of <paramref name="streamName"/>, using a default
    /// (non-cancellable) token, and exposes the result as a task.
    /// </summary>
    /// <param name="streamName">Name of the stream to query.</param>
    public Task GetStreamStateAsync(string streamName)
    {
        return _streamManager.GetStateAsync(streamName, default).AsTask();
    }

    /// <summary>
    /// Returns the store backend type reported by the stream manager for
    /// <paramref name="streamName"/>, wrapped in a completed task.
    /// </summary>
    /// <param name="streamName">Name of the stream to query.</param>
    public Task GetStreamBackendTypeAsync(string streamName)
    {
        return Task.FromResult(_streamManager.GetStoreBackendType(streamName));
    }

    /// <summary>
    /// Sends a <c>$JS.API.CONSUMER.CREATE.{stream}.{durableName}</c> request whose JSON payload is
    /// built from the arguments.
    /// </summary>
    /// <remarks>
    /// Enum arguments are written as strings: <c>ack_policy</c> is the lower-cased enum name;
    /// <c>replay_policy</c> is <c>original</c> for <see cref="ReplayPolicy.Original"/> and
    /// <c>instant</c> otherwise; <c>deliver_policy</c> is <c>last</c>, <c>new</c>, or <c>all</c>
    /// for every other value.
    /// </remarks>
    /// <param name="stream">Stream name, used in the request subject.</param>
    /// <param name="durableName">Written as <c>durable_name</c> and used in the request subject.</param>
    /// <param name="filterSubject">Written as <c>filter_subject</c>.</param>
    /// <param name="push">Written as <c>push</c>.</param>
    /// <param name="heartbeatMs">Written as <c>heartbeat_ms</c>.</param>
    /// <param name="ackPolicy">Written as <c>ack_policy</c>.</param>
    /// <param name="ackWaitMs">Written as <c>ack_wait_ms</c>.</param>
    /// <param name="maxAckPending">Written as <c>max_ack_pending</c>.</param>
    /// <param name="filterSubjects">Written as <c>filter_subjects</c>.</param>
    /// <param name="replayPolicy">Written as <c>replay_policy</c>.</param>
    /// <param name="deliverPolicy">Written as <c>deliver_policy</c>.</param>
    /// <param name="ephemeral">Written as <c>ephemeral</c>.</param>
    /// <returns>The task returned by <see cref="RequestLocalAsync"/>.</returns>
    public Task CreateConsumerAsync(
        string stream,
        string durableName,
        string? filterSubject,
        bool push = false,
        int heartbeatMs = 0,
        AckPolicy ackPolicy = AckPolicy.None,
        int ackWaitMs = 30_000,
        int maxAckPending = 0,
        IReadOnlyList? filterSubjects = null,
        ReplayPolicy replayPolicy = ReplayPolicy.Instant,
        DeliverPolicy deliverPolicy = DeliverPolicy.All,
        bool ephemeral = false)
    {
        var payloadObj = new
        {
            durable_name = durableName,
            filter_subject = filterSubject,
            filter_subjects = filterSubjects,
            push,
            heartbeat_ms = heartbeatMs,
            ack_policy = ackPolicy.ToString().ToLowerInvariant(),
            ack_wait_ms = ackWaitMs,
            max_ack_pending = maxAckPending,
            replay_policy = replayPolicy == ReplayPolicy.Original ? "original" : "instant",
            deliver_policy = deliverPolicy switch
            {
                DeliverPolicy.Last => "last",
                DeliverPolicy.New => "new",
                _ => "all",
            },
            ephemeral,
        };

        var payload = JsonSerializer.Serialize(payloadObj);
        return RequestLocalAsync($"$JS.API.CONSUMER.CREATE.{stream}.{durableName}", payload);
    }

    /// <summary>
    /// Sends a <c>$JS.API.CONSUMER.INFO.{stream}.{durableName}</c> request with an empty JSON
    /// object and returns the response's <c>ConsumerInfo</c>.
    /// </summary>
    /// <param name="stream">Stream name.</param>
    /// <param name="durableName">Consumer name.</param>
    /// <exception cref="InvalidOperationException">The response carries no consumer info.</exception>
    public async Task GetConsumerInfoAsync(string stream, string durableName)
    {
        var response = await RequestLocalAsync($"$JS.API.CONSUMER.INFO.{stream}.{durableName}", "{}");
        return response.ConsumerInfo ?? throw new InvalidOperationException("Consumer not found.");
    }

    /// <summary>
    /// Fetches up to <paramref name="batch"/> messages for the consumer via the consumer manager,
    /// using a default (non-cancellable) token.
    /// </summary>
    /// <param name="stream">Stream name.</param>
    /// <param name="durableName">Consumer name.</param>
    /// <param name="batch">Batch size forwarded to the consumer manager.</param>
    public Task FetchAsync(string stream, string durableName, int batch)
    {
        return _consumerManager.FetchAsync(stream, durableName, batch, _streamManager, default).AsTask();
    }

    /// <summary>
    /// Same as <see cref="FetchAsync"/>, but issues a <see cref="PullFetchRequest"/> with
    /// <c>NoWait = true</c>.
    /// </summary>
    /// <param name="stream">Stream name.</param>
    /// <param name="durableName">Consumer name.</param>
    /// <param name="batch">Value assigned to the request's <c>Batch</c>.</param>
    public Task FetchWithNoWaitAsync(string stream, string durableName, int batch)
    {
        return _consumerManager.FetchAsync(stream, durableName, new PullFetchRequest
        {
            Batch = batch,
            NoWait = true,
        }, _streamManager, default).AsTask();
    }

    /// <summary>
    /// Waits <paramref name="delayMs"/> milliseconds and then performs a regular fetch.
    /// </summary>
    /// <param name="stream">Stream name.</param>
    /// <param name="durableName">Consumer name.</param>
    /// <param name="delayMs">Delay before fetching, in milliseconds.</param>
    /// <param name="batch">Batch size forwarded to <see cref="FetchAsync"/>.</param>
    public async Task FetchAfterDelayAsync(string stream, string durableName, int delayMs, int batch)
    {
        await Task.Delay(delayMs);
        return await FetchAsync(stream, durableName, batch);
    }

    /// <summary>
    /// Reads one push frame for the consumer from the consumer manager.
    /// </summary>
    /// <param name="stream">Stream name; defaults to <c>ORDERS</c>.</param>
    /// <param name="durableName">Consumer name; defaults to <c>PUSH</c>.</param>
    /// <returns>A completed task wrapping the frame.</returns>
    /// <exception cref="InvalidOperationException">
    /// The consumer manager returned no frame. Thrown synchronously, before a task is returned.
    /// </exception>
    public Task ReadPushFrameAsync(string stream = "ORDERS", string durableName = "PUSH")
    {
        var frame = _consumerManager.ReadPushFrame(stream, durableName);
        if (frame == null)
            throw new InvalidOperationException("No push frame available.");

        return Task.FromResult(frame);
    }

    /// <summary>
    /// Polls the state of <paramref name="streamName"/> roughly every 25 ms until it reports at
    /// least one message or a 2-second timeout elapses.
    /// </summary>
    /// <remarks>
    /// Returns without throwing on timeout; callers must assert on stream state themselves.
    /// </remarks>
    /// <param name="streamName">Name of the stream to poll.</param>
    public async Task WaitForMirrorSyncAsync(string streamName)
    {
        using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(2));
        while (!timeout.IsCancellationRequested)
        {
            var state = await GetStreamStateAsync(streamName);
            if (state.Messages > 0)
                return;

            // The empty continuation absorbs the delay's cancellation so the loop ends via its
            // condition rather than by an exception.
            await Task.Delay(25, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
        }
    }

    /// <summary>
    /// Publishes each payload to <paramref name="subject"/> in order, awaiting every publish
    /// before starting the next. The acks are discarded.
    /// </summary>
    /// <param name="subject">Subject to publish on.</param>
    /// <param name="payloads">Message bodies, published in enumeration order.</param>
    public async Task PublishManyAsync(string subject, IReadOnlyList payloads)
    {
        foreach (var payload in payloads)
            _ = await PublishAndGetAckAsync(subject, payload);
    }

    /// <summary>
    /// Publishes to a subject on behalf of a source stream; <paramref name="sourceStream"/> is
    /// explicitly discarded and the call is forwarded to <c>PublishAndGetAckAsync</c>.
    /// </summary>
    /// <param name="sourceStream">Ignored.</param>
    /// <param name="subject">Subject to publish on.</param>
    /// <param name="payload">Message body.</param>
    public Task PublishToSourceAsync(string sourceStream, string subject, string payload)
    {
        _ = sourceStream;
        return PublishAndGetAckAsync(subject, payload);
    }

    /// <summary>
    /// Calls <c>AckAll</c> on the consumer manager synchronously and returns a completed task.
    /// </summary>
    /// <param name="stream">Stream name.</param>
    /// <param name="durableName">Consumer name.</param>
    /// <param name="sequence">Sequence forwarded to <c>AckAll</c>.</param>
    public Task AckAllAsync(string stream, string durableName, ulong sequence)
    {
        _consumerManager.AckAll(stream, durableName, sequence);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Returns the consumer manager's pending count for the consumer, wrapped in a completed task.
    /// </summary>
    /// <param name="stream">Stream name.</param>
    /// <param name="durableName">Consumer name.</param>
    public Task GetPendingCountAsync(string stream, string durableName)
    {
        return Task.FromResult(_consumerManager.GetPendingCount(stream, durableName));
    }

    /// <summary>
    /// Completes immediately; the fixture performs no cleanup on disposal.
    /// </summary>
    public ValueTask DisposeAsync() => ValueTask.CompletedTask;
}