diff --git a/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs b/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs index af5fd0e..8d38c16 100644 --- a/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs +++ b/src/NATS.Server/JetStream/Api/Handlers/ConsumerApiHandlers.cs @@ -62,6 +62,12 @@ public static class ConsumerApiHandlers if (root.TryGetProperty("filter_subject", out var filterEl)) config.FilterSubject = filterEl.GetString(); + if (root.TryGetProperty("push", out var pushEl) && pushEl.ValueKind == JsonValueKind.True) + config.Push = true; + + if (root.TryGetProperty("heartbeat_ms", out var hbEl) && hbEl.TryGetInt32(out var hbMs)) + config.HeartbeatMs = hbMs; + return config; } catch (JsonException) diff --git a/src/NATS.Server/JetStream/ConsumerManager.cs b/src/NATS.Server/JetStream/ConsumerManager.cs index b659ec2..a235db0 100644 --- a/src/NATS.Server/JetStream/ConsumerManager.cs +++ b/src/NATS.Server/JetStream/ConsumerManager.cs @@ -10,6 +10,7 @@ public sealed class ConsumerManager { private readonly ConcurrentDictionary<(string Stream, string Name), ConsumerHandle> _consumers = new(); private readonly PullConsumerEngine _pullConsumerEngine = new(); + private readonly PushConsumerEngine _pushConsumerEngine = new(); public int ConsumerCount => _consumers.Count; @@ -61,10 +62,28 @@ public sealed class ConsumerManager return await _pullConsumerEngine.FetchAsync(streamHandle, consumer, batch, ct); } + + public void OnPublished(string stream, StoredMessage message) + { + foreach (var handle in _consumers.Values.Where(c => c.Stream == stream && c.Config.Push)) + _pushConsumerEngine.Enqueue(handle.PushFrames, message, handle.Config); + } + + public PushFrame? ReadPushFrame(string stream, string durableName) + { + if (!_consumers.TryGetValue((stream, durableName), out var consumer)) + return null; + + if (consumer.PushFrames.Count == 0) + return null; + + return consumer.PushFrames.Dequeue(); + } } public sealed record ConsumerHandle(string Stream, ConsumerConfig Config) { public ulong NextSequence { get; set; } = 1; public Queue Pending { get; } = new(); + public Queue PushFrames { get; } = new(); } diff --git a/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs b/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs new file mode 100644 index 0000000..52bfbc2 --- /dev/null +++ b/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs @@ -0,0 +1,31 @@ +using NATS.Server.JetStream.Models; +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.JetStream.Consumers; + +public sealed class PushConsumerEngine +{ + public void Enqueue(Queue queue, StoredMessage message, ConsumerConfig config) + { + queue.Enqueue(new PushFrame + { + IsData = true, + Message = message, + }); + + if (config.HeartbeatMs > 0) + { + queue.Enqueue(new PushFrame + { + IsHeartbeat = true, + }); + } + } +} + +public sealed class PushFrame +{ + public bool IsData { get; init; } + public bool IsHeartbeat { get; init; } + public StoredMessage? Message { get; init; } +} diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 7feb887..d7396c0 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -100,8 +100,20 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable public bool HasRemoteInterest(string subject) => _globalAccount.SubList.HasRemoteInterest(subject); public bool TryCaptureJetStreamPublish(string subject, ReadOnlyMemory payload, out PubAck ack) { - if (_jetStreamPublisher != null) - return _jetStreamPublisher.TryCapture(subject, payload, out ack); + if (_jetStreamPublisher != null && _jetStreamPublisher.TryCapture(subject, payload, out ack)) + { + if (ack.ErrorCode == null + && _jetStreamConsumerManager != null + && _jetStreamStreamManager != null + && _jetStreamStreamManager.TryGet(ack.Stream, out var streamHandle)) + { + var stored = streamHandle.Store.LoadAsync(ack.Seq, default).GetAwaiter().GetResult(); + if (stored != null) + _jetStreamConsumerManager.OnPublished(ack.Stream, stored); + } + + return true; + } ack = new PubAck(); return false; diff --git a/tests/NATS.Server.Tests/JetStreamApiFixture.cs b/tests/NATS.Server.Tests/JetStreamApiFixture.cs index c6989a0..ca321e2 100644 --- a/tests/NATS.Server.Tests/JetStreamApiFixture.cs +++ b/tests/NATS.Server.Tests/JetStreamApiFixture.cs @@ -46,10 +46,26 @@ internal sealed class JetStreamApiFixture : IAsyncDisposable return fixture; } + public static async Task StartWithPushConsumerAsync() + { + var fixture = await StartWithStreamAsync("ORDERS", "orders.*"); + _ = await fixture.CreateConsumerAsync("ORDERS", "PUSH", "orders.created", push: true, heartbeatMs: 25); + return fixture; + } + public Task PublishAndGetAckAsync(string subject, string payload, string? msgId = null, bool expectError = false) { if (_publisher.TryCapture(subject, Encoding.UTF8.GetBytes(payload), msgId, out var ack)) + { + if (ack.ErrorCode == null && _streamManager.TryGet(ack.Stream, out var streamHandle)) + { + var stored = streamHandle.Store.LoadAsync(ack.Seq, default).GetAwaiter().GetResult(); + if (stored != null) + _consumerManager.OnPublished(ack.Stream, stored); + } + return Task.FromResult(ack); + } if (expectError) return Task.FromResult(new PubAck { ErrorCode = 404 }); @@ -67,9 +83,9 @@ internal sealed class JetStreamApiFixture : IAsyncDisposable return _streamManager.GetStateAsync(streamName, default).AsTask(); } - public Task CreateConsumerAsync(string stream, string durableName, string filterSubject) + public Task CreateConsumerAsync(string stream, string durableName, string filterSubject, bool push = false, int heartbeatMs = 0) { - var payload = $@"{{""durable_name"":""{durableName}"",""filter_subject"":""{filterSubject}""}}"; + var payload = $@"{{""durable_name"":""{durableName}"",""filter_subject"":""{filterSubject}"",""push"":{push.ToString().ToLowerInvariant()},""heartbeat_ms"":{heartbeatMs}}}"; return RequestLocalAsync($"$JS.API.CONSUMER.CREATE.{stream}.{durableName}", payload); } @@ -84,5 +100,13 @@ internal sealed class JetStreamApiFixture : IAsyncDisposable return _consumerManager.FetchAsync(stream, durableName, batch, _streamManager, default).AsTask(); } + public Task ReadPushFrameAsync(string stream = "ORDERS", string durableName = "PUSH") + { + var frame = _consumerManager.ReadPushFrame(stream, durableName); + if (frame == null) + throw new InvalidOperationException("No push frame available."); + return Task.FromResult(frame); + } + public ValueTask DisposeAsync() => ValueTask.CompletedTask; } diff --git a/tests/NATS.Server.Tests/JetStreamPushConsumerTests.cs b/tests/NATS.Server.Tests/JetStreamPushConsumerTests.cs new file mode 100644 index 0000000..5f40ab0 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamPushConsumerTests.cs @@ -0,0 +1,17 @@ +namespace NATS.Server.Tests; + +public class JetStreamPushConsumerTests +{ + [Fact] + public async Task Push_consumer_delivers_and_sends_heartbeat() + { + await using var fixture = await JetStreamApiFixture.StartWithPushConsumerAsync(); + await fixture.PublishAndGetAckAsync("orders.created", "1"); + + var frame = await fixture.ReadPushFrameAsync(); + frame.IsData.ShouldBeTrue(); + + var hb = await fixture.ReadPushFrameAsync(); + hb.IsHeartbeat.ShouldBeTrue(); + } +}