feat: implement jetstream pull consumer fetch
This commit is contained in:
@@ -1,5 +1,6 @@
|
|||||||
using System.Collections.Concurrent;
|
using System.Collections.Concurrent;
|
||||||
using NATS.Server.JetStream.Api;
|
using NATS.Server.JetStream.Api;
|
||||||
|
using NATS.Server.JetStream.Consumers;
|
||||||
using NATS.Server.JetStream.Models;
|
using NATS.Server.JetStream.Models;
|
||||||
using NATS.Server.JetStream.Storage;
|
using NATS.Server.JetStream.Storage;
|
||||||
|
|
||||||
@@ -8,6 +9,7 @@ namespace NATS.Server.JetStream;
|
|||||||
public sealed class ConsumerManager
|
public sealed class ConsumerManager
|
||||||
{
|
{
|
||||||
private readonly ConcurrentDictionary<(string Stream, string Name), ConsumerHandle> _consumers = new();
|
private readonly ConcurrentDictionary<(string Stream, string Name), ConsumerHandle> _consumers = new();
|
||||||
|
private readonly PullConsumerEngine _pullConsumerEngine = new();
|
||||||
|
|
||||||
public int ConsumerCount => _consumers.Count;
|
public int ConsumerCount => _consumers.Count;
|
||||||
|
|
||||||
@@ -48,6 +50,17 @@ public sealed class ConsumerManager
|
|||||||
|
|
||||||
public bool TryGet(string stream, string durableName, out ConsumerHandle handle)
|
public bool TryGet(string stream, string durableName, out ConsumerHandle handle)
|
||||||
=> _consumers.TryGetValue((stream, durableName), out handle!);
|
=> _consumers.TryGetValue((stream, durableName), out handle!);
|
||||||
|
|
||||||
|
public async ValueTask<PullFetchBatch> FetchAsync(string stream, string durableName, int batch, StreamManager streamManager, CancellationToken ct)
|
||||||
|
{
|
||||||
|
if (!_consumers.TryGetValue((stream, durableName), out var consumer))
|
||||||
|
return new PullFetchBatch([]);
|
||||||
|
|
||||||
|
if (!streamManager.TryGet(stream, out var streamHandle))
|
||||||
|
return new PullFetchBatch([]);
|
||||||
|
|
||||||
|
return await _pullConsumerEngine.FetchAsync(streamHandle, consumer, batch, ct);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public sealed record ConsumerHandle(string Stream, ConsumerConfig Config)
|
public sealed record ConsumerHandle(string Stream, ConsumerConfig Config)
|
||||||
|
|||||||
35
src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs
Normal file
35
src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
using NATS.Server.JetStream.Storage;
|
||||||
|
|
||||||
|
namespace NATS.Server.JetStream.Consumers;
|
||||||
|
|
||||||
|
public sealed class PullConsumerEngine
|
||||||
|
{
|
||||||
|
public async ValueTask<PullFetchBatch> FetchAsync(StreamHandle stream, ConsumerHandle consumer, int batch, CancellationToken ct)
|
||||||
|
{
|
||||||
|
var messages = new List<StoredMessage>(batch);
|
||||||
|
var sequence = consumer.NextSequence;
|
||||||
|
|
||||||
|
for (var i = 0; i < batch; i++)
|
||||||
|
{
|
||||||
|
var message = await stream.Store.LoadAsync(sequence, ct);
|
||||||
|
if (message == null)
|
||||||
|
break;
|
||||||
|
|
||||||
|
messages.Add(message);
|
||||||
|
sequence++;
|
||||||
|
}
|
||||||
|
|
||||||
|
consumer.NextSequence = sequence;
|
||||||
|
return new PullFetchBatch(messages);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public sealed class PullFetchBatch
|
||||||
|
{
|
||||||
|
public IReadOnlyList<StoredMessage> Messages { get; }
|
||||||
|
|
||||||
|
public PullFetchBatch(IReadOnlyList<StoredMessage> messages)
|
||||||
|
{
|
||||||
|
Messages = messages;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,6 +1,7 @@
|
|||||||
using System.Text;
|
using System.Text;
|
||||||
using NATS.Server.JetStream;
|
using NATS.Server.JetStream;
|
||||||
using NATS.Server.JetStream.Api;
|
using NATS.Server.JetStream.Api;
|
||||||
|
using NATS.Server.JetStream.Consumers;
|
||||||
using NATS.Server.JetStream.Models;
|
using NATS.Server.JetStream.Models;
|
||||||
using NATS.Server.JetStream.Publish;
|
using NATS.Server.JetStream.Publish;
|
||||||
|
|
||||||
@@ -38,6 +39,13 @@ internal sealed class JetStreamApiFixture : IAsyncDisposable
|
|||||||
return fixture;
|
return fixture;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static async Task<JetStreamApiFixture> StartWithPullConsumerAsync()
|
||||||
|
{
|
||||||
|
var fixture = await StartWithStreamAsync("ORDERS", "orders.*");
|
||||||
|
_ = await fixture.CreateConsumerAsync("ORDERS", "PULL", "orders.created");
|
||||||
|
return fixture;
|
||||||
|
}
|
||||||
|
|
||||||
public Task<PubAck> PublishAndGetAckAsync(string subject, string payload, string? msgId = null, bool expectError = false)
|
public Task<PubAck> PublishAndGetAckAsync(string subject, string payload, string? msgId = null, bool expectError = false)
|
||||||
{
|
{
|
||||||
if (_publisher.TryCapture(subject, Encoding.UTF8.GetBytes(payload), msgId, out var ack))
|
if (_publisher.TryCapture(subject, Encoding.UTF8.GetBytes(payload), msgId, out var ack))
|
||||||
@@ -71,5 +79,10 @@ internal sealed class JetStreamApiFixture : IAsyncDisposable
|
|||||||
return response.ConsumerInfo ?? throw new InvalidOperationException("Consumer not found.");
|
return response.ConsumerInfo ?? throw new InvalidOperationException("Consumer not found.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Task<PullFetchBatch> FetchAsync(string stream, string durableName, int batch)
|
||||||
|
{
|
||||||
|
return _consumerManager.FetchAsync(stream, durableName, batch, _streamManager, default).AsTask();
|
||||||
|
}
|
||||||
|
|
||||||
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
|
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
|
||||||
}
|
}
|
||||||
|
|||||||
15
tests/NATS.Server.Tests/JetStreamPullConsumerTests.cs
Normal file
15
tests/NATS.Server.Tests/JetStreamPullConsumerTests.cs
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
namespace NATS.Server.Tests;
|
||||||
|
|
||||||
|
public class JetStreamPullConsumerTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public async Task Pull_consumer_fetch_returns_available_messages()
|
||||||
|
{
|
||||||
|
await using var fixture = await JetStreamApiFixture.StartWithPullConsumerAsync();
|
||||||
|
|
||||||
|
await fixture.PublishAndGetAckAsync("orders.created", "1");
|
||||||
|
var batch = await fixture.FetchAsync("ORDERS", "PULL", batch: 1);
|
||||||
|
|
||||||
|
batch.Messages.Count.ShouldBe(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user