feat: implement jetstream push delivery and heartbeat
This commit is contained in:
@@ -62,6 +62,12 @@ public static class ConsumerApiHandlers
|
|||||||
if (root.TryGetProperty("filter_subject", out var filterEl))
|
if (root.TryGetProperty("filter_subject", out var filterEl))
|
||||||
config.FilterSubject = filterEl.GetString();
|
config.FilterSubject = filterEl.GetString();
|
||||||
|
|
||||||
|
if (root.TryGetProperty("push", out var pushEl) && pushEl.ValueKind == JsonValueKind.True)
|
||||||
|
config.Push = true;
|
||||||
|
|
||||||
|
if (root.TryGetProperty("heartbeat_ms", out var hbEl) && hbEl.TryGetInt32(out var hbMs))
|
||||||
|
config.HeartbeatMs = hbMs;
|
||||||
|
|
||||||
return config;
|
return config;
|
||||||
}
|
}
|
||||||
catch (JsonException)
|
catch (JsonException)
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ public sealed class ConsumerManager
|
|||||||
{
|
{
|
||||||
private readonly ConcurrentDictionary<(string Stream, string Name), ConsumerHandle> _consumers = new();
|
private readonly ConcurrentDictionary<(string Stream, string Name), ConsumerHandle> _consumers = new();
|
||||||
private readonly PullConsumerEngine _pullConsumerEngine = new();
|
private readonly PullConsumerEngine _pullConsumerEngine = new();
|
||||||
|
private readonly PushConsumerEngine _pushConsumerEngine = new();
|
||||||
|
|
||||||
public int ConsumerCount => _consumers.Count;
|
public int ConsumerCount => _consumers.Count;
|
||||||
|
|
||||||
@@ -61,10 +62,28 @@ public sealed class ConsumerManager
|
|||||||
|
|
||||||
return await _pullConsumerEngine.FetchAsync(streamHandle, consumer, batch, ct);
|
return await _pullConsumerEngine.FetchAsync(streamHandle, consumer, batch, ct);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void OnPublished(string stream, StoredMessage message)
|
||||||
|
{
|
||||||
|
foreach (var handle in _consumers.Values.Where(c => c.Stream == stream && c.Config.Push))
|
||||||
|
_pushConsumerEngine.Enqueue(handle.PushFrames, message, handle.Config);
|
||||||
|
}
|
||||||
|
|
||||||
|
public PushFrame? ReadPushFrame(string stream, string durableName)
|
||||||
|
{
|
||||||
|
if (!_consumers.TryGetValue((stream, durableName), out var consumer))
|
||||||
|
return null;
|
||||||
|
|
||||||
|
if (consumer.PushFrames.Count == 0)
|
||||||
|
return null;
|
||||||
|
|
||||||
|
return consumer.PushFrames.Dequeue();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public sealed record ConsumerHandle(string Stream, ConsumerConfig Config)
|
public sealed record ConsumerHandle(string Stream, ConsumerConfig Config)
|
||||||
{
|
{
|
||||||
public ulong NextSequence { get; set; } = 1;
|
public ulong NextSequence { get; set; } = 1;
|
||||||
public Queue<StoredMessage> Pending { get; } = new();
|
public Queue<StoredMessage> Pending { get; } = new();
|
||||||
|
public Queue<PushFrame> PushFrames { get; } = new();
|
||||||
}
|
}
|
||||||
|
|||||||
31
src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs
Normal file
31
src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs
Normal file
@@ -0,0 +1,31 @@
|
|||||||
|
using NATS.Server.JetStream.Models;
|
||||||
|
using NATS.Server.JetStream.Storage;
|
||||||
|
|
||||||
|
namespace NATS.Server.JetStream.Consumers;
|
||||||
|
|
||||||
|
public sealed class PushConsumerEngine
|
||||||
|
{
|
||||||
|
public void Enqueue(Queue<PushFrame> queue, StoredMessage message, ConsumerConfig config)
|
||||||
|
{
|
||||||
|
queue.Enqueue(new PushFrame
|
||||||
|
{
|
||||||
|
IsData = true,
|
||||||
|
Message = message,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (config.HeartbeatMs > 0)
|
||||||
|
{
|
||||||
|
queue.Enqueue(new PushFrame
|
||||||
|
{
|
||||||
|
IsHeartbeat = true,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public sealed class PushFrame
|
||||||
|
{
|
||||||
|
public bool IsData { get; init; }
|
||||||
|
public bool IsHeartbeat { get; init; }
|
||||||
|
public StoredMessage? Message { get; init; }
|
||||||
|
}
|
||||||
@@ -100,8 +100,20 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
public bool HasRemoteInterest(string subject) => _globalAccount.SubList.HasRemoteInterest(subject);
|
public bool HasRemoteInterest(string subject) => _globalAccount.SubList.HasRemoteInterest(subject);
|
||||||
public bool TryCaptureJetStreamPublish(string subject, ReadOnlyMemory<byte> payload, out PubAck ack)
|
public bool TryCaptureJetStreamPublish(string subject, ReadOnlyMemory<byte> payload, out PubAck ack)
|
||||||
{
|
{
|
||||||
if (_jetStreamPublisher != null)
|
if (_jetStreamPublisher != null && _jetStreamPublisher.TryCapture(subject, payload, out ack))
|
||||||
return _jetStreamPublisher.TryCapture(subject, payload, out ack);
|
{
|
||||||
|
if (ack.ErrorCode == null
|
||||||
|
&& _jetStreamConsumerManager != null
|
||||||
|
&& _jetStreamStreamManager != null
|
||||||
|
&& _jetStreamStreamManager.TryGet(ack.Stream, out var streamHandle))
|
||||||
|
{
|
||||||
|
var stored = streamHandle.Store.LoadAsync(ack.Seq, default).GetAwaiter().GetResult();
|
||||||
|
if (stored != null)
|
||||||
|
_jetStreamConsumerManager.OnPublished(ack.Stream, stored);
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
ack = new PubAck();
|
ack = new PubAck();
|
||||||
return false;
|
return false;
|
||||||
|
|||||||
@@ -46,10 +46,26 @@ internal sealed class JetStreamApiFixture : IAsyncDisposable
|
|||||||
return fixture;
|
return fixture;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static async Task<JetStreamApiFixture> StartWithPushConsumerAsync()
|
||||||
|
{
|
||||||
|
var fixture = await StartWithStreamAsync("ORDERS", "orders.*");
|
||||||
|
_ = await fixture.CreateConsumerAsync("ORDERS", "PUSH", "orders.created", push: true, heartbeatMs: 25);
|
||||||
|
return fixture;
|
||||||
|
}
|
||||||
|
|
||||||
public Task<PubAck> PublishAndGetAckAsync(string subject, string payload, string? msgId = null, bool expectError = false)
|
public Task<PubAck> PublishAndGetAckAsync(string subject, string payload, string? msgId = null, bool expectError = false)
|
||||||
{
|
{
|
||||||
if (_publisher.TryCapture(subject, Encoding.UTF8.GetBytes(payload), msgId, out var ack))
|
if (_publisher.TryCapture(subject, Encoding.UTF8.GetBytes(payload), msgId, out var ack))
|
||||||
|
{
|
||||||
|
if (ack.ErrorCode == null && _streamManager.TryGet(ack.Stream, out var streamHandle))
|
||||||
|
{
|
||||||
|
var stored = streamHandle.Store.LoadAsync(ack.Seq, default).GetAwaiter().GetResult();
|
||||||
|
if (stored != null)
|
||||||
|
_consumerManager.OnPublished(ack.Stream, stored);
|
||||||
|
}
|
||||||
|
|
||||||
return Task.FromResult(ack);
|
return Task.FromResult(ack);
|
||||||
|
}
|
||||||
|
|
||||||
if (expectError)
|
if (expectError)
|
||||||
return Task.FromResult(new PubAck { ErrorCode = 404 });
|
return Task.FromResult(new PubAck { ErrorCode = 404 });
|
||||||
@@ -67,9 +83,9 @@ internal sealed class JetStreamApiFixture : IAsyncDisposable
|
|||||||
return _streamManager.GetStateAsync(streamName, default).AsTask();
|
return _streamManager.GetStateAsync(streamName, default).AsTask();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Task<JetStreamApiResponse> CreateConsumerAsync(string stream, string durableName, string filterSubject)
|
public Task<JetStreamApiResponse> CreateConsumerAsync(string stream, string durableName, string filterSubject, bool push = false, int heartbeatMs = 0)
|
||||||
{
|
{
|
||||||
var payload = $@"{{""durable_name"":""{durableName}"",""filter_subject"":""{filterSubject}""}}";
|
var payload = $@"{{""durable_name"":""{durableName}"",""filter_subject"":""{filterSubject}"",""push"":{push.ToString().ToLowerInvariant()},""heartbeat_ms"":{heartbeatMs}}}";
|
||||||
return RequestLocalAsync($"$JS.API.CONSUMER.CREATE.{stream}.{durableName}", payload);
|
return RequestLocalAsync($"$JS.API.CONSUMER.CREATE.{stream}.{durableName}", payload);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -84,5 +100,13 @@ internal sealed class JetStreamApiFixture : IAsyncDisposable
|
|||||||
return _consumerManager.FetchAsync(stream, durableName, batch, _streamManager, default).AsTask();
|
return _consumerManager.FetchAsync(stream, durableName, batch, _streamManager, default).AsTask();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Task<PushFrame> ReadPushFrameAsync(string stream = "ORDERS", string durableName = "PUSH")
|
||||||
|
{
|
||||||
|
var frame = _consumerManager.ReadPushFrame(stream, durableName);
|
||||||
|
if (frame == null)
|
||||||
|
throw new InvalidOperationException("No push frame available.");
|
||||||
|
return Task.FromResult(frame);
|
||||||
|
}
|
||||||
|
|
||||||
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
|
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
|
||||||
}
|
}
|
||||||
|
|||||||
17
tests/NATS.Server.Tests/JetStreamPushConsumerTests.cs
Normal file
17
tests/NATS.Server.Tests/JetStreamPushConsumerTests.cs
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
namespace NATS.Server.Tests;
|
||||||
|
|
||||||
|
public class JetStreamPushConsumerTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public async Task Push_consumer_delivers_and_sends_heartbeat()
|
||||||
|
{
|
||||||
|
await using var fixture = await JetStreamApiFixture.StartWithPushConsumerAsync();
|
||||||
|
await fixture.PublishAndGetAckAsync("orders.created", "1");
|
||||||
|
|
||||||
|
var frame = await fixture.ReadPushFrameAsync();
|
||||||
|
frame.IsData.ShouldBeTrue();
|
||||||
|
|
||||||
|
var hb = await fixture.ReadPushFrameAsync();
|
||||||
|
hb.IsHeartbeat.ShouldBeTrue();
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user