diff --git a/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs b/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs index c1fe76e..c8ba4e5 100644 --- a/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs +++ b/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs @@ -11,19 +11,14 @@ public sealed class JetStreamPublisher public bool TryCapture(string subject, ReadOnlyMemory payload, out PubAck ack) { - var stream = _streamManager.FindBySubject(subject); - if (stream == null) + var captured = _streamManager.Capture(subject, payload); + if (captured == null) { ack = new PubAck(); return false; } - var seq = stream.Store.AppendAsync(subject, payload, default).GetAwaiter().GetResult(); - ack = new PubAck - { - Stream = stream.Config.Name, - Seq = seq, - }; + ack = captured; return true; } diff --git a/src/NATS.Server/JetStream/Storage/FileStore.cs b/src/NATS.Server/JetStream/Storage/FileStore.cs index 04f05e7..659189a 100644 --- a/src/NATS.Server/JetStream/Storage/FileStore.cs +++ b/src/NATS.Server/JetStream/Storage/FileStore.cs @@ -62,6 +62,17 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable }); } + public void TrimToMaxMessages(ulong maxMessages) + { + while ((ulong)_messages.Count > maxMessages) + { + var first = _messages.Keys.Min(); + _messages.Remove(first); + } + + RewriteDataFile(); + } + public ValueTask DisposeAsync() => ValueTask.CompletedTask; private void LoadExisting() @@ -91,6 +102,22 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable } } + private void RewriteDataFile() + { + var lines = new List(_messages.Count); + foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value)) + { + lines.Add(JsonSerializer.Serialize(new FileRecord + { + Sequence = message.Sequence, + Subject = message.Subject, + PayloadBase64 = Convert.ToBase64String(message.Payload.ToArray()), + })); + } + + File.WriteAllLines(_dataFilePath, lines); + } + private sealed class FileRecord { public ulong Sequence { get; init; } diff --git a/src/NATS.Server/JetStream/Storage/MemStore.cs b/src/NATS.Server/JetStream/Storage/MemStore.cs index 28e82f0..d637481 100644 --- a/src/NATS.Server/JetStream/Storage/MemStore.cs +++ b/src/NATS.Server/JetStream/Storage/MemStore.cs @@ -54,4 +54,16 @@ public sealed class MemStore : IStreamStore }); } } + + public void TrimToMaxMessages(ulong maxMessages) + { + lock (_gate) + { + while ((ulong)_messages.Count > maxMessages) + { + var first = _messages.Keys.Min(); + _messages.Remove(first); + } + } + } } diff --git a/src/NATS.Server/JetStream/StreamManager.cs b/src/NATS.Server/JetStream/StreamManager.cs index 020ec6e..f7dc151 100644 --- a/src/NATS.Server/JetStream/StreamManager.cs +++ b/src/NATS.Server/JetStream/StreamManager.cs @@ -1,6 +1,7 @@ using System.Collections.Concurrent; using NATS.Server.JetStream.Api; using NATS.Server.JetStream.Models; +using NATS.Server.JetStream.Publish; using NATS.Server.JetStream.Storage; using NATS.Server.Subscriptions; @@ -37,6 +38,14 @@ public sealed class StreamManager public bool TryGet(string name, out StreamHandle handle) => _streams.TryGetValue(name, out handle!); + public ValueTask GetStateAsync(string name, CancellationToken ct) + { + if (_streams.TryGetValue(name, out var stream)) + return stream.Store.GetStateAsync(ct); + + return ValueTask.FromResult(new StreamState()); + } + public StreamHandle? FindBySubject(string subject) { foreach (var stream in _streams.Values) @@ -48,6 +57,22 @@ public sealed class StreamManager return null; } + public PubAck? Capture(string subject, ReadOnlyMemory payload) + { + var stream = FindBySubject(subject); + if (stream == null) + return null; + + var seq = stream.Store.AppendAsync(subject, payload, default).GetAwaiter().GetResult(); + EnforceLimits(stream); + + return new PubAck + { + Stream = stream.Config.Name, + Seq = seq, + }; + } + private static StreamConfig NormalizeConfig(StreamConfig config) { var copy = new StreamConfig @@ -73,6 +98,22 @@ public sealed class StreamManager }, }; } + + private static void EnforceLimits(StreamHandle stream) + { + if (stream.Config.MaxMsgs <= 0) + return; + + var maxMessages = (ulong)stream.Config.MaxMsgs; + if (stream.Store is MemStore memStore) + { + memStore.TrimToMaxMessages(maxMessages); + return; + } + + if (stream.Store is FileStore fileStore) + fileStore.TrimToMaxMessages(maxMessages); + } } public sealed record StreamHandle(StreamConfig Config, IStreamStore Store); diff --git a/tests/NATS.Server.Tests/JetStreamApiFixture.cs b/tests/NATS.Server.Tests/JetStreamApiFixture.cs index 7da48ac..ee2281c 100644 --- a/tests/NATS.Server.Tests/JetStreamApiFixture.cs +++ b/tests/NATS.Server.Tests/JetStreamApiFixture.cs @@ -1,6 +1,7 @@ using System.Text; using NATS.Server.JetStream; using NATS.Server.JetStream.Api; +using NATS.Server.JetStream.Models; using NATS.Server.JetStream.Publish; namespace NATS.Server.Tests; @@ -47,5 +48,10 @@ internal sealed class JetStreamApiFixture : IAsyncDisposable return Task.FromResult(_router.Route(subject, Encoding.UTF8.GetBytes(payload))); } + public Task GetStreamStateAsync(string streamName) + { + return _streamManager.GetStateAsync(streamName, default).AsTask(); + } + public ValueTask DisposeAsync() => ValueTask.CompletedTask; } diff --git a/tests/NATS.Server.Tests/JetStreamRetentionPolicyTests.cs b/tests/NATS.Server.Tests/JetStreamRetentionPolicyTests.cs new file mode 100644 index 0000000..b9f558c --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamRetentionPolicyTests.cs @@ -0,0 +1,18 @@ +namespace NATS.Server.Tests; + +public class JetStreamRetentionPolicyTests +{ + [Fact] + public async Task MaxMsgs_limit_evicts_oldest_message() + { + await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("L", "l.*", maxMsgs: 2); + + await fixture.PublishAndGetAckAsync("l.1", "a"); + await fixture.PublishAndGetAckAsync("l.2", "b"); + await fixture.PublishAndGetAckAsync("l.3", "c"); + + var state = await fixture.GetStreamStateAsync("L"); + state.Messages.ShouldBe((ulong)2); + state.FirstSeq.ShouldBe((ulong)2); + } +}