fix(raft): address WAL code quality issues — CRC perf, DRY, safety, assertions
- SyncAsync: remove redundant FlushAsync, use single Flush(flushToDisk:true) - ComputeCrc: use incremental Crc32.Append to avoid contiguous buffer heap allocation - Load: cast pos+length to long to guard against int overflow in bounds check - AppendAsync: delegate to WriteEntryTo (DRY — eliminates duplicated record-building logic) - Load: extract ParseEntries static helper to eliminate goto pattern with early returns - Entries: change return type from IEnumerable to IReadOnlyList for index access and Count property - RaftNode.PersistAsync: remove redundant term.txt write (meta.json now owns term+votedFor) - RaftWalTests: tighten ShouldBeGreaterThanOrEqualTo(1) -> ShouldBe(1) in truncation/CRC tests; use .Count property directly on IReadOnlyList instead of .Count() LINQ extension
This commit is contained in:
@@ -610,7 +610,6 @@ public sealed class RaftNode : IDisposable
|
||||
var dir = _persistDirectory ?? Path.Combine(Path.GetTempPath(), "natsdotnet-raft", Id);
|
||||
Directory.CreateDirectory(dir);
|
||||
await Log.PersistAsync(Path.Combine(dir, "log.json"), ct);
|
||||
await File.WriteAllTextAsync(Path.Combine(dir, "term.txt"), TermState.CurrentTerm.ToString(), ct);
|
||||
await File.WriteAllTextAsync(Path.Combine(dir, "applied.txt"), AppliedIndex.ToString(), ct);
|
||||
|
||||
// Persist term and VotedFor together in meta.json for atomic durable state.
|
||||
|
||||
@@ -66,42 +66,24 @@ public sealed class RaftWal : IDisposable
|
||||
}
|
||||
|
||||
/// <summary>All in-memory entries loaded from or appended to this WAL.</summary>
|
||||
public IEnumerable<RaftLogEntry> Entries => _entries;
|
||||
public IReadOnlyList<RaftLogEntry> Entries => _entries;
|
||||
|
||||
/// <summary>
|
||||
/// Appends a single RAFT log entry to the WAL file and the in-memory list.
|
||||
/// </summary>
|
||||
public async Task AppendAsync(RaftLogEntry entry)
|
||||
{
|
||||
var commandBytes = Encoding.UTF8.GetBytes(entry.Command);
|
||||
var recordPayloadLength = RecordFixedPayloadSize + commandBytes.Length;
|
||||
|
||||
// Build the full record: [length(4)][index(8)][term(4)][crc32(4)][command(N)]
|
||||
var record = new byte[LengthFieldSize + recordPayloadLength];
|
||||
var span = record.AsSpan();
|
||||
|
||||
BinaryPrimitives.WriteInt32LittleEndian(span[..4], recordPayloadLength);
|
||||
BinaryPrimitives.WriteInt64LittleEndian(span[4..12], entry.Index);
|
||||
BinaryPrimitives.WriteInt32LittleEndian(span[12..16], entry.Term);
|
||||
|
||||
// CRC32 over: index(8) + term(4) + command(N)
|
||||
uint crc = ComputeCrc(span[4..12], span[12..16], commandBytes);
|
||||
BinaryPrimitives.WriteUInt32LittleEndian(span[16..20], crc);
|
||||
|
||||
commandBytes.CopyTo(record, 20);
|
||||
|
||||
_stream.Seek(0, SeekOrigin.End);
|
||||
await _stream.WriteAsync(record);
|
||||
await WriteEntryTo(_stream, entry);
|
||||
_entries.Add(entry);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Flushes the WAL to disk (fsync).
|
||||
/// </summary>
|
||||
public async Task SyncAsync()
|
||||
public Task SyncAsync()
|
||||
{
|
||||
await _stream.FlushAsync();
|
||||
_stream.Flush(flushToDisk: true);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -138,8 +120,6 @@ public sealed class RaftWal : IDisposable
|
||||
/// </summary>
|
||||
public static RaftWal Load(string path)
|
||||
{
|
||||
var entries = new List<RaftLogEntry>();
|
||||
|
||||
if (!File.Exists(path))
|
||||
return new RaftWal(path); // creates new file with header
|
||||
|
||||
@@ -154,52 +134,8 @@ public sealed class RaftWal : IDisposable
|
||||
read += fs.Read(bytes, read, bytes.Length - read);
|
||||
}
|
||||
|
||||
var pos = 0;
|
||||
var entries = ParseEntries(bytes);
|
||||
|
||||
// Validate magic header
|
||||
if (bytes.Length < HeaderSize)
|
||||
goto done;
|
||||
|
||||
if (bytes[0] != Magic[0] || bytes[1] != Magic[1] ||
|
||||
bytes[2] != Magic[2] || bytes[3] != Magic[3])
|
||||
goto done; // Unrecognized file — return empty
|
||||
|
||||
// Skip version field (bytes 4..7)
|
||||
pos = HeaderSize;
|
||||
|
||||
while (pos < bytes.Length)
|
||||
{
|
||||
// Need at least the length field
|
||||
if (pos + LengthFieldSize > bytes.Length)
|
||||
break;
|
||||
|
||||
var recordPayloadLength = BinaryPrimitives.ReadInt32LittleEndian(bytes.AsSpan(pos, LengthFieldSize));
|
||||
pos += LengthFieldSize;
|
||||
|
||||
// Sanity-check payload length
|
||||
if (recordPayloadLength < RecordFixedPayloadSize || pos + recordPayloadLength > bytes.Length)
|
||||
break; // Truncated or corrupt record — stop
|
||||
|
||||
var indexSpan = bytes.AsSpan(pos, IndexFieldSize);
|
||||
var termSpan = bytes.AsSpan(pos + IndexFieldSize, TermFieldSize);
|
||||
var storedCrc = BinaryPrimitives.ReadUInt32LittleEndian(
|
||||
bytes.AsSpan(pos + IndexFieldSize + TermFieldSize, Crc32FieldSize));
|
||||
var commandLength = recordPayloadLength - RecordFixedPayloadSize;
|
||||
var commandBytes = bytes.AsSpan(pos + RecordFixedPayloadSize, commandLength).ToArray();
|
||||
|
||||
uint computedCrc = ComputeCrc(indexSpan, termSpan, commandBytes);
|
||||
if (computedCrc != storedCrc)
|
||||
break; // CRC mismatch — corrupt record, truncate from here
|
||||
|
||||
var index = BinaryPrimitives.ReadInt64LittleEndian(indexSpan);
|
||||
var term = BinaryPrimitives.ReadInt32LittleEndian(termSpan);
|
||||
var command = Encoding.UTF8.GetString(commandBytes);
|
||||
|
||||
entries.Add(new RaftLogEntry(index, term, command));
|
||||
pos += recordPayloadLength;
|
||||
}
|
||||
|
||||
done:
|
||||
var stream = OpenWriteStream(path, FileMode.Open);
|
||||
stream.Seek(0, SeekOrigin.End);
|
||||
return new RaftWal(path, stream, entries);
|
||||
@@ -212,6 +148,60 @@ done:
|
||||
|
||||
// --- Private helpers ---
|
||||
|
||||
/// <summary>
|
||||
/// Parses all valid records from a WAL byte buffer, stopping at the first corrupt or
|
||||
/// truncated record. Extracted to eliminate the goto pattern in Load.
|
||||
/// </summary>
|
||||
private static List<RaftLogEntry> ParseEntries(byte[] bytes)
|
||||
{
|
||||
var entries = new List<RaftLogEntry>();
|
||||
|
||||
// Validate magic header
|
||||
if (bytes.Length < HeaderSize)
|
||||
return entries;
|
||||
|
||||
if (bytes[0] != Magic[0] || bytes[1] != Magic[1] ||
|
||||
bytes[2] != Magic[2] || bytes[3] != Magic[3])
|
||||
return entries; // Unrecognized file — return empty
|
||||
|
||||
// Skip version field (bytes 4..7)
|
||||
var pos = HeaderSize;
|
||||
|
||||
while (pos < bytes.Length)
|
||||
{
|
||||
// Need at least the length field
|
||||
if (pos + LengthFieldSize > bytes.Length)
|
||||
break;
|
||||
|
||||
var recordPayloadLength = BinaryPrimitives.ReadInt32LittleEndian(bytes.AsSpan(pos, LengthFieldSize));
|
||||
pos += LengthFieldSize;
|
||||
|
||||
// Sanity-check payload length — cast to long to guard against integer overflow
|
||||
if (recordPayloadLength < RecordFixedPayloadSize || (long)pos + recordPayloadLength > bytes.Length)
|
||||
break; // Truncated or corrupt record — stop
|
||||
|
||||
var indexSpan = bytes.AsSpan(pos, IndexFieldSize);
|
||||
var termSpan = bytes.AsSpan(pos + IndexFieldSize, TermFieldSize);
|
||||
var storedCrc = BinaryPrimitives.ReadUInt32LittleEndian(
|
||||
bytes.AsSpan(pos + IndexFieldSize + TermFieldSize, Crc32FieldSize));
|
||||
var commandLength = recordPayloadLength - RecordFixedPayloadSize;
|
||||
var commandSpan = bytes.AsSpan(pos + RecordFixedPayloadSize, commandLength);
|
||||
|
||||
uint computedCrc = ComputeCrc(indexSpan, termSpan, commandSpan);
|
||||
if (computedCrc != storedCrc)
|
||||
break; // CRC mismatch — corrupt record, truncate from here
|
||||
|
||||
var index = BinaryPrimitives.ReadInt64LittleEndian(indexSpan);
|
||||
var term = BinaryPrimitives.ReadInt32LittleEndian(termSpan);
|
||||
var command = Encoding.UTF8.GetString(commandSpan);
|
||||
|
||||
entries.Add(new RaftLogEntry(index, term, command));
|
||||
pos += recordPayloadLength;
|
||||
}
|
||||
|
||||
return entries;
|
||||
}
|
||||
|
||||
/// <summary>Opens the WAL file for reading and appending with FileShare.Read to allow concurrent readers.</summary>
|
||||
private static FileStream OpenWriteStream(string path, FileMode mode) =>
|
||||
new(path, mode, FileAccess.ReadWrite, FileShare.Read, bufferSize: 4096, useAsync: true);
|
||||
@@ -242,15 +232,16 @@ done:
|
||||
await stream.WriteAsync(record);
|
||||
}
|
||||
|
||||
private static uint ComputeCrc(ReadOnlySpan<byte> indexBytes, ReadOnlySpan<byte> termBytes, byte[] commandBytes)
|
||||
/// <summary>
|
||||
/// Computes CRC32 incrementally over index, term, and command bytes without allocating
|
||||
/// a contiguous buffer. Uses System.IO.Hashing.Crc32 (IEEE 802.3 polynomial).
|
||||
/// </summary>
|
||||
private static uint ComputeCrc(ReadOnlySpan<byte> indexBytes, ReadOnlySpan<byte> termBytes, ReadOnlySpan<byte> commandBytes)
|
||||
{
|
||||
// Build contiguous buffer for CRC: index(8) + term(4) + command(N)
|
||||
var buffer = new byte[indexBytes.Length + termBytes.Length + commandBytes.Length];
|
||||
indexBytes.CopyTo(buffer);
|
||||
termBytes.CopyTo(buffer.AsSpan(indexBytes.Length));
|
||||
commandBytes.CopyTo(buffer, indexBytes.Length + termBytes.Length);
|
||||
|
||||
// System.IO.Hashing.Crc32 — IEEE 802.3 polynomial (same as zlib CRC32)
|
||||
return Crc32.HashToUInt32(buffer);
|
||||
var crc = new Crc32();
|
||||
crc.Append(indexBytes);
|
||||
crc.Append(termBytes);
|
||||
crc.Append(commandBytes);
|
||||
return crc.GetCurrentHashAsUInt32();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -37,7 +37,7 @@ public class RaftWalTests : IDisposable
|
||||
|
||||
// Recover
|
||||
using var recovered = RaftWal.Load(walPath);
|
||||
var entries = recovered.Entries.ToList();
|
||||
var entries = recovered.Entries;
|
||||
entries.Count.ShouldBe(3);
|
||||
entries[0].Index.ShouldBe(1);
|
||||
entries[0].Term.ShouldBe(1);
|
||||
@@ -60,7 +60,7 @@ public class RaftWalTests : IDisposable
|
||||
await wal.CompactAsync(5); // remove entries 1-5
|
||||
|
||||
using var recovered = RaftWal.Load(walPath);
|
||||
recovered.Entries.Count().ShouldBe(5);
|
||||
recovered.Entries.Count.ShouldBe(5);
|
||||
recovered.Entries.First().Index.ShouldBe(6);
|
||||
}
|
||||
|
||||
@@ -82,7 +82,7 @@ public class RaftWalTests : IDisposable
|
||||
fs.SetLength(fs.Length - 3);
|
||||
|
||||
using var recovered = RaftWal.Load(walPath);
|
||||
recovered.Entries.Count().ShouldBeGreaterThanOrEqualTo(1);
|
||||
recovered.Entries.Count.ShouldBe(1);
|
||||
recovered.Entries.First().Command.ShouldBe("good-entry");
|
||||
}
|
||||
|
||||
@@ -118,7 +118,7 @@ public class RaftWalTests : IDisposable
|
||||
}
|
||||
|
||||
using var recovered = RaftWal.Load(walPath);
|
||||
recovered.Entries.Count().ShouldBe(0);
|
||||
recovered.Entries.Count.ShouldBe(0);
|
||||
}
|
||||
|
||||
// Go reference: server/raft.go WAL CRC integrity check
|
||||
@@ -139,9 +139,9 @@ public class RaftWalTests : IDisposable
|
||||
bytes[^5] ^= 0xFF;
|
||||
File.WriteAllBytes(walPath, bytes);
|
||||
|
||||
// Load should recover at least the first record, stopping at the corrupt second
|
||||
// Load should recover exactly the first record, stopping at the corrupt second
|
||||
using var recovered = RaftWal.Load(walPath);
|
||||
recovered.Entries.Count().ShouldBeGreaterThanOrEqualTo(1);
|
||||
recovered.Entries.Count.ShouldBe(1);
|
||||
recovered.Entries.First().Command.ShouldBe("valid");
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user