ba54a87be5
Align ConfigManager with DataSync's per-file pipeline format (pipeline.*.json) by reusing EtlPipelineConfig types directly, eliminating duplicate models and simplifying the codebase. Removes ~3200 lines of obsolete code.
407 lines
13 KiB
C#
407 lines
13 KiB
C#
using System.Data;
|
|
using System.Diagnostics.Metrics;
|
|
using JdeScoping.Core.Models.Enums;
|
|
using JdeScoping.DataSync.Configuration;
|
|
using JdeScoping.DataSync.Contracts;
|
|
using JdeScoping.DataSync.Etl.Contracts;
|
|
using JdeScoping.DataSync.Etl.Pipeline;
|
|
using JdeScoping.DataSync.Etl.Results;
|
|
using JdeScoping.DataSync.Models;
|
|
using JdeScoping.DataSync.Options;
|
|
using JdeScoping.DataSync.Services;
|
|
using JdeScoping.DataSync.Telemetry;
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using Microsoft.Extensions.Options;
|
|
using NSubstitute;
|
|
using Shouldly;
|
|
|
|
namespace JdeScoping.DataSync.Tests.Services;
|
|
|
|
/// <summary>
|
|
/// Unit tests for TableSyncOperation.
|
|
/// Tests that the operation correctly uses the ETL pipeline builder.
|
|
/// </summary>
|
|
public class TableSyncOperationTests
|
|
{
|
|
private readonly IDataUpdateRepository _updateRepository;
|
|
private readonly IOptions<DataSyncOptions> _options;
|
|
private readonly DataSyncMetrics _metrics;
|
|
|
|
public TableSyncOperationTests()
|
|
{
|
|
_updateRepository = Substitute.For<IDataUpdateRepository>();
|
|
_updateRepository.StartUpdateAsync(
|
|
Arg.Any<string>(),
|
|
Arg.Any<string>(),
|
|
Arg.Any<string>(),
|
|
Arg.Any<UpdateTypes>(),
|
|
Arg.Any<string?>(),
|
|
Arg.Any<CancellationToken>())
|
|
.Returns(1);
|
|
|
|
_options = Microsoft.Extensions.Options.Options.Create(new DataSyncOptions());
|
|
|
|
var services = new ServiceCollection();
|
|
services.AddMetrics();
|
|
var provider = services.BuildServiceProvider();
|
|
var meterFactory = provider.GetRequiredService<IMeterFactory>();
|
|
_metrics = new DataSyncMetrics(meterFactory);
|
|
}
|
|
|
|
#region Pipeline Builder Tests
|
|
|
|
[Fact]
|
|
public async Task ExecuteAsync_WithDailyUpdateType_CallsBuildWithDailyUpdateType()
|
|
{
|
|
// Arrange
|
|
var task = CreateTask("TestTable", UpdateTypes.Daily);
|
|
UpdateTypes? receivedUpdateType = null;
|
|
|
|
var testPipeline = CreateTestPipeline();
|
|
|
|
var mockBuilder = Substitute.For<IEtlPipelineBuilder>();
|
|
mockBuilder.Build(
|
|
Arg.Any<EtlPipelineConfig>(),
|
|
Arg.Do<UpdateTypes>(ut => receivedUpdateType = ut),
|
|
Arg.Any<DateTime?>())
|
|
.Returns(testPipeline);
|
|
|
|
var sut = new TableSyncOperation(
|
|
mockBuilder,
|
|
_updateRepository,
|
|
_options,
|
|
NullLogger<TableSyncOperation>.Instance,
|
|
_metrics);
|
|
|
|
// Act
|
|
await sut.ExecuteAsync(task);
|
|
|
|
// Assert
|
|
receivedUpdateType.ShouldBe(UpdateTypes.Daily);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task ExecuteAsync_WithHourlyUpdateType_CallsBuildWithHourlyUpdateType()
|
|
{
|
|
// Arrange
|
|
var task = CreateTask("TestTable", UpdateTypes.Hourly);
|
|
UpdateTypes? receivedUpdateType = null;
|
|
|
|
var testPipeline = CreateTestPipeline();
|
|
|
|
var mockBuilder = Substitute.For<IEtlPipelineBuilder>();
|
|
mockBuilder.Build(
|
|
Arg.Any<EtlPipelineConfig>(),
|
|
Arg.Do<UpdateTypes>(ut => receivedUpdateType = ut),
|
|
Arg.Any<DateTime?>())
|
|
.Returns(testPipeline);
|
|
|
|
var sut = new TableSyncOperation(
|
|
mockBuilder,
|
|
_updateRepository,
|
|
_options,
|
|
NullLogger<TableSyncOperation>.Instance,
|
|
_metrics);
|
|
|
|
// Act
|
|
await sut.ExecuteAsync(task);
|
|
|
|
// Assert
|
|
receivedUpdateType.ShouldBe(UpdateTypes.Hourly);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task ExecuteAsync_WithMassUpdateType_CallsBuildWithMassUpdateType()
|
|
{
|
|
// Arrange
|
|
var task = CreateTask("TestTable", UpdateTypes.Mass);
|
|
UpdateTypes? receivedUpdateType = null;
|
|
|
|
var testPipeline = CreateTestPipeline();
|
|
|
|
var mockBuilder = Substitute.For<IEtlPipelineBuilder>();
|
|
mockBuilder.Build(
|
|
Arg.Any<EtlPipelineConfig>(),
|
|
Arg.Do<UpdateTypes>(ut => receivedUpdateType = ut),
|
|
Arg.Any<DateTime?>())
|
|
.Returns(testPipeline);
|
|
|
|
var sut = new TableSyncOperation(
|
|
mockBuilder,
|
|
_updateRepository,
|
|
_options,
|
|
NullLogger<TableSyncOperation>.Instance,
|
|
_metrics);
|
|
|
|
// Act
|
|
await sut.ExecuteAsync(task);
|
|
|
|
// Assert
|
|
receivedUpdateType.ShouldBe(UpdateTypes.Mass);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task ExecuteAsync_CallsBuildWithCorrectPipelineConfig()
|
|
{
|
|
// Arrange
|
|
var task = CreateTask("WorkOrder", UpdateTypes.Daily);
|
|
EtlPipelineConfig? receivedConfig = null;
|
|
|
|
var testPipeline = CreateTestPipeline();
|
|
|
|
var mockBuilder = Substitute.For<IEtlPipelineBuilder>();
|
|
mockBuilder.Build(
|
|
Arg.Do<EtlPipelineConfig>(c => receivedConfig = c),
|
|
Arg.Any<UpdateTypes>(),
|
|
Arg.Any<DateTime?>())
|
|
.Returns(testPipeline);
|
|
|
|
var sut = new TableSyncOperation(
|
|
mockBuilder,
|
|
_updateRepository,
|
|
_options,
|
|
NullLogger<TableSyncOperation>.Instance,
|
|
_metrics);
|
|
|
|
// Act
|
|
await sut.ExecuteAsync(task);
|
|
|
|
// Assert
|
|
receivedConfig.ShouldNotBeNull();
|
|
receivedConfig.Name.ShouldBe("WorkOrder");
|
|
}
|
|
|
|
[Fact]
|
|
public async Task ExecuteAsync_CallsBuildWithCorrectMinimumDate()
|
|
{
|
|
// Arrange
|
|
var minDt = new DateTime(2024, 1, 15, 10, 30, 0, DateTimeKind.Utc);
|
|
var task = CreateTask("TestTable", UpdateTypes.Daily, minDt);
|
|
DateTime? receivedMinDt = null;
|
|
|
|
var testPipeline = CreateTestPipeline();
|
|
|
|
var mockBuilder = Substitute.For<IEtlPipelineBuilder>();
|
|
mockBuilder.Build(
|
|
Arg.Any<EtlPipelineConfig>(),
|
|
Arg.Any<UpdateTypes>(),
|
|
Arg.Do<DateTime?>(dt => receivedMinDt = dt))
|
|
.Returns(testPipeline);
|
|
|
|
var sut = new TableSyncOperation(
|
|
mockBuilder,
|
|
_updateRepository,
|
|
_options,
|
|
NullLogger<TableSyncOperation>.Instance,
|
|
_metrics);
|
|
|
|
// Act
|
|
await sut.ExecuteAsync(task);
|
|
|
|
// Assert
|
|
receivedMinDt.ShouldBe(minDt);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task ExecuteAsync_TaskWithNoPipeline_ThrowsInvalidOperationException()
|
|
{
|
|
// Arrange
|
|
var task = new DataUpdateTask
|
|
{
|
|
TableName = "TestTable",
|
|
SourceSystem = "JDE",
|
|
SourceData = "TESTTABLE",
|
|
UpdateType = UpdateTypes.Daily,
|
|
Pipeline = null // No pipeline!
|
|
};
|
|
|
|
var mockBuilder = Substitute.For<IEtlPipelineBuilder>();
|
|
|
|
var sut = new TableSyncOperation(
|
|
mockBuilder,
|
|
_updateRepository,
|
|
_options,
|
|
NullLogger<TableSyncOperation>.Instance,
|
|
_metrics);
|
|
|
|
// Act & Assert
|
|
var ex = await Should.ThrowAsync<InvalidOperationException>(() => sut.ExecuteAsync(task));
|
|
ex.Message.ShouldContain("No pipeline configuration");
|
|
ex.Message.ShouldContain("TestTable");
|
|
}
|
|
|
|
#endregion
|
|
|
|
#region Pipeline Execution Tests
|
|
|
|
[Fact]
|
|
public async Task ExecuteAsync_SuccessfulPipeline_CompletesUpdateAsSuccess()
|
|
{
|
|
// Arrange
|
|
var task = CreateTask("TestTable", UpdateTypes.Daily);
|
|
var testPipeline = CreateTestPipeline(totalRows: 100);
|
|
|
|
var mockBuilder = Substitute.For<IEtlPipelineBuilder>();
|
|
mockBuilder.Build(
|
|
Arg.Any<EtlPipelineConfig>(),
|
|
Arg.Any<UpdateTypes>(),
|
|
Arg.Any<DateTime?>())
|
|
.Returns(testPipeline);
|
|
|
|
var sut = new TableSyncOperation(
|
|
mockBuilder,
|
|
_updateRepository,
|
|
_options,
|
|
NullLogger<TableSyncOperation>.Instance,
|
|
_metrics);
|
|
|
|
// Act
|
|
await sut.ExecuteAsync(task);
|
|
|
|
// Assert
|
|
await _updateRepository.Received(1).CompleteUpdateAsync(
|
|
Arg.Any<int>(),
|
|
Arg.Is<long>(rows => rows == 100),
|
|
Arg.Is<bool>(success => success == true),
|
|
Arg.Any<CancellationToken>());
|
|
}
|
|
|
|
[Fact]
|
|
public async Task ExecuteAsync_PassesParametersJsonToStartUpdate()
|
|
{
|
|
// Arrange
|
|
var minDt = new DateTime(2024, 1, 15, 10, 30, 0, DateTimeKind.Utc);
|
|
var task = CreateTask("TestTable", UpdateTypes.Daily, minDt);
|
|
string? capturedParameters = null;
|
|
|
|
_updateRepository.StartUpdateAsync(
|
|
Arg.Any<string>(),
|
|
Arg.Any<string>(),
|
|
Arg.Any<string>(),
|
|
Arg.Any<UpdateTypes>(),
|
|
Arg.Do<string?>(p => capturedParameters = p),
|
|
Arg.Any<CancellationToken>())
|
|
.Returns(1);
|
|
|
|
var testPipeline = CreateTestPipeline();
|
|
|
|
var mockBuilder = Substitute.For<IEtlPipelineBuilder>();
|
|
mockBuilder.Build(
|
|
Arg.Any<EtlPipelineConfig>(),
|
|
Arg.Any<UpdateTypes>(),
|
|
Arg.Any<DateTime?>())
|
|
.Returns(testPipeline);
|
|
|
|
var sut = new TableSyncOperation(
|
|
mockBuilder,
|
|
_updateRepository,
|
|
_options,
|
|
NullLogger<TableSyncOperation>.Instance,
|
|
_metrics);
|
|
|
|
// Act
|
|
await sut.ExecuteAsync(task);
|
|
|
|
// Assert
|
|
capturedParameters.ShouldNotBeNull();
|
|
capturedParameters.ShouldContain("OperationId");
|
|
capturedParameters.ShouldContain("MinimumDt");
|
|
capturedParameters.ShouldContain("2024-01-15");
|
|
}
|
|
|
|
[Fact]
|
|
public async Task ExecuteAsync_FailedPipeline_ThrowsAndCompletesUpdateAsFailure()
|
|
{
|
|
// Arrange
|
|
var task = CreateTask("TestTable", UpdateTypes.Daily);
|
|
var testPipeline = CreateTestPipeline(success: false);
|
|
|
|
var mockBuilder = Substitute.For<IEtlPipelineBuilder>();
|
|
mockBuilder.Build(
|
|
Arg.Any<EtlPipelineConfig>(),
|
|
Arg.Any<UpdateTypes>(),
|
|
Arg.Any<DateTime?>())
|
|
.Returns(testPipeline);
|
|
|
|
var sut = new TableSyncOperation(
|
|
mockBuilder,
|
|
_updateRepository,
|
|
_options,
|
|
NullLogger<TableSyncOperation>.Instance,
|
|
_metrics);
|
|
|
|
// Act & Assert
|
|
await Should.ThrowAsync<InvalidOperationException>(() => sut.ExecuteAsync(task));
|
|
|
|
await _updateRepository.Received(1).CompleteUpdateAsync(
|
|
Arg.Any<int>(),
|
|
Arg.Is<long>(rows => rows == -1),
|
|
Arg.Is<bool>(success => success == false),
|
|
Arg.Any<CancellationToken>());
|
|
}
|
|
|
|
#endregion
|
|
|
|
#region Helper Methods
|
|
|
|
private static DataUpdateTask CreateTask(string tableName, UpdateTypes updateType, DateTime? minDt = null)
|
|
{
|
|
return new DataUpdateTask
|
|
{
|
|
TableName = tableName,
|
|
SourceSystem = "JDE",
|
|
SourceData = tableName.ToUpper(),
|
|
UpdateType = updateType,
|
|
MinimumDt = minDt,
|
|
Pipeline = new EtlPipelineConfig
|
|
{
|
|
Name = tableName,
|
|
IsEnabled = true,
|
|
MassSyncIntervalMinutes = 10080,
|
|
DailySyncIntervalMinutes = 1440,
|
|
HourlySyncIntervalMinutes = 60,
|
|
Source = new SourceElement { Connection = "JDE", Query = "SELECT 1" },
|
|
Destination = new DestinationElement { Table = tableName, MatchColumns = ["Id"] }
|
|
}
|
|
};
|
|
}
|
|
|
|
/// <summary>
|
|
/// Creates a real EtlPipeline with mocked source and destination for testing.
|
|
/// </summary>
|
|
private static EtlPipeline CreateTestPipeline(bool success = true, long totalRows = 0)
|
|
{
|
|
// Create mock source
|
|
var mockSource = Substitute.For<IImportSource>();
|
|
mockSource.SourceName.Returns("TestSource");
|
|
var mockReader = Substitute.For<IDataReader>();
|
|
mockReader.Read().Returns(false); // No rows
|
|
mockSource.ReadDataAsync(Arg.Any<CancellationToken>()).Returns(mockReader);
|
|
|
|
// Create mock destination
|
|
var mockDestination = Substitute.For<IImportDestination>();
|
|
mockDestination.DestinationName.Returns("TestDestination");
|
|
|
|
if (success)
|
|
{
|
|
mockDestination.WriteAsync(Arg.Any<IDataReader>(), Arg.Any<CancellationToken>())
|
|
.Returns(new DestinationResult(totalRows, 1, TimeSpan.FromMilliseconds(10)));
|
|
}
|
|
else
|
|
{
|
|
mockDestination.WriteAsync(Arg.Any<IDataReader>(), Arg.Any<CancellationToken>())
|
|
.Returns<Task<DestinationResult>>(_ => throw new Exception("Pipeline failed"));
|
|
}
|
|
|
|
// Build the pipeline using the real builder
|
|
return new EtlPipelineBuilder()
|
|
.WithName("TestPipeline")
|
|
.WithSource(mockSource)
|
|
.WithDestination(mockDestination)
|
|
.Build();
|
|
}
|
|
|
|
#endregion
|
|
}
|