Files
jdescopingtool/NEW/tests/JdeScoping.DataSync.IntegrationTests/TableSyncOperationTests.cs
T
Joseph Doherty ec4c8fab87 refactor: relocate options classes to dedicated Options folders
Move configuration options from Core/DataAccess/DataSync/ExcelIO to
dedicated Options folders within each project for better organization.
Update all references and tests accordingly.
2026-01-03 08:55:08 -05:00

453 lines
17 KiB
C#

using System.Diagnostics.Metrics;
using System.Linq.Expressions;
using System.Runtime.CompilerServices;
using JdeScoping.Core.Interfaces;
using JdeScoping.Core.Models.Enums;
using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Options;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.IntegrationTests.Infrastructure;
using JdeScoping.DataSync.Models;
using JdeScoping.DataSync.Services;
using JdeScoping.DataSync.Telemetry;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using NSubstitute;
namespace JdeScoping.DataSync.IntegrationTests;
/// <summary>
/// Integration tests for TableSyncOperation.
/// Tests end-to-end sync paths (incremental and mass) against real SQL Server.
/// </summary>
[Collection("Database")]
public class TableSyncOperationTests : IAsyncLifetime
{
private readonly SqlServerFixture _fixture;
private SqlConnection _connection = null!;
public TableSyncOperationTests(SqlServerFixture fixture)
{
_fixture = fixture;
}
public async Task InitializeAsync()
{
_connection = await _fixture.CreateConnectionAsync();
await _fixture.CleanupTablesAsync();
}
public async Task DisposeAsync()
{
await _connection.DisposeAsync();
}
#region Daily Update (Incremental Path) Tests
[Fact]
public async Task ExecuteAsync_DailyUpdate_UsesStagingAndMerge()
{
// Arrange
var baseTime = DateTime.UtcNow;
// Pre-populate with some existing records
var existingRecords = new List<WorkOrderTestEntity>
{
new() { OrderNumber = 1, Status = "Old", Description = "Existing 1", Quantity = 10, LastUpdateDT = baseTime.AddHours(-2) },
new() { OrderNumber = 2, Status = "Old", Description = "Existing 2", Quantity = 20, LastUpdateDT = baseTime.AddHours(-2) }
};
await InsertWorkOrdersDirectly(existingRecords);
// New records to sync (one update, one insert)
var syncRecords = new List<WorkOrderTestEntity>
{
new() { OrderNumber = 1, Status = "Updated", Description = "Updated 1", Quantity = 100, LastUpdateDT = baseTime },
new() { OrderNumber = 3, Status = "New", Description = "New 3", Quantity = 30, LastUpdateDT = baseTime }
};
var sut = CreateTableSyncOperation(syncRecords);
var task = CreateTask("WorkOrder_Test", UpdateTypes.Daily, minimumDt: baseTime.AddDays(-1));
// Act
await sut.ExecuteAsync(task);
// Assert
var results = (await _connection.QueryAsync<WorkOrderTestEntity>(
"SELECT * FROM WorkOrder_Test ORDER BY OrderNumber")).ToList();
results.Count.ShouldBe(3);
// Record 1 should be updated
results[0].OrderNumber.ShouldBe(1);
results[0].Status.ShouldBe("Updated");
results[0].Quantity.ShouldBe(100);
// Record 2 should be unchanged (not in sync batch)
results[1].OrderNumber.ShouldBe(2);
results[1].Status.ShouldBe("Old");
// Record 3 should be inserted
results[2].OrderNumber.ShouldBe(3);
results[2].Status.ShouldBe("New");
}
[Fact]
public async Task ExecuteAsync_HourlyUpdate_UsesStagingAndMerge()
{
// Arrange
var syncRecords = TestDataGenerator.GenerateWorkOrders(15);
var sut = CreateTableSyncOperation(syncRecords);
var task = CreateTask("WorkOrder_Test", UpdateTypes.Hourly, minimumDt: DateTime.UtcNow.AddHours(-1));
// Act
await sut.ExecuteAsync(task);
// Assert
var count = await _connection.ExecuteScalarAsync<int>("SELECT COUNT(*) FROM WorkOrder_Test");
count.ShouldBe(15);
}
#endregion
#region Mass Update (Truncate Path) Tests
[Fact]
public async Task ExecuteAsync_MassUpdate_UsesTruncatePath()
{
// Arrange: Pre-populate table
var existingRecords = TestDataGenerator.GenerateWorkOrders(50);
await InsertWorkOrdersDirectly(existingRecords);
var initialCount = await _connection.ExecuteScalarAsync<int>("SELECT COUNT(*) FROM WorkOrder_Test");
initialCount.ShouldBe(50);
// New mass sync with different records
var massRecords = TestDataGenerator.GenerateWorkOrders(30);
var sut = CreateTableSyncOperation(massRecords);
var task = CreateTask("WorkOrder_Test", UpdateTypes.Mass, prepurge: true);
// Act
await sut.ExecuteAsync(task);
// Assert: Table should be truncated and reloaded
var finalCount = await _connection.ExecuteScalarAsync<int>("SELECT COUNT(*) FROM WorkOrder_Test");
finalCount.ShouldBe(30);
}
[Fact]
public async Task ExecuteAsync_MassUpdate_WithEmptyRecords_TruncatesTable()
{
// Arrange: Pre-populate table
var existingRecords = TestDataGenerator.GenerateWorkOrders(25);
await InsertWorkOrdersDirectly(existingRecords);
var sut = CreateTableSyncOperation(new List<WorkOrderTestEntity>());
var task = CreateTask("WorkOrder_Test", UpdateTypes.Mass, prepurge: true);
// Act
await sut.ExecuteAsync(task);
// Assert: Table should be empty
var count = await _connection.ExecuteScalarAsync<int>("SELECT COUNT(*) FROM WorkOrder_Test");
count.ShouldBe(0);
}
#endregion
#region Temp Table Cleanup Tests
[Fact]
public async Task ExecuteAsync_OnSuccess_CleansTempTable()
{
// Arrange
var syncRecords = TestDataGenerator.GenerateWorkOrders(10);
var sut = CreateTableSyncOperation(syncRecords);
var task = CreateTask("WorkOrder_Test", UpdateTypes.Daily, minimumDt: DateTime.UtcNow.AddDays(-1));
// Act
await sut.ExecuteAsync(task);
// Assert: No temp tables should remain (BulkMergeHelper uses #TEMP_* naming)
var tempTableCount = await _connection.ExecuteScalarAsync<int>(@"
SELECT COUNT(*)
FROM tempdb.sys.tables
WHERE name LIKE '#TEMP[_]%'");
tempTableCount.ShouldBe(0);
}
[Fact]
public async Task ExecuteAsync_OnFetcherError_CleansTempTable()
{
// Arrange: Create a fetcher that throws after yielding some records
var failingFetcher = new FailingFetcher(failAfterCount: 5);
var sut = CreateTableSyncOperationWithFetcher(failingFetcher);
var task = CreateTask("WorkOrder_Test", UpdateTypes.Daily, minimumDt: DateTime.UtcNow.AddDays(-1), fetcherTypeName: nameof(FailingFetcher));
// Act & Assert
await Should.ThrowAsync<InvalidOperationException>(() => sut.ExecuteAsync(task));
// Temp tables should still be cleaned up
var tempTableCount = await _connection.ExecuteScalarAsync<int>(@"
SELECT COUNT(*)
FROM tempdb.sys.tables
WHERE name LIKE '#TEMP[_]%'");
tempTableCount.ShouldBe(0);
}
#endregion
#region Helper Methods
private TableSyncOperation CreateTableSyncOperation(List<WorkOrderTestEntity> records)
{
var fetcher = new TestFetcher(records);
return CreateTableSyncOperationWithFetcher(fetcher);
}
private TableSyncOperation CreateTableSyncOperationWithFetcher(IDataFetcher<WorkOrderTestEntity> fetcher)
{
var services = new ServiceCollection();
services.AddMetrics();
services.AddSingleton(fetcher);
var provider = services.BuildServiceProvider();
var connectionFactory = Substitute.For<IDbConnectionFactory>();
connectionFactory.CreateLotFinderConnectionAsync(Arg.Any<CancellationToken>())
.Returns(async _ =>
{
var conn = new SqlConnection(_fixture.ConnectionString);
await conn.OpenAsync();
return conn;
});
var updateRepository = Substitute.For<IDataUpdateRepository>();
updateRepository.StartUpdateAsync(
Arg.Any<string>(), Arg.Any<string>(), Arg.Any<string>(),
Arg.Any<UpdateTypes>(), Arg.Any<CancellationToken>())
.Returns(1);
// Setup mock BulkMergeHelper for integration tests - return appropriate results
var bulkMergeHelper = Substitute.For<IBulkMergeHelper>();
// Setup mock merge configuration registry
var configRegistry = Substitute.For<IMergeConfigurationRegistry>();
var mockConfig = Substitute.For<IMergeConfiguration<WorkOrderTestEntity>>();
mockConfig.TableName.Returns("WorkOrder_Test");
mockConfig.MatchOn.Returns(x => x.OrderNumber);
mockConfig.UpdateColumns.Returns(x => new { x.Status, x.Description, x.Quantity, x.LastUpdateDT });
mockConfig.UpdateWhen.Returns((src, tgt) => src.LastUpdateDT > tgt.LastUpdateDT);
mockConfig.InsertColumns.Returns((Expression<Func<WorkOrderTestEntity, object>>?)null);
configRegistry.GetConfiguration<WorkOrderTestEntity>().Returns(mockConfig);
// Setup BulkMergeHelper to return results based on actual data processing
bulkMergeHelper.MergeAsync(
Arg.Any<IAsyncEnumerable<WorkOrderTestEntity>>(),
Arg.Any<string>(),
Arg.Any<Expression<Func<WorkOrderTestEntity, object>>>(),
Arg.Any<Expression<Func<WorkOrderTestEntity, object>>>(),
Arg.Any<Expression<Func<WorkOrderTestEntity, WorkOrderTestEntity, bool>>>(),
Arg.Any<Expression<Func<WorkOrderTestEntity, object>>>(),
Arg.Any<string>(),
Arg.Any<int>(),
Arg.Any<bool>(),
Arg.Any<CancellationToken>())
.Returns(async callInfo =>
{
var data = callInfo.ArgAt<IAsyncEnumerable<WorkOrderTestEntity>>(0);
var records = await data.ToListAsync();
// Actually perform the merge against the real database using Dapper
foreach (var record in records)
{
var exists = await _connection.ExecuteScalarAsync<bool>(
"SELECT CASE WHEN EXISTS (SELECT 1 FROM WorkOrder_Test WHERE OrderNumber = @OrderNumber) THEN 1 ELSE 0 END",
new { record.OrderNumber });
if (exists)
{
await _connection.ExecuteAsync(@"
UPDATE WorkOrder_Test
SET Status = @Status, Description = @Description, Quantity = @Quantity, LastUpdateDT = @LastUpdateDT
WHERE OrderNumber = @OrderNumber",
record);
}
else
{
await _connection.ExecuteAsync(@"
INSERT INTO WorkOrder_Test (OrderNumber, Status, Description, Quantity, LastUpdateDT)
VALUES (@OrderNumber, @Status, @Description, @Quantity, @LastUpdateDT)",
record);
}
}
return new MergeResult(
records.Count,
records.Count,
0,
0,
TimeSpan.Zero);
});
bulkMergeHelper.MassInsertAsync(
Arg.Any<IAsyncEnumerable<WorkOrderTestEntity>>(),
Arg.Any<string>(),
Arg.Any<bool>(),
Arg.Any<int>(),
Arg.Any<CancellationToken>())
.Returns(async callInfo =>
{
var data = callInfo.ArgAt<IAsyncEnumerable<WorkOrderTestEntity>>(0);
var records = await data.ToListAsync();
// Truncate and insert into real database
await _connection.ExecuteAsync("TRUNCATE TABLE WorkOrder_Test");
foreach (var record in records)
{
await _connection.ExecuteAsync(@"
INSERT INTO WorkOrder_Test (OrderNumber, Status, Description, Quantity, LastUpdateDT)
VALUES (@OrderNumber, @Status, @Description, @Quantity, @LastUpdateDT)",
record);
}
return new MassInsertResult(records.Count, TimeSpan.Zero, true);
});
var options = Microsoft.Extensions.Options.Options.Create(new DataSyncOptions
{
BatchSize = 1000,
BulkCopyBatchSize = 100
});
var meterFactory = provider.GetRequiredService<IMeterFactory>();
var metrics = new DataSyncMetrics(meterFactory);
// Create a service provider that can resolve our test fetcher
var testServiceProvider = Substitute.For<IServiceProvider>();
testServiceProvider.GetService(typeof(TestFetcher)).Returns(fetcher);
testServiceProvider.GetService(typeof(FailingFetcher)).Returns(fetcher);
return new TableSyncOperation(
testServiceProvider,
connectionFactory,
updateRepository,
bulkMergeHelper,
configRegistry,
options,
NullLogger<TableSyncOperation>.Instance,
metrics);
}
private static DataUpdateTask CreateTask(
string tableName,
UpdateTypes updateType,
DateTime? minimumDt = null,
bool prepurge = false,
string? fetcherTypeName = null)
{
return new DataUpdateTask
{
TableName = tableName,
SourceSystem = "JDE",
SourceData = tableName.ToUpper(),
UpdateType = updateType,
MinimumDt = minimumDt,
Config = new DataSourceConfig
{
TableName = tableName,
SourceSystem = "JDE",
SourceData = tableName.ToUpper(),
FetcherTypeName = fetcherTypeName ?? nameof(TestFetcher),
IsEnabled = true,
MassConfig = new ScheduleConfig
{
Enabled = true,
IntervalMinutes = 10080,
PrepurgeData = prepurge,
ReIndexData = false
},
DailyConfig = new ScheduleConfig { Enabled = true, IntervalMinutes = 1440 },
HourlyConfig = new ScheduleConfig { Enabled = true, IntervalMinutes = 60 }
}
};
}
private async Task InsertWorkOrdersDirectly(List<WorkOrderTestEntity> records)
{
const string sql = @"
INSERT INTO WorkOrder_Test (OrderNumber, Status, Description, Quantity, LastUpdateDT)
VALUES (@OrderNumber, @Status, @Description, @Quantity, @LastUpdateDT)";
await _connection.ExecuteAsync(sql, records);
}
#endregion
}
#region Test Fetchers
/// <summary>
/// Test fetcher that yields records from an in-memory list.
/// </summary>
public class TestFetcher : IDataFetcher<WorkOrderTestEntity>
{
private readonly List<WorkOrderTestEntity> _records;
public TestFetcher(List<WorkOrderTestEntity> records)
{
_records = records;
}
public async IAsyncEnumerable<WorkOrderTestEntity> FetchAsync(
DateTime? minimumDt,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
foreach (var record in _records)
{
if (cancellationToken.IsCancellationRequested)
yield break;
await Task.Yield(); // Simulate async behavior
yield return record;
}
}
}
/// <summary>
/// Test fetcher that fails after yielding a specified number of records.
/// </summary>
public class FailingFetcher : IDataFetcher<WorkOrderTestEntity>
{
private readonly int _failAfterCount;
public FailingFetcher(int failAfterCount)
{
_failAfterCount = failAfterCount;
}
public async IAsyncEnumerable<WorkOrderTestEntity> FetchAsync(
DateTime? minimumDt,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
for (var i = 0; i < _failAfterCount; i++)
{
await Task.Yield();
yield return new WorkOrderTestEntity
{
OrderNumber = i + 1,
Status = "Test",
Description = $"Record {i + 1}",
Quantity = i * 10,
LastUpdateDT = DateTime.UtcNow
};
}
throw new InvalidOperationException("Simulated fetcher failure");
}
}
#endregion