172 lines
5.3 KiB
C#
Executable File
172 lines
5.3 KiB
C#
Executable File
using System;
|
|
using System.Collections.Concurrent;
|
|
using System.Collections.Generic;
|
|
using System.Diagnostics;
|
|
using System.IO;
|
|
using System.Linq;
|
|
using System.Threading.Tasks;
|
|
using ZB.MOM.WW.CBDD.Bson;
|
|
using ZB.MOM.WW.CBDD.Core.CDC;
|
|
using ZB.MOM.WW.CBDD.Core.Transactions;
|
|
using ZB.MOM.WW.CBDD.Shared;
|
|
using Xunit;
|
|
|
|
namespace ZB.MOM.WW.CBDD.Tests;
|
|
|
|
/// <summary>
/// Tests for the change stream exposed by <c>_db.People.Watch(...)</c>: which events
/// are delivered to subscribers, when they are delivered, and whether they carry the
/// entity payload.
/// </summary>
/// <remarks>
/// Each test instance works against its own uniquely named database file, which is
/// removed again in <see cref="Dispose"/>.
/// </remarks>
public class CdcTests : IDisposable
{
    /// <summary>Upper bound on how long a test waits for the expected events to arrive.</summary>
    private static readonly TimeSpan DefaultEventTimeout = TimeSpan.FromSeconds(3);

    /// <summary>Delay between two checks of the event queue while waiting.</summary>
    private static readonly TimeSpan PollInterval = TimeSpan.FromMilliseconds(10);

    // A GUID in the file name keeps every test instance on its own database file.
    private readonly string _dbPath = $"cdc_test_{Guid.NewGuid()}.db";
    private readonly Shared.TestDbContext _db;

    /// <summary>Opens a fresh test database at <see cref="_dbPath"/>.</summary>
    public CdcTests()
    {
        _db = new Shared.TestDbContext(_dbPath);
    }

    /// <summary>
    /// Inserting a document and saving delivers exactly one <c>Insert</c> event that
    /// carries the document id and, because the watcher captures payloads, the entity.
    /// </summary>
    [Fact]
    public async Task Test_Cdc_Basic_Insert_Fires_Event()
    {
        var ct = TestContext.Current.CancellationToken;
        var events = new ConcurrentQueue<ChangeStreamEvent<int, Person>>();
        using var subscription = _db.People.Watch(capturePayload: true).Subscribe(events.Enqueue);

        var person = new Person { Id = 1, Name = "John", Age = 30 };
        _db.People.Insert(person);
        _db.SaveChanges();

        await WaitForEventCountAsync(events, expectedCount: 1, ct);

        var snapshot = events.ToArray();
        snapshot.Length.ShouldBe(1);
        snapshot[0].Type.ShouldBe(OperationType.Insert);
        snapshot[0].DocumentId.ShouldBe(1);
        snapshot[0].Entity.ShouldNotBeNull();
        snapshot[0].Entity!.Name.ShouldBe("John");
    }

    /// <summary>
    /// When the watcher is created with <c>capturePayload: false</c>, the delivered
    /// event has a null <c>Entity</c>.
    /// </summary>
    [Fact]
    public async Task Test_Cdc_No_Payload_When_Not_Requested()
    {
        var ct = TestContext.Current.CancellationToken;
        var events = new ConcurrentQueue<ChangeStreamEvent<int, Person>>();
        using var subscription = _db.People.Watch(capturePayload: false).Subscribe(events.Enqueue);

        var person = new Person { Id = 1, Name = "John", Age = 30 };
        _db.People.Insert(person);
        _db.SaveChanges();

        await WaitForEventCountAsync(events, expectedCount: 1, ct);

        var snapshot = events.ToArray();
        snapshot.Length.ShouldBe(1);
        snapshot[0].Entity.ShouldBeNull();
    }

    /// <summary>
    /// Events are only delivered for committed work: an insert inside a transaction
    /// that is rolled back produces no event, while a committed insert produces one.
    /// </summary>
    [Fact]
    public async Task Test_Cdc_Commit_Only()
    {
        var ct = TestContext.Current.CancellationToken;
        var events = new ConcurrentQueue<ChangeStreamEvent<int, Person>>();
        using var subscription = _db.People.Watch(capturePayload: true).Subscribe(events.Enqueue);

        using (var txn = _db.BeginTransaction())
        {
            _db.People.Insert(new Person { Id = 1, Name = "John" });
            events.Count.ShouldBe(0); // Not committed yet

            txn.Rollback();
        }

        // Absence of an event cannot be awaited, so give a stray event a short
        // window to show up before asserting that nothing was delivered.
        await Task.Delay(100, ct);
        events.Count.ShouldBe(0); // Rolled back

        using (var txn = _db.BeginTransaction())
        {
            _db.People.Insert(new Person { Id = 2, Name = "Jane" });
            txn.Commit();
        }

        await WaitForEventCountAsync(events, expectedCount: 1, ct);
        var snapshot = events.ToArray();
        snapshot.Length.ShouldBe(1);
        snapshot[0].DocumentId.ShouldBe(2);
    }

    /// <summary>
    /// Insert, update and delete of the same document are delivered as three events
    /// in that order; the update event carries the modified entity.
    /// </summary>
    [Fact]
    public async Task Test_Cdc_Update_And_Delete()
    {
        var ct = TestContext.Current.CancellationToken;
        var events = new ConcurrentQueue<ChangeStreamEvent<int, Person>>();
        using var subscription = _db.People.Watch(capturePayload: true).Subscribe(events.Enqueue);

        var person = new Person { Id = 1, Name = "John", Age = 30 };
        _db.People.Insert(person);
        _db.SaveChanges();

        person.Name = "Johnny";
        _db.People.Update(person);
        _db.SaveChanges();

        _db.People.Delete(1);
        _db.SaveChanges();

        await WaitForEventCountAsync(events, expectedCount: 3, ct);

        var snapshot = events.ToArray();
        snapshot.Length.ShouldBe(3);
        snapshot[0].Type.ShouldBe(OperationType.Insert);
        snapshot[1].Type.ShouldBe(OperationType.Update);
        snapshot[2].Type.ShouldBe(OperationType.Delete);

        // Assert the payload exists before dereferencing it, so a missing payload
        // fails as an assertion instead of a NullReferenceException.
        snapshot[1].Entity.ShouldNotBeNull();
        snapshot[1].Entity!.Name.ShouldBe("Johnny");
        snapshot[2].DocumentId.ShouldBe(1);
    }

    /// <summary>
    /// Disposes the database context and removes the database file and its
    /// <c>-wal</c> companion file.
    /// </summary>
    public void Dispose()
    {
        _db.Dispose();

        // File.Delete does nothing when the file is absent, so no File.Exists
        // pre-check (and its check-then-act window) is needed.
        File.Delete(_dbPath);
        File.Delete(_dbPath + "-wal");
    }

    /// <summary>
    /// Polls <paramref name="events"/> every <see cref="PollInterval"/> until it holds
    /// at least <paramref name="expectedCount"/> items or
    /// <see cref="DefaultEventTimeout"/> has elapsed.
    /// </summary>
    /// <param name="events">Queue that the subscription under test enqueues into.</param>
    /// <param name="expectedCount">Number of events to wait for.</param>
    /// <param name="ct">Token that cancels the delay between polls.</param>
    /// <remarks>
    /// On timeout the count is asserted one last time, so the calling test fails with
    /// the expected and actual counts in the assertion message.
    /// </remarks>
    private static async Task WaitForEventCountAsync(
        ConcurrentQueue<ChangeStreamEvent<int, Person>> events,
        int expectedCount,
        CancellationToken ct)
    {
        var sw = Stopwatch.StartNew();
        while (sw.Elapsed < DefaultEventTimeout)
        {
            if (events.Count >= expectedCount)
            {
                return;
            }

            await Task.Delay(PollInterval, ct);
        }

        events.Count.ShouldBe(expectedCount);
    }
}
|
|
|
|
// Simple helper to avoid System.Reactive dependency in tests
|
|
/// <summary>
/// Delegate-based <c>Subscribe</c> overload for <see cref="IObservable{T}"/>.
/// </summary>
public static class ObservableExtensions
{
    /// <summary>
    /// Subscribes <paramref name="onNext"/> to <paramref name="observable"/>.
    /// Completion and error notifications are ignored.
    /// </summary>
    /// <typeparam name="T">Type of the values produced by the observable.</typeparam>
    /// <param name="observable">Source to subscribe to.</param>
    /// <param name="onNext">Callback invoked for every value the source produces.</param>
    /// <returns>The subscription handle returned by the source.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="observable"/> or <paramref name="onNext"/> is null.
    /// </exception>
    public static IDisposable Subscribe<T>(this IObservable<T> observable, Action<T> onNext)
    {
        // Fail at the call site rather than with a NullReferenceException later on.
        if (observable is null)
        {
            throw new ArgumentNullException(nameof(observable));
        }

        return observable.Subscribe(new AnonymousObserver<T>(onNext));
    }

    /// <summary>Observer that forwards values to a delegate and drops everything else.</summary>
    private sealed class AnonymousObserver<T> : IObserver<T>
    {
        private readonly Action<T> _onNext;

        public AnonymousObserver(Action<T> onNext) =>
            _onNext = onNext ?? throw new ArgumentNullException(nameof(onNext));

        // No-op: completion is not surfaced by this helper.
        public void OnCompleted() { }

        // No-op: stream errors are not surfaced by this helper.
        public void OnError(Exception error) { }

        public void OnNext(T value) => _onNext(value);
    }
}
|