using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.CBDDC.Core;
using ZB.MOM.WW.CBDDC.Core.Storage;
using ZB.MOM.WW.CBDDC.Persistence.Surreal;
namespace ZB.MOM.WW.CBDDC.Persistence.Lmdb;
/// <summary>
/// Backfills LMDB oplog content from Surreal and validates parity.
/// </summary>
public sealed class LmdbOplogBackfillTool
{
    private readonly LmdbOplogStore _destination;
    private readonly ILogger _logger;
    private readonly SurrealOplogStore _source;

    /// <summary>
    /// Initializes a new instance of the <see cref="LmdbOplogBackfillTool"/> class.
    /// </summary>
    /// <param name="source">Surreal oplog store the entries are exported from.</param>
    /// <param name="destination">LMDB oplog store the entries are merged into.</param>
    /// <param name="logger">Optional logger; defaults to <see cref="NullLogger.Instance"/>.</param>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="source"/> or <paramref name="destination"/> is null.</exception>
    public LmdbOplogBackfillTool(
        SurrealOplogStore source,
        LmdbOplogStore destination,
        ILogger? logger = null)
    {
        _source = source ?? throw new ArgumentNullException(nameof(source));
        _destination = destination ?? throw new ArgumentNullException(nameof(destination));
        _logger = logger ?? NullLogger.Instance;
    }

    /// <summary>
    /// Backfills one dataset from Surreal to LMDB and validates parity.
    /// </summary>
    /// <param name="datasetId">Dataset identifier; normalized via <see cref="DatasetId.Normalize"/>.</param>
    /// <param name="cancellationToken">Token used to cancel the export, merge, and validation.</param>
    /// <returns>The parity report produced after the merge.</returns>
    public async Task<LmdbOplogBackfillReport> BackfillAsync(
        string datasetId,
        CancellationToken cancellationToken = default)
    {
        string normalizedDatasetId = DatasetId.Normalize(datasetId);
        List<OplogEntry> sourceEntries = OrderCanonically(
            await _source.ExportAsync(normalizedDatasetId, cancellationToken));
        await _destination.MergeAsync(sourceEntries, normalizedDatasetId, cancellationToken);
        LmdbOplogBackfillReport report = await ValidateParityAsync(normalizedDatasetId, sourceEntries, cancellationToken);
        _logger.LogInformation(
            "LMDB oplog backfill {Result} for dataset {DatasetId}. Source={SourceCount}, Destination={DestinationCount}, HashSpotChecks={HashSpotChecks}, ChainSpotChecks={ChainSpotChecks}.",
            report.IsSuccess ? "succeeded" : "failed",
            report.DatasetId,
            report.SourceCount,
            report.DestinationCount,
            report.HashSpotCheckCount,
            report.ChainSpotCheckCount);
        return report;
    }

    /// <summary>
    /// Validates parity only without running a backfill merge.
    /// </summary>
    /// <param name="datasetId">Dataset identifier; normalized via <see cref="DatasetId.Normalize"/>.</param>
    /// <param name="cancellationToken">Token used to cancel the export and validation.</param>
    /// <returns>The parity report comparing the Surreal source against the LMDB destination.</returns>
    public async Task<LmdbOplogBackfillReport> ValidateParityAsync(
        string datasetId,
        CancellationToken cancellationToken = default)
    {
        string normalizedDatasetId = DatasetId.Normalize(datasetId);
        var sourceEntries = (await _source.ExportAsync(normalizedDatasetId, cancellationToken)).ToList();
        return await ValidateParityAsync(normalizedDatasetId, sourceEntries, cancellationToken);
    }

    /// <summary>
    /// Backfills and throws when parity validation fails.
    /// </summary>
    /// <param name="datasetId">Dataset identifier; normalized via <see cref="DatasetId.Normalize"/>.</param>
    /// <param name="cancellationToken">Token used to cancel the backfill and validation.</param>
    /// <returns>The parity report when validation succeeds.</returns>
    /// <exception cref="InvalidOperationException">Thrown when parity validation fails.</exception>
    public async Task<LmdbOplogBackfillReport> BackfillOrThrowAsync(
        string datasetId,
        CancellationToken cancellationToken = default)
    {
        LmdbOplogBackfillReport report = await BackfillAsync(datasetId, cancellationToken);
        if (report.IsSuccess) return report;
        throw new InvalidOperationException(
            $"LMDB oplog backfill parity failed for dataset '{report.DatasetId}'. " +
            $"Source={report.SourceCount}, Destination={report.DestinationCount}, " +
            $"CountsMatch={report.CountsMatch}, CountsPerNodeMatch={report.CountsPerNodeMatch}, " +
            $"LatestHashPerNodeMatch={report.LatestHashPerNodeMatch}, HashSpotChecksPassed={report.HashSpotChecksPassed}, " +
            $"ChainSpotChecksPassed={report.ChainSpotChecksPassed}.");
    }

    /// <summary>
    /// Compares the given source entries against the destination's export and builds a parity report.
    /// </summary>
    private async Task<LmdbOplogBackfillReport> ValidateParityAsync(
        string datasetId,
        List<OplogEntry> sourceEntries,
        CancellationToken cancellationToken)
    {
        // Both sides are sorted with the same canonical key so positional
        // comparisons and per-node aggregates are stable.
        List<OplogEntry> sourceOrdered = OrderCanonically(sourceEntries);
        List<OplogEntry> destinationOrdered = OrderCanonically(
            await _destination.ExportAsync(datasetId, cancellationToken));
        bool countsMatch = sourceOrdered.Count == destinationOrdered.Count;
        IReadOnlyDictionary<string, int> sourceCountByNode = CountByNode(sourceOrdered);
        IReadOnlyDictionary<string, int> destinationCountByNode = CountByNode(destinationOrdered);
        bool countsPerNodeMatch = DictionaryEqual(sourceCountByNode, destinationCountByNode);
        IReadOnlyDictionary<string, string> sourceLatestHashByNode = LatestHashByNode(sourceOrdered);
        IReadOnlyDictionary<string, string> destinationLatestHashByNode = LatestHashByNode(destinationOrdered);
        bool latestHashPerNodeMatch = DictionaryEqual(sourceLatestHashByNode, destinationLatestHashByNode);
        (bool hashSpotChecksPassed, int hashSpotCheckCount) = await RunHashSpotChecksAsync(
            datasetId,
            sourceOrdered,
            cancellationToken);
        (bool chainSpotChecksPassed, int chainSpotCheckCount) = await RunChainSpotChecksAsync(
            datasetId,
            sourceOrdered,
            cancellationToken);
        return new LmdbOplogBackfillReport(
            datasetId,
            sourceOrdered.Count,
            destinationOrdered.Count,
            sourceCountByNode,
            destinationCountByNode,
            sourceLatestHashByNode,
            destinationLatestHashByNode,
            hashSpotCheckCount,
            chainSpotCheckCount,
            countsMatch,
            countsPerNodeMatch,
            latestHashPerNodeMatch,
            hashSpotChecksPassed,
            chainSpotChecksPassed);
    }

    /// <summary>
    /// Orders entries by the canonical key: HLC physical time, logical counter, node id, then hash.
    /// </summary>
    private static List<OplogEntry> OrderCanonically(IEnumerable<OplogEntry> entries)
    {
        return entries
            .OrderBy(entry => entry.Timestamp.PhysicalTime)
            .ThenBy(entry => entry.Timestamp.LogicalCounter)
            .ThenBy(entry => entry.Timestamp.NodeId, StringComparer.Ordinal)
            .ThenBy(entry => entry.Hash, StringComparer.Ordinal)
            .ToList();
    }

    /// <summary>
    /// Samples up to 10 evenly spaced source entries and verifies each hash exists in the destination.
    /// </summary>
    private async Task<(bool Passed, int Count)> RunHashSpotChecksAsync(
        string datasetId,
        IReadOnlyList<OplogEntry> sourceEntries,
        CancellationToken cancellationToken)
    {
        if (sourceEntries.Count == 0) return (true, 0);
        var sampleIndexes = BuildSampleIndexes(sourceEntries.Count, Math.Min(10, sourceEntries.Count));
        foreach (int index in sampleIndexes)
        {
            string hash = sourceEntries[index].Hash;
            OplogEntry? destinationEntry = await _destination.GetEntryByHashAsync(hash, datasetId, cancellationToken);
            if (destinationEntry == null) return (false, sampleIndexes.Count);
        }
        return (true, sampleIndexes.Count);
    }

    /// <summary>
    /// Picks up to 5 parent/child hash pairs from the source and verifies both stores
    /// return the same chain range between them.
    /// </summary>
    private async Task<(bool Passed, int Count)> RunChainSpotChecksAsync(
        string datasetId,
        IReadOnlyList<OplogEntry> sourceEntries,
        CancellationToken cancellationToken)
    {
        if (sourceEntries.Count < 2) return (true, 0);
        var sourceByHash = sourceEntries.ToDictionary(entry => entry.Hash, StringComparer.Ordinal);
        // Only check links whose parent is also present in the exported set,
        // so a truncated export does not produce false negatives.
        var checks = sourceEntries
            .Where(entry => !string.IsNullOrWhiteSpace(entry.PreviousHash) &&
                sourceByHash.ContainsKey(entry.PreviousHash))
            .Take(5)
            .Select(entry => (StartHash: entry.PreviousHash, EndHash: entry.Hash))
            .ToList();
        foreach (var check in checks)
        {
            string[] sourceChain = (await _source.GetChainRangeAsync(check.StartHash, check.EndHash, datasetId, cancellationToken))
                .Select(entry => entry.Hash)
                .ToArray();
            string[] destinationChain =
                (await _destination.GetChainRangeAsync(check.StartHash, check.EndHash, datasetId, cancellationToken))
                .Select(entry => entry.Hash)
                .ToArray();
            if (!sourceChain.SequenceEqual(destinationChain, StringComparer.Ordinal))
                return (false, checks.Count);
        }
        return (true, checks.Count);
    }

    /// <summary>
    /// Counts entries per node id, skipping entries with a blank node id.
    /// </summary>
    private static IReadOnlyDictionary<string, int> CountByNode(IEnumerable<OplogEntry> entries)
    {
        return entries
            .Where(entry => !string.IsNullOrWhiteSpace(entry.Timestamp.NodeId))
            .GroupBy(entry => entry.Timestamp.NodeId, StringComparer.Ordinal)
            .ToDictionary(group => group.Key, group => group.Count(), StringComparer.Ordinal);
    }

    /// <summary>
    /// Maps each node id to the hash of its latest entry (by HLC time, counter, then hash).
    /// </summary>
    private static IReadOnlyDictionary<string, string> LatestHashByNode(IEnumerable<OplogEntry> entries)
    {
        return entries
            .Where(entry => !string.IsNullOrWhiteSpace(entry.Timestamp.NodeId))
            .GroupBy(entry => entry.Timestamp.NodeId, StringComparer.Ordinal)
            .ToDictionary(
                group => group.Key,
                group => group
                    .OrderByDescending(entry => entry.Timestamp.PhysicalTime)
                    .ThenByDescending(entry => entry.Timestamp.LogicalCounter)
                    .ThenByDescending(entry => entry.Hash, StringComparer.Ordinal)
                    .First()
                    .Hash,
                StringComparer.Ordinal);
    }

    /// <summary>
    /// Compares two string-keyed dictionaries for exact key/value equality.
    /// </summary>
    private static bool DictionaryEqual<T>(
        IReadOnlyDictionary<string, T> left,
        IReadOnlyDictionary<string, T> right)
    {
        if (left.Count != right.Count) return false;
        foreach (var pair in left)
        {
            if (!right.TryGetValue(pair.Key, out T? rightValue)) return false;
            if (!EqualityComparer<T>.Default.Equals(pair.Value, rightValue)) return false;
        }
        return true;
    }

    /// <summary>
    /// Builds up to <paramref name="sampleCount"/> evenly spaced, distinct, sorted indexes
    /// in [0, <paramref name="totalCount"/>), always including the first and last index.
    /// </summary>
    private static List<int> BuildSampleIndexes(int totalCount, int sampleCount)
    {
        if (sampleCount <= 0 || totalCount <= 0) return [];
        if (sampleCount >= totalCount) return Enumerable.Range(0, totalCount).ToList();
        var indexes = new HashSet<int>();
        for (var i = 0; i < sampleCount; i++)
        {
            // Rounded linear interpolation over [0, totalCount-1]; duplicates collapse in the set.
            int index = (int)Math.Round(i * (totalCount - 1d) / (sampleCount - 1d));
            indexes.Add(Math.Clamp(index, 0, totalCount - 1));
        }
        return indexes.OrderBy(value => value).ToList();
    }
}
/// <summary>
/// Parity report produced by the LMDB backfill tool.
/// </summary>
/// <param name="DatasetId">Normalized dataset identifier the report covers.</param>
/// <param name="SourceCount">Total entry count exported from the Surreal source.</param>
/// <param name="DestinationCount">Total entry count exported from the LMDB destination.</param>
/// <param name="SourceCountByNode">Per-node entry counts on the source side.</param>
/// <param name="DestinationCountByNode">Per-node entry counts on the destination side.</param>
/// <param name="SourceLatestHashByNode">Latest entry hash per node on the source side.</param>
/// <param name="DestinationLatestHashByNode">Latest entry hash per node on the destination side.</param>
/// <param name="HashSpotCheckCount">Number of hash lookups sampled during validation.</param>
/// <param name="ChainSpotCheckCount">Number of chain-range comparisons sampled during validation.</param>
/// <param name="CountsMatch">Whether total counts match between source and destination.</param>
/// <param name="CountsPerNodeMatch">Whether per-node counts match.</param>
/// <param name="LatestHashPerNodeMatch">Whether the latest hash per node matches.</param>
/// <param name="HashSpotChecksPassed">Whether every sampled hash was found in the destination.</param>
/// <param name="ChainSpotChecksPassed">Whether every sampled chain range matched.</param>
public sealed record LmdbOplogBackfillReport(
    string DatasetId,
    int SourceCount,
    int DestinationCount,
    IReadOnlyDictionary<string, int> SourceCountByNode,
    IReadOnlyDictionary<string, int> DestinationCountByNode,
    IReadOnlyDictionary<string, string> SourceLatestHashByNode,
    IReadOnlyDictionary<string, string> DestinationLatestHashByNode,
    int HashSpotCheckCount,
    int ChainSpotCheckCount,
    bool CountsMatch,
    bool CountsPerNodeMatch,
    bool LatestHashPerNodeMatch,
    bool HashSpotChecksPassed,
    bool ChainSpotChecksPassed)
{
    /// <summary>
    /// Gets a value indicating whether parity validation passed all checks.
    /// </summary>
    public bool IsSuccess =>
        CountsMatch &&
        CountsPerNodeMatch &&
        LatestHashPerNodeMatch &&
        HashSpotChecksPassed &&
        ChainSpotChecksPassed;
}