156 lines
6.3 KiB
C#
156 lines
6.3 KiB
C#
using System.Text;
|
|
using NATS.Server.JetStream;
|
|
using NATS.Server.JetStream.Api;
|
|
using NATS.Server.JetStream.Consumers;
|
|
using NATS.Server.JetStream.Models;
|
|
using NATS.Server.JetStream.Publish;
|
|
|
|
namespace NATS.Server.Tests;
|
|
|
|
internal sealed class JetStreamApiFixture : IAsyncDisposable
|
|
{
|
|
private static readonly StreamManager SharedStreamManager = new();
|
|
private static readonly ConsumerManager SharedConsumerManager = new();
|
|
private static readonly JetStreamApiRouter SharedRouter = new(SharedStreamManager, SharedConsumerManager);
|
|
|
|
private readonly StreamManager _streamManager;
|
|
private readonly ConsumerManager _consumerManager;
|
|
private readonly JetStreamApiRouter _router;
|
|
private readonly JetStreamPublisher _publisher;
|
|
|
|
private JetStreamApiFixture()
|
|
{
|
|
_streamManager = new StreamManager();
|
|
_consumerManager = new ConsumerManager();
|
|
_router = new JetStreamApiRouter(_streamManager, _consumerManager);
|
|
_publisher = new JetStreamPublisher(_streamManager);
|
|
}
|
|
|
|
public static Task<JetStreamApiResponse> RequestAsync(string subject, string payload)
|
|
{
|
|
return Task.FromResult(SharedRouter.Route(subject, Encoding.UTF8.GetBytes(payload)));
|
|
}
|
|
|
|
public static async Task<JetStreamApiFixture> StartWithStreamAsync(string streamName, string subject, int maxMsgs = 0)
|
|
{
|
|
var fixture = new JetStreamApiFixture();
|
|
var payload = $"{{\"name\":\"{streamName}\",\"subjects\":[\"{subject}\"],\"max_msgs\":{maxMsgs}}}";
|
|
_ = await fixture.RequestLocalAsync($"$JS.API.STREAM.CREATE.{streamName}", payload);
|
|
return fixture;
|
|
}
|
|
|
|
public static async Task<JetStreamApiFixture> StartWithPullConsumerAsync()
|
|
{
|
|
var fixture = await StartWithStreamAsync("ORDERS", "orders.*");
|
|
_ = await fixture.CreateConsumerAsync("ORDERS", "PULL", "orders.created");
|
|
return fixture;
|
|
}
|
|
|
|
public static async Task<JetStreamApiFixture> StartWithPushConsumerAsync()
|
|
{
|
|
var fixture = await StartWithStreamAsync("ORDERS", "orders.*");
|
|
_ = await fixture.CreateConsumerAsync("ORDERS", "PUSH", "orders.created", push: true, heartbeatMs: 25);
|
|
return fixture;
|
|
}
|
|
|
|
public static async Task<JetStreamApiFixture> StartWithAckExplicitConsumerAsync(int ackWaitMs)
|
|
{
|
|
var fixture = await StartWithStreamAsync("ORDERS", "orders.*");
|
|
_ = await fixture.CreateConsumerAsync("ORDERS", "PULL", "orders.created",
|
|
ackPolicy: AckPolicy.Explicit, ackWaitMs: ackWaitMs);
|
|
return fixture;
|
|
}
|
|
|
|
public static async Task<JetStreamApiFixture> StartWithMirrorSetupAsync()
|
|
{
|
|
var fixture = await StartWithStreamAsync("ORDERS", "orders.*");
|
|
_ = fixture._streamManager.CreateOrUpdate(new StreamConfig
|
|
{
|
|
Name = "ORDERS_MIRROR",
|
|
Subjects = ["orders.mirror.*"],
|
|
Mirror = "ORDERS",
|
|
});
|
|
return fixture;
|
|
}
|
|
|
|
public Task<PubAck> PublishAndGetAckAsync(string subject, string payload, string? msgId = null, bool expectError = false)
|
|
{
|
|
if (_publisher.TryCapture(subject, Encoding.UTF8.GetBytes(payload), msgId, out var ack))
|
|
{
|
|
if (ack.ErrorCode == null && _streamManager.TryGet(ack.Stream, out var streamHandle))
|
|
{
|
|
var stored = streamHandle.Store.LoadAsync(ack.Seq, default).GetAwaiter().GetResult();
|
|
if (stored != null)
|
|
_consumerManager.OnPublished(ack.Stream, stored);
|
|
}
|
|
|
|
return Task.FromResult(ack);
|
|
}
|
|
|
|
if (expectError)
|
|
return Task.FromResult(new PubAck { ErrorCode = 404 });
|
|
|
|
throw new InvalidOperationException($"No stream matched subject '{subject}'.");
|
|
}
|
|
|
|
public Task<PubAck> PublishAndGetAckAsync(string streamName, string subject, string payload)
|
|
{
|
|
return PublishAndGetAckAsync(subject, payload);
|
|
}
|
|
|
|
public Task<JetStreamApiResponse> RequestLocalAsync(string subject, string payload)
|
|
{
|
|
return Task.FromResult(_router.Route(subject, Encoding.UTF8.GetBytes(payload)));
|
|
}
|
|
|
|
public Task<StreamState> GetStreamStateAsync(string streamName)
|
|
{
|
|
return _streamManager.GetStateAsync(streamName, default).AsTask();
|
|
}
|
|
|
|
public Task<JetStreamApiResponse> CreateConsumerAsync(string stream, string durableName, string filterSubject, bool push = false, int heartbeatMs = 0, AckPolicy ackPolicy = AckPolicy.None, int ackWaitMs = 30_000)
|
|
{
|
|
var payload = $@"{{""durable_name"":""{durableName}"",""filter_subject"":""{filterSubject}"",""push"":{push.ToString().ToLowerInvariant()},""heartbeat_ms"":{heartbeatMs},""ack_policy"":""{ackPolicy.ToString().ToLowerInvariant()}"",""ack_wait_ms"":{ackWaitMs}}}";
|
|
return RequestLocalAsync($"$JS.API.CONSUMER.CREATE.{stream}.{durableName}", payload);
|
|
}
|
|
|
|
public async Task<JetStreamConsumerInfo> GetConsumerInfoAsync(string stream, string durableName)
|
|
{
|
|
var response = await RequestLocalAsync($"$JS.API.CONSUMER.INFO.{stream}.{durableName}", "{}");
|
|
return response.ConsumerInfo ?? throw new InvalidOperationException("Consumer not found.");
|
|
}
|
|
|
|
public Task<PullFetchBatch> FetchAsync(string stream, string durableName, int batch)
|
|
{
|
|
return _consumerManager.FetchAsync(stream, durableName, batch, _streamManager, default).AsTask();
|
|
}
|
|
|
|
public async Task<PullFetchBatch> FetchAfterDelayAsync(string stream, string durableName, int delayMs, int batch)
|
|
{
|
|
await Task.Delay(delayMs);
|
|
return await FetchAsync(stream, durableName, batch);
|
|
}
|
|
|
|
public Task<PushFrame> ReadPushFrameAsync(string stream = "ORDERS", string durableName = "PUSH")
|
|
{
|
|
var frame = _consumerManager.ReadPushFrame(stream, durableName);
|
|
if (frame == null)
|
|
throw new InvalidOperationException("No push frame available.");
|
|
return Task.FromResult(frame);
|
|
}
|
|
|
|
public async Task WaitForMirrorSyncAsync(string streamName)
|
|
{
|
|
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(2));
|
|
while (!timeout.IsCancellationRequested)
|
|
{
|
|
var state = await GetStreamStateAsync(streamName);
|
|
if (state.Messages > 0)
|
|
return;
|
|
await Task.Delay(25, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default);
|
|
}
|
|
}
|
|
|
|
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
|
|
}
|