// CBDD/src/CBDD.Core/Collections/DocumentCollection.cs
using ZB.MOM.WW.CBDD.Core.CDC;
using System.Buffers;
using System.Buffers.Binary;
using ZB.MOM.WW.CBDD.Bson;
using ZB.MOM.WW.CBDD.Core.Indexing;
using ZB.MOM.WW.CBDD.Core.Compression;
using ZB.MOM.WW.CBDD.Core.Storage;
using ZB.MOM.WW.CBDD.Core.Transactions;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using System.Linq;
using System.Linq.Expressions;
using ZB.MOM.WW.CBDD.Core.Query;
using System.Collections.Generic;
using System;
using System.IO;
using System.Diagnostics;
using System.Threading;
using ZB.MOM.WW.CBDD.Core;
[assembly: InternalsVisibleTo("ZB.MOM.WW.CBDD.Tests")]
namespace ZB.MOM.WW.CBDD.Core.Collections;
public class DocumentCollection<T> : DocumentCollection<ObjectId, T> where T : class
{
/// <summary>
/// Initializes a new document collection that uses <see cref="ObjectId"/> as the primary key.
/// </summary>
/// <param name="storage">The storage engine used for persistence.</param>
/// <param name="transactionHolder">The transaction context holder.</param>
/// <param name="mapper">The document mapper for <typeparamref name="T"/>.</param>
/// <param name="collectionName">Optional collection name override.</param>
public DocumentCollection(StorageEngine storage, ITransactionHolder transactionHolder, IDocumentMapper<T> mapper, string? collectionName = null)
: this((IStorageEngine)storage, transactionHolder, mapper, collectionName)
{
}
internal DocumentCollection(IStorageEngine storage, ITransactionHolder transactionHolder, IDocumentMapper<T> mapper, string? collectionName = null)
: base(storage, transactionHolder, mapper, collectionName)
{
}
}
/// <summary>
/// Production-ready document collection with slotted page architecture.
/// Supports multiple documents per page, overflow chains, and efficient space utilization.
/// Represents a collection of documents of type T with an ID of type TId.
/// </summary>
/// <typeparam name="TId">Type of the primary key</typeparam>
/// <typeparam name="T">Type of the entity</typeparam>
public partial class DocumentCollection<TId, T> : IDisposable, ICompactionAwareCollection where T : class
{
private readonly ITransactionHolder _transactionHolder;
private readonly IStorageEngine _storage;
private readonly IDocumentMapper<TId, T> _mapper;
internal readonly BTreeIndex _primaryIndex;
private readonly CollectionIndexManager<TId, T> _indexManager;
private readonly CollectionCdcPublisher<TId, T> _cdcPublisher;
private readonly string _collectionName;
// Free space tracking: PageId → Free bytes
private readonly Dictionary<uint, ushort> _freeSpaceMap;
// Current page for inserts (optimization)
private uint _currentDataPage;
/// <summary>
/// Gets the current persisted schema version for the collection.
/// </summary>
public SchemaVersion? CurrentSchemaVersion { get; private set; }
// Concurrency control for write operations (B-Tree and Page modifications)
private readonly SemaphoreSlim _collectionLock = new(1, 1);
private readonly int _maxDocumentSizeForSinglePage;
private const int OverflowMetadataSize = 8;
private const int MaxLogicalDocumentSizeBytes = 16 * 1024 * 1024;
private const int MaxStoredPayloadSizeBytes = MaxLogicalDocumentSizeBytes + CompressedPayloadHeader.Size;
/// <summary>
/// Initializes a new instance of the document collection.
/// </summary>
/// <param name="storage">The storage engine used for persistence.</param>
/// <param name="transactionHolder">The transaction context holder.</param>
/// <param name="mapper">The mapper used to serialize and deserialize documents.</param>
/// <param name="collectionName">Optional collection name override.</param>
public DocumentCollection(StorageEngine storage, ITransactionHolder transactionHolder, IDocumentMapper<TId, T> mapper, string? collectionName = null)
: this((IStorageEngine)storage, transactionHolder, mapper, collectionName)
{
}
internal DocumentCollection(IStorageEngine storage, ITransactionHolder transactionHolder, IDocumentMapper<TId, T> mapper, string? collectionName = null)
{
_storage = storage ?? throw new ArgumentNullException(nameof(storage));
_transactionHolder = transactionHolder ?? throw new ArgumentNullException(nameof(transactionHolder));
_mapper = mapper ?? throw new ArgumentNullException(nameof(mapper));
_collectionName = collectionName ?? _mapper.CollectionName;
_cdcPublisher = new CollectionCdcPublisher<TId, T>(
_transactionHolder,
_collectionName,
_mapper,
_storage.Cdc,
_storage.GetKeyReverseMap());
// Initialize secondary index manager first (loads metadata including Primary Root Page ID)
_indexManager = new CollectionIndexManager<TId, T>(_storage, _mapper, _collectionName);
_freeSpaceMap = new Dictionary<uint, ushort>();
// Calculate max document size dynamically based on page size
// Reserve 128 bytes for the SlottedPageHeader plus a safety margin
_maxDocumentSizeForSinglePage = _storage.PageSize - 128;
// Ensure schema is persisted and versioned
EnsureSchema();
// Create primary index on _id (stores ObjectId → DocumentLocation mapping)
// Use persisted root page ID if available
var indexOptions = IndexOptions.CreateBTree("_id");
_primaryIndex = new BTreeIndex(_storage, indexOptions, _indexManager.PrimaryRootPageId);
// If a new root page was allocated, persist it
if (_indexManager.PrimaryRootPageId != _primaryIndex.RootPageId)
{
_indexManager.SetPrimaryRootPageId(_primaryIndex.RootPageId);
}
// Register keys used by the mapper to ensure they are available for compression
_storage.RegisterKeys(_mapper.UsedKeys);
}
private void RefreshPrimaryIndexRootFromMetadata()
{
var metadata = _storage.GetCollectionMetadata(_collectionName);
if (metadata == null || metadata.PrimaryRootPageId == 0)
return;
if (metadata.PrimaryRootPageId != _primaryIndex.RootPageId)
{
_primaryIndex.SetRootPageId(metadata.PrimaryRootPageId);
}
}
void ICompactionAwareCollection.RefreshIndexBindingsAfterCompaction()
{
var metadata = _storage.GetCollectionMetadata(_collectionName);
if (metadata == null)
return;
_indexManager.RebindFromMetadata(metadata);
if (metadata.PrimaryRootPageId != 0 && metadata.PrimaryRootPageId != _primaryIndex.RootPageId)
{
_primaryIndex.SetRootPageId(metadata.PrimaryRootPageId);
}
}
private void EnsureSchema()
{
var currentSchema = _mapper.GetSchema();
var metadata = _indexManager.GetMetadata();
var persistedSchemas = _storage.GetSchemas(metadata.SchemaRootPageId);
var latestPersisted = persistedSchemas.Count > 0 ? persistedSchemas[persistedSchemas.Count - 1] : null;
if (latestPersisted == null || !currentSchema.Equals(latestPersisted))
{
// Assign next version number
int nextVersion = persistedSchemas.Count + 1;
currentSchema.Version = nextVersion;
var newRootId = _storage.AppendSchema(metadata.SchemaRootPageId, currentSchema);
if (newRootId != metadata.SchemaRootPageId)
{
metadata.SchemaRootPageId = newRootId;
_storage.SaveCollectionMetadata(metadata);
}
CurrentSchemaVersion = new SchemaVersion(nextVersion, currentSchema.GetHash());
}
else
{
// Latest persisted is same as current structure
CurrentSchemaVersion = new SchemaVersion(latestPersisted.Version ?? persistedSchemas.Count, latestPersisted.GetHash());
}
}
#region Index Management API
/// <summary>
/// Creates a secondary index on a property for fast lookups.
/// The index is automatically maintained on insert/update/delete operations.
/// </summary>
/// <typeparam name="TKey">Property type</typeparam>
/// <param name="keySelector">Expression to extract the indexed property (e.g., p => p.Age)</param>
/// <param name="name">Optional custom index name (auto-generated if null)</param>
/// <param name="unique">If true, enforces uniqueness constraint on the indexed values</param>
/// <returns>The created secondary index</returns>
/// <example>
/// // Simple index on Age
/// collection.CreateIndex(p => p.Age);
///
/// // Unique index on Email
/// collection.CreateIndex(p => p.Email, unique: true);
///
/// // Custom name
/// collection.CreateIndex(p => p.LastName, name: "idx_lastname");
/// </example>
public CollectionSecondaryIndex<TId, T> CreateIndex<TKey>(
System.Linq.Expressions.Expression<Func<T, TKey>> keySelector,
string? name = null,
bool unique = false)
{
if (keySelector == null)
throw new ArgumentNullException(nameof(keySelector));
using (var txn = _storage.BeginTransaction())
{
var index = _indexManager.CreateIndex(keySelector, name, unique);
// Rebuild index for existing documents
RebuildIndex(index);
txn.Commit();
return index;
}
}
/// <summary>
/// Creates a vector (HNSW) index for similarity search.
/// </summary>
/// <typeparam name="TKey">The type of the indexed vector property.</typeparam>
/// <param name="keySelector">Expression selecting the property to index.</param>
/// <param name="dimensions">The number of vector dimensions.</param>
/// <param name="metric">The similarity metric used for distance calculations.</param>
/// <param name="name">Optional index name.</param>
/// <returns>The created secondary index.</returns>
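/// <example>
/// // Illustrative usage (assumes a document type with a float[] Embedding property of 384 dimensions):
/// collection.CreateVectorIndex(d => d.Embedding, dimensions: 384, metric: VectorMetric.Cosine);
/// </example>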
public CollectionSecondaryIndex<TId, T> CreateVectorIndex<TKey>(
System.Linq.Expressions.Expression<Func<T, TKey>> keySelector,
int dimensions,
VectorMetric metric = VectorMetric.Cosine,
string? name = null)
{
if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
using (var txn = _storage.BeginTransaction())
{
var index = _indexManager.CreateVectorIndex(keySelector, dimensions, metric, name);
RebuildIndex(index);
txn.Commit();
return index;
}
}
/// <summary>
/// Ensures that an index exists on the specified property.
/// If the index already exists, it is returned without modification (idempotent).
/// If it doesn't exist, it is created and populated.
/// </summary>
/// <typeparam name="TKey">The type of the indexed property.</typeparam>
/// <param name="keySelector">Expression selecting the property to index.</param>
/// <param name="name">Optional index name.</param>
/// <param name="unique">Whether the index enforces unique values.</param>
/// <returns>An existing or newly created secondary index.</returns>
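/// <example>
/// // Illustrative usage (assumes a Person document with an Email property):
/// var emailIndex = collection.EnsureIndex(p => p.Email, unique: true);
/// // A second call returns the existing index without rebuilding it:
/// var sameIndex = collection.EnsureIndex(p => p.Email, unique: true);
/// </example>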
public CollectionSecondaryIndex<TId, T> EnsureIndex<TKey>(
System.Linq.Expressions.Expression<Func<T, TKey>> keySelector,
string? name = null,
bool unique = false)
{
if (keySelector == null) throw new ArgumentNullException(nameof(keySelector));
// 1. Check if index already exists (fast path)
var propertyPaths = ExpressionAnalyzer.ExtractPropertyPaths(keySelector);
var indexName = name ?? $"idx_{string.Join("_", propertyPaths)}";
var existingIndex = GetIndex(indexName);
if (existingIndex != null)
{
return existingIndex;
}
// 2. Create if missing (slow path: rebuilds index)
return CreateIndex(keySelector, name, unique);
}
/// <summary>
/// Drops (removes) an existing secondary index by name.
/// The primary index (_id) cannot be dropped.
/// </summary>
/// <param name="name">Name of the index to drop</param>
/// <returns>True if the index was found and dropped, false otherwise</returns>
public bool DropIndex(string name)
{
if (string.IsNullOrWhiteSpace(name))
throw new ArgumentException("Index name cannot be empty", nameof(name));
// Prevent dropping primary index
if (name.Equals("_id", StringComparison.OrdinalIgnoreCase))
throw new InvalidOperationException("Cannot drop primary index");
return _indexManager.DropIndex(name);
}
/// <summary>
/// Gets metadata about all secondary indexes in this collection.
/// Does not include the primary index (_id).
/// </summary>
/// <returns>Collection of index metadata</returns>
public IEnumerable<CollectionIndexInfo> GetIndexes()
{
return _indexManager.GetIndexInfo();
}
/// <summary>
/// Applies an index builder definition to the collection metadata and index store.
/// </summary>
/// <param name="builder">The index builder definition to apply.</param>
internal void ApplyIndexBuilder(Metadata.IndexBuilder<T> builder)
{
// Use the IndexManager directly to ensure the index exists
// We need to convert the LambdaExpression to a typed expression if possible,
// or add an untyped CreateIndex to IndexManager.
// For now, let's use a dynamic approach or cast if we know it's Func<T, object>
if (builder.Type == IndexType.Vector)
{
_indexManager.CreateVectorIndexUntyped(builder.KeySelector, builder.Dimensions, builder.Metric, builder.Name);
}
else if (builder.Type == IndexType.Spatial)
{
_indexManager.CreateSpatialIndexUntyped(builder.KeySelector, builder.Name);
}
else if (builder.KeySelector is System.Linq.Expressions.Expression<Func<T, object>> selector)
{
_indexManager.EnsureIndex(selector, builder.Name, builder.IsUnique);
}
else
{
// Try to rebuild the expression or use untyped version
_indexManager.EnsureIndexUntyped(builder.KeySelector, builder.Name, builder.IsUnique);
}
}
/// <summary>
/// Gets a queryable interface for this collection.
/// Supports LINQ queries that are translated to optimized BTree scans or index lookups.
/// </summary>
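/// <example>
/// // Illustrative usage (assumes a Person document with Age and LastName properties):
/// var adults = collection.AsQueryable()
///     .Where(p => p.Age >= 18)
///     .OrderBy(p => p.LastName)
///     .ToList();
/// </example>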
public IQueryable<T> AsQueryable()
{
return new BTreeQueryable<T>(new BTreeQueryProvider<TId, T>(this));
}
/// <summary>
/// Gets a specific secondary index by name for advanced querying.
/// Returns null if the index doesn't exist.
/// </summary>
/// <param name="name">Name of the index.</param>
/// <returns>The matching secondary index, or null when not found.</returns>
public CollectionSecondaryIndex<TId, T>? GetIndex(string name)
{
return _indexManager.GetIndex(name);
}
/// <summary>
/// Queries a specific index for a range of values.
/// Returns matching documents using the index for efficient retrieval.
/// </summary>
/// <param name="indexName">Name of the index to query.</param>
/// <param name="minKey">Inclusive lower bound key.</param>
/// <param name="maxKey">Inclusive upper bound key.</param>
/// <param name="ascending">True to iterate ascending; false for descending.</param>
/// <returns>Documents that match the requested index range.</returns>
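/// <example>
/// // Illustrative usage (assumes a secondary index named "idx_Age" over an int Age property):
/// foreach (var person in collection.QueryIndex("idx_Age", minKey: 18, maxKey: 65))
/// {
///     Console.WriteLine(person);
/// }
/// </example>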
public IEnumerable<T> QueryIndex(string indexName, object? minKey, object? maxKey, bool ascending = true)
{
var index = GetIndex(indexName);
if (index == null) throw new ArgumentException($"Index {indexName} not found");
var transaction = _transactionHolder.GetCurrentTransactionOrStart();
var direction = ascending ? IndexDirection.Forward : IndexDirection.Backward;
foreach (var location in index.Range(minKey, maxKey, direction, transaction))
{
var doc = FindByLocation(location);
if (doc != null) yield return doc;
}
}
/// <summary>
/// Rebuilds an index by scanning all existing documents and re-inserting them.
/// Called automatically when creating a new index.
/// </summary>
private void RebuildIndex(CollectionSecondaryIndex<TId, T> index)
{
RefreshPrimaryIndexRootFromMetadata();
var transaction = _transactionHolder.GetCurrentTransactionOrStart();
// Iterate all documents in the collection via primary index
var minKey = new IndexKey(Array.Empty<byte>());
var maxKey = new IndexKey(Enumerable.Repeat((byte)0xFF, 32).ToArray());
foreach (var entry in _primaryIndex.Range(minKey, maxKey, IndexDirection.Forward, transaction.TransactionId))
{
try
{
var document = FindByLocation(entry.Location);
if (document != null)
{
index.Insert(document, entry.Location, transaction);
}
}
catch
{
// Skip documents that fail to load or index
// Production: should log errors
}
}
}
#endregion
#region Data Page Management
private uint FindPageWithSpace(int requiredBytes)
{
var transaction = _transactionHolder.GetCurrentTransactionOrStart();
var txnId = transaction.TransactionId;
// Try current page first
if (_currentDataPage != 0)
{
if (_freeSpaceMap.TryGetValue(_currentDataPage, out var freeBytes))
{
if (freeBytes >= requiredBytes && !_storage.IsPageLocked(_currentDataPage, txnId))
{
return _currentDataPage;
}
}
else
{
// Load header and check - use StorageEngine
Span<byte> page = stackalloc byte[SlottedPageHeader.Size];
_storage.ReadPage(_currentDataPage, null, page);
var header = SlottedPageHeader.ReadFrom(page);
if (header.AvailableFreeSpace >= requiredBytes)
{
_freeSpaceMap[_currentDataPage] = (ushort)header.AvailableFreeSpace;
if (!_storage.IsPageLocked(_currentDataPage, txnId))
return _currentDataPage;
}
}
}
// Search free space map
foreach (var (pageId, freeBytes) in _freeSpaceMap)
{
if (freeBytes >= requiredBytes)
{
if (!_storage.IsPageLocked(pageId, txnId))
{
return pageId;
}
}
}
return 0; // No suitable page
}
private uint AllocateNewDataPage()
{
var transaction = _transactionHolder.GetCurrentTransactionOrStart();
var pageId = _storage.AllocatePage();
// Initialize slotted page header
var buffer = ArrayPool<byte>.Shared.Rent(_storage.PageSize);
try
{
buffer.AsSpan().Clear();
var header = new SlottedPageHeader
{
PageId = pageId,
PageType = PageType.Data,
SlotCount = 0,
FreeSpaceStart = SlottedPageHeader.Size,
FreeSpaceEnd = (ushort)_storage.PageSize,
NextOverflowPage = 0,
TransactionId = 0
};
header.WriteTo(buffer);
// Transaction write or direct write
if (transaction is Transaction t)
{
// OPTIMIZATION: Pass ReadOnlyMemory to avoid ToArray() allocation
var writeOp = new WriteOperation(ObjectId.Empty, buffer.AsMemory(0, _storage.PageSize), pageId, OperationType.AllocatePage);
t.AddWrite(writeOp);
}
else
{
_storage.WritePage(pageId, transaction.TransactionId, buffer);
}
// Track free space
_freeSpaceMap[pageId] = (ushort)header.AvailableFreeSpace;
_currentDataPage = pageId;
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
return pageId;
}
private ushort InsertIntoPage(uint pageId, ReadOnlySpan<byte> data, SlotFlags slotFlags = SlotFlags.None)
{
var transaction = _transactionHolder.GetCurrentTransactionOrStart();
var buffer = ArrayPool<byte>.Shared.Rent(_storage.PageSize);
try
{
_storage.ReadPage(pageId, transaction.TransactionId, buffer);
var header = SlottedPageHeader.ReadFrom(buffer);
// ROLLBACK RECOVERY: If the page is completely zeroed (e.g., from a rolled-back allocation)
// we re-initialize the header for the current transaction.
if (header.PageType == PageType.Empty && header.FreeSpaceEnd == 0)
{
header = new SlottedPageHeader
{
PageId = pageId,
PageType = PageType.Data,
SlotCount = 0,
FreeSpaceStart = SlottedPageHeader.Size,
FreeSpaceEnd = (ushort)_storage.PageSize,
TransactionId = (uint)transaction.TransactionId
};
header.WriteTo(buffer);
}
// Check free space
var freeSpace = header.AvailableFreeSpace;
var requiredSpace = data.Length + SlotEntry.Size;
if (freeSpace < requiredSpace)
throw new InvalidOperationException($"Not enough space: need {requiredSpace}, have {freeSpace} | PageId={pageId} | SlotCount={header.SlotCount} | Start={header.FreeSpaceStart} | End={header.FreeSpaceEnd} | Map={_freeSpaceMap.GetValueOrDefault(pageId)}");
// Find free slot (reuse deleted or create new)
ushort slotIndex = FindFreeSlot(buffer, ref header);
// Write document at end of used space (grows up)
var docOffset = header.FreeSpaceEnd - data.Length;
data.CopyTo(buffer.AsSpan(docOffset, data.Length));
// Write slot entry
var slotOffset = SlottedPageHeader.Size + (slotIndex * SlotEntry.Size);
var slot = new SlotEntry
{
Offset = (ushort)docOffset,
Length = (ushort)data.Length,
Flags = slotFlags
};
slot.WriteTo(buffer.AsSpan(slotOffset));
// Update header
if (slotIndex >= header.SlotCount)
header.SlotCount = (ushort)(slotIndex + 1);
header.FreeSpaceStart = (ushort)(SlottedPageHeader.Size + (header.SlotCount * SlotEntry.Size));
header.FreeSpaceEnd = (ushort)docOffset;
header.WriteTo(buffer);
// NEW: Buffer write in transaction or write immediately
if (transaction is Transaction t)
{
// OPTIMIZATION: Pass ReadOnlyMemory to avoid ToArray() allocation
var writeOp = new WriteOperation(
documentId: ObjectId.Empty,
newValue: buffer.AsMemory(0, _storage.PageSize),
pageId: pageId,
type: OperationType.Insert
);
t.AddWrite(writeOp);
}
else
{
_storage.WritePage(pageId, transaction.TransactionId, buffer);
}
// Update free space map
_freeSpaceMap[pageId] = (ushort)header.AvailableFreeSpace;
return slotIndex;
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
private ushort FindFreeSlot(Span<byte> page, ref SlottedPageHeader header)
{
// Scan existing slots for deleted ones
for (ushort i = 0; i < header.SlotCount; i++)
{
var slotOffset = SlottedPageHeader.Size + (i * SlotEntry.Size);
var slot = SlotEntry.ReadFrom(page.Slice(slotOffset, SlotEntry.Size));
if ((slot.Flags & SlotFlags.Deleted) != 0)
return i; // Reuse deleted slot
}
// No free slot, use next index
return header.SlotCount;
}
private uint AllocateOverflowPage(ReadOnlySpan<byte> data, uint nextOverflowPageId, ITransaction transaction)
{
var pageId = _storage.AllocatePage();
var buffer = ArrayPool<byte>.Shared.Rent(_storage.PageSize);
try
{
buffer.AsSpan().Clear();
var header = new SlottedPageHeader
{
PageId = pageId,
PageType = PageType.Overflow,
SlotCount = 0,
FreeSpaceStart = SlottedPageHeader.Size,
FreeSpaceEnd = (ushort)_storage.PageSize,
NextOverflowPage = nextOverflowPageId,
TransactionId = 0
};
header.WriteTo(buffer);
// Write data immediately after header
data.CopyTo(buffer.AsSpan(SlottedPageHeader.Size));
// NEW: Buffer write in transaction or write immediately
var writeOp = new WriteOperation(
documentId: ObjectId.Empty,
newValue: buffer.AsSpan(0, _storage.PageSize).ToArray(),
pageId: pageId,
type: OperationType.Insert
);
((Transaction)transaction).AddWrite(writeOp);
return pageId;
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
private (uint pageId, ushort slotIndex) InsertWithOverflow(ReadOnlySpan<byte> data, SlotFlags slotFlags = SlotFlags.None)
{
var transaction = _transactionHolder.GetCurrentTransactionOrStart();
// 1. Calculate Primary Chunk Size
// We need 8 bytes for metadata (TotalLength: 4, NextOverflowPage: 4)
int maxPrimaryPayload = _maxDocumentSizeForSinglePage - OverflowMetadataSize;
// 2. Build Overflow Chain (Reverse Order)
// We must ensure that pages closer to Primary are FULL (PageSize-Header),
// and only the last page (tail) is partial. This matches FindByLocation greedy reading.
uint nextOverflowPageId = 0;
int overflowChunkSize = _storage.PageSize - SlottedPageHeader.Size;
int totalOverflowBytes = data.Length - maxPrimaryPayload;
if (totalOverflowBytes > 0)
{
int tailSize = totalOverflowBytes % overflowChunkSize;
int fullPages = totalOverflowBytes / overflowChunkSize;
// 2a. Handle Tail (if any) - This is the highest offset
if (tailSize > 0)
{
int tailOffset = maxPrimaryPayload + (fullPages * overflowChunkSize);
var overflowPageId = AllocateOverflowPage(
data.Slice(tailOffset, tailSize),
nextOverflowPageId, // Tail is the last page in the chain, so this is 0
transaction
);
nextOverflowPageId = overflowPageId;
}
// If there is no tail, nextOverflowPageId stays 0 and the last full page terminates the chain.
// 2b. Handle Full Pages (Reverse order)
// Iterate from last full page down to first full page
for (int i = fullPages - 1; i >= 0; i--)
{
int chunkOffset = maxPrimaryPayload + (i * overflowChunkSize);
var overflowPageId = AllocateOverflowPage(
data.Slice(chunkOffset, overflowChunkSize),
nextOverflowPageId,
transaction
);
nextOverflowPageId = overflowPageId;
}
}
// 3. Prepare Primary Page Payload
// Layout: [TotalLength (4)] [NextOverflowPage (4)] [DataChunk (...)]
// Since we are in InsertWithOverflow, we know data.Length > maxPrimaryPayload
int primaryPayloadSize = maxPrimaryPayload;
int totalSlotSize = OverflowMetadataSize + primaryPayloadSize;
// Allocate primary page
var primaryPageId = FindPageWithSpace(totalSlotSize + SlotEntry.Size);
if (primaryPageId == 0)
primaryPageId = AllocateNewDataPage();
// 4. Write to Primary Page
var buffer = ArrayPool<byte>.Shared.Rent(_storage.PageSize);
try
{
_storage.ReadPage(primaryPageId, transaction.TransactionId, buffer);
var header = SlottedPageHeader.ReadFrom(buffer);
// Find free slot
ushort slotIndex = FindFreeSlot(buffer, ref header);
// Write payload at end of used space
var docOffset = header.FreeSpaceEnd - totalSlotSize;
var payloadSpan = buffer.AsSpan(docOffset, totalSlotSize);
// Write Metadata
System.Buffers.Binary.BinaryPrimitives.WriteInt32LittleEndian(payloadSpan.Slice(0, 4), data.Length); // Total Length
System.Buffers.Binary.BinaryPrimitives.WriteUInt32LittleEndian(payloadSpan.Slice(4, 4), nextOverflowPageId); // First Overflow Page
// Write Data Chunk
data.Slice(0, primaryPayloadSize).CopyTo(payloadSpan.Slice(8));
// Write Slot Entry
// FLAGS: HasOverflow
// LENGTH: Length of data *in this slot* (Metadata + Chunk)
// This keeps SlotEntry.Length within its ushort (64KB) limit,
// since individual slots are bounded by the page size.
var slotOffset = SlottedPageHeader.Size + (slotIndex * SlotEntry.Size);
var slot = new SlotEntry
{
Offset = (ushort)docOffset,
Length = (ushort)totalSlotSize,
Flags = slotFlags | SlotFlags.HasOverflow
};
slot.WriteTo(buffer.AsSpan(slotOffset));
// Update header
if (slotIndex >= header.SlotCount)
header.SlotCount = (ushort)(slotIndex + 1);
header.FreeSpaceStart = (ushort)(SlottedPageHeader.Size + (header.SlotCount * SlotEntry.Size));
header.FreeSpaceEnd = (ushort)docOffset;
header.WriteTo(buffer);
// NEW: Buffer write in transaction or write immediately
var writeOp = new WriteOperation(
documentId: ObjectId.Empty,
newValue: buffer.AsMemory(0, _storage.PageSize),
pageId: primaryPageId,
type: OperationType.Insert
);
((Transaction)transaction).AddWrite(writeOp);
// Update free space map
_freeSpaceMap[primaryPageId] = (ushort)header.AvailableFreeSpace;
return (primaryPageId, slotIndex);
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
/// <summary>
/// Inserts a new document into the collection.
/// Uses the ambient transaction from the transaction holder; if none is active, one is started and auto-commits.
/// </summary>
/// <param name="entity">Entity to insert</param>
/// <returns>The primary key of the inserted document</returns>
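/// <example>
/// // Illustrative usage (assumes a Person document type):
/// var id = collection.Insert(new Person { Name = "Ada", Age = 36 });
/// </example>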
public TId Insert(T entity)
{
var transaction = _transactionHolder.GetCurrentTransactionOrStart();
if (entity == null) throw new ArgumentNullException(nameof(entity));
_collectionLock.Wait();
try
{
try
{
var id = InsertCore(entity);
return id;
}
catch
{
transaction.Rollback();
throw;
}
}
finally
{
_collectionLock.Release();
}
}
/// <summary>
/// Asynchronously inserts a new document into the collection
/// </summary>
/// <param name="entity">The entity to insert.</param>
/// <returns>The identifier of the inserted entity.</returns>
public async Task<TId> InsertAsync(T entity)
{
var transaction = await _transactionHolder.GetCurrentTransactionOrStartAsync();
if (entity == null) throw new ArgumentNullException(nameof(entity));
await _collectionLock.WaitAsync();
try
{
try
{
var id = InsertCore(entity);
return id;
}
catch
{
transaction.Rollback();
throw;
}
}
finally
{
_collectionLock.Release();
}
}
/// <summary>
/// Inserts multiple documents in a single transaction for optimal performance.
/// This is the recommended way to insert many documents at once.
/// Uses micro-batched parallel serialization for optimal CPU utilization without excessive memory overhead.
/// </summary>
/// <param name="entities">Collection of entities to insert</param>
/// <returns>List of identifiers for the inserted documents</returns>
/// <example>
/// var people = new List&lt;Person&gt; { person1, person2, person3 };
/// var ids = collection.InsertBulk(people);
/// </example>
public List<TId> InsertBulk(IEnumerable<T> entities)
{
var transaction = _transactionHolder.GetCurrentTransactionOrStart();
if (entities == null) throw new ArgumentNullException(nameof(entities));
var entityList = entities.ToList();
var ids = new List<TId>(entityList.Count);
_collectionLock.Wait();
try
{
try
{
InsertBulkInternal(entityList, ids);
return ids;
}
catch
{
transaction.Rollback();
throw;
}
}
finally
{
_collectionLock.Release();
}
}
/// <summary>
/// Asynchronously inserts multiple documents in a single transaction.
/// </summary>
/// <param name="entities">Collection of entities to insert.</param>
/// <returns>List of identifiers for the inserted entities.</returns>
public async Task<List<TId>> InsertBulkAsync(IEnumerable<T> entities)
{
var transaction = await _transactionHolder.GetCurrentTransactionOrStartAsync();
if (entities == null) throw new ArgumentNullException(nameof(entities));
var entityList = entities.ToList();
var ids = new List<TId>(entityList.Count);
await _collectionLock.WaitAsync();
try
{
try
{
InsertBulkInternal(entityList, ids);
return ids;
}
catch
{
transaction.Rollback();
throw;
}
}
finally
{
_collectionLock.Release();
}
}
private void InsertBulkInternal(List<T> entityList, List<TId> ids)
{
var transaction = _transactionHolder.GetCurrentTransactionOrStart();
const int BATCH_SIZE = 50;
for (int batchStart = 0; batchStart < entityList.Count; batchStart += BATCH_SIZE)
{
int batchEnd = Math.Min(batchStart + BATCH_SIZE, entityList.Count);
int batchCount = batchEnd - batchStart;
// PHASE 1: Parallel serialize this batch
var serializedBatch = new (TId id, byte[] data, int length)[batchCount];
System.Threading.Tasks.Parallel.For(0, batchCount, i =>
{
var entity = entityList[batchStart + i];
var id = EnsureId(entity);
var length = SerializeWithRetry(entity, out var buffer);
serializedBatch[i] = (id, buffer, length);
});
// PHASE 2: Sequential insert this batch
for (int i = 0; i < batchCount; i++)
{
var (id, buffer, length) = serializedBatch[i];
var entity = entityList[batchStart + i];
try
{
InsertDataCore(id, entity, buffer.AsSpan(0, length));
ids.Add(id);
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
}
}
private TId EnsureId(T entity)
{
var id = _mapper.GetId(entity);
if (EqualityComparer<TId>.Default.Equals(id, default))
{
if (typeof(TId) == typeof(ObjectId))
{
id = (TId)(object)ObjectId.NewObjectId();
_mapper.SetId(entity, id);
}
else if (typeof(TId) == typeof(Guid))
{
id = (TId)(object)Guid.NewGuid();
_mapper.SetId(entity, id);
}
}
return id;
}
private TId InsertCore(T entity)
{
var id = EnsureId(entity);
var length = SerializeWithRetry(entity, out var buffer);
try
{
InsertDataCore(id, entity, buffer.AsSpan(0, length));
return id;
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
private void InsertDataCore(TId id, T entity, ReadOnlySpan<byte> docData)
{
RefreshPrimaryIndexRootFromMetadata();
var transaction = _transactionHolder.GetCurrentTransactionOrStart();
var (storedPayloadOverride, storedPayloadFlags) = PreparePayloadForStorage(docData);
ReadOnlySpan<byte> storedPayload = storedPayloadOverride is null ? docData : storedPayloadOverride;
DocumentLocation location;
if (storedPayload.Length + SlotEntry.Size <= _maxDocumentSizeForSinglePage)
{
var pageId = FindPageWithSpace(storedPayload.Length + SlotEntry.Size);
if (pageId == 0) pageId = AllocateNewDataPage();
var slotIndex = InsertIntoPage(pageId, storedPayload, storedPayloadFlags);
location = new DocumentLocation(pageId, slotIndex);
}
else
{
var (pageId, slotIndex) = InsertWithOverflow(storedPayload, storedPayloadFlags);
location = new DocumentLocation(pageId, slotIndex);
}
var key = _mapper.ToIndexKey(id);
_primaryIndex.Insert(key, location, transaction.TransactionId);
_indexManager.InsertIntoAll(entity, location, transaction);
// Notify CDC
NotifyCdc(OperationType.Insert, id, docData);
}
#endregion
#region Find
/// <summary>
/// Finds a document by its primary key.
/// If called within a transaction, sees uncommitted changes ("Read Your Own Writes");
/// otherwise a read-only snapshot transaction is used.
/// </summary>
/// <param name="id">The primary key of the document</param>
/// <returns>The document, or null if not found</returns>
public T? FindById(TId id)
{
RefreshPrimaryIndexRootFromMetadata();
var transaction = _transactionHolder.GetCurrentTransactionOrStart();
var key = _mapper.ToIndexKey(id);
if (!_primaryIndex.TryFind(key, out var location, transaction.TransactionId))
return null;
return FindByLocation(location);
}
/// <summary>
/// Returns all documents in the collection.
/// Uses the ambient transaction from the transaction holder. Start an explicit transaction when you need
/// consistent reads during concurrent writes; otherwise only the committed snapshot is visible.
/// </summary>
/// <returns>Enumerable of all documents</returns>
public IEnumerable<T> FindAll()
{
RefreshPrimaryIndexRootFromMetadata();
var transaction = _transactionHolder.GetCurrentTransactionOrStart();
var txnId = transaction?.TransactionId ?? 0;
var minKey = new IndexKey(Array.Empty<byte>());
var maxKey = new IndexKey(Enumerable.Repeat((byte)0xFF, 32).ToArray());
foreach (var entry in _primaryIndex.Range(minKey, maxKey, IndexDirection.Forward, txnId))
{
var entity = FindByLocation(entry.Location);
if (entity != null)
yield return entity;
}
}
/// <summary>
/// Finds a document by its physical storage location.
/// </summary>
/// <param name="location">The page and slot location of the document.</param>
/// <returns>The document if found; otherwise, null.</returns>
internal T? FindByLocation(DocumentLocation location)
{
var transaction = _transactionHolder.GetCurrentTransactionOrStart();
var txnId = transaction?.TransactionId ?? 0;
var buffer = ArrayPool<byte>.Shared.Rent(_storage.PageSize);
try
{
// Read from StorageEngine with transaction isolation
_storage.ReadPage(location.PageId, txnId, buffer);
var header = SlottedPageHeader.ReadFrom(buffer);
if (location.SlotIndex >= header.SlotCount)
return null;
var slotOffset = SlottedPageHeader.Size + (location.SlotIndex * SlotEntry.Size);
var slot = SlotEntry.ReadFrom(buffer.AsSpan(slotOffset));
if ((slot.Flags & SlotFlags.Deleted) != 0)
return null;
ValidateSlotBounds(slot, buffer.Length, location);
if ((slot.Flags & SlotFlags.HasOverflow) != 0)
{
var storedPayload = ReassembleOverflowPayload(buffer.AsSpan(slot.Offset, slot.Length), txnId, buffer, location);
var logicalPayload = (slot.Flags & SlotFlags.Compressed) != 0
? DecompressStoredPayload(storedPayload, location)
: storedPayload;
return _mapper.Deserialize(new BsonSpanReader(logicalPayload, _storage.GetKeyReverseMap()));
}
var docData = buffer.AsSpan(slot.Offset, slot.Length);
if ((slot.Flags & SlotFlags.Compressed) != 0)
{
var logicalPayload = DecompressStoredPayload(docData, location);
return _mapper.Deserialize(new BsonSpanReader(logicalPayload, _storage.GetKeyReverseMap()));
}
return _mapper.Deserialize(new BsonSpanReader(docData, _storage.GetKeyReverseMap()));
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
#endregion
#region Update & Delete
/// <summary>
/// Updates an existing document in the collection
/// </summary>
/// <param name="entity">The entity containing updated values.</param>
/// <returns>True if the document was updated; otherwise, false.</returns>
public bool Update(T entity)
{
var transaction = _transactionHolder.GetCurrentTransactionOrStart();
if (entity == null) throw new ArgumentNullException(nameof(entity));
_collectionLock.Wait();
try
{
try
{
var result = UpdateCore(entity);
return result;
}
catch
{
transaction.Rollback();
throw;
}
}
finally
{
_collectionLock.Release();
}
}
/// <summary>
/// Asynchronously updates an existing document in the collection
/// </summary>
/// <param name="entity">The entity containing updated values.</param>
/// <returns>True if the document was updated; otherwise, false.</returns>
public async Task<bool> UpdateAsync(T entity)
{
if (entity == null) throw new ArgumentNullException(nameof(entity));
await _collectionLock.WaitAsync();
try
{
var result = UpdateCore(entity);
return result;
}
finally
{
_collectionLock.Release();
}
}
/// <summary>
/// Updates multiple documents in a single operation.
/// </summary>
/// <param name="entities">The entities to update.</param>
/// <returns>The number of updated documents.</returns>
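/// <example>
/// // Illustrative usage (assumes a mutable Person document with an Age property):
/// var people = collection.FindAll(p => p.Age > 30).ToList();
/// foreach (var person in people) person.Age++;
/// int updated = collection.UpdateBulk(people);
/// </example>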
public int UpdateBulk(IEnumerable<T> entities)
{
if (entities == null) throw new ArgumentNullException(nameof(entities));
var entityList = entities.ToList();
int updateCount = 0;
_collectionLock.Wait();
try
{
updateCount = UpdateBulkInternal(entityList);
return updateCount;
}
finally
{
_collectionLock.Release();
}
}
/// <summary>
/// Asynchronously updates multiple documents in a single transaction.
/// </summary>
/// <param name="entities">The entities to update.</param>
/// <returns>The number of updated documents.</returns>
public async Task<int> UpdateBulkAsync(IEnumerable<T> entities)
{
if (entities == null) throw new ArgumentNullException(nameof(entities));
var entityList = entities.ToList();
int updateCount = 0;
await _collectionLock.WaitAsync();
try
{
updateCount = UpdateBulkInternal(entityList);
return updateCount;
}
finally
{
_collectionLock.Release();
}
}
private int UpdateBulkInternal(List<T> entityList)
{
RefreshPrimaryIndexRootFromMetadata();
var transaction = _transactionHolder.GetCurrentTransactionOrStart();
int updateCount = 0;
const int BATCH_SIZE = 50;
for (int batchStart = 0; batchStart < entityList.Count; batchStart += BATCH_SIZE)
{
int batchEnd = Math.Min(batchStart + BATCH_SIZE, entityList.Count);
int batchCount = batchEnd - batchStart;
// PHASE 1: Sequential existence check and serialization
var serializedBatch = new (TId id, byte[] data, int length, bool found)[batchCount];
for (int i = 0; i < batchCount; i++)
{
var entity = entityList[batchStart + i];
var id = _mapper.GetId(entity);
var key = _mapper.ToIndexKey(id);
// Check if entity exists
// We do this sequentially to avoid ThreadPool exhaustion or IO-related deadlocks
if (_primaryIndex.TryFind(key, out var _, transaction.TransactionId))
{
var length = SerializeWithRetry(entity, out var buffer);
serializedBatch[i] = (id, buffer, length, true);
}
else
{
serializedBatch[i] = (default!, null!, 0, false);
}
}
// PHASE 2: Sequential Update
for (int i = 0; i < batchCount; i++)
{
var (id, docData, length, found) = serializedBatch[i];
if (!found) continue;
var entity = entityList[batchStart + i];
try
{
if (UpdateDataCore(id, entity, docData.AsSpan(0, length)))
updateCount++;
}
finally
{
ArrayPool<byte>.Shared.Return(docData);
}
}
}
return updateCount;
}
private bool UpdateCore(T entity)
{
var id = _mapper.GetId(entity);
var length = SerializeWithRetry(entity, out var buffer);
try
{
return UpdateDataCore(id, entity, buffer.AsSpan(0, length));
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
private bool UpdateDataCore(TId id, T entity, ReadOnlySpan<byte> docData)
{
RefreshPrimaryIndexRootFromMetadata();
var transaction = _transactionHolder.GetCurrentTransactionOrStart();
var key = _mapper.ToIndexKey(id);
var (storedPayloadOverride, storedPayloadFlags) = PreparePayloadForStorage(docData);
ReadOnlySpan<byte> storedPayload = storedPayloadOverride is null ? docData : storedPayloadOverride;
var bytesWritten = storedPayload.Length;
if (!_primaryIndex.TryFind(key, out var oldLocation, transaction.TransactionId))
return false;
// Retrieve old version for index updates
var oldEntity = FindByLocation(oldLocation);
if (oldEntity == null) return false;
// Read old page
var pageBuffer = ArrayPool<byte>.Shared.Rent(_storage.PageSize);
try
{
_storage.ReadPage(oldLocation.PageId, transaction.TransactionId, pageBuffer);
var slotOffset = SlottedPageHeader.Size + (oldLocation.SlotIndex * SlotEntry.Size);
var oldSlot = SlotEntry.ReadFrom(pageBuffer.AsSpan(slotOffset));
if (bytesWritten <= oldSlot.Length && (oldSlot.Flags & SlotFlags.HasOverflow) == 0)
{
// In-place update
storedPayload.CopyTo(pageBuffer.AsSpan(oldSlot.Offset, bytesWritten));
var newSlot = oldSlot;
newSlot.Length = (ushort)bytesWritten;
newSlot.Flags = storedPayloadFlags;
newSlot.WriteTo(pageBuffer.AsSpan(slotOffset));
_storage.WritePage(oldLocation.PageId, transaction.TransactionId, pageBuffer);
// Notify secondary indexes (primary index unchanged)
_indexManager.UpdateInAll(oldEntity, entity, oldLocation, oldLocation, transaction);
// Notify CDC
NotifyCdc(OperationType.Update, id, docData);
return true;
}
else
{
// Delete old + insert new
DeleteCore(id, notifyCdc: false);
DocumentLocation newLocation;
if (bytesWritten + SlotEntry.Size <= _maxDocumentSizeForSinglePage)
{
var newPageId = FindPageWithSpace(bytesWritten + SlotEntry.Size);
if (newPageId == 0) newPageId = AllocateNewDataPage();
var newSlotIndex = InsertIntoPage(newPageId, storedPayload, storedPayloadFlags);
newLocation = new DocumentLocation(newPageId, newSlotIndex);
}
else
{
var (newPageId, newSlotIndex) = InsertWithOverflow(storedPayload, storedPayloadFlags);
newLocation = new DocumentLocation(newPageId, newSlotIndex);
}
_primaryIndex.Insert(key, newLocation, transaction.TransactionId);
_indexManager.UpdateInAll(oldEntity, entity, oldLocation, newLocation, transaction);
// Notify CDC
NotifyCdc(OperationType.Update, id, docData);
return true;
}
}
finally
{
ArrayPool<byte>.Shared.Return(pageBuffer);
}
}
/// <summary>
/// Deletes a document by its primary key.
/// </summary>
/// <param name="id">The identifier of the document to delete.</param>
/// <returns>True if a document was deleted; otherwise, false.</returns>
public bool Delete(TId id)
{
_collectionLock.Wait();
try
{
var result = DeleteCore(id);
return result;
}
finally
{
_collectionLock.Release();
}
}
/// <summary>
/// Asynchronously deletes a document by its primary key.
/// </summary>
/// <param name="id">The identifier of the document to delete.</param>
/// <returns>True if a document was deleted; otherwise, false.</returns>
public async Task<bool> DeleteAsync(TId id)
{
await _collectionLock.WaitAsync();
try
{
var result = DeleteCore(id);
return result;
}
finally
{
_collectionLock.Release();
}
}
/// <summary>
/// Deletes multiple documents in a single transaction.
/// Efficiently updates storage and index.
/// </summary>
/// <param name="ids">The identifiers of documents to delete.</param>
/// <returns>The number of deleted documents.</returns>
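/// <example>
/// // Illustrative usage: delete several documents by their ids in one call.
/// int deleted = collection.DeleteBulk(new[] { id1, id2, id3 });
/// </example>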
public int DeleteBulk(IEnumerable<TId> ids)
{
if (ids == null) throw new ArgumentNullException(nameof(ids));
int deleteCount = 0;
_collectionLock.Wait();
try
{
deleteCount = DeleteBulkInternal(ids);
return deleteCount;
}
finally
{
_collectionLock.Release();
}
}
/// <summary>
/// Asynchronously deletes multiple documents in a single transaction.
/// </summary>
/// <param name="ids">The identifiers of documents to delete.</param>
/// <returns>The number of deleted documents.</returns>
public async Task<int> DeleteBulkAsync(IEnumerable<TId> ids)
{
if (ids == null) throw new ArgumentNullException(nameof(ids));
int deleteCount = 0;
await _collectionLock.WaitAsync();
try
{
deleteCount = DeleteBulkInternal(ids);
return deleteCount;
}
finally
{
_collectionLock.Release();
}
}
private int DeleteBulkInternal(IEnumerable<TId> ids)
{
int deleteCount = 0;
foreach (var id in ids)
{
if (DeleteCore(id))
deleteCount++;
}
return deleteCount;
}
private bool DeleteCore(TId id, bool notifyCdc = true)
{
RefreshPrimaryIndexRootFromMetadata();
var transaction = _transactionHolder.GetCurrentTransactionOrStart();
var key = _mapper.ToIndexKey(id);
if (!_primaryIndex.TryFind(key, out var location, transaction.TransactionId))
return false;
// Notify secondary indexes BEFORE deleting document from storage
var entity = FindByLocation(location);
if (entity != null)
{
_indexManager.DeleteFromAll(entity, location, transaction);
}
// Read page
var buffer = ArrayPool<byte>.Shared.Rent(_storage.PageSize);
try
{
_storage.ReadPage(location.PageId, transaction.TransactionId, buffer);
var slotOffset = SlottedPageHeader.Size + (location.SlotIndex * SlotEntry.Size);
var slot = SlotEntry.ReadFrom(buffer.AsSpan(slotOffset));
// Check if slot has overflow and free it
if ((slot.Flags & SlotFlags.HasOverflow) != 0)
{
var nextOverflowPage = System.Buffers.Binary.BinaryPrimitives.ReadUInt32LittleEndian(
buffer.AsSpan(slot.Offset + 4, 4));
FreeOverflowChain(nextOverflowPage);
}
// Mark slot as deleted
var newSlot = slot;
newSlot.Flags |= SlotFlags.Deleted;
newSlot.WriteTo(buffer.AsSpan(slotOffset));
_storage.WritePage(location.PageId, transaction.TransactionId, buffer);
// Remove from primary index
_primaryIndex.Delete(key, location, transaction.TransactionId);
// Notify CDC
if (notifyCdc) NotifyCdc(OperationType.Delete, id);
return true;
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
private void FreeOverflowChain(uint overflowPageId)
{
var transaction = _transactionHolder.GetCurrentTransactionOrStart();
var tempBuffer = ArrayPool<byte>.Shared.Rent(_storage.PageSize);
try
{
while (overflowPageId != 0)
{
_storage.ReadPage(overflowPageId, transaction.TransactionId, tempBuffer);
var header = SlottedPageHeader.ReadFrom(tempBuffer);
var nextPage = header.NextOverflowPage;
// Recycle this page
_storage.FreePage(overflowPageId);
overflowPageId = nextPage;
}
}
finally
{
ArrayPool<byte>.Shared.Return(tempBuffer);
}
}
#endregion
#region Query Helpers
/// <summary>
/// Counts all documents in the collection.
/// If called within a transaction, uncommitted changes are included in the count.
/// </summary>
/// <returns>Number of documents</returns>
public int Count()
{
RefreshPrimaryIndexRootFromMetadata();
var transaction = _transactionHolder.GetCurrentTransactionOrStart();
// Count all entries in primary index
// Use generic min/max keys for the index
var minKey = IndexKey.MinKey;
var maxKey = IndexKey.MaxKey;
return _primaryIndex.Range(minKey, maxKey, IndexDirection.Forward, transaction.TransactionId).Count();
}
/// <summary>
/// Finds all documents matching the predicate.
/// If transaction is provided, will see uncommitted changes.
/// </summary>
/// <param name="predicate">Predicate used to filter documents.</param>
/// <returns>Documents that match the predicate.</returns>
public IEnumerable<T> FindAll(Func<T, bool> predicate)
{
foreach (var entity in FindAll())
{
if (predicate(entity))
yield return entity;
}
}
/// <summary>
/// Find entities matching predicate (alias for FindAll with predicate)
/// </summary>
/// <param name="predicate">Predicate used to filter documents.</param>
/// <returns>Documents that match the predicate.</returns>
public IEnumerable<T> Find(Func<T, bool> predicate)
=> FindAll(predicate);
#endregion
private (byte[]? storedPayloadOverride, SlotFlags slotFlags) PreparePayloadForStorage(ReadOnlySpan<byte> logicalPayload)
{
if (TryCreateCompressedPayload(logicalPayload, out var compressedPayload))
{
return (compressedPayload, SlotFlags.Compressed);
}
return (null, SlotFlags.None);
}
private bool TryCreateCompressedPayload(ReadOnlySpan<byte> logicalPayload, out byte[]? storedPayload)
{
storedPayload = null;
var options = _storage.CompressionOptions;
var telemetry = _storage.CompressionTelemetry;
if (!options.EnableCompression)
return false;
if (logicalPayload.Length < options.MinSizeBytes)
{
telemetry.RecordCompressionSkippedTooSmall();
return false;
}
if (options.MaxCompressionInputBytes.HasValue && logicalPayload.Length > options.MaxCompressionInputBytes.Value)
{
telemetry.RecordSafetyLimitRejection();
return false;
}
telemetry.RecordCompressionAttempt(logicalPayload.Length);
try
{
long startedAt = Stopwatch.GetTimestamp();
var compressedPayload = _storage.CompressionService.Compress(logicalPayload, options.Codec, options.Level);
long elapsedTicks = Stopwatch.GetTimestamp() - startedAt;
telemetry.RecordCompressionCpuTicks(elapsedTicks);
int compressedStorageLength = CompressedPayloadHeader.Size + compressedPayload.Length;
if (!MeetsMinSavingsPercent(logicalPayload.Length, compressedStorageLength, options.MinSavingsPercent))
{
telemetry.RecordCompressionSkippedInsufficientSavings();
return false;
}
var output = new byte[compressedStorageLength];
var header = CompressedPayloadHeader.Create(options.Codec, logicalPayload.Length, compressedPayload);
header.WriteTo(output.AsSpan(0, CompressedPayloadHeader.Size));
compressedPayload.CopyTo(output.AsSpan(CompressedPayloadHeader.Size));
telemetry.RecordCompressionSuccess(output.Length);
storedPayload = output;
return true;
}
catch
{
telemetry.RecordCompressionFailure();
return false;
}
}
private static bool MeetsMinSavingsPercent(int originalLength, int compressedStorageLength, int minSavingsPercent)
{
if (originalLength <= 0)
return false;
int savedBytes = originalLength - compressedStorageLength;
if (savedBytes <= 0)
return false;
int savingsPercent = (int)((savedBytes * 100L) / originalLength);
return savingsPercent >= minSavingsPercent;
}
private static void ValidateSlotBounds(in SlotEntry slot, int bufferLength, in DocumentLocation location)
{
int endOffset = slot.Offset + slot.Length;
if (slot.Offset < SlottedPageHeader.Size || endOffset > bufferLength)
{
throw new InvalidDataException(
$"Corrupted slot bounds: Offset={slot.Offset}, Length={slot.Length}, Buffer={bufferLength}, SlotIndex={location.SlotIndex}, PageId={location.PageId}, Flags={slot.Flags}");
}
}
private byte[] ReassembleOverflowPayload(ReadOnlySpan<byte> primaryPayload, ulong transactionId, byte[] pageBuffer, in DocumentLocation location)
{
if (primaryPayload.Length < OverflowMetadataSize)
{
throw new InvalidDataException(
$"Corrupted overflow metadata: primary slot too small ({primaryPayload.Length} bytes) at {location.PageId}:{location.SlotIndex}.");
}
int totalLength = BinaryPrimitives.ReadInt32LittleEndian(primaryPayload.Slice(0, 4));
if (totalLength < 0 || totalLength > MaxStoredPayloadSizeBytes)
{
_storage.CompressionTelemetry.RecordSafetyLimitRejection();
throw new InvalidDataException(
$"Corrupted overflow metadata: invalid total length {totalLength} at {location.PageId}:{location.SlotIndex}.");
}
uint currentOverflowPageId = BinaryPrimitives.ReadUInt32LittleEndian(primaryPayload.Slice(4, 4));
int primaryChunkSize = primaryPayload.Length - OverflowMetadataSize;
if (totalLength < primaryChunkSize)
{
throw new InvalidDataException(
$"Corrupted overflow metadata: total length {totalLength} is smaller than primary chunk {primaryChunkSize} at {location.PageId}:{location.SlotIndex}.");
}
var fullPayload = new byte[totalLength];
primaryPayload.Slice(OverflowMetadataSize, primaryChunkSize).CopyTo(fullPayload);
int offset = primaryChunkSize;
int maxChunkSize = _storage.PageSize - SlottedPageHeader.Size;
while (currentOverflowPageId != 0 && offset < totalLength)
{
_storage.ReadPage(currentOverflowPageId, transactionId, pageBuffer);
var overflowHeader = SlottedPageHeader.ReadFrom(pageBuffer);
if (overflowHeader.PageType != PageType.Overflow)
{
throw new InvalidDataException(
$"Corrupted overflow chain: page {currentOverflowPageId} is not an overflow page.");
}
int remaining = totalLength - offset;
int chunkSize = Math.Min(maxChunkSize, remaining);
pageBuffer.AsSpan(SlottedPageHeader.Size, chunkSize).CopyTo(fullPayload.AsSpan(offset));
offset += chunkSize;
currentOverflowPageId = overflowHeader.NextOverflowPage;
}
if (offset != totalLength)
{
throw new InvalidDataException(
$"Corrupted overflow chain: expected {totalLength} bytes but reconstructed {offset} bytes at {location.PageId}:{location.SlotIndex}.");
}
if (currentOverflowPageId != 0)
{
throw new InvalidDataException(
$"Corrupted overflow chain: extra overflow pages remain after reconstruction at {location.PageId}:{location.SlotIndex}.");
}
return fullPayload;
}
private byte[] DecompressStoredPayload(ReadOnlySpan<byte> storedPayload, in DocumentLocation location)
{
var telemetry = _storage.CompressionTelemetry;
telemetry.RecordDecompressionAttempt();
try
{
if (storedPayload.Length < CompressedPayloadHeader.Size)
{
throw new InvalidDataException(
$"Corrupted compressed payload: missing header at {location.PageId}:{location.SlotIndex}.");
}
var header = CompressedPayloadHeader.ReadFrom(storedPayload.Slice(0, CompressedPayloadHeader.Size));
if (!Enum.IsDefined(typeof(CompressionCodec), header.Codec) || header.Codec == CompressionCodec.None)
{
throw new InvalidDataException(
$"Corrupted compressed payload: invalid codec '{header.Codec}' at {location.PageId}:{location.SlotIndex}.");
}
if (header.OriginalLength < 0 || header.OriginalLength > _storage.CompressionOptions.MaxDecompressedSizeBytes)
{
telemetry.RecordSafetyLimitRejection();
throw new InvalidDataException(
$"Corrupted compressed payload: invalid decompressed length {header.OriginalLength} at {location.PageId}:{location.SlotIndex}.");
}
int compressedLength = storedPayload.Length - CompressedPayloadHeader.Size;
if (header.CompressedLength < 0 || header.CompressedLength != compressedLength)
{
throw new InvalidDataException(
$"Corrupted compressed payload: invalid compressed length {header.CompressedLength} (actual {compressedLength}) at {location.PageId}:{location.SlotIndex}.");
}
var compressedPayload = storedPayload.Slice(CompressedPayloadHeader.Size, header.CompressedLength);
if (!header.ValidateChecksum(compressedPayload))
{
telemetry.RecordChecksumFailure();
throw new InvalidDataException(
$"Corrupted compressed payload: checksum mismatch at {location.PageId}:{location.SlotIndex}.");
}
if (!_storage.CompressionService.TryGetCodec(header.Codec, out _))
{
throw new InvalidDataException(
$"Corrupted compressed payload: codec '{header.Codec}' is not registered at {location.PageId}:{location.SlotIndex}.");
}
long startedAt = Stopwatch.GetTimestamp();
var decompressed = _storage.CompressionService.Decompress(
compressedPayload,
header.Codec,
header.OriginalLength,
_storage.CompressionOptions.MaxDecompressedSizeBytes);
long elapsedTicks = Stopwatch.GetTimestamp() - startedAt;
telemetry.RecordDecompressionCpuTicks(elapsedTicks);
if (decompressed.Length != header.OriginalLength)
{
throw new InvalidDataException(
$"Corrupted compressed payload: decompressed length {decompressed.Length} does not match expected {header.OriginalLength} at {location.PageId}:{location.SlotIndex}.");
}
telemetry.RecordDecompressionSuccess(decompressed.Length);
return decompressed;
}
catch (InvalidDataException)
{
telemetry.RecordDecompressionFailure();
throw;
}
catch (Exception ex)
{
telemetry.RecordDecompressionFailure();
throw new InvalidDataException(
$"Failed to decompress payload at {location.PageId}:{location.SlotIndex}.", ex);
}
}
/// <summary>
/// Serializes an entity with adaptive buffer sizing (Stepped Retry).
/// Strategies:
/// 1. 64KB (Covers 99% of docs, small overhead)
/// 2. 2MB (Covers large docs)
/// 3. 16MB (Max limit)
/// </summary>
private int SerializeWithRetry(T entity, out byte[] rentedBuffer)
{
// 64KB, 2MB, 16MB
int[] steps = { 65536, 2097152, 16777216 };
for (int i = 0; i < steps.Length; i++)
{
int size = steps[i];
// Ensure we at least cover PageSize (unlikely to be > 64KB but safe)
if (size < _storage.PageSize) size = _storage.PageSize;
var buffer = ArrayPool<byte>.Shared.Rent(size);
try
{
int bytesWritten = _mapper.Serialize(entity, new BsonSpanWriter(buffer, _storage.GetKeyMap()));
// Inject schema version if available
if (CurrentSchemaVersion != null)
{
if (bytesWritten + 8 > buffer.Length)
{
throw new IndexOutOfRangeException("Not enough space for version field");
}
AppendVersionField(buffer, ref bytesWritten);
}
rentedBuffer = buffer;
return bytesWritten;
}
catch (Exception ex) when (ex is ArgumentException || ex is IndexOutOfRangeException || ex is ArgumentOutOfRangeException)
{
ArrayPool<byte>.Shared.Return(buffer);
// Continue to next step
}
catch
{
ArrayPool<byte>.Shared.Return(buffer);
throw;
}
}
rentedBuffer = null!; // satisfies the compiler; unreachable because we throw below
throw new InvalidOperationException("Document too large. Maximum size allowed is 16MB.");
}
/// <summary>
/// Appends a version field to the specified BSON buffer if a current schema version is set.
/// </summary>
/// <remarks>The version field is only appended if a current schema version is available. The method
/// updates the BSON document's size and ensures the buffer remains in a valid BSON format.</remarks>
/// <param name="buffer">The byte array buffer to which the version field is appended. Must be large enough to accommodate the additional
/// bytes.</param>
/// <param name="bytesWritten">A reference to the number of bytes written to the buffer. Updated to reflect the new total after the version
/// field is appended.</param>
private void AppendVersionField(byte[] buffer, ref int bytesWritten)
{
if (CurrentSchemaVersion == null) return;
int version = CurrentSchemaVersion.Value.Version;
// BSON element for _v (Int32) with Compressed Key:
// Type (1 byte: 0x10)
// Key ID (2 bytes, little-endian)
// Value (4 bytes: int32)
// Total = 7 bytes
int pos = bytesWritten - 1; // Position of old 0x00 terminator
buffer[pos++] = 0x10; // Int32
ushort versionKeyId = _storage.GetOrAddDictionaryEntry("_v");
BinaryPrimitives.WriteUInt16LittleEndian(buffer.AsSpan(pos, 2), versionKeyId);
pos += 2;
BinaryPrimitives.WriteInt32LittleEndian(buffer.AsSpan(pos, 4), version);
pos += 4;
buffer[pos++] = 0x00; // new document terminator
bytesWritten = pos;
// Update total size (first 4 bytes)
BinaryPrimitives.WriteInt32LittleEndian(buffer.AsSpan(0, 4), bytesWritten);
}
/// <summary>
/// Performs a vector similarity search on the specified index and returns up to the top-k matching documents of
/// type T.
/// </summary>
/// <remarks>The search uses approximate nearest neighbor algorithms for efficient retrieval. The efSearch
/// parameter can be tuned to balance search speed and accuracy. Results are filtered to include only documents that
/// can be successfully retrieved from storage.</remarks>
/// <param name="indexName">The name of the index to search. Cannot be null or empty.</param>
/// <param name="query">The query vector used to find similar documents. The array length must match the dimensionality of the index.</param>
/// <param name="k">The maximum number of nearest neighbors to return. Must be greater than zero.</param>
/// <param name="efSearch">The size of the dynamic candidate list during search. Higher values may improve recall at the cost of
/// performance. Must be greater than zero. The default is 100.</param>
/// <param name="transaction">An optional transaction context to use for the search. If null, the operation is performed without a
/// transaction.</param>
/// <returns>An enumerable collection of up to k documents of type T that are most similar to the query vector. The
/// collection may be empty if no matches are found.</returns>
/// <exception cref="ArgumentException">Thrown if indexName is null, empty, or does not correspond to an existing index.</exception>
public IEnumerable<T> VectorSearch(string indexName, float[] query, int k, int efSearch = 100)
{
var transaction = _transactionHolder.GetCurrentTransactionOrStart();
var index = _indexManager.GetIndex(indexName);
if (index == null)
throw new ArgumentException($"Index '{indexName}' not found.", nameof(indexName));
foreach (var result in index.VectorSearch(query, k, efSearch, transaction))
{
var doc = FindByLocation(result.Location);
if (doc != null) yield return doc;
}
}
/// <summary>
/// Finds all documents located within a specified radius of a geographic center point using a spatial index.
/// </summary>
/// <param name="indexName">The name of the spatial index to use for the search. Cannot be null or empty.</param>
/// <param name="center">A tuple representing the latitude and longitude of the center point, in decimal degrees.</param>
/// <param name="radiusKm">The search radius, in kilometers. Must be greater than zero.</param>
/// <param name="transaction">An optional transaction context to use for the operation. If null, the default transaction is used.</param>
/// <returns>An enumerable collection of documents of type T that are located within the specified radius of the center
/// point. The collection is empty if no documents are found.</returns>
/// <exception cref="ArgumentException">Thrown if indexName is null, empty, or does not correspond to an existing index.</exception>
public IEnumerable<T> Near(string indexName, (double Latitude, double Longitude) center, double radiusKm)
{
var transaction = _transactionHolder.GetCurrentTransactionOrStart();
var index = _indexManager.GetIndex(indexName);
if (index == null)
throw new ArgumentException($"Index '{indexName}' not found.", nameof(indexName));
foreach (var loc in index.Near(center, radiusKm, transaction))
{
var doc = FindByLocation(loc);
if (doc != null) yield return doc;
}
}
/// <summary>
/// Returns all documents within the specified rectangular geographic area from the given spatial index.
/// </summary>
/// <param name="indexName">The name of the spatial index to search within. Cannot be null or empty.</param>
/// <param name="min">The minimum latitude and longitude coordinates defining one corner of the search rectangle.</param>
/// <param name="max">The maximum latitude and longitude coordinates defining the opposite corner of the search rectangle.</param>
/// <returns>An enumerable collection of documents of type T that are located within the specified geographic bounds. The
/// collection is empty if no documents are found.</returns>
/// <remarks>
/// Transactions are managed implicitly through the collection's <see cref="ITransactionHolder"/>; callers do not supply a transaction parameter.
/// </remarks>
/// <exception cref="ArgumentException">Thrown if indexName is null, empty, or does not correspond to an existing index.</exception>
public IEnumerable<T> Within(string indexName, (double Latitude, double Longitude) min, (double Latitude, double Longitude) max)
{
var transaction = _transactionHolder.GetCurrentTransactionOrStart();
var index = _indexManager.GetIndex(indexName);
if (index == null)
throw new ArgumentException($"Index '{indexName}' not found.", nameof(indexName));
foreach (var loc in index.Within(min, max, transaction))
{
var doc = FindByLocation(loc);
if (doc != null) yield return doc;
}
}
/// <summary>
/// Subscribes to a change stream that notifies observers of changes to the collection.
/// </summary>
/// <remarks>The returned observable emits events as changes are detected in the collection. Observers can
/// subscribe to receive real-time updates. The behavior of the event payload depends on the value of the
/// capturePayload parameter.</remarks>
/// <param name="capturePayload">true to include the full payload of changed documents in each event; otherwise, false to include only metadata
/// about the change. The default is false.</param>
/// <returns>An observable sequence of change stream events for the collection. Subscribers receive notifications as changes
/// occur.</returns>
/// <exception cref="InvalidOperationException">Thrown if change data capture (CDC) is not initialized for the storage.</exception>
public IObservable<ChangeStreamEvent<TId, T>> Watch(bool capturePayload = false)
{
return _cdcPublisher.Watch(capturePayload);
}
private void NotifyCdc(OperationType type, TId id, ReadOnlySpan<byte> docData = default)
{
_cdcPublisher.Notify(type, id, docData);
}
/// <summary>
/// Releases all resources used by the current instance of the class.
/// </summary>
/// <remarks>Call this method when you are finished using the object to free unmanaged resources
/// immediately. After calling Dispose, the object should not be used.</remarks>
public void Dispose()
{
_indexManager.Dispose();
GC.SuppressFinalize(this);
}
}