feat: enforce filestore durability and recovery invariants
This commit is contained in:
@@ -91,7 +91,11 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
|
|||||||
{
|
{
|
||||||
var removed = _messages.Remove(sequence);
|
var removed = _messages.Remove(sequence);
|
||||||
if (removed)
|
if (removed)
|
||||||
|
{
|
||||||
|
if (sequence == _last)
|
||||||
|
_last = _messages.Count == 0 ? 0UL : _messages.Keys.Max();
|
||||||
RewriteDataFile();
|
RewriteDataFile();
|
||||||
|
}
|
||||||
return ValueTask.FromResult(removed);
|
return ValueTask.FromResult(removed);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -227,6 +231,7 @@ public sealed class FileStore : IStreamStore, IAsyncDisposable
|
|||||||
_blockCount = 0;
|
_blockCount = 0;
|
||||||
_activeBlockBytes = 0;
|
_activeBlockBytes = 0;
|
||||||
_writeOffset = 0;
|
_writeOffset = 0;
|
||||||
|
_last = _messages.Count == 0 ? 0UL : _messages.Keys.Max();
|
||||||
|
|
||||||
using var stream = new FileStream(_dataFilePath, FileMode.Create, FileAccess.Write, FileShare.Read);
|
using var stream = new FileStream(_dataFilePath, FileMode.Create, FileAccess.Write, FileShare.Read);
|
||||||
using var writer = new StreamWriter(stream, Encoding.UTF8);
|
using var writer = new StreamWriter(stream, Encoding.UTF8);
|
||||||
|
|||||||
@@ -0,0 +1,35 @@
|
|||||||
|
using NATS.Server.JetStream.Storage;
|
||||||
|
|
||||||
|
namespace NATS.Server.Tests.JetStream;
|
||||||
|
|
||||||
|
public class JetStreamFileStoreInvariantTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public async Task Filestore_recovery_preserves_sequence_subject_index_and_integrity_after_prune_and_restart_cycles()
|
||||||
|
{
|
||||||
|
var dir = Path.Combine(Path.GetTempPath(), $"nats-js-fs-invariants-{Guid.NewGuid():N}");
|
||||||
|
var options = new FileStoreOptions { Directory = dir };
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await using var store = new FileStore(options);
|
||||||
|
var seq1 = await store.AppendAsync("orders.created", "1"u8.ToArray(), default);
|
||||||
|
var seq2 = await store.AppendAsync("orders.updated", "2"u8.ToArray(), default);
|
||||||
|
seq1.ShouldBe((ulong)1);
|
||||||
|
seq2.ShouldBe((ulong)2);
|
||||||
|
|
||||||
|
(await store.RemoveAsync(seq2, default)).ShouldBeTrue();
|
||||||
|
var state = await store.GetStateAsync(default);
|
||||||
|
|
||||||
|
state.Messages.ShouldBe((ulong)1);
|
||||||
|
state.LastSeq.ShouldBe((ulong)1);
|
||||||
|
state.FirstSeq.ShouldBe((ulong)1);
|
||||||
|
(await store.LoadLastBySubjectAsync("orders.updated", default)).ShouldBeNull();
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
if (Directory.Exists(dir))
|
||||||
|
Directory.Delete(dir, recursive: true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,37 @@
|
|||||||
|
using NATS.Server.JetStream.Storage;
|
||||||
|
|
||||||
|
namespace NATS.Server.Tests.JetStream;
|
||||||
|
|
||||||
|
public class JetStreamFileStoreRecoveryStrictParityTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public async Task Filestore_recovery_preserves_sequence_subject_index_and_integrity_after_prune_and_restart_cycles()
|
||||||
|
{
|
||||||
|
var dir = Path.Combine(Path.GetTempPath(), $"nats-js-fs-recovery-{Guid.NewGuid():N}");
|
||||||
|
var options = new FileStoreOptions { Directory = dir };
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await using (var store = new FileStore(options))
|
||||||
|
{
|
||||||
|
await store.AppendAsync("orders.created", "a"u8.ToArray(), default);
|
||||||
|
await store.AppendAsync("orders.created", "b"u8.ToArray(), default);
|
||||||
|
await store.RemoveAsync(2, default);
|
||||||
|
}
|
||||||
|
|
||||||
|
await using var reopened = new FileStore(options);
|
||||||
|
var state = await reopened.GetStateAsync(default);
|
||||||
|
state.Messages.ShouldBe((ulong)1);
|
||||||
|
state.LastSeq.ShouldBe((ulong)1);
|
||||||
|
|
||||||
|
var msg1 = await reopened.LoadAsync(1, default);
|
||||||
|
msg1.ShouldNotBeNull();
|
||||||
|
msg1.Subject.ShouldBe("orders.created");
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
if (Directory.Exists(dir))
|
||||||
|
Directory.Delete(dir, recursive: true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user