Files

75 lines
2.3 KiB
C#

// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0
using Shouldly;
using ZB.MOM.NatsNet.Server;
namespace ZB.MOM.NatsNet.Server.Tests.JetStream;
/// <summary>
/// Tests for <see cref="ConsumerFileStore"/> state tracking and reload from disk.
/// </summary>
public sealed class ConsumerFileStoreTests
{
    /// <summary>
    /// Records two deliveries and one ack on a consumer store, checks the state it
    /// reports, then opens a second <see cref="ConsumerFileStore"/> on the same
    /// consumer directory and checks that it reports the same delivered/ack-floor
    /// values. Finally verifies that <c>Delete()</c> removes the consumer directory.
    /// </summary>
    [Fact]
    public void UpdateDelivered_UpdateAcks_AndReload_ShouldPersistState()
    {
        // Arrange: an isolated scratch directory per test run.
        var root = Path.Combine(Path.GetTempPath(), $"cfs-{Guid.NewGuid():N}");
        Directory.CreateDirectory(root);

        // Declared outside the try so the finally block can stop the store even
        // when an assertion fails part-way through the test.
        JetStreamFileStore? fs = null;
        try
        {
            fs = NewStore(root);
            var cfg = new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit };
            var cs = (ConsumerFileStore)fs.ConsumerStore("D", DateTime.UtcNow, cfg);

            // Act: two deliveries, then an ack for the first one.
            cs.SetStarting(0);
            cs.UpdateDelivered(1, 1, 1, 123);
            cs.UpdateDelivered(2, 2, 1, 456);
            cs.UpdateAcks(1, 1);

            // Assert: the live store reports the updates.
            var (state, err) = cs.State();
            err.ShouldBeNull();
            state.ShouldNotBeNull();
            state!.Delivered.Consumer.ShouldBe(2UL);
            state.AckFloor.Consumer.ShouldBe(1UL);

            // Stop the first store before opening a second one on the same directory.
            cs.Stop();

            var odir = Path.Combine(root, FileStoreDefaults.ConsumerDir, "D");
            var loaded = new ConsumerFileStore(
                fs,
                new FileConsumerInfo { Name = "D", Created = DateTime.UtcNow, Config = cfg },
                "D",
                odir);

            // Assert: the freshly constructed store reports the same state.
            var (loadedState, loadedErr) = loaded.State();
            loadedErr.ShouldBeNull();
            loadedState.ShouldNotBeNull();
            loadedState!.Delivered.Consumer.ShouldBe(2UL);
            loadedState.AckFloor.Consumer.ShouldBe(1UL);

            // Assert: deleting the consumer removes its directory.
            loaded.Delete();
            Directory.Exists(odir).ShouldBeFalse();
        }
        finally
        {
            try
            {
                // Stop the store on every exit path, not just on success.
                // NOTE(review): presumably the store keeps files under `root` open
                // until stopped, which would make the delete below fail - confirm.
                fs?.Stop();
            }
            finally
            {
                // Always attempt to remove the scratch directory, even if Stop() threw.
                if (Directory.Exists(root))
                {
                    Directory.Delete(root, recursive: true);
                }
            }
        }
    }

    /// <summary>
    /// Creates a <see cref="JetStreamFileStore"/> whose store directory is
    /// <paramref name="root"/>, configured for a file-storage stream named "S"
    /// with the single subject "foo".
    /// </summary>
    /// <param name="root">Directory used as the store's <c>StoreDir</c>.</param>
    /// <returns>The newly constructed file store.</returns>
    private static JetStreamFileStore NewStore(string root)
    {
        return new JetStreamFileStore(
            new FileStoreConfig { StoreDir = root },
            new FileStreamInfo
            {
                Created = DateTime.UtcNow,
                Config = new StreamConfig
                {
                    Name = "S",
                    Storage = StorageType.FileStorage,
                    Subjects = ["foo"],
                },
            });
    }
}