Suppress CDC echo events during remote applies to prevent replication loops
All checks were successful
CI / verify (push) Successful in 3m53s

This commit is contained in:
Joseph Doherty
2026-02-20 13:07:24 -05:00
parent 08bfc17218
commit 3a352944c3

View File

@@ -1,4 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
@@ -38,6 +39,7 @@ public abstract class BLiteDocumentStore<TDbContext> : IDocumentStore, IDisposab
/// CurrentCount == 1 ? no sync, CDC creates OplogEntry.
/// </summary>
private readonly SemaphoreSlim _remoteSyncGuard = new SemaphoreSlim(1, 1);
private readonly ConcurrentDictionary<string, int> _suppressedCdcEvents = new(StringComparer.Ordinal);
private readonly List<IDisposable> _cdcWatchers = new();
private readonly HashSet<string> _registeredCollections = new();
@@ -126,6 +128,39 @@ public abstract class BLiteDocumentStore<TDbContext> : IDocumentStore, IDisposab
#region CDC Registration
private static string BuildSuppressionKey(string collection, string key, OperationType operationType)
{
return $"{collection}|{key}|{(int)operationType}";
}
private void RegisterSuppressedCdcEvent(string collection, string key, OperationType operationType)
{
var suppressionKey = BuildSuppressionKey(collection, key, operationType);
_suppressedCdcEvents.AddOrUpdate(suppressionKey, 1, (_, current) => current + 1);
}
private bool TryConsumeSuppressedCdcEvent(string collection, string key, OperationType operationType)
{
var suppressionKey = BuildSuppressionKey(collection, key, operationType);
while (true)
{
if (!_suppressedCdcEvents.TryGetValue(suppressionKey, out var current))
{
return false;
}
if (current <= 1)
{
return _suppressedCdcEvents.TryRemove(suppressionKey, out _);
}
if (_suppressedCdcEvents.TryUpdate(suppressionKey, current - 1, current))
{
return true;
}
}
}
/// <summary>
/// Registers a BLite collection for CDC tracking.
/// Call in subclass constructor for each collection to sync.
@@ -180,9 +215,20 @@ public abstract class BLiteDocumentStore<TDbContext> : IDocumentStore, IDisposab
/// <param name="changeEvent">The change event payload.</param>
public void OnNext(ChangeStreamEvent<string, TEntity> changeEvent)
{
if (_store._remoteSyncGuard.CurrentCount == 0) return;
var operationType = changeEvent.Type == BLiteOperationType.Delete ? OperationType.Delete : OperationType.Put;
var entityId = changeEvent.DocumentId?.ToString() ?? "";
if (operationType == OperationType.Put && changeEvent.Entity != null)
{
entityId = _keySelector(changeEvent.Entity);
}
if (_store.TryConsumeSuppressedCdcEvent(_collectionName, entityId, operationType))
{
return;
}
if (_store._remoteSyncGuard.CurrentCount == 0) return;
if (changeEvent.Type == BLiteOperationType.Delete)
{
@@ -348,6 +394,7 @@ public abstract class BLiteDocumentStore<TDbContext> : IDocumentStore, IDisposab
private async Task PutDocumentInternalAsync(Document document, CancellationToken cancellationToken)
{
RegisterSuppressedCdcEvent(document.Collection, document.Key, OperationType.Put);
await ApplyContentToEntityAsync(document.Collection, document.Key, document.Content, cancellationToken);
}
@@ -359,11 +406,17 @@ public abstract class BLiteDocumentStore<TDbContext> : IDocumentStore, IDisposab
/// <returns><see langword="true"/> when the operation succeeds.</returns>
public async Task<bool> UpdateBatchDocumentsAsync(IEnumerable<Document> documents, CancellationToken cancellationToken = default)
{
var documentList = documents.ToList();
await _remoteSyncGuard.WaitAsync(cancellationToken);
try
{
foreach (var document in documentList)
{
RegisterSuppressedCdcEvent(document.Collection, document.Key, OperationType.Put);
}
await ApplyContentToEntitiesBatchAsync(
documents.Select(d => (d.Collection, d.Key, d.Content)), cancellationToken);
documentList.Select(d => (d.Collection, d.Key, d.Content)), cancellationToken);
}
finally
{
@@ -380,11 +433,17 @@ public abstract class BLiteDocumentStore<TDbContext> : IDocumentStore, IDisposab
/// <returns><see langword="true"/> when the operation succeeds.</returns>
public async Task<bool> InsertBatchDocumentsAsync(IEnumerable<Document> documents, CancellationToken cancellationToken = default)
{
var documentList = documents.ToList();
await _remoteSyncGuard.WaitAsync(cancellationToken);
try
{
foreach (var document in documentList)
{
RegisterSuppressedCdcEvent(document.Collection, document.Key, OperationType.Put);
}
await ApplyContentToEntitiesBatchAsync(
documents.Select(d => (d.Collection, d.Key, d.Content)), cancellationToken);
documentList.Select(d => (d.Collection, d.Key, d.Content)), cancellationToken);
}
finally
{
@@ -416,6 +475,7 @@ public abstract class BLiteDocumentStore<TDbContext> : IDocumentStore, IDisposab
private async Task DeleteDocumentInternalAsync(string collection, string key, CancellationToken cancellationToken)
{
RegisterSuppressedCdcEvent(collection, key, OperationType.Delete);
await RemoveEntityAsync(collection, key, cancellationToken);
}
@@ -446,6 +506,11 @@ public abstract class BLiteDocumentStore<TDbContext> : IDocumentStore, IDisposab
await _remoteSyncGuard.WaitAsync(cancellationToken);
try
{
foreach (var (collection, key) in parsedKeys)
{
RegisterSuppressedCdcEvent(collection, key, OperationType.Delete);
}
await RemoveEntitiesBatchAsync(parsedKeys, cancellationToken);
}
finally
@@ -533,11 +598,17 @@ public abstract class BLiteDocumentStore<TDbContext> : IDocumentStore, IDisposab
/// <param name="cancellationToken">The cancellation token.</param>
public async Task ImportAsync(IEnumerable<Document> items, CancellationToken cancellationToken = default)
{
var documents = items.ToList();
await _remoteSyncGuard.WaitAsync(cancellationToken);
try
{
foreach (var document in documents)
{
RegisterSuppressedCdcEvent(document.Collection, document.Key, OperationType.Put);
}
await ApplyContentToEntitiesBatchAsync(
items.Select(d => (d.Collection, d.Key, d.Content)), cancellationToken);
documents.Select(d => (d.Collection, d.Key, d.Content)), cancellationToken);
}
finally
{