222 lines
8.6 KiB
C#
222 lines
8.6 KiB
C#
using System.Text;
|
|
using System.Text.Json;
|
|
using NATS.Server.Auth;
|
|
using NATS.Server.JetStream;
|
|
using NATS.Server.JetStream.Api;
|
|
using NATS.Server.JetStream.Consumers;
|
|
using NATS.Server.JetStream.Models;
|
|
using NATS.Server.JetStream.Publish;
|
|
|
|
namespace NATS.Server.Tests;
|
|
|
|
/// <summary>
/// Test fixture that wires a <see cref="StreamManager"/>, a <see cref="ConsumerManager"/>,
/// a <see cref="JetStreamApiRouter"/> and a <see cref="JetStreamPublisher"/> together so tests
/// can drive the JetStream API and publish path by calling the components directly.
/// </summary>
/// <remarks>
/// Instances are created through the static <c>StartWith…Async</c> factories; each instance owns
/// its own managers. The static <see cref="RequestAsync"/> entry point instead uses a separate
/// set of managers shared by every caller.
/// </remarks>
internal sealed class JetStreamApiFixture : IAsyncDisposable
{
    // Managers and router used only by the static RequestAsync entry point.
    private static readonly StreamManager SharedStreamManager = new();
    private static readonly ConsumerManager SharedConsumerManager = new();
    private static readonly JetStreamApiRouter SharedRouter = new(SharedStreamManager, SharedConsumerManager);

    // Per-fixture components; the router and the publisher are both built on _streamManager.
    private readonly StreamManager _streamManager;
    private readonly ConsumerManager _consumerManager;
    private readonly JetStreamApiRouter _router;
    private readonly JetStreamPublisher _publisher;

    /// <summary>
    /// Builds a fixture with its own stream manager, consumer manager, router and publisher.
    /// </summary>
    /// <param name="account">Optional account handed to the <see cref="StreamManager"/>.</param>
    private JetStreamApiFixture(Account? account = null)
    {
        _streamManager = new StreamManager(account: account);
        _consumerManager = new ConsumerManager();
        _router = new JetStreamApiRouter(_streamManager, _consumerManager);
        _publisher = new JetStreamPublisher(_streamManager);
    }

    /// <summary>
    /// Routes an API request through the shared (static) router.
    /// </summary>
    /// <param name="subject">API subject to route.</param>
    /// <param name="payload">Request body; encoded as UTF-8 before routing.</param>
    /// <returns>An already-completed task holding the router's response.</returns>
    public static Task<JetStreamApiResponse> RequestAsync(string subject, string payload)
    {
        var response = SharedRouter.Route(subject, Encoding.UTF8.GetBytes(payload));
        return Task.FromResult(response);
    }

    /// <summary>
    /// Creates a fixture and issues a <c>$JS.API.STREAM.CREATE</c> request for one stream.
    /// </summary>
    /// <param name="streamName">Name of the stream to create.</param>
    /// <param name="subject">Single subject the stream is bound to.</param>
    /// <param name="maxMsgs">Value sent as <c>max_msgs</c>.</param>
    /// <returns>The new fixture; the create response itself is discarded.</returns>
    public static async Task<JetStreamApiFixture> StartWithStreamAsync(string streamName, string subject, int maxMsgs = 0)
    {
        var fixture = new JetStreamApiFixture();

        // Serialize instead of interpolating so quotes/backslashes in the arguments are
        // escaped and the number is formatted culture-independently.
        var payload = JsonSerializer.Serialize(new
        {
            name = streamName,
            subjects = new[] { subject },
            max_msgs = maxMsgs,
        });

        _ = await fixture.RequestLocalAsync($"$JS.API.STREAM.CREATE.{streamName}", payload);
        return fixture;
    }

    /// <summary>
    /// Creates a fixture with stream <c>ORDERS</c> on <c>orders.*</c> and a durable consumer
    /// <c>PULL</c> filtered on <c>orders.created</c>.
    /// </summary>
    public static async Task<JetStreamApiFixture> StartWithPullConsumerAsync()
    {
        var fixture = await StartWithStreamAsync("ORDERS", "orders.*");
        _ = await fixture.CreateConsumerAsync("ORDERS", "PULL", "orders.created");
        return fixture;
    }

    /// <summary>
    /// Creates a fixture with stream <c>ORDERS</c> on <c>orders.*</c> and a durable consumer
    /// <c>PUSH</c> filtered on <c>orders.created</c>, requested with <c>push: true</c> and a
    /// heartbeat value of 25.
    /// </summary>
    public static async Task<JetStreamApiFixture> StartWithPushConsumerAsync()
    {
        var fixture = await StartWithStreamAsync("ORDERS", "orders.*");
        _ = await fixture.CreateConsumerAsync("ORDERS", "PUSH", "orders.created", push: true, heartbeatMs: 25);
        return fixture;
    }

    /// <summary>
    /// Creates a fixture with stream <c>ORDERS</c> on <c>orders.*</c> and a durable consumer
    /// <c>PULL</c> using <see cref="AckPolicy.Explicit"/>.
    /// </summary>
    /// <param name="ackWaitMs">Value sent as <c>ack_wait_ms</c> for the consumer.</param>
    public static async Task<JetStreamApiFixture> StartWithAckExplicitConsumerAsync(int ackWaitMs)
    {
        var fixture = await StartWithStreamAsync("ORDERS", "orders.*");
        _ = await fixture.CreateConsumerAsync(
            "ORDERS",
            "PULL",
            "orders.created",
            ackPolicy: AckPolicy.Explicit,
            ackWaitMs: ackWaitMs);
        return fixture;
    }

    /// <summary>
    /// Creates a fixture with stream <c>ORDERS</c> on <c>orders.*</c> and a durable consumer
    /// <c>ACKALL</c> using <see cref="AckPolicy.All"/>.
    /// </summary>
    public static async Task<JetStreamApiFixture> StartWithAckAllConsumerAsync()
    {
        var fixture = await StartWithStreamAsync("ORDERS", "orders.*");
        _ = await fixture.CreateConsumerAsync("ORDERS", "ACKALL", "orders.created", ackPolicy: AckPolicy.All);
        return fixture;
    }

    /// <summary>
    /// Creates a fixture with stream <c>ORDERS</c> on <c>orders.*</c> plus a second stream
    /// <c>ORDERS_MIRROR</c> whose <c>Mirror</c> is set to <c>ORDERS</c>.
    /// </summary>
    /// <remarks>
    /// The mirror stream is registered directly on the stream manager rather than through the
    /// API router.
    /// </remarks>
    public static async Task<JetStreamApiFixture> StartWithMirrorSetupAsync()
    {
        var fixture = await StartWithStreamAsync("ORDERS", "orders.*");

        var mirrorConfig = new StreamConfig
        {
            Name = "ORDERS_MIRROR",
            Subjects = ["orders.mirror.*"],
            Mirror = "ORDERS",
        };
        _ = fixture._streamManager.CreateOrUpdate(mirrorConfig);

        return fixture;
    }

    /// <summary>
    /// Creates a fixture whose stream manager is bound to an account named <c>JWT-LIMITED</c>
    /// with tier <c>jwt-tier</c>. No stream is created.
    /// </summary>
    /// <param name="maxStreams">Value assigned to the account's <c>MaxJetStreamStreams</c>.</param>
    public static Task<JetStreamApiFixture> StartJwtLimitedAccountAsync(int maxStreams)
    {
        var account = new Account("JWT-LIMITED")
        {
            MaxJetStreamStreams = maxStreams,
            JetStreamTier = "jwt-tier",
        };

        return Task.FromResult(new JetStreamApiFixture(account));
    }

    /// <summary>
    /// Publishes a message through the fixture's publisher and, when it was stored without an
    /// error code, hands the stored message to the consumer manager.
    /// </summary>
    /// <param name="subject">Subject to publish on.</param>
    /// <param name="payload">Message body; encoded as UTF-8.</param>
    /// <param name="msgId">Optional message id passed to the publisher.</param>
    /// <param name="expectError">
    /// When <c>true</c>, a publish that the publisher does not capture yields a
    /// <see cref="PubAck"/> with <c>ErrorCode = 404</c> instead of an exception.
    /// </param>
    /// <returns>The publisher's ack, or the synthetic 404 ack described above.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown synchronously when the publisher does not capture the message and
    /// <paramref name="expectError"/> is <c>false</c>.
    /// </exception>
    public Task<PubAck> PublishAndGetAckAsync(string subject, string payload, string? msgId = null, bool expectError = false)
    {
        if (_publisher.TryCapture(subject, Encoding.UTF8.GetBytes(payload), msgId, out var ack))
        {
            return NotifyConsumersAsync(ack);
        }

        if (expectError)
        {
            return Task.FromResult(new PubAck { ErrorCode = 404 });
        }

        throw new InvalidOperationException($"No stream matched subject '{subject}'.");
    }

    /// <summary>
    /// Overload that accepts a stream name for call-site readability.
    /// </summary>
    /// <remarks>
    /// <paramref name="streamName"/> is not used; the call is forwarded to the
    /// subject/payload overload with its default arguments.
    /// </remarks>
    public Task<PubAck> PublishAndGetAckAsync(string streamName, string subject, string payload)
    {
        return PublishAndGetAckAsync(subject, payload);
    }

    /// <summary>
    /// Publishes with <c>ExpectedLastSeq</c> set in the publish options.
    /// </summary>
    /// <param name="subject">Subject to publish on.</param>
    /// <param name="payload">Message body; encoded as UTF-8.</param>
    /// <param name="expectedLastSeq">Value assigned to <c>PublishOptions.ExpectedLastSeq</c>.</param>
    /// <returns>
    /// The publisher's ack, or a <see cref="PubAck"/> with <c>ErrorCode = 404</c> when the
    /// publisher does not capture the message.
    /// </returns>
    /// <remarks>
    /// Unlike <see cref="PublishAndGetAckAsync(string, string, string?, bool)"/>, this path does
    /// not forward the stored message to the consumer manager.
    /// </remarks>
    public Task<PubAck> PublishWithExpectedLastSeqAsync(string subject, string payload, ulong expectedLastSeq)
    {
        var options = new PublishOptions { ExpectedLastSeq = expectedLastSeq };

        if (_publisher.TryCaptureWithOptions(subject, Encoding.UTF8.GetBytes(payload), options, out var ack))
        {
            return Task.FromResult(ack);
        }

        return Task.FromResult(new PubAck { ErrorCode = 404 });
    }

    /// <summary>
    /// Routes an API request through this fixture's own router.
    /// </summary>
    /// <param name="subject">API subject to route.</param>
    /// <param name="payload">Request body; encoded as UTF-8 before routing.</param>
    /// <returns>An already-completed task holding the router's response.</returns>
    public Task<JetStreamApiResponse> RequestLocalAsync(string subject, string payload)
    {
        var response = _router.Route(subject, Encoding.UTF8.GetBytes(payload));
        return Task.FromResult(response);
    }

    /// <summary>
    /// Issues a <c>$JS.API.STREAM.CREATE</c> request with the given name and subjects.
    /// </summary>
    /// <param name="streamName">Name of the stream to create.</param>
    /// <param name="subjects">Subjects serialized into the request's <c>subjects</c> array.</param>
    /// <returns>The router's response.</returns>
    public Task<JetStreamApiResponse> CreateStreamAsync(string streamName, IReadOnlyList<string> subjects)
    {
        var payload = JsonSerializer.Serialize(new
        {
            name = streamName,
            subjects,
        });

        return RequestLocalAsync($"$JS.API.STREAM.CREATE.{streamName}", payload);
    }

    /// <summary>
    /// Returns the state the stream manager reports for <paramref name="streamName"/>.
    /// </summary>
    public Task<StreamState> GetStreamStateAsync(string streamName)
    {
        return _streamManager.GetStateAsync(streamName, default).AsTask();
    }

    /// <summary>
    /// Issues a <c>$JS.API.CONSUMER.CREATE</c> request for a durable consumer.
    /// </summary>
    /// <param name="stream">Stream the consumer belongs to.</param>
    /// <param name="durableName">Sent as <c>durable_name</c> and used in the request subject.</param>
    /// <param name="filterSubject">Sent as <c>filter_subject</c>.</param>
    /// <param name="push">Sent as <c>push</c>.</param>
    /// <param name="heartbeatMs">Sent as <c>heartbeat_ms</c>.</param>
    /// <param name="ackPolicy">Sent as <c>ack_policy</c>, lower-cased.</param>
    /// <param name="ackWaitMs">Sent as <c>ack_wait_ms</c>.</param>
    /// <returns>The router's response.</returns>
    public Task<JetStreamApiResponse> CreateConsumerAsync(string stream, string durableName, string filterSubject, bool push = false, int heartbeatMs = 0, AckPolicy ackPolicy = AckPolicy.None, int ackWaitMs = 30_000)
    {
        // Serialize instead of interpolating so string arguments are escaped correctly;
        // keys and their order match the previously hand-written payload.
        var payload = JsonSerializer.Serialize(new
        {
            durable_name = durableName,
            filter_subject = filterSubject,
            push,
            heartbeat_ms = heartbeatMs,
            ack_policy = ackPolicy.ToString().ToLowerInvariant(),
            ack_wait_ms = ackWaitMs,
        });

        return RequestLocalAsync($"$JS.API.CONSUMER.CREATE.{stream}.{durableName}", payload);
    }

    /// <summary>
    /// Issues a <c>$JS.API.CONSUMER.INFO</c> request and returns the consumer info.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// Thrown when the response carries no <c>ConsumerInfo</c>.
    /// </exception>
    public async Task<JetStreamConsumerInfo> GetConsumerInfoAsync(string stream, string durableName)
    {
        var response = await RequestLocalAsync($"$JS.API.CONSUMER.INFO.{stream}.{durableName}", "{}");
        return response.ConsumerInfo ?? throw new InvalidOperationException("Consumer not found.");
    }

    /// <summary>
    /// Fetches up to <paramref name="batch"/> messages for a consumer via the consumer manager.
    /// </summary>
    public Task<PullFetchBatch> FetchAsync(string stream, string durableName, int batch)
    {
        return _consumerManager.FetchAsync(stream, durableName, batch, _streamManager, default).AsTask();
    }

    /// <summary>
    /// Fetches via a <see cref="PullFetchRequest"/> with <c>NoWait = true</c>.
    /// </summary>
    public Task<PullFetchBatch> FetchWithNoWaitAsync(string stream, string durableName, int batch)
    {
        var request = new PullFetchRequest
        {
            Batch = batch,
            NoWait = true,
        };

        return _consumerManager.FetchAsync(stream, durableName, request, _streamManager, default).AsTask();
    }

    /// <summary>
    /// Waits <paramref name="delayMs"/> milliseconds and then performs <see cref="FetchAsync"/>.
    /// </summary>
    public async Task<PullFetchBatch> FetchAfterDelayAsync(string stream, string durableName, int delayMs, int batch)
    {
        await Task.Delay(delayMs);
        return await FetchAsync(stream, durableName, batch);
    }

    /// <summary>
    /// Reads one push frame for the given consumer from the consumer manager.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// Thrown synchronously when the consumer manager returns no frame.
    /// </exception>
    public Task<PushFrame> ReadPushFrameAsync(string stream = "ORDERS", string durableName = "PUSH")
    {
        var frame = _consumerManager.ReadPushFrame(stream, durableName);
        if (frame == null)
        {
            throw new InvalidOperationException("No push frame available.");
        }

        return Task.FromResult(frame);
    }

    /// <summary>
    /// Polls the stream state every 25 ms until it reports at least one message or two
    /// seconds have elapsed.
    /// </summary>
    /// <remarks>
    /// Best-effort: on timeout the method returns normally without throwing, so callers
    /// must assert on the stream state themselves.
    /// </remarks>
    public async Task WaitForMirrorSyncAsync(string streamName)
    {
        using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(2));

        while (!timeout.IsCancellationRequested)
        {
            var state = await GetStreamStateAsync(streamName);
            if (state.Messages > 0)
            {
                return;
            }

            // The empty continuation absorbs the cancellation of Task.Delay so the loop
            // condition, not an exception, ends the wait when the timeout fires.
            await Task.Delay(25, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
        }
    }

    /// <summary>
    /// Publishes each payload in order on <paramref name="subject"/>, discarding the acks.
    /// </summary>
    public async Task PublishManyAsync(string subject, IReadOnlyList<string> payloads)
    {
        foreach (var payload in payloads)
        {
            _ = await PublishAndGetAckAsync(subject, payload);
        }
    }

    /// <summary>
    /// Calls <c>AckAll</c> on the consumer manager for the given consumer and sequence.
    /// </summary>
    public Task AckAllAsync(string stream, string durableName, ulong sequence)
    {
        _consumerManager.AckAll(stream, durableName, sequence);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Returns the pending count the consumer manager reports for the given consumer.
    /// </summary>
    public Task<int> GetPendingCountAsync(string stream, string durableName)
    {
        return Task.FromResult(_consumerManager.GetPendingCount(stream, durableName));
    }

    /// <summary>
    /// No-op; the fixture releases nothing on disposal.
    /// </summary>
    public ValueTask DisposeAsync() => ValueTask.CompletedTask;

    /// <summary>
    /// Loads the message referenced by a successful ack and forwards it to the consumer manager.
    /// </summary>
    /// <remarks>
    /// Awaits the store instead of blocking on it, so the publish path no longer performs
    /// sync-over-async.
    /// </remarks>
    private async Task<PubAck> NotifyConsumersAsync(PubAck ack)
    {
        if (ack.ErrorCode == null && _streamManager.TryGet(ack.Stream, out var streamHandle))
        {
            var stored = await streamHandle.Store.LoadAsync(ack.Seq, default);
            if (stored != null)
            {
                _consumerManager.OnPublished(ack.Stream, stored);
            }
        }

        return ack;
    }
}
|