# ETL Pipeline

The ETL pipeline streams data from enterprise sources (JDE, CMS) through transformations into SQL Server cache tables. It supports batched processing, pre/post scripts for index management, and detailed execution tracking.

## Architecture

```
┌───────────────┐    ┌──────────────────┐    ┌────────────────────┐
│ IImportSource │───▶│ IDataTransformer │───▶│ IImportDestination │
└───────────────┘    │   (chain of N)   │    └────────────────────┘
        ▲            └──────────────────┘               │
        │                                               │
        │     ┌──────────────┐                          ▼
        └─────│ Pre-Scripts  │                   ┌──────────────┐
              └──────────────┘                   │ Post-Scripts │
                                                 └──────────────┘
```

**Execution flow:**

1. Run pre-scripts (e.g., disable indexes)
2. Open the source and get an `IDataReader`
3. Apply the transformer chain (each transformer wraps the previous reader)
4. Write to the destination (bulk copy or merge)
5. Run post-scripts (e.g., rebuild indexes)

## Core Contracts

### IImportSource

Provides data to the pipeline. Returns an `IDataReader` that streams rows.

```csharp
public interface IImportSource : IAsyncDisposable
{
    // Opens the source and returns a forward-only reader over its rows
    Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default);

    string SourceName { get; }
}
```

### IDataTransformer

Modifies data in flight by wrapping the source reader in a decorator.

```csharp
public interface IDataTransformer
{
    // Returns a reader that decorates (wraps) the incoming reader
    IDataReader Transform(IDataReader source);

    string TransformerName { get; }

    int MapOrdinal(int transformedOrdinal, IDataReader source);
}
```

### IImportDestination

Consumes data, writes it to storage, and returns statistics about the operation.

```csharp
public interface IImportDestination
{
    // Drains the reader into storage and reports rows, batches, and timing
    Task<DestinationResult> WriteAsync(IDataReader source, CancellationToken cancellationToken = default);

    string DestinationName { get; }
}
```

### IScriptRunner

Executes SQL scripts before or after the data transfer.
```csharp
public interface IScriptRunner
{
    Task ExecuteAsync(CancellationToken cancellationToken = default);

    string ScriptName { get; }
}
```

## Pipeline Execution

The `EtlPipeline` class orchestrates execution and records the timing of each step:

```csharp
// Excerpt from EtlPipeline: _preScripts, _source, _transformers, _destination,
// _postScripts and RunScriptAsync are members of the class.
public async Task<PipelineResult> ExecuteAsync(CancellationToken cancellationToken = default)
{
    var totalStopwatch = Stopwatch.StartNew();
    var steps = new List<StepResult>();
    long totalRows = 0;

    // 1. Run pre-scripts (e.g., disable indexes)
    foreach (var script in _preScripts)
    {
        var stepResult = await RunScriptAsync(script, cancellationToken);
        steps.Add(stepResult);
    }

    // 2. Open the source; it is disposed when this block exits
    await using (_source)
    {
        var reader = await _source.ReadDataAsync(cancellationToken);

        // 3. Apply transformers; each one wraps the previous reader
        foreach (var transformer in _transformers)
        {
            reader = transformer.Transform(reader);
        }

        // 4. Write to the destination (bulk copy or merge)
        var destResult = await _destination.WriteAsync(reader, cancellationToken);
        totalRows = destResult.RowsProcessed;
    }

    // 5. Run post-scripts (e.g., rebuild indexes)
    foreach (var script in _postScripts)
    {
        var stepResult = await RunScriptAsync(script, cancellationToken);
        steps.Add(stepResult);
    }

    return PipelineResult.Succeeded(totalRows, totalStopwatch.Elapsed, steps);
}
```

## Result Model

### PipelineResult

```csharp
public record PipelineResult(
    bool Success,
    long TotalRows,
    TimeSpan Elapsed,
    IReadOnlyList<StepResult> Steps,
    Exception? Error = null);
```

### StepResult

```csharp
public record StepResult(
    string StepName,
    string StepType,
    long RowsAffected,
    TimeSpan Elapsed);
```

### DestinationResult

```csharp
public record DestinationResult(
    long RowsProcessed,
    int BatchCount,
    TimeSpan Elapsed);
```

## Related Documentation

- [Sources](./Sources.md) - Writing custom data sources
- [Transformers](./Transformers.md) - Writing custom transformers
- [Destinations](./Destinations.md) - Writing destinations and scripts
- [Configuration](./Configuration.md) - Pipeline builder and DI setup
- [Troubleshooting](./Troubleshooting.md) - Debugging and performance
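To see how steps 2 to 4 of the execution flow fit together without a database, the contracts can be exercised with in-memory stand-ins. The sketch below is a minimal illustration under stated assumptions, not part of the library: `InMemorySource`, `DropNullRowsTransformer`, `CountingDestination`, and `Demo` are hypothetical names; the generic return types (`Task<IDataReader>`, `Task<DestinationResult>`) are inferred from the prose; pre/post scripts are left out because they need SQL Server; and the transformer buffers rows into a `DataTable` for brevity instead of streaming them as a real decorator would.

```csharp
using System;
using System.Data;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Contracts as described in this document (generic return types assumed).
public interface IImportSource : IAsyncDisposable
{
    Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default);
    string SourceName { get; }
}

public interface IDataTransformer
{
    IDataReader Transform(IDataReader source);
    string TransformerName { get; }
    int MapOrdinal(int transformedOrdinal, IDataReader source);
}

public interface IImportDestination
{
    Task<DestinationResult> WriteAsync(IDataReader source, CancellationToken cancellationToken = default);
    string DestinationName { get; }
}

public record DestinationResult(long RowsProcessed, int BatchCount, TimeSpan Elapsed);

// Hypothetical source: serves rows from a DataTable instead of JDE/CMS.
public sealed class InMemorySource : IImportSource
{
    private readonly DataTable _table;
    public InMemorySource(DataTable table) => _table = table;
    public string SourceName => "InMemory";

    public Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
        => Task.FromResult<IDataReader>(_table.CreateDataReader());

    public ValueTask DisposeAsync() => default; // nothing to release
}

// Hypothetical transformer: drops rows whose column is NULL.
// NOTE: buffers all rows in memory; a production transformer would stream.
public sealed class DropNullRowsTransformer : IDataTransformer
{
    private readonly string _column;
    public DropNullRowsTransformer(string column) => _column = column;
    public string TransformerName => $"DropNull({_column})";

    public IDataReader Transform(IDataReader source)
    {
        var table = new DataTable();
        table.Load(source);
        // Iterate backwards so removals don't shift rows still to be checked
        for (int i = table.Rows.Count - 1; i >= 0; i--)
        {
            if (table.Rows[i].IsNull(_column)) table.Rows.RemoveAt(i);
        }
        return table.CreateDataReader();
    }

    // The column set is unchanged, so ordinals map one-to-one.
    public int MapOrdinal(int transformedOrdinal, IDataReader source) => transformedOrdinal;
}

// Hypothetical destination: counts rows instead of bulk-copying them.
public sealed class CountingDestination : IImportDestination
{
    public string DestinationName => "Counting";

    public Task<DestinationResult> WriteAsync(IDataReader source, CancellationToken cancellationToken = default)
    {
        var stopwatch = Stopwatch.StartNew();
        long rows = 0;
        while (source.Read())
        {
            cancellationToken.ThrowIfCancellationRequested();
            rows++;
        }
        return Task.FromResult(new DestinationResult(rows, 1, stopwatch.Elapsed));
    }
}

public static class Demo
{
    // Runs source -> transformer chain -> destination and returns rows written.
    public static async Task<long> RunAsync()
    {
        var table = new DataTable();
        table.Columns.Add("Id", typeof(int));
        table.Columns.Add("Name", typeof(string));
        table.Rows.Add(1, "alpha");
        table.Rows.Add(2, DBNull.Value);
        table.Rows.Add(3, "gamma");

        IImportSource source = new InMemorySource(table);
        var transformers = new IDataTransformer[] { new DropNullRowsTransformer("Name") };
        IImportDestination destination = new CountingDestination();

        await using (source)
        {
            IDataReader reader = await source.ReadDataAsync();
            foreach (var transformer in transformers) reader = transformer.Transform(reader);
            using (reader)
            {
                var result = await destination.WriteAsync(reader);
                return result.RowsProcessed;
            }
        }
    }
}
```

`Demo.RunAsync()` should report 2 rows: the row with a null `Name` is removed by the transformer before it reaches the destination. A streaming transformer would instead implement `IDataReader` itself and forward `Read()` to the wrapped reader, so rows are never held in memory.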