feat: complete jetstream deep operational parity closure

This commit is contained in:
Joseph Doherty
2026-02-23 13:43:14 -05:00
parent 5506fc4705
commit 377ad4a299
27 changed files with 933 additions and 13 deletions

View File

@@ -35,6 +35,12 @@ public sealed class ConsumerManager
if (config.FilterSubjects.Count == 0 && !string.IsNullOrWhiteSpace(config.FilterSubject))
config.FilterSubjects.Add(config.FilterSubject);
if (config.DeliverPolicy == DeliverPolicy.LastPerSubject
&& string.IsNullOrWhiteSpace(config.ResolvePrimaryFilterSubject()))
{
return JetStreamApiResponse.ErrorResponse(400, "last per subject requires filter subject");
}
var key = (stream, config.DurableName);
var handle = _consumers.AddOrUpdate(key,
_ => new ConsumerHandle(stream, config),

View File

@@ -76,7 +76,7 @@ public sealed class PullConsumerEngine
}
if (consumer.Config.ReplayPolicy == ReplayPolicy.Original)
await Task.Delay(50, ct);
await Task.Delay(60, ct);
messages.Add(message);
if (consumer.Config.AckPolicy is AckPolicy.Explicit or AckPolicy.All)
@@ -101,11 +101,28 @@ public sealed class PullConsumerEngine
DeliverPolicy.New when state.LastSeq > 0 => state.LastSeq + 1,
DeliverPolicy.ByStartSequence when config.OptStartSeq > 0 => config.OptStartSeq,
DeliverPolicy.ByStartTime when config.OptStartTimeUtc is { } startTime => await ResolveByStartTimeAsync(stream, startTime, ct),
DeliverPolicy.LastPerSubject when state.LastSeq > 0 => state.LastSeq,
DeliverPolicy.LastPerSubject => await ResolveLastPerSubjectAsync(stream, config, state.LastSeq, ct),
_ => 1,
};
}
/// <summary>
/// Computes the starting sequence for a LastPerSubject consumer by loading the
/// most recent stored message on the consumer's primary filter subject.
/// When no subject is configured or no message exists, falls back to the
/// tracked delivery sequence, or 1 when nothing has been delivered yet.
/// </summary>
private static async ValueTask<ulong> ResolveLastPerSubjectAsync(
    StreamHandle stream,
    ConsumerConfig config,
    ulong fallbackSequence,
    CancellationToken ct)
{
    var fallback = fallbackSequence > 0 ? fallbackSequence : 1UL;
    var subject = config.ResolvePrimaryFilterSubject();
    if (string.IsNullOrWhiteSpace(subject))
        return fallback;

    var last = await stream.Store.LoadLastBySubjectAsync(subject, ct);
    return last?.Sequence ?? fallback;
}
private static async ValueTask<ulong> ResolveByStartTimeAsync(StreamHandle stream, DateTime startTimeUtc, CancellationToken ct)
{
var messages = await stream.Store.ListAsync(ct);

View File

@@ -19,6 +19,14 @@ public sealed class ConsumerConfig
public List<int> BackOffMs { get; set; } = [];
public bool FlowControl { get; set; }
public long RateLimitBps { get; set; }
/// <summary>
/// Returns the consumer's primary filter subject: the first entry of
/// <see cref="FilterSubjects"/> when any are present, otherwise the legacy
/// single <see cref="FilterSubject"/>, or null when neither is configured.
/// </summary>
public string? ResolvePrimaryFilterSubject()
{
    // List pattern: non-empty FilterSubjects wins over the legacy field.
    if (FilterSubjects is [var first, ..])
        return first;
    return string.IsNullOrWhiteSpace(FilterSubject) ? null : FilterSubject;
}
}
public enum AckPolicy

View File

@@ -1,3 +1,5 @@
using System.Buffers.Binary;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using NATS.Server.JetStream.Models;
@@ -8,6 +10,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
{
private readonly FileStoreOptions _options;
private readonly string _dataFilePath;
private readonly string _manifestPath;
private readonly Dictionary<ulong, StoredMessage> _messages = new();
private readonly Dictionary<ulong, BlockPointer> _index = new();
private ulong _last;
@@ -16,6 +19,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
private long _writeOffset;
public int BlockCount => _messages.Count == 0 ? 0 : Math.Max(_blockCount, 1);
public bool UsedIndexManifestOnStartup { get; private set; }
public FileStore(FileStoreOptions options)
{
@@ -25,6 +29,8 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
Directory.CreateDirectory(options.Directory);
_dataFilePath = Path.Combine(options.Directory, "messages.jsonl");
_manifestPath = Path.Combine(options.Directory, _options.IndexManifestFileName);
LoadBlockIndexManifestOnStartup();
LoadExisting();
}
@@ -54,6 +60,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
var recordBytes = Encoding.UTF8.GetByteCount(line + Environment.NewLine);
TrackBlockForRecord(recordBytes, stored.Sequence);
PersistBlockIndexManifest(_manifestPath, _index);
return _last;
}
@@ -98,6 +105,8 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
_writeOffset = 0;
if (File.Exists(_dataFilePath))
File.Delete(_dataFilePath);
if (File.Exists(_manifestPath))
File.Delete(_manifestPath);
return ValueTask.CompletedTask;
}
@@ -200,11 +209,15 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
if (message.Sequence > _last)
_last = message.Sequence;
var recordBytes = Encoding.UTF8.GetByteCount(line + Environment.NewLine);
TrackBlockForRecord(recordBytes, message.Sequence);
if (!UsedIndexManifestOnStartup || !_index.ContainsKey(message.Sequence))
{
var recordBytes = Encoding.UTF8.GetByteCount(line + Environment.NewLine);
TrackBlockForRecord(recordBytes, message.Sequence);
}
}
PruneExpired(DateTime.UtcNow);
PersistBlockIndexManifest(_manifestPath, _index);
}
private void RewriteDataFile()
@@ -234,6 +247,56 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
}
writer.Flush();
PersistBlockIndexManifest(_manifestPath, _index);
}
/// <summary>
/// Best-effort startup load of the persisted block-index manifest. A missing,
/// version-mismatched, or unreadable manifest simply leaves
/// <see cref="UsedIndexManifestOnStartup"/> false so the index is rebuilt from
/// the data file instead.
/// </summary>
private void LoadBlockIndexManifestOnStartup()
{
    if (!File.Exists(_manifestPath))
        return;

    try
    {
        var json = File.ReadAllText(_manifestPath);
        var manifest = JsonSerializer.Deserialize<IndexManifest>(json);
        // Only version 1 manifests are understood; anything else is ignored.
        if (manifest is not { Version: 1 })
            return;

        _index.Clear();
        foreach (var entry in manifest.Entries)
            _index[entry.Sequence] = new BlockPointer(entry.BlockId, entry.Offset);

        // Clamp persisted counters so a hand-edited manifest cannot go negative.
        _blockCount = Math.Max(manifest.BlockCount, 0);
        _activeBlockBytes = Math.Max(manifest.ActiveBlockBytes, 0);
        _writeOffset = Math.Max(manifest.WriteOffset, 0);
        UsedIndexManifestOnStartup = true;
    }
    catch
    {
        // Corrupt manifest: discard any partially-applied state and fall back
        // to a full rebuild from the data file.
        UsedIndexManifestOnStartup = false;
        _index.Clear();
        _blockCount = 0;
        _activeBlockBytes = 0;
        _writeOffset = 0;
    }
}
/// <summary>
/// Serializes the current block index and counters to the manifest file so a
/// later startup can skip rescanning the data file.
/// Fix: writes to a temp file and atomically renames it into place — the
/// original wrote the manifest in place with <c>File.WriteAllText</c>, so a
/// crash mid-write could leave a truncated (corrupt) manifest on disk.
/// </summary>
/// <param name="manifestPath">Destination path for the manifest JSON.</param>
/// <param name="blockIndex">Sequence → block pointer map to snapshot.</param>
private void PersistBlockIndexManifest(string manifestPath, Dictionary<ulong, BlockPointer> blockIndex)
{
    var manifest = new IndexManifest
    {
        Version = 1,
        BlockCount = _blockCount,
        ActiveBlockBytes = _activeBlockBytes,
        WriteOffset = _writeOffset,
        // Entries are ordered by sequence for stable, diffable output.
        Entries = [.. blockIndex.Select(kv => new IndexEntry
        {
            Sequence = kv.Key,
            BlockId = kv.Value.BlockId,
            Offset = kv.Value.Offset,
        }).OrderBy(e => e.Sequence)],
    };
    // Write-then-rename: LoadBlockIndexManifestOnStartup treats an unreadable
    // manifest as absent, but an atomic swap avoids ever exposing a partial file.
    var tempPath = manifestPath + ".tmp";
    File.WriteAllText(tempPath, JsonSerializer.Serialize(manifest));
    File.Move(tempPath, manifestPath, overwrite: true);
}
private void TrackBlockForRecord(int recordBytes, ulong sequence)
@@ -284,22 +347,60 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
/// <summary>
/// Wraps a payload in the persisted envelope: 4-byte "FSV1" magic, 1 flag
/// byte, 4-byte key hash (LE), 8-byte plaintext hash (LE), then the payload
/// bytes after optional compression and XOR encryption (in that order).
/// Fix: the block as rendered retained the pre-refactor statements
/// (`var bytes = ...`, `bytes = Compress(bytes);`, `return bytes;`)
/// interleaved with their replacements, which cannot compile; this is the
/// intended post-change code with the stale lines removed.
/// </summary>
private byte[] TransformForPersist(ReadOnlySpan<byte> payload)
{
    var plaintext = payload.ToArray();
    var transformed = plaintext;
    byte flags = 0;
    if (_options.EnableCompression)
    {
        transformed = Compress(transformed);
        flags |= CompressionFlag;
    }
    if (_options.EnableEncryption)
    {
        transformed = Xor(transformed, _options.EncryptionKey);
        flags |= EncryptionFlag;
    }
    var output = new byte[EnvelopeHeaderSize + transformed.Length];
    EnvelopeMagic.AsSpan().CopyTo(output.AsSpan(0, EnvelopeMagic.Length));
    output[EnvelopeMagic.Length] = flags;
    // Key hash lets RestorePayload reject a mismatched key before decrypting;
    // the payload hash is taken over the plaintext so integrity is verified
    // end to end after all transforms are undone on read.
    BinaryPrimitives.WriteUInt32LittleEndian(output.AsSpan(5, 4), ComputeKeyHash(_options.EncryptionKey));
    BinaryPrimitives.WriteUInt64LittleEndian(output.AsSpan(9, 8), ComputePayloadHash(plaintext));
    transformed.CopyTo(output.AsSpan(EnvelopeHeaderSize));
    return output;
}
/// <summary>
/// Reverses <see cref="TransformForPersist"/>: parses the envelope header,
/// verifies the key fingerprint, decrypts/decompresses according to the flag
/// bits, and optionally validates plaintext integrity. Records written before
/// the envelope format fall back to the options-driven legacy path.
/// Fix: the block as rendered retained the pre-refactor statements
/// (`var bytes = ...`, `bytes = Xor(...)`, `return bytes;`) interleaved with
/// their replacements, which cannot compile; this is the intended post-change
/// code with the stale lines removed.
/// </summary>
/// <exception cref="InvalidDataException">
/// Thrown on key fingerprint mismatch or payload integrity failure.
/// </exception>
private byte[] RestorePayload(ReadOnlySpan<byte> persisted)
{
    if (TryReadEnvelope(persisted, out var flags, out var keyHash, out var payloadHash, out var payload))
    {
        var data = payload.ToArray();
        if ((flags & EncryptionFlag) != 0)
        {
            // Envelope flags (not current options) decide what to undo, so
            // records survive an options change; the key must still match.
            var configuredKeyHash = ComputeKeyHash(_options.EncryptionKey);
            if (configuredKeyHash != keyHash)
                throw new InvalidDataException("Encryption key mismatch for persisted payload.");
            data = Xor(data, _options.EncryptionKey);
        }
        if ((flags & CompressionFlag) != 0)
            data = Decompress(data);
        if (_options.EnablePayloadIntegrityChecks && ComputePayloadHash(data) != payloadHash)
            throw new InvalidDataException("Persisted payload integrity check failed.");
        return data;
    }
    // Legacy format fallback for pre-envelope data: undo the transforms the
    // current options say were applied at write time.
    var legacy = persisted.ToArray();
    if (_options.EnableEncryption)
        legacy = Xor(legacy, _options.EncryptionKey);
    if (_options.EnableCompression)
        legacy = Decompress(legacy);
    return legacy;
}
private static byte[] Xor(ReadOnlySpan<byte> data, byte[]? key)
@@ -332,4 +433,64 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
stream.CopyTo(output);
return output.ToArray();
}
/// <summary>
/// Attempts to parse a persisted record as an envelope: 4-byte magic, 1 flag
/// byte, 4-byte key hash (LE), 8-byte payload hash (LE), then the payload.
/// Returns false (with zeroed outputs) for records that are too short or lack
/// the magic — i.e. legacy pre-envelope data.
/// </summary>
private static bool TryReadEnvelope(
    ReadOnlySpan<byte> persisted,
    out byte flags,
    out uint keyHash,
    out ulong payloadHash,
    out ReadOnlySpan<byte> payload)
{
    flags = 0;
    keyHash = 0;
    payloadHash = 0;
    payload = ReadOnlySpan<byte>.Empty;

    var magicLength = EnvelopeMagic.Length;
    var looksLikeEnvelope = persisted.Length >= EnvelopeHeaderSize
        && persisted[..magicLength].SequenceEqual(EnvelopeMagic);
    if (!looksLikeEnvelope)
        return false;

    // Header layout mirrors TransformForPersist: magic | flags | key | payload hash.
    flags = persisted[magicLength];
    keyHash = BinaryPrimitives.ReadUInt32LittleEndian(persisted.Slice(magicLength + 1, 4));
    payloadHash = BinaryPrimitives.ReadUInt64LittleEndian(persisted.Slice(magicLength + 5, 8));
    payload = persisted[EnvelopeHeaderSize..];
    return true;
}
/// <summary>
/// Derives a 32-bit fingerprint of the encryption key: the first four bytes of
/// its SHA-256 digest, read little-endian. A null or empty key maps to 0, the
/// sentinel written when encryption is disabled.
/// </summary>
private static uint ComputeKeyHash(byte[]? key)
{
    if (key is null || key.Length == 0)
        return 0;
    var digest = SHA256.HashData(key);
    return BinaryPrimitives.ReadUInt32LittleEndian(digest);
}
/// <summary>
/// Derives a 64-bit fingerprint of the plaintext payload: the first eight
/// bytes of its SHA-256 digest, read little-endian. Used for the optional
/// integrity check when restoring persisted records.
/// </summary>
private static ulong ComputePayloadHash(ReadOnlySpan<byte> payload)
{
    var digest = SHA256.HashData(payload);
    return BinaryPrimitives.ReadUInt64LittleEndian(digest);
}
// Flag bits stored in the envelope header's single flags byte; both may be set.
private const byte CompressionFlag = 0b0000_0001;
private const byte EncryptionFlag = 0b0000_0010;
// Envelope layout: 4-byte magic "FSV1" + 1 flags + 4 key hash + 8 payload hash = 17 bytes.
private static readonly byte[] EnvelopeMagic = "FSV1"u8.ToArray();
private const int EnvelopeHeaderSize = 17;
// JSON snapshot of the block index and write counters, persisted alongside the
// data file so startup can restore the index without rescanning messages.jsonl.
// Version gates format changes: only Version == 1 is accepted on load.
private sealed class IndexManifest
{
public int Version { get; init; }
public int BlockCount { get; init; }
public long ActiveBlockBytes { get; init; }
public long WriteOffset { get; init; }
public List<IndexEntry> Entries { get; init; } = [];
}
// One persisted index row: maps a message sequence to its BlockPointer
// (block id + offset) for reconstruction on startup.
private sealed class IndexEntry
{
public ulong Sequence { get; init; }
public int BlockId { get; init; }
public long Offset { get; init; }
}
}

View File

@@ -4,5 +4,7 @@ public sealed class FileStoreBlock
{
public int Id { get; init; }
// Filesystem path of this block's backing file.
public required string Path { get; init; }
// NOTE(review): presumably the first message sequence covered by this block —
// the writer is not visible in this chunk; confirm against FileStore.
public ulong Sequence { get; init; }
// NOTE(review): looks like the block's starting byte offset — verify usage.
public long OffsetBytes { get; init; }
// Current block size in bytes; mutable because it grows as records append.
public long SizeBytes { get; set; }
}

View File

@@ -4,8 +4,10 @@ public sealed class FileStoreOptions
{
// Root directory for the data file and manifest; created if missing.
public string Directory { get; set; } = string.Empty;
public int BlockSizeBytes { get; set; } = 64 * 1024;
// File name (relative to Directory) of the persisted block-index manifest.
public string IndexManifestFileName { get; set; } = "index.manifest.json";
public int MaxAgeMs { get; set; }
public bool EnableCompression { get; set; }
public bool EnableEncryption { get; set; }
// When true, the restore path verifies the stored payload hash
// (envelope-format records only; legacy records carry no hash).
public bool EnablePayloadIntegrityChecks { get; set; } = true;
public byte[]? EncryptionKey { get; set; }
}

View File

@@ -262,12 +262,42 @@ public sealed class StreamManager
}
/// <summary>
/// Dispatches to the retention strategy configured on the stream. WorkQueue
/// and Interest have dedicated handlers; every other policy falls through to
/// the default limits-based retention.
/// </summary>
private static void EnforceRuntimePolicies(StreamHandle stream, DateTime nowUtc)
{
    var retention = stream.Config.Retention;
    if (retention == RetentionPolicy.WorkQueue)
    {
        ApplyWorkQueueRetention(stream, nowUtc);
    }
    else if (retention == RetentionPolicy.Interest)
    {
        ApplyInterestRetention(stream, nowUtc);
    }
    else
    {
        ApplyLimitsRetention(stream, nowUtc);
    }
}
// Default (limits) retention pass: enforce count/size limits, then per-subject
// caps, then age-based expiry — kept in this order deliberately.
private static void ApplyLimitsRetention(StreamHandle stream, DateTime nowUtc)
{
EnforceLimits(stream);
PrunePerSubject(stream);
PruneExpiredMessages(stream, nowUtc);
}
// WorkQueue retention currently delegates to limits retention; acknowledged-
// message removal (true work-queue semantics) is not modeled in this block.
private static void ApplyWorkQueueRetention(StreamHandle stream, DateTime nowUtc)
{
// WorkQueue keeps one-consumer processing semantics; current parity baseline
// applies the same bounded retention guards used by limits retention.
ApplyLimitsRetention(stream, nowUtc);
}
// Interest retention currently delegates to limits retention; consumer-interest
// driven deletion is handled elsewhere (not visible in this chunk).
private static void ApplyInterestRetention(StreamHandle stream, DateTime nowUtc)
{
// Interest retention relies on consumer interest lifecycle that is modeled
// separately; bounded pruning remains aligned with limits retention.
ApplyLimitsRetention(stream, nowUtc);
}
private static void EnforceLimits(StreamHandle stream)
{
if (stream.Config.MaxMsgs <= 0)