Files
CBDDC/src/ZB.MOM.WW.CBDDC.Persistence/Surreal/SurrealDocumentStore.cs
Joseph Doherty 8e97061ab8
All checks were successful
NuGet Package Publish / nuget (push) Successful in 1m14s
Implement in-process multi-dataset sync isolation across core, network, persistence, and tests
2026-02-22 11:58:34 -05:00

1412 lines
52 KiB
C#

using System.Collections.Concurrent;
using System.Reflection;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using System.Text.RegularExpressions;
using Dahomey.Cbor.ObjectModel;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using SurrealDb.Net;
using SurrealDb.Net.Models.LiveQuery;
using SurrealDb.Net.Models;
using ZB.MOM.WW.CBDDC.Core;
using ZB.MOM.WW.CBDDC.Core.Network;
using ZB.MOM.WW.CBDDC.Core.Storage;
using ZB.MOM.WW.CBDDC.Core.Sync;
namespace ZB.MOM.WW.CBDDC.Persistence.Surreal;
/// <summary>
/// Abstract base class for Surreal-backed document stores.
/// Handles local oplog/document-metadata persistence and remote-sync suppression.
/// </summary>
/// <typeparam name="TContext">The application context type used by the concrete store.</typeparam>
public abstract class SurrealDocumentStore<TContext> : IDocumentStore, ISurrealCdcWorkerLifecycle, IDisposable
where TContext : class
{
private static readonly Regex SurrealIdentifierRegex = new("^[A-Za-z_][A-Za-z0-9_]*$", RegexOptions.Compiled);
private readonly List<IDisposable> _cdcWatchers = new();
private readonly SurrealCdcPollingOptions _cdcPollingOptions;
private readonly SemaphoreSlim _cdcWorkerLifecycleGate = new(1, 1);
private readonly SemaphoreSlim _liveSelectSignal = new(0, 1);
private readonly ISurrealCdcCheckpointPersistence? _checkpointPersistence;
private readonly object _clockLock = new();
private readonly HashSet<string> _registeredCollections = new(StringComparer.Ordinal);
/// <summary>
/// Semaphore used to suppress CDC-triggered oplog entry creation during remote sync.
/// </summary>
private readonly SemaphoreSlim _remoteSyncGuard = new(1, 1);
private readonly ConcurrentDictionary<string, int> _suppressedCdcEvents = new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, WatchedCollectionRegistration> _watchedCollections = new(
StringComparer.Ordinal);
private CancellationTokenSource? _cdcWorkerCts;
private Task? _cdcWorkerTask;
private CancellationTokenSource? _liveSelectCts;
private readonly List<Task> _liveSelectTasks = new();
protected readonly IPeerNodeConfigurationProvider _configProvider;
protected readonly IConflictResolver _conflictResolver;
protected readonly TContext _context;
protected readonly ILogger<SurrealDocumentStore<TContext>> _logger;
protected readonly ICBDDCSurrealSchemaInitializer _schemaInitializer;
protected readonly ISurrealDbClient _surrealClient;
protected readonly IVectorClockService _vectorClock;
// HLC state for local change timestamp generation.
private int _logicalCounter;
private long _lastPhysicalTime;
/// <summary>
/// Initializes a new instance of the <see cref="SurrealDocumentStore{TContext}" /> class.
/// </summary>
/// <param name="context">The application context used by the concrete store.</param>
/// <param name="surrealEmbeddedClient">The embedded Surreal client provider.</param>
/// <param name="schemaInitializer">The Surreal schema initializer.</param>
/// <param name="configProvider">The peer node configuration provider.</param>
/// <param name="vectorClockService">The vector clock service used for local oplog state.</param>
/// <param name="conflictResolver">Optional conflict resolver; defaults to last-write-wins.</param>
/// <param name="checkpointPersistence">Optional CDC checkpoint persistence component.</param>
/// <param name="cdcPollingOptions">Optional CDC polling options.</param>
/// <param name="logger">Optional logger instance.</param>
protected SurrealDocumentStore(
TContext context,
ICBDDCSurrealEmbeddedClient surrealEmbeddedClient,
ICBDDCSurrealSchemaInitializer schemaInitializer,
IPeerNodeConfigurationProvider configProvider,
IVectorClockService vectorClockService,
IConflictResolver? conflictResolver = null,
ISurrealCdcCheckpointPersistence? checkpointPersistence = null,
SurrealCdcPollingOptions? cdcPollingOptions = null,
ILogger? logger = null)
{
_context = context ?? throw new ArgumentNullException(nameof(context));
_ = surrealEmbeddedClient ?? throw new ArgumentNullException(nameof(surrealEmbeddedClient));
_surrealClient = surrealEmbeddedClient.Client;
_schemaInitializer = schemaInitializer ?? throw new ArgumentNullException(nameof(schemaInitializer));
_configProvider = configProvider ?? throw new ArgumentNullException(nameof(configProvider));
_vectorClock = vectorClockService ?? throw new ArgumentNullException(nameof(vectorClockService));
_conflictResolver = conflictResolver ?? new LastWriteWinsConflictResolver();
_checkpointPersistence = checkpointPersistence;
_cdcPollingOptions = NormalizePollingOptions(cdcPollingOptions);
_logger = CreateTypedLogger(logger);
_lastPhysicalTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
_logicalCounter = 0;
}
/// <summary>
/// Releases managed resources used by this document store.
/// </summary>
public virtual void Dispose()
{
try
{
StopCdcWorkerAsync(CancellationToken.None).GetAwaiter().GetResult();
}
catch
{
}
foreach (var watcher in _cdcWatchers)
try
{
watcher.Dispose();
}
catch
{
}
_cdcWatchers.Clear();
_cdcWorkerCts?.Dispose();
_liveSelectCts?.Dispose();
_liveSelectSignal.Dispose();
_cdcWorkerLifecycleGate.Dispose();
_remoteSyncGuard.Dispose();
}
private static ILogger<SurrealDocumentStore<TContext>> CreateTypedLogger(ILogger? logger)
{
if (logger is null) return NullLogger<SurrealDocumentStore<TContext>>.Instance;
if (logger is ILogger<SurrealDocumentStore<TContext>> typedLogger) return typedLogger;
return new ForwardingLogger(logger);
}
private sealed class ForwardingLogger : ILogger<SurrealDocumentStore<TContext>>
{
private readonly ILogger _inner;
/// <summary>
/// Initializes a new instance of the <see cref="ForwardingLogger" /> class.
/// </summary>
/// <param name="inner">The logger instance to forward calls to.</param>
public ForwardingLogger(ILogger inner)
{
_inner = inner;
}
/// <inheritdoc />
public IDisposable? BeginScope<TState>(TState state) where TState : notnull
{
return _inner.BeginScope(state);
}
/// <inheritdoc />
public bool IsEnabled(LogLevel logLevel)
{
return _inner.IsEnabled(logLevel);
}
/// <inheritdoc />
public void Log<TState>(
LogLevel logLevel,
EventId eventId,
TState state,
Exception? exception,
Func<TState, Exception?, string> formatter)
{
_inner.Log(logLevel, eventId, state, exception, formatter);
}
}
#region CDC Registration
private static string BuildSuppressionKey(string collection, string key, OperationType operationType)
{
return $"{collection}|{key}|{(int)operationType}";
}
private void RegisterSuppressedCdcEvent(string collection, string key, OperationType operationType)
{
string suppressionKey = BuildSuppressionKey(collection, key, operationType);
_suppressedCdcEvents.AddOrUpdate(suppressionKey, 1, (_, current) => current + 1);
}
private bool TryConsumeSuppressedCdcEvent(string collection, string key, OperationType operationType)
{
string suppressionKey = BuildSuppressionKey(collection, key, operationType);
while (true)
{
if (!_suppressedCdcEvents.TryGetValue(suppressionKey, out int current)) return false;
if (current <= 1) return _suppressedCdcEvents.TryRemove(suppressionKey, out _);
if (_suppressedCdcEvents.TryUpdate(suppressionKey, current - 1, current)) return true;
}
}
private bool IsCdcPollingWorkerActiveForCollection(string collection)
{
return IsCdcWorkerRunning &&
_watchedCollections.ContainsKey(collection);
}
/// <summary>
/// Registers a watchable collection for local change tracking.
/// </summary>
/// <typeparam name="TEntity">The entity type emitted by the watch source.</typeparam>
/// <param name="collectionName">Logical collection name used by oplog and metadata records.</param>
/// <param name="collection">Watchable change source.</param>
/// <param name="keySelector">Function used to resolve the entity key.</param>
/// <param name="subscribeForInMemoryEvents">Whether to subscribe to in-memory collection events.</param>
protected void WatchCollection<TEntity>(
string collectionName,
ISurrealWatchableCollection<TEntity> collection,
Func<TEntity, string> keySelector,
bool subscribeForInMemoryEvents = true)
where TEntity : class
{
if (string.IsNullOrWhiteSpace(collectionName))
throw new ArgumentException("Collection name is required.", nameof(collectionName));
ArgumentNullException.ThrowIfNull(collection);
ArgumentNullException.ThrowIfNull(keySelector);
_registeredCollections.Add(collectionName);
string tableName = ResolveSurrealTableName(collection, collectionName);
_watchedCollections[collectionName] = new WatchedCollectionRegistration(collectionName, tableName);
if (!subscribeForInMemoryEvents) return;
var watcher = collection.Subscribe(new CdcObserver<TEntity>(collectionName, keySelector, this));
_cdcWatchers.Add(watcher);
}
private sealed class CdcObserver<TEntity> : IObserver<SurrealCollectionChange<TEntity>>
where TEntity : class
{
private readonly string _collectionName;
private readonly Func<TEntity, string> _keySelector;
private readonly SurrealDocumentStore<TContext> _store;
/// <summary>
/// Initializes a new instance of the <see cref="CdcObserver{TEntity}" /> class.
/// </summary>
/// <param name="collectionName">The logical collection name.</param>
/// <param name="keySelector">The key selector for observed entities.</param>
/// <param name="store">The owning document store.</param>
public CdcObserver(
string collectionName,
Func<TEntity, string> keySelector,
SurrealDocumentStore<TContext> store)
{
_collectionName = collectionName;
_keySelector = keySelector;
_store = store;
}
/// <inheritdoc />
public void OnNext(SurrealCollectionChange<TEntity> changeEvent)
{
if (_store.IsCdcPollingWorkerActiveForCollection(_collectionName)) return;
var operationType = changeEvent.OperationType == OperationType.Delete
? OperationType.Delete
: OperationType.Put;
string entityId = changeEvent.DocumentId ?? "";
if (operationType == OperationType.Put && changeEvent.Entity != null)
{
string selectedKey = _keySelector(changeEvent.Entity);
if (!string.IsNullOrWhiteSpace(selectedKey)) entityId = selectedKey;
}
if (operationType == OperationType.Delete && string.IsNullOrWhiteSpace(entityId)) return;
if (_store.TryConsumeSuppressedCdcEvent(_collectionName, entityId, operationType)) return;
if (_store._remoteSyncGuard.CurrentCount == 0) return;
if (operationType == OperationType.Delete)
{
_store.OnLocalChangeDetectedAsync(_collectionName, entityId, OperationType.Delete, null)
.GetAwaiter().GetResult();
return;
}
if (changeEvent.Entity == null) return;
var content = JsonSerializer.SerializeToElement(changeEvent.Entity);
string key = _keySelector(changeEvent.Entity);
if (string.IsNullOrWhiteSpace(key)) key = entityId;
if (string.IsNullOrWhiteSpace(key)) return;
_store.OnLocalChangeDetectedAsync(_collectionName, key, OperationType.Put, content)
.GetAwaiter().GetResult();
}
/// <inheritdoc />
public void OnError(Exception error)
{
}
/// <inheritdoc />
public void OnCompleted()
{
}
}
private static string ResolveSurrealTableName<TEntity>(
ISurrealWatchableCollection<TEntity> collection,
string fallbackCollectionName)
where TEntity : class
{
Type collectionType = collection.GetType();
const BindingFlags flags = BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic;
foreach (string memberName in new[] { "TableName", "_tableName", "tableName" })
{
PropertyInfo? property = collectionType.GetProperty(memberName, flags);
if (property?.CanRead == true &&
property.GetValue(collection) is string propertyValue &&
!string.IsNullOrWhiteSpace(propertyValue))
return propertyValue;
FieldInfo? field = collectionType.GetField(memberName, flags);
if (field?.GetValue(collection) is string fieldValue &&
!string.IsNullOrWhiteSpace(fieldValue))
return fieldValue;
}
return fallbackCollectionName;
}
private static SurrealCdcPollingOptions NormalizePollingOptions(SurrealCdcPollingOptions? options)
{
TimeSpan interval = options?.PollInterval ?? TimeSpan.FromMilliseconds(250);
if (interval <= TimeSpan.Zero) interval = TimeSpan.FromMilliseconds(250);
int batchSize = options?.BatchSize ?? 100;
if (batchSize <= 0) batchSize = 100;
TimeSpan liveReconnectDelay = options?.LiveSelectReconnectDelay ?? TimeSpan.FromSeconds(2);
if (liveReconnectDelay <= TimeSpan.Zero) liveReconnectDelay = TimeSpan.FromSeconds(2);
return new SurrealCdcPollingOptions
{
Enabled = options?.Enabled ?? true,
PollInterval = interval,
BatchSize = batchSize,
EnableLiveSelectAccelerator = options?.EnableLiveSelectAccelerator ?? true,
LiveSelectReconnectDelay = liveReconnectDelay
};
}
private readonly record struct WatchedCollectionRegistration(
string CollectionName,
string TableName);
protected readonly record struct PendingCursorCheckpoint(
string TableName,
ulong Cursor);
#endregion
#region CDC Worker Lifecycle
/// <inheritdoc />
public bool IsCdcWorkerRunning =>
_cdcWorkerTask != null &&
!_cdcWorkerTask.IsCompleted;
/// <inheritdoc />
public async Task StartCdcWorkerAsync(CancellationToken cancellationToken = default)
{
if (!_cdcPollingOptions.Enabled)
{
_logger.LogDebug("Surreal CDC worker start skipped because polling is disabled.");
return;
}
if (_checkpointPersistence == null)
{
_logger.LogDebug("Surreal CDC worker start skipped because checkpoint persistence is not configured.");
return;
}
await _cdcWorkerLifecycleGate.WaitAsync(cancellationToken);
try
{
cancellationToken.ThrowIfCancellationRequested();
if (IsCdcWorkerRunning) return;
await EnsureReadyAsync(cancellationToken);
StartLiveSelectAcceleratorsUnsafe();
_cdcWorkerCts = new CancellationTokenSource();
_cdcWorkerTask = Task.Run(() => RunCdcWorkerAsync(_cdcWorkerCts.Token), CancellationToken.None);
_logger.LogInformation(
"Started Surreal CDC worker with interval {IntervalMs} ms, batch size {BatchSize}, live accelerator {LiveAccelerator}.",
_cdcPollingOptions.PollInterval.TotalMilliseconds,
_cdcPollingOptions.BatchSize,
_cdcPollingOptions.EnableLiveSelectAccelerator);
}
finally
{
_cdcWorkerLifecycleGate.Release();
}
}
/// <inheritdoc />
public async Task PollCdcOnceAsync(CancellationToken cancellationToken = default)
{
if (!_cdcPollingOptions.Enabled) return;
if (_checkpointPersistence == null) return;
if (_watchedCollections.IsEmpty) return;
await EnsureReadyAsync(cancellationToken);
await PollWatchedCollectionsOnceAsync(cancellationToken);
}
/// <inheritdoc />
public async Task StopCdcWorkerAsync(CancellationToken cancellationToken = default)
{
Task? workerTask;
CancellationTokenSource? workerCts;
Task[] liveSelectTasks;
CancellationTokenSource? liveSelectCts;
await _cdcWorkerLifecycleGate.WaitAsync(cancellationToken);
try
{
workerTask = _cdcWorkerTask;
workerCts = _cdcWorkerCts;
_cdcWorkerTask = null;
_cdcWorkerCts = null;
liveSelectTasks = _liveSelectTasks.ToArray();
_liveSelectTasks.Clear();
liveSelectCts = _liveSelectCts;
_liveSelectCts = null;
}
finally
{
_cdcWorkerLifecycleGate.Release();
}
if (workerTask == null)
{
workerCts?.Dispose();
if (liveSelectTasks.Length == 0)
{
liveSelectCts?.Dispose();
return;
}
}
try
{
workerCts?.Cancel();
liveSelectCts?.Cancel();
if (workerTask != null) await workerTask.WaitAsync(cancellationToken);
if (liveSelectTasks.Length > 0)
{
Task waitAll = Task.WhenAll(liveSelectTasks);
try
{
await waitAll.WaitAsync(cancellationToken);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
}
catch
{
}
}
}
catch (OperationCanceledException) when ((workerTask?.IsCanceled ?? false) || cancellationToken.IsCancellationRequested)
{
}
finally
{
workerCts?.Dispose();
liveSelectCts?.Dispose();
}
}
private async Task RunCdcWorkerAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
try
{
await PollCdcOnceAsync(cancellationToken);
if (!_cdcPollingOptions.EnableLiveSelectAccelerator || _liveSelectCts == null || _liveSelectTasks.Count == 0)
{
await Task.Delay(_cdcPollingOptions.PollInterval, cancellationToken);
continue;
}
Task delayTask = Task.Delay(_cdcPollingOptions.PollInterval, cancellationToken);
Task signalTask = _liveSelectSignal.WaitAsync(cancellationToken);
await Task.WhenAny(delayTask, signalTask);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
break;
}
catch (Exception exception)
{
_logger.LogError(exception, "Surreal CDC worker polling iteration failed.");
try
{
await Task.Delay(_cdcPollingOptions.PollInterval, cancellationToken);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
break;
}
}
_logger.LogDebug("Stopped Surreal CDC worker.");
}
private void StartLiveSelectAcceleratorsUnsafe()
{
if (!_cdcPollingOptions.EnableLiveSelectAccelerator) return;
if (_watchedCollections.IsEmpty) return;
if (_liveSelectCts != null) return;
_liveSelectCts = new CancellationTokenSource();
_liveSelectTasks.Clear();
foreach (WatchedCollectionRegistration watched in _watchedCollections.Values
.OrderBy(v => v.CollectionName, StringComparer.Ordinal))
_liveSelectTasks.Add(Task.Run(
() => RunLiveSelectAcceleratorAsync(watched, _liveSelectCts.Token),
CancellationToken.None));
}
private async Task RunLiveSelectAcceleratorAsync(
WatchedCollectionRegistration watched,
CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
await using var liveQuery =
await _surrealClient.LiveTable<object>(watched.TableName, false, cancellationToken);
await foreach (SurrealDbLiveQueryResponse response in liveQuery.GetResults(cancellationToken))
{
if (cancellationToken.IsCancellationRequested) break;
if (response is SurrealDbLiveQueryOpenResponse) continue;
if (response is SurrealDbLiveQueryCloseResponse closeResponse)
{
_logger.LogDebug(
"LIVE SELECT stream closed for table {Table} with reason {Reason}.",
watched.TableName,
closeResponse.Reason);
break;
}
SignalLiveSelectWake();
}
}
catch (NotSupportedException)
{
_logger.LogDebug(
"LIVE SELECT accelerator is not supported for table {Table}; fallback remains polling-only.",
watched.TableName);
return;
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
break;
}
catch (Exception exception)
{
_logger.LogDebug(
exception,
"LIVE SELECT accelerator loop failed for table {Table}; retrying.",
watched.TableName);
}
try
{
await Task.Delay(_cdcPollingOptions.LiveSelectReconnectDelay, cancellationToken);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
break;
}
}
}
private void SignalLiveSelectWake()
{
if (_liveSelectSignal.CurrentCount > 0) return;
try
{
_liveSelectSignal.Release();
}
catch (SemaphoreFullException)
{
}
}
private async Task PollWatchedCollectionsOnceAsync(CancellationToken cancellationToken)
{
if (_watchedCollections.IsEmpty) return;
foreach (WatchedCollectionRegistration watched in _watchedCollections.Values
.OrderBy(v => v.CollectionName, StringComparer.Ordinal))
await PollCollectionChangesAsync(watched, cancellationToken);
}
private async Task PollCollectionChangesAsync(
WatchedCollectionRegistration watched,
CancellationToken cancellationToken)
{
if (!SurrealIdentifierRegex.IsMatch(watched.TableName))
{
_logger.LogDebug(
"Skipping CDC polling for collection {Collection} because table name '{Table}' is not a valid Surreal identifier.",
watched.CollectionName,
watched.TableName);
return;
}
ulong cursor = await ReadCursorCheckpointAsync(watched.TableName, cancellationToken);
while (!cancellationToken.IsCancellationRequested)
{
IReadOnlyList<SurrealPolledChangeRow> rows;
try
{
rows = await QueryChangeRowsAsync(watched.TableName, cursor, _cdcPollingOptions.BatchSize,
cancellationToken);
}
catch (Exception exception)
{
if (cursor > 0 && IsLikelyChangefeedRetentionBoundary(exception))
_logger.LogWarning(
exception,
"SHOW CHANGES query failed for table {Table} at cursor {Cursor}. " +
"The cursor may be outside configured changefeed retention; checkpoint remains unchanged until replay is re-established.",
watched.TableName,
cursor);
else
_logger.LogDebug(
exception,
"SHOW CHANGES query failed for table {Table}.",
watched.TableName);
return;
}
if (rows.Count == 0) return;
foreach (SurrealPolledChangeRow row in rows)
{
ulong nextCursor = BuildNextCursor(row.Versionstamp);
if (row.Changes.Count == 0)
{
await WriteCursorCheckpointAsync(watched.TableName, nextCursor, cancellationToken);
cursor = nextCursor;
continue;
}
for (var i = 0; i < row.Changes.Count; i++)
{
SurrealPolledChange change = row.Changes[i];
PendingCursorCheckpoint? pendingCursorCheckpoint = i == row.Changes.Count - 1
? new PendingCursorCheckpoint(watched.TableName, nextCursor)
: null;
await OnLocalChangeDetectedAsync(
watched.CollectionName,
change.Key,
change.OperationType,
change.Content,
pendingCursorCheckpoint,
cancellationToken);
}
cursor = nextCursor;
}
if (rows.Count < _cdcPollingOptions.BatchSize) return;
}
}
private async Task<IReadOnlyList<SurrealPolledChangeRow>> QueryChangeRowsAsync(
string tableName,
ulong cursor,
int batchSize,
CancellationToken cancellationToken)
{
string query = $"SHOW CHANGES FOR TABLE {tableName} SINCE {cursor} LIMIT {batchSize};";
var response = await _surrealClient.RawQuery(query, cancellationToken: cancellationToken);
response.EnsureAllOks();
List<CborObject> rows;
try
{
rows = response.GetValues<CborObject>(0).ToList();
}
catch
{
return [];
}
return SurrealShowChangesCborDecoder.DecodeRows(rows, tableName);
}
private async Task<ulong> ReadCursorCheckpointAsync(string tableName, CancellationToken cancellationToken)
{
if (_checkpointPersistence == null) return 0;
var checkpoint = await _checkpointPersistence.GetCheckpointAsync(
BuildCursorCheckpointConsumerId(tableName),
cancellationToken);
if (checkpoint?.VersionstampCursor is > 0)
return (ulong)checkpoint.VersionstampCursor.Value;
if (checkpoint == null || checkpoint.Timestamp.PhysicalTime < 0) return 0;
return (ulong)checkpoint.Timestamp.PhysicalTime;
}
private async Task WriteCursorCheckpointAsync(
string tableName,
ulong cursor,
CancellationToken cancellationToken)
{
if (_checkpointPersistence == null) return;
long encodedCursor = cursor > long.MaxValue
? long.MaxValue
: (long)cursor;
await _checkpointPersistence.UpsertCheckpointAsync(
new HlcTimestamp(encodedCursor, 0, "surreal-cdc"),
"",
BuildCursorCheckpointConsumerId(tableName),
cancellationToken,
encodedCursor);
}
private string BuildCursorCheckpointConsumerId(string tableName)
{
string baseConsumerId = "default";
if (TryGetCheckpointSettings(out _, out string configuredConsumerId))
baseConsumerId = configuredConsumerId;
return BuildCursorCheckpointConsumerId(tableName, baseConsumerId);
}
private static string BuildCursorCheckpointConsumerId(string tableName, string baseConsumerId)
{
return $"{baseConsumerId}:show_changes_cursor:{tableName}";
}
private static ulong BuildNextCursor(ulong versionstamp)
{
ulong majorCursor = versionstamp >> 16;
if (majorCursor == 0) majorCursor = versionstamp;
return majorCursor + 1;
}
private static bool IsLikelyChangefeedRetentionBoundary(Exception exception)
{
string message = exception.ToString();
if (string.IsNullOrWhiteSpace(message)) return false;
string normalized = message.ToLowerInvariant();
return normalized.Contains("retention", StringComparison.Ordinal) ||
(normalized.Contains("versionstamp", StringComparison.Ordinal) &&
normalized.Contains("outside", StringComparison.Ordinal)) ||
(normalized.Contains("change", StringComparison.Ordinal) &&
normalized.Contains("feed", StringComparison.Ordinal) &&
normalized.Contains("since", StringComparison.Ordinal)) ||
(normalized.Contains("history", StringComparison.Ordinal) &&
normalized.Contains("change", StringComparison.Ordinal));
}
#endregion
#region Abstract Methods - Implemented by subclass
/// <summary>
/// Applies JSON content to a single entity in the backing store.
/// </summary>
/// <param name="collection">The collection name.</param>
/// <param name="key">The document key.</param>
/// <param name="content">The JSON payload to persist.</param>
/// <param name="cancellationToken">The cancellation token.</param>
protected abstract Task ApplyContentToEntityAsync(
string collection, string key, JsonElement content, CancellationToken cancellationToken);
/// <summary>
/// Applies JSON content to multiple entities in the backing store.
/// </summary>
/// <param name="documents">The documents to persist.</param>
/// <param name="cancellationToken">The cancellation token.</param>
protected abstract Task ApplyContentToEntitiesBatchAsync(
IEnumerable<(string Collection, string Key, JsonElement Content)> documents,
CancellationToken cancellationToken);
/// <summary>
/// Gets a single entity as JSON content.
/// </summary>
/// <param name="collection">The collection name.</param>
/// <param name="key">The document key.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>The JSON content when found; otherwise <see langword="null" />.</returns>
protected abstract Task<JsonElement?> GetEntityAsJsonAsync(
string collection, string key, CancellationToken cancellationToken);
/// <summary>
/// Removes a single entity from the backing store.
/// </summary>
/// <param name="collection">The collection name.</param>
/// <param name="key">The document key.</param>
/// <param name="cancellationToken">The cancellation token.</param>
protected abstract Task RemoveEntityAsync(
string collection, string key, CancellationToken cancellationToken);
/// <summary>
/// Removes multiple entities from the backing store.
/// </summary>
/// <param name="documents">The documents to remove.</param>
/// <param name="cancellationToken">The cancellation token.</param>
protected abstract Task RemoveEntitiesBatchAsync(
IEnumerable<(string Collection, string Key)> documents, CancellationToken cancellationToken);
/// <summary>
/// Gets all entities from a collection as JSON content.
/// </summary>
/// <param name="collection">The collection name.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A sequence of key/content pairs.</returns>
protected abstract Task<IEnumerable<(string Key, JsonElement Content)>> GetAllEntitiesAsJsonAsync(
string collection, CancellationToken cancellationToken);
#endregion
#region IDocumentStore Implementation
/// <inheritdoc />
public IEnumerable<string> InterestedCollection => _registeredCollections;
/// <inheritdoc />
public async Task<Document?> GetDocumentAsync(
string collection,
string key,
CancellationToken cancellationToken = default)
{
var content = await GetEntityAsJsonAsync(collection, key, cancellationToken);
if (content == null) return null;
var timestamp = new HlcTimestamp(0, 0, "");
return new Document(collection, key, content.Value, timestamp, false);
}
/// <inheritdoc />
public async Task<IEnumerable<Document>> GetDocumentsByCollectionAsync(
string collection,
CancellationToken cancellationToken = default)
{
var entities = await GetAllEntitiesAsJsonAsync(collection, cancellationToken);
var timestamp = new HlcTimestamp(0, 0, "");
return entities.Select(e => new Document(collection, e.Key, e.Content, timestamp, false));
}
/// <inheritdoc />
public async Task<IEnumerable<Document>> GetDocumentsAsync(
List<(string Collection, string Key)> documentKeys,
CancellationToken cancellationToken)
{
var documents = new List<Document>();
foreach ((string collection, string key) in documentKeys)
{
var document = await GetDocumentAsync(collection, key, cancellationToken);
if (document != null) documents.Add(document);
}
return documents;
}
/// <inheritdoc />
public async Task<bool> PutDocumentAsync(Document document, CancellationToken cancellationToken = default)
{
await _remoteSyncGuard.WaitAsync(cancellationToken);
try
{
await PutDocumentInternalAsync(document, cancellationToken);
}
finally
{
_remoteSyncGuard.Release();
}
return true;
}
private async Task PutDocumentInternalAsync(Document document, CancellationToken cancellationToken)
{
RegisterSuppressedCdcEvent(document.Collection, document.Key, OperationType.Put);
await ApplyContentToEntityAsync(document.Collection, document.Key, document.Content, cancellationToken);
}
/// <inheritdoc />
public async Task<bool> UpdateBatchDocumentsAsync(
IEnumerable<Document> documents,
CancellationToken cancellationToken = default)
{
var documentList = documents.ToList();
await _remoteSyncGuard.WaitAsync(cancellationToken);
try
{
foreach (var document in documentList)
RegisterSuppressedCdcEvent(document.Collection, document.Key, OperationType.Put);
await ApplyContentToEntitiesBatchAsync(
documentList.Select(d => (d.Collection, d.Key, d.Content)),
cancellationToken);
}
finally
{
_remoteSyncGuard.Release();
}
return true;
}
/// <inheritdoc />
public async Task<bool> InsertBatchDocumentsAsync(
IEnumerable<Document> documents,
CancellationToken cancellationToken = default)
{
var documentList = documents.ToList();
await _remoteSyncGuard.WaitAsync(cancellationToken);
try
{
foreach (var document in documentList)
RegisterSuppressedCdcEvent(document.Collection, document.Key, OperationType.Put);
await ApplyContentToEntitiesBatchAsync(
documentList.Select(d => (d.Collection, d.Key, d.Content)),
cancellationToken);
}
finally
{
_remoteSyncGuard.Release();
}
return true;
}
/// <inheritdoc />
public async Task<bool> DeleteDocumentAsync(
string collection,
string key,
CancellationToken cancellationToken = default)
{
await _remoteSyncGuard.WaitAsync(cancellationToken);
try
{
await DeleteDocumentInternalAsync(collection, key, cancellationToken);
}
finally
{
_remoteSyncGuard.Release();
}
return true;
}
private async Task DeleteDocumentInternalAsync(
string collection,
string key,
CancellationToken cancellationToken)
{
RegisterSuppressedCdcEvent(collection, key, OperationType.Delete);
await RemoveEntityAsync(collection, key, cancellationToken);
}
/// <inheritdoc />
public async Task<bool> DeleteBatchDocumentsAsync(
IEnumerable<string> documentKeys,
CancellationToken cancellationToken = default)
{
var parsedKeys = new List<(string Collection, string Key)>();
foreach (string key in documentKeys)
{
string[] parts = key.Split('/');
if (parts.Length == 2)
parsedKeys.Add((parts[0], parts[1]));
else
_logger.LogWarning("Invalid document key format: {Key}", key);
}
if (parsedKeys.Count == 0) return true;
await _remoteSyncGuard.WaitAsync(cancellationToken);
try
{
foreach ((string collection, string key) in parsedKeys)
RegisterSuppressedCdcEvent(collection, key, OperationType.Delete);
await RemoveEntitiesBatchAsync(parsedKeys, cancellationToken);
}
finally
{
_remoteSyncGuard.Release();
}
return true;
}
/// <inheritdoc />
public async Task<Document> MergeAsync(Document incoming, CancellationToken cancellationToken = default)
{
var existing = await GetDocumentAsync(incoming.Collection, incoming.Key, cancellationToken);
if (existing == null)
{
await PutDocumentInternalAsync(incoming, cancellationToken);
return incoming;
}
var resolution = _conflictResolver.Resolve(existing, new OplogEntry(
incoming.Collection,
incoming.Key,
OperationType.Put,
incoming.Content,
incoming.UpdatedAt,
""));
if (resolution.ShouldApply && resolution.MergedDocument != null)
{
await PutDocumentInternalAsync(resolution.MergedDocument, cancellationToken);
return resolution.MergedDocument;
}
return existing;
}
#endregion
#region ISnapshotable Implementation
/// <inheritdoc />
public async Task DropAsync(CancellationToken cancellationToken = default)
{
foreach (string collection in InterestedCollection)
{
var entities = await GetAllEntitiesAsJsonAsync(collection, cancellationToken);
foreach ((string key, var _) in entities) await RemoveEntityAsync(collection, key, cancellationToken);
}
}
/// <inheritdoc />
public async Task<IEnumerable<Document>> ExportAsync(CancellationToken cancellationToken = default)
{
var documents = new List<Document>();
foreach (string collection in InterestedCollection)
{
var collectionDocuments = await GetDocumentsByCollectionAsync(collection, cancellationToken);
documents.AddRange(collectionDocuments);
}
return documents;
}
/// <inheritdoc />
public async Task ImportAsync(IEnumerable<Document> items, CancellationToken cancellationToken = default)
{
var documents = items.ToList();
await _remoteSyncGuard.WaitAsync(cancellationToken);
try
{
foreach (var document in documents)
RegisterSuppressedCdcEvent(document.Collection, document.Key, OperationType.Put);
await ApplyContentToEntitiesBatchAsync(
documents.Select(d => (d.Collection, d.Key, d.Content)),
cancellationToken);
}
finally
{
_remoteSyncGuard.Release();
}
}
/// <inheritdoc />
public async Task MergeAsync(IEnumerable<Document> items, CancellationToken cancellationToken = default)
{
await _remoteSyncGuard.WaitAsync(cancellationToken);
try
{
foreach (var document in items) await MergeAsync(document, cancellationToken);
}
finally
{
_remoteSyncGuard.Release();
}
}
#endregion
#region Oplog Management
/// <summary>
/// Returns true when remote sync is in progress and local CDC must be suppressed.
/// </summary>
protected bool IsRemoteSyncInProgress => _remoteSyncGuard.CurrentCount == 0;
/// <summary>
/// Handles a local collection change and records oplog/metadata when not suppressed.
/// </summary>
/// <param name="collection">The collection name.</param>
/// <param name="key">The document key.</param>
/// <param name="operationType">The detected operation type.</param>
/// <param name="content">Optional JSON content for non-delete operations.</param>
/// <param name="pendingCursorCheckpoint">Optional pending cursor checkpoint to persist.</param>
/// <param name="cancellationToken">The cancellation token.</param>
protected async Task OnLocalChangeDetectedAsync(
string collection,
string key,
OperationType operationType,
JsonElement? content,
PendingCursorCheckpoint? pendingCursorCheckpoint = null,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(collection)) return;
if (string.IsNullOrWhiteSpace(key)) return;
if (TryConsumeSuppressedCdcEvent(collection, key, operationType)) return;
if (IsRemoteSyncInProgress) return;
await CreateOplogEntryAsync(collection, key, operationType, content, pendingCursorCheckpoint, cancellationToken);
}
private HlcTimestamp GenerateTimestamp(string nodeId)
{
lock (_clockLock)
{
long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
if (now > _lastPhysicalTime)
{
_lastPhysicalTime = now;
_logicalCounter = 0;
}
else
{
_logicalCounter++;
}
return new HlcTimestamp(_lastPhysicalTime, _logicalCounter, nodeId);
}
}
private async Task CreateOplogEntryAsync(
string collection,
string key,
OperationType operationType,
JsonElement? content,
PendingCursorCheckpoint? pendingCursorCheckpoint,
CancellationToken cancellationToken)
{
await EnsureReadyAsync(cancellationToken);
var config = await _configProvider.GetConfiguration();
string nodeId = config.NodeId ?? "";
string previousHash = _vectorClock.GetLastHash(nodeId) ??
await QueryLastHashForNodeAsync(nodeId, cancellationToken) ??
string.Empty;
var timestamp = GenerateTimestamp(nodeId);
var oplogEntry = new OplogEntry(
collection,
key,
operationType,
content,
timestamp,
previousHash);
var metadata = new DocumentMetadata(collection, key, timestamp, operationType == OperationType.Delete);
await PersistOplogAndMetadataAtomicallyAsync(oplogEntry, metadata, pendingCursorCheckpoint, cancellationToken);
_vectorClock.Update(oplogEntry);
_logger.LogDebug(
"Created local oplog entry: {Operation} {Collection}/{Key} at {Timestamp} (hash: {Hash})",
operationType, collection, key, timestamp, oplogEntry.Hash);
}
private async Task PersistOplogAndMetadataAtomicallyAsync(
OplogEntry oplogEntry,
DocumentMetadata metadata,
PendingCursorCheckpoint? pendingCursorCheckpoint,
CancellationToken cancellationToken)
{
var parameters = new Dictionary<string, object?>
{
["oplogRecordId"] = SurrealStoreRecordIds.Oplog(oplogEntry.Hash),
["oplogRecord"] = oplogEntry.ToSurrealRecord(),
["metadataRecordId"] = SurrealStoreRecordIds.DocumentMetadata(
metadata.Collection,
metadata.Key,
metadata.DatasetId),
["metadataRecord"] = metadata.ToSurrealRecord()
};
var sqlBuilder = new StringBuilder();
sqlBuilder.AppendLine("BEGIN TRANSACTION;");
sqlBuilder.AppendLine("UPSERT $oplogRecordId CONTENT $oplogRecord;");
sqlBuilder.AppendLine("UPSERT $metadataRecordId CONTENT $metadataRecord;");
bool localCheckpointWrittenInTransaction = TryBuildCheckpointTransactionPayload(
oplogEntry,
out RecordId localCheckpointRecordId,
out Dictionary<string, object?> localCheckpointRecord);
if (localCheckpointWrittenInTransaction)
{
parameters["localCheckpointRecordId"] = localCheckpointRecordId;
parameters["localCheckpointRecord"] = localCheckpointRecord;
sqlBuilder.AppendLine("UPSERT $localCheckpointRecordId CONTENT $localCheckpointRecord;");
}
bool cursorCheckpointWrittenInTransaction = TryBuildCursorCheckpointTransactionPayload(
pendingCursorCheckpoint,
out RecordId cursorCheckpointRecordId,
out Dictionary<string, object?> cursorCheckpointRecord);
if (cursorCheckpointWrittenInTransaction)
{
parameters["cursorCheckpointRecordId"] = cursorCheckpointRecordId;
parameters["cursorCheckpointRecord"] = cursorCheckpointRecord;
sqlBuilder.AppendLine("UPSERT $cursorCheckpointRecordId CONTENT $cursorCheckpointRecord;");
}
sqlBuilder.AppendLine("COMMIT TRANSACTION;");
string sql = sqlBuilder.ToString();
var response = await _surrealClient.RawQuery(sql, parameters, cancellationToken);
response.EnsureAllOks();
if (!localCheckpointWrittenInTransaction && _checkpointPersistence != null)
await _checkpointPersistence.AdvanceCheckpointAsync(oplogEntry, cancellationToken: cancellationToken);
if (pendingCursorCheckpoint is not null && !cursorCheckpointWrittenInTransaction)
await WriteCursorCheckpointAsync(
pendingCursorCheckpoint.Value.TableName,
pendingCursorCheckpoint.Value.Cursor,
cancellationToken);
}
private bool TryBuildCheckpointTransactionPayload(
OplogEntry oplogEntry,
out RecordId checkpointRecordId,
out Dictionary<string, object?> checkpointRecord)
{
checkpointRecordId = RecordId.From(CBDDCSurrealSchemaNames.DocumentMetadataTable, "__unused__");
checkpointRecord = new Dictionary<string, object?>();
if (!TryGetCheckpointSettings(out string checkpointTable, out string consumerId)) return false;
const string datasetId = DatasetId.Primary;
string consumerKey = ComputeConsumerKey(datasetId, consumerId);
checkpointRecordId = RecordId.From(checkpointTable, consumerKey);
checkpointRecord = new Dictionary<string, object?>
{
["datasetId"] = datasetId,
["consumerId"] = consumerId,
["timestampPhysicalTime"] = oplogEntry.Timestamp.PhysicalTime,
["timestampLogicalCounter"] = oplogEntry.Timestamp.LogicalCounter,
["timestampNodeId"] = oplogEntry.Timestamp.NodeId,
["lastHash"] = oplogEntry.Hash,
["updatedUtcMs"] = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
};
return true;
}
private bool TryBuildCursorCheckpointTransactionPayload(
PendingCursorCheckpoint? pendingCursorCheckpoint,
out RecordId checkpointRecordId,
out Dictionary<string, object?> checkpointRecord)
{
checkpointRecordId = RecordId.From(CBDDCSurrealSchemaNames.DocumentMetadataTable, "__unused__");
checkpointRecord = new Dictionary<string, object?>();
if (pendingCursorCheckpoint is null) return false;
if (!TryGetCheckpointSettings(out string checkpointTable, out string consumerId)) return false;
string cursorConsumerId = BuildCursorCheckpointConsumerId(
pendingCursorCheckpoint.Value.TableName,
consumerId);
long encodedCursor = pendingCursorCheckpoint.Value.Cursor > long.MaxValue
? long.MaxValue
: (long)pendingCursorCheckpoint.Value.Cursor;
const string datasetId = DatasetId.Primary;
string consumerKey = ComputeConsumerKey(datasetId, cursorConsumerId);
checkpointRecordId = RecordId.From(checkpointTable, consumerKey);
checkpointRecord = new Dictionary<string, object?>
{
["datasetId"] = datasetId,
["consumerId"] = cursorConsumerId,
["timestampPhysicalTime"] = encodedCursor,
["timestampLogicalCounter"] = 0,
["timestampNodeId"] = "surreal-cdc",
["lastHash"] = "",
["versionstampCursor"] = encodedCursor,
["updatedUtcMs"] = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
};
return true;
}
private bool TryGetCheckpointSettings(out string checkpointTable, out string consumerId)
{
checkpointTable = string.Empty;
consumerId = string.Empty;
if (_checkpointPersistence == null) return false;
if (!TryGetPrivateField(_checkpointPersistence, "_enabled", out bool enabled) || !enabled) return false;
if (!TryGetPrivateField(_checkpointPersistence, "_checkpointTable", out string? resolvedCheckpointTable) ||
string.IsNullOrWhiteSpace(resolvedCheckpointTable))
return false;
if (!TryGetPrivateField(_checkpointPersistence, "_defaultConsumerId", out string? resolvedConsumerId) ||
string.IsNullOrWhiteSpace(resolvedConsumerId))
return false;
if (!SurrealIdentifierRegex.IsMatch(resolvedCheckpointTable)) return false;
checkpointTable = resolvedCheckpointTable;
consumerId = resolvedConsumerId;
return true;
}
private static string ComputeConsumerKey(string datasetId, string consumerId)
{
byte[] bytes = Encoding.UTF8.GetBytes($"{datasetId}\n{consumerId}");
return Convert.ToHexString(SHA256.HashData(bytes)).ToLowerInvariant();
}
private static bool TryGetPrivateField<TValue>(object source, string fieldName, out TValue value)
{
const BindingFlags flags = BindingFlags.Instance | BindingFlags.NonPublic;
FieldInfo? fieldInfo = source.GetType().GetField(fieldName, flags);
if (fieldInfo?.GetValue(source) is TValue typedValue)
{
value = typedValue;
return true;
}
value = default!;
return false;
}
private async Task<string?> QueryLastHashForNodeAsync(string nodeId, CancellationToken cancellationToken)
{
var all = await _surrealClient.Select<SurrealOplogRecord>(
CBDDCSurrealSchemaNames.OplogEntriesTable,
cancellationToken);
var latest = all?
.Where(o => string.Equals(o.TimestampNodeId, nodeId, StringComparison.Ordinal))
.OrderByDescending(o => o.TimestampPhysicalTime)
.ThenByDescending(o => o.TimestampLogicalCounter)
.FirstOrDefault();
return latest?.Hash;
}
private async Task EnsureReadyAsync(CancellationToken cancellationToken)
{
await _schemaInitializer.EnsureInitializedAsync(cancellationToken);
}
/// <summary>
/// Marks the start of remote sync operations and suppresses local CDC loopback.
/// </summary>
public IDisposable BeginRemoteSync()
{
_remoteSyncGuard.Wait();
return new RemoteSyncScope(_remoteSyncGuard);
}
private sealed class RemoteSyncScope : IDisposable
{
private readonly SemaphoreSlim _guard;
private int _disposed;
/// <summary>
/// Initializes a new instance of the <see cref="RemoteSyncScope" /> class.
/// </summary>
/// <param name="guard">The guard semaphore to release on dispose.</param>
public RemoteSyncScope(SemaphoreSlim guard)
{
_guard = guard;
}
/// <inheritdoc />
public void Dispose()
{
if (Interlocked.Exchange(ref _disposed, 1) == 1) return;
_guard.Release();
}
}
#endregion
}