feat: enforce jetstream retention and limits

This commit is contained in:
Joseph Doherty
2026-02-23 06:04:23 -05:00
parent 95691fa9e7
commit d73e7e2f88
6 changed files with 107 additions and 8 deletions

View File

@@ -11,19 +11,14 @@ public sealed class JetStreamPublisher
public bool TryCapture(string subject, ReadOnlyMemory<byte> payload, out PubAck ack)
{
var stream = _streamManager.FindBySubject(subject);
if (stream == null)
var captured = _streamManager.Capture(subject, payload);
if (captured == null)
{
ack = new PubAck();
return false;
}
var seq = stream.Store.AppendAsync(subject, payload, default).GetAwaiter().GetResult();
ack = new PubAck
{
Stream = stream.Config.Name,
Seq = seq,
};
ack = captured;
return true;
}

View File

@@ -62,6 +62,17 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
});
}
public void TrimToMaxMessages(ulong maxMessages)
{
while ((ulong)_messages.Count > maxMessages)
{
var first = _messages.Keys.Min();
_messages.Remove(first);
}
RewriteDataFile();
}
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
private void LoadExisting()
@@ -91,6 +102,22 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
}
}
private void RewriteDataFile()
{
var lines = new List<string>(_messages.Count);
foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value))
{
lines.Add(JsonSerializer.Serialize(new FileRecord
{
Sequence = message.Sequence,
Subject = message.Subject,
PayloadBase64 = Convert.ToBase64String(message.Payload.ToArray()),
}));
}
File.WriteAllLines(_dataFilePath, lines);
}
private sealed class FileRecord
{
public ulong Sequence { get; init; }

View File

@@ -54,4 +54,16 @@ public sealed class MemStore : IStreamStore
});
}
}
public void TrimToMaxMessages(ulong maxMessages)
{
lock (_gate)
{
while ((ulong)_messages.Count > maxMessages)
{
var first = _messages.Keys.Min();
_messages.Remove(first);
}
}
}
}

View File

@@ -1,6 +1,7 @@
using System.Collections.Concurrent;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Publish;
using NATS.Server.JetStream.Storage;
using NATS.Server.Subscriptions;
@@ -37,6 +38,14 @@ public sealed class StreamManager
public bool TryGet(string name, out StreamHandle handle) => _streams.TryGetValue(name, out handle!);
public ValueTask<StreamState> GetStateAsync(string name, CancellationToken ct)
{
if (_streams.TryGetValue(name, out var stream))
return stream.Store.GetStateAsync(ct);
return ValueTask.FromResult(new StreamState());
}
public StreamHandle? FindBySubject(string subject)
{
foreach (var stream in _streams.Values)
@@ -48,6 +57,22 @@ public sealed class StreamManager
return null;
}
public PubAck? Capture(string subject, ReadOnlyMemory<byte> payload)
{
var stream = FindBySubject(subject);
if (stream == null)
return null;
var seq = stream.Store.AppendAsync(subject, payload, default).GetAwaiter().GetResult();
EnforceLimits(stream);
return new PubAck
{
Stream = stream.Config.Name,
Seq = seq,
};
}
private static StreamConfig NormalizeConfig(StreamConfig config)
{
var copy = new StreamConfig
@@ -73,6 +98,22 @@ public sealed class StreamManager
},
};
}
private static void EnforceLimits(StreamHandle stream)
{
if (stream.Config.MaxMsgs <= 0)
return;
var maxMessages = (ulong)stream.Config.MaxMsgs;
if (stream.Store is MemStore memStore)
{
memStore.TrimToMaxMessages(maxMessages);
return;
}
if (stream.Store is FileStore fileStore)
fileStore.TrimToMaxMessages(maxMessages);
}
}
public sealed record StreamHandle(StreamConfig Config, IStreamStore Store);

View File

@@ -1,6 +1,7 @@
using System.Text;
using NATS.Server.JetStream;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Publish;
namespace NATS.Server.Tests;
@@ -47,5 +48,10 @@ internal sealed class JetStreamApiFixture : IAsyncDisposable
return Task.FromResult(_router.Route(subject, Encoding.UTF8.GetBytes(payload)));
}
public Task<StreamState> GetStreamStateAsync(string streamName)
{
return _streamManager.GetStateAsync(streamName, default).AsTask();
}
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
}

View File

@@ -0,0 +1,18 @@
namespace NATS.Server.Tests;
public class JetStreamRetentionPolicyTests
{
[Fact]
public async Task MaxMsgs_limit_evicts_oldest_message()
{
await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("L", "l.*", maxMsgs: 2);
await fixture.PublishAndGetAckAsync("l.1", "a");
await fixture.PublishAndGetAckAsync("l.2", "b");
await fixture.PublishAndGetAckAsync("l.3", "c");
var state = await fixture.GetStreamStateAsync("L");
state.Messages.ShouldBe((ulong)2);
state.FirstSeq.ShouldBe((ulong)2);
}
}