feat: execute post-baseline jetstream parity plan

This commit is contained in:
Joseph Doherty
2026-02-23 12:11:19 -05:00
parent c3763e83d6
commit b41e6ff320
58 changed files with 1430 additions and 102 deletions

View File

@@ -190,9 +190,37 @@ public static class ConsumerApiHandlers
if (root.TryGetProperty("ack_wait_ms", out var ackWaitEl) && ackWaitEl.TryGetInt32(out var ackWait))
config.AckWaitMs = ackWait;
if (root.TryGetProperty("max_deliver", out var maxDeliverEl) && maxDeliverEl.TryGetInt32(out var maxDeliver))
config.MaxDeliver = Math.Max(maxDeliver, 0);
if (root.TryGetProperty("max_ack_pending", out var maxAckPendingEl) && maxAckPendingEl.TryGetInt32(out var maxAckPending))
config.MaxAckPending = Math.Max(maxAckPending, 0);
if (root.TryGetProperty("flow_control", out var flowControlEl) && flowControlEl.ValueKind is JsonValueKind.True or JsonValueKind.False)
config.FlowControl = flowControlEl.GetBoolean();
if (root.TryGetProperty("rate_limit_bps", out var rateLimitEl) && rateLimitEl.TryGetInt64(out var rateLimit))
config.RateLimitBps = Math.Max(rateLimit, 0);
if (root.TryGetProperty("opt_start_seq", out var optStartSeqEl) && optStartSeqEl.TryGetUInt64(out var optStartSeq))
config.OptStartSeq = optStartSeq;
if (root.TryGetProperty("opt_start_time_utc", out var optStartTimeEl)
&& optStartTimeEl.ValueKind == JsonValueKind.String
&& DateTime.TryParse(optStartTimeEl.GetString(), out var optStartTime))
{
config.OptStartTimeUtc = optStartTime.ToUniversalTime();
}
if (root.TryGetProperty("backoff_ms", out var backoffEl) && backoffEl.ValueKind == JsonValueKind.Array)
{
foreach (var item in backoffEl.EnumerateArray())
{
if (item.TryGetInt32(out var backoffValue))
config.BackOffMs.Add(Math.Max(backoffValue, 0));
}
}
if (root.TryGetProperty("ack_policy", out var ackPolicyEl))
{
var ackPolicy = ackPolicyEl.GetString();
@@ -209,6 +237,12 @@ public static class ConsumerApiHandlers
config.DeliverPolicy = DeliverPolicy.Last;
else if (string.Equals(deliver, "new", StringComparison.OrdinalIgnoreCase))
config.DeliverPolicy = DeliverPolicy.New;
else if (string.Equals(deliver, "by_start_sequence", StringComparison.OrdinalIgnoreCase))
config.DeliverPolicy = DeliverPolicy.ByStartSequence;
else if (string.Equals(deliver, "by_start_time", StringComparison.OrdinalIgnoreCase))
config.DeliverPolicy = DeliverPolicy.ByStartTime;
else if (string.Equals(deliver, "last_per_subject", StringComparison.OrdinalIgnoreCase))
config.DeliverPolicy = DeliverPolicy.LastPerSubject;
}
if (root.TryGetProperty("replay_policy", out var replayPolicyEl))

View File

@@ -220,6 +220,24 @@ public static class StreamApiHandlers
if (root.TryGetProperty("max_age_ms", out var maxAgeMsEl) && maxAgeMsEl.TryGetInt32(out var maxAgeMs))
config.MaxAgeMs = maxAgeMs;
if (root.TryGetProperty("max_msg_size", out var maxMsgSizeEl) && maxMsgSizeEl.TryGetInt32(out var maxMsgSize))
config.MaxMsgSize = maxMsgSize;
if (root.TryGetProperty("duplicate_window_ms", out var dupWindowEl) && dupWindowEl.TryGetInt32(out var dupWindow))
config.DuplicateWindowMs = dupWindow;
if (root.TryGetProperty("sealed", out var sealedEl) && sealedEl.ValueKind is JsonValueKind.True or JsonValueKind.False)
config.Sealed = sealedEl.GetBoolean();
if (root.TryGetProperty("deny_delete", out var denyDeleteEl) && denyDeleteEl.ValueKind is JsonValueKind.True or JsonValueKind.False)
config.DenyDelete = denyDeleteEl.GetBoolean();
if (root.TryGetProperty("deny_purge", out var denyPurgeEl) && denyPurgeEl.ValueKind is JsonValueKind.True or JsonValueKind.False)
config.DenyPurge = denyPurgeEl.GetBoolean();
if (root.TryGetProperty("allow_direct", out var allowDirectEl) && allowDirectEl.ValueKind is JsonValueKind.True or JsonValueKind.False)
config.AllowDirect = allowDirectEl.GetBoolean();
if (root.TryGetProperty("discard", out var discardEl))
{
var discard = discardEl.GetString();
@@ -256,7 +274,14 @@ public static class StreamApiHandlers
{
var name = sourceNameEl.GetString();
if (!string.IsNullOrWhiteSpace(name))
config.Sources.Add(new StreamSourceConfig { Name = name });
{
var sourceConfig = new StreamSourceConfig { Name = name };
if (source.TryGetProperty("subject_transform_prefix", out var prefixEl))
sourceConfig.SubjectTransformPrefix = prefixEl.GetString();
if (source.TryGetProperty("source_account", out var accountEl))
sourceConfig.SourceAccount = accountEl.GetString();
config.Sources.Add(sourceConfig);
}
}
}
}

View File

@@ -42,6 +42,30 @@ public sealed class StreamReplicaGroup
return Task.CompletedTask;
}
public Task ApplyPlacementAsync(IReadOnlyList<int> placement, CancellationToken ct)
{
_ = ct;
var targetCount = Math.Max(placement.Count, 1);
if (targetCount == _nodes.Count)
return Task.CompletedTask;
if (targetCount > _nodes.Count)
{
for (var i = _nodes.Count + 1; i <= targetCount; i++)
_nodes.Add(new RaftNode($"{streamNamePrefix()}-r{i}"));
}
else
{
_nodes.RemoveRange(targetCount, _nodes.Count - targetCount);
}
foreach (var node in _nodes)
node.ConfigureCluster(_nodes);
Leader = ElectLeader(_nodes[0]);
return Task.CompletedTask;
}
private RaftNode SelectNextCandidate(RaftNode currentLeader)
{
if (_nodes.Count == 1)
@@ -62,4 +86,6 @@ public sealed class StreamReplicaGroup
return candidate;
}
private string streamNamePrefix() => StreamName.ToLowerInvariant();
}

View File

@@ -157,6 +157,10 @@ public sealed class ConsumerManager
if (consumer.PushFrames.Count == 0)
return null;
var frame = consumer.PushFrames.Peek();
if (frame.AvailableAtUtc > DateTime.UtcNow)
return null;
return consumer.PushFrames.Dequeue();
}
@@ -179,4 +183,5 @@ public sealed record ConsumerHandle(string Stream, ConsumerConfig Config)
public Queue<StoredMessage> Pending { get; } = new();
public Queue<PushFrame> PushFrames { get; } = new();
public AckProcessor AckProcessor { get; } = new();
public DateTime NextPushDataAvailableAtUtc { get; set; }
}

View File

@@ -2,22 +2,50 @@ namespace NATS.Server.JetStream.Consumers;
public sealed class AckProcessor
{
private readonly Dictionary<ulong, DateTime> _pending = new();
private readonly Dictionary<ulong, PendingState> _pending = new();
public void Register(ulong sequence, int ackWaitMs)
{
_pending[sequence] = DateTime.UtcNow.AddMilliseconds(Math.Max(ackWaitMs, 1));
if (_pending.ContainsKey(sequence))
return;
_pending[sequence] = new PendingState
{
DeadlineUtc = DateTime.UtcNow.AddMilliseconds(Math.Max(ackWaitMs, 1)),
Deliveries = 1,
};
}
public ulong? NextExpired()
public bool TryGetExpired(out ulong sequence, out int deliveries)
{
foreach (var (seq, deadline) in _pending)
foreach (var (seq, state) in _pending)
{
if (DateTime.UtcNow >= deadline)
return seq;
if (DateTime.UtcNow >= state.DeadlineUtc)
{
sequence = seq;
deliveries = state.Deliveries;
return true;
}
}
return null;
sequence = 0;
deliveries = 0;
return false;
}
public void ScheduleRedelivery(ulong sequence, int delayMs)
{
if (!_pending.TryGetValue(sequence, out var state))
return;
state.Deliveries++;
state.DeadlineUtc = DateTime.UtcNow.AddMilliseconds(Math.Max(delayMs, 1));
_pending[sequence] = state;
}
public void Drop(ulong sequence)
{
_pending.Remove(sequence);
}
public bool HasPending => _pending.Count > 0;
@@ -28,4 +56,10 @@ public sealed class AckProcessor
foreach (var key in _pending.Keys.Where(k => k <= sequence).ToArray())
_pending.Remove(key);
}
private sealed class PendingState
{
public DateTime DeadlineUtc { get; set; }
public int Deliveries { get; set; }
}
}

View File

@@ -16,11 +16,7 @@ public sealed class PullConsumerEngine
if (consumer.NextSequence == 1)
{
var state = await stream.Store.GetStateAsync(ct);
if (consumer.Config.DeliverPolicy == DeliverPolicy.Last && state.LastSeq > 0)
consumer.NextSequence = state.LastSeq;
else if (consumer.Config.DeliverPolicy == DeliverPolicy.New && state.LastSeq > 0)
consumer.NextSequence = state.LastSeq + 1;
consumer.NextSequence = await ResolveInitialSequenceAsync(stream, consumer.Config, ct);
}
if (request.NoWait)
@@ -32,9 +28,19 @@ public sealed class PullConsumerEngine
if (consumer.Config.AckPolicy == AckPolicy.Explicit)
{
var expired = consumer.AckProcessor.NextExpired();
if (expired is { } expiredSequence)
if (consumer.AckProcessor.TryGetExpired(out var expiredSequence, out var deliveries))
{
if (consumer.Config.MaxDeliver > 0 && deliveries > consumer.Config.MaxDeliver)
{
consumer.AckProcessor.Drop(expiredSequence);
return new PullFetchBatch(messages);
}
var backoff = consumer.Config.BackOffMs.Count >= deliveries
? consumer.Config.BackOffMs[deliveries - 1]
: consumer.Config.AckWaitMs;
consumer.AckProcessor.ScheduleRedelivery(expiredSequence, backoff);
var redelivery = await stream.Store.LoadAsync(expiredSequence, ct);
if (redelivery != null)
{
@@ -86,6 +92,27 @@ public sealed class PullConsumerEngine
return new PullFetchBatch(messages);
}
private static async ValueTask<ulong> ResolveInitialSequenceAsync(StreamHandle stream, ConsumerConfig config, CancellationToken ct)
{
var state = await stream.Store.GetStateAsync(ct);
return config.DeliverPolicy switch
{
DeliverPolicy.Last when state.LastSeq > 0 => state.LastSeq,
DeliverPolicy.New when state.LastSeq > 0 => state.LastSeq + 1,
DeliverPolicy.ByStartSequence when config.OptStartSeq > 0 => config.OptStartSeq,
DeliverPolicy.ByStartTime when config.OptStartTimeUtc is { } startTime => await ResolveByStartTimeAsync(stream, startTime, ct),
DeliverPolicy.LastPerSubject when state.LastSeq > 0 => state.LastSeq,
_ => 1,
};
}
private static async ValueTask<ulong> ResolveByStartTimeAsync(StreamHandle stream, DateTime startTimeUtc, CancellationToken ct)
{
var messages = await stream.Store.ListAsync(ct);
var match = messages.FirstOrDefault(m => m.TimestampUtc >= startTimeUtc);
return match?.Sequence ?? 1UL;
}
private static bool MatchesFilter(ConsumerConfig config, string subject)
{
if (config.FilterSubjects.Count > 0)

View File

@@ -7,20 +7,41 @@ public sealed class PushConsumerEngine
{
public void Enqueue(ConsumerHandle consumer, StoredMessage message)
{
var availableAtUtc = DateTime.UtcNow;
if (consumer.Config.RateLimitBps > 0)
{
if (consumer.NextPushDataAvailableAtUtc > availableAtUtc)
availableAtUtc = consumer.NextPushDataAvailableAtUtc;
var delayMs = (long)Math.Ceiling((double)message.Payload.Length * 1000 / consumer.Config.RateLimitBps);
consumer.NextPushDataAvailableAtUtc = availableAtUtc.AddMilliseconds(Math.Max(delayMs, 1));
}
consumer.PushFrames.Enqueue(new PushFrame
{
IsData = true,
Message = message,
AvailableAtUtc = availableAtUtc,
});
if (consumer.Config.AckPolicy is AckPolicy.Explicit or AckPolicy.All)
consumer.AckProcessor.Register(message.Sequence, consumer.Config.AckWaitMs);
if (consumer.Config.FlowControl)
{
consumer.PushFrames.Enqueue(new PushFrame
{
IsFlowControl = true,
AvailableAtUtc = availableAtUtc,
});
}
if (consumer.Config.HeartbeatMs > 0)
{
consumer.PushFrames.Enqueue(new PushFrame
{
IsHeartbeat = true,
AvailableAtUtc = availableAtUtc,
});
}
}
@@ -29,6 +50,8 @@ public sealed class PushConsumerEngine
public sealed class PushFrame
{
public bool IsData { get; init; }
public bool IsFlowControl { get; init; }
public bool IsHeartbeat { get; init; }
public StoredMessage? Message { get; init; }
public DateTime AvailableAtUtc { get; init; } = DateTime.UtcNow;
}

View File

@@ -1,15 +1,18 @@
using NATS.Server.Configuration;
using NATS.Server;
namespace NATS.Server.JetStream;
public sealed class JetStreamService : IAsyncDisposable
{
private readonly JetStreamOptions _options;
public InternalClient? InternalClient { get; }
public bool IsRunning { get; private set; }
public JetStreamService(JetStreamOptions options)
public JetStreamService(JetStreamOptions options, InternalClient? internalClient = null)
{
_options = options;
InternalClient = internalClient;
}
public Task StartAsync(CancellationToken ct)

View File

@@ -5,12 +5,18 @@ namespace NATS.Server.JetStream.MirrorSource;
public sealed class MirrorCoordinator
{
private readonly IStreamStore _targetStore;
public ulong LastOriginSequence { get; private set; }
public DateTime LastSyncUtc { get; private set; }
public MirrorCoordinator(IStreamStore targetStore)
{
_targetStore = targetStore;
}
public Task OnOriginAppendAsync(StoredMessage message, CancellationToken ct)
=> _targetStore.AppendAsync(message.Subject, message.Payload, ct).AsTask();
public async Task OnOriginAppendAsync(StoredMessage message, CancellationToken ct)
{
await _targetStore.AppendAsync(message.Subject, message.Payload, ct);
LastOriginSequence = message.Sequence;
LastSyncUtc = DateTime.UtcNow;
}
}

View File

@@ -1,16 +1,29 @@
using NATS.Server.JetStream.Storage;
using NATS.Server.JetStream.Models;
namespace NATS.Server.JetStream.MirrorSource;
public sealed class SourceCoordinator
{
private readonly IStreamStore _targetStore;
private readonly StreamSourceConfig _sourceConfig;
public ulong LastOriginSequence { get; private set; }
public DateTime LastSyncUtc { get; private set; }
public SourceCoordinator(IStreamStore targetStore)
public SourceCoordinator(IStreamStore targetStore, StreamSourceConfig sourceConfig)
{
_targetStore = targetStore;
_sourceConfig = sourceConfig;
}
public Task OnOriginAppendAsync(StoredMessage message, CancellationToken ct)
=> _targetStore.AppendAsync(message.Subject, message.Payload, ct).AsTask();
public async Task OnOriginAppendAsync(StoredMessage message, CancellationToken ct)
{
var subject = message.Subject;
if (!string.IsNullOrWhiteSpace(_sourceConfig.SubjectTransformPrefix))
subject = $"{_sourceConfig.SubjectTransformPrefix}{subject}";
await _targetStore.AppendAsync(subject, message.Payload, ct);
LastOriginSequence = message.Sequence;
LastSyncUtc = DateTime.UtcNow;
}
}

View File

@@ -8,12 +8,17 @@ public sealed class ConsumerConfig
public List<string> FilterSubjects { get; set; } = [];
public AckPolicy AckPolicy { get; set; } = AckPolicy.None;
public DeliverPolicy DeliverPolicy { get; set; } = DeliverPolicy.All;
public ulong OptStartSeq { get; set; }
public DateTime? OptStartTimeUtc { get; set; }
public ReplayPolicy ReplayPolicy { get; set; } = ReplayPolicy.Instant;
public int AckWaitMs { get; set; } = 30_000;
public int MaxDeliver { get; set; } = 1;
public int MaxAckPending { get; set; }
public bool Push { get; set; }
public int HeartbeatMs { get; set; }
public List<int> BackOffMs { get; set; } = [];
public bool FlowControl { get; set; }
public long RateLimitBps { get; set; }
}
public enum AckPolicy

View File

@@ -8,7 +8,13 @@ public sealed class StreamConfig
public long MaxBytes { get; set; }
public int MaxMsgsPer { get; set; }
public int MaxAgeMs { get; set; }
public int MaxMsgSize { get; set; }
public int MaxConsumers { get; set; }
public int DuplicateWindowMs { get; set; }
public bool Sealed { get; set; }
public bool DenyDelete { get; set; }
public bool DenyPurge { get; set; }
public bool AllowDirect { get; set; }
public RetentionPolicy Retention { get; set; } = RetentionPolicy.Limits;
public DiscardPolicy Discard { get; set; } = DiscardPolicy.Old;
public StorageType Storage { get; set; } = StorageType.Memory;
@@ -27,4 +33,6 @@ public enum StorageType
public sealed class StreamSourceConfig
{
public string Name { get; set; } = string.Empty;
public string? SubjectTransformPrefix { get; set; }
public string? SourceAccount { get; set; }
}

View File

@@ -34,7 +34,7 @@ public sealed class JetStreamPublisher
return true;
}
if (_preconditions.IsDuplicate(options.MsgId, out var existingSequence))
if (_preconditions.IsDuplicate(options.MsgId, stream.Config.DuplicateWindowMs, out var existingSequence))
{
ack = new PubAck
{
@@ -47,6 +47,7 @@ public sealed class JetStreamPublisher
var captured = _streamManager.Capture(subject, payload);
ack = captured ?? new PubAck();
_preconditions.Record(options.MsgId, ack.Seq);
_preconditions.TrimOlderThan(stream.Config.DuplicateWindowMs);
return true;
}
}

View File

@@ -4,15 +4,26 @@ namespace NATS.Server.JetStream.Publish;
public sealed class PublishPreconditions
{
private readonly ConcurrentDictionary<string, ulong> _dedupe = new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, DedupeEntry> _dedupe = new(StringComparer.Ordinal);
public bool IsDuplicate(string? msgId, out ulong existingSequence)
public bool IsDuplicate(string? msgId, int duplicateWindowMs, out ulong existingSequence)
{
existingSequence = 0;
if (string.IsNullOrEmpty(msgId))
return false;
return _dedupe.TryGetValue(msgId, out existingSequence);
if (!_dedupe.TryGetValue(msgId, out var entry))
return false;
if (duplicateWindowMs > 0
&& DateTime.UtcNow - entry.TimestampUtc > TimeSpan.FromMilliseconds(duplicateWindowMs))
{
_dedupe.TryRemove(msgId, out _);
return false;
}
existingSequence = entry.Sequence;
return true;
}
public void Record(string? msgId, ulong sequence)
@@ -20,9 +31,24 @@ public sealed class PublishPreconditions
if (string.IsNullOrEmpty(msgId))
return;
_dedupe[msgId] = sequence;
_dedupe[msgId] = new DedupeEntry(sequence, DateTime.UtcNow);
}
public void TrimOlderThan(int duplicateWindowMs)
{
if (duplicateWindowMs <= 0)
return;
var cutoff = DateTime.UtcNow.AddMilliseconds(-duplicateWindowMs);
foreach (var (key, entry) in _dedupe)
{
if (entry.TimestampUtc < cutoff)
_dedupe.TryRemove(key, out _);
}
}
public bool CheckExpectedLastSeq(ulong expectedLastSeq, ulong actualLastSeq)
=> expectedLastSeq == 0 || expectedLastSeq == actualLastSeq;
private readonly record struct DedupeEntry(ulong Sequence, DateTime TimestampUtc);
}

View File

@@ -1,3 +1,4 @@
using System.Text;
using System.Text.Json;
using NATS.Server.JetStream.Models;
@@ -5,12 +6,23 @@ namespace NATS.Server.JetStream.Storage;
public sealed class FileStore : IStreamStore, IAsyncDisposable
{
private readonly FileStoreOptions _options;
private readonly string _dataFilePath;
private readonly Dictionary<ulong, StoredMessage> _messages = new();
private readonly Dictionary<ulong, BlockPointer> _index = new();
private ulong _last;
private int _blockCount;
private long _activeBlockBytes;
private long _writeOffset;
public int BlockCount => _messages.Count == 0 ? 0 : Math.Max(_blockCount, 1);
public FileStore(FileStoreOptions options)
{
_options = options;
if (_options.BlockSizeBytes <= 0)
_options.BlockSizeBytes = 64 * 1024;
Directory.CreateDirectory(options.Directory);
_dataFilePath = Path.Combine(options.Directory, "messages.jsonl");
LoadExisting();
@@ -18,6 +30,8 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
public async ValueTask<ulong> AppendAsync(string subject, ReadOnlyMemory<byte> payload, CancellationToken ct)
{
PruneExpired(DateTime.UtcNow);
_last++;
var stored = new StoredMessage
{
@@ -36,6 +50,9 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
TimestampUtc = stored.TimestampUtc,
});
await File.AppendAllTextAsync(_dataFilePath, line + Environment.NewLine, ct);
var recordBytes = Encoding.UTF8.GetByteCount(line + Environment.NewLine);
TrackBlockForRecord(recordBytes, stored.Sequence);
return _last;
}
@@ -54,6 +71,14 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
return ValueTask.FromResult(match);
}
public ValueTask<IReadOnlyList<StoredMessage>> ListAsync(CancellationToken ct)
{
var messages = _messages.Values
.OrderBy(m => m.Sequence)
.ToArray();
return ValueTask.FromResult<IReadOnlyList<StoredMessage>>(messages);
}
public ValueTask<bool> RemoveAsync(ulong sequence, CancellationToken ct)
{
var removed = _messages.Remove(sequence);
@@ -65,7 +90,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
public ValueTask PurgeAsync(CancellationToken ct)
{
_messages.Clear();
_index.Clear();
_last = 0;
_blockCount = 0;
_activeBlockBytes = 0;
_writeOffset = 0;
if (File.Exists(_dataFilePath))
File.Delete(_dataFilePath);
return ValueTask.CompletedTask;
@@ -90,7 +119,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
public ValueTask RestoreSnapshotAsync(ReadOnlyMemory<byte> snapshot, CancellationToken ct)
{
_messages.Clear();
_index.Clear();
_last = 0;
_blockCount = 0;
_activeBlockBytes = 0;
_writeOffset = 0;
if (!snapshot.IsEmpty)
{
@@ -159,29 +192,83 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
Sequence = record.Sequence,
Subject = record.Subject ?? string.Empty,
Payload = Convert.FromBase64String(record.PayloadBase64 ?? string.Empty),
TimestampUtc = record.TimestampUtc,
};
_messages[message.Sequence] = message;
if (message.Sequence > _last)
_last = message.Sequence;
var recordBytes = Encoding.UTF8.GetByteCount(line + Environment.NewLine);
TrackBlockForRecord(recordBytes, message.Sequence);
}
PruneExpired(DateTime.UtcNow);
}
private void RewriteDataFile()
{
var lines = new List<string>(_messages.Count);
Directory.CreateDirectory(Path.GetDirectoryName(_dataFilePath)!);
_index.Clear();
_blockCount = 0;
_activeBlockBytes = 0;
_writeOffset = 0;
using var stream = new FileStream(_dataFilePath, FileMode.Create, FileAccess.Write, FileShare.Read);
using var writer = new StreamWriter(stream, Encoding.UTF8);
foreach (var message in _messages.OrderBy(kv => kv.Key).Select(kv => kv.Value))
{
lines.Add(JsonSerializer.Serialize(new FileRecord
var line = JsonSerializer.Serialize(new FileRecord
{
Sequence = message.Sequence,
Subject = message.Subject,
PayloadBase64 = Convert.ToBase64String(message.Payload.ToArray()),
TimestampUtc = message.TimestampUtc,
}));
});
writer.WriteLine(line);
var recordBytes = Encoding.UTF8.GetByteCount(line + Environment.NewLine);
TrackBlockForRecord(recordBytes, message.Sequence);
}
File.WriteAllLines(_dataFilePath, lines);
writer.Flush();
}
private void TrackBlockForRecord(int recordBytes, ulong sequence)
{
if (_blockCount == 0)
_blockCount = 1;
if (_activeBlockBytes > 0 && _activeBlockBytes + recordBytes > _options.BlockSizeBytes)
{
_blockCount++;
_activeBlockBytes = 0;
}
_index[sequence] = new BlockPointer(_blockCount, _writeOffset);
_activeBlockBytes += recordBytes;
_writeOffset += recordBytes;
}
private void PruneExpired(DateTime nowUtc)
{
if (_options.MaxAgeMs <= 0)
return;
var cutoff = nowUtc.AddMilliseconds(-_options.MaxAgeMs);
var expired = _messages
.Where(kv => kv.Value.TimestampUtc < cutoff)
.Select(kv => kv.Key)
.ToArray();
if (expired.Length == 0)
return;
foreach (var sequence in expired)
_messages.Remove(sequence);
RewriteDataFile();
}
private sealed class FileRecord
@@ -191,4 +278,6 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
public string? PayloadBase64 { get; init; }
public DateTime TimestampUtc { get; init; }
}
private readonly record struct BlockPointer(int BlockId, long Offset);
}

View File

@@ -2,5 +2,7 @@ namespace NATS.Server.JetStream.Storage;
public sealed class FileStoreBlock
{
public int Id { get; init; }
public required string Path { get; init; }
public long SizeBytes { get; set; }
}

View File

@@ -3,4 +3,6 @@ namespace NATS.Server.JetStream.Storage;
public sealed class FileStoreOptions
{
public string Directory { get; set; } = string.Empty;
public int BlockSizeBytes { get; set; } = 64 * 1024;
public int MaxAgeMs { get; set; }
}

View File

@@ -7,6 +7,7 @@ public interface IStreamStore
ValueTask<ulong> AppendAsync(string subject, ReadOnlyMemory<byte> payload, CancellationToken ct);
ValueTask<StoredMessage?> LoadAsync(ulong sequence, CancellationToken ct);
ValueTask<StoredMessage?> LoadLastBySubjectAsync(string subject, CancellationToken ct);
ValueTask<IReadOnlyList<StoredMessage>> ListAsync(CancellationToken ct);
ValueTask<bool> RemoveAsync(ulong sequence, CancellationToken ct);
ValueTask PurgeAsync(CancellationToken ct);
ValueTask<byte[]> CreateSnapshotAsync(CancellationToken ct);

View File

@@ -54,6 +54,17 @@ public sealed class MemStore : IStreamStore
}
}
public ValueTask<IReadOnlyList<StoredMessage>> ListAsync(CancellationToken ct)
{
lock (_gate)
{
var messages = _messages.Values
.OrderBy(m => m.Sequence)
.ToArray();
return ValueTask.FromResult<IReadOnlyList<StoredMessage>>(messages);
}
}
public ValueTask<bool> RemoveAsync(ulong sequence, CancellationToken ct)
{
lock (_gate)

View File

@@ -93,6 +93,8 @@ public sealed class StreamManager
{
if (!_streams.TryGetValue(name, out var stream))
return false;
if (stream.Config.Sealed || stream.Config.DenyPurge)
return false;
stream.Store.PurgeAsync(default).GetAwaiter().GetResult();
return true;
@@ -110,6 +112,8 @@ public sealed class StreamManager
{
if (!_streams.TryGetValue(name, out var stream))
return false;
if (stream.Config.Sealed || stream.Config.DenyDelete)
return false;
return stream.Store.RemoveAsync(sequence, default).GetAwaiter().GetResult();
}
@@ -156,6 +160,17 @@ public sealed class StreamManager
if (stream == null)
return null;
if (stream.Config.MaxMsgSize > 0 && payload.Length > stream.Config.MaxMsgSize)
{
return new PubAck
{
Stream = stream.Config.Name,
ErrorCode = 10054,
};
}
PruneExpiredMessages(stream, DateTime.UtcNow);
var stateBefore = stream.Store.GetStateAsync(default).GetAwaiter().GetResult();
if (stream.Config.MaxBytes > 0 && (long)stateBefore.Bytes + payload.Length > stream.Config.MaxBytes)
{
@@ -179,7 +194,7 @@ public sealed class StreamManager
_ = replicaGroup.ProposeAsync($"PUB {subject}", default).GetAwaiter().GetResult();
var seq = stream.Store.AppendAsync(subject, payload, default).GetAwaiter().GetResult();
EnforceLimits(stream);
EnforceRuntimePolicies(stream, DateTime.UtcNow);
var stored = stream.Store.LoadAsync(seq, default).GetAwaiter().GetResult();
if (stored != null)
ReplicateIfConfigured(stream.Config.Name, stored);
@@ -209,14 +224,25 @@ public sealed class StreamManager
MaxBytes = config.MaxBytes,
MaxMsgsPer = config.MaxMsgsPer,
MaxAgeMs = config.MaxAgeMs,
MaxMsgSize = config.MaxMsgSize,
MaxConsumers = config.MaxConsumers,
DuplicateWindowMs = config.DuplicateWindowMs,
Sealed = config.Sealed,
DenyDelete = config.DenyDelete,
DenyPurge = config.DenyPurge,
AllowDirect = config.AllowDirect,
Retention = config.Retention,
Discard = config.Discard,
Storage = config.Storage,
Replicas = config.Replicas,
Mirror = config.Mirror,
Source = config.Source,
Sources = config.Sources.Count == 0 ? [] : [.. config.Sources.Select(s => new StreamSourceConfig { Name = s.Name })],
Sources = config.Sources.Count == 0 ? [] : [.. config.Sources.Select(s => new StreamSourceConfig
{
Name = s.Name,
SubjectTransformPrefix = s.SubjectTransformPrefix,
SourceAccount = s.SourceAccount,
})],
};
return copy;
@@ -235,6 +261,13 @@ public sealed class StreamManager
};
}
private static void EnforceRuntimePolicies(StreamHandle stream, DateTime nowUtc)
{
EnforceLimits(stream);
PrunePerSubject(stream);
PruneExpiredMessages(stream, nowUtc);
}
private static void EnforceLimits(StreamHandle stream)
{
if (stream.Config.MaxMsgs <= 0)
@@ -251,6 +284,34 @@ public sealed class StreamManager
fileStore.TrimToMaxMessages(maxMessages);
}
private static void PrunePerSubject(StreamHandle stream)
{
if (stream.Config.MaxMsgsPer <= 0)
return;
var maxPerSubject = stream.Config.MaxMsgsPer;
var messages = stream.Store.ListAsync(default).GetAwaiter().GetResult();
foreach (var group in messages.GroupBy(m => m.Subject, StringComparer.Ordinal))
{
foreach (var message in group.OrderByDescending(m => m.Sequence).Skip(maxPerSubject))
stream.Store.RemoveAsync(message.Sequence, default).GetAwaiter().GetResult();
}
}
private static void PruneExpiredMessages(StreamHandle stream, DateTime nowUtc)
{
if (stream.Config.MaxAgeMs <= 0)
return;
var cutoff = nowUtc.AddMilliseconds(-stream.Config.MaxAgeMs);
var messages = stream.Store.ListAsync(default).GetAwaiter().GetResult();
foreach (var message in messages)
{
if (message.TimestampUtc < cutoff)
stream.Store.RemoveAsync(message.Sequence, default).GetAwaiter().GetResult();
}
}
private void RebuildReplicationCoordinators()
{
_mirrorsByOrigin.Clear();
@@ -269,7 +330,7 @@ public sealed class StreamManager
&& _streams.TryGetValue(stream.Config.Source, out _))
{
var list = _sourcesByOrigin.GetOrAdd(stream.Config.Source, _ => []);
list.Add(new SourceCoordinator(stream.Store));
list.Add(new SourceCoordinator(stream.Store, new StreamSourceConfig { Name = stream.Config.Source }));
}
if (stream.Config.Sources.Count > 0)
@@ -280,7 +341,7 @@ public sealed class StreamManager
continue;
var list = _sourcesByOrigin.GetOrAdd(source.Name, _ => []);
list.Add(new SourceCoordinator(stream.Store));
list.Add(new SourceCoordinator(stream.Store, source));
}
}
}
@@ -320,6 +381,7 @@ public sealed class StreamManager
StorageType.File => new FileStore(new FileStoreOptions
{
Directory = Path.Combine(Path.GetTempPath(), "natsdotnet-js-store", config.Name),
MaxAgeMs = config.MaxAgeMs,
}),
_ => new MemStore(),
};

View File

@@ -11,6 +11,12 @@ public static class JetStreamConfigValidator
if (config.Retention == RetentionPolicy.WorkQueue && config.MaxConsumers == 0)
return ValidationResult.Invalid("workqueue retention requires max consumers > 0");
if (config.MaxMsgSize < 0)
return ValidationResult.Invalid("max_msg_size must be >= 0");
if (config.MaxMsgsPer < 0)
return ValidationResult.Invalid("max_msgs_per must be >= 0");
if (config.MaxAgeMs < 0)
return ValidationResult.Invalid("max_age_ms must be >= 0");
return ValidationResult.Valid();
}