Implement checkpoint modes with docs/tests and reorganize project file layout
All checks were successful
NuGet Publish / build-and-pack (push) Successful in 46s
NuGet Publish / publish-to-gitea (push) Successful in 53s

This commit is contained in:
Joseph Doherty
2026-02-21 07:56:36 -05:00
parent 3ffd468c79
commit 4c6aaa5a3f
96 changed files with 744 additions and 249 deletions

View File

@@ -2,8 +2,8 @@ using ZB.MOM.WW.CBDD.Core.Transactions;
namespace ZB.MOM.WW.CBDD.Core.Storage;
public sealed partial class StorageEngine
{
public sealed partial class StorageEngine
{
/// <summary>
/// Gets the current size of the WAL file.
/// </summary>
@@ -29,151 +29,244 @@ public sealed partial class StorageEngine
_wal.Flush();
}
/// <summary>
/// Performs a checkpoint: merges WAL into PageFile.
/// Uses in-memory WAL index for efficiency and consistency.
/// </summary>
/// <summary>
/// Performs a checkpoint: merges WAL into PageFile.
/// Uses in-memory WAL index for efficiency and consistency.
/// </summary>
public void Checkpoint()
{
_commitLock.Wait();
try
{
CheckpointInternal();
}
finally
{
_commitLock.Release();
}
}
private void CheckpointInternal()
/// <summary>
/// Performs a truncate checkpoint by default.
/// </summary>
public void Checkpoint()
{
if (_walIndex.IsEmpty)
_ = Checkpoint(CheckpointMode.Truncate);
}
/// <summary>
/// Performs a checkpoint using the requested mode.
/// </summary>
/// <param name="mode">Checkpoint mode to execute.</param>
/// <returns>The checkpoint execution result.</returns>
public CheckpointResult Checkpoint(CheckpointMode mode)
{
bool lockAcquired;
if (mode == CheckpointMode.Passive)
{
// WAL may still contain begin/commit records for read-only transactions.
lockAcquired = _commitLock.Wait(0);
if (!lockAcquired)
{
var walSize = _wal.GetCurrentSize();
return new CheckpointResult(mode, false, 0, walSize, walSize, false, false);
}
}
else
{
_commitLock.Wait();
lockAcquired = true;
}
try
{
return CheckpointInternal(mode);
}
finally
{
if (lockAcquired)
{
_commitLock.Release();
}
}
}
private void CheckpointInternal()
=> _ = CheckpointInternal(CheckpointMode.Truncate);
private CheckpointResult CheckpointInternal(CheckpointMode mode)
{
var walBytesBefore = _wal.GetCurrentSize();
var appliedPages = 0;
var truncated = false;
var restarted = false;
// 1. Write all committed pages from index to PageFile.
foreach (var kvp in _walIndex)
{
_pageFile.WritePage(kvp.Key, kvp.Value);
appliedPages++;
}
// 2. Flush PageFile to ensure durability.
if (appliedPages > 0)
{
_pageFile.Flush();
}
// 3. Clear in-memory WAL index (now persisted).
_walIndex.Clear();
// 4. Apply mode-specific WAL handling.
switch (mode)
{
case CheckpointMode.Passive:
case CheckpointMode.Full:
if (walBytesBefore > 0 || appliedPages > 0)
{
_wal.WriteCheckpointRecord();
_wal.Flush();
}
break;
case CheckpointMode.Truncate:
if (walBytesBefore > 0)
{
_wal.Truncate();
truncated = true;
}
break;
case CheckpointMode.Restart:
_wal.Restart();
truncated = true;
restarted = true;
break;
default:
throw new ArgumentOutOfRangeException(nameof(mode), mode, "Unsupported checkpoint mode.");
}
var walBytesAfter = _wal.GetCurrentSize();
return new CheckpointResult(mode, true, appliedPages, walBytesBefore, walBytesAfter, truncated, restarted);
}
/// <summary>
/// Performs a truncate checkpoint asynchronously by default.
/// </summary>
/// <param name="ct">The cancellation token.</param>
public async Task CheckpointAsync(CancellationToken ct = default)
{
_ = await CheckpointAsync(CheckpointMode.Truncate, ct);
}
/// <summary>
/// Performs a checkpoint asynchronously using the requested mode.
/// </summary>
/// <param name="mode">Checkpoint mode to execute.</param>
/// <param name="ct">The cancellation token.</param>
/// <returns>A task that represents the asynchronous checkpoint operation.</returns>
public async Task<CheckpointResult> CheckpointAsync(CheckpointMode mode, CancellationToken ct = default)
{
bool lockAcquired;
if (mode == CheckpointMode.Passive)
{
lockAcquired = await _commitLock.WaitAsync(0, ct);
if (!lockAcquired)
{
var walSize = _wal.GetCurrentSize();
return new CheckpointResult(mode, false, 0, walSize, walSize, false, false);
}
}
else
{
await _commitLock.WaitAsync(ct);
lockAcquired = true;
}
try
{
// Checkpoint work is synchronous over MMF/page writes for now.
return CheckpointInternal(mode);
}
finally
{
if (lockAcquired)
{
_commitLock.Release();
}
}
}
/// <summary>
/// Recovers from crash by replaying WAL.
/// Applies committed transactions to PageFile in deterministic WAL order, then truncates WAL.
/// </summary>
public void Recover()
{
_commitLock.Wait();
try
{
// 1. Read WAL and locate the latest checkpoint boundary.
var records = _wal.ReadAll();
var startIndex = 0;
for (var i = records.Count - 1; i >= 0; i--)
{
if (records[i].Type == WalRecordType.Checkpoint)
{
startIndex = i + 1;
break;
}
}
// 2. Replay WAL in source order with deterministic commit application.
var pendingWrites = new Dictionary<ulong, List<(uint pageId, byte[] data)>>();
var appliedAny = false;
for (var i = startIndex; i < records.Count; i++)
{
var record = records[i];
switch (record.Type)
{
case WalRecordType.Begin:
if (!pendingWrites.ContainsKey(record.TransactionId))
{
pendingWrites[record.TransactionId] = new List<(uint, byte[])>();
}
break;
case WalRecordType.Write:
if (record.AfterImage == null)
{
break;
}
if (!pendingWrites.TryGetValue(record.TransactionId, out var writes))
{
writes = new List<(uint, byte[])>();
pendingWrites[record.TransactionId] = writes;
}
writes.Add((record.PageId, record.AfterImage));
break;
case WalRecordType.Commit:
if (!pendingWrites.TryGetValue(record.TransactionId, out var committedWrites))
{
break;
}
foreach (var (pageId, data) in committedWrites)
{
_pageFile.WritePage(pageId, data);
appliedAny = true;
}
pendingWrites.Remove(record.TransactionId);
break;
case WalRecordType.Abort:
pendingWrites.Remove(record.TransactionId);
break;
case WalRecordType.Checkpoint:
pendingWrites.Clear();
break;
}
}
// 3. Flush PageFile to ensure durability.
if (appliedAny)
{
_pageFile.Flush();
}
// 4. Clear in-memory WAL index (redundant since we just recovered).
_walIndex.Clear();
// 5. Truncate WAL (all changes now in PageFile).
if (_wal.GetCurrentSize() > 0)
{
_wal.Truncate();
}
return;
}
// 1. Write all committed pages from index to PageFile
foreach (var kvp in _walIndex)
{
_pageFile.WritePage(kvp.Key, kvp.Value);
}
// 2. Flush PageFile to ensure durability
_pageFile.Flush();
// 3. Clear in-memory WAL index (now persisted)
_walIndex.Clear();
// 4. Truncate WAL (all changes now in PageFile)
_wal.Truncate();
}
/// <summary>
/// Performs a checkpoint asynchronously by merging WAL pages into the page file.
/// </summary>
/// <param name="ct">The cancellation token.</param>
/// <returns>A task that represents the asynchronous checkpoint operation.</returns>
public async Task CheckpointAsync(CancellationToken ct = default)
{
await _commitLock.WaitAsync(ct);
try
finally
{
if (_walIndex.IsEmpty)
{
if (_wal.GetCurrentSize() > 0)
{
_wal.Truncate();
}
return;
}
// 1. Write all committed pages from index to PageFile
// PageFile writes are sync (MMF), but that's fine as per plan (ValueTask strategy for MMF)
foreach (var kvp in _walIndex)
{
_pageFile.WritePage(kvp.Key, kvp.Value);
}
// 2. Flush PageFile to ensure durability
_pageFile.Flush();
// 3. Clear in-memory WAL index (now persisted)
_walIndex.Clear();
// 4. Truncate WAL (all changes now in PageFile)
// WAL truncation involves file resize and flush
// TODO: Add TruncateAsync to WAL? For now Truncate is sync.
_wal.Truncate();
}
finally
{
_commitLock.Release();
}
}
/// <summary>
/// Recovers from crash by replaying WAL.
/// Applies all committed transactions to PageFile, then truncates WAL.
/// </summary>
public void Recover()
{
_commitLock.Wait();
try
{
// 1. Read WAL and identify committed transactions
var records = _wal.ReadAll();
var committedTxns = new HashSet<ulong>();
var txnWrites = new Dictionary<ulong, List<(uint pageId, byte[] data)>>();
foreach (var record in records)
{
if (record.Type == WalRecordType.Commit)
committedTxns.Add(record.TransactionId);
else if (record.Type == WalRecordType.Write)
{
if (!txnWrites.ContainsKey(record.TransactionId))
txnWrites[record.TransactionId] = new List<(uint, byte[])>();
if (record.AfterImage != null)
{
txnWrites[record.TransactionId].Add((record.PageId, record.AfterImage));
}
}
}
// 2. Apply committed transactions to PageFile
foreach (var txnId in committedTxns)
{
if (!txnWrites.ContainsKey(txnId))
continue;
foreach (var (pageId, data) in txnWrites[txnId])
{
_pageFile.WritePage(pageId, data);
}
}
// 3. Flush PageFile to ensure durability
_pageFile.Flush();
// 4. Clear in-memory WAL index (redundant since we just recovered)
_walIndex.Clear();
// 5. Truncate WAL (all changes now in PageFile)
_wal.Truncate();
}
finally
{
_commitLock.Release();
}
}