diff --git a/NEW/src/JdeScoping.DataSync/Services/TableSyncOperation.cs b/NEW/src/JdeScoping.DataSync/Services/TableSyncOperation.cs index 67422cd..ec566b1 100644 --- a/NEW/src/JdeScoping.DataSync/Services/TableSyncOperation.cs +++ b/NEW/src/JdeScoping.DataSync/Services/TableSyncOperation.cs @@ -133,15 +133,12 @@ public class TableSyncOperation : ITableSyncOperation /// private async Task ExecuteSyncCoreAsync(DataUpdateTask task, CancellationToken cancellationToken) { - // Determine sync mode based on update type - var syncMode = task.UpdateType == UpdateTypes.Mass ? SyncMode.Mass : SyncMode.Incremental; + _logger.LogDebug("Building pipeline for {Table} with UpdateType={UpdateType}", task.TableName, task.UpdateType); - _logger.LogDebug("Building pipeline for {Table} in {Mode} mode", task.TableName, syncMode); - - // Build and execute the pipeline + // Build and execute the pipeline using the task's UpdateType directly var pipeline = _pipelineFactory .ForTable(task.TableName) - .WithMode(syncMode) + .WithUpdateType(task.UpdateType) .WithMinimumDate(task.MinimumDt) .Build(); diff --git a/NEW/tests/JdeScoping.DataSync.Tests/Services/TableSyncOperationTests.cs b/NEW/tests/JdeScoping.DataSync.Tests/Services/TableSyncOperationTests.cs new file mode 100644 index 0000000..c99e5cd --- /dev/null +++ b/NEW/tests/JdeScoping.DataSync.Tests/Services/TableSyncOperationTests.cs @@ -0,0 +1,411 @@ +using System.Data; +using System.Diagnostics.Metrics; +using JdeScoping.Core.Models.Enums; +using JdeScoping.DataSync.Contracts; +using JdeScoping.DataSync.Etl.Contracts; +using JdeScoping.DataSync.Etl.Pipeline; +using JdeScoping.DataSync.Etl.Results; +using JdeScoping.DataSync.Models; +using JdeScoping.DataSync.Options; +using JdeScoping.DataSync.Services; +using JdeScoping.DataSync.Telemetry; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using NSubstitute; +using Shouldly; + +namespace JdeScoping.DataSync.Tests.Services; + +/// +/// Unit tests for TableSyncOperation. +/// Tests that the operation correctly uses the ETL pipeline with UpdateTypes. +/// +public class TableSyncOperationTests +{ + private readonly IDataUpdateRepository _updateRepository; + private readonly IOptions _options; + private readonly DataSyncMetrics _metrics; + + public TableSyncOperationTests() + { + _updateRepository = Substitute.For(); + _updateRepository.StartUpdateAsync( + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any()) + .Returns(1); + + _options = Microsoft.Extensions.Options.Options.Create(new DataSyncOptions()); + + var services = new ServiceCollection(); + services.AddMetrics(); + var provider = services.BuildServiceProvider(); + var meterFactory = provider.GetRequiredService(); + _metrics = new DataSyncMetrics(meterFactory); + } + + #region WithUpdateType Tests + + [Fact] + public async Task ExecuteAsync_WithUpdateTypesDaily_CallsWithUpdateTypeWithDaily() + { + // Arrange + var task = CreateTask("TestTable", UpdateTypes.Daily); + UpdateTypes? receivedUpdateType = null; + + // Pre-create the test pipeline to avoid NSubstitute issues + var testPipeline = CreateTestPipeline(); + + var mockBuilder = Substitute.For(); + mockBuilder.WithUpdateType(Arg.Any()) + .Returns(callInfo => + { + receivedUpdateType = callInfo.Arg(); + return mockBuilder; + }); + mockBuilder.WithMinimumDate(Arg.Any()).Returns(mockBuilder); + mockBuilder.Build().Returns(testPipeline); + + var mockFactory = Substitute.For(); + mockFactory.ForTable(Arg.Any()).Returns(mockBuilder); + + var sut = new TableSyncOperation( + mockFactory, + _updateRepository, + _options, + NullLogger.Instance, + _metrics); + + // Act + await sut.ExecuteAsync(task); + + // Assert - Verify WithUpdateType was called with Daily (not mapped to Incremental) + receivedUpdateType.ShouldBe(UpdateTypes.Daily); + } + + [Fact] + public async Task ExecuteAsync_WithUpdateTypesHourly_CallsWithUpdateTypeWithHourly() + { + // Arrange + var task = CreateTask("TestTable", UpdateTypes.Hourly); + UpdateTypes? receivedUpdateType = null; + + // Pre-create the test pipeline to avoid NSubstitute issues + var testPipeline = CreateTestPipeline(); + + var mockBuilder = Substitute.For(); + mockBuilder.WithUpdateType(Arg.Any()) + .Returns(callInfo => + { + receivedUpdateType = callInfo.Arg(); + return mockBuilder; + }); + mockBuilder.WithMinimumDate(Arg.Any()).Returns(mockBuilder); + mockBuilder.Build().Returns(testPipeline); + + var mockFactory = Substitute.For(); + mockFactory.ForTable(Arg.Any()).Returns(mockBuilder); + + var sut = new TableSyncOperation( + mockFactory, + _updateRepository, + _options, + NullLogger.Instance, + _metrics); + + // Act + await sut.ExecuteAsync(task); + + // Assert + receivedUpdateType.ShouldBe(UpdateTypes.Hourly); + } + + [Fact] + public async Task ExecuteAsync_WithUpdateTypesMass_CallsWithUpdateTypeWithMass() + { + // Arrange + var task = CreateTask("TestTable", UpdateTypes.Mass); + UpdateTypes? receivedUpdateType = null; + + // Pre-create the test pipeline to avoid NSubstitute issues + var testPipeline = CreateTestPipeline(); + + var mockBuilder = Substitute.For(); + mockBuilder.WithUpdateType(Arg.Any()) + .Returns(callInfo => + { + receivedUpdateType = callInfo.Arg(); + return mockBuilder; + }); + mockBuilder.WithMinimumDate(Arg.Any()).Returns(mockBuilder); + mockBuilder.Build().Returns(testPipeline); + + var mockFactory = Substitute.For(); + mockFactory.ForTable(Arg.Any()).Returns(mockBuilder); + + var sut = new TableSyncOperation( + mockFactory, + _updateRepository, + _options, + NullLogger.Instance, + _metrics); + + // Act + await sut.ExecuteAsync(task); + + // Assert + receivedUpdateType.ShouldBe(UpdateTypes.Mass); + } + + [Fact] + public async Task ExecuteAsync_DoesNotCallObsoleteWithModeMethod() + { + // Arrange + var task = CreateTask("TestTable", UpdateTypes.Daily); + var withModeCalled = false; + + // Pre-create the test pipeline to avoid NSubstitute issues + var testPipeline = CreateTestPipeline(); + + var mockBuilder = Substitute.For(); + mockBuilder.WithUpdateType(Arg.Any()).Returns(mockBuilder); + mockBuilder.WithMode(Arg.Any()) + .Returns(callInfo => + { + withModeCalled = true; + return mockBuilder; + }); + mockBuilder.WithMinimumDate(Arg.Any()).Returns(mockBuilder); + mockBuilder.Build().Returns(testPipeline); + + var mockFactory = Substitute.For(); + mockFactory.ForTable(Arg.Any()).Returns(mockBuilder); + + var sut = new TableSyncOperation( + mockFactory, + _updateRepository, + _options, + NullLogger.Instance, + _metrics); + + // Act + await sut.ExecuteAsync(task); + + // Assert - Verify the obsolete WithMode method was NOT called + withModeCalled.ShouldBeFalse(); + } + + #endregion + + #region Pipeline Execution Tests + + [Fact] + public async Task ExecuteAsync_CallsForTableWithCorrectTableName() + { + // Arrange + var task = CreateTask("WorkOrder", UpdateTypes.Daily); + string? receivedTableName = null; + + // Pre-create the test pipeline to avoid NSubstitute issues + var testPipeline = CreateTestPipeline(); + + var mockBuilder = Substitute.For(); + mockBuilder.WithUpdateType(Arg.Any()).Returns(mockBuilder); + mockBuilder.WithMinimumDate(Arg.Any()).Returns(mockBuilder); + mockBuilder.Build().Returns(testPipeline); + + var mockFactory = Substitute.For(); + mockFactory.ForTable(Arg.Any()) + .Returns(callInfo => + { + receivedTableName = callInfo.Arg(); + return mockBuilder; + }); + + var sut = new TableSyncOperation( + mockFactory, + _updateRepository, + _options, + NullLogger.Instance, + _metrics); + + // Act + await sut.ExecuteAsync(task); + + // Assert + receivedTableName.ShouldBe("WorkOrder"); + } + + [Fact] + public async Task ExecuteAsync_CallsWithMinimumDateWithTaskMinimumDt() + { + // Arrange + var minDt = new DateTime(2024, 1, 15, 10, 30, 0, DateTimeKind.Utc); + var task = CreateTask("TestTable", UpdateTypes.Daily, minDt); + DateTime? receivedMinDt = null; + + // Pre-create the test pipeline to avoid NSubstitute issues + var testPipeline = CreateTestPipeline(); + + var mockBuilder = Substitute.For(); + mockBuilder.WithUpdateType(Arg.Any()).Returns(mockBuilder); + mockBuilder.WithMinimumDate(Arg.Any()) + .Returns(callInfo => + { + receivedMinDt = callInfo.Arg(); + return mockBuilder; + }); + mockBuilder.Build().Returns(testPipeline); + + var mockFactory = Substitute.For(); + mockFactory.ForTable(Arg.Any()).Returns(mockBuilder); + + var sut = new TableSyncOperation( + mockFactory, + _updateRepository, + _options, + NullLogger.Instance, + _metrics); + + // Act + await sut.ExecuteAsync(task); + + // Assert + receivedMinDt.ShouldBe(minDt); + } + + [Fact] + public async Task ExecuteAsync_SuccessfulPipeline_CompletesUpdateAsSuccess() + { + // Arrange + var task = CreateTask("TestTable", UpdateTypes.Daily); + + // Pre-create the test pipeline to avoid NSubstitute issues + var testPipeline = CreateTestPipeline(totalRows: 100); + + var mockBuilder = Substitute.For(); + mockBuilder.WithUpdateType(Arg.Any()).Returns(mockBuilder); + mockBuilder.WithMinimumDate(Arg.Any()).Returns(mockBuilder); + mockBuilder.Build().Returns(testPipeline); + + var mockFactory = Substitute.For(); + mockFactory.ForTable(Arg.Any()).Returns(mockBuilder); + + var sut = new TableSyncOperation( + mockFactory, + _updateRepository, + _options, + NullLogger.Instance, + _metrics); + + // Act + await sut.ExecuteAsync(task); + + // Assert + await _updateRepository.Received(1).CompleteUpdateAsync( + Arg.Any(), + Arg.Is(rows => rows == 100), + Arg.Is(success => success == true), + Arg.Any()); + } + + [Fact] + public async Task ExecuteAsync_FailedPipeline_ThrowsAndCompletesUpdateAsFailure() + { + // Arrange + var task = CreateTask("TestTable", UpdateTypes.Daily); + + // Pre-create the test pipeline to avoid NSubstitute issues + var testPipeline = CreateTestPipeline(success: false); + + var mockBuilder = Substitute.For(); + mockBuilder.WithUpdateType(Arg.Any()).Returns(mockBuilder); + mockBuilder.WithMinimumDate(Arg.Any()).Returns(mockBuilder); + mockBuilder.Build().Returns(testPipeline); + + var mockFactory = Substitute.For(); + mockFactory.ForTable(Arg.Any()).Returns(mockBuilder); + + var sut = new TableSyncOperation( + mockFactory, + _updateRepository, + _options, + NullLogger.Instance, + _metrics); + + // Act & Assert + await Should.ThrowAsync(() => sut.ExecuteAsync(task)); + + await _updateRepository.Received(1).CompleteUpdateAsync( + Arg.Any(), + Arg.Is(rows => rows == -1), + Arg.Is(success => success == false), + Arg.Any()); + } + + #endregion + + #region Helper Methods + + private static DataUpdateTask CreateTask(string tableName, UpdateTypes updateType, DateTime? minDt = null) + { + return new DataUpdateTask + { + TableName = tableName, + SourceSystem = "JDE", + SourceData = tableName.ToUpper(), + UpdateType = updateType, + MinimumDt = minDt, + Config = new DataSourceConfig + { + TableName = tableName, + SourceSystem = "JDE", + SourceData = tableName.ToUpper(), + IsEnabled = true, + MassConfig = new ScheduleConfig { Enabled = true, IntervalMinutes = 10080 }, + DailyConfig = new ScheduleConfig { Enabled = true, IntervalMinutes = 1440 }, + HourlyConfig = new ScheduleConfig { Enabled = true, IntervalMinutes = 60 } + } + }; + } + + /// + /// Creates a real EtlPipeline with mocked source and destination for testing. + /// + private static EtlPipeline CreateTestPipeline(bool success = true, long totalRows = 0) + { + // Create mock source + var mockSource = Substitute.For(); + mockSource.SourceName.Returns("TestSource"); + var mockReader = Substitute.For(); + mockReader.Read().Returns(false); // No rows + mockSource.ReadDataAsync(Arg.Any()).Returns(mockReader); + + // Create mock destination + var mockDestination = Substitute.For(); + mockDestination.DestinationName.Returns("TestDestination"); + + if (success) + { + mockDestination.WriteAsync(Arg.Any(), Arg.Any()) + .Returns(new DestinationResult(totalRows, 1, TimeSpan.FromMilliseconds(10))); + } + else + { + mockDestination.WriteAsync(Arg.Any(), Arg.Any()) + .Returns>(_ => throw new Exception("Pipeline failed")); + } + + // Build the pipeline using the real builder + return new EtlPipelineBuilder() + .WithName("TestPipeline") + .WithSource(mockSource) + .WithDestination(mockDestination) + .Build(); + } + + #endregion +}