Files
Joseph Doherty ba54a87be5 refactor(configmanager): migrate to per-file pipeline system
Align ConfigManager with DataSync's per-file pipeline format (pipeline.*.json)
by reusing EtlPipelineConfig types directly, eliminating duplicate models and
simplifying the codebase. Removes ~3200 lines of obsolete code.
2026-01-23 02:30:48 -05:00

407 lines
13 KiB
C#

using System.Data;
using System.Diagnostics.Metrics;
using JdeScoping.Core.Models.Enums;
using JdeScoping.DataSync.Configuration;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Etl.Contracts;
using JdeScoping.DataSync.Etl.Pipeline;
using JdeScoping.DataSync.Etl.Results;
using JdeScoping.DataSync.Models;
using JdeScoping.DataSync.Options;
using JdeScoping.DataSync.Services;
using JdeScoping.DataSync.Telemetry;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using NSubstitute;
using Shouldly;
namespace JdeScoping.DataSync.Tests.Services;
/// <summary>
/// Unit tests for TableSyncOperation.
/// Tests that the operation correctly uses the ETL pipeline builder.
/// </summary>
public class TableSyncOperationTests
{
private readonly IDataUpdateRepository _updateRepository;
private readonly IOptions<DataSyncOptions> _options;
private readonly DataSyncMetrics _metrics;
public TableSyncOperationTests()
{
_updateRepository = Substitute.For<IDataUpdateRepository>();
_updateRepository.StartUpdateAsync(
Arg.Any<string>(),
Arg.Any<string>(),
Arg.Any<string>(),
Arg.Any<UpdateTypes>(),
Arg.Any<string?>(),
Arg.Any<CancellationToken>())
.Returns(1);
_options = Microsoft.Extensions.Options.Options.Create(new DataSyncOptions());
var services = new ServiceCollection();
services.AddMetrics();
var provider = services.BuildServiceProvider();
var meterFactory = provider.GetRequiredService<IMeterFactory>();
_metrics = new DataSyncMetrics(meterFactory);
}
#region Pipeline Builder Tests
[Fact]
public async Task ExecuteAsync_WithDailyUpdateType_CallsBuildWithDailyUpdateType()
{
// Arrange
var task = CreateTask("TestTable", UpdateTypes.Daily);
UpdateTypes? receivedUpdateType = null;
var testPipeline = CreateTestPipeline();
var mockBuilder = Substitute.For<IEtlPipelineBuilder>();
mockBuilder.Build(
Arg.Any<EtlPipelineConfig>(),
Arg.Do<UpdateTypes>(ut => receivedUpdateType = ut),
Arg.Any<DateTime?>())
.Returns(testPipeline);
var sut = new TableSyncOperation(
mockBuilder,
_updateRepository,
_options,
NullLogger<TableSyncOperation>.Instance,
_metrics);
// Act
await sut.ExecuteAsync(task);
// Assert
receivedUpdateType.ShouldBe(UpdateTypes.Daily);
}
[Fact]
public async Task ExecuteAsync_WithHourlyUpdateType_CallsBuildWithHourlyUpdateType()
{
// Arrange
var task = CreateTask("TestTable", UpdateTypes.Hourly);
UpdateTypes? receivedUpdateType = null;
var testPipeline = CreateTestPipeline();
var mockBuilder = Substitute.For<IEtlPipelineBuilder>();
mockBuilder.Build(
Arg.Any<EtlPipelineConfig>(),
Arg.Do<UpdateTypes>(ut => receivedUpdateType = ut),
Arg.Any<DateTime?>())
.Returns(testPipeline);
var sut = new TableSyncOperation(
mockBuilder,
_updateRepository,
_options,
NullLogger<TableSyncOperation>.Instance,
_metrics);
// Act
await sut.ExecuteAsync(task);
// Assert
receivedUpdateType.ShouldBe(UpdateTypes.Hourly);
}
[Fact]
public async Task ExecuteAsync_WithMassUpdateType_CallsBuildWithMassUpdateType()
{
// Arrange
var task = CreateTask("TestTable", UpdateTypes.Mass);
UpdateTypes? receivedUpdateType = null;
var testPipeline = CreateTestPipeline();
var mockBuilder = Substitute.For<IEtlPipelineBuilder>();
mockBuilder.Build(
Arg.Any<EtlPipelineConfig>(),
Arg.Do<UpdateTypes>(ut => receivedUpdateType = ut),
Arg.Any<DateTime?>())
.Returns(testPipeline);
var sut = new TableSyncOperation(
mockBuilder,
_updateRepository,
_options,
NullLogger<TableSyncOperation>.Instance,
_metrics);
// Act
await sut.ExecuteAsync(task);
// Assert
receivedUpdateType.ShouldBe(UpdateTypes.Mass);
}
[Fact]
public async Task ExecuteAsync_CallsBuildWithCorrectPipelineConfig()
{
// Arrange
var task = CreateTask("WorkOrder", UpdateTypes.Daily);
EtlPipelineConfig? receivedConfig = null;
var testPipeline = CreateTestPipeline();
var mockBuilder = Substitute.For<IEtlPipelineBuilder>();
mockBuilder.Build(
Arg.Do<EtlPipelineConfig>(c => receivedConfig = c),
Arg.Any<UpdateTypes>(),
Arg.Any<DateTime?>())
.Returns(testPipeline);
var sut = new TableSyncOperation(
mockBuilder,
_updateRepository,
_options,
NullLogger<TableSyncOperation>.Instance,
_metrics);
// Act
await sut.ExecuteAsync(task);
// Assert
receivedConfig.ShouldNotBeNull();
receivedConfig.Name.ShouldBe("WorkOrder");
}
[Fact]
public async Task ExecuteAsync_CallsBuildWithCorrectMinimumDate()
{
// Arrange
var minDt = new DateTime(2024, 1, 15, 10, 30, 0, DateTimeKind.Utc);
var task = CreateTask("TestTable", UpdateTypes.Daily, minDt);
DateTime? receivedMinDt = null;
var testPipeline = CreateTestPipeline();
var mockBuilder = Substitute.For<IEtlPipelineBuilder>();
mockBuilder.Build(
Arg.Any<EtlPipelineConfig>(),
Arg.Any<UpdateTypes>(),
Arg.Do<DateTime?>(dt => receivedMinDt = dt))
.Returns(testPipeline);
var sut = new TableSyncOperation(
mockBuilder,
_updateRepository,
_options,
NullLogger<TableSyncOperation>.Instance,
_metrics);
// Act
await sut.ExecuteAsync(task);
// Assert
receivedMinDt.ShouldBe(minDt);
}
[Fact]
public async Task ExecuteAsync_TaskWithNoPipeline_ThrowsInvalidOperationException()
{
// Arrange
var task = new DataUpdateTask
{
TableName = "TestTable",
SourceSystem = "JDE",
SourceData = "TESTTABLE",
UpdateType = UpdateTypes.Daily,
Pipeline = null // No pipeline!
};
var mockBuilder = Substitute.For<IEtlPipelineBuilder>();
var sut = new TableSyncOperation(
mockBuilder,
_updateRepository,
_options,
NullLogger<TableSyncOperation>.Instance,
_metrics);
// Act & Assert
var ex = await Should.ThrowAsync<InvalidOperationException>(() => sut.ExecuteAsync(task));
ex.Message.ShouldContain("No pipeline configuration");
ex.Message.ShouldContain("TestTable");
}
#endregion
#region Pipeline Execution Tests
[Fact]
public async Task ExecuteAsync_SuccessfulPipeline_CompletesUpdateAsSuccess()
{
// Arrange
var task = CreateTask("TestTable", UpdateTypes.Daily);
var testPipeline = CreateTestPipeline(totalRows: 100);
var mockBuilder = Substitute.For<IEtlPipelineBuilder>();
mockBuilder.Build(
Arg.Any<EtlPipelineConfig>(),
Arg.Any<UpdateTypes>(),
Arg.Any<DateTime?>())
.Returns(testPipeline);
var sut = new TableSyncOperation(
mockBuilder,
_updateRepository,
_options,
NullLogger<TableSyncOperation>.Instance,
_metrics);
// Act
await sut.ExecuteAsync(task);
// Assert
await _updateRepository.Received(1).CompleteUpdateAsync(
Arg.Any<int>(),
Arg.Is<long>(rows => rows == 100),
Arg.Is<bool>(success => success == true),
Arg.Any<CancellationToken>());
}
[Fact]
public async Task ExecuteAsync_PassesParametersJsonToStartUpdate()
{
// Arrange
var minDt = new DateTime(2024, 1, 15, 10, 30, 0, DateTimeKind.Utc);
var task = CreateTask("TestTable", UpdateTypes.Daily, minDt);
string? capturedParameters = null;
_updateRepository.StartUpdateAsync(
Arg.Any<string>(),
Arg.Any<string>(),
Arg.Any<string>(),
Arg.Any<UpdateTypes>(),
Arg.Do<string?>(p => capturedParameters = p),
Arg.Any<CancellationToken>())
.Returns(1);
var testPipeline = CreateTestPipeline();
var mockBuilder = Substitute.For<IEtlPipelineBuilder>();
mockBuilder.Build(
Arg.Any<EtlPipelineConfig>(),
Arg.Any<UpdateTypes>(),
Arg.Any<DateTime?>())
.Returns(testPipeline);
var sut = new TableSyncOperation(
mockBuilder,
_updateRepository,
_options,
NullLogger<TableSyncOperation>.Instance,
_metrics);
// Act
await sut.ExecuteAsync(task);
// Assert
capturedParameters.ShouldNotBeNull();
capturedParameters.ShouldContain("OperationId");
capturedParameters.ShouldContain("MinimumDt");
capturedParameters.ShouldContain("2024-01-15");
}
[Fact]
public async Task ExecuteAsync_FailedPipeline_ThrowsAndCompletesUpdateAsFailure()
{
// Arrange
var task = CreateTask("TestTable", UpdateTypes.Daily);
var testPipeline = CreateTestPipeline(success: false);
var mockBuilder = Substitute.For<IEtlPipelineBuilder>();
mockBuilder.Build(
Arg.Any<EtlPipelineConfig>(),
Arg.Any<UpdateTypes>(),
Arg.Any<DateTime?>())
.Returns(testPipeline);
var sut = new TableSyncOperation(
mockBuilder,
_updateRepository,
_options,
NullLogger<TableSyncOperation>.Instance,
_metrics);
// Act & Assert
await Should.ThrowAsync<InvalidOperationException>(() => sut.ExecuteAsync(task));
await _updateRepository.Received(1).CompleteUpdateAsync(
Arg.Any<int>(),
Arg.Is<long>(rows => rows == -1),
Arg.Is<bool>(success => success == false),
Arg.Any<CancellationToken>());
}
#endregion
#region Helper Methods
private static DataUpdateTask CreateTask(string tableName, UpdateTypes updateType, DateTime? minDt = null)
{
return new DataUpdateTask
{
TableName = tableName,
SourceSystem = "JDE",
SourceData = tableName.ToUpper(),
UpdateType = updateType,
MinimumDt = minDt,
Pipeline = new EtlPipelineConfig
{
Name = tableName,
IsEnabled = true,
MassSyncIntervalMinutes = 10080,
DailySyncIntervalMinutes = 1440,
HourlySyncIntervalMinutes = 60,
Source = new SourceElement { Connection = "JDE", Query = "SELECT 1" },
Destination = new DestinationElement { Table = tableName, MatchColumns = ["Id"] }
}
};
}
/// <summary>
/// Creates a real EtlPipeline with mocked source and destination for testing.
/// </summary>
private static EtlPipeline CreateTestPipeline(bool success = true, long totalRows = 0)
{
// Create mock source
var mockSource = Substitute.For<IImportSource>();
mockSource.SourceName.Returns("TestSource");
var mockReader = Substitute.For<IDataReader>();
mockReader.Read().Returns(false); // No rows
mockSource.ReadDataAsync(Arg.Any<CancellationToken>()).Returns(mockReader);
// Create mock destination
var mockDestination = Substitute.For<IImportDestination>();
mockDestination.DestinationName.Returns("TestDestination");
if (success)
{
mockDestination.WriteAsync(Arg.Any<IDataReader>(), Arg.Any<CancellationToken>())
.Returns(new DestinationResult(totalRows, 1, TimeSpan.FromMilliseconds(10)));
}
else
{
mockDestination.WriteAsync(Arg.Any<IDataReader>(), Arg.Any<CancellationToken>())
.Returns<Task<DestinationResult>>(_ => throw new Exception("Pipeline failed"));
}
// Build the pipeline using the real builder
return new EtlPipelineBuilder()
.WithName("TestPipeline")
.WithSource(mockSource)
.WithDestination(mockDestination)
.Build();
}
#endregion
}