feat(raft): add binary WAL and VotedFor persistence
Implements a binary write-ahead log (RaftWal) for durable RAFT entry
storage, replacing in-memory-only semantics. The WAL uses a magic header
("NWAL" + version), length-prefixed records with per-record CRC32
integrity checking, and CompactAsync with atomic temp-file rename.
Load() tolerates truncated or corrupt tail records for crash safety.
Also fixes RaftNode to persist and reload TermState.VotedFor via a
meta.json file alongside term.txt, ensuring vote durability across
restarts. Falls back gracefully to legacy term.txt when meta.json is
absent.
6 new tests in RaftWalTests: persist/recover, compact, truncation
tolerance, VotedFor round-trip, empty WAL, and CRC corruption.
All 458 Raft tests pass.
This commit is contained in:
@@ -612,6 +612,18 @@ public sealed class RaftNode : IDisposable
|
||||
await Log.PersistAsync(Path.Combine(dir, "log.json"), ct);
|
||||
await File.WriteAllTextAsync(Path.Combine(dir, "term.txt"), TermState.CurrentTerm.ToString(), ct);
|
||||
await File.WriteAllTextAsync(Path.Combine(dir, "applied.txt"), AppliedIndex.ToString(), ct);
|
||||
|
||||
// Persist term and VotedFor together in meta.json for atomic durable state.
|
||||
// Go reference: raft.go storeMeta / writeTermVote (term + votedFor written atomically)
|
||||
var meta = new RaftMetaState
|
||||
{
|
||||
CurrentTerm = TermState.CurrentTerm,
|
||||
VotedFor = TermState.VotedFor,
|
||||
};
|
||||
await File.WriteAllTextAsync(
|
||||
Path.Combine(dir, "meta.json"),
|
||||
System.Text.Json.JsonSerializer.Serialize(meta),
|
||||
ct);
|
||||
}
|
||||
|
||||
public async Task LoadPersistedStateAsync(CancellationToken ct)
|
||||
@@ -619,9 +631,25 @@ public sealed class RaftNode : IDisposable
|
||||
var dir = _persistDirectory ?? Path.Combine(Path.GetTempPath(), "natsdotnet-raft", Id);
|
||||
Log = await RaftLog.LoadAsync(Path.Combine(dir, "log.json"), ct);
|
||||
|
||||
var termPath = Path.Combine(dir, "term.txt");
|
||||
if (File.Exists(termPath) && int.TryParse(await File.ReadAllTextAsync(termPath, ct), out var term))
|
||||
TermState.CurrentTerm = term;
|
||||
// Load from meta.json first (includes VotedFor); fall back to legacy term.txt
|
||||
var metaPath = Path.Combine(dir, "meta.json");
|
||||
if (File.Exists(metaPath))
|
||||
{
|
||||
var json = await File.ReadAllTextAsync(metaPath, ct);
|
||||
var meta = System.Text.Json.JsonSerializer.Deserialize<RaftMetaState>(json);
|
||||
if (meta is not null)
|
||||
{
|
||||
TermState.CurrentTerm = meta.CurrentTerm;
|
||||
TermState.VotedFor = meta.VotedFor;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Legacy: term.txt only (no VotedFor)
|
||||
var termPath = Path.Combine(dir, "term.txt");
|
||||
if (File.Exists(termPath) && int.TryParse(await File.ReadAllTextAsync(termPath, ct), out var term))
|
||||
TermState.CurrentTerm = term;
|
||||
}
|
||||
|
||||
var appliedPath = Path.Combine(dir, "applied.txt");
|
||||
if (File.Exists(appliedPath) && long.TryParse(await File.ReadAllTextAsync(appliedPath, ct), out var applied))
|
||||
@@ -630,6 +658,13 @@ public sealed class RaftNode : IDisposable
|
||||
AppliedIndex = Log.Entries[^1].Index;
|
||||
}
|
||||
|
||||
/// <summary>
/// Term and vote metadata that is serialized to, and read back from, meta.json
/// in the persistence directory next to the log.
/// </summary>
private sealed class RaftMetaState
{
    /// <summary>Term value mirrored from/to <c>TermState.CurrentTerm</c>.</summary>
    public int CurrentTerm { get; set; }

    /// <summary>Vote mirrored from/to <c>TermState.VotedFor</c>; may be null.</summary>
    public string? VotedFor { get; set; }
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
StopElectionTimer();
|
||||
|
||||
256
src/NATS.Server/Raft/RaftWal.cs
Normal file
256
src/NATS.Server/Raft/RaftWal.cs
Normal file
@@ -0,0 +1,256 @@
|
||||
using System.Buffers.Binary;
|
||||
using System.IO.Hashing;
|
||||
using System.Text;
|
||||
|
||||
namespace NATS.Server.Raft;
|
||||
|
||||
/// <summary>
|
||||
/// Binary write-ahead log for RAFT log entries.
|
||||
///
|
||||
/// File layout:
|
||||
/// Header: [4:magic="NWAL"][4:version_le=1]
|
||||
/// Records (repeated):
|
||||
/// [4:length_le][8:index_le][4:term_le][4:crc32_le][N:utf8_command]
|
||||
/// where length = 8 + 4 + 4 + N (payload bytes counted after the length field itself)
|
||||
///
|
||||
/// CRC32 is computed over the index, term, and command bytes of each record.
|
||||
///
|
||||
/// Go reference: server/raft.go WAL / wal.go (binary WAL format)
|
||||
/// </summary>
|
||||
public sealed class RaftWal : IDisposable
|
||||
{
|
||||
private static ReadOnlySpan<byte> Magic => "NWAL"u8;
|
||||
private const int Version = 1;
|
||||
|
||||
// Header: 4 bytes magic + 4 bytes version
|
||||
private const int HeaderSize = 8;
|
||||
|
||||
// After the 4-byte length field, each record payload contains:
|
||||
// 8 bytes index + 4 bytes term + 4 bytes crc32 + N bytes command
|
||||
private const int LengthFieldSize = 4;
|
||||
private const int IndexFieldSize = 8;
|
||||
private const int TermFieldSize = 4;
|
||||
private const int Crc32FieldSize = 4;
|
||||
private const int RecordFixedPayloadSize = IndexFieldSize + TermFieldSize + Crc32FieldSize; // 16
|
||||
|
||||
private readonly string _path;
|
||||
private FileStream _stream;
|
||||
private readonly List<RaftLogEntry> _entries = [];
|
||||
|
||||
/// <summary>
/// Opens the WAL file at <paramref name="path"/>, creating it when absent, and leaves the stream
/// positioned for appending. A file that is new or zero-length gets the header written and
/// flushed to disk. The file is opened with FileShare.Read so readers (Load) can access it
/// concurrently. Records already stored in an existing file are not read by this constructor;
/// use <see cref="Load"/> to recover them.
/// </summary>
/// <param name="path">Location of the WAL file.</param>
public RaftWal(string path)
{
    _path = path;

    // OpenOrCreate removes the exists-then-open race and lets the length decide whether a
    // header is needed.
    _stream = OpenWriteStream(path, FileMode.OpenOrCreate);
    try
    {
        if (_stream.Length == 0)
        {
            // Brand-new file, or one left empty by a crash before the header reached disk.
            // Without a header, every record appended later would be rejected by Load.
            WriteHeaderTo(_stream);
            _stream.Flush(flushToDisk: true);
        }
        else
        {
            _stream.Seek(0, SeekOrigin.End);
        }
    }
    catch
    {
        // Do not leak the file handle when the header write or seek fails.
        _stream.Dispose();
        throw;
    }
}
|
||||
|
||||
/// <summary>
/// Wraps an already-opened stream together with a pre-populated entry list; used by Load.
/// </summary>
private RaftWal(string path, FileStream stream, List<RaftLogEntry> entries)
    => (_path, _stream, _entries) = (path, stream, entries);
|
||||
|
||||
/// <summary>
/// The entries currently held in memory: those handed over by Load plus those appended since.
/// </summary>
public IEnumerable<RaftLogEntry> Entries
{
    get { return _entries; }
}
|
||||
|
||||
/// <summary>
/// Appends one RAFT log entry to the end of the WAL file, then records it in the in-memory list.
/// The write is not forced to disk here; call SyncAsync for that.
/// </summary>
/// <param name="entry">Entry to encode and append.</param>
public async Task AppendAsync(RaftLogEntry entry)
{
    // Always write at the tail, wherever the stream position was left.
    _stream.Seek(0, SeekOrigin.End);

    // Use the shared encoder so the record layout
    // [length(4)][index(8)][term(4)][crc32(4)][command(N)] is defined in exactly one place
    // and cannot drift from what CompactAsync writes.
    await WriteEntryTo(_stream, entry);

    // Track the entry only after the write completed without throwing.
    _entries.Add(entry);
}
|
||||
|
||||
/// <summary>
/// Flushes the WAL stream asynchronously, then flushes it again with flushToDisk: true
/// so that written records are forced to disk.
/// </summary>
public async Task SyncAsync()
{
    // Asynchronous flush of the stream's buffered data first...
    await _stream.FlushAsync();

    // ...followed by the synchronous flush-to-disk request.
    _stream.Flush(flushToDisk: true);
}
|
||||
|
||||
/// <summary>
/// Rewrites the WAL keeping only entries with index > upToIndex. The survivors are written to a
/// temp file (path + ".tmp"), flushed to disk, and the temp file is then renamed over the
/// original (File.Move with overwrite). The in-memory list is updated only after the rename
/// succeeded.
/// </summary>
/// <param name="upToIndex">Highest index to discard; entries above it are kept.</param>
public async Task CompactAsync(long upToIndex)
{
    var remaining = _entries.Where(e => e.Index > upToIndex).ToList();
    var tmpPath = _path + ".tmp";

    try
    {
        await using (var tmp = new FileStream(tmpPath, FileMode.Create, FileAccess.Write,
            FileShare.None, bufferSize: 4096, useAsync: true))
        {
            WriteHeaderTo(tmp);
            foreach (var entry in remaining)
                await WriteEntryTo(tmp, entry);
            tmp.Flush(flushToDisk: true);
        }
    }
    catch
    {
        // Best effort: do not leave a half-written temp file behind. A cleanup failure is
        // deliberately ignored so the original error is the one the caller sees.
        try { File.Delete(tmpPath); }
        catch (IOException) { }
        catch (UnauthorizedAccessException) { }
        throw;
    }

    // Replace: close current stream, rename temp over original, reopen.
    _stream.Dispose();
    try
    {
        File.Move(tmpPath, _path, overwrite: true);
    }
    finally
    {
        // Reopen even when the rename failed; otherwise this instance would be left holding
        // a disposed stream and every later call would throw ObjectDisposedException.
        _stream = OpenWriteStream(_path, FileMode.Open);
        _stream.Seek(0, SeekOrigin.End);
    }

    // Reached only when the rename succeeded, so memory and file stay in agreement.
    _entries.Clear();
    _entries.AddRange(remaining);
}
|
||||
|
||||
/// <summary>
/// Reads the WAL file at <paramref name="path"/>, validates each record via CRC32, and returns
/// a populated RaftWal ready for append. Scanning stops at the first truncated or corrupt
/// record. When the header is valid, the invalid tail is cut off the file so that later appends
/// land directly after the last valid record and remain readable by the next Load.
/// </summary>
/// <param name="path">Location of the WAL file; a new file with a header is created when absent.</param>
/// <returns>A WAL holding every record that passed validation, in file order.</returns>
public static RaftWal Load(string path)
{
    if (!File.Exists(path))
        return new RaftWal(path); // creates new file with header

    // Snapshot the file content while allowing concurrent writers (FileShare.ReadWrite).
    byte[] bytes;
    var bytesRead = 0;
    using (var fs = new FileStream(path, FileMode.Open, FileAccess.Read,
        FileShare.ReadWrite, bufferSize: 4096, useAsync: false))
    {
        bytes = new byte[fs.Length];
        int chunk;

        // Read returns 0 at end of stream. Stop then, instead of spinning forever when the
        // file became shorter than the length sampled above.
        while (bytesRead < bytes.Length &&
               (chunk = fs.Read(bytes, bytesRead, bytes.Length - bytesRead)) > 0)
        {
            bytesRead += chunk;
        }
    }

    var entries = new List<RaftLogEntry>();
    var validEnd = ScanValidRecords(bytes.AsSpan(0, bytesRead), entries);

    var stream = OpenWriteStream(path, FileMode.Open);
    try
    {
        if (stream.Length == 0)
        {
            // Empty file (e.g. crash before the header was written): give it a header so
            // that records appended from now on can be recovered.
            WriteHeaderTo(stream);
            stream.Flush(flushToDisk: true);
        }
        else if (validEnd >= 0 && validEnd < stream.Length && stream.Length == bytesRead)
        {
            // Drop the truncated/corrupt tail. Appending behind it would make every new
            // record unreachable, because the next Load stops at the first bad record.
            // Skipped when the file size no longer matches the snapshot that was scanned.
            stream.SetLength(validEnd);
            stream.Flush(flushToDisk: true);
        }

        // NOTE(review): a non-empty file whose header is missing or unrecognized
        // (validEnd < 0) is left untouched and appended to, as before — confirm whether
        // that case should be rejected instead.
        stream.Seek(0, SeekOrigin.End);
    }
    catch
    {
        stream.Dispose();
        throw;
    }

    return new RaftWal(path, stream, entries);
}

/// <summary>
/// Walks the records in <paramref name="data"/> and adds each one that passes the length and
/// CRC32 checks to <paramref name="entries"/>, stopping at the first record that does not.
/// </summary>
/// <returns>
/// The offset just past the last valid record (HeaderSize when there are none), or -1 when the
/// data is shorter than the header or does not start with the magic bytes.
/// </returns>
private static int ScanValidRecords(ReadOnlySpan<byte> data, List<RaftLogEntry> entries)
{
    if (data.Length < HeaderSize || !data[..Magic.Length].SequenceEqual(Magic))
        return -1;

    // The version field (bytes 4..7) is skipped, not validated.
    var pos = HeaderSize;

    while (data.Length - pos >= LengthFieldSize)
    {
        var payloadLength = BinaryPrimitives.ReadInt32LittleEndian(data.Slice(pos, LengthFieldSize));
        var payloadStart = pos + LengthFieldSize;

        // Compare against the bytes that remain rather than computing start + length:
        // a corrupt length close to int.MaxValue would overflow the addition.
        if (payloadLength < RecordFixedPayloadSize || payloadLength > data.Length - payloadStart)
            break; // Truncated or corrupt record — stop

        var payload = data.Slice(payloadStart, payloadLength);
        var indexSpan = payload[..IndexFieldSize];
        var termSpan = payload.Slice(IndexFieldSize, TermFieldSize);
        var storedCrc = BinaryPrimitives.ReadUInt32LittleEndian(
            payload.Slice(IndexFieldSize + TermFieldSize, Crc32FieldSize));
        var commandBytes = payload[RecordFixedPayloadSize..].ToArray();

        if (ComputeCrc(indexSpan, termSpan, commandBytes) != storedCrc)
            break; // CRC mismatch — corrupt record, everything from here on is discarded

        entries.Add(new RaftLogEntry(
            BinaryPrimitives.ReadInt64LittleEndian(indexSpan),
            BinaryPrimitives.ReadInt32LittleEndian(termSpan),
            Encoding.UTF8.GetString(commandBytes)));

        pos = payloadStart + payloadLength;
    }

    return pos;
}
|
||||
|
||||
/// <summary>Disposes the underlying WAL file stream.</summary>
public void Dispose() => _stream.Dispose();
|
||||
|
||||
// --- Private helpers ---
|
||||
|
||||
/// <summary>
/// Creates the read/write stream used for the WAL file. Sharing is FileShare.Read, so other
/// handles may read the file while this one is open; the stream is opened for async I/O.
/// </summary>
private static FileStream OpenWriteStream(string path, FileMode mode)
{
    return new FileStream(
        path,
        mode,
        FileAccess.ReadWrite,
        FileShare.Read,
        bufferSize: 4096,
        useAsync: true);
}
|
||||
|
||||
/// <summary>
/// Writes the file header to <paramref name="stream"/> at its current position:
/// the magic bytes followed by the version as a little-endian 32-bit integer.
/// </summary>
private static void WriteHeaderTo(Stream stream)
{
    // Assemble magic + version in one stack buffer and emit it with a single write.
    Span<byte> header = stackalloc byte[HeaderSize];
    Magic.CopyTo(header);
    BinaryPrimitives.WriteInt32LittleEndian(header[Magic.Length..], Version);
    stream.Write(header);
}
|
||||
|
||||
/// <summary>
/// Encodes <paramref name="entry"/> as one record,
/// [length(4)][index(8)][term(4)][crc32(4)][command(N)], and writes it to
/// <paramref name="stream"/> at its current position in a single write.
/// </summary>
private static async Task WriteEntryTo(Stream stream, RaftLogEntry entry)
{
    // Offsets of each field inside the record buffer.
    const int indexOffset = LengthFieldSize;
    const int termOffset = indexOffset + IndexFieldSize;
    const int crcOffset = termOffset + TermFieldSize;
    const int commandOffset = crcOffset + Crc32FieldSize;

    var command = Encoding.UTF8.GetBytes(entry.Command);
    var payloadLength = RecordFixedPayloadSize + command.Length;
    var buffer = new byte[LengthFieldSize + payloadLength];

    // The length field counts everything after itself.
    BinaryPrimitives.WriteInt32LittleEndian(buffer.AsSpan(0, LengthFieldSize), payloadLength);
    BinaryPrimitives.WriteInt64LittleEndian(buffer.AsSpan(indexOffset, IndexFieldSize), entry.Index);
    BinaryPrimitives.WriteInt32LittleEndian(buffer.AsSpan(termOffset, TermFieldSize), entry.Term);

    // The checksum covers the encoded index, the encoded term and the command bytes.
    var checksum = ComputeCrc(
        buffer.AsSpan(indexOffset, IndexFieldSize),
        buffer.AsSpan(termOffset, TermFieldSize),
        command);
    BinaryPrimitives.WriteUInt32LittleEndian(buffer.AsSpan(crcOffset, Crc32FieldSize), checksum);

    command.CopyTo(buffer, commandOffset);
    await stream.WriteAsync(buffer);
}
|
||||
|
||||
/// <summary>
/// Computes the CRC32 (System.IO.Hashing.Crc32) of the concatenation
/// index bytes + term bytes + command bytes.
/// </summary>
/// <param name="indexBytes">Encoded index field of the record.</param>
/// <param name="termBytes">Encoded term field of the record.</param>
/// <param name="commandBytes">Command payload of the record.</param>
/// <returns>The checksum as an unsigned 32-bit value.</returns>
private static uint ComputeCrc(ReadOnlySpan<byte> indexBytes, ReadOnlySpan<byte> termBytes, byte[] commandBytes)
{
    // Feed the three parts to the hasher in order. This yields the same checksum as hashing
    // one contiguous copy, without allocating and filling a scratch buffer for every record.
    var hasher = new Crc32();
    hasher.Append(indexBytes);
    hasher.Append(termBytes);
    hasher.Append(commandBytes);
    return hasher.GetCurrentHashAsUInt32();
}
|
||||
}
|
||||
Reference in New Issue
Block a user