Files
CBDD/src/CBDD.Core/Storage/StorageEngine.Recovery.cs
Joseph Doherty a70d8befae
All checks were successful
NuGet Publish / build-and-pack (push) Successful in 46s
NuGet Publish / publish-to-gitea (push) Successful in 56s
Reformat / cleanup
2026-02-21 08:10:36 -05:00

252 lines
7.8 KiB
C#
Executable File

using ZB.MOM.WW.CBDD.Core.Transactions;
namespace ZB.MOM.WW.CBDD.Core.Storage;
public sealed partial class StorageEngine
{
/// <summary>
/// Reports the size the write-ahead log currently has.
/// </summary>
/// <returns>The value returned by the WAL's <c>GetCurrentSize()</c>.</returns>
public long GetWalSize() => _wal.GetCurrentSize();
/// <summary>
/// Truncates the write-ahead log.
/// Intended to be invoked only once a checkpoint has completed successfully.
/// </summary>
public void TruncateWal() => _wal.Truncate();
/// <summary>
/// Flushes the write-ahead log to disk by delegating to the WAL's <c>Flush()</c>.
/// </summary>
public void FlushWal() => _wal.Flush();
/// <summary>
/// Runs a checkpoint in <see cref="CheckpointMode.Truncate"/> mode and discards the result.
/// </summary>
public void Checkpoint() => _ = Checkpoint(CheckpointMode.Truncate);
/// <summary>
/// Runs a checkpoint in the requested mode while holding the commit lock.
/// </summary>
/// <remarks>
/// <see cref="CheckpointMode.Passive"/> only attempts to take the commit lock without waiting.
/// When the lock is unavailable no checkpoint work is done and the returned result carries
/// zero applied pages and the current WAL size as both its before and after values.
/// Every other mode blocks until the commit lock is obtained.
/// </remarks>
/// <param name="mode">Checkpoint mode to execute.</param>
/// <returns>The checkpoint execution result.</returns>
public CheckpointResult Checkpoint(CheckpointMode mode)
{
    if (mode != CheckpointMode.Passive)
    {
        // Non-passive modes wait for the commit lock for as long as it takes.
        _commitLock.Wait();
    }
    else if (!_commitLock.Wait(0))
    {
        // Passive mode gives up immediately when the lock is busy.
        long currentWalSize = _wal.GetCurrentSize();
        return new CheckpointResult(mode, false, 0, currentWalSize, currentWalSize, false, false);
    }

    // The commit lock is held from here on; release it no matter how the checkpoint ends.
    try
    {
        return CheckpointInternal(mode);
    }
    finally
    {
        _commitLock.Release();
    }
}
// Convenience overload: runs the internal checkpoint in Truncate mode and ignores the result.
// Like the mode-taking overload, it does not acquire the commit lock itself.
private void CheckpointInternal() => _ = CheckpointInternal(CheckpointMode.Truncate);
/// <summary>
/// Writes every page held in the in-memory WAL index to the page file, clears the index,
/// and then applies the WAL handling that belongs to <paramref name="mode"/>.
/// </summary>
/// <remarks>
/// This method takes no lock itself; the public checkpoint entry points in this file
/// acquire the commit lock before calling it.
/// </remarks>
/// <param name="mode">Checkpoint mode to execute.</param>
/// <returns>
/// A result carrying the mode, the number of pages applied, the WAL size before and after,
/// and whether the WAL was truncated and/or restarted.
/// </returns>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="mode"/> is not one of the supported checkpoint modes. The check happens
/// before any page is written, so an invalid mode leaves the page file, WAL and WAL index untouched.
/// </exception>
private CheckpointResult CheckpointInternal(CheckpointMode mode)
{
    // Fail fast: reject an unsupported mode before mutating the page file or the WAL index,
    // instead of discovering it only after the pages have already been applied.
    if (mode is not (CheckpointMode.Passive
        or CheckpointMode.Full
        or CheckpointMode.Truncate
        or CheckpointMode.Restart))
    {
        throw new ArgumentOutOfRangeException(nameof(mode), mode, "Unsupported checkpoint mode.");
    }

    long walBytesBefore = _wal.GetCurrentSize();
    var appliedPages = 0;
    var truncated = false;
    var restarted = false;

    // 1. Write all committed pages from index to PageFile.
    foreach (var kvp in _walIndex)
    {
        _pageFile.WritePage(kvp.Key, kvp.Value);
        appliedPages++;
    }

    // 2. Flush PageFile to ensure durability (skipped when nothing was written).
    if (appliedPages > 0)
    {
        _pageFile.Flush();
    }

    // 3. Clear in-memory WAL index (now persisted).
    _walIndex.Clear();

    // 4. Apply mode-specific WAL handling. The mode was validated above, so no default branch is needed.
    switch (mode)
    {
        case CheckpointMode.Passive:
        case CheckpointMode.Full:
            // Keep the WAL contents but mark the checkpoint boundary, unless there was nothing to mark.
            if (walBytesBefore > 0 || appliedPages > 0)
            {
                _wal.WriteCheckpointRecord();
                _wal.Flush();
            }
            break;
        case CheckpointMode.Truncate:
            // Only truncate a WAL that actually had content.
            if (walBytesBefore > 0)
            {
                _wal.Truncate();
                truncated = true;
            }
            break;
        case CheckpointMode.Restart:
            // Restart is unconditional and is reported as both truncated and restarted.
            _wal.Restart();
            truncated = true;
            restarted = true;
            break;
    }

    long walBytesAfter = _wal.GetCurrentSize();
    return new CheckpointResult(mode, true, appliedPages, walBytesBefore, walBytesAfter, truncated, restarted);
}
/// <summary>
/// Performs a truncate checkpoint asynchronously by default.
/// The checkpoint result is discarded.
/// </summary>
/// <param name="ct">The cancellation token, forwarded to the mode-taking overload.</param>
/// <returns>A task that completes when the checkpoint has finished.</returns>
public async Task CheckpointAsync(CancellationToken ct = default)
{
    // Library code: do not force the continuation back onto a captured synchronization context.
    _ = await CheckpointAsync(CheckpointMode.Truncate, ct).ConfigureAwait(false);
}
/// <summary>
/// Performs a checkpoint asynchronously using the requested mode.
/// </summary>
/// <remarks>
/// Only the wait for the commit lock is asynchronous; the checkpoint work itself runs
/// synchronously once the lock is held. <see cref="CheckpointMode.Passive"/> attempts the lock
/// without waiting and, when it is unavailable, returns a result with zero applied pages and
/// the current WAL size as both its before and after values. Every other mode waits for the lock.
/// </remarks>
/// <param name="mode">Checkpoint mode to execute.</param>
/// <param name="ct">
/// The cancellation token. It is passed to the commit-lock wait only; it is not consulted
/// once the checkpoint work has started.
/// </param>
/// <returns>A task that represents the asynchronous checkpoint operation.</returns>
public async Task<CheckpointResult> CheckpointAsync(CheckpointMode mode, CancellationToken ct = default)
{
    // ConfigureAwait(false): the work after the await is blocking page/WAL I/O, so it should not
    // be marshalled back onto a captured synchronization context (for example a UI thread).
    if (mode != CheckpointMode.Passive)
    {
        await _commitLock.WaitAsync(ct).ConfigureAwait(false);
    }
    else if (!await _commitLock.WaitAsync(0, ct).ConfigureAwait(false))
    {
        // Passive mode gives up immediately when the lock is busy.
        long walSize = _wal.GetCurrentSize();
        return new CheckpointResult(mode, false, 0, walSize, walSize, false, false);
    }

    // The commit lock is held from here on; release it no matter how the checkpoint ends.
    try
    {
        // Checkpoint work is synchronous over MMF/page writes for now.
        return CheckpointInternal(mode);
    }
    finally
    {
        _commitLock.Release();
    }
}
/// <summary>
/// Replays the write-ahead log into the page file and then empties the log.
/// </summary>
/// <remarks>
/// Runs under the commit lock. Replay starts right after the last checkpoint record found in the
/// WAL (or at the first record when there is none) and walks the records in WAL order. Page writes
/// are buffered per transaction and written to the page file only when that transaction's commit
/// record is reached; aborted transactions, and transactions with no commit record in the replayed
/// range, are never applied. Write records without an after-image are ignored.
/// </remarks>
public void Recover()
{
    _commitLock.Wait();
    try
    {
        // 1. Load the WAL and find where replay has to begin: one past the newest checkpoint record.
        var records = _wal.ReadAll();
        int replayStart = 0;
        int scan = records.Count;
        while (--scan >= 0)
        {
            if (records[scan].Type == WalRecordType.Checkpoint)
            {
                replayStart = scan + 1;
                break;
            }
        }

        // 2. Walk the remaining records in order, staging writes until their transaction commits.
        var staged = new Dictionary<ulong, List<(uint pageId, byte[] data)>>();
        bool wrotePages = false;
        for (int position = replayStart; position < records.Count; position++)
        {
            var entry = records[position];
            var txId = entry.TransactionId;
            switch (entry.Type)
            {
                case WalRecordType.Begin:
                    // Open an empty write list unless this transaction already has one.
                    staged.TryAdd(txId, new List<(uint pageId, byte[] data)>());
                    break;

                case WalRecordType.Write:
                    if (entry.AfterImage is { } afterImage)
                    {
                        // A write may arrive without a preceding Begin record; create its list on demand.
                        if (!staged.TryGetValue(txId, out var txWrites))
                        {
                            txWrites = new List<(uint pageId, byte[] data)>();
                            staged[txId] = txWrites;
                        }

                        txWrites.Add((entry.PageId, afterImage));
                    }

                    break;

                case WalRecordType.Commit:
                    // Apply the transaction's staged pages in the order they were logged.
                    if (staged.Remove(txId, out var committed))
                    {
                        foreach (var (pageId, data) in committed)
                        {
                            _pageFile.WritePage(pageId, data);
                            wrotePages = true;
                        }
                    }

                    break;

                case WalRecordType.Abort:
                    // Throw away whatever the aborted transaction had staged.
                    staged.Remove(txId);
                    break;

                case WalRecordType.Checkpoint:
                    staged.Clear();
                    break;
            }
        }

        // 3. Make the replayed pages durable, if any were written.
        if (wrotePages)
        {
            _pageFile.Flush();
        }

        // 4. Reset the in-memory WAL index (redundant since we just recovered).
        _walIndex.Clear();

        // 5. Everything committed is now in the page file, so drop the WAL contents.
        if (_wal.GetCurrentSize() > 0)
        {
            _wal.Truncate();
        }
    }
    finally
    {
        _commitLock.Release();
    }
}
}