diff --git a/NEW/src/JdeScoping.DataSync/Etl/Pipeline/EtlPipeline.cs b/NEW/src/JdeScoping.DataSync/Etl/Pipeline/EtlPipeline.cs new file mode 100644 index 0000000..f9edfee --- /dev/null +++ b/NEW/src/JdeScoping.DataSync/Etl/Pipeline/EtlPipeline.cs @@ -0,0 +1,111 @@ +using System.Data; +using System.Diagnostics; +using JdeScoping.DataSync.Etl.Contracts; +using JdeScoping.DataSync.Etl.Results; +using Microsoft.Extensions.Logging; + +namespace JdeScoping.DataSync.Etl.Pipeline; + +public class EtlPipeline +{ + private readonly IImportSource _source; + private readonly IReadOnlyList _transformers; + private readonly IImportDestination _destination; + private readonly IReadOnlyList _preScripts; + private readonly IReadOnlyList _postScripts; + private readonly ILogger _logger; + + public string PipelineName { get; } + + internal EtlPipeline( + string name, + IImportSource source, + IReadOnlyList transformers, + IImportDestination destination, + IReadOnlyList preScripts, + IReadOnlyList postScripts, + ILogger logger) + { + PipelineName = name; + _source = source; + _transformers = transformers; + _destination = destination; + _preScripts = preScripts; + _postScripts = postScripts; + _logger = logger; + } + + public async Task ExecuteAsync(CancellationToken cancellationToken = default) + { + var steps = new List(); + var totalStopwatch = Stopwatch.StartNew(); + + _logger.LogInformation("Starting pipeline {PipelineName}", PipelineName); + + try + { + // 1. Run pre-scripts + foreach (var script in _preScripts) + { + var stepResult = await RunScriptAsync(script, cancellationToken); + steps.Add(stepResult); + } + + // 2. Open source + var sourceStopwatch = Stopwatch.StartNew(); + await using (_source) + { + var reader = await _source.ReadDataAsync(cancellationToken); + sourceStopwatch.Stop(); + steps.Add(new StepResult(_source.SourceName, "Source", 0, sourceStopwatch.Elapsed)); + + // 3. Apply transformers + foreach (var transformer in _transformers) + { + var transformStopwatch = Stopwatch.StartNew(); + reader = transformer.Transform(reader); + transformStopwatch.Stop(); + steps.Add(new StepResult(transformer.TransformerName, "Transform", 0, transformStopwatch.Elapsed)); + } + + // 4. Write to destination + var destResult = await _destination.WriteAsync(reader, cancellationToken); + steps.Add(new StepResult(_destination.DestinationName, "Destination", destResult.RowsProcessed, destResult.Elapsed)); + } + + // 5. Run post-scripts + foreach (var script in _postScripts) + { + var stepResult = await RunScriptAsync(script, cancellationToken); + steps.Add(stepResult); + } + + totalStopwatch.Stop(); + var totalRows = steps.Sum(s => s.RowsAffected); + + _logger.LogInformation("Pipeline {PipelineName} completed. Rows={Rows}, Elapsed={Elapsed}ms", + PipelineName, totalRows, totalStopwatch.ElapsedMilliseconds); + + return PipelineResult.Succeeded(totalRows, totalStopwatch.Elapsed, steps); + } + catch (Exception ex) when (ex is not OperationCanceledException) + { + totalStopwatch.Stop(); + var totalRows = steps.Sum(s => s.RowsAffected); + + _logger.LogError(ex, "Pipeline {PipelineName} failed at step {Step}", + PipelineName, steps.LastOrDefault()?.StepName ?? "Unknown"); + + return PipelineResult.Failed(totalRows, totalStopwatch.Elapsed, steps, ex); + } + } + + private async Task RunScriptAsync(IScriptRunner script, CancellationToken ct) + { + var stopwatch = Stopwatch.StartNew(); + _logger.LogDebug("Running script {ScriptName}", script.ScriptName); + await script.ExecuteAsync(ct); + stopwatch.Stop(); + return new StepResult(script.ScriptName, "Script", 0, stopwatch.Elapsed); + } +} diff --git a/NEW/src/JdeScoping.DataSync/Etl/Pipeline/EtlPipelineBuilder.cs b/NEW/src/JdeScoping.DataSync/Etl/Pipeline/EtlPipelineBuilder.cs new file mode 100644 index 0000000..41c94b5 --- /dev/null +++ b/NEW/src/JdeScoping.DataSync/Etl/Pipeline/EtlPipelineBuilder.cs @@ -0,0 +1,72 @@ +using JdeScoping.DataSync.Etl.Contracts; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; + +namespace JdeScoping.DataSync.Etl.Pipeline; + +public class EtlPipelineBuilder +{ + private string _name = "Unnamed"; + private IImportSource? _source; + private readonly List _transformers = new(); + private IImportDestination? _destination; + private readonly List _preScripts = new(); + private readonly List _postScripts = new(); + private ILogger? _logger; + + public EtlPipelineBuilder WithName(string name) + { + _name = name ?? throw new ArgumentNullException(nameof(name)); + return this; + } + + public EtlPipelineBuilder WithSource(IImportSource source) + { + _source = source ?? throw new ArgumentNullException(nameof(source)); + return this; + } + + public EtlPipelineBuilder WithTransformer(IDataTransformer transformer) + { + ArgumentNullException.ThrowIfNull(transformer); + _transformers.Add(transformer); + return this; + } + + public EtlPipelineBuilder WithDestination(IImportDestination destination) + { + _destination = destination ?? throw new ArgumentNullException(nameof(destination)); + return this; + } + + public EtlPipelineBuilder WithPreScript(IScriptRunner script) + { + ArgumentNullException.ThrowIfNull(script); + _preScripts.Add(script); + return this; + } + + public EtlPipelineBuilder WithPostScript(IScriptRunner script) + { + ArgumentNullException.ThrowIfNull(script); + _postScripts.Add(script); + return this; + } + + public EtlPipelineBuilder WithLogger(ILogger logger) + { + _logger = logger; + return this; + } + + public EtlPipeline Build() + { + if (_source == null) + throw new InvalidOperationException("Source is required. Call WithSource() before Build()."); + if (_destination == null) + throw new InvalidOperationException("Destination is required. Call WithDestination() before Build()."); + + return new EtlPipeline(_name, _source, _transformers, _destination, _preScripts, _postScripts, + _logger ?? NullLogger.Instance); + } +} diff --git a/NEW/tests/JdeScoping.DataSync.Tests/Etl/Pipeline/EtlPipelineTests.cs b/NEW/tests/JdeScoping.DataSync.Tests/Etl/Pipeline/EtlPipelineTests.cs new file mode 100644 index 0000000..2666dc7 --- /dev/null +++ b/NEW/tests/JdeScoping.DataSync.Tests/Etl/Pipeline/EtlPipelineTests.cs @@ -0,0 +1,145 @@ +using System.Data; +using JdeScoping.DataSync.Etl.Contracts; +using JdeScoping.DataSync.Etl.Pipeline; +using JdeScoping.DataSync.Etl.Results; +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; +using NSubstitute.ExceptionExtensions; + +namespace JdeScoping.DataSync.Tests.Etl.Pipeline; + +public class EtlPipelineTests +{ + [Fact] + public async Task ExecuteAsync_SuccessfulPipeline_ReturnsSuccessResult() + { + var source = CreateMockSource(); + var destination = CreateMockDestination(100); + + var pipeline = new EtlPipelineBuilder() + .WithName("TestPipeline") + .WithSource(source) + .WithDestination(destination) + .WithLogger(NullLogger.Instance) + .Build(); + + var result = await pipeline.ExecuteAsync(); + + Assert.True(result.Success); + Assert.Equal(100, result.TotalRows); + Assert.Null(result.Error); + } + + [Fact] + public async Task ExecuteAsync_WithPreScript_RunsScriptBeforeDestination() + { + var callOrder = new List(); + var source = CreateMockSource(); + var destination = CreateMockDestination(100); + destination.When(d => d.WriteAsync(Arg.Any(), Arg.Any())) + .Do(_ => callOrder.Add("destination")); + + var preScript = Substitute.For(); + preScript.ScriptName.Returns("PreScript"); + preScript.When(s => s.ExecuteAsync(Arg.Any())) + .Do(_ => callOrder.Add("prescript")); + + var pipeline = new EtlPipelineBuilder() + .WithName("TestPipeline") + .WithSource(source) + .WithDestination(destination) + .WithPreScript(preScript) + .WithLogger(NullLogger.Instance) + .Build(); + + await pipeline.ExecuteAsync(); + + Assert.Equal(new[] { "prescript", "destination" }, callOrder); + } + + [Fact] + public async Task ExecuteAsync_DestinationFails_ReturnsFailedResult() + { + var source = CreateMockSource(); + var destination = Substitute.For(); + destination.DestinationName.Returns("FailingDest"); + destination.WriteAsync(Arg.Any(), Arg.Any()) + .ThrowsAsync(new InvalidOperationException("Destination failed")); + + var pipeline = new EtlPipelineBuilder() + .WithName("TestPipeline") + .WithSource(source) + .WithDestination(destination) + .WithLogger(NullLogger.Instance) + .Build(); + + var result = await pipeline.ExecuteAsync(); + + Assert.False(result.Success); + Assert.NotNull(result.Error); + Assert.IsType(result.Error); + } + + [Fact] + public async Task ExecuteAsync_TracksStepResults() + { + var source = CreateMockSource(); + var destination = CreateMockDestination(100); + + var pipeline = new EtlPipelineBuilder() + .WithName("TestPipeline") + .WithSource(source) + .WithDestination(destination) + .WithLogger(NullLogger.Instance) + .Build(); + + var result = await pipeline.ExecuteAsync(); + + Assert.Equal(2, result.Steps.Count); + Assert.Equal("Source", result.Steps[0].StepType); + Assert.Equal("Destination", result.Steps[1].StepType); + } + + [Fact] + public void Build_WithoutSource_ThrowsInvalidOperationException() + { + var destination = CreateMockDestination(100); + var builder = new EtlPipelineBuilder() + .WithName("TestPipeline") + .WithDestination(destination); + + Assert.Throws(() => builder.Build()); + } + + [Fact] + public void Build_WithoutDestination_ThrowsInvalidOperationException() + { + var source = CreateMockSource(); + var builder = new EtlPipelineBuilder() + .WithName("TestPipeline") + .WithSource(source); + + Assert.Throws(() => builder.Build()); + } + + private static IImportSource CreateMockSource() + { + var reader = Substitute.For(); + reader.Read().Returns(false); + reader.FieldCount.Returns(0); + + var source = Substitute.For(); + source.SourceName.Returns("MockSource"); + source.ReadDataAsync(Arg.Any()).Returns(Task.FromResult(reader)); + return source; + } + + private static IImportDestination CreateMockDestination(long rows) + { + var destination = Substitute.For(); + destination.DestinationName.Returns("MockDestination"); + destination.WriteAsync(Arg.Any(), Arg.Any()) + .Returns(Task.FromResult(new DestinationResult(rows, 1, TimeSpan.FromSeconds(1)))); + return destination; + } +}