docs: add ETL pipeline overview documentation
This commit is contained in:
@@ -0,0 +1,155 @@
|
|||||||
|
# ETL Pipeline
|
||||||
|
|
||||||
|
The ETL pipeline streams data from enterprise sources (JDE, CMS) through transformations into SQL Server cache tables. It supports batched processing, pre/post scripts for index management, and detailed execution tracking.
|
||||||
|
|
||||||
|
## Architecture
|
||||||
|
|
||||||
|
```
|
||||||
|
┌─────────────┐ ┌──────────────────┐ ┌─────────────────┐
|
||||||
|
│ IImportSource│───▶│ IDataTransformer │───▶│IImportDestination│
|
||||||
|
└─────────────┘ │ (chain of N) │ └─────────────────┘
|
||||||
|
└──────────────────┘
|
||||||
|
▲ │
|
||||||
|
│ ┌──────────────┐ │
|
||||||
|
└─────────│ Pre-Scripts │ ▼
|
||||||
|
└──────────────┘ ┌──────────────┐
|
||||||
|
│ Post-Scripts │
|
||||||
|
└──────────────┘
|
||||||
|
```
|
||||||
|
|
||||||
|
**Execution flow:**
|
||||||
|
1. Run pre-scripts (e.g., disable indexes)
|
||||||
|
2. Open source and get `IDataReader`
|
||||||
|
3. Apply transformer chain (each wraps the previous reader)
|
||||||
|
4. Write to destination (bulk copy or merge)
|
||||||
|
5. Run post-scripts (e.g., rebuild indexes)
|
||||||
|
|
||||||
|
## Core Contracts
|
||||||
|
|
||||||
|
### IImportSource
|
||||||
|
|
||||||
|
Provides data to the pipeline. Returns an `IDataReader` that streams rows.
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
public interface IImportSource : IAsyncDisposable
|
||||||
|
{
|
||||||
|
Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default);
|
||||||
|
string SourceName { get; }
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### IDataTransformer
|
||||||
|
|
||||||
|
Modifies data during transfer. Wraps the source reader in a decorator.
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
public interface IDataTransformer
|
||||||
|
{
|
||||||
|
IDataReader Transform(IDataReader source);
|
||||||
|
string TransformerName { get; }
|
||||||
|
int MapOrdinal(int transformedOrdinal, IDataReader source);
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### IImportDestination
|
||||||
|
|
||||||
|
Consumes data and writes to storage. Returns statistics about the operation.
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
public interface IImportDestination
|
||||||
|
{
|
||||||
|
Task<DestinationResult> WriteAsync(IDataReader source, CancellationToken cancellationToken = default);
|
||||||
|
string DestinationName { get; }
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### IScriptRunner
|
||||||
|
|
||||||
|
Executes SQL scripts before or after data transfer.
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
public interface IScriptRunner
|
||||||
|
{
|
||||||
|
Task ExecuteAsync(CancellationToken cancellationToken = default);
|
||||||
|
string ScriptName { get; }
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## Pipeline Execution
|
||||||
|
|
||||||
|
The `EtlPipeline` class orchestrates execution and tracks timing for each step:
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
public async Task<PipelineResult> ExecuteAsync(CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
// 1. Run pre-scripts
|
||||||
|
foreach (var script in _preScripts)
|
||||||
|
{
|
||||||
|
var stepResult = await RunScriptAsync(script, cancellationToken);
|
||||||
|
steps.Add(stepResult);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. Open source
|
||||||
|
await using (_source)
|
||||||
|
{
|
||||||
|
var reader = await _source.ReadDataAsync(cancellationToken);
|
||||||
|
|
||||||
|
// 3. Apply transformers
|
||||||
|
foreach (var transformer in _transformers)
|
||||||
|
{
|
||||||
|
reader = transformer.Transform(reader);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. Write to destination
|
||||||
|
var destResult = await _destination.WriteAsync(reader, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 5. Run post-scripts
|
||||||
|
foreach (var script in _postScripts)
|
||||||
|
{
|
||||||
|
var stepResult = await RunScriptAsync(script, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
return PipelineResult.Succeeded(totalRows, totalStopwatch.Elapsed, steps);
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## Result Model
|
||||||
|
|
||||||
|
### PipelineResult
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
public record PipelineResult(
|
||||||
|
bool Success,
|
||||||
|
long TotalRows,
|
||||||
|
TimeSpan Elapsed,
|
||||||
|
IReadOnlyList<StepResult> Steps,
|
||||||
|
Exception? Error = null);
|
||||||
|
```
|
||||||
|
|
||||||
|
### StepResult
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
public record StepResult(
|
||||||
|
string StepName,
|
||||||
|
string StepType,
|
||||||
|
long RowsAffected,
|
||||||
|
TimeSpan Elapsed);
|
||||||
|
```
|
||||||
|
|
||||||
|
### DestinationResult
|
||||||
|
|
||||||
|
```csharp
|
||||||
|
public record DestinationResult(
|
||||||
|
long RowsProcessed,
|
||||||
|
int BatchCount,
|
||||||
|
TimeSpan Elapsed);
|
||||||
|
```
|
||||||
|
|
||||||
|
## Related Documentation
|
||||||
|
|
||||||
|
- [Sources](./Sources.md) - Writing custom data sources
|
||||||
|
- [Transformers](./Transformers.md) - Writing custom transformers
|
||||||
|
- [Destinations](./Destinations.md) - Writing destinations and scripts
|
||||||
|
- [Configuration](./Configuration.md) - Pipeline builder and DI setup
|
||||||
|
- [Troubleshooting](./Troubleshooting.md) - Debugging and performance
|
||||||
Reference in New Issue
Block a user