Files
CBDD/src/CBDD.Core/Collections/DocumentCollection.Scan.cs

102 lines
3.5 KiB
C#

using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using ZB.MOM.WW.CBDD.Bson;
using ZB.MOM.WW.CBDD.Core.Storage;
namespace ZB.MOM.WW.CBDD.Core.Collections;
public partial class DocumentCollection<TId, T> where T : class
{
    /// <summary>
    /// Scans the entire collection using a raw BSON predicate.
    /// This avoids deserializing documents that don't match the criteria.
    /// </summary>
    /// <remarks>
    /// The predicate is validated as soon as this method is called. Everything else is lazy:
    /// the current transaction is obtained, the page count is read, and pages are visited
    /// (in ascending page-id order) only once the returned sequence is enumerated.
    /// </remarks>
    /// <param name="predicate">Function to evaluate raw BSON data</param>
    /// <returns>Matching documents</returns>
    /// <exception cref="ArgumentNullException"><paramref name="predicate"/> is <c>null</c>.</exception>
    internal IEnumerable<T> Scan(Func<BsonSpanReader, bool> predicate)
    {
        // Validate outside the iterator: inside an iterator block this check would only run
        // on the first MoveNext(), far away from the call that passed the bad argument.
        if (predicate == null) throw new ArgumentNullException(nameof(predicate));

        return ScanCore();

        IEnumerable<T> ScanCore()
        {
            var transaction = _transactionHolder.GetCurrentTransactionOrStart();
            var txnId = transaction.TransactionId;
            var pageCount = _storage.PageCount;

            // A single page buffer and result list are reused for every page.
            var buffer = new byte[_storage.PageSize];
            var pageResults = new List<T>();

            for (uint pageId = 0; pageId < pageCount; pageId++)
            {
                pageResults.Clear();
                ScanPage(pageId, txnId, buffer, predicate, pageResults);
                foreach (var doc in pageResults)
                {
                    yield return doc;
                }
            }
        }
    }

    /// <summary>
    /// Scans the collection in parallel using multiple threads.
    /// Useful for large collections on multi-core machines.
    /// </summary>
    /// <remarks>
    /// The page range is split into contiguous chunks with <see cref="Partitioner"/> and processed
    /// with PLINQ; each chunk gets its own page buffer. Results are NOT returned in page order.
    /// The predicate is validated and the transaction is obtained when this method is called;
    /// pages are read when the returned query is enumerated.
    /// NOTE(review): <c>ScanPage</c> (and therefore <c>_storage.ReadPage</c>,
    /// <c>_storage.GetKeyReverseMap</c> and <c>FindByLocation</c>) runs concurrently on worker
    /// threads. This assumes those members are safe to call concurrently. <c>FindByLocation</c>
    /// is not given <c>txnId</c>. If it resolves the transaction from thread-affine ambient
    /// state, worker threads may not see the caller's transaction. Confirm both points.
    /// </remarks>
    /// <param name="predicate">Function to evaluate raw BSON data</param>
    /// <param name="degreeOfParallelism">Number of threads to use (default: -1 = ProcessorCount)</param>
    /// <returns>Matching documents, in no particular order</returns>
    /// <exception cref="ArgumentNullException"><paramref name="predicate"/> is <c>null</c>.</exception>
    internal IEnumerable<T> ParallelScan(Func<BsonSpanReader, bool> predicate, int degreeOfParallelism = -1)
    {
        // PLINQ's WithDegreeOfParallelism rejects values above this limit.
        const int MaxPlinqDegreeOfParallelism = 512;

        if (predicate == null) throw new ArgumentNullException(nameof(predicate));
        var transaction = _transactionHolder.GetCurrentTransactionOrStart();
        var txnId = transaction.TransactionId;
        var pageCount = (int)_storage.PageCount;

        // Partitioner.Create(from, to) throws when to <= from, so an empty store has to
        // short-circuit here. This matches Scan, which simply yields nothing in that case.
        if (pageCount <= 0)
            return Array.Empty<T>();

        // Keep the machine-derived default within PLINQ's supported range; explicit
        // caller-supplied values are passed through unchanged.
        if (degreeOfParallelism <= 0)
            degreeOfParallelism = Math.Min(Environment.ProcessorCount, MaxPlinqDegreeOfParallelism);

        return Partitioner.Create(0, pageCount)
            .AsParallel()
            .WithDegreeOfParallelism(degreeOfParallelism)
            .SelectMany(range =>
            {
                // Each range gets private scratch state so worker threads never share a buffer.
                var localBuffer = new byte[_storage.PageSize];
                var localResults = new List<T>();
                for (int i = range.Item1; i < range.Item2; i++)
                {
                    ScanPage((uint)i, txnId, localBuffer, predicate, localResults);
                }
                return localResults;
            });
    }

    /// <summary>
    /// Reads one page and appends to <paramref name="results"/> every non-deleted document on
    /// it whose raw BSON satisfies <paramref name="predicate"/>. Pages whose type is not
    /// <c>PageType.Data</c> are skipped.
    /// </summary>
    /// <param name="pageId">Page to read.</param>
    /// <param name="txnId">Transaction id passed to the storage page read.</param>
    /// <param name="buffer">Scratch buffer that the page is read into; its contents are overwritten.</param>
    /// <param name="predicate">Filter evaluated against each slot's raw BSON bytes.</param>
    /// <param name="results">Receives matching documents; this method does not clear it.</param>
    private void ScanPage(uint pageId, ulong txnId, byte[] buffer, Func<BsonSpanReader, bool> predicate, List<T> results)
    {
        _storage.ReadPage(pageId, txnId, buffer);
        var header = SlottedPageHeader.ReadFrom(buffer);
        if (header.PageType != PageType.Data)
            return;

        // The slot directory starts right after the page header.
        var slots = MemoryMarshal.Cast<byte, SlotEntry>(
            buffer.AsSpan(SlottedPageHeader.Size, header.SlotCount * SlotEntry.Size));

        // Fetched once per page instead of once per slot; nothing in this loop replaces it.
        var keyReverseMap = _storage.GetKeyReverseMap();

        for (int i = 0; i < header.SlotCount; i++)
        {
            var slot = slots[i];
            if (slot.Flags.HasFlag(SlotFlags.Deleted))
                continue;

            var data = buffer.AsSpan(slot.Offset, slot.Length);
            var reader = new BsonSpanReader(data, keyReverseMap);
            if (!predicate(reader))
                continue;

            // The predicate only inspected raw bytes; the document is materialized by location.
            var doc = FindByLocation(new DocumentLocation(pageId, (ushort)i));
            if (doc != null)
                results.Add(doc);
        }
    }
}