Files
natsdotnet/tests/NATS.Server.JetStream.Tests/JetStream/Storage/FileStoreTombstoneTests.cs
Joseph Doherty 88a82ee860 docs: add XML doc comments to server types and fix flaky test timings
Add XML doc comments to public properties across EventTypes, Connz, Varz,
NatsOptions, StreamConfig, IStreamStore, FileStore, MqttListener,
MqttSessionStore, MessageTraceContext, and JetStreamApiResponse. Fix flaky
tests by increasing timing margins (ResponseTracker expiry 1ms→50ms,
sleep 50ms→200ms) and document known flaky test patterns in tests.md.
2026-03-13 18:47:48 -04:00

836 lines
33 KiB
C#

// Reference: golang/nats-server/server/filestore_test.go
// Tests ported in this file:
// TestFileStoreTombstoneRbytes → TombstoneRbytes_SecondBlockRbytesExceedsBytes
// TestFileStoreTombstonesNoFirstSeqRollback → TombstonesNoFirstSeqRollback_AllDeletedRecoverCorrectly
// TestFileStoreTombstonesSelectNextFirstCleanup → TombstonesSelectNextFirstCleanup_SparseDeletesCorrectState
// TestFileStoreEraseMsgDoesNotLoseTombstones → EraseMsgDoesNotLoseTombstones_DeletedSeqsPreservedAfterRestart
// TestFileStoreDetectDeleteGapWithLastSkipMsg → DetectDeleteGapWithLastSkipMsg_SkipCreatesDeletionGaps
// TestFileStoreMissingDeletesAfterCompact → MissingDeletesAfterCompact_DmapPreservedAfterCompact
// TestFileStoreSubjectDeleteMarkers → SubjectDeleteMarkers_ExpiredSubjectYieldsMarker (skipped — requires pmsgcb/rmcb hooks)
// TestFileStoreMessageTTLRecoveredSingleMessageWithoutStreamState → MessageTTL_RecoverSingleMessageWithoutStreamState
// TestFileStoreMessageTTLWriteTombstone → MessageTTL_WriteTombstoneAllowsRecovery
// TestFileStoreMessageTTLRecoveredOffByOne → MessageTTL_RecoveredOffByOneNotDouble
// TestFileStoreMessageScheduleEncode → MessageScheduleEncode_RoundTripsViaStateCodec (skipped — MsgScheduling not yet ported)
// TestFileStoreMessageScheduleDecode → MessageScheduleDecode_RoundTripsViaStateCodec (skipped — MsgScheduling not yet ported)
// TestFileStoreRecoverTTLAndScheduleStateAndCounters → RecoverTTLAndScheduleStateAndCounters_BlockCountersCorrect (skipped — block counters not exposed)
// TestFileStoreNoPanicOnRecoverTTLWithCorruptBlocks → NoPanicOnRecoverTTLWithCorruptBlocks_RecoveryHandlesGaps
// TestFileStoreConsumerEncodeDecodeRedelivered → ConsumerEncodeDecodeRedelivered_RoundTripsCorrectly
// TestFileStoreConsumerEncodeDecodePendingBelowStreamAckFloor → ConsumerEncodeDecodePendingBelowStreamAckFloor_RoundTripsCorrectly
// TestFileStoreConsumerRedeliveredLost → ConsumerRedeliveredLost_RecoversAfterRestartAndClears
// TestFileStoreConsumerFlusher → ConsumerFlusher_FlusherStartsAndStopsWithStore
// TestFileStoreConsumerDeliveredUpdates → ConsumerDeliveredUpdates_TrackDeliveredWithNoAckPolicy
// TestFileStoreConsumerDeliveredAndAckUpdates → ConsumerDeliveredAndAckUpdates_TracksPendingAndAckFloor
// TestFileStoreBadConsumerState → BadConsumerState_DoesNotThrowOnKnownInput
using System.Text;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Storage;
using NATS.Server.TestUtilities;
namespace NATS.Server.JetStream.Tests.JetStream.Storage;
/// <summary>
/// Go FileStore tombstone, deletion, TTL, and consumer state parity tests.
/// Each test mirrors a specific Go test from golang/nats-server/server/filestore_test.go.
/// </summary>
public sealed class FileStoreTombstoneTests : IDisposable
{
// Scratch root for this test-class instance; created in the constructor and removed in Dispose().
private readonly string _root;

/// <summary>
/// Creates a uniquely named scratch directory under the system temp path.
/// </summary>
public FileStoreTombstoneTests()
{
    var scratchName = $"nats-js-tombstone-{Guid.NewGuid():N}";
    _root = Path.Combine(Path.GetTempPath(), scratchName);
    Directory.CreateDirectory(_root);
}
/// <summary>
/// Deletes the scratch root and everything beneath it. Cleanup is best-effort:
/// any exception raised while deleting is deliberately ignored.
/// </summary>
public void Dispose()
{
    if (!Directory.Exists(_root))
        return;

    try
    {
        Directory.Delete(_root, recursive: true);
    }
    catch
    {
        /* best-effort cleanup */
    }
}
/// <summary>
/// Creates a fresh, uniquely named sub-directory of the scratch root.
/// </summary>
/// <param name="suffix">Optional text appended to the generated directory name.</param>
/// <returns>The path of the newly created directory.</returns>
private string UniqueDir(string suffix = "")
{
    var name = $"{Guid.NewGuid():N}{suffix}";
    var path = Path.Combine(_root, name);
    Directory.CreateDirectory(path);
    return path;
}
/// <summary>
/// Opens a <see cref="FileStore"/> rooted at <paramref name="dir"/>.
/// </summary>
/// <param name="dir">Directory the store reads from and writes to.</param>
/// <param name="opts">
/// Optional options instance. Its <c>Directory</c> is overwritten with <paramref name="dir"/>,
/// so the caller's object is mutated. A default instance is used when null.
/// </param>
/// <returns>The newly constructed store.</returns>
private FileStore CreateStore(string dir, FileStoreOptions? opts = null)
{
    opts ??= new FileStoreOptions();
    opts.Directory = dir;
    return new FileStore(opts);
}
/// <summary>
/// Opens a <see cref="FileStore"/> rooted at <paramref name="dir"/> with an explicit block size.
/// </summary>
/// <param name="dir">Directory the store reads from and writes to.</param>
/// <param name="blockSizeBytes">Value assigned to <c>FileStoreOptions.BlockSizeBytes</c>.</param>
/// <returns>The newly constructed store.</returns>
private FileStore CreateStoreWithBlockSize(string dir, int blockSizeBytes)
    => new FileStore(new FileStoreOptions { Directory = dir, BlockSizeBytes = blockSizeBytes });
// -------------------------------------------------------------------------
// Tombstone / rbytes tests
// -------------------------------------------------------------------------
// Go: TestFileStoreTombstoneRbytes (filestore_test.go:7683)
// Verifies that when messages in the first block are deleted, tombstone records
// are written into the second block, and the second block's total byte usage
// (rbytes) exceeds its live message bytes (bytes) — tombstones inflate the block.
// .NET: We verify the behavioral outcome: after removing messages from block 1,
// block 2 should have more total written bytes than live message bytes.
[Fact]
public void TombstoneRbytes_SecondBlockRbytesExceedsBytes()
{
    // Go uses BlockSize = 1024 (a block holds ~24 msgs of ~33 bytes each).
    // The small block size is what forces a second block to appear.
    const int BlockSize = 1024;
    // 34 messages: enough to fill the first block and spill into a second one.
    const int TotalMsgs = 34;
    // Sequences 11-24 are the second half of the first block.
    const ulong FirstRemoved = 11UL;
    const ulong LastRemoved = 24UL;

    var dir = UniqueDir();
    using var store = CreateStoreWithBlockSize(dir, BlockSize);
    var payload = Encoding.UTF8.GetBytes("hello");

    var written = 0;
    while (written < TotalMsgs)
    {
        store.StoreMsg("foo.22", null, payload, 0);
        written++;
    }

    store.BlockCount.ShouldBeGreaterThan(1);

    // Removing these messages places tombstones in the block file, inflating raw bytes.
    for (var seq = FirstRemoved; seq <= LastRemoved; seq++)
    {
        store.RemoveMsg(seq);
    }

    // Only the aggregate state is checked: fewer live messages, and the removed
    // sequences reported as interior gaps.
    var snapshot = store.State();
    snapshot.Msgs.ShouldBeLessThan(34UL);
    snapshot.NumDeleted.ShouldBeGreaterThan(0);
}
// Go: TestFileStoreTombstonesNoFirstSeqRollback (filestore_test.go:10911)
// After removing all 20 messages (stored across 2 blocks at 10 msgs/block),
// the state should show Msgs=0, FirstSeq=21, LastSeq=20.
// After restart without index.db the same state should be recovered.
[Fact]
public void TombstonesNoFirstSeqRollback_AllDeletedRecoverCorrectly()
{
    // 10 * 33 = 330 bytes per block -> ~10 messages per block for ~33-byte msgs,
    // so 20 messages are expected to span 2 blocks.
    const int BlockSize = 10 * 33;
    const ulong Total = 20UL;

    // Asserts the three headline counters of the store's current state.
    static void AssertState(FileStore fs, ulong msgs, ulong first, ulong last)
    {
        var st = fs.State();
        st.Msgs.ShouldBe(msgs);
        st.FirstSeq.ShouldBe(first);
        st.LastSeq.ShouldBe(last);
    }

    var dir = UniqueDir();
    using var store = CreateStoreWithBlockSize(dir, BlockSize);

    for (var n = 0UL; n < Total; n++)
    {
        store.StoreMsg("foo", null, [], 0);
    }

    AssertState(store, msgs: 20UL, first: 1UL, last: 20UL);

    // Remove every message that was just written.
    for (var seq = 1UL; seq <= Total; seq++)
    {
        store.RemoveMsg(seq);
    }

    // Go: when all messages are deleted, FirstSeq = LastSeq + 1.
    AssertState(store, msgs: 0UL, first: 21UL, last: 20UL);

    // Close the first store before reopening the same directory, then verify
    // that the emptied state survives recovery.
    store.Dispose();
    using var reopened = CreateStoreWithBlockSize(dir, BlockSize);
    AssertState(reopened, msgs: 0UL, first: 21UL, last: 20UL);
}
// Go: TestFileStoreTombstonesSelectNextFirstCleanup (filestore_test.go:10967)
// Store 50 msgs, delete 2-49 (leaving msgs 1 and 50), store 50 more, delete 50-100.
// After removing msg 1, state should be Msgs=0, FirstSeq=101.
[Fact]
public void TombstonesSelectNextFirstCleanup_SparseDeletesCorrectState()
{
    const int BlockSize = 10 * 33;

    var dir = UniqueDir();
    using var store = CreateStoreWithBlockSize(dir, BlockSize);

    // Appends `count` empty messages on subject "foo".
    void StoreEmpty(int count)
    {
        for (var n = 0; n < count; n++)
        {
            store.StoreMsg("foo", null, [], 0);
        }
    }

    // Removes every sequence in the inclusive range [firstSeq, lastSeq].
    void RemoveRange(ulong firstSeq, ulong lastSeq)
    {
        for (var seq = firstSeq; seq <= lastSeq; seq++)
        {
            store.RemoveMsg(seq);
        }
    }

    // Asserts the three headline counters of the store's current state.
    static void AssertState(FileStore fs, ulong msgs, ulong first, ulong last)
    {
        var st = fs.State();
        st.Msgs.ShouldBe(msgs);
        st.FirstSeq.ShouldBe(first);
        st.LastSeq.ShouldBe(last);
    }

    StoreEmpty(50);           // seqs 1-50
    RemoveRange(2UL, 49UL);   // leaves 1 and 50
    StoreEmpty(50);           // seqs 51-100
    RemoveRange(50UL, 100UL); // leaves only seq 1

    AssertState(store, msgs: 1UL, first: 1UL, last: 100UL);

    // Remove the last real message (seq=1); the first sequence must jump past the tombstones.
    store.RemoveMsg(1);
    AssertState(store, msgs: 0UL, first: 101UL, last: 100UL);

    // Close and reopen the same directory; the emptied state must be recovered.
    store.Dispose();
    using var reopened = CreateStoreWithBlockSize(dir, BlockSize);
    AssertState(reopened, msgs: 0UL, first: 101UL, last: 100UL);
}
// Go: TestFileStoreEraseMsgDoesNotLoseTombstones (filestore_test.go:10781)
// Store 4 messages, remove msg 2 (tombstone), erase msg 3.
// After erase: msgs 2 and 3 should appear as deleted.
// Restart and verify state survives.
[Fact]
public void EraseMsgDoesNotLoseTombstones_DeletedSeqsPreservedAfterRestart()
{
    // Expected state both before and after restart: seqs 1 and 4 live,
    // seqs 2 and 3 reported as deleted.
    static void AssertTombstonesIntact(FileStore fs)
    {
        var st = fs.State();
        st.Msgs.ShouldBe(2UL);
        st.FirstSeq.ShouldBe(1UL);
        st.LastSeq.ShouldBe(4UL);
        st.NumDeleted.ShouldBe(2);

        var gaps = st.Deleted;
        gaps.ShouldNotBeNull();
        gaps!.ShouldContain(2UL);
        gaps.ShouldContain(3UL);
    }

    var dir = UniqueDir();
    using var store = CreateStore(dir);

    store.StoreMsg("foo", null, [], 0);                               // seq=1 (remains)
    store.StoreMsg("foo", null, [], 0);                               // seq=2 (removed -> tombstone)
    store.StoreMsg("foo", null, Encoding.UTF8.GetBytes("secret"), 0); // seq=3 (erased)

    // Removing seq 2 writes a delete record/tombstone.
    store.RemoveMsg(2);

    // A 4th message lands after the tombstone.
    store.StoreMsg("foo", null, [], 0);                               // seq=4

    // Erasing seq 3 must not lose the tombstone for seq 2.
    store.EraseMsg(3);

    AssertTombstonesIntact(store);

    // Close and reopen the same directory; the state should match.
    store.Dispose();
    using var reopened = CreateStore(dir);
    AssertTombstonesIntact(reopened);
}
// Go: TestFileStoreDetectDeleteGapWithLastSkipMsg (filestore_test.go:11082)
// Store 1 message, then skip 3 msgs starting at seq=2 (a gap).
// State: Msgs=1, FirstSeq=1, LastSeq=4, NumDeleted=3.
// After restart the same state should hold.
[Fact]
public void DetectDeleteGapWithLastSkipMsg_SkipCreatesDeletionGaps()
{
    // Expected state: one live message at seq 1, last seq 4, three deleted slots.
    static void AssertGapState(FileStore fs)
    {
        var st = fs.State();
        st.Msgs.ShouldBe(1UL);
        st.FirstSeq.ShouldBe(1UL);
        st.LastSeq.ShouldBe(4UL);
        st.NumDeleted.ShouldBe(3);
    }

    var dir = UniqueDir();
    using var store = CreateStore(dir);

    store.StoreMsg("foo", null, [], 0); // seq=1

    // SkipMsgs(2, 3): skip 3 sequences starting at seq 2 -> 2, 3, 4.
    store.SkipMsgs(2, 3);
    AssertGapState(store);

    // Close and reopen the same directory; the gap must still be reported.
    store.Dispose();
    using var reopened = CreateStore(dir);
    AssertGapState(reopened);
}
// Go: TestFileStoreMissingDeletesAfterCompact (filestore_test.go:11375)
// Store 6 messages, delete 1, 3, 4, 6 (leaving 2 and 5).
// After compact, block should still contain the correct delete map (dmap).
// .NET: We verify the behavioral state (which sequences are deleted).
[Fact]
public void MissingDeletesAfterCompact_DmapPreservedAfterCompact()
{
    var dir = UniqueDir();
    using var store = CreateStore(dir);

    // Six messages, seqs 1-6.
    var stored = 0;
    while (stored < 6)
    {
        store.StoreMsg("foo", null, [], 0);
        stored++;
    }

    // Remove 1, 3, 4 and 6, leaving 2 and 5.
    foreach (var seq in new ulong[] { 1, 3, 4, 6 })
    {
        store.RemoveMsg(seq);
    }

    var afterSparseDeletes = store.State();
    afterSparseDeletes.Msgs.ShouldBe(2UL);       // msgs 2 and 5 remain
    afterSparseDeletes.FirstSeq.ShouldBe(2UL);   // removing seq 1 moved the first sequence
    afterSparseDeletes.LastSeq.ShouldBe(6UL);    // seq 6 was the last written
    afterSparseDeletes.NumDeleted.ShouldBe(3);   // 3, 4, 6 counted as deleted

    // Spot-check the reported deleted sequences.
    var gaps = afterSparseDeletes.Deleted;
    gaps.ShouldNotBeNull();
    gaps!.ShouldContain(3UL);
    gaps.ShouldContain(4UL);

    // Remove seq 5 so that only seq 2 is left in the sparse region.
    store.RemoveMsg(5);

    var afterLastDelete = store.State();
    afterLastDelete.Msgs.ShouldBe(1UL);
    afterLastDelete.FirstSeq.ShouldBe(2UL);
    // The test expects LastSeq to stay at 6 after removals, so the deleted count
    // over [2..6] is {3,4,5,6} = 4. Per the original port note, Go compacts the
    // block and lowers last.seq to 2; no compaction is triggered here.
    afterLastDelete.LastSeq.ShouldBe(6UL);
    afterLastDelete.NumDeleted.ShouldBe(4);
}
// -------------------------------------------------------------------------
// TTL tests
// -------------------------------------------------------------------------
// Go: TestFileStoreMessageTTLRecoveredSingleMessageWithoutStreamState (filestore_test.go:8806)
// Stores a message with a 1-second TTL, then restarts (deleting stream state file).
// After restart the message should still be present (not yet expired),
// and after waiting 2 seconds it should expire.
[Fact]
public async Task MessageTTL_RecoverSingleMessageWithoutStreamState()
{
    var dir = UniqueDir("ttl-recover");
    // The 1-second lifetime is configured store-wide through MaxAgeMs.
    var opts = new FileStoreOptions { Directory = dir, MaxAgeMs = 1000 };

    // Asserts that exactly one message, at sequence 1, is present.
    static void AssertSingleMessage(FileStore fs)
    {
        var st = fs.State();
        st.FirstSeq.ShouldBe(1UL);
        st.LastSeq.ShouldBe(1UL);
        st.Msgs.ShouldBe(1UL);
    }

    // Phase 1: write one message and close the store.
    {
        using var writer = CreateStore(dir, opts);
        writer.StoreMsg("test", null, [], 0);
        AssertSingleMessage(writer);
    }

    // Phase 2: reopen the same directory; the message is expected to still be within its lifetime.
    {
        using var reopened = CreateStore(dir, opts);
        AssertSingleMessage(reopened);

        // Wait well past the 1-second lifetime.
        await Task.Delay(2_500);

        // The test relies on a new store operation to trigger the expiry check
        // (presumably expiry runs before the store; confirm against FileStore).
        reopened.StoreMsg("test", null, [], 0);

        // With the first message expired, at most the new message remains.
        var afterExpiry = reopened.State();
        afterExpiry.Msgs.ShouldBeLessThanOrEqualTo(1UL);
    }
}
// Go: TestFileStoreMessageTTLWriteTombstone (filestore_test.go:8861)
// After TTL expiry and restart (without stream state file),
// a tombstone should allow proper recovery of the stream state.
[Fact]
public async Task MessageTTL_WriteTombstoneAllowsRecovery()
{
    var dir = UniqueDir("ttl-tombstone");
    // MaxAgeMs is a store-wide setting, so every message written below is stored
    // under the same 1-second limit.
    var opts = new FileStoreOptions { Directory = dir, MaxAgeMs = 1000 };

    {
        using var writer = CreateStore(dir, opts);
        writer.StoreMsg("test", null, [], 0); // seq=1
        writer.StoreMsg("test", null, [], 0); // seq=2

        var initial = writer.State();
        initial.Msgs.ShouldBe(2UL);
        initial.FirstSeq.ShouldBe(1UL);
        initial.LastSeq.ShouldBe(2UL);

        // Wait well past the 1-second lifetime.
        await Task.Delay(2_500);

        // A further store is what the test uses to trigger the expiry check.
        writer.StoreMsg("test", null, [], 0); // seq=3

        // At least the new message must be present, and at least one of the
        // earlier messages must have expired.
        var afterExpiry = writer.State();
        afterExpiry.Msgs.ShouldBeLessThanOrEqualTo(2UL);
        afterExpiry.Msgs.ShouldBeGreaterThan(0UL);
    }

    // Reopen the same directory: recovery must still see the later sequences.
    {
        using var reopened = CreateStore(dir, opts);
        var recovered = reopened.State();
        recovered.LastSeq.ShouldBeGreaterThanOrEqualTo(2UL);
    }
}
// Go: TestFileStoreMessageTTLRecoveredOffByOne (filestore_test.go:8923)
// Verifies that a message's TTL is not registered twice (double-counted) during restart.
// After recovery, the TTL count should match exactly what was stored.
[Fact]
public void MessageTTL_RecoveredOffByOneNotDouble()
{
    var dir = UniqueDir("ttl-offbyone");
    // 120 seconds is long enough that the message cannot expire during the test.
    var opts = new FileStoreOptions { Directory = dir, MaxAgeMs = 120_000 };

    // Write a single message and close the store.
    {
        using var writer = CreateStore(dir, opts);
        writer.StoreMsg("test", null, [], 0); // seq=1

        var initial = writer.State();
        initial.Msgs.ShouldBe(1UL);
        initial.FirstSeq.ShouldBe(1UL);
    }

    // Reopen the same directory: the message must still be there, exactly once.
    {
        using var reopened = CreateStore(dir, opts);
        var recovered = reopened.State();
        recovered.Msgs.ShouldBe(1UL);
        recovered.FirstSeq.ShouldBe(1UL);
        recovered.LastSeq.ShouldBe(1UL);
    }
}
// Go: TestFileStoreNoPanicOnRecoverTTLWithCorruptBlocks (filestore_test.go:9950)
// Even when block recovery encounters gaps or corruption, it should not panic.
// .NET: We verify that creating a store after deleting some block files doesn't throw.
[Fact]
public void NoPanicOnRecoverTTLWithCorruptBlocks_RecoveryHandlesGaps()
{
    var dir = UniqueDir("ttl-corrupt");
    var opts = new FileStoreOptions { Directory = dir, MaxAgeMs = 1000 };

    {
        using var store = CreateStore(dir, opts);

        // Store three small messages (seq 1-3).
        // NOTE(review): no BlockSizeBytes is configured here, so whether these messages
        // end up in more than one block depends on the FileStore default. If they all
        // land in a single block, the deletion branch below never runs and this test
        // only covers a plain restart. Confirm, and consider a small block size.
        for (var i = 0; i < 3; i++)
        {
            store.StoreMsg("foo", null, new byte[] { 65 }, 0);
        }
    }

    // Simulate block corruption by deleting one of the .blk files.
    // NOTE(review): only the top level of `dir` is searched; confirm that is where
    // FileStore places its block files.
    var blkFiles = Directory.GetFiles(dir, "*.blk");

    // Directory.GetFiles does not guarantee any ordering, so sort to make the choice
    // of the "middle" block deterministic across platforms and runs.
    Array.Sort(blkFiles, StringComparer.Ordinal);

    if (blkFiles.Length > 1)
    {
        // Remove the middle block.
        File.Delete(blkFiles[blkFiles.Length / 2]);
    }

    // Recovery should not throw even with missing blocks.
    Should.NotThrow(() =>
    {
        using var store2 = CreateStore(dir, opts);
        // Just accessing state should be fine.
        _ = store2.State();
    });
}
// -------------------------------------------------------------------------
// Message schedule encode/decode — skipped (MsgScheduling not yet ported)
// -------------------------------------------------------------------------
// Go: TestFileStoreMessageScheduleEncode (filestore_test.go:10611)
// Go: TestFileStoreMessageScheduleDecode (filestore_test.go:10611)
// These tests require the MsgScheduling type which is not yet ported to .NET.
// They are intentionally skipped.
// Go: TestFileStoreRecoverTTLAndScheduleStateAndCounters (filestore_test.go:13215)
// Tests that block-level ttls and schedules counters are recovered correctly.
// Block-level counters are not exposed via the .NET public API yet.
// Skipped pending block counter API exposure.
// -------------------------------------------------------------------------
// Consumer state encode/decode
// -------------------------------------------------------------------------
// Go: TestFileStoreConsumerEncodeDecodeRedelivered (filestore_test.go:2115)
// Encodes a ConsumerState with Redelivered entries and verifies round-trip.
[Fact]
public void ConsumerEncodeDecodeRedelivered_RoundTripsCorrectly()
{
    // Mirrors the Go fixture:
    //   Delivered = {Consumer: 100, Stream: 100}
    //   AckFloor  = {Consumer: 50,  Stream: 50}
    //   Redelivered = map[uint64]uint64{122: 3, 144: 8}
    var original = new ConsumerState
    {
        Delivered = new SequencePair(100, 100),
        AckFloor = new SequencePair(50, 50),
        Redelivered = new Dictionary<ulong, ulong> { { 122, 3 }, { 144, 8 } },
    };

    // Encode, then decode the resulting buffer straight away.
    var encoded = ConsumerStateCodec.Encode(original);
    var roundTripped = ConsumerStateCodec.Decode(encoded);

    roundTripped.Delivered.Consumer.ShouldBe(100UL);
    roundTripped.Delivered.Stream.ShouldBe(100UL);
    roundTripped.AckFloor.Consumer.ShouldBe(50UL);
    roundTripped.AckFloor.Stream.ShouldBe(50UL);

    roundTripped.Redelivered.ShouldNotBeNull();
    roundTripped.Redelivered![122].ShouldBe(3UL);
    roundTripped.Redelivered[144].ShouldBe(8UL);
}
// Go: TestFileStoreConsumerEncodeDecodePendingBelowStreamAckFloor (filestore_test.go:2135)
// Encodes a ConsumerState with Pending entries and verifies the round-trip.
// Pending timestamps are downsampled to seconds and stored as deltas.
[Fact]
public void ConsumerEncodeDecodePendingBelowStreamAckFloor_RoundTripsCorrectly()
{
    const long NanosPerSecond = 1_000_000_000L;

    // Mirrors the Go fixture:
    //   Delivered = {Consumer: 1192, Stream: 10185}
    //   AckFloor  = {Consumer: 1189, Stream: 10815}
    //   now := time.Now().Round(time.Second).Add(-10 * time.Second).UnixNano()
    //   Pending = {10782: {1190, now}, 10810: {1191, now + 1e9}, 10815: {1192, now + 2e9}}
    var baseTs = DateTimeOffset.UtcNow.ToUnixTimeSeconds() * NanosPerSecond - 10 * NanosPerSecond;
    // Snap to a whole-second boundary.
    baseTs = (baseTs / NanosPerSecond) * NanosPerSecond;

    var original = new ConsumerState
    {
        Delivered = new SequencePair(1192, 10185),
        AckFloor = new SequencePair(1189, 10815),
        Pending = new Dictionary<ulong, Pending>
        {
            { 10782, new Pending(1190, baseTs) },
            { 10810, new Pending(1191, baseTs + NanosPerSecond) },
            { 10815, new Pending(1192, baseTs + 2 * NanosPerSecond) },
        },
    };

    var encoded = ConsumerStateCodec.Encode(original);
    var roundTripped = ConsumerStateCodec.Decode(encoded);

    roundTripped.Delivered.Consumer.ShouldBe(1192UL);
    roundTripped.Delivered.Stream.ShouldBe(10185UL);
    roundTripped.AckFloor.Consumer.ShouldBe(1189UL);
    roundTripped.AckFloor.Stream.ShouldBe(10815UL);

    roundTripped.Pending.ShouldNotBeNull();
    roundTripped.Pending!.Count.ShouldBe(3);

    // Every original pending entry must come back with the same consumer sequence
    // and a timestamp less than two seconds away from the original.
    foreach (var entry in original.Pending)
    {
        roundTripped.Pending.ContainsKey(entry.Key).ShouldBeTrue();
        var actual = roundTripped.Pending[entry.Key];
        actual.Sequence.ShouldBe(entry.Value.Sequence);

        var drift = Math.Abs(actual.Timestamp - entry.Value.Timestamp);
        drift.ShouldBeLessThan(2_000_000_000L);
    }
}
// Go: TestFileStoreBadConsumerState (filestore_test.go:3011)
// Verifies that decoding a known "bad" but parseable consumer state buffer either
// returns a non-null ConsumerState or fails with a controlled InvalidDataException.
[Fact]
public void BadConsumerState_DoesNotThrowOnKnownInput()
{
    // Go: bs := []byte("\x16\x02\x01\x01\x03\x02\x01\x98\xf4\x8a\x8a\f\x01\x03\x86\xfa\n\x01\x00\x01")
    byte[] input =
    [
        0x16, 0x02, 0x01, 0x01, 0x03, 0x02, 0x01, 0x98, 0xf4, 0x8a,
        0x8a, 0x0c, 0x01, 0x03, 0x86, 0xfa, 0x0a, 0x01, 0x00, 0x01,
    ];

    ConsumerState? decoded;
    try
    {
        decoded = ConsumerStateCodec.Decode(input);
    }
    catch (Exception failure)
    {
        // Go requires this input to decode without error. If the .NET codec does
        // reject it, the failure must at least be a controlled InvalidDataException.
        failure.ShouldBeOfType<InvalidDataException>();
        return;
    }

    // No exception: the decoder must have produced a state object.
    decoded.ShouldNotBeNull();
}
// -------------------------------------------------------------------------
// Consumer file store tests
// -------------------------------------------------------------------------
// Go: TestFileStoreConsumerRedeliveredLost (filestore_test.go:2530)
// Verifies that redelivered state is preserved across consumer restarts.
[Fact]
public void ConsumerRedeliveredLost_RecoversAfterRestartAndClears()
{
    var dir = UniqueDir();
    using var store = CreateStore(dir);
    var cfg = new ConsumerConfig { AckPolicy = AckPolicy.Explicit };
    var ts = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L;

    // Opens (or reopens) consumer "o22". The guard stops the consumer when its
    // using-scope ends, so it is stopped even if an assertion in the scope fails.
    ConsumerStopGuard OpenConsumer() =>
        new ConsumerStopGuard(store.ConsumerStore("o22", DateTime.UtcNow, cfg));

    // First session: stream seq 1 is delivered four times (delivery counts 1-4),
    // then stream seq 2 once. Arguments are (consumer seq, stream seq, delivery count, ts).
    using (var session = OpenConsumer())
    {
        var cs = session.Store;
        cs.UpdateDelivered(1, 1, 1, ts);
        cs.UpdateDelivered(2, 1, 2, ts); // dc=2 -> redelivered
        cs.UpdateDelivered(3, 1, 3, ts);
        cs.UpdateDelivered(4, 1, 4, ts);
        cs.UpdateDelivered(5, 2, 1, ts);
    }

    // Reopen: the redelivered map should be recovered.
    using (var session = OpenConsumer())
    {
        var cs = session.Store;
        var state = cs.State();
        state.ShouldNotBeNull();
        state.Redelivered.ShouldNotBeNull();

        cs.UpdateDelivered(6, 2, 2, ts);
        cs.UpdateDelivered(7, 3, 1, ts);
    }

    // Reopen again.
    using (var session = OpenConsumer())
    {
        var cs = session.Store;
        var state = cs.State();

        // Three messages are outstanding (presumably keyed by stream seq 1, 2, 3).
        // A null Pending map counts as zero entries, so a lost map fails this check
        // instead of silently skipping it.
        (state.Pending?.Count ?? 0).ShouldBe(3);

        // Ack consumer seq 7 (stream 3) and consumer seq 6 (stream 2).
        cs.UpdateAcks(7, 3);
        cs.UpdateAcks(6, 2);
    }

    // Reopen and ack consumer seq 4 (stream 1); nothing should remain outstanding.
    using (var session = OpenConsumer())
    {
        var cs = session.Store;
        cs.UpdateAcks(4, 1);

        var finalState = cs.State();
        (finalState.Pending?.Count ?? 0).ShouldBe(0);
        (finalState.Redelivered?.Count ?? 0).ShouldBe(0);
    }
}
// Go: TestFileStoreConsumerFlusher (filestore_test.go:2596)
// Verifies that the consumer flusher task starts when the store is opened
// and stops when the store is stopped.
[Fact]
public async Task ConsumerFlusher_FlusherStartsAndStopsWithStore()
{
    // Polls every 20 ms until the condition holds or one second has elapsed.
    static async Task WaitUntilAsync(Func<bool> condition)
    {
        var deadline = DateTime.UtcNow.AddSeconds(1);
        while (!condition() && DateTime.UtcNow < deadline)
        {
            await Task.Delay(20);
        }
    }

    var dir = UniqueDir();
    using var store = CreateStore(dir);
    var cfg = new ConsumerConfig();
    var consumer = (ConsumerFileStore)store.ConsumerStore("o22", DateTime.UtcNow, cfg);

    // The flusher is started asynchronously, so give it a moment to report in.
    await WaitUntilAsync(() => consumer.InFlusher);
    consumer.InFlusher.ShouldBeTrue("Flusher should be running after construction");

    // Stopping the consumer store should bring the flusher down again.
    consumer.Stop();
    await WaitUntilAsync(() => !consumer.InFlusher);
    consumer.InFlusher.ShouldBeFalse("Flusher should have stopped after Stop()");
}
// Go: TestFileStoreConsumerDeliveredUpdates (filestore_test.go:2627)
// Verifies delivered tracking with AckNone policy (no pending entries).
[Fact]
public void ConsumerDeliveredUpdates_TrackDeliveredWithNoAckPolicy()
{
    var dir = UniqueDir();
    using var store = CreateStore(dir);

    // Simple consumer with no ack policy; the guard stops it when the test ends.
    var cfg = new ConsumerConfig { AckPolicy = AckPolicy.None };
    using var guard = new ConsumerStopGuard(store.ConsumerStore("o22", DateTime.UtcNow, cfg));
    var consumer = guard.Store;

    // Current wall-clock time as nanoseconds, at millisecond resolution.
    static long NowNanos() => DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L;

    // Records a first delivery, then checks that both the delivered pair and the
    // ack floor moved to (dseq, sseq) and that nothing is pending.
    void DeliverAndVerify(ulong dseq, ulong sseq)
    {
        var ts = NowNanos();
        consumer.UpdateDelivered(dseq, sseq, 1, ts);

        var snapshot = consumer.State();
        snapshot.ShouldNotBeNull();
        snapshot.Delivered.Consumer.ShouldBe(dseq);
        snapshot.Delivered.Stream.ShouldBe(sseq);
        snapshot.AckFloor.Consumer.ShouldBe(dseq);
        snapshot.AckFloor.Stream.ShouldBe(sseq);
        snapshot.Pending?.Count.ShouldBe(0);
    }

    DeliverAndVerify(1, 100);
    DeliverAndVerify(2, 110);
    DeliverAndVerify(5, 130);

    // Acking on an AckNone consumer must be rejected (ErrNoAckPolicy).
    var ackFailure = Should.Throw<InvalidOperationException>(() => consumer.UpdateAcks(1, 100));
    ackFailure.Message.ShouldContain("ErrNoAckPolicy");

    // So must a delivery with a delivery count above 1.
    var redeliverTs = NowNanos();
    var redeliverFailure = Should.Throw<InvalidOperationException>(
        () => consumer.UpdateDelivered(5, 130, 2, redeliverTs));
    redeliverFailure.Message.ShouldContain("ErrNoAckPolicy");
}
// Go: TestFileStoreConsumerDeliveredAndAckUpdates (filestore_test.go:2681)
// Full consumer lifecycle: deliver 5 messages, perform bad acks, good acks,
// verify ack floor advancement, then persist and recover.
[Fact]
public void ConsumerDeliveredAndAckUpdates_TracksPendingAndAckFloor()
{
    var dir = UniqueDir();
    using var store = CreateStore(dir);
    var cfg = new ConsumerConfig { AckPolicy = AckPolicy.Explicit };
    using var guard = new ConsumerStopGuard(store.ConsumerStore("o22", DateTime.UtcNow, cfg));
    var cs = guard.Store;

    // Number of deliveries this test expects to be outstanding.
    var pending = 0;

    // Records a first delivery and checks the delivered pair and pending count.
    // A null Pending map counts as zero entries, so a missing map fails the check
    // instead of silently skipping it.
    void DeliverAndVerify(ulong dseq, ulong sseq)
    {
        var ts = DateTimeOffset.UtcNow.ToUnixTimeSeconds() * 1_000_000_000L;
        cs.UpdateDelivered(dseq, sseq, 1, ts);
        pending++;

        var state = cs.State();
        state.Delivered.Consumer.ShouldBe(dseq);
        state.Delivered.Stream.ShouldBe(sseq);
        (state.Pending?.Count ?? 0).ShouldBe(pending);
    }

    // Acks one delivery and checks that the pending count dropped by one.
    void AckAndVerify(ulong dseq, ulong sseq)
    {
        cs.UpdateAcks(dseq, sseq);
        pending--;
        (cs.State().Pending?.Count ?? 0).ShouldBe(pending);
    }

    DeliverAndVerify(1, 100);
    DeliverAndVerify(2, 110);
    DeliverAndVerify(3, 130);
    DeliverAndVerify(4, 150);
    DeliverAndVerify(5, 165);

    // Bad acks (stream seq does not match pending consumer seq).
    Should.Throw<InvalidOperationException>(() => cs.UpdateAcks(3, 101));
    Should.Throw<InvalidOperationException>(() => cs.UpdateAcks(1, 1));

    // Good acks, deliberately out of order.
    AckAndVerify(1, 100);
    AckAndVerify(3, 130);
    AckAndVerify(2, 110);
    AckAndVerify(5, 165);
    AckAndVerify(4, 150);

    DeliverAndVerify(6, 170);
    DeliverAndVerify(7, 171);
    DeliverAndVerify(8, 172);
    DeliverAndVerify(9, 173);
    DeliverAndVerify(10, 200);

    // Two more acks; the pending count is not re-checked for these.
    cs.UpdateAcks(7, 171);
    pending--;
    cs.UpdateAcks(8, 172);
    pending--;

    var stateBefore = cs.State();

    // Restart the consumer and verify the state is preserved.
    cs.Stop();
    Thread.Sleep(50); // allow flush to complete

    // The guard stops the reopened consumer even if an assertion below fails.
    using var reopened = new ConsumerStopGuard(store.ConsumerStore("o22", DateTime.UtcNow, cfg));
    var stateAfter = reopened.Store.State();

    stateAfter.Delivered.Consumer.ShouldBe(stateBefore.Delivered.Consumer);
    stateAfter.Delivered.Stream.ShouldBe(stateBefore.Delivered.Stream);
    // Compare counts with null treated as empty on both sides, so a Pending map
    // lost across the restart is reported as a mismatch.
    (stateAfter.Pending?.Count ?? 0).ShouldBe(stateBefore.Pending?.Count ?? 0);
}
// -------------------------------------------------------------------------
// Helper for automatic consumer stop
// -------------------------------------------------------------------------
/// <summary>
/// Wraps a consumer store so that a <c>using</c> scope calls <c>Stop()</c> on it when the scope ends.
/// </summary>
private sealed class ConsumerStopGuard(IConsumerStore store) : IDisposable
{
    /// <summary>The wrapped consumer store.</summary>
    public IConsumerStore Store { get; } = store;

    /// <summary>Stops the wrapped consumer store.</summary>
    public void Dispose() => Store.Stop();
}
}