using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using ZB.MOM.WW.CBDD.Bson;
using ZB.MOM.WW.CBDD.Core.CDC;
using ZB.MOM.WW.CBDD.Core.Transactions;
using ZB.MOM.WW.CBDD.Shared;
using Xunit;
namespace ZB.MOM.WW.CBDD.Tests;
/// <summary>
/// Tests for change-data-capture (CDC) events observed through <c>_db.People.Watch(...)</c>.
/// Each test subscribes first, performs writes, then polls the collected events and asserts on them.
/// </summary>
// NOTE(review): every "ConcurrentQueue>" below looks like it lost its generic type arguments
// (the element type is not visible in this file) — confirm against the original source.
// NOTE(review): CancellationToken and the Should* assertions are not covered by the explicit
// usings in this file; presumably they come from implicit/global usings — confirm.
public class CdcTests : IDisposable
{
// Upper bound on how long WaitForEventCountAsync keeps polling before it falls through to its assertion.
private static readonly TimeSpan DefaultEventTimeout = TimeSpan.FromSeconds(3);
// Delay between two successive checks of the event queue in WaitForEventCountAsync.
private static readonly TimeSpan PollInterval = TimeSpan.FromMilliseconds(10);
// Relative file name made unique with a fresh GUID per instance, so instances do not share a database file.
private readonly string _dbPath = $"cdc_test_{Guid.NewGuid()}.db";
// Database context under test; created in the constructor and disposed in Dispose.
private readonly Shared.TestDbContext _db;
/// <summary>
/// Initializes a new instance of the <see cref="CdcTests"/> class and opens a
/// test database context on the per-instance database path.
/// </summary>
public CdcTests()
{
_db = new Shared.TestDbContext(_dbPath);
}
/// <summary>
/// Verifies that an insert operation publishes a CDC event.
/// </summary>
[Fact]
public async Task Test_Cdc_Basic_Insert_Fires_Event()
{
var ct = TestContext.Current.CancellationToken;
// Events are collected through the subscription callback; the subscription is set up before any write.
var events = new ConcurrentQueue>();
using var subscription = _db.People.Watch(capturePayload: true).Subscribe(events.Enqueue);
var person = new Person { Id = 1, Name = "John", Age = 30 };
_db.People.Insert(person);
_db.SaveChanges();
// The test polls instead of asserting immediately — presumably events are delivered asynchronously; confirm.
await WaitForEventCountAsync(events, expectedCount: 1, ct);
// Assert on a snapshot so the checks run against a fixed array rather than the live queue.
var snapshot = events.ToArray();
snapshot.Length.ShouldBe(1);
snapshot[0].Type.ShouldBe(OperationType.Insert);
snapshot[0].DocumentId.ShouldBe(1);
// capturePayload: true was requested, so the event is expected to carry the inserted entity.
snapshot[0].Entity.ShouldNotBeNull();
snapshot[0].Entity!.Name.ShouldBe("John");
}
/// <summary>
/// Verifies payload is omitted when CDC capture payload is disabled.
/// </summary>
[Fact]
public async Task Test_Cdc_No_Payload_When_Not_Requested()
{
var ct = TestContext.Current.CancellationToken;
var events = new ConcurrentQueue>();
// Same flow as the basic insert test, but subscribing with capturePayload: false.
using var subscription = _db.People.Watch(capturePayload: false).Subscribe(events.Enqueue);
var person = new Person { Id = 1, Name = "John", Age = 30 };
_db.People.Insert(person);
_db.SaveChanges();
await WaitForEventCountAsync(events, expectedCount: 1, ct);
var snapshot = events.ToArray();
// The event itself must still arrive; only its Entity payload is expected to be absent.
snapshot.Length.ShouldBe(1);
snapshot[0].Entity.ShouldBeNull();
}
/// <summary>
/// Verifies CDC events are published only for committed changes.
/// </summary>
[Fact]
public async Task Test_Cdc_Commit_Only()
{
var ct = TestContext.Current.CancellationToken;
var events = new ConcurrentQueue>();
using var subscription = _db.People.Watch(capturePayload: true).Subscribe(events.Enqueue);
// Phase 1: insert inside a transaction that is rolled back — no event may be observed.
using (var txn = _db.BeginTransaction())
{
_db.People.Insert(new Person { Id = 1, Name = "John" });
events.Count.ShouldBe(0); // Not committed yet
txn.Rollback();
}
// Give any (unwanted) event from the rolled-back insert time to arrive before asserting that none did.
await Task.Delay(100, ct);
events.Count.ShouldBe(0); // Rolled back
// Phase 2: insert inside a transaction that is committed — exactly this insert must be observed.
using (var txn = _db.BeginTransaction())
{
_db.People.Insert(new Person { Id = 2, Name = "Jane" });
txn.Commit();
}
await WaitForEventCountAsync(events, expectedCount: 1, ct);
var snapshot = events.ToArray();
snapshot.Length.ShouldBe(1);
// Id 2 belongs to the committed insert; the rolled-back insert used Id 1.
snapshot[0].DocumentId.ShouldBe(2);
}
/// <summary>
/// Verifies update and delete operations publish CDC events.
/// </summary>
[Fact]
public async Task Test_Cdc_Update_And_Delete()
{
var ct = TestContext.Current.CancellationToken;
var events = new ConcurrentQueue>();
using var subscription = _db.People.Watch(capturePayload: true).Subscribe(events.Enqueue);
// Three separately saved operations on the same document: insert, update, delete.
var person = new Person { Id = 1, Name = "John", Age = 30 };
_db.People.Insert(person);
_db.SaveChanges();
person.Name = "Johnny";
_db.People.Update(person);
_db.SaveChanges();
_db.People.Delete(1);
_db.SaveChanges();
await WaitForEventCountAsync(events, expectedCount: 3, ct);
var snapshot = events.ToArray();
snapshot.Length.ShouldBe(3);
// The events are asserted in the same order in which the operations were saved.
snapshot[0].Type.ShouldBe(OperationType.Insert);
snapshot[1].Type.ShouldBe(OperationType.Update);
snapshot[2].Type.ShouldBe(OperationType.Delete);
// The update event is expected to carry the renamed entity.
snapshot[1].Entity!.Name.ShouldBe("Johnny");
snapshot[2].DocumentId.ShouldBe(1);
}
/// <summary>
/// Disposes test resources and removes temporary files.
/// </summary>
public void Dispose()
{
// Dispose the context before touching its files.
_db.Dispose();
if (File.Exists(_dbPath)) File.Delete(_dbPath);
// "-wal" sidecar file — presumably a write-ahead log created next to the database; TODO confirm.
if (File.Exists(_dbPath + "-wal")) File.Delete(_dbPath + "-wal");
}
/// <summary>
/// Polls <paramref name="events"/> every <see cref="PollInterval"/> until it holds at least
/// <paramref name="expectedCount"/> items or <see cref="DefaultEventTimeout"/> has elapsed.
/// </summary>
/// <param name="events">The queue that the subscription callback enqueues into.</param>
/// <param name="expectedCount">The number of events to wait for.</param>
/// <param name="ct">Token passed to <c>Task.Delay</c>; cancelling it aborts the wait.</param>
/// <returns>
/// A task that completes as soon as enough events are present, or after the timeout
/// if the final count assertion passes.
/// </returns>
private static async Task WaitForEventCountAsync(
ConcurrentQueue> events,
int expectedCount,
CancellationToken ct)
{
var sw = Stopwatch.StartNew();
while (sw.Elapsed < DefaultEventTimeout)
{
// Returns on ">=": over-delivery is not detected here; callers assert the exact snapshot length.
if (events.Count >= expectedCount)
{
return;
}
await Task.Delay(PollInterval, ct);
}
// Timed out: this assertion reports the failure (it passes only if the count matches at this moment).
events.Count.ShouldBe(expectedCount);
}
}
// Simple helper to avoid System.Reactive dependency in tests
/// <summary>
/// Minimal <see cref="IObservable{T}"/> helper that lets callers subscribe with a plain delegate
/// instead of implementing <see cref="IObserver{T}"/> themselves.
/// </summary>
public static class ObservableExtensions
{
    /// <summary>
    /// Subscribes to an observable sequence using an action callback.
    /// </summary>
    /// <typeparam name="T">The event type.</typeparam>
    /// <param name="observable">The observable sequence.</param>
    /// <param name="onNext">The callback for next events.</param>
    /// <returns>An <see cref="IDisposable"/> subscription; disposing it ends the subscription.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="observable"/> or <paramref name="onNext"/> is <see langword="null"/>.
    /// </exception>
    public static IDisposable Subscribe<T>(this IObservable<T> observable, Action<T> onNext)
    {
        // Fail at subscription time rather than with a NullReferenceException on the first event.
        if (observable is null)
        {
            throw new ArgumentNullException(nameof(observable));
        }

        if (onNext is null)
        {
            throw new ArgumentNullException(nameof(onNext));
        }

        // Binds to the interface's Subscribe(IObserver<T>), not back to this extension method.
        return observable.Subscribe(new AnonymousObserver<T>(onNext));
    }

    /// <summary>
    /// Observer that forwards each value to a delegate and ignores completion and errors.
    /// </summary>
    /// <typeparam name="T">The event type.</typeparam>
    private sealed class AnonymousObserver<T> : IObserver<T>
    {
        private readonly Action<T> _onNext;

        /// <summary>
        /// Initializes a new instance of the <see cref="AnonymousObserver{T}"/> class.
        /// </summary>
        /// <param name="onNext">The callback for next events.</param>
        public AnonymousObserver(Action<T> onNext) => _onNext = onNext;

        /// <summary>
        /// Handles completion. Intentionally a no-op.
        /// </summary>
        public void OnCompleted() { }

        /// <summary>
        /// Handles an observable error. Intentionally a no-op: the error is not surfaced to the caller.
        /// </summary>
        /// <param name="error">The observed error.</param>
        public void OnError(Exception error) { }

        /// <summary>
        /// Handles the next value by invoking the callback.
        /// </summary>
        /// <param name="value">The observed value.</param>
        public void OnNext(T value) => _onNext(value);
    }
}