using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Linq; using System.Threading.Tasks; using ZB.MOM.WW.CBDD.Bson; using ZB.MOM.WW.CBDD.Core.CDC; using ZB.MOM.WW.CBDD.Core.Transactions; using ZB.MOM.WW.CBDD.Shared; using Xunit; namespace ZB.MOM.WW.CBDD.Tests; public class CdcTests : IDisposable { private static readonly TimeSpan DefaultEventTimeout = TimeSpan.FromSeconds(3); private static readonly TimeSpan PollInterval = TimeSpan.FromMilliseconds(10); private readonly string _dbPath = $"cdc_test_{Guid.NewGuid()}.db"; private readonly Shared.TestDbContext _db; /// /// Initializes a new instance of the class. /// public CdcTests() { _db = new Shared.TestDbContext(_dbPath); } /// /// Verifies that an insert operation publishes a CDC event. /// [Fact] public async Task Test_Cdc_Basic_Insert_Fires_Event() { var ct = TestContext.Current.CancellationToken; var events = new ConcurrentQueue>(); using var subscription = _db.People.Watch(capturePayload: true).Subscribe(events.Enqueue); var person = new Person { Id = 1, Name = "John", Age = 30 }; _db.People.Insert(person); _db.SaveChanges(); await WaitForEventCountAsync(events, expectedCount: 1, ct); var snapshot = events.ToArray(); snapshot.Length.ShouldBe(1); snapshot[0].Type.ShouldBe(OperationType.Insert); snapshot[0].DocumentId.ShouldBe(1); snapshot[0].Entity.ShouldNotBeNull(); snapshot[0].Entity!.Name.ShouldBe("John"); } /// /// Verifies payload is omitted when CDC capture payload is disabled. /// [Fact] public async Task Test_Cdc_No_Payload_When_Not_Requested() { var ct = TestContext.Current.CancellationToken; var events = new ConcurrentQueue>(); using var subscription = _db.People.Watch(capturePayload: false).Subscribe(events.Enqueue); var person = new Person { Id = 1, Name = "John", Age = 30 }; _db.People.Insert(person); _db.SaveChanges(); await WaitForEventCountAsync(events, expectedCount: 1, ct); var snapshot = events.ToArray(); snapshot.Length.ShouldBe(1); snapshot[0].Entity.ShouldBeNull(); } /// /// Verifies CDC events are published only for committed changes. /// [Fact] public async Task Test_Cdc_Commit_Only() { var ct = TestContext.Current.CancellationToken; var events = new ConcurrentQueue>(); using var subscription = _db.People.Watch(capturePayload: true).Subscribe(events.Enqueue); using (var txn = _db.BeginTransaction()) { _db.People.Insert(new Person { Id = 1, Name = "John" }); events.Count.ShouldBe(0); // Not committed yet txn.Rollback(); } await Task.Delay(100, ct); events.Count.ShouldBe(0); // Rolled back using (var txn = _db.BeginTransaction()) { _db.People.Insert(new Person { Id = 2, Name = "Jane" }); txn.Commit(); } await WaitForEventCountAsync(events, expectedCount: 1, ct); var snapshot = events.ToArray(); snapshot.Length.ShouldBe(1); snapshot[0].DocumentId.ShouldBe(2); } /// /// Verifies update and delete operations publish CDC events. /// [Fact] public async Task Test_Cdc_Update_And_Delete() { var ct = TestContext.Current.CancellationToken; var events = new ConcurrentQueue>(); using var subscription = _db.People.Watch(capturePayload: true).Subscribe(events.Enqueue); var person = new Person { Id = 1, Name = "John", Age = 30 }; _db.People.Insert(person); _db.SaveChanges(); person.Name = "Johnny"; _db.People.Update(person); _db.SaveChanges(); _db.People.Delete(1); _db.SaveChanges(); await WaitForEventCountAsync(events, expectedCount: 3, ct); var snapshot = events.ToArray(); snapshot.Length.ShouldBe(3); snapshot[0].Type.ShouldBe(OperationType.Insert); snapshot[1].Type.ShouldBe(OperationType.Update); snapshot[2].Type.ShouldBe(OperationType.Delete); snapshot[1].Entity!.Name.ShouldBe("Johnny"); snapshot[2].DocumentId.ShouldBe(1); } /// /// Disposes test resources and removes temporary files. /// public void Dispose() { _db.Dispose(); if (File.Exists(_dbPath)) File.Delete(_dbPath); if (File.Exists(_dbPath + "-wal")) File.Delete(_dbPath + "-wal"); } private static async Task WaitForEventCountAsync( ConcurrentQueue> events, int expectedCount, CancellationToken ct) { var sw = Stopwatch.StartNew(); while (sw.Elapsed < DefaultEventTimeout) { if (events.Count >= expectedCount) { return; } await Task.Delay(PollInterval, ct); } events.Count.ShouldBe(expectedCount); } } // Simple helper to avoid System.Reactive dependency in tests public static class ObservableExtensions { /// /// Subscribes to an observable sequence using an action callback. /// /// The event type. /// The observable sequence. /// The callback for next events. /// An subscription. public static IDisposable Subscribe(this IObservable observable, Action onNext) { return observable.Subscribe(new AnonymousObserver(onNext)); } private class AnonymousObserver : IObserver { private readonly Action _onNext; /// /// Initializes a new instance of the class. /// /// The callback for next events. public AnonymousObserver(Action onNext) => _onNext = onNext; /// /// Handles completion. /// public void OnCompleted() { } /// /// Handles an observable error. /// /// The observed error. public void OnError(Exception error) { } /// /// Handles the next value. /// /// The observed value. public void OnNext(T value) => _onNext(value); } }