diff --git a/src/NATS.Server/Raft/RaftNode.cs b/src/NATS.Server/Raft/RaftNode.cs index 0765151..946c2b1 100644 --- a/src/NATS.Server/Raft/RaftNode.cs +++ b/src/NATS.Server/Raft/RaftNode.cs @@ -610,7 +610,6 @@ public sealed class RaftNode : IDisposable var dir = _persistDirectory ?? Path.Combine(Path.GetTempPath(), "natsdotnet-raft", Id); Directory.CreateDirectory(dir); await Log.PersistAsync(Path.Combine(dir, "log.json"), ct); - await File.WriteAllTextAsync(Path.Combine(dir, "term.txt"), TermState.CurrentTerm.ToString(), ct); await File.WriteAllTextAsync(Path.Combine(dir, "applied.txt"), AppliedIndex.ToString(), ct); // Persist term and VotedFor together in meta.json for atomic durable state. diff --git a/src/NATS.Server/Raft/RaftWal.cs b/src/NATS.Server/Raft/RaftWal.cs index 8587c05..186097f 100644 --- a/src/NATS.Server/Raft/RaftWal.cs +++ b/src/NATS.Server/Raft/RaftWal.cs @@ -66,42 +66,24 @@ public sealed class RaftWal : IDisposable } /// All in-memory entries loaded from or appended to this WAL. - public IEnumerable Entries => _entries; + public IReadOnlyList Entries => _entries; /// /// Appends a single RAFT log entry to the WAL file and the in-memory list. /// public async Task AppendAsync(RaftLogEntry entry) { - var commandBytes = Encoding.UTF8.GetBytes(entry.Command); - var recordPayloadLength = RecordFixedPayloadSize + commandBytes.Length; - - // Build the full record: [length(4)][index(8)][term(4)][crc32(4)][command(N)] - var record = new byte[LengthFieldSize + recordPayloadLength]; - var span = record.AsSpan(); - - BinaryPrimitives.WriteInt32LittleEndian(span[..4], recordPayloadLength); - BinaryPrimitives.WriteInt64LittleEndian(span[4..12], entry.Index); - BinaryPrimitives.WriteInt32LittleEndian(span[12..16], entry.Term); - - // CRC32 over: index(8) + term(4) + command(N) - uint crc = ComputeCrc(span[4..12], span[12..16], commandBytes); - BinaryPrimitives.WriteUInt32LittleEndian(span[16..20], crc); - - commandBytes.CopyTo(record, 20); - - _stream.Seek(0, SeekOrigin.End); - await _stream.WriteAsync(record); + await WriteEntryTo(_stream, entry); _entries.Add(entry); } /// /// Flushes the WAL to disk (fsync). /// - public async Task SyncAsync() + public Task SyncAsync() { - await _stream.FlushAsync(); _stream.Flush(flushToDisk: true); + return Task.CompletedTask; } /// @@ -138,8 +120,6 @@ public sealed class RaftWal : IDisposable /// public static RaftWal Load(string path) { - var entries = new List(); - if (!File.Exists(path)) return new RaftWal(path); // creates new file with header @@ -154,52 +134,8 @@ public sealed class RaftWal : IDisposable read += fs.Read(bytes, read, bytes.Length - read); } - var pos = 0; + var entries = ParseEntries(bytes); - // Validate magic header - if (bytes.Length < HeaderSize) - goto done; - - if (bytes[0] != Magic[0] || bytes[1] != Magic[1] || - bytes[2] != Magic[2] || bytes[3] != Magic[3]) - goto done; // Unrecognized file — return empty - - // Skip version field (bytes 4..7) - pos = HeaderSize; - - while (pos < bytes.Length) - { - // Need at least the length field - if (pos + LengthFieldSize > bytes.Length) - break; - - var recordPayloadLength = BinaryPrimitives.ReadInt32LittleEndian(bytes.AsSpan(pos, LengthFieldSize)); - pos += LengthFieldSize; - - // Sanity-check payload length - if (recordPayloadLength < RecordFixedPayloadSize || pos + recordPayloadLength > bytes.Length) - break; // Truncated or corrupt record — stop - - var indexSpan = bytes.AsSpan(pos, IndexFieldSize); - var termSpan = bytes.AsSpan(pos + IndexFieldSize, TermFieldSize); - var storedCrc = BinaryPrimitives.ReadUInt32LittleEndian( - bytes.AsSpan(pos + IndexFieldSize + TermFieldSize, Crc32FieldSize)); - var commandLength = recordPayloadLength - RecordFixedPayloadSize; - var commandBytes = bytes.AsSpan(pos + RecordFixedPayloadSize, commandLength).ToArray(); - - uint computedCrc = ComputeCrc(indexSpan, termSpan, commandBytes); - if (computedCrc != storedCrc) - break; // CRC mismatch — corrupt record, truncate from here - - var index = BinaryPrimitives.ReadInt64LittleEndian(indexSpan); - var term = BinaryPrimitives.ReadInt32LittleEndian(termSpan); - var command = Encoding.UTF8.GetString(commandBytes); - - entries.Add(new RaftLogEntry(index, term, command)); - pos += recordPayloadLength; - } - -done: var stream = OpenWriteStream(path, FileMode.Open); stream.Seek(0, SeekOrigin.End); return new RaftWal(path, stream, entries); @@ -212,6 +148,60 @@ done: // --- Private helpers --- + /// + /// Parses all valid records from a WAL byte buffer, stopping at the first corrupt or + /// truncated record. Extracted to eliminate the goto pattern in Load. + /// + private static List ParseEntries(byte[] bytes) + { + var entries = new List(); + + // Validate magic header + if (bytes.Length < HeaderSize) + return entries; + + if (bytes[0] != Magic[0] || bytes[1] != Magic[1] || + bytes[2] != Magic[2] || bytes[3] != Magic[3]) + return entries; // Unrecognized file — return empty + + // Skip version field (bytes 4..7) + var pos = HeaderSize; + + while (pos < bytes.Length) + { + // Need at least the length field + if (pos + LengthFieldSize > bytes.Length) + break; + + var recordPayloadLength = BinaryPrimitives.ReadInt32LittleEndian(bytes.AsSpan(pos, LengthFieldSize)); + pos += LengthFieldSize; + + // Sanity-check payload length — cast to long to guard against integer overflow + if (recordPayloadLength < RecordFixedPayloadSize || (long)pos + recordPayloadLength > bytes.Length) + break; // Truncated or corrupt record — stop + + var indexSpan = bytes.AsSpan(pos, IndexFieldSize); + var termSpan = bytes.AsSpan(pos + IndexFieldSize, TermFieldSize); + var storedCrc = BinaryPrimitives.ReadUInt32LittleEndian( + bytes.AsSpan(pos + IndexFieldSize + TermFieldSize, Crc32FieldSize)); + var commandLength = recordPayloadLength - RecordFixedPayloadSize; + var commandSpan = bytes.AsSpan(pos + RecordFixedPayloadSize, commandLength); + + uint computedCrc = ComputeCrc(indexSpan, termSpan, commandSpan); + if (computedCrc != storedCrc) + break; // CRC mismatch — corrupt record, truncate from here + + var index = BinaryPrimitives.ReadInt64LittleEndian(indexSpan); + var term = BinaryPrimitives.ReadInt32LittleEndian(termSpan); + var command = Encoding.UTF8.GetString(commandSpan); + + entries.Add(new RaftLogEntry(index, term, command)); + pos += recordPayloadLength; + } + + return entries; + } + /// Opens the WAL file for reading and appending with FileShare.Read to allow concurrent readers. private static FileStream OpenWriteStream(string path, FileMode mode) => new(path, mode, FileAccess.ReadWrite, FileShare.Read, bufferSize: 4096, useAsync: true); @@ -242,15 +232,16 @@ done: await stream.WriteAsync(record); } - private static uint ComputeCrc(ReadOnlySpan indexBytes, ReadOnlySpan termBytes, byte[] commandBytes) + /// + /// Computes CRC32 incrementally over index, term, and command bytes without allocating + /// a contiguous buffer. Uses System.IO.Hashing.Crc32 (IEEE 802.3 polynomial). + /// + private static uint ComputeCrc(ReadOnlySpan indexBytes, ReadOnlySpan termBytes, ReadOnlySpan commandBytes) { - // Build contiguous buffer for CRC: index(8) + term(4) + command(N) - var buffer = new byte[indexBytes.Length + termBytes.Length + commandBytes.Length]; - indexBytes.CopyTo(buffer); - termBytes.CopyTo(buffer.AsSpan(indexBytes.Length)); - commandBytes.CopyTo(buffer, indexBytes.Length + termBytes.Length); - - // System.IO.Hashing.Crc32 — IEEE 802.3 polynomial (same as zlib CRC32) - return Crc32.HashToUInt32(buffer); + var crc = new Crc32(); + crc.Append(indexBytes); + crc.Append(termBytes); + crc.Append(commandBytes); + return crc.GetCurrentHashAsUInt32(); } } diff --git a/tests/NATS.Server.Tests/Raft/RaftWalTests.cs b/tests/NATS.Server.Tests/Raft/RaftWalTests.cs index 0733132..83239ea 100644 --- a/tests/NATS.Server.Tests/Raft/RaftWalTests.cs +++ b/tests/NATS.Server.Tests/Raft/RaftWalTests.cs @@ -37,7 +37,7 @@ public class RaftWalTests : IDisposable // Recover using var recovered = RaftWal.Load(walPath); - var entries = recovered.Entries.ToList(); + var entries = recovered.Entries; entries.Count.ShouldBe(3); entries[0].Index.ShouldBe(1); entries[0].Term.ShouldBe(1); @@ -60,7 +60,7 @@ public class RaftWalTests : IDisposable await wal.CompactAsync(5); // remove entries 1-5 using var recovered = RaftWal.Load(walPath); - recovered.Entries.Count().ShouldBe(5); + recovered.Entries.Count.ShouldBe(5); recovered.Entries.First().Index.ShouldBe(6); } @@ -82,7 +82,7 @@ public class RaftWalTests : IDisposable fs.SetLength(fs.Length - 3); using var recovered = RaftWal.Load(walPath); - recovered.Entries.Count().ShouldBeGreaterThanOrEqualTo(1); + recovered.Entries.Count.ShouldBe(1); recovered.Entries.First().Command.ShouldBe("good-entry"); } @@ -118,7 +118,7 @@ public class RaftWalTests : IDisposable } using var recovered = RaftWal.Load(walPath); - recovered.Entries.Count().ShouldBe(0); + recovered.Entries.Count.ShouldBe(0); } // Go reference: server/raft.go WAL CRC integrity check @@ -139,9 +139,9 @@ public class RaftWalTests : IDisposable bytes[^5] ^= 0xFF; File.WriteAllBytes(walPath, bytes); - // Load should recover at least the first record, stopping at the corrupt second + // Load should recover exactly the first record, stopping at the corrupt second using var recovered = RaftWal.Load(walPath); - recovered.Entries.Count().ShouldBeGreaterThanOrEqualTo(1); + recovered.Entries.Count.ShouldBe(1); recovered.Entries.First().Command.ShouldBe("valid"); } }