From 9a6dfa44b18d3b539a3b257b8926c942548d5276 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 3 Jan 2026 15:33:01 -0500 Subject: [PATCH] docs: add ETL pipeline overview documentation --- DOCUMENTATION/DataSync/Overview.md | 155 +++++++++++++++++++++++++++++ 1 file changed, 155 insertions(+) create mode 100644 DOCUMENTATION/DataSync/Overview.md diff --git a/DOCUMENTATION/DataSync/Overview.md b/DOCUMENTATION/DataSync/Overview.md new file mode 100644 index 0000000..f6b59b6 --- /dev/null +++ b/DOCUMENTATION/DataSync/Overview.md @@ -0,0 +1,155 @@ +# ETL Pipeline + +The ETL pipeline streams data from enterprise sources (JDE, CMS) through transformations into SQL Server cache tables. It supports batched processing, pre/post scripts for index management, and detailed execution tracking. + +## Architecture + +``` +┌─────────────┐ ┌──────────────────┐ ┌─────────────────┐ +│ IImportSource│───▶│ IDataTransformer │───▶│IImportDestination│ +└─────────────┘ │ (chain of N) │ └─────────────────┘ + └──────────────────┘ + ▲ │ + │ ┌──────────────┐ │ + └─────────│ Pre-Scripts │ ▼ + └──────────────┘ ┌──────────────┐ + │ Post-Scripts │ + └──────────────┘ +``` + +**Execution flow:** +1. Run pre-scripts (e.g., disable indexes) +2. Open source and get `IDataReader` +3. Apply transformer chain (each wraps the previous reader) +4. Write to destination (bulk copy or merge) +5. Run post-scripts (e.g., rebuild indexes) + +## Core Contracts + +### IImportSource + +Provides data to the pipeline. Returns an `IDataReader` that streams rows. + +```csharp +public interface IImportSource : IAsyncDisposable +{ + Task ReadDataAsync(CancellationToken cancellationToken = default); + string SourceName { get; } +} +``` + +### IDataTransformer + +Modifies data during transfer. Wraps the source reader in a decorator. + +```csharp +public interface IDataTransformer +{ + IDataReader Transform(IDataReader source); + string TransformerName { get; } + int MapOrdinal(int transformedOrdinal, IDataReader source); +} +``` + +### IImportDestination + +Consumes data and writes to storage. Returns statistics about the operation. + +```csharp +public interface IImportDestination +{ + Task WriteAsync(IDataReader source, CancellationToken cancellationToken = default); + string DestinationName { get; } +} +``` + +### IScriptRunner + +Executes SQL scripts before or after data transfer. + +```csharp +public interface IScriptRunner +{ + Task ExecuteAsync(CancellationToken cancellationToken = default); + string ScriptName { get; } +} +``` + +## Pipeline Execution + +The `EtlPipeline` class orchestrates execution and tracks timing for each step: + +```csharp +public async Task ExecuteAsync(CancellationToken cancellationToken = default) +{ + // 1. Run pre-scripts + foreach (var script in _preScripts) + { + var stepResult = await RunScriptAsync(script, cancellationToken); + steps.Add(stepResult); + } + + // 2. Open source + await using (_source) + { + var reader = await _source.ReadDataAsync(cancellationToken); + + // 3. Apply transformers + foreach (var transformer in _transformers) + { + reader = transformer.Transform(reader); + } + + // 4. Write to destination + var destResult = await _destination.WriteAsync(reader, cancellationToken); + } + + // 5. Run post-scripts + foreach (var script in _postScripts) + { + var stepResult = await RunScriptAsync(script, cancellationToken); + } + + return PipelineResult.Succeeded(totalRows, totalStopwatch.Elapsed, steps); +} +``` + +## Result Model + +### PipelineResult + +```csharp +public record PipelineResult( + bool Success, + long TotalRows, + TimeSpan Elapsed, + IReadOnlyList Steps, + Exception? Error = null); +``` + +### StepResult + +```csharp +public record StepResult( + string StepName, + string StepType, + long RowsAffected, + TimeSpan Elapsed); +``` + +### DestinationResult + +```csharp +public record DestinationResult( + long RowsProcessed, + int BatchCount, + TimeSpan Elapsed); +``` + +## Related Documentation + +- [Sources](./Sources.md) - Writing custom data sources +- [Transformers](./Transformers.md) - Writing custom transformers +- [Destinations](./Destinations.md) - Writing destinations and scripts +- [Configuration](./Configuration.md) - Pipeline builder and DI setup +- [Troubleshooting](./Troubleshooting.md) - Debugging and performance