using System.Data;
using System.Diagnostics.Metrics;
using JdeScoping.Core.Models.Enums;
using JdeScoping.DataSync.Configuration;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Etl.Contracts;
using JdeScoping.DataSync.Etl.Pipeline;
using JdeScoping.DataSync.Etl.Results;
using JdeScoping.DataSync.Models;
using JdeScoping.DataSync.Options;
using JdeScoping.DataSync.Services;
using JdeScoping.DataSync.Telemetry;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using NSubstitute;
using Shouldly;
namespace JdeScoping.DataSync.Tests.Services;
///
/// Unit tests for TableSyncOperation.
/// Tests that the operation correctly uses the ETL pipeline builder.
///
public class TableSyncOperationTests
{
private readonly IDataUpdateRepository _updateRepository;
private readonly IOptions _options;
private readonly DataSyncMetrics _metrics;
public TableSyncOperationTests()
{
_updateRepository = Substitute.For();
_updateRepository.StartUpdateAsync(
Arg.Any(),
Arg.Any(),
Arg.Any(),
Arg.Any(),
Arg.Any(),
Arg.Any())
.Returns(1);
_options = Microsoft.Extensions.Options.Options.Create(new DataSyncOptions());
var services = new ServiceCollection();
services.AddMetrics();
var provider = services.BuildServiceProvider();
var meterFactory = provider.GetRequiredService();
_metrics = new DataSyncMetrics(meterFactory);
}
#region Pipeline Builder Tests
[Fact]
public async Task ExecuteAsync_WithDailyUpdateType_CallsBuildWithDailyUpdateType()
{
// Arrange
var task = CreateTask("TestTable", UpdateTypes.Daily);
UpdateTypes? receivedUpdateType = null;
var testPipeline = CreateTestPipeline();
var mockBuilder = Substitute.For();
mockBuilder.Build(
Arg.Any(),
Arg.Do(ut => receivedUpdateType = ut),
Arg.Any())
.Returns(testPipeline);
var sut = new TableSyncOperation(
mockBuilder,
_updateRepository,
_options,
NullLogger.Instance,
_metrics);
// Act
await sut.ExecuteAsync(task);
// Assert
receivedUpdateType.ShouldBe(UpdateTypes.Daily);
}
[Fact]
public async Task ExecuteAsync_WithHourlyUpdateType_CallsBuildWithHourlyUpdateType()
{
// Arrange
var task = CreateTask("TestTable", UpdateTypes.Hourly);
UpdateTypes? receivedUpdateType = null;
var testPipeline = CreateTestPipeline();
var mockBuilder = Substitute.For();
mockBuilder.Build(
Arg.Any(),
Arg.Do(ut => receivedUpdateType = ut),
Arg.Any())
.Returns(testPipeline);
var sut = new TableSyncOperation(
mockBuilder,
_updateRepository,
_options,
NullLogger.Instance,
_metrics);
// Act
await sut.ExecuteAsync(task);
// Assert
receivedUpdateType.ShouldBe(UpdateTypes.Hourly);
}
[Fact]
public async Task ExecuteAsync_WithMassUpdateType_CallsBuildWithMassUpdateType()
{
// Arrange
var task = CreateTask("TestTable", UpdateTypes.Mass);
UpdateTypes? receivedUpdateType = null;
var testPipeline = CreateTestPipeline();
var mockBuilder = Substitute.For();
mockBuilder.Build(
Arg.Any(),
Arg.Do(ut => receivedUpdateType = ut),
Arg.Any())
.Returns(testPipeline);
var sut = new TableSyncOperation(
mockBuilder,
_updateRepository,
_options,
NullLogger.Instance,
_metrics);
// Act
await sut.ExecuteAsync(task);
// Assert
receivedUpdateType.ShouldBe(UpdateTypes.Mass);
}
[Fact]
public async Task ExecuteAsync_CallsBuildWithCorrectPipelineConfig()
{
// Arrange
var task = CreateTask("WorkOrder", UpdateTypes.Daily);
EtlPipelineConfig? receivedConfig = null;
var testPipeline = CreateTestPipeline();
var mockBuilder = Substitute.For();
mockBuilder.Build(
Arg.Do(c => receivedConfig = c),
Arg.Any(),
Arg.Any())
.Returns(testPipeline);
var sut = new TableSyncOperation(
mockBuilder,
_updateRepository,
_options,
NullLogger.Instance,
_metrics);
// Act
await sut.ExecuteAsync(task);
// Assert
receivedConfig.ShouldNotBeNull();
receivedConfig.Name.ShouldBe("WorkOrder");
}
[Fact]
public async Task ExecuteAsync_CallsBuildWithCorrectMinimumDate()
{
// Arrange
var minDt = new DateTime(2024, 1, 15, 10, 30, 0, DateTimeKind.Utc);
var task = CreateTask("TestTable", UpdateTypes.Daily, minDt);
DateTime? receivedMinDt = null;
var testPipeline = CreateTestPipeline();
var mockBuilder = Substitute.For();
mockBuilder.Build(
Arg.Any(),
Arg.Any(),
Arg.Do(dt => receivedMinDt = dt))
.Returns(testPipeline);
var sut = new TableSyncOperation(
mockBuilder,
_updateRepository,
_options,
NullLogger.Instance,
_metrics);
// Act
await sut.ExecuteAsync(task);
// Assert
receivedMinDt.ShouldBe(minDt);
}
[Fact]
public async Task ExecuteAsync_TaskWithNoPipeline_ThrowsInvalidOperationException()
{
// Arrange
var task = new DataUpdateTask
{
TableName = "TestTable",
SourceSystem = "JDE",
SourceData = "TESTTABLE",
UpdateType = UpdateTypes.Daily,
Pipeline = null // No pipeline!
};
var mockBuilder = Substitute.For();
var sut = new TableSyncOperation(
mockBuilder,
_updateRepository,
_options,
NullLogger.Instance,
_metrics);
// Act & Assert
var ex = await Should.ThrowAsync(() => sut.ExecuteAsync(task));
ex.Message.ShouldContain("No pipeline configuration");
ex.Message.ShouldContain("TestTable");
}
#endregion
#region Pipeline Execution Tests
[Fact]
public async Task ExecuteAsync_SuccessfulPipeline_CompletesUpdateAsSuccess()
{
// Arrange
var task = CreateTask("TestTable", UpdateTypes.Daily);
var testPipeline = CreateTestPipeline(totalRows: 100);
var mockBuilder = Substitute.For();
mockBuilder.Build(
Arg.Any(),
Arg.Any(),
Arg.Any())
.Returns(testPipeline);
var sut = new TableSyncOperation(
mockBuilder,
_updateRepository,
_options,
NullLogger.Instance,
_metrics);
// Act
await sut.ExecuteAsync(task);
// Assert
await _updateRepository.Received(1).CompleteUpdateAsync(
Arg.Any(),
Arg.Is(rows => rows == 100),
Arg.Is(success => success == true),
Arg.Any());
}
[Fact]
public async Task ExecuteAsync_PassesParametersJsonToStartUpdate()
{
// Arrange
var minDt = new DateTime(2024, 1, 15, 10, 30, 0, DateTimeKind.Utc);
var task = CreateTask("TestTable", UpdateTypes.Daily, minDt);
string? capturedParameters = null;
_updateRepository.StartUpdateAsync(
Arg.Any(),
Arg.Any(),
Arg.Any(),
Arg.Any(),
Arg.Do(p => capturedParameters = p),
Arg.Any())
.Returns(1);
var testPipeline = CreateTestPipeline();
var mockBuilder = Substitute.For();
mockBuilder.Build(
Arg.Any(),
Arg.Any(),
Arg.Any())
.Returns(testPipeline);
var sut = new TableSyncOperation(
mockBuilder,
_updateRepository,
_options,
NullLogger.Instance,
_metrics);
// Act
await sut.ExecuteAsync(task);
// Assert
capturedParameters.ShouldNotBeNull();
capturedParameters.ShouldContain("OperationId");
capturedParameters.ShouldContain("MinimumDt");
capturedParameters.ShouldContain("2024-01-15");
}
[Fact]
public async Task ExecuteAsync_FailedPipeline_ThrowsAndCompletesUpdateAsFailure()
{
// Arrange
var task = CreateTask("TestTable", UpdateTypes.Daily);
var testPipeline = CreateTestPipeline(success: false);
var mockBuilder = Substitute.For();
mockBuilder.Build(
Arg.Any(),
Arg.Any(),
Arg.Any())
.Returns(testPipeline);
var sut = new TableSyncOperation(
mockBuilder,
_updateRepository,
_options,
NullLogger.Instance,
_metrics);
// Act & Assert
await Should.ThrowAsync(() => sut.ExecuteAsync(task));
await _updateRepository.Received(1).CompleteUpdateAsync(
Arg.Any(),
Arg.Is(rows => rows == -1),
Arg.Is(success => success == false),
Arg.Any());
}
#endregion
#region Helper Methods
private static DataUpdateTask CreateTask(string tableName, UpdateTypes updateType, DateTime? minDt = null)
{
return new DataUpdateTask
{
TableName = tableName,
SourceSystem = "JDE",
SourceData = tableName.ToUpper(),
UpdateType = updateType,
MinimumDt = minDt,
Pipeline = new EtlPipelineConfig
{
Name = tableName,
IsEnabled = true,
MassSyncIntervalMinutes = 10080,
DailySyncIntervalMinutes = 1440,
HourlySyncIntervalMinutes = 60,
Source = new SourceElement { Connection = "JDE", Query = "SELECT 1" },
Destination = new DestinationElement { Table = tableName, MatchColumns = ["Id"] }
}
};
}
///
/// Creates a real EtlPipeline with mocked source and destination for testing.
///
private static EtlPipeline CreateTestPipeline(bool success = true, long totalRows = 0)
{
// Create mock source
var mockSource = Substitute.For();
mockSource.SourceName.Returns("TestSource");
var mockReader = Substitute.For();
mockReader.Read().Returns(false); // No rows
mockSource.ReadDataAsync(Arg.Any()).Returns(mockReader);
// Create mock destination
var mockDestination = Substitute.For();
mockDestination.DestinationName.Returns("TestDestination");
if (success)
{
mockDestination.WriteAsync(Arg.Any(), Arg.Any())
.Returns(new DestinationResult(totalRows, 1, TimeSpan.FromMilliseconds(10)));
}
else
{
mockDestination.WriteAsync(Arg.Any(), Arg.Any())
.Returns>(_ => throw new Exception("Pipeline failed"));
}
// Build the pipeline using the real builder
return new EtlPipelineBuilder()
.WithName("TestPipeline")
.WithSource(mockSource)
.WithDestination(mockDestination)
.Build();
}
#endregion
}