refactor(datasync): use WithUpdateType instead of WithMode in TableSyncOperation
Update TableSyncOperation to pass UpdateTypes directly to the pipeline builder using WithUpdateType() instead of mapping to SyncMode and calling the deprecated WithMode() method. This enables proper schedule-based configuration handling where Daily and Hourly have distinct behaviors. - Remove SyncMode mapping logic from ExecuteSyncCoreAsync - Call WithUpdateType(task.UpdateType) directly - Update log message to reflect UpdateType instead of SyncMode - Add TableSyncOperationTests verifying WithUpdateType is called correctly
This commit is contained in:
@@ -133,15 +133,12 @@ public class TableSyncOperation : ITableSyncOperation
|
||||
/// </summary>
|
||||
private async Task<long> ExecuteSyncCoreAsync(DataUpdateTask task, CancellationToken cancellationToken)
|
||||
{
|
||||
// Determine sync mode based on update type
|
||||
var syncMode = task.UpdateType == UpdateTypes.Mass ? SyncMode.Mass : SyncMode.Incremental;
|
||||
_logger.LogDebug("Building pipeline for {Table} with UpdateType={UpdateType}", task.TableName, task.UpdateType);
|
||||
|
||||
_logger.LogDebug("Building pipeline for {Table} in {Mode} mode", task.TableName, syncMode);
|
||||
|
||||
// Build and execute the pipeline
|
||||
// Build and execute the pipeline using the task's UpdateType directly
|
||||
var pipeline = _pipelineFactory
|
||||
.ForTable(task.TableName)
|
||||
.WithMode(syncMode)
|
||||
.WithUpdateType(task.UpdateType)
|
||||
.WithMinimumDate(task.MinimumDt)
|
||||
.Build();
|
||||
|
||||
|
||||
@@ -0,0 +1,411 @@
|
||||
using System.Data;
|
||||
using System.Diagnostics.Metrics;
|
||||
using JdeScoping.Core.Models.Enums;
|
||||
using JdeScoping.DataSync.Contracts;
|
||||
using JdeScoping.DataSync.Etl.Contracts;
|
||||
using JdeScoping.DataSync.Etl.Pipeline;
|
||||
using JdeScoping.DataSync.Etl.Results;
|
||||
using JdeScoping.DataSync.Models;
|
||||
using JdeScoping.DataSync.Options;
|
||||
using JdeScoping.DataSync.Services;
|
||||
using JdeScoping.DataSync.Telemetry;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Microsoft.Extensions.Options;
|
||||
using NSubstitute;
|
||||
using Shouldly;
|
||||
|
||||
namespace JdeScoping.DataSync.Tests.Services;
|
||||
|
||||
/// <summary>
|
||||
/// Unit tests for TableSyncOperation.
|
||||
/// Tests that the operation correctly uses the ETL pipeline with UpdateTypes.
|
||||
/// </summary>
|
||||
public class TableSyncOperationTests
|
||||
{
|
||||
private readonly IDataUpdateRepository _updateRepository;
|
||||
private readonly IOptions<DataSyncOptions> _options;
|
||||
private readonly DataSyncMetrics _metrics;
|
||||
|
||||
public TableSyncOperationTests()
|
||||
{
|
||||
_updateRepository = Substitute.For<IDataUpdateRepository>();
|
||||
_updateRepository.StartUpdateAsync(
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<string>(),
|
||||
Arg.Any<UpdateTypes>(),
|
||||
Arg.Any<CancellationToken>())
|
||||
.Returns(1);
|
||||
|
||||
_options = Microsoft.Extensions.Options.Options.Create(new DataSyncOptions());
|
||||
|
||||
var services = new ServiceCollection();
|
||||
services.AddMetrics();
|
||||
var provider = services.BuildServiceProvider();
|
||||
var meterFactory = provider.GetRequiredService<IMeterFactory>();
|
||||
_metrics = new DataSyncMetrics(meterFactory);
|
||||
}
|
||||
|
||||
#region WithUpdateType Tests
|
||||
|
||||
[Fact]
|
||||
public async Task ExecuteAsync_WithUpdateTypesDaily_CallsWithUpdateTypeWithDaily()
|
||||
{
|
||||
// Arrange
|
||||
var task = CreateTask("TestTable", UpdateTypes.Daily);
|
||||
UpdateTypes? receivedUpdateType = null;
|
||||
|
||||
// Pre-create the test pipeline to avoid NSubstitute issues
|
||||
var testPipeline = CreateTestPipeline();
|
||||
|
||||
var mockBuilder = Substitute.For<IEtlPipelineBuilder>();
|
||||
mockBuilder.WithUpdateType(Arg.Any<UpdateTypes>())
|
||||
.Returns(callInfo =>
|
||||
{
|
||||
receivedUpdateType = callInfo.Arg<UpdateTypes>();
|
||||
return mockBuilder;
|
||||
});
|
||||
mockBuilder.WithMinimumDate(Arg.Any<DateTime?>()).Returns(mockBuilder);
|
||||
mockBuilder.Build().Returns(testPipeline);
|
||||
|
||||
var mockFactory = Substitute.For<IEtlPipelineFactory>();
|
||||
mockFactory.ForTable(Arg.Any<string>()).Returns(mockBuilder);
|
||||
|
||||
var sut = new TableSyncOperation(
|
||||
mockFactory,
|
||||
_updateRepository,
|
||||
_options,
|
||||
NullLogger<TableSyncOperation>.Instance,
|
||||
_metrics);
|
||||
|
||||
// Act
|
||||
await sut.ExecuteAsync(task);
|
||||
|
||||
// Assert - Verify WithUpdateType was called with Daily (not mapped to Incremental)
|
||||
receivedUpdateType.ShouldBe(UpdateTypes.Daily);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ExecuteAsync_WithUpdateTypesHourly_CallsWithUpdateTypeWithHourly()
|
||||
{
|
||||
// Arrange
|
||||
var task = CreateTask("TestTable", UpdateTypes.Hourly);
|
||||
UpdateTypes? receivedUpdateType = null;
|
||||
|
||||
// Pre-create the test pipeline to avoid NSubstitute issues
|
||||
var testPipeline = CreateTestPipeline();
|
||||
|
||||
var mockBuilder = Substitute.For<IEtlPipelineBuilder>();
|
||||
mockBuilder.WithUpdateType(Arg.Any<UpdateTypes>())
|
||||
.Returns(callInfo =>
|
||||
{
|
||||
receivedUpdateType = callInfo.Arg<UpdateTypes>();
|
||||
return mockBuilder;
|
||||
});
|
||||
mockBuilder.WithMinimumDate(Arg.Any<DateTime?>()).Returns(mockBuilder);
|
||||
mockBuilder.Build().Returns(testPipeline);
|
||||
|
||||
var mockFactory = Substitute.For<IEtlPipelineFactory>();
|
||||
mockFactory.ForTable(Arg.Any<string>()).Returns(mockBuilder);
|
||||
|
||||
var sut = new TableSyncOperation(
|
||||
mockFactory,
|
||||
_updateRepository,
|
||||
_options,
|
||||
NullLogger<TableSyncOperation>.Instance,
|
||||
_metrics);
|
||||
|
||||
// Act
|
||||
await sut.ExecuteAsync(task);
|
||||
|
||||
// Assert
|
||||
receivedUpdateType.ShouldBe(UpdateTypes.Hourly);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ExecuteAsync_WithUpdateTypesMass_CallsWithUpdateTypeWithMass()
|
||||
{
|
||||
// Arrange
|
||||
var task = CreateTask("TestTable", UpdateTypes.Mass);
|
||||
UpdateTypes? receivedUpdateType = null;
|
||||
|
||||
// Pre-create the test pipeline to avoid NSubstitute issues
|
||||
var testPipeline = CreateTestPipeline();
|
||||
|
||||
var mockBuilder = Substitute.For<IEtlPipelineBuilder>();
|
||||
mockBuilder.WithUpdateType(Arg.Any<UpdateTypes>())
|
||||
.Returns(callInfo =>
|
||||
{
|
||||
receivedUpdateType = callInfo.Arg<UpdateTypes>();
|
||||
return mockBuilder;
|
||||
});
|
||||
mockBuilder.WithMinimumDate(Arg.Any<DateTime?>()).Returns(mockBuilder);
|
||||
mockBuilder.Build().Returns(testPipeline);
|
||||
|
||||
var mockFactory = Substitute.For<IEtlPipelineFactory>();
|
||||
mockFactory.ForTable(Arg.Any<string>()).Returns(mockBuilder);
|
||||
|
||||
var sut = new TableSyncOperation(
|
||||
mockFactory,
|
||||
_updateRepository,
|
||||
_options,
|
||||
NullLogger<TableSyncOperation>.Instance,
|
||||
_metrics);
|
||||
|
||||
// Act
|
||||
await sut.ExecuteAsync(task);
|
||||
|
||||
// Assert
|
||||
receivedUpdateType.ShouldBe(UpdateTypes.Mass);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ExecuteAsync_DoesNotCallObsoleteWithModeMethod()
|
||||
{
|
||||
// Arrange
|
||||
var task = CreateTask("TestTable", UpdateTypes.Daily);
|
||||
var withModeCalled = false;
|
||||
|
||||
// Pre-create the test pipeline to avoid NSubstitute issues
|
||||
var testPipeline = CreateTestPipeline();
|
||||
|
||||
var mockBuilder = Substitute.For<IEtlPipelineBuilder>();
|
||||
mockBuilder.WithUpdateType(Arg.Any<UpdateTypes>()).Returns(mockBuilder);
|
||||
mockBuilder.WithMode(Arg.Any<SyncMode>())
|
||||
.Returns(callInfo =>
|
||||
{
|
||||
withModeCalled = true;
|
||||
return mockBuilder;
|
||||
});
|
||||
mockBuilder.WithMinimumDate(Arg.Any<DateTime?>()).Returns(mockBuilder);
|
||||
mockBuilder.Build().Returns(testPipeline);
|
||||
|
||||
var mockFactory = Substitute.For<IEtlPipelineFactory>();
|
||||
mockFactory.ForTable(Arg.Any<string>()).Returns(mockBuilder);
|
||||
|
||||
var sut = new TableSyncOperation(
|
||||
mockFactory,
|
||||
_updateRepository,
|
||||
_options,
|
||||
NullLogger<TableSyncOperation>.Instance,
|
||||
_metrics);
|
||||
|
||||
// Act
|
||||
await sut.ExecuteAsync(task);
|
||||
|
||||
// Assert - Verify the obsolete WithMode method was NOT called
|
||||
withModeCalled.ShouldBeFalse();
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region Pipeline Execution Tests
|
||||
|
||||
[Fact]
|
||||
public async Task ExecuteAsync_CallsForTableWithCorrectTableName()
|
||||
{
|
||||
// Arrange
|
||||
var task = CreateTask("WorkOrder", UpdateTypes.Daily);
|
||||
string? receivedTableName = null;
|
||||
|
||||
// Pre-create the test pipeline to avoid NSubstitute issues
|
||||
var testPipeline = CreateTestPipeline();
|
||||
|
||||
var mockBuilder = Substitute.For<IEtlPipelineBuilder>();
|
||||
mockBuilder.WithUpdateType(Arg.Any<UpdateTypes>()).Returns(mockBuilder);
|
||||
mockBuilder.WithMinimumDate(Arg.Any<DateTime?>()).Returns(mockBuilder);
|
||||
mockBuilder.Build().Returns(testPipeline);
|
||||
|
||||
var mockFactory = Substitute.For<IEtlPipelineFactory>();
|
||||
mockFactory.ForTable(Arg.Any<string>())
|
||||
.Returns(callInfo =>
|
||||
{
|
||||
receivedTableName = callInfo.Arg<string>();
|
||||
return mockBuilder;
|
||||
});
|
||||
|
||||
var sut = new TableSyncOperation(
|
||||
mockFactory,
|
||||
_updateRepository,
|
||||
_options,
|
||||
NullLogger<TableSyncOperation>.Instance,
|
||||
_metrics);
|
||||
|
||||
// Act
|
||||
await sut.ExecuteAsync(task);
|
||||
|
||||
// Assert
|
||||
receivedTableName.ShouldBe("WorkOrder");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ExecuteAsync_CallsWithMinimumDateWithTaskMinimumDt()
|
||||
{
|
||||
// Arrange
|
||||
var minDt = new DateTime(2024, 1, 15, 10, 30, 0, DateTimeKind.Utc);
|
||||
var task = CreateTask("TestTable", UpdateTypes.Daily, minDt);
|
||||
DateTime? receivedMinDt = null;
|
||||
|
||||
// Pre-create the test pipeline to avoid NSubstitute issues
|
||||
var testPipeline = CreateTestPipeline();
|
||||
|
||||
var mockBuilder = Substitute.For<IEtlPipelineBuilder>();
|
||||
mockBuilder.WithUpdateType(Arg.Any<UpdateTypes>()).Returns(mockBuilder);
|
||||
mockBuilder.WithMinimumDate(Arg.Any<DateTime?>())
|
||||
.Returns(callInfo =>
|
||||
{
|
||||
receivedMinDt = callInfo.Arg<DateTime?>();
|
||||
return mockBuilder;
|
||||
});
|
||||
mockBuilder.Build().Returns(testPipeline);
|
||||
|
||||
var mockFactory = Substitute.For<IEtlPipelineFactory>();
|
||||
mockFactory.ForTable(Arg.Any<string>()).Returns(mockBuilder);
|
||||
|
||||
var sut = new TableSyncOperation(
|
||||
mockFactory,
|
||||
_updateRepository,
|
||||
_options,
|
||||
NullLogger<TableSyncOperation>.Instance,
|
||||
_metrics);
|
||||
|
||||
// Act
|
||||
await sut.ExecuteAsync(task);
|
||||
|
||||
// Assert
|
||||
receivedMinDt.ShouldBe(minDt);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ExecuteAsync_SuccessfulPipeline_CompletesUpdateAsSuccess()
|
||||
{
|
||||
// Arrange
|
||||
var task = CreateTask("TestTable", UpdateTypes.Daily);
|
||||
|
||||
// Pre-create the test pipeline to avoid NSubstitute issues
|
||||
var testPipeline = CreateTestPipeline(totalRows: 100);
|
||||
|
||||
var mockBuilder = Substitute.For<IEtlPipelineBuilder>();
|
||||
mockBuilder.WithUpdateType(Arg.Any<UpdateTypes>()).Returns(mockBuilder);
|
||||
mockBuilder.WithMinimumDate(Arg.Any<DateTime?>()).Returns(mockBuilder);
|
||||
mockBuilder.Build().Returns(testPipeline);
|
||||
|
||||
var mockFactory = Substitute.For<IEtlPipelineFactory>();
|
||||
mockFactory.ForTable(Arg.Any<string>()).Returns(mockBuilder);
|
||||
|
||||
var sut = new TableSyncOperation(
|
||||
mockFactory,
|
||||
_updateRepository,
|
||||
_options,
|
||||
NullLogger<TableSyncOperation>.Instance,
|
||||
_metrics);
|
||||
|
||||
// Act
|
||||
await sut.ExecuteAsync(task);
|
||||
|
||||
// Assert
|
||||
await _updateRepository.Received(1).CompleteUpdateAsync(
|
||||
Arg.Any<int>(),
|
||||
Arg.Is<long>(rows => rows == 100),
|
||||
Arg.Is<bool>(success => success == true),
|
||||
Arg.Any<CancellationToken>());
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ExecuteAsync_FailedPipeline_ThrowsAndCompletesUpdateAsFailure()
|
||||
{
|
||||
// Arrange
|
||||
var task = CreateTask("TestTable", UpdateTypes.Daily);
|
||||
|
||||
// Pre-create the test pipeline to avoid NSubstitute issues
|
||||
var testPipeline = CreateTestPipeline(success: false);
|
||||
|
||||
var mockBuilder = Substitute.For<IEtlPipelineBuilder>();
|
||||
mockBuilder.WithUpdateType(Arg.Any<UpdateTypes>()).Returns(mockBuilder);
|
||||
mockBuilder.WithMinimumDate(Arg.Any<DateTime?>()).Returns(mockBuilder);
|
||||
mockBuilder.Build().Returns(testPipeline);
|
||||
|
||||
var mockFactory = Substitute.For<IEtlPipelineFactory>();
|
||||
mockFactory.ForTable(Arg.Any<string>()).Returns(mockBuilder);
|
||||
|
||||
var sut = new TableSyncOperation(
|
||||
mockFactory,
|
||||
_updateRepository,
|
||||
_options,
|
||||
NullLogger<TableSyncOperation>.Instance,
|
||||
_metrics);
|
||||
|
||||
// Act & Assert
|
||||
await Should.ThrowAsync<InvalidOperationException>(() => sut.ExecuteAsync(task));
|
||||
|
||||
await _updateRepository.Received(1).CompleteUpdateAsync(
|
||||
Arg.Any<int>(),
|
||||
Arg.Is<long>(rows => rows == -1),
|
||||
Arg.Is<bool>(success => success == false),
|
||||
Arg.Any<CancellationToken>());
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region Helper Methods
|
||||
|
||||
private static DataUpdateTask CreateTask(string tableName, UpdateTypes updateType, DateTime? minDt = null)
|
||||
{
|
||||
return new DataUpdateTask
|
||||
{
|
||||
TableName = tableName,
|
||||
SourceSystem = "JDE",
|
||||
SourceData = tableName.ToUpper(),
|
||||
UpdateType = updateType,
|
||||
MinimumDt = minDt,
|
||||
Config = new DataSourceConfig
|
||||
{
|
||||
TableName = tableName,
|
||||
SourceSystem = "JDE",
|
||||
SourceData = tableName.ToUpper(),
|
||||
IsEnabled = true,
|
||||
MassConfig = new ScheduleConfig { Enabled = true, IntervalMinutes = 10080 },
|
||||
DailyConfig = new ScheduleConfig { Enabled = true, IntervalMinutes = 1440 },
|
||||
HourlyConfig = new ScheduleConfig { Enabled = true, IntervalMinutes = 60 }
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates a real EtlPipeline with mocked source and destination for testing.
|
||||
/// </summary>
|
||||
private static EtlPipeline CreateTestPipeline(bool success = true, long totalRows = 0)
|
||||
{
|
||||
// Create mock source
|
||||
var mockSource = Substitute.For<IImportSource>();
|
||||
mockSource.SourceName.Returns("TestSource");
|
||||
var mockReader = Substitute.For<IDataReader>();
|
||||
mockReader.Read().Returns(false); // No rows
|
||||
mockSource.ReadDataAsync(Arg.Any<CancellationToken>()).Returns(mockReader);
|
||||
|
||||
// Create mock destination
|
||||
var mockDestination = Substitute.For<IImportDestination>();
|
||||
mockDestination.DestinationName.Returns("TestDestination");
|
||||
|
||||
if (success)
|
||||
{
|
||||
mockDestination.WriteAsync(Arg.Any<IDataReader>(), Arg.Any<CancellationToken>())
|
||||
.Returns(new DestinationResult(totalRows, 1, TimeSpan.FromMilliseconds(10)));
|
||||
}
|
||||
else
|
||||
{
|
||||
mockDestination.WriteAsync(Arg.Any<IDataReader>(), Arg.Any<CancellationToken>())
|
||||
.Returns<Task<DestinationResult>>(_ => throw new Exception("Pipeline failed"));
|
||||
}
|
||||
|
||||
// Build the pipeline using the real builder
|
||||
return new EtlPipelineBuilder()
|
||||
.WithName("TestPipeline")
|
||||
.WithSource(mockSource)
|
||||
.WithDestination(mockDestination)
|
||||
.Build();
|
||||
}
|
||||
|
||||
#endregion
|
||||
}
|
||||
Reference in New Issue
Block a user