From 3a352944c3b41c26319563519f6557139cd6522b Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 20 Feb 2026 13:07:24 -0500 Subject: [PATCH] Suppress CDC echo events during remote applies to prevent replication loops --- .../BLite/BLiteDocumentStore.cs | 79 ++++++++++++++++++- 1 file changed, 75 insertions(+), 4 deletions(-) diff --git a/src/ZB.MOM.WW.CBDDC.Persistence/BLite/BLiteDocumentStore.cs b/src/ZB.MOM.WW.CBDDC.Persistence/BLite/BLiteDocumentStore.cs index 105fb9c..c88719b 100755 --- a/src/ZB.MOM.WW.CBDDC.Persistence/BLite/BLiteDocumentStore.cs +++ b/src/ZB.MOM.WW.CBDDC.Persistence/BLite/BLiteDocumentStore.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Text.Json; @@ -38,6 +39,7 @@ public abstract class BLiteDocumentStore : IDocumentStore, IDisposab /// CurrentCount == 1 ? no sync, CDC creates OplogEntry. /// private readonly SemaphoreSlim _remoteSyncGuard = new SemaphoreSlim(1, 1); + private readonly ConcurrentDictionary _suppressedCdcEvents = new(StringComparer.Ordinal); private readonly List _cdcWatchers = new(); private readonly HashSet _registeredCollections = new(); @@ -126,6 +128,39 @@ public abstract class BLiteDocumentStore : IDocumentStore, IDisposab #region CDC Registration + private static string BuildSuppressionKey(string collection, string key, OperationType operationType) + { + return $"{collection}|{key}|{(int)operationType}"; + } + + private void RegisterSuppressedCdcEvent(string collection, string key, OperationType operationType) + { + var suppressionKey = BuildSuppressionKey(collection, key, operationType); + _suppressedCdcEvents.AddOrUpdate(suppressionKey, 1, (_, current) => current + 1); + } + + private bool TryConsumeSuppressedCdcEvent(string collection, string key, OperationType operationType) + { + var suppressionKey = BuildSuppressionKey(collection, key, operationType); + while (true) + { + if (!_suppressedCdcEvents.TryGetValue(suppressionKey, out var current)) + { + return false; + } + + if (current <= 1) + { + return _suppressedCdcEvents.TryRemove(suppressionKey, out _); + } + + if (_suppressedCdcEvents.TryUpdate(suppressionKey, current - 1, current)) + { + return true; + } + } + } + /// /// Registers a BLite collection for CDC tracking. /// Call in subclass constructor for each collection to sync. @@ -180,9 +215,20 @@ public abstract class BLiteDocumentStore : IDocumentStore, IDisposab /// The change event payload. public void OnNext(ChangeStreamEvent changeEvent) { - if (_store._remoteSyncGuard.CurrentCount == 0) return; + var operationType = changeEvent.Type == BLiteOperationType.Delete ? OperationType.Delete : OperationType.Put; var entityId = changeEvent.DocumentId?.ToString() ?? ""; + if (operationType == OperationType.Put && changeEvent.Entity != null) + { + entityId = _keySelector(changeEvent.Entity); + } + + if (_store.TryConsumeSuppressedCdcEvent(_collectionName, entityId, operationType)) + { + return; + } + + if (_store._remoteSyncGuard.CurrentCount == 0) return; if (changeEvent.Type == BLiteOperationType.Delete) { @@ -348,6 +394,7 @@ public abstract class BLiteDocumentStore : IDocumentStore, IDisposab private async Task PutDocumentInternalAsync(Document document, CancellationToken cancellationToken) { + RegisterSuppressedCdcEvent(document.Collection, document.Key, OperationType.Put); await ApplyContentToEntityAsync(document.Collection, document.Key, document.Content, cancellationToken); } @@ -359,11 +406,17 @@ public abstract class BLiteDocumentStore : IDocumentStore, IDisposab /// when the operation succeeds. public async Task UpdateBatchDocumentsAsync(IEnumerable documents, CancellationToken cancellationToken = default) { + var documentList = documents.ToList(); await _remoteSyncGuard.WaitAsync(cancellationToken); try { + foreach (var document in documentList) + { + RegisterSuppressedCdcEvent(document.Collection, document.Key, OperationType.Put); + } + await ApplyContentToEntitiesBatchAsync( - documents.Select(d => (d.Collection, d.Key, d.Content)), cancellationToken); + documentList.Select(d => (d.Collection, d.Key, d.Content)), cancellationToken); } finally { @@ -380,11 +433,17 @@ public abstract class BLiteDocumentStore : IDocumentStore, IDisposab /// when the operation succeeds. public async Task InsertBatchDocumentsAsync(IEnumerable documents, CancellationToken cancellationToken = default) { + var documentList = documents.ToList(); await _remoteSyncGuard.WaitAsync(cancellationToken); try { + foreach (var document in documentList) + { + RegisterSuppressedCdcEvent(document.Collection, document.Key, OperationType.Put); + } + await ApplyContentToEntitiesBatchAsync( - documents.Select(d => (d.Collection, d.Key, d.Content)), cancellationToken); + documentList.Select(d => (d.Collection, d.Key, d.Content)), cancellationToken); } finally { @@ -416,6 +475,7 @@ public abstract class BLiteDocumentStore : IDocumentStore, IDisposab private async Task DeleteDocumentInternalAsync(string collection, string key, CancellationToken cancellationToken) { + RegisterSuppressedCdcEvent(collection, key, OperationType.Delete); await RemoveEntityAsync(collection, key, cancellationToken); } @@ -446,6 +506,11 @@ public abstract class BLiteDocumentStore : IDocumentStore, IDisposab await _remoteSyncGuard.WaitAsync(cancellationToken); try { + foreach (var (collection, key) in parsedKeys) + { + RegisterSuppressedCdcEvent(collection, key, OperationType.Delete); + } + await RemoveEntitiesBatchAsync(parsedKeys, cancellationToken); } finally @@ -533,11 +598,17 @@ public abstract class BLiteDocumentStore : IDocumentStore, IDisposab /// The cancellation token. public async Task ImportAsync(IEnumerable items, CancellationToken cancellationToken = default) { + var documents = items.ToList(); await _remoteSyncGuard.WaitAsync(cancellationToken); try { + foreach (var document in documents) + { + RegisterSuppressedCdcEvent(document.Collection, document.Key, OperationType.Put); + } + await ApplyContentToEntitiesBatchAsync( - items.Select(d => (d.Collection, d.Key, d.Content)), cancellationToken); + documents.Select(d => (d.Collection, d.Key, d.Content)), cancellationToken); } finally {