From e234c9f29acc3a7e61d25eb61dfd3adadc738f5d Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 7 Jan 2026 01:25:24 -0500 Subject: [PATCH] refactor(datasync): use WithUpdateType instead of WithMode in TableSyncOperation Update TableSyncOperation to pass UpdateTypes directly to the pipeline builder using WithUpdateType() instead of mapping to SyncMode and calling the deprecated WithMode() method. This enables proper schedule-based configuration handling where Daily and Hourly have distinct behaviors. - Remove SyncMode mapping logic from ExecuteSyncCoreAsync - Call WithUpdateType(task.UpdateType) directly - Update log message to reflect UpdateType instead of SyncMode - Add TableSyncOperationTests verifying WithUpdateType is called correctly --- .../Services/TableSyncOperation.cs | 9 +- .../Services/TableSyncOperationTests.cs | 411 ++++++++++++++++++ 2 files changed, 414 insertions(+), 6 deletions(-) create mode 100644 NEW/tests/JdeScoping.DataSync.Tests/Services/TableSyncOperationTests.cs diff --git a/NEW/src/JdeScoping.DataSync/Services/TableSyncOperation.cs b/NEW/src/JdeScoping.DataSync/Services/TableSyncOperation.cs index 67422cd..ec566b1 100644 --- a/NEW/src/JdeScoping.DataSync/Services/TableSyncOperation.cs +++ b/NEW/src/JdeScoping.DataSync/Services/TableSyncOperation.cs @@ -133,15 +133,12 @@ public class TableSyncOperation : ITableSyncOperation /// private async Task ExecuteSyncCoreAsync(DataUpdateTask task, CancellationToken cancellationToken) { - // Determine sync mode based on update type - var syncMode = task.UpdateType == UpdateTypes.Mass ? SyncMode.Mass : SyncMode.Incremental; + _logger.LogDebug("Building pipeline for {Table} with UpdateType={UpdateType}", task.TableName, task.UpdateType); - _logger.LogDebug("Building pipeline for {Table} in {Mode} mode", task.TableName, syncMode); - - // Build and execute the pipeline + // Build and execute the pipeline using the task's UpdateType directly var pipeline = _pipelineFactory .ForTable(task.TableName) - .WithMode(syncMode) + .WithUpdateType(task.UpdateType) .WithMinimumDate(task.MinimumDt) .Build(); diff --git a/NEW/tests/JdeScoping.DataSync.Tests/Services/TableSyncOperationTests.cs b/NEW/tests/JdeScoping.DataSync.Tests/Services/TableSyncOperationTests.cs new file mode 100644 index 0000000..c99e5cd --- /dev/null +++ b/NEW/tests/JdeScoping.DataSync.Tests/Services/TableSyncOperationTests.cs @@ -0,0 +1,411 @@ +using System.Data; +using System.Diagnostics.Metrics; +using JdeScoping.Core.Models.Enums; +using JdeScoping.DataSync.Contracts; +using JdeScoping.DataSync.Etl.Contracts; +using JdeScoping.DataSync.Etl.Pipeline; +using JdeScoping.DataSync.Etl.Results; +using JdeScoping.DataSync.Models; +using JdeScoping.DataSync.Options; +using JdeScoping.DataSync.Services; +using JdeScoping.DataSync.Telemetry; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using NSubstitute; +using Shouldly; + +namespace JdeScoping.DataSync.Tests.Services; + +/// +/// Unit tests for TableSyncOperation. +/// Tests that the operation correctly uses the ETL pipeline with UpdateTypes. +/// +public class TableSyncOperationTests +{ + private readonly IDataUpdateRepository _updateRepository; + private readonly IOptions _options; + private readonly DataSyncMetrics _metrics; + + public TableSyncOperationTests() + { + _updateRepository = Substitute.For(); + _updateRepository.StartUpdateAsync( + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any(), + Arg.Any()) + .Returns(1); + + _options = Microsoft.Extensions.Options.Options.Create(new DataSyncOptions()); + + var services = new ServiceCollection(); + services.AddMetrics(); + var provider = services.BuildServiceProvider(); + var meterFactory = provider.GetRequiredService(); + _metrics = new DataSyncMetrics(meterFactory); + } + + #region WithUpdateType Tests + + [Fact] + public async Task ExecuteAsync_WithUpdateTypesDaily_CallsWithUpdateTypeWithDaily() + { + // Arrange + var task = CreateTask("TestTable", UpdateTypes.Daily); + UpdateTypes? receivedUpdateType = null; + + // Pre-create the test pipeline to avoid NSubstitute issues + var testPipeline = CreateTestPipeline(); + + var mockBuilder = Substitute.For(); + mockBuilder.WithUpdateType(Arg.Any()) + .Returns(callInfo => + { + receivedUpdateType = callInfo.Arg(); + return mockBuilder; + }); + mockBuilder.WithMinimumDate(Arg.Any()).Returns(mockBuilder); + mockBuilder.Build().Returns(testPipeline); + + var mockFactory = Substitute.For(); + mockFactory.ForTable(Arg.Any()).Returns(mockBuilder); + + var sut = new TableSyncOperation( + mockFactory, + _updateRepository, + _options, + NullLogger.Instance, + _metrics); + + // Act + await sut.ExecuteAsync(task); + + // Assert - Verify WithUpdateType was called with Daily (not mapped to Incremental) + receivedUpdateType.ShouldBe(UpdateTypes.Daily); + } + + [Fact] + public async Task ExecuteAsync_WithUpdateTypesHourly_CallsWithUpdateTypeWithHourly() + { + // Arrange + var task = CreateTask("TestTable", UpdateTypes.Hourly); + UpdateTypes? receivedUpdateType = null; + + // Pre-create the test pipeline to avoid NSubstitute issues + var testPipeline = CreateTestPipeline(); + + var mockBuilder = Substitute.For(); + mockBuilder.WithUpdateType(Arg.Any()) + .Returns(callInfo => + { + receivedUpdateType = callInfo.Arg(); + return mockBuilder; + }); + mockBuilder.WithMinimumDate(Arg.Any()).Returns(mockBuilder); + mockBuilder.Build().Returns(testPipeline); + + var mockFactory = Substitute.For(); + mockFactory.ForTable(Arg.Any()).Returns(mockBuilder); + + var sut = new TableSyncOperation( + mockFactory, + _updateRepository, + _options, + NullLogger.Instance, + _metrics); + + // Act + await sut.ExecuteAsync(task); + + // Assert + receivedUpdateType.ShouldBe(UpdateTypes.Hourly); + } + + [Fact] + public async Task ExecuteAsync_WithUpdateTypesMass_CallsWithUpdateTypeWithMass() + { + // Arrange + var task = CreateTask("TestTable", UpdateTypes.Mass); + UpdateTypes? receivedUpdateType = null; + + // Pre-create the test pipeline to avoid NSubstitute issues + var testPipeline = CreateTestPipeline(); + + var mockBuilder = Substitute.For(); + mockBuilder.WithUpdateType(Arg.Any()) + .Returns(callInfo => + { + receivedUpdateType = callInfo.Arg(); + return mockBuilder; + }); + mockBuilder.WithMinimumDate(Arg.Any()).Returns(mockBuilder); + mockBuilder.Build().Returns(testPipeline); + + var mockFactory = Substitute.For(); + mockFactory.ForTable(Arg.Any()).Returns(mockBuilder); + + var sut = new TableSyncOperation( + mockFactory, + _updateRepository, + _options, + NullLogger.Instance, + _metrics); + + // Act + await sut.ExecuteAsync(task); + + // Assert + receivedUpdateType.ShouldBe(UpdateTypes.Mass); + } + + [Fact] + public async Task ExecuteAsync_DoesNotCallObsoleteWithModeMethod() + { + // Arrange + var task = CreateTask("TestTable", UpdateTypes.Daily); + var withModeCalled = false; + + // Pre-create the test pipeline to avoid NSubstitute issues + var testPipeline = CreateTestPipeline(); + + var mockBuilder = Substitute.For(); + mockBuilder.WithUpdateType(Arg.Any()).Returns(mockBuilder); + mockBuilder.WithMode(Arg.Any()) + .Returns(callInfo => + { + withModeCalled = true; + return mockBuilder; + }); + mockBuilder.WithMinimumDate(Arg.Any()).Returns(mockBuilder); + mockBuilder.Build().Returns(testPipeline); + + var mockFactory = Substitute.For(); + mockFactory.ForTable(Arg.Any()).Returns(mockBuilder); + + var sut = new TableSyncOperation( + mockFactory, + _updateRepository, + _options, + NullLogger.Instance, + _metrics); + + // Act + await sut.ExecuteAsync(task); + + // Assert - Verify the obsolete WithMode method was NOT called + withModeCalled.ShouldBeFalse(); + } + + #endregion + + #region Pipeline Execution Tests + + [Fact] + public async Task ExecuteAsync_CallsForTableWithCorrectTableName() + { + // Arrange + var task = CreateTask("WorkOrder", UpdateTypes.Daily); + string? receivedTableName = null; + + // Pre-create the test pipeline to avoid NSubstitute issues + var testPipeline = CreateTestPipeline(); + + var mockBuilder = Substitute.For(); + mockBuilder.WithUpdateType(Arg.Any()).Returns(mockBuilder); + mockBuilder.WithMinimumDate(Arg.Any()).Returns(mockBuilder); + mockBuilder.Build().Returns(testPipeline); + + var mockFactory = Substitute.For(); + mockFactory.ForTable(Arg.Any()) + .Returns(callInfo => + { + receivedTableName = callInfo.Arg(); + return mockBuilder; + }); + + var sut = new TableSyncOperation( + mockFactory, + _updateRepository, + _options, + NullLogger.Instance, + _metrics); + + // Act + await sut.ExecuteAsync(task); + + // Assert + receivedTableName.ShouldBe("WorkOrder"); + } + + [Fact] + public async Task ExecuteAsync_CallsWithMinimumDateWithTaskMinimumDt() + { + // Arrange + var minDt = new DateTime(2024, 1, 15, 10, 30, 0, DateTimeKind.Utc); + var task = CreateTask("TestTable", UpdateTypes.Daily, minDt); + DateTime? receivedMinDt = null; + + // Pre-create the test pipeline to avoid NSubstitute issues + var testPipeline = CreateTestPipeline(); + + var mockBuilder = Substitute.For(); + mockBuilder.WithUpdateType(Arg.Any()).Returns(mockBuilder); + mockBuilder.WithMinimumDate(Arg.Any()) + .Returns(callInfo => + { + receivedMinDt = callInfo.Arg(); + return mockBuilder; + }); + mockBuilder.Build().Returns(testPipeline); + + var mockFactory = Substitute.For(); + mockFactory.ForTable(Arg.Any()).Returns(mockBuilder); + + var sut = new TableSyncOperation( + mockFactory, + _updateRepository, + _options, + NullLogger.Instance, + _metrics); + + // Act + await sut.ExecuteAsync(task); + + // Assert + receivedMinDt.ShouldBe(minDt); + } + + [Fact] + public async Task ExecuteAsync_SuccessfulPipeline_CompletesUpdateAsSuccess() + { + // Arrange + var task = CreateTask("TestTable", UpdateTypes.Daily); + + // Pre-create the test pipeline to avoid NSubstitute issues + var testPipeline = CreateTestPipeline(totalRows: 100); + + var mockBuilder = Substitute.For(); + mockBuilder.WithUpdateType(Arg.Any()).Returns(mockBuilder); + mockBuilder.WithMinimumDate(Arg.Any()).Returns(mockBuilder); + mockBuilder.Build().Returns(testPipeline); + + var mockFactory = Substitute.For(); + mockFactory.ForTable(Arg.Any()).Returns(mockBuilder); + + var sut = new TableSyncOperation( + mockFactory, + _updateRepository, + _options, + NullLogger.Instance, + _metrics); + + // Act + await sut.ExecuteAsync(task); + + // Assert + await _updateRepository.Received(1).CompleteUpdateAsync( + Arg.Any(), + Arg.Is(rows => rows == 100), + Arg.Is(success => success == true), + Arg.Any()); + } + + [Fact] + public async Task ExecuteAsync_FailedPipeline_ThrowsAndCompletesUpdateAsFailure() + { + // Arrange + var task = CreateTask("TestTable", UpdateTypes.Daily); + + // Pre-create the test pipeline to avoid NSubstitute issues + var testPipeline = CreateTestPipeline(success: false); + + var mockBuilder = Substitute.For(); + mockBuilder.WithUpdateType(Arg.Any()).Returns(mockBuilder); + mockBuilder.WithMinimumDate(Arg.Any()).Returns(mockBuilder); + mockBuilder.Build().Returns(testPipeline); + + var mockFactory = Substitute.For(); + mockFactory.ForTable(Arg.Any()).Returns(mockBuilder); + + var sut = new TableSyncOperation( + mockFactory, + _updateRepository, + _options, + NullLogger.Instance, + _metrics); + + // Act & Assert + await Should.ThrowAsync(() => sut.ExecuteAsync(task)); + + await _updateRepository.Received(1).CompleteUpdateAsync( + Arg.Any(), + Arg.Is(rows => rows == -1), + Arg.Is(success => success == false), + Arg.Any()); + } + + #endregion + + #region Helper Methods + + private static DataUpdateTask CreateTask(string tableName, UpdateTypes updateType, DateTime? minDt = null) + { + return new DataUpdateTask + { + TableName = tableName, + SourceSystem = "JDE", + SourceData = tableName.ToUpper(), + UpdateType = updateType, + MinimumDt = minDt, + Config = new DataSourceConfig + { + TableName = tableName, + SourceSystem = "JDE", + SourceData = tableName.ToUpper(), + IsEnabled = true, + MassConfig = new ScheduleConfig { Enabled = true, IntervalMinutes = 10080 }, + DailyConfig = new ScheduleConfig { Enabled = true, IntervalMinutes = 1440 }, + HourlyConfig = new ScheduleConfig { Enabled = true, IntervalMinutes = 60 } + } + }; + } + + /// + /// Creates a real EtlPipeline with mocked source and destination for testing. + /// + private static EtlPipeline CreateTestPipeline(bool success = true, long totalRows = 0) + { + // Create mock source + var mockSource = Substitute.For(); + mockSource.SourceName.Returns("TestSource"); + var mockReader = Substitute.For(); + mockReader.Read().Returns(false); // No rows + mockSource.ReadDataAsync(Arg.Any()).Returns(mockReader); + + // Create mock destination + var mockDestination = Substitute.For(); + mockDestination.DestinationName.Returns("TestDestination"); + + if (success) + { + mockDestination.WriteAsync(Arg.Any(), Arg.Any()) + .Returns(new DestinationResult(totalRows, 1, TimeSpan.FromMilliseconds(10))); + } + else + { + mockDestination.WriteAsync(Arg.Any(), Arg.Any()) + .Returns>(_ => throw new Exception("Pipeline failed")); + } + + // Build the pipeline using the real builder + return new EtlPipelineBuilder() + .WithName("TestPipeline") + .WithSource(mockSource) + .WithDestination(mockDestination) + .Build(); + } + + #endregion +}