diff --git a/src/NATS.Server/JetStream/ConsumerManager.cs b/src/NATS.Server/JetStream/ConsumerManager.cs index 816069b..b659ec2 100644 --- a/src/NATS.Server/JetStream/ConsumerManager.cs +++ b/src/NATS.Server/JetStream/ConsumerManager.cs @@ -1,5 +1,6 @@ using System.Collections.Concurrent; using NATS.Server.JetStream.Api; +using NATS.Server.JetStream.Consumers; using NATS.Server.JetStream.Models; using NATS.Server.JetStream.Storage; @@ -8,6 +9,7 @@ namespace NATS.Server.JetStream; public sealed class ConsumerManager { private readonly ConcurrentDictionary<(string Stream, string Name), ConsumerHandle> _consumers = new(); + private readonly PullConsumerEngine _pullConsumerEngine = new(); public int ConsumerCount => _consumers.Count; @@ -48,6 +50,17 @@ public sealed class ConsumerManager public bool TryGet(string stream, string durableName, out ConsumerHandle handle) => _consumers.TryGetValue((stream, durableName), out handle!); + + public async ValueTask FetchAsync(string stream, string durableName, int batch, StreamManager streamManager, CancellationToken ct) + { + if (!_consumers.TryGetValue((stream, durableName), out var consumer)) + return new PullFetchBatch([]); + + if (!streamManager.TryGet(stream, out var streamHandle)) + return new PullFetchBatch([]); + + return await _pullConsumerEngine.FetchAsync(streamHandle, consumer, batch, ct); + } } public sealed record ConsumerHandle(string Stream, ConsumerConfig Config) diff --git a/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs b/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs new file mode 100644 index 0000000..47e41f2 --- /dev/null +++ b/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs @@ -0,0 +1,35 @@ +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.JetStream.Consumers; + +public sealed class PullConsumerEngine +{ + public async ValueTask FetchAsync(StreamHandle stream, ConsumerHandle consumer, int batch, CancellationToken ct) + { + var messages = new List(batch); + var sequence = consumer.NextSequence; + + for (var i = 0; i < batch; i++) + { + var message = await stream.Store.LoadAsync(sequence, ct); + if (message == null) + break; + + messages.Add(message); + sequence++; + } + + consumer.NextSequence = sequence; + return new PullFetchBatch(messages); + } +} + +public sealed class PullFetchBatch +{ + public IReadOnlyList Messages { get; } + + public PullFetchBatch(IReadOnlyList messages) + { + Messages = messages; + } +} diff --git a/tests/NATS.Server.Tests/JetStreamApiFixture.cs b/tests/NATS.Server.Tests/JetStreamApiFixture.cs index 966fc5d..c6989a0 100644 --- a/tests/NATS.Server.Tests/JetStreamApiFixture.cs +++ b/tests/NATS.Server.Tests/JetStreamApiFixture.cs @@ -1,6 +1,7 @@ using System.Text; using NATS.Server.JetStream; using NATS.Server.JetStream.Api; +using NATS.Server.JetStream.Consumers; using NATS.Server.JetStream.Models; using NATS.Server.JetStream.Publish; @@ -38,6 +39,13 @@ internal sealed class JetStreamApiFixture : IAsyncDisposable return fixture; } + public static async Task StartWithPullConsumerAsync() + { + var fixture = await StartWithStreamAsync("ORDERS", "orders.*"); + _ = await fixture.CreateConsumerAsync("ORDERS", "PULL", "orders.created"); + return fixture; + } + public Task PublishAndGetAckAsync(string subject, string payload, string? msgId = null, bool expectError = false) { if (_publisher.TryCapture(subject, Encoding.UTF8.GetBytes(payload), msgId, out var ack)) @@ -71,5 +79,10 @@ internal sealed class JetStreamApiFixture : IAsyncDisposable return response.ConsumerInfo ?? throw new InvalidOperationException("Consumer not found."); } + public Task FetchAsync(string stream, string durableName, int batch) + { + return _consumerManager.FetchAsync(stream, durableName, batch, _streamManager, default).AsTask(); + } + public ValueTask DisposeAsync() => ValueTask.CompletedTask; } diff --git a/tests/NATS.Server.Tests/JetStreamPullConsumerTests.cs b/tests/NATS.Server.Tests/JetStreamPullConsumerTests.cs new file mode 100644 index 0000000..4ce724d --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamPullConsumerTests.cs @@ -0,0 +1,15 @@ +namespace NATS.Server.Tests; + +public class JetStreamPullConsumerTests +{ + [Fact] + public async Task Pull_consumer_fetch_returns_available_messages() + { + await using var fixture = await JetStreamApiFixture.StartWithPullConsumerAsync(); + + await fixture.PublishAndGetAckAsync("orders.created", "1"); + var batch = await fixture.FetchAsync("ORDERS", "PULL", batch: 1); + + batch.Messages.Count.ShouldBe(1); + } +}