// Source: natsnet/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamFileStoreTests.cs (C#)

// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0
using Shouldly;
using ZB.MOM.NatsNet.Server;
namespace ZB.MOM.NatsNet.Server.Tests.JetStream;
/// <summary>
/// Tests for <see cref="JetStreamFileStore"/>. Each test runs against its own uniquely
/// named directory under the system temp path and deletes that directory afterwards.
/// </summary>
/// <remarks>
/// Every store is stopped in a <c>finally</c> block. Without that, a failed assertion
/// would skip <c>Stop()</c>, the directory delete would run against a store that was
/// never stopped, and any exception thrown by that delete would replace the original
/// assertion failure.
/// </remarks>
public sealed class JetStreamFileStoreTests
{
    /// <summary>
    /// Stores a single message on a stream configured with <c>MaxAge</c>,
    /// <c>AllowMsgTTL</c> and <c>SubjectDeleteMarkerTTL</c>, removes it again and
    /// expects the store to report zero messages.
    /// </summary>
    [Fact]
    public void FileStoreSubjectDeleteMarkers_ShouldSucceed()
    {
        var root = CreateTempRoot("fs-sdm");
        try
        {
            var fs = new JetStreamFileStore(
                new FileStoreConfig { StoreDir = root },
                new FileStreamInfo
                {
                    Created = DateTime.UtcNow,
                    Config = new StreamConfig
                    {
                        Name = "SDM",
                        Storage = StorageType.FileStorage,
                        Subjects = ["test"],
                        MaxAge = TimeSpan.FromSeconds(1),
                        AllowMsgTTL = true,
                        SubjectDeleteMarkerTTL = TimeSpan.FromSeconds(1),
                    },
                });
            try
            {
                var (seq, _) = fs.StoreMsg("test", null, [1], 0);
                seq.ShouldBe(1UL);

                var (removed, err) = fs.RemoveMsg(seq);
                removed.ShouldBeTrue();
                err.ShouldBeNull();

                fs.State().Msgs.ShouldBe(0UL);
            }
            finally
            {
                // Stop even when an assertion above throws, so cleanup never
                // runs against a store that is still open.
                fs.Stop();
            }
        }
        finally
        {
            Directory.Delete(root, recursive: true);
        }
    }

    /// <summary>
    /// Stores one message carrying a <c>JsMessageTtl</c> header, stops the store, then
    /// opens a second store on the same directory and checks that it can report state.
    /// </summary>
    /// <remarks>
    /// NOTE(review): despite the test name, nothing in this test corrupts a block
    /// before the store is reopened — confirm whether that step is still to be added.
    /// </remarks>
    [Fact]
    public void FileStoreNoPanicOnRecoverTTLWithCorruptBlocks_ShouldSucceed()
    {
        // Both store instances use the same stream configuration.
        static void ConfigureTtlStream(StreamConfig cfg)
        {
            cfg.AllowMsgTTL = true;
            cfg.Subjects = ["foo"];
        }

        var root = CreateTempRoot("fs-ttl");
        try
        {
            var hdr = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsMessageTtl, "1");

            var fs = NewStore(root, ConfigureTtlStream);
            try
            {
                fs.StoreMsg("foo", hdr, [1], 1).Seq.ShouldBe(1UL);
            }
            finally
            {
                fs.Stop();
            }

            var reopened = NewStore(root, ConfigureTtlStream);
            try
            {
                // Msgs is unsigned, so the former ">= 0" check could never fail.
                // Exactly one message was stored, so the reopened store must
                // report at most one.
                reopened.State().Msgs.ShouldBeLessThanOrEqualTo(1UL);
            }
            finally
            {
                reopened.Stop();
            }
        }
        finally
        {
            Directory.Delete(root, recursive: true);
        }
    }

    /// <summary>
    /// Stores ten messages that carry schedule-pattern and schedule-target headers,
    /// purges the store and expects all ten to be reported as purged with none left.
    /// </summary>
    [Fact]
    public void FileStorePurgeMsgBlockRemovesSchedules_ShouldSucceed()
    {
        var root = CreateTempRoot("fs-purge-sched");
        try
        {
            var fs = NewStore(root, cfg =>
            {
                cfg.AllowMsgSchedules = true;
                cfg.Subjects = ["foo.*"];
            });
            try
            {
                var hdr = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsSchedulePattern, "@every 10s");
                hdr = NatsMessageHeaders.GenHeader(hdr, NatsHeaderConstants.JsScheduleTarget, "foo.target");

                for (var i = 0; i < 10; i++)
                {
                    fs.StoreMsg($"foo.schedule.{i}", hdr, [1], 0);
                }

                var (purged, err) = fs.Purge();
                err.ShouldBeNull();
                purged.ShouldBe(10UL);
                fs.State().Msgs.ShouldBe(0UL);
            }
            finally
            {
                fs.Stop();
            }
        }
        finally
        {
            Directory.Delete(root, recursive: true);
        }
    }

    /// <summary>
    /// Walks the basic store API in sequence: store two messages, load and look them
    /// up, remove one, purge the rest, then take a snapshot and read utilization.
    /// </summary>
    [Fact]
    public void StoreMsg_LoadAndPurge_ShouldRoundTrip()
    {
        var root = CreateTempRoot("fs");
        try
        {
            var fs = NewStore(root);
            try
            {
                // Store: sequences are expected to be assigned 1, 2 in order.
                var (seq1, _) = fs.StoreMsg("foo", [1], [2, 3], 0);
                var (seq2, _) = fs.StoreMsg("bar", null, [4, 5], 0);
                seq1.ShouldBe(1UL);
                seq2.ShouldBe(2UL);
                fs.State().Msgs.ShouldBe(2UL);

                // Load and lookup.
                var msg = fs.LoadMsg(1, null);
                msg.ShouldNotBeNull();
                msg!.Subject.ShouldBe("foo");
                fs.SubjectForSeq(2).Subject.ShouldBe("bar");
                fs.SubjectsTotals(string.Empty).Count.ShouldBe(2);

                // Remove a single message.
                var (removed, remErr) = fs.RemoveMsg(1);
                removed.ShouldBeTrue();
                remErr.ShouldBeNull();
                fs.State().Msgs.ShouldBe(1UL);

                // Purge whatever is left.
                var (purged, purgeErr) = fs.Purge();
                purgeErr.ShouldBeNull();
                purged.ShouldBe(1UL);
                fs.State().Msgs.ShouldBe(0UL);

                // Snapshot and utilization on the now-empty store.
                var (snapshot, snapErr) = fs.Snapshot(TimeSpan.FromSeconds(1), includeConsumers: false, checkMsgs: false);
                snapErr.ShouldBeNull();
                snapshot.ShouldNotBeNull();
                snapshot!.Reader.ShouldNotBeNull();

                var (total, reported, utilErr) = fs.Utilization();
                utilErr.ShouldBeNull();
                total.ShouldBe(reported);
            }
            finally
            {
                fs.Stop();
            }
        }
        finally
        {
            Directory.Delete(root, recursive: true);
        }
    }

    /// <summary>
    /// Creates a fresh, uniquely named directory under the system temp path.
    /// </summary>
    /// <param name="prefix">Leading part of the directory name; a GUID is appended.</param>
    /// <returns>The full path of the created directory. The caller deletes it.</returns>
    private static string CreateTempRoot(string prefix)
    {
        var root = Path.Combine(Path.GetTempPath(), $"{prefix}-{Guid.NewGuid():N}");
        Directory.CreateDirectory(root);
        return root;
    }

    /// <summary>
    /// Builds a file store rooted at <paramref name="root"/> for a stream named "S"
    /// with subjects "foo" and "bar", after applying any caller overrides.
    /// </summary>
    /// <param name="root">Directory used as the store's <c>StoreDir</c>.</param>
    /// <param name="configure">Optional callback that adjusts the default stream config before the store is created.</param>
    /// <returns>The newly constructed store. The caller must stop it.</returns>
    private static JetStreamFileStore NewStore(string root, Action<StreamConfig>? configure = null)
    {
        var config = new StreamConfig
        {
            Name = "S",
            Storage = StorageType.FileStorage,
            Subjects = ["foo", "bar"],
        };
        configure?.Invoke(config);

        return new JetStreamFileStore(
            new FileStoreConfig { StoreDir = root },
            new FileStreamInfo
            {
                Created = DateTime.UtcNow,
                Config = config,
            });
    }
}