// -----------------------------------------------------------------------------------------------
// Integration tests for TableSyncOperation, plus the two in-memory fetchers they use.
//
// Fixture / lifecycle
//   * TableSyncOperationTests belongs to the xUnit collection "Database" and implements
//     IAsyncLifetime. InitializeAsync opens a connection through SqlServerFixture and then calls
//     CleanupTablesAsync; DisposeAsync disposes that connection.
//
// Scenarios covered
//   * ExecuteAsync_DailyUpdate_UsesStagingAndMerge: two rows are pre-inserted, then a Daily task
//     syncs one row with an existing OrderNumber and one new row. Expects 3 rows: #1 updated,
//     #2 untouched, #3 inserted.
//   * ExecuteAsync_HourlyUpdate_UsesStagingAndMerge: an Hourly task syncs 15 generated rows into
//     the cleaned table and expects a row count of 15.
//   * ExecuteAsync_MassUpdate_UsesTruncatePath: 50 rows pre-inserted, Mass task with
//     prepurge = true syncs 30 rows, expects a final count of 30.
//   * ExecuteAsync_MassUpdate_WithEmptyRecords_TruncatesTable: 25 rows pre-inserted, Mass task
//     with an empty record list, expects a final count of 0.
//   * ExecuteAsync_OnSuccess_CleansTempTable / ExecuteAsync_OnFetcherError_CleansTempTable:
//     after a successful run, and after a run whose fetcher throws, expect zero rows in
//     tempdb.sys.tables whose name matches '#TEMP[_]%'.
//
// Helpers
//   * CreateTableSyncOperation wraps the records in a TestFetcher and delegates to
//     CreateTableSyncOperationWithFetcher.
//   * CreateTableSyncOperationWithFetcher builds the TableSyncOperation under test. Every
//     collaborator except the options object and DataSyncMetrics is an NSubstitute substitute:
//       - the connection factory substitute opens a new SqlConnection on the fixture's
//         connection string for each CreateLotFinderConnectionAsync call;
//       - StartUpdateAsync on the update-repository substitute returns 1;
//       - the MergeAsync substitute materialises the incoming sequence and, per record, runs an
//         EXISTS check followed by an UPDATE or an INSERT on the test's own _connection;
//       - the MassInsertAsync substitute runs TRUNCATE TABLE WorkOrder_Test and then inserts the
//         records one at a time on _connection;
//       - the service-provider substitute returns the supplied fetcher for both
//         typeof(TestFetcher) and typeof(FailingFetcher).
//   * CreateTask builds a DataUpdateTask (SourceSystem "JDE", SourceData = upper-cased table
//     name) with Mass (10080 min), Daily (1440 min) and Hourly (60 min) schedule configs;
//     FetcherTypeName defaults to nameof(TestFetcher).
//   * InsertWorkOrdersDirectly seeds WorkOrder_Test with a parameterised INSERT.
//   * TestFetcher yields the supplied records one by one and ends the sequence quietly
//     (yield break) once cancellation is requested.
//   * FailingFetcher yields failAfterCount generated records and then throws
//     InvalidOperationException("Simulated fetcher failure").
//
// NOTE(review): this copy of the file appears to have lost its line breaks and everything that
//   was written inside angle brackets (e.g. `new List { ... }`, `Substitute.For()`,
//   `Arg.Any>>()`, `NullLogger.Instance`, the empty `///` doc lines). As laid out here the `//`
//   comments also run into the code that follows them. Presumably the version-controlled file
//   is intact - restore formatting and generic type arguments from there before building.
// NOTE(review): SqlConnection, QueryAsync/ExecuteAsync/ExecuteScalarAsync, [Fact]/[Collection]
//   and ShouldBe/Should.ThrowAsync are used without a matching using directive in this file -
//   presumably supplied by global usings; confirm.
// NOTE(review): the bulk merge helper is a substitute whose visible implementation never creates
//   a temp table, so the two '#TEMP[_]%' assertions look like they can only fail if
//   TableSyncOperation itself (or another session) creates such a table - confirm these tests
//   exercise the cleanup they are named after.
// NOTE(review): the configured UpdateWhen predicate (src.LastUpdateDT > tgt.LastUpdateDT) is not
//   applied by the MergeAsync substitute; its UPDATE is unconditional.
// NOTE(review): the ServiceProvider created by BuildServiceProvider() is not disposed in the
//   visible code.
// NOTE(review): tableName.ToUpper() is culture-sensitive; ToUpperInvariant() is presumably what
//   is intended for an identifier - confirm.
// -----------------------------------------------------------------------------------------------
using System.Diagnostics.Metrics; using System.Linq.Expressions; using System.Runtime.CompilerServices; using JdeScoping.Core.Interfaces; using JdeScoping.Core.Models.Enums; using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataSync.Options; using JdeScoping.DataSync.Contracts; using JdeScoping.DataSync.IntegrationTests.Infrastructure; using JdeScoping.DataSync.Models; using JdeScoping.DataSync.Services; using JdeScoping.DataSync.Telemetry; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; using NSubstitute; namespace JdeScoping.DataSync.IntegrationTests; /// /// Integration tests for TableSyncOperation. /// Tests end-to-end sync paths (incremental and mass) against real SQL Server. /// [Collection("Database")] public class TableSyncOperationTests : IAsyncLifetime { private readonly SqlServerFixture _fixture; private SqlConnection _connection = null!; public TableSyncOperationTests(SqlServerFixture fixture) { _fixture = fixture; } public async Task InitializeAsync() { _connection = await _fixture.CreateConnectionAsync(); await _fixture.CleanupTablesAsync(); } public async Task DisposeAsync() { await _connection.DisposeAsync(); } #region Daily Update (Incremental Path) Tests [Fact] public async Task ExecuteAsync_DailyUpdate_UsesStagingAndMerge() { // Arrange var baseTime = DateTime.UtcNow; // Pre-populate with some existing records var existingRecords = new List { new() { OrderNumber = 1, Status = "Old", Description = "Existing 1", Quantity = 10, LastUpdateDT = baseTime.AddHours(-2) }, new() { OrderNumber = 2, Status = "Old", Description = "Existing 2", Quantity = 20, LastUpdateDT = baseTime.AddHours(-2) } }; await InsertWorkOrdersDirectly(existingRecords); // New records to sync (one update, one insert) var syncRecords = new List { new() { OrderNumber = 1, Status = "Updated", Description = "Updated 1", Quantity = 100, LastUpdateDT = baseTime }, new() { OrderNumber = 3, Status 
= "New", Description = "New 3", Quantity = 30, LastUpdateDT = baseTime } }; var sut = CreateTableSyncOperation(syncRecords); var task = CreateTask("WorkOrder_Test", UpdateTypes.Daily, minimumDt: baseTime.AddDays(-1)); // Act await sut.ExecuteAsync(task); // Assert var results = (await _connection.QueryAsync( "SELECT * FROM WorkOrder_Test ORDER BY OrderNumber")).ToList(); results.Count.ShouldBe(3); // Record 1 should be updated results[0].OrderNumber.ShouldBe(1); results[0].Status.ShouldBe("Updated"); results[0].Quantity.ShouldBe(100); // Record 2 should be unchanged (not in sync batch) results[1].OrderNumber.ShouldBe(2); results[1].Status.ShouldBe("Old"); // Record 3 should be inserted results[2].OrderNumber.ShouldBe(3); results[2].Status.ShouldBe("New"); } [Fact] public async Task ExecuteAsync_HourlyUpdate_UsesStagingAndMerge() { // Arrange var syncRecords = TestDataGenerator.GenerateWorkOrders(15); var sut = CreateTableSyncOperation(syncRecords); var task = CreateTask("WorkOrder_Test", UpdateTypes.Hourly, minimumDt: DateTime.UtcNow.AddHours(-1)); // Act await sut.ExecuteAsync(task); // Assert var count = await _connection.ExecuteScalarAsync("SELECT COUNT(*) FROM WorkOrder_Test"); count.ShouldBe(15); } #endregion #region Mass Update (Truncate Path) Tests [Fact] public async Task ExecuteAsync_MassUpdate_UsesTruncatePath() { // Arrange: Pre-populate table var existingRecords = TestDataGenerator.GenerateWorkOrders(50); await InsertWorkOrdersDirectly(existingRecords); var initialCount = await _connection.ExecuteScalarAsync("SELECT COUNT(*) FROM WorkOrder_Test"); initialCount.ShouldBe(50); // New mass sync with different records var massRecords = TestDataGenerator.GenerateWorkOrders(30); var sut = CreateTableSyncOperation(massRecords); var task = CreateTask("WorkOrder_Test", UpdateTypes.Mass, prepurge: true); // Act await sut.ExecuteAsync(task); // Assert: Table should be truncated and reloaded var finalCount = await _connection.ExecuteScalarAsync("SELECT COUNT(*) FROM 
WorkOrder_Test"); finalCount.ShouldBe(30); } [Fact] public async Task ExecuteAsync_MassUpdate_WithEmptyRecords_TruncatesTable() { // Arrange: Pre-populate table var existingRecords = TestDataGenerator.GenerateWorkOrders(25); await InsertWorkOrdersDirectly(existingRecords); var sut = CreateTableSyncOperation(new List()); var task = CreateTask("WorkOrder_Test", UpdateTypes.Mass, prepurge: true); // Act await sut.ExecuteAsync(task); // Assert: Table should be empty var count = await _connection.ExecuteScalarAsync("SELECT COUNT(*) FROM WorkOrder_Test"); count.ShouldBe(0); } #endregion #region Temp Table Cleanup Tests [Fact] public async Task ExecuteAsync_OnSuccess_CleansTempTable() { // Arrange var syncRecords = TestDataGenerator.GenerateWorkOrders(10); var sut = CreateTableSyncOperation(syncRecords); var task = CreateTask("WorkOrder_Test", UpdateTypes.Daily, minimumDt: DateTime.UtcNow.AddDays(-1)); // Act await sut.ExecuteAsync(task); // Assert: No temp tables should remain (BulkMergeHelper uses #TEMP_* naming) var tempTableCount = await _connection.ExecuteScalarAsync(@" SELECT COUNT(*) FROM tempdb.sys.tables WHERE name LIKE '#TEMP[_]%'"); tempTableCount.ShouldBe(0); } [Fact] public async Task ExecuteAsync_OnFetcherError_CleansTempTable() { // Arrange: Create a fetcher that throws after yielding some records var failingFetcher = new FailingFetcher(failAfterCount: 5); var sut = CreateTableSyncOperationWithFetcher(failingFetcher); var task = CreateTask("WorkOrder_Test", UpdateTypes.Daily, minimumDt: DateTime.UtcNow.AddDays(-1), fetcherTypeName: nameof(FailingFetcher)); // Act & Assert await Should.ThrowAsync(() => sut.ExecuteAsync(task)); // Temp tables should still be cleaned up var tempTableCount = await _connection.ExecuteScalarAsync(@" SELECT COUNT(*) FROM tempdb.sys.tables WHERE name LIKE '#TEMP[_]%'"); tempTableCount.ShouldBe(0); } #endregion #region Helper Methods private TableSyncOperation CreateTableSyncOperation(List records) { var fetcher = new 
TestFetcher(records); return CreateTableSyncOperationWithFetcher(fetcher); } private TableSyncOperation CreateTableSyncOperationWithFetcher(IDataFetcher fetcher) { var services = new ServiceCollection(); services.AddMetrics(); services.AddSingleton(fetcher); var provider = services.BuildServiceProvider(); var connectionFactory = Substitute.For(); connectionFactory.CreateLotFinderConnectionAsync(Arg.Any()) .Returns(async _ => { var conn = new SqlConnection(_fixture.ConnectionString); await conn.OpenAsync(); return conn; }); var updateRepository = Substitute.For(); updateRepository.StartUpdateAsync( Arg.Any(), Arg.Any(), Arg.Any(), Arg.Any(), Arg.Any()) .Returns(1); // Setup mock BulkMergeHelper for integration tests - return appropriate results var bulkMergeHelper = Substitute.For(); // Setup mock merge configuration registry var configRegistry = Substitute.For(); var mockConfig = Substitute.For>(); mockConfig.TableName.Returns("WorkOrder_Test"); mockConfig.MatchOn.Returns(x => x.OrderNumber); mockConfig.UpdateColumns.Returns(x => new { x.Status, x.Description, x.Quantity, x.LastUpdateDT }); mockConfig.UpdateWhen.Returns((src, tgt) => src.LastUpdateDT > tgt.LastUpdateDT); mockConfig.InsertColumns.Returns((Expression>?)null); configRegistry.GetConfiguration().Returns(mockConfig); // Setup BulkMergeHelper to return results based on actual data processing bulkMergeHelper.MergeAsync( Arg.Any>(), Arg.Any(), Arg.Any>>(), Arg.Any>>(), Arg.Any>>(), Arg.Any>>(), Arg.Any(), Arg.Any(), Arg.Any(), Arg.Any()) .Returns(async callInfo => { var data = callInfo.ArgAt>(0); var records = await data.ToListAsync(); // Actually perform the merge against the real database using Dapper foreach (var record in records) { var exists = await _connection.ExecuteScalarAsync( "SELECT CASE WHEN EXISTS (SELECT 1 FROM WorkOrder_Test WHERE OrderNumber = @OrderNumber) THEN 1 ELSE 0 END", new { record.OrderNumber }); if (exists) { await _connection.ExecuteAsync(@" UPDATE WorkOrder_Test SET Status = 
@Status, Description = @Description, Quantity = @Quantity, LastUpdateDT = @LastUpdateDT WHERE OrderNumber = @OrderNumber", record); } else { await _connection.ExecuteAsync(@" INSERT INTO WorkOrder_Test (OrderNumber, Status, Description, Quantity, LastUpdateDT) VALUES (@OrderNumber, @Status, @Description, @Quantity, @LastUpdateDT)", record); } } return new MergeResult( records.Count, records.Count, 0, 0, TimeSpan.Zero); }); bulkMergeHelper.MassInsertAsync( Arg.Any>(), Arg.Any(), Arg.Any(), Arg.Any(), Arg.Any()) .Returns(async callInfo => { var data = callInfo.ArgAt>(0); var records = await data.ToListAsync(); // Truncate and insert into real database await _connection.ExecuteAsync("TRUNCATE TABLE WorkOrder_Test"); foreach (var record in records) { await _connection.ExecuteAsync(@" INSERT INTO WorkOrder_Test (OrderNumber, Status, Description, Quantity, LastUpdateDT) VALUES (@OrderNumber, @Status, @Description, @Quantity, @LastUpdateDT)", record); } return new MassInsertResult(records.Count, TimeSpan.Zero, true); }); var options = Microsoft.Extensions.Options.Options.Create(new DataSyncOptions { BatchSize = 1000, BulkCopyBatchSize = 100 }); var meterFactory = provider.GetRequiredService(); var metrics = new DataSyncMetrics(meterFactory); // Create a service provider that can resolve our test fetcher var testServiceProvider = Substitute.For(); testServiceProvider.GetService(typeof(TestFetcher)).Returns(fetcher); testServiceProvider.GetService(typeof(FailingFetcher)).Returns(fetcher); return new TableSyncOperation( testServiceProvider, connectionFactory, updateRepository, bulkMergeHelper, configRegistry, options, NullLogger.Instance, metrics); } private static DataUpdateTask CreateTask( string tableName, UpdateTypes updateType, DateTime? minimumDt = null, bool prepurge = false, string? 
fetcherTypeName = null) { return new DataUpdateTask { TableName = tableName, SourceSystem = "JDE", SourceData = tableName.ToUpper(), UpdateType = updateType, MinimumDt = minimumDt, Config = new DataSourceConfig { TableName = tableName, SourceSystem = "JDE", SourceData = tableName.ToUpper(), FetcherTypeName = fetcherTypeName ?? nameof(TestFetcher), IsEnabled = true, MassConfig = new ScheduleConfig { Enabled = true, IntervalMinutes = 10080, PrepurgeData = prepurge, ReIndexData = false }, DailyConfig = new ScheduleConfig { Enabled = true, IntervalMinutes = 1440 }, HourlyConfig = new ScheduleConfig { Enabled = true, IntervalMinutes = 60 } } }; } private async Task InsertWorkOrdersDirectly(List records) { const string sql = @" INSERT INTO WorkOrder_Test (OrderNumber, Status, Description, Quantity, LastUpdateDT) VALUES (@OrderNumber, @Status, @Description, @Quantity, @LastUpdateDT)"; await _connection.ExecuteAsync(sql, records); } #endregion } #region Test Fetchers /// /// Test fetcher that yields records from an in-memory list. /// public class TestFetcher : IDataFetcher { private readonly List _records; public TestFetcher(List records) { _records = records; } public async IAsyncEnumerable FetchAsync( DateTime? minimumDt, [EnumeratorCancellation] CancellationToken cancellationToken = default) { foreach (var record in _records) { if (cancellationToken.IsCancellationRequested) yield break; await Task.Yield(); // Simulate async behavior yield return record; } } } /// /// Test fetcher that fails after yielding a specified number of records. /// public class FailingFetcher : IDataFetcher { private readonly int _failAfterCount; public FailingFetcher(int failAfterCount) { _failAfterCount = failAfterCount; } public async IAsyncEnumerable FetchAsync( DateTime? 
minimumDt, [EnumeratorCancellation] CancellationToken cancellationToken = default) { for (var i = 0; i < _failAfterCount; i++) { await Task.Yield(); yield return new WorkOrderTestEntity { OrderNumber = i + 1, Status = "Test", Description = $"Record {i + 1}", Quantity = i * 10, LastUpdateDT = DateTime.UtcNow }; } throw new InvalidOperationException("Simulated fetcher failure"); } } #endregion