26ff8d9b4f
Set up repository with legacy .NET Framework 4.8 source (OLD/), new .NET 10 Blazor solution (NEW/), OpenSpec specifications, documentation, and project configuration.
299 lines
11 KiB
C#
299 lines
11 KiB
C#
using JdeScoping.DataSync.Contracts;
|
|
using JdeScoping.DataSync.IntegrationTests.Infrastructure;
|
|
using JdeScoping.DataSync.Services;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
|
|
namespace JdeScoping.DataSync.IntegrationTests;
|
|
|
|
/// <summary>
|
|
/// Integration tests for BulkMergeHelper.
|
|
/// These tests verify the bulk merge functionality against a real SQL Server database.
|
|
/// </summary>
|
|
[Collection("Database")]
|
|
public class BulkMergeHelperTests : IAsyncLifetime
|
|
{
|
|
/// <summary>Shared fixture; provides the connection string, connections and table cleanup used by these tests.</summary>
private readonly SqlServerFixture _fixture;

/// <summary>The helper under test, created once per test class instance.</summary>
private readonly IBulkMergeHelper _bulkMergeHelper;

/// <summary>
/// Wires a <see cref="BulkMergeHelper"/> to the fixture's database using test dependencies.
/// </summary>
/// <param name="fixture">SQL Server fixture supplied for the "Database" collection.</param>
public BulkMergeHelperTests(SqlServerFixture fixture)
{
    _fixture = fixture;

    // Dependencies are created inline, in the order the helper's constructor expects them:
    // connection factory, data reader factory, schema validator, logger.
    _bulkMergeHelper = new BulkMergeHelper(
        new TestDbConnectionFactory(fixture.ConnectionString),
        new TestDataReaderFactory(),
        new SchemaValidator(),
        NullLogger<BulkMergeHelper>.Instance);
}
|
|
|
|
/// <summary>
/// Test setup hook: hands off to the fixture's <c>CleanupBulkMergeTestTableAsync</c>.
/// </summary>
public Task InitializeAsync()
{
    return _fixture.CleanupBulkMergeTestTableAsync();
}
|
|
|
|
/// <summary>
/// Test teardown hook: nothing to release, so an already-completed task is returned.
/// </summary>
public Task DisposeAsync()
{
    return Task.CompletedTask;
}
|
|
|
|
#region Insert Tests
|
|
|
|
/// <summary>
/// Merging ten generated rows should report ten processed rows and leave ten rows in the table.
/// </summary>
[Fact]
public async Task MergeAsync_NewRecords_InsertsAll()
{
    // Arrange
    const int expectedRows = 10;
    var newEntities = GenerateTestData(expectedRows);

    // Act
    var mergeResult = await _bulkMergeHelper.MergeAsync(
        newEntities.ToAsyncEnumerable(),
        "BulkMergeTest",
        e => e.Id,
        updateColumns: e => new { e.Name, e.Amount, e.LastUpdateDt },
        insertColumns: e => new { e.Id, e.Name, e.Amount, e.LastUpdateDt });

    // Assert - counters reported by the helper
    mergeResult.TotalRowsProcessed.ShouldBe(expectedRows);
    mergeResult.TotalRowsAffected.ShouldBeGreaterThan(0);
    mergeResult.BatchCount.ShouldBeGreaterThan(0);

    // Assert - what actually landed in the table
    await using var db = await _fixture.CreateConnectionAsync();
    var storedRows = await db.ExecuteScalarAsync<int>("SELECT COUNT(*) FROM BulkMergeTest");
    storedRows.ShouldBe(expectedRows);
}
|
|
|
|
/// <summary>
/// Merging an empty sequence should report zero processed rows, zero affected rows and zero batches.
/// </summary>
[Fact]
public async Task MergeAsync_EmptyData_ReturnsZeroRows()
{
    // Arrange
    var noEntities = Array.Empty<BulkMergeTestEntity>();
    var emptySource = noEntities.ToAsyncEnumerable();

    // Act - only the key selector is supplied; column selectors are left at their defaults
    var mergeResult = await _bulkMergeHelper.MergeAsync(
        emptySource,
        "BulkMergeTest",
        e => e.Id);

    // Assert
    mergeResult.TotalRowsProcessed.ShouldBe(0);
    mergeResult.TotalRowsAffected.ShouldBe(0);
    mergeResult.BatchCount.ShouldBe(0);
}
|
|
|
|
#endregion
|
|
|
|
#region Update Tests
|
|
|
|
/// <summary>
/// Merging rows whose keys already exist should update them in place:
/// the row count stays at five and every name carries the "_Updated" suffix.
/// </summary>
[Fact]
public async Task MergeAsync_ExistingRecords_UpdatesAll()
{
    // Arrange - Insert initial data
    var initialData = GenerateTestData(5);
    await _bulkMergeHelper.MergeAsync(
        initialData.ToAsyncEnumerable(),
        "BulkMergeTest",
        x => x.Id,
        updateColumns: x => new { x.Name, x.Amount, x.LastUpdateDt },
        insertColumns: x => new { x.Id, x.Name, x.Amount, x.LastUpdateDt });

    // Same keys, modified payload
    var updatedData = initialData.Select(e => new BulkMergeTestEntity
    {
        Id = e.Id,
        Name = e.Name + "_Updated",
        Amount = (e.Amount ?? 0) + 100,
        LastUpdateDt = DateTime.UtcNow
    }).ToList();

    // Act
    var result = await _bulkMergeHelper.MergeAsync(
        updatedData.ToAsyncEnumerable(),
        "BulkMergeTest",
        x => x.Id,
        updateColumns: x => new { x.Name, x.Amount, x.LastUpdateDt },
        insertColumns: x => new { x.Id, x.Name, x.Amount, x.LastUpdateDt });

    // Assert
    result.TotalRowsProcessed.ShouldBe(5);
    result.TotalRowsAffected.ShouldBeGreaterThan(0);

    // Verify in database
    await using var connection = await _fixture.CreateConnectionAsync();
    var count = await connection.ExecuteScalarAsync<int>("SELECT COUNT(*) FROM BulkMergeTest");
    count.ShouldBe(5); // Still 5 records, not 10

    // In T-SQL LIKE a bare '_' matches ANY single character, so '%_Updated' would also
    // accept names that merely end in "Updated". '[_]' pins the literal underscore.
    var updatedCount = await connection.ExecuteScalarAsync<int>(
        "SELECT COUNT(*) FROM BulkMergeTest WHERE Name LIKE '%[_]Updated'");
    updatedCount.ShouldBe(5);
}
|
|
|
|
/// <summary>
/// A single merge that contains both existing and new keys should update the matching
/// rows in place, insert the new ones, and leave rows absent from the batch untouched.
/// </summary>
[Fact]
public async Task MergeAsync_MixedRecords_InsertsAndUpdates()
{
    // Arrange - seed five rows (GenerateTestData assigns Ids 1..5 and names "TestItem_<Id>")
    var initialData = GenerateTestData(5);
    await _bulkMergeHelper.MergeAsync(
        initialData.ToAsyncEnumerable(),
        "BulkMergeTest",
        x => x.Id,
        updateColumns: x => new { x.Name, x.Amount, x.LastUpdateDt },
        insertColumns: x => new { x.Id, x.Name, x.Amount, x.LastUpdateDt });

    // Three updates reuse the first three seeded Ids...
    var updates = initialData.Take(3).Select((existing, index) => new BulkMergeTestEntity
    {
        Id = existing.Id,
        Name = "Updated_" + index,
        Amount = 999m,
        LastUpdateDt = DateTime.UtcNow
    });

    // ...and two inserts use Ids 100 and 101, which the seed never produces.
    var inserts = Enumerable.Range(100, 2).Select(id => new BulkMergeTestEntity
    {
        Id = id,
        Name = "New_" + id,
        Amount = id * 10m,
        LastUpdateDt = DateTime.UtcNow
    });

    var mixedData = updates.Concat(inserts).ToList();

    // Act
    var result = await _bulkMergeHelper.MergeAsync(
        mixedData.ToAsyncEnumerable(),
        "BulkMergeTest",
        x => x.Id,
        updateColumns: x => new { x.Name, x.Amount, x.LastUpdateDt },
        insertColumns: x => new { x.Id, x.Name, x.Amount, x.LastUpdateDt });

    // Assert
    result.TotalRowsProcessed.ShouldBe(5);
    result.TotalRowsAffected.ShouldBeGreaterThan(0);

    // Verify in database
    await using var connection = await _fixture.CreateConnectionAsync();
    var count = await connection.ExecuteScalarAsync<int>("SELECT COUNT(*) FROM BulkMergeTest");
    count.ShouldBe(7); // 5 initial + 2 new = 7 (3 updated in place)

    // The total alone cannot tell "updated" apart from "ignored", so check each group by name.
    // '[_]' matches a literal underscore; a bare '_' is a single-character wildcard in LIKE.
    var updatedCount = await connection.ExecuteScalarAsync<int>(
        "SELECT COUNT(*) FROM BulkMergeTest WHERE Name LIKE 'Updated[_]%'");
    updatedCount.ShouldBe(3);

    var insertedCount = await connection.ExecuteScalarAsync<int>(
        "SELECT COUNT(*) FROM BulkMergeTest WHERE Name LIKE 'New[_]%'");
    insertedCount.ShouldBe(2);

    // Seeded rows that were not part of the second merge must keep their original names.
    var untouchedCount = await connection.ExecuteScalarAsync<int>(
        "SELECT COUNT(*) FROM BulkMergeTest WHERE Name LIKE 'TestItem[_]%'");
    untouchedCount.ShouldBe(2);
}
|
|
|
|
#endregion
|
|
|
|
#region Conditional Update Tests
|
|
|
|
/// <summary>
/// With an <c>updateWhen</c> predicate of "source timestamp is newer than target",
/// only the row with a newer timestamp should change; rows with an older or equal
/// timestamp must keep their original values.
/// </summary>
[Fact]
public async Task MergeAsync_WithUpdateWhen_OnlyUpdatesWhenConditionMet()
{
    // Arrange - all three timestamps derive from one instant so their ordering
    // never depends on how long the clock advanced between separate UtcNow reads.
    var oldDate = DateTime.UtcNow.AddDays(-1);
    var newDate = oldDate.AddDays(1);
    var olderDate = oldDate.AddDays(-1);

    // Seed three rows stamped with the old timestamp
    var initialData = GenerateTestData(3, oldDate);
    await _bulkMergeHelper.MergeAsync(
        initialData.ToAsyncEnumerable(),
        "BulkMergeTest",
        x => x.Id,
        updateColumns: x => new { x.Name, x.Amount, x.LastUpdateDt },
        insertColumns: x => new { x.Id, x.Name, x.Amount, x.LastUpdateDt });

    // Create update data:
    // - First record has NEWER date (should update)
    // - Second record has OLDER date (should NOT update)
    // - Third record has SAME date (should NOT update)
    // NOTE(review): the SAME-date case presumably relies on the source and target values
    // being stored at the same precision - confirm against the table/staging schema.
    var updateData = new List<BulkMergeTestEntity>
    {
        new() { Id = initialData[0].Id, Name = "ShouldUpdate", Amount = 999m, LastUpdateDt = newDate },
        new() { Id = initialData[1].Id, Name = "ShouldNotUpdate", Amount = 888m, LastUpdateDt = olderDate },
        new() { Id = initialData[2].Id, Name = "ShouldNotUpdate", Amount = 777m, LastUpdateDt = oldDate }
    };

    // Act
    var result = await _bulkMergeHelper.MergeAsync(
        updateData.ToAsyncEnumerable(),
        "BulkMergeTest",
        x => x.Id,
        updateColumns: x => new { x.Name, x.Amount, x.LastUpdateDt },
        updateWhen: (src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt,
        insertColumns: x => new { x.Id, x.Name, x.Amount, x.LastUpdateDt });

    // Assert
    result.TotalRowsProcessed.ShouldBe(3);

    // Verify in database
    await using var connection = await _fixture.CreateConnectionAsync();

    // Reads back the stored row that has the same Id as the given seeded entity.
    Task<BulkMergeTestEntity> ReloadAsync(BulkMergeTestEntity seeded) =>
        connection.QuerySingleAsync<BulkMergeTestEntity>(
            "SELECT Id, Name, Amount, LastUpdateDt FROM BulkMergeTest WHERE Id = @Id",
            new { seeded.Id });

    var shouldUpdate = await ReloadAsync(initialData[0]);
    shouldUpdate.Name.ShouldBe("ShouldUpdate");

    // Assert the ORIGINAL name survived; merely "not the new name" would also
    // pass if the row had been overwritten with something else entirely.
    var shouldNotUpdate1 = await ReloadAsync(initialData[1]);
    shouldNotUpdate1.Name.ShouldBe(initialData[1].Name);

    var shouldNotUpdate2 = await ReloadAsync(initialData[2]);
    shouldNotUpdate2.Name.ShouldBe(initialData[2].Name);
}
|
|
|
|
#endregion
|
|
|
|
#region Batching Tests
|
|
|
|
/// <summary>
/// Merging 250 rows with a batch size of 50 should be reported as five batches
/// and leave all 250 rows in the table.
/// </summary>
[Fact]
public async Task MergeAsync_LargeDataset_ProcessesInBatches()
{
    // Arrange
    const int totalRows = 250;
    const int rowsPerBatch = 50;
    var entities = GenerateTestData(totalRows);

    // Act - the batch size is deliberately small so the input spans several batches
    var mergeResult = await _bulkMergeHelper.MergeAsync(
        entities.ToAsyncEnumerable(),
        "BulkMergeTest",
        e => e.Id,
        updateColumns: e => new { e.Name, e.Amount, e.LastUpdateDt },
        insertColumns: e => new { e.Id, e.Name, e.Amount, e.LastUpdateDt },
        batchSize: rowsPerBatch);

    // Assert - counters reported by the helper
    mergeResult.TotalRowsProcessed.ShouldBe(totalRows);
    mergeResult.BatchCount.ShouldBe(totalRows / rowsPerBatch); // 250 / 50 = 5 batches
    mergeResult.TotalRowsAffected.ShouldBeGreaterThan(0);

    // Assert - what actually landed in the table
    await using var db = await _fixture.CreateConnectionAsync();
    var storedRows = await db.ExecuteScalarAsync<int>("SELECT COUNT(*) FROM BulkMergeTest");
    storedRows.ShouldBe(totalRows);
}
|
|
|
|
#endregion
|
|
|
|
#region Helper Methods
|
|
|
|
/// <summary>
/// Builds <paramref name="count"/> entities with Ids 1..count, names "TestItem_&lt;Id&gt;",
/// amounts of Id * 10.5, and one shared timestamp.
/// </summary>
/// <param name="count">Number of entities to create.</param>
/// <param name="lastUpdateDt">Timestamp for every entity; the current UTC time when omitted.</param>
/// <returns>A new list holding the generated entities in ascending Id order.</returns>
private static List<BulkMergeTestEntity> GenerateTestData(int count, DateTime? lastUpdateDt = null)
{
    // Resolve the timestamp once so every entity carries exactly the same value.
    var timestamp = lastUpdateDt ?? DateTime.UtcNow;

    var entities = new List<BulkMergeTestEntity>();
    foreach (var id in Enumerable.Range(1, count))
    {
        entities.Add(new BulkMergeTestEntity
        {
            Id = id,
            Name = $"TestItem_{id}",
            Amount = id * 10.5m,
            LastUpdateDt = timestamp
        });
    }

    return entities;
}
|
|
|
|
#endregion
|
|
}
|