feat(etl): implement EtlPipeline and EtlPipelineBuilder

Add pipeline orchestration for ETL operations:
- EtlPipeline: executes source -> transform -> destination flow
- EtlPipelineBuilder: fluent builder for pipeline configuration
- Supports pre/post scripts, multiple transformers
- Returns PipelineResult with step-by-step timing
This commit is contained in:
Joseph Doherty
2026-01-03 09:31:32 -05:00
parent 644e884b21
commit 4c16e62661
3 changed files with 328 additions and 0 deletions
@@ -0,0 +1,111 @@
using System.Data;
using System.Diagnostics;
using JdeScoping.DataSync.Etl.Contracts;
using JdeScoping.DataSync.Etl.Results;
using Microsoft.Extensions.Logging;
namespace JdeScoping.DataSync.Etl.Pipeline;
/// <summary>
/// Runs one ETL flow: pre-scripts, source read, transformers, destination write, post-scripts.
/// </summary>
/// <remarks>
/// The constructor is internal; <c>EtlPipelineBuilder</c> is the intended way to create instances.
/// <see cref="ExecuteAsync"/> disposes the source when the read/transform/write section ends.
/// NOTE(review): whether a pipeline can be executed a second time therefore depends on the source
/// tolerating use after disposal - confirm before reusing an instance.
/// </remarks>
public class EtlPipeline
{
    private readonly IImportSource _source;
    private readonly IReadOnlyList<IDataTransformer> _transformers;
    private readonly IImportDestination _destination;
    private readonly IReadOnlyList<IScriptRunner> _preScripts;
    private readonly IReadOnlyList<IScriptRunner> _postScripts;
    private readonly ILogger<EtlPipeline> _logger;

    /// <summary>Name used to identify this pipeline in log messages.</summary>
    public string PipelineName { get; }

    /// <summary>Stores the collaborators that <see cref="ExecuteAsync"/> will drive.</summary>
    /// <param name="name">Pipeline name used in log messages.</param>
    /// <param name="source">Source the data reader is obtained from.</param>
    /// <param name="transformers">Transformers applied to the reader, in list order.</param>
    /// <param name="destination">Destination the final reader is written to.</param>
    /// <param name="preScripts">Scripts executed, in order, before the source is read.</param>
    /// <param name="postScripts">Scripts executed, in order, after the destination write.</param>
    /// <param name="logger">Logger for progress and failure messages.</param>
    internal EtlPipeline(
        string name,
        IImportSource source,
        IReadOnlyList<IDataTransformer> transformers,
        IImportDestination destination,
        IReadOnlyList<IScriptRunner> preScripts,
        IReadOnlyList<IScriptRunner> postScripts,
        ILogger<EtlPipeline> logger)
    {
        PipelineName = name;
        _source = source;
        _transformers = transformers;
        _destination = destination;
        _preScripts = preScripts;
        _postScripts = postScripts;
        _logger = logger;
    }

    /// <summary>
    /// Executes the pipeline and reports the outcome together with one <c>StepResult</c> per completed step.
    /// </summary>
    /// <param name="cancellationToken">Token passed to every script, the source read and the destination write.</param>
    /// <returns>
    /// A succeeded result when every step completes; a failed result carrying the exception and the
    /// steps completed so far when any step throws something other than <see cref="OperationCanceledException"/>.
    /// </returns>
    /// <exception cref="OperationCanceledException">
    /// Not caught here: cancellation propagates to the caller instead of being turned into a failed result.
    /// </exception>
    public async Task<PipelineResult> ExecuteAsync(CancellationToken cancellationToken = default)
    {
        var steps = new List<StepResult>();
        var totalStopwatch = Stopwatch.StartNew();

        // Name of the step currently in flight. `steps` only contains steps that already
        // completed, so it cannot tell us which step threw.
        var currentStep = "Unknown";

        _logger.LogInformation("Starting pipeline {PipelineName}", PipelineName);

        try
        {
            // 1. Run pre-scripts
            foreach (var script in _preScripts)
            {
                currentStep = script.ScriptName;
                steps.Add(await RunScriptAsync(script, cancellationToken));
            }

            // 2. Open source
            var sourceStopwatch = Stopwatch.StartNew();
            await using (_source)
            {
                currentStep = _source.SourceName;

                // NOTE(review): the reader is not disposed by the pipeline; presumably the source
                // owns it and releases it in DisposeAsync - confirm against the implementations.
                var reader = await _source.ReadDataAsync(cancellationToken);
                sourceStopwatch.Stop();
                steps.Add(new StepResult(_source.SourceName, "Source", 0, sourceStopwatch.Elapsed));

                // 3. Apply transformers
                // NOTE(review): only the Transform call itself is timed; if transformers wrap the
                // reader lazily, the per-row cost shows up in the destination step - confirm.
                foreach (var transformer in _transformers)
                {
                    currentStep = transformer.TransformerName;
                    var transformStopwatch = Stopwatch.StartNew();
                    reader = transformer.Transform(reader);
                    transformStopwatch.Stop();
                    steps.Add(new StepResult(transformer.TransformerName, "Transform", 0, transformStopwatch.Elapsed));
                }

                // 4. Write to destination
                currentStep = _destination.DestinationName;
                var destResult = await _destination.WriteAsync(reader, cancellationToken);
                steps.Add(new StepResult(_destination.DestinationName, "Destination", destResult.RowsProcessed, destResult.Elapsed));
            }

            // 5. Run post-scripts
            foreach (var script in _postScripts)
            {
                currentStep = script.ScriptName;
                steps.Add(await RunScriptAsync(script, cancellationToken));
            }

            totalStopwatch.Stop();
            var totalRows = steps.Sum(s => s.RowsAffected);
            _logger.LogInformation("Pipeline {PipelineName} completed. Rows={Rows}, Elapsed={Elapsed}ms",
                PipelineName, totalRows, totalStopwatch.ElapsedMilliseconds);
            return PipelineResult.Succeeded(totalRows, totalStopwatch.Elapsed, steps);
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            totalStopwatch.Stop();
            var totalRows = steps.Sum(s => s.RowsAffected);

            // Report the step that was running when the exception surfaced, not the last one that succeeded.
            _logger.LogError(ex, "Pipeline {PipelineName} failed at step {Step}",
                PipelineName, currentStep);
            return PipelineResult.Failed(totalRows, totalStopwatch.Elapsed, steps, ex);
        }
    }

    /// <summary>Runs one script and times it.</summary>
    /// <param name="script">Script to execute.</param>
    /// <param name="ct">Token forwarded to the script.</param>
    /// <returns>A "Script" step result with zero rows and the measured duration.</returns>
    private async Task<StepResult> RunScriptAsync(IScriptRunner script, CancellationToken ct)
    {
        var stopwatch = Stopwatch.StartNew();
        _logger.LogDebug("Running script {ScriptName}", script.ScriptName);
        await script.ExecuteAsync(ct);
        stopwatch.Stop();
        return new StepResult(script.ScriptName, "Script", 0, stopwatch.Elapsed);
    }
}
@@ -0,0 +1,72 @@
using JdeScoping.DataSync.Etl.Contracts;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
namespace JdeScoping.DataSync.Etl.Pipeline;
/// <summary>
/// Fluent builder that collects the parts of an <see cref="EtlPipeline"/> and validates them in <see cref="Build"/>.
/// </summary>
/// <remarks>
/// A source and a destination are mandatory. The name defaults to "Unnamed", the script and transformer
/// lists default to empty, and a missing logger is replaced by a no-op logger.
/// </remarks>
public class EtlPipelineBuilder
{
    private string _name = "Unnamed";
    private IImportSource? _source;
    private readonly List<IDataTransformer> _transformers = new();
    private IImportDestination? _destination;
    private readonly List<IScriptRunner> _preScripts = new();
    private readonly List<IScriptRunner> _postScripts = new();
    private ILogger<EtlPipeline>? _logger;

    /// <summary>Sets the pipeline name.</summary>
    /// <param name="name">Name to assign to the pipeline.</param>
    /// <returns>This builder, for chaining.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="name"/> is null.</exception>
    public EtlPipelineBuilder WithName(string name)
    {
        ArgumentNullException.ThrowIfNull(name);
        _name = name;
        return this;
    }

    /// <summary>Sets the source; a later call replaces an earlier one.</summary>
    /// <param name="source">Source the pipeline reads from.</param>
    /// <returns>This builder, for chaining.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public EtlPipelineBuilder WithSource(IImportSource source)
    {
        ArgumentNullException.ThrowIfNull(source);
        _source = source;
        return this;
    }

    /// <summary>Appends a transformer; transformers are kept in the order they were added.</summary>
    /// <param name="transformer">Transformer to append.</param>
    /// <returns>This builder, for chaining.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="transformer"/> is null.</exception>
    public EtlPipelineBuilder WithTransformer(IDataTransformer transformer)
    {
        ArgumentNullException.ThrowIfNull(transformer);
        _transformers.Add(transformer);
        return this;
    }

    /// <summary>Sets the destination; a later call replaces an earlier one.</summary>
    /// <param name="destination">Destination the pipeline writes to.</param>
    /// <returns>This builder, for chaining.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="destination"/> is null.</exception>
    public EtlPipelineBuilder WithDestination(IImportDestination destination)
    {
        ArgumentNullException.ThrowIfNull(destination);
        _destination = destination;
        return this;
    }

    /// <summary>Appends a pre-script; pre-scripts are kept in the order they were added.</summary>
    /// <param name="script">Script to append.</param>
    /// <returns>This builder, for chaining.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="script"/> is null.</exception>
    public EtlPipelineBuilder WithPreScript(IScriptRunner script)
    {
        ArgumentNullException.ThrowIfNull(script);
        _preScripts.Add(script);
        return this;
    }

    /// <summary>Appends a post-script; post-scripts are kept in the order they were added.</summary>
    /// <param name="script">Script to append.</param>
    /// <returns>This builder, for chaining.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="script"/> is null.</exception>
    public EtlPipelineBuilder WithPostScript(IScriptRunner script)
    {
        ArgumentNullException.ThrowIfNull(script);
        _postScripts.Add(script);
        return this;
    }

    /// <summary>Sets the logger handed to the pipeline.</summary>
    /// <param name="logger">
    /// Logger to use. Null is not rejected here; <see cref="Build"/> substitutes a no-op logger in that case.
    /// </param>
    /// <returns>This builder, for chaining.</returns>
    public EtlPipelineBuilder WithLogger(ILogger<EtlPipeline> logger)
    {
        _logger = logger;
        return this;
    }

    /// <summary>Validates the configuration and creates the pipeline.</summary>
    /// <returns>A pipeline configured with the values collected so far.</returns>
    /// <exception cref="InvalidOperationException">No source or no destination has been set.</exception>
    public EtlPipeline Build()
    {
        if (_source is null)
        {
            throw new InvalidOperationException("Source is required. Call WithSource() before Build().");
        }

        if (_destination is null)
        {
            throw new InvalidOperationException("Destination is required. Call WithDestination() before Build().");
        }

        // Hand the pipeline snapshots, not the builder's live lists: otherwise a later With* call on
        // this builder would silently change an already-built pipeline, or invalidate the enumerator
        // of one that is currently executing.
        return new EtlPipeline(
            _name,
            _source,
            _transformers.ToArray(),
            _destination,
            _preScripts.ToArray(),
            _postScripts.ToArray(),
            _logger ?? NullLogger<EtlPipeline>.Instance);
    }
}
@@ -0,0 +1,145 @@
using System.Data;
using JdeScoping.DataSync.Etl.Contracts;
using JdeScoping.DataSync.Etl.Pipeline;
using JdeScoping.DataSync.Etl.Results;
using Microsoft.Extensions.Logging.Abstractions;
using NSubstitute;
using NSubstitute.ExceptionExtensions;
namespace JdeScoping.DataSync.Tests.Etl.Pipeline;
/// <summary>
/// Unit tests for <see cref="EtlPipeline"/> execution and <see cref="EtlPipelineBuilder"/> validation,
/// using NSubstitute stand-ins for the source, destination and scripts.
/// </summary>
public class EtlPipelineTests
{
    private const string TestPipelineName = "TestPipeline";

    /// <summary>A pipeline whose destination reports 100 rows succeeds with that total and no error.</summary>
    [Fact]
    public async Task ExecuteAsync_SuccessfulPipeline_ReturnsSuccessResult()
    {
        // Arrange
        var sut = ConfiguredBuilder(CreateMockSource(), CreateMockDestination(100)).Build();

        // Act
        var outcome = await sut.ExecuteAsync();

        // Assert
        Assert.True(outcome.Success);
        Assert.Equal(100, outcome.TotalRows);
        Assert.Null(outcome.Error);
    }

    /// <summary>A registered pre-script is invoked before the destination write.</summary>
    [Fact]
    public async Task ExecuteAsync_WithPreScript_RunsScriptBeforeDestination()
    {
        // Arrange
        var invocations = new List<string>();

        var dest = CreateMockDestination(100);
        dest.When(d => d.WriteAsync(Arg.Any<IDataReader>(), Arg.Any<CancellationToken>()))
            .Do(_ => invocations.Add("destination"));

        var script = Substitute.For<IScriptRunner>();
        script.ScriptName.Returns("PreScript");
        script.When(s => s.ExecuteAsync(Arg.Any<CancellationToken>()))
            .Do(_ => invocations.Add("prescript"));

        var sut = ConfiguredBuilder(CreateMockSource(), dest)
            .WithPreScript(script)
            .Build();

        // Act
        await sut.ExecuteAsync();

        // Assert
        Assert.Equal(new[] { "prescript", "destination" }, invocations);
    }

    /// <summary>An exception thrown by the destination is surfaced as a failed result, not rethrown.</summary>
    [Fact]
    public async Task ExecuteAsync_DestinationFails_ReturnsFailedResult()
    {
        // Arrange
        var brokenDest = Substitute.For<IImportDestination>();
        brokenDest.DestinationName.Returns("FailingDest");
        brokenDest.WriteAsync(Arg.Any<IDataReader>(), Arg.Any<CancellationToken>())
            .ThrowsAsync(new InvalidOperationException("Destination failed"));

        var sut = ConfiguredBuilder(CreateMockSource(), brokenDest).Build();

        // Act
        var outcome = await sut.ExecuteAsync();

        // Assert
        Assert.False(outcome.Success);
        Assert.NotNull(outcome.Error);
        Assert.IsType<InvalidOperationException>(outcome.Error);
    }

    /// <summary>With no scripts or transformers, exactly a source step and a destination step are recorded.</summary>
    [Fact]
    public async Task ExecuteAsync_TracksStepResults()
    {
        // Arrange
        var sut = ConfiguredBuilder(CreateMockSource(), CreateMockDestination(100)).Build();

        // Act
        var outcome = await sut.ExecuteAsync();

        // Assert
        Assert.Equal(2, outcome.Steps.Count);
        Assert.Equal("Source", outcome.Steps[0].StepType);
        Assert.Equal("Destination", outcome.Steps[1].StepType);
    }

    /// <summary>Building without a source is rejected.</summary>
    [Fact]
    public void Build_WithoutSource_ThrowsInvalidOperationException()
    {
        // Arrange
        var incomplete = new EtlPipelineBuilder()
            .WithName(TestPipelineName)
            .WithDestination(CreateMockDestination(100));

        // Act + Assert
        Assert.Throws<InvalidOperationException>(() => incomplete.Build());
    }

    /// <summary>Building without a destination is rejected.</summary>
    [Fact]
    public void Build_WithoutDestination_ThrowsInvalidOperationException()
    {
        // Arrange
        var incomplete = new EtlPipelineBuilder()
            .WithName(TestPipelineName)
            .WithSource(CreateMockSource());

        // Act + Assert
        Assert.Throws<InvalidOperationException>(() => incomplete.Build());
    }

    /// <summary>Creates a builder preloaded with the test name, the given endpoints and a no-op logger.</summary>
    private static EtlPipelineBuilder ConfiguredBuilder(IImportSource source, IImportDestination destination) =>
        new EtlPipelineBuilder()
            .WithName(TestPipelineName)
            .WithSource(source)
            .WithDestination(destination)
            .WithLogger(NullLogger<EtlPipeline>.Instance);

    /// <summary>Creates a source substitute named "MockSource" that yields an empty, zero-column reader.</summary>
    private static IImportSource CreateMockSource()
    {
        var emptyReader = Substitute.For<IDataReader>();
        emptyReader.Read().Returns(false);
        emptyReader.FieldCount.Returns(0);

        var stub = Substitute.For<IImportSource>();
        stub.SourceName.Returns("MockSource");
        stub.ReadDataAsync(Arg.Any<CancellationToken>()).Returns(Task.FromResult(emptyReader));
        return stub;
    }

    /// <summary>Creates a destination substitute named "MockDestination" whose write reports <paramref name="rows"/> rows.</summary>
    private static IImportDestination CreateMockDestination(long rows)
    {
        var cannedResult = new DestinationResult(rows, 1, TimeSpan.FromSeconds(1));

        var stub = Substitute.For<IImportDestination>();
        stub.DestinationName.Returns("MockDestination");
        stub.WriteAsync(Arg.Any<IDataReader>(), Arg.Any<CancellationToken>())
            .Returns(Task.FromResult(cannedResult));
        return stub;
    }
}