diff --git a/src/NATS.Server/Raft/RaftNode.cs b/src/NATS.Server/Raft/RaftNode.cs index 018d06e..0765151 100644 --- a/src/NATS.Server/Raft/RaftNode.cs +++ b/src/NATS.Server/Raft/RaftNode.cs @@ -612,6 +612,18 @@ public sealed class RaftNode : IDisposable await Log.PersistAsync(Path.Combine(dir, "log.json"), ct); await File.WriteAllTextAsync(Path.Combine(dir, "term.txt"), TermState.CurrentTerm.ToString(), ct); await File.WriteAllTextAsync(Path.Combine(dir, "applied.txt"), AppliedIndex.ToString(), ct); + + // Persist term and VotedFor together in meta.json for atomic durable state. + // Go reference: raft.go storeMeta / writeTermVote (term + votedFor written atomically) + var meta = new RaftMetaState + { + CurrentTerm = TermState.CurrentTerm, + VotedFor = TermState.VotedFor, + }; + await File.WriteAllTextAsync( + Path.Combine(dir, "meta.json"), + System.Text.Json.JsonSerializer.Serialize(meta), + ct); } public async Task LoadPersistedStateAsync(CancellationToken ct) @@ -619,9 +631,25 @@ public sealed class RaftNode : IDisposable var dir = _persistDirectory ?? 
Path.Combine(Path.GetTempPath(), "natsdotnet-raft", Id); Log = await RaftLog.LoadAsync(Path.Combine(dir, "log.json"), ct); - var termPath = Path.Combine(dir, "term.txt"); - if (File.Exists(termPath) && int.TryParse(await File.ReadAllTextAsync(termPath, ct), out var term)) - TermState.CurrentTerm = term; + // Load from meta.json first (includes VotedFor); fall back to legacy term.txt + var metaPath = Path.Combine(dir, "meta.json"); + if (File.Exists(metaPath)) + { + var json = await File.ReadAllTextAsync(metaPath, ct); + var meta = System.Text.Json.JsonSerializer.Deserialize(json); + if (meta is not null) + { + TermState.CurrentTerm = meta.CurrentTerm; + TermState.VotedFor = meta.VotedFor; + } + } + else + { + // Legacy: term.txt only (no VotedFor) + var termPath = Path.Combine(dir, "term.txt"); + if (File.Exists(termPath) && int.TryParse(await File.ReadAllTextAsync(termPath, ct), out var term)) + TermState.CurrentTerm = term; + } var appliedPath = Path.Combine(dir, "applied.txt"); if (File.Exists(appliedPath) && long.TryParse(await File.ReadAllTextAsync(appliedPath, ct), out var applied)) @@ -630,6 +658,13 @@ public sealed class RaftNode : IDisposable AppliedIndex = Log.Entries[^1].Index; } + /// Durable term + vote metadata written alongside the log. + private sealed class RaftMetaState + { + public int CurrentTerm { get; set; } + public string? VotedFor { get; set; } + } + public void Dispose() { StopElectionTimer(); diff --git a/src/NATS.Server/Raft/RaftWal.cs b/src/NATS.Server/Raft/RaftWal.cs new file mode 100644 index 0000000..8587c05 --- /dev/null +++ b/src/NATS.Server/Raft/RaftWal.cs @@ -0,0 +1,256 @@ +using System.Buffers.Binary; +using System.IO.Hashing; +using System.Text; + +namespace NATS.Server.Raft; + +/// +/// Binary write-ahead log for RAFT log entries. 
/// <summary>
/// Binary write-ahead log for RAFT log entries.
///
/// File layout:
///   Header:  [4:magic="NWAL"][4:version_le=1]
///   Records (repeated):
///     [4:length_le][8:index_le][4:term_le][4:crc32_le][N:utf8_command]
///     where length = 8 + 4 + 4 + N (payload bytes counted after the length field itself)
///
/// CRC32 is computed over the index, term, and command bytes of each record.
///
/// Go reference: server/raft.go WAL / wal.go (binary WAL format)
/// </summary>
public sealed class RaftWal : IDisposable
{
    private static ReadOnlySpan<byte> Magic => "NWAL"u8;
    private const int Version = 1;

    // Header: 4 bytes magic + 4 bytes version
    private const int HeaderSize = 8;

    // After the 4-byte length field, each record payload contains:
    //   8 bytes index + 4 bytes term + 4 bytes crc32 + N bytes command
    private const int LengthFieldSize = 4;
    private const int IndexFieldSize = 8;
    private const int TermFieldSize = 4;
    private const int Crc32FieldSize = 4;
    private const int RecordFixedPayloadSize = IndexFieldSize + TermFieldSize + Crc32FieldSize; // 16

    // Index and term are adjacent in the payload and are checksummed together.
    private const int ChecksummedPrefixSize = IndexFieldSize + TermFieldSize; // 12

    private readonly string _path;
    private FileStream _stream;
    private readonly List<RaftLogEntry> _entries = [];

    /// <summary>
    /// Opens or creates a WAL file at the given path and positions the stream for appending.
    /// A file that is missing, empty, or shorter than the header is (re)initialised with a
    /// fresh header so that records appended afterwards can be found again by <see cref="Load"/>.
    /// Records already present in an existing file are NOT read into <see cref="Entries"/>;
    /// use <see cref="Load"/> for recovery.
    /// The file is opened with FileShare.Read so other handles may read it concurrently.
    /// </summary>
    /// <param name="path">Path of the WAL file.</param>
    public RaftWal(string path)
    {
        _path = path;

        // OpenOrCreate avoids the exists-then-open race of a separate File.Exists check.
        _stream = OpenWriteStream(path, FileMode.OpenOrCreate);
        try
        {
            if (_stream.Length < HeaderSize)
                ResetToEmptyLog(_stream);

            _stream.Seek(0, SeekOrigin.End);
        }
        catch
        {
            // Do not leak the handle when initialisation fails.
            _stream.Dispose();
            throw;
        }
    }

    private RaftWal(string path, FileStream stream, List<RaftLogEntry> entries)
    {
        _path = path;
        _stream = stream;
        _entries = entries;
    }

    /// <summary>All in-memory entries loaded from or appended to this WAL.</summary>
    public IEnumerable<RaftLogEntry> Entries => _entries;

    /// <summary>
    /// Appends a single RAFT log entry to the end of the WAL file and to the in-memory list.
    /// The entry is added to the in-memory list only after the write has completed.
    /// The data is not forced to disk; call <see cref="SyncAsync"/> for that.
    /// </summary>
    /// <param name="entry">Entry to append.</param>
    public async Task AppendAsync(RaftLogEntry entry)
    {
        var record = EncodeRecord(entry);

        _stream.Seek(0, SeekOrigin.End);
        await _stream.WriteAsync(record);
        _entries.Add(entry);
    }

    /// <summary>
    /// Flushes the WAL to disk (fsync).
    /// </summary>
    public async Task SyncAsync()
    {
        await _stream.FlushAsync();
        _stream.Flush(flushToDisk: true);
    }

    /// <summary>
    /// Rewrites the WAL keeping only entries with index &gt; upToIndex, using a temp file
    /// (<c>path + ".tmp"</c>) that is renamed over the original. Also updates the in-memory list.
    /// If the rename fails, the original file is reopened so this instance stays usable,
    /// the in-memory list is left unchanged, and the exception propagates.
    /// </summary>
    /// <param name="upToIndex">Highest entry index to discard.</param>
    public async Task CompactAsync(long upToIndex)
    {
        var remaining = _entries.FindAll(e => e.Index > upToIndex);
        var tmpPath = _path + ".tmp";

        await using (var tmp = new FileStream(tmpPath, FileMode.Create, FileAccess.Write,
            FileShare.None, bufferSize: 4096, useAsync: true))
        {
            WriteHeaderTo(tmp);
            foreach (var entry in remaining)
                await tmp.WriteAsync(EncodeRecord(entry));
            tmp.Flush(flushToDisk: true);
        }

        // Replace: close current stream, rename temp over original, reopen.
        _stream.Dispose();
        try
        {
            File.Move(tmpPath, _path, overwrite: true);
        }
        finally
        {
            // Reopen whichever file now lives at _path, so a failed rename does not
            // leave this instance holding a disposed stream.
            _stream = OpenWriteStream(_path, FileMode.Open);
            _stream.Seek(0, SeekOrigin.End);
        }

        _entries.Clear();
        _entries.AddRange(remaining);
    }

    /// <summary>
    /// Scans a WAL file, validates records via CRC32, stops at the first corrupt or truncated
    /// record, and returns a populated RaftWal ready for append.
    /// When an invalid tail is found it is cut off the file, so that records appended later
    /// directly follow the last valid record and remain readable on the next load.
    /// A file shorter than the header is re-initialised with a fresh header.
    /// If the file does not exist, a new WAL is created.
    /// </summary>
    /// <param name="path">Path of the WAL file.</param>
    /// <returns>A WAL whose <see cref="Entries"/> holds every valid record found.</returns>
    public static RaftWal Load(string path)
    {
        if (!File.Exists(path))
            return new RaftWal(path); // creates new file with header

        var snapshot = ReadSnapshot(path);
        var entries = new List<RaftLogEntry>();

        var hasHeader = snapshot.Length >= HeaderSize
            && snapshot.AsSpan(0, Magic.Length).SequenceEqual(Magic);

        // The version field (bytes 4..7) is skipped without validation.
        var validEnd = hasHeader ? ReadValidRecords(snapshot, entries) : 0;

        var stream = OpenWriteStream(path, FileMode.Open);
        try
        {
            if (stream.Length < HeaderSize)
            {
                // Too short to hold a header, so it cannot hold any record either.
                ResetToEmptyLog(stream);
            }
            else if (hasHeader && validEnd < snapshot.Length)
            {
                // Drop the corrupt/truncated tail that the scan stopped at.
                stream.SetLength(validEnd);
                stream.Flush(flushToDisk: true);
            }

            // NOTE(review): a file with an unrecognized magic is left untouched and reported
            // as empty (original behaviour). Records appended to it will not be readable by
            // a later Load — confirm whether this case should throw instead.
            stream.Seek(0, SeekOrigin.End);
        }
        catch
        {
            stream.Dispose();
            throw;
        }

        return new RaftWal(path, stream, entries);
    }

    /// <summary>Closes the underlying file stream.</summary>
    public void Dispose()
    {
        _stream.Dispose();
    }

    // --- Private helpers ---

    /// <summary>Opens the WAL file for reading and appending with FileShare.Read to allow concurrent readers.</summary>
    private static FileStream OpenWriteStream(string path, FileMode mode) =>
        new(path, mode, FileAccess.ReadWrite, FileShare.Read, bufferSize: 4096, useAsync: true);

    /// <summary>Reads the current file content while allowing other handles to read and write it.</summary>
    private static byte[] ReadSnapshot(string path)
    {
        using var fs = new FileStream(path, FileMode.Open, FileAccess.Read,
            FileShare.ReadWrite, bufferSize: 4096, useAsync: false);

        var bytes = new byte[fs.Length];
        var total = 0;
        while (total < bytes.Length)
        {
            var read = fs.Read(bytes, total, bytes.Length - total);
            if (read == 0)
                break; // end of stream reached early (file shrank) — do not spin forever
            total += read;
        }

        if (total < bytes.Length)
            Array.Resize(ref bytes, total);

        return bytes;
    }

    /// <summary>
    /// Decodes consecutive records that follow the header and adds them to <paramref name="entries"/>.
    /// Stops at the first record that is truncated, has an implausible length, or fails its CRC.
    /// </summary>
    /// <returns>Offset just past the last valid record (HeaderSize when there is none).</returns>
    private static int ReadValidRecords(ReadOnlySpan<byte> data, List<RaftLogEntry> entries)
    {
        var pos = HeaderSize;

        while (data.Length - pos >= LengthFieldSize)
        {
            var payloadLength = BinaryPrimitives.ReadInt32LittleEndian(data.Slice(pos, LengthFieldSize));
            var payloadStart = pos + LengthFieldSize;

            // Compare against the remaining byte count instead of adding to the offset,
            // so a huge corrupt length cannot overflow int and slip past the check.
            if (payloadLength < RecordFixedPayloadSize || payloadLength > data.Length - payloadStart)
                break;

            var payload = data.Slice(payloadStart, payloadLength);
            var indexAndTerm = payload[..ChecksummedPrefixSize];
            var storedCrc = BinaryPrimitives.ReadUInt32LittleEndian(
                payload.Slice(ChecksummedPrefixSize, Crc32FieldSize));
            var command = payload[RecordFixedPayloadSize..];

            if (ComputeCrc(indexAndTerm, command) != storedCrc)
                break;

            var index = BinaryPrimitives.ReadInt64LittleEndian(payload[..IndexFieldSize]);
            var term = BinaryPrimitives.ReadInt32LittleEndian(payload.Slice(IndexFieldSize, TermFieldSize));
            entries.Add(new RaftLogEntry(index, term, Encoding.UTF8.GetString(command)));

            pos = payloadStart + payloadLength;
        }

        return pos;
    }

    /// <summary>Empties the file, writes a fresh header, and forces it to disk.</summary>
    private static void ResetToEmptyLog(FileStream stream)
    {
        stream.SetLength(0);
        stream.Seek(0, SeekOrigin.Begin);
        WriteHeaderTo(stream);
        stream.Flush(flushToDisk: true);
    }

    private static void WriteHeaderTo(Stream stream)
    {
        stream.Write(Magic);
        Span<byte> version = stackalloc byte[4];
        BinaryPrimitives.WriteInt32LittleEndian(version, Version);
        stream.Write(version);
    }

    /// <summary>
    /// Builds the full on-disk record: [length(4)][index(8)][term(4)][crc32(4)][command(N)].
    /// </summary>
    private static byte[] EncodeRecord(RaftLogEntry entry)
    {
        var commandLength = Encoding.UTF8.GetByteCount(entry.Command);
        var payloadLength = RecordFixedPayloadSize + commandLength;

        var record = new byte[LengthFieldSize + payloadLength];
        var payload = record.AsSpan(LengthFieldSize);

        BinaryPrimitives.WriteInt32LittleEndian(record, payloadLength);
        BinaryPrimitives.WriteInt64LittleEndian(payload[..IndexFieldSize], entry.Index);
        BinaryPrimitives.WriteInt32LittleEndian(payload.Slice(IndexFieldSize, TermFieldSize), entry.Term);

        var command = payload[RecordFixedPayloadSize..];
        Encoding.UTF8.GetBytes(entry.Command, command);

        // CRC32 over: index(8) + term(4) + command(N)
        var crc = ComputeCrc(payload[..ChecksummedPrefixSize], command);
        BinaryPrimitives.WriteUInt32LittleEndian(payload.Slice(ChecksummedPrefixSize, Crc32FieldSize), crc);

        return record;
    }

    /// <summary>
    /// CRC32 (System.IO.Hashing.Crc32) of the index+term bytes followed by the command bytes,
    /// fed incrementally so no concatenated copy has to be allocated.
    /// </summary>
    private static uint ComputeCrc(ReadOnlySpan<byte> indexAndTermBytes, ReadOnlySpan<byte> commandBytes)
    {
        var crc = new Crc32();
        crc.Append(indexAndTermBytes);
        crc.Append(commandBytes);
        return crc.GetCurrentHashAsUInt32();
    }
}
using NATS.Server.Raft;

// Go reference: server/raft.go (WAL binary format, compaction, CRC integrity)

namespace NATS.Server.Tests.Raft;

/// <summary>
/// Tests for the binary RAFT write-ahead log (<see cref="RaftWal"/>) and for the
/// term/vote persistence of <see cref="RaftNode"/>. Every test works inside its own
/// temporary directory, which is removed again in <see cref="Dispose"/>.
/// </summary>
public class RaftWalTests : IDisposable
{
    private readonly string _root;

    public RaftWalTests()
    {
        _root = Path.Combine(Path.GetTempPath(), $"nats-wal-{Guid.NewGuid():N}");
        Directory.CreateDirectory(_root);
    }

    public void Dispose()
    {
        if (Directory.Exists(_root))
            Directory.Delete(_root, recursive: true);
    }

    // Go reference: server/raft.go WAL append + recover
    [Fact]
    public async Task Wal_persists_and_recovers_entries()
    {
        var walPath = Path.Combine(_root, "raft.wal");

        // Write entries
        {
            using var wal = new RaftWal(walPath);
            await wal.AppendAsync(new RaftLogEntry(1, 1, "cmd-1"));
            await wal.AppendAsync(new RaftLogEntry(2, 1, "cmd-2"));
            await wal.AppendAsync(new RaftLogEntry(3, 2, "cmd-3"));
            await wal.SyncAsync();
        }

        // Recover
        using var recovered = RaftWal.Load(walPath);
        var entries = recovered.Entries.ToList();

        entries.Count.ShouldBe(3);
        entries[0].Index.ShouldBe(1);
        entries[0].Term.ShouldBe(1);
        entries[0].Command.ShouldBe("cmd-1");
        entries[2].Index.ShouldBe(3);
        entries[2].Term.ShouldBe(2);
    }

    // Go reference: server/raft.go compactLog
    [Fact]
    public async Task Wal_compact_removes_old_entries()
    {
        var walPath = Path.Combine(_root, "compact.wal");

        // The writer handle is opened with FileShare.Read, so it is closed before the file is
        // reloaded; otherwise the second read/write open can be rejected with a sharing violation.
        {
            using var wal = new RaftWal(walPath);
            for (int i = 1; i <= 10; i++)
                await wal.AppendAsync(new RaftLogEntry(i, 1, $"cmd-{i}"));
            await wal.SyncAsync();

            await wal.CompactAsync(5); // remove entries 1-5

            // The in-memory view is compacted as well.
            wal.Entries.Count().ShouldBe(5);
        }

        using var recovered = RaftWal.Load(walPath);
        var entries = recovered.Entries.ToList();

        entries.Count.ShouldBe(5);
        entries[0].Index.ShouldBe(6);
        entries[^1].Index.ShouldBe(10);
    }

    // Go reference: server/raft.go WAL crash-truncation tolerance
    [Fact]
    public async Task Wal_handles_truncated_file()
    {
        var walPath = Path.Combine(_root, "truncated.wal");

        {
            using var wal = new RaftWal(walPath);
            await wal.AppendAsync(new RaftLogEntry(1, 1, "good-entry"));
            await wal.AppendAsync(new RaftLogEntry(2, 1, "will-be-truncated"));
            await wal.SyncAsync();
        }

        // Truncate last few bytes to simulate crash
        using (var fs = File.OpenWrite(walPath))
            fs.SetLength(fs.Length - 3);

        using var recovered = RaftWal.Load(walPath);
        var entries = recovered.Entries.ToList();

        // Only the second record lost bytes, so exactly the first one must survive.
        entries.Count.ShouldBe(1);
        entries[0].Command.ShouldBe("good-entry");
    }

    // Go reference: server/raft.go storeMeta (term + votedFor persistence)
    [Fact]
    public async Task RaftNode_persists_term_and_vote()
    {
        var dir = Path.Combine(_root, "node-persist");
        Directory.CreateDirectory(dir);

        {
            using var node = new RaftNode("n1", persistDirectory: dir);
            node.TermState.CurrentTerm = 5;
            node.TermState.VotedFor = "n2";
            await node.PersistAsync(default);
        }

        using var recovered = new RaftNode("n1", persistDirectory: dir);
        await recovered.LoadPersistedStateAsync(default);

        recovered.Term.ShouldBe(5);
        recovered.TermState.VotedFor.ShouldBe("n2");
    }

    // Go reference: server/raft.go WAL empty file edge case
    [Fact]
    public async Task Wal_empty_file_loads_no_entries()
    {
        var walPath = Path.Combine(_root, "empty.wal");

        {
            using var wal = new RaftWal(walPath);
            await wal.SyncAsync();
        }

        using var recovered = RaftWal.Load(walPath);

        recovered.Entries.Count().ShouldBe(0);
    }

    // Go reference: server/raft.go WAL CRC integrity check
    [Fact]
    public async Task Wal_crc_validates_record_integrity()
    {
        var walPath = Path.Combine(_root, "crc.wal");

        {
            using var wal = new RaftWal(walPath);
            await wal.AppendAsync(new RaftLogEntry(1, 1, "valid"));
            await wal.AppendAsync(new RaftLogEntry(2, 1, "also-valid"));
            await wal.SyncAsync();
        }

        // Corrupt one byte in the tail of the file (inside the second record's command bytes)
        var bytes = File.ReadAllBytes(walPath);
        bytes[^5] ^= 0xFF;
        File.WriteAllBytes(walPath, bytes);

        // Load must keep the first record and stop at the corrupt second one.
        using var recovered = RaftWal.Load(walPath);
        var entries = recovered.Entries.ToList();

        entries.Count.ShouldBe(1);
        entries[0].Command.ShouldBe("valid");
    }
}