138 lines
4.5 KiB
C#
138 lines
4.5 KiB
C#
using ZB.MOM.WW.CBDD.Bson;
|
|
using ZB.MOM.WW.CBDD.Core.Storage;
|
|
using ZB.MOM.WW.CBDD.Shared;
|
|
|
|
namespace ZB.MOM.WW.CBDD.Tests;
|
|
|
|
/// <summary>
/// Runs an online compaction while a mixed insert/update/delete workload executes against the
/// same <c>TestDbContext</c> on another task, and verifies that both finish and that the
/// tracked documents are still readable afterwards.
/// </summary>
public class CompactionOnlineConcurrencyTests
{
    /// <summary>Number of users inserted before the concurrent phase starts.</summary>
    private const int SeedCount = 120;

    /// <summary>Number of iterations performed by the background workload.</summary>
    private const int WorkloadIterations = 150;

    /// <summary>The workload only deletes while more than this many ids are tracked.</summary>
    private const int MinTrackedIdsBeforeDelete = 60;

    /// <summary>Lower bound asserted on the number of completed workload iterations.</summary>
    private const int MinCompletedOps = 100;

    /// <summary>Upper bound on how long workload + compaction may take before the test fails.</summary>
    private static readonly TimeSpan CompletionTimeout = TimeSpan.FromSeconds(20);

    /// <summary>
    /// Seeds the database, then starts a workload task and an online compaction task side by side.
    /// The test fails if the two tasks do not both complete within <see cref="CompletionTimeout"/>,
    /// if the compaction stats do not report online mode, if fewer than
    /// <see cref="MinCompletedOps"/> workload iterations completed, or if any id the workload still
    /// tracks as live is missing from the collection afterwards.
    /// </summary>
    [Fact]
    public async Task OnlineCompaction_WithConcurrentishWorkload_ShouldCompleteWithoutDeadlock()
    {
        var dbPath = NewDbPath();
        var activeIds = new List<ObjectId>();
        var sync = new object();
        var completedOps = 0;

        try
        {
            // Declared inside the try block so the context is disposed before the finally
            // block deletes the database files.
            using var db = new TestDbContext(dbPath);
            var testCancellation = TestContext.Current.CancellationToken;

            for (var i = 0; i < SeedCount; i++)
            {
                var id = db.Users.Insert(new User { Name = $"seed-{i:D4}", Age = i % 40 });
                activeIds.Add(id);
            }

            db.SaveChanges();
            db.ForceCheckpoint();

            var workloadTask = Task.Run(() =>
            {
                for (var i = 0; i < WorkloadIterations; i++)
                {
                    // Task.Run only observes the token before the delegate starts, so the loop
                    // has to check it itself; otherwise a cancelled test would keep mutating a
                    // context that is being disposed.
                    testCancellation.ThrowIfCancellationRequested();

                    switch (i % 3)
                    {
                        case 0:
                        {
                            // Insert a new user and start tracking its id.
                            var id = db.Users.Insert(new User { Name = $"insert-{i:D4}", Age = i % 60 });
                            lock (sync)
                            {
                                activeIds.Add(id);
                            }

                            break;
                        }

                        case 1:
                        {
                            // Update one of the tracked users, chosen by iteration index.
                            ObjectId? candidate = null;
                            lock (sync)
                            {
                                if (activeIds.Count > 0)
                                {
                                    candidate = activeIds[i % activeIds.Count];
                                }
                            }

                            if (candidate.HasValue)
                            {
                                var entity = db.Users.FindById(candidate.Value);
                                if (entity is not null)
                                {
                                    entity.Age += 1;
                                    db.Users.Update(entity).ShouldBeTrue();
                                }
                            }

                            break;
                        }

                        default:
                        {
                            // Delete the most recently tracked user, but only while more than
                            // MinTrackedIdsBeforeDelete ids remain tracked.
                            ObjectId? candidate = null;
                            lock (sync)
                            {
                                if (activeIds.Count > MinTrackedIdsBeforeDelete)
                                {
                                    candidate = activeIds[^1];
                                    activeIds.RemoveAt(activeIds.Count - 1);
                                }
                            }

                            if (candidate.HasValue)
                            {
                                db.Users.Delete(candidate.Value);
                            }

                            break;
                        }
                    }

                    // Commit the mutation, issue a read, then commit again before counting the
                    // iteration as completed.
                    db.SaveChanges();
                    _ = db.Users.Count();
                    db.SaveChanges();
                    Interlocked.Increment(ref completedOps);
                }
            }, testCancellation);

            var compactionTask = Task.Run(() => db.Compact(new CompactionOptions
            {
                OnlineMode = true,
                OnlineBatchPageLimit = 4,
                OnlineBatchDelay = TimeSpan.FromMilliseconds(2),
                MaxOnlineDuration = TimeSpan.FromMilliseconds(400),
                EnableTailTruncation = true
            }), testCancellation);

            // A deadlock between workload and compaction surfaces here as a TimeoutException.
            await Task.WhenAll(workloadTask, compactionTask).WaitAsync(CompletionTimeout, testCancellation);

            var stats = await compactionTask;
            stats.OnlineMode.ShouldBeTrue();
            completedOps.ShouldBeGreaterThanOrEqualTo(MinCompletedOps);

            var allUsers = db.Users.FindAll().ToList();
            allUsers.Count.ShouldBeGreaterThan(0);
            db.SaveChanges();

            List<ObjectId> snapshotIds;
            lock (sync)
            {
                snapshotIds = activeIds.ToList();
            }

            // Every id the workload still tracks as live must be present after compaction.
            var actualIds = allUsers.Select(x => x.Id).ToHashSet();
            foreach (var id in snapshotIds)
            {
                actualIds.ShouldContain(id);
            }
        }
        finally
        {
            CleanupFiles(dbPath);
        }
    }

    /// <summary>Builds a unique database file path under the system temp directory.</summary>
    /// <returns>A path of the form <c>compaction_online_{guid}.db</c> in the temp directory.</returns>
    private static string NewDbPath()
        => Path.Combine(Path.GetTempPath(), $"compaction_online_{Guid.NewGuid():N}.db");

    /// <summary>
    /// Deletes the database file and the two sibling files this test cleans up: the same path
    /// with a <c>.wal</c> extension and the path suffixed with <c>.compact.state</c>.
    /// </summary>
    /// <param name="dbPath">Path of the database file created for the test.</param>
    private static void CleanupFiles(string dbPath)
    {
        var artifacts = new[]
        {
            dbPath,
            Path.ChangeExtension(dbPath, ".wal"),
            $"{dbPath}.compact.state"
        };

        foreach (var path in artifacts)
        {
            // File.Delete does not throw when the file is absent, so no File.Exists
            // pre-check (and no check-then-delete race) is needed.
            File.Delete(path);
        }
    }
}
|