# ETL Pipeline

The ETL pipeline streams data from enterprise sources (JDE, CMS) through transformations into SQL Server cache tables. It supports batched processing, pre/post scripts for index management, and detailed execution tracking.

## Architecture

```
┌───────────────┐    ┌──────────────────┐    ┌────────────────────┐
│ IImportSource │───▶│ IDataTransformer │───▶│ IImportDestination │
└───────────────┘    │   (chain of N)   │    └────────────────────┘
        ▲            └──────────────────┘              │
        │                                              ▼
┌───────────────┐                              ┌────────────────┐
│  Pre-Scripts  │                              │  Post-Scripts  │
└───────────────┘                              └────────────────┘
```

**Execution flow:**

1. Run pre-scripts (e.g., disable indexes).
2. Open the source and obtain its `IDataReader`.
3. Apply the transformer chain (each transformer wraps the previous reader).
4. Write to the destination (bulk copy or merge).
5. Run post-scripts (e.g., rebuild indexes).

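## Core Contracts

### IImportSource

Provides data to the pipeline. `ReadDataAsync` returns an `IDataReader` that streams rows. Because sources implement `IAsyncDisposable`, the pipeline disposes the source once the transfer completes.

```csharp
using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;

public interface IImportSource : IAsyncDisposable
{
    // Returns a forward-only reader over the source rows.
    Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default);

    string SourceName { get; }
}
```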
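As an illustration of the source contract, here is a minimal sketch of a source that serves rows from an in-memory `DataTable`, which could stand in for a JDE or CMS source in tests. `DataTableSource` is a hypothetical name, not part of the pipeline; the only framework API it relies on is `DataTable.CreateDataReader()`. The interface is restated so the block compiles on its own.

```csharp
#nullable enable
using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;

// Contract restated from above so this sketch compiles on its own.
public interface IImportSource : IAsyncDisposable
{
    Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default);
    string SourceName { get; }
}

// Hypothetical source that streams rows from an in-memory DataTable (e.g., as a test double).
public sealed class DataTableSource : IImportSource
{
    private readonly DataTable _table;
    private IDataReader? _reader;

    public DataTableSource(string sourceName, DataTable table)
    {
        SourceName = sourceName;
        _table = table;
    }

    public string SourceName { get; }

    public Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();

        // CreateDataReader returns a forward-only DataTableReader over the table's rows.
        var reader = _table.CreateDataReader();
        _reader = reader;
        return Task.FromResult<IDataReader>(reader);
    }

    // The source owns the reader it handed out, so it releases it on dispose.
    public ValueTask DisposeAsync()
    {
        _reader?.Dispose();
        return ValueTask.CompletedTask;
    }
}
```

Because `DataTableReader` exposes the same forward-only `IDataReader` interface as a database reader, transformers and destinations can be exercised against it without a live connection.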
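
### IDataTransformer

Modifies data during transfer by wrapping the incoming reader in a decorator, so rows are transformed as they are read. `MapOrdinal` translates a column ordinal of the transformed reader into the corresponding ordinal of the source reader.

```csharp
using System.Data;

public interface IDataTransformer
{
    // Wraps the source reader; the returned reader yields the transformed rows.
    IDataReader Transform(IDataReader source);

    string TransformerName { get; }

    // Maps a column ordinal of the transformed reader to the matching ordinal in source.
    int MapOrdinal(int transformedOrdinal, IDataReader source);
}
```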
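A transformer must return a complete `IDataReader`, so a pass-through base class keeps each decorator small. The sketch below assumes a hypothetical `DelegatingDataReader` that forwards every member to the wrapped reader, plus a hypothetical `TrimStringsTransformer` that trims whitespace from string values. Returning the ordinal unchanged from `MapOrdinal` is an assumption that holds only because this transformer neither adds, drops, nor reorders columns.

```csharp
#nullable enable
using System;
using System.Data;

// Contract restated from above so this sketch compiles on its own.
public interface IDataTransformer
{
    IDataReader Transform(IDataReader source);
    string TransformerName { get; }
    int MapOrdinal(int transformedOrdinal, IDataReader source);
}

// Hypothetical decorator base: forwards every IDataReader member to the wrapped reader.
public class DelegatingDataReader : IDataReader
{
    protected readonly IDataReader Inner;
    public DelegatingDataReader(IDataReader inner) => Inner = inner;

    // Members a transformer is most likely to override.
    public virtual bool Read() => Inner.Read();
    public virtual object GetValue(int i) => Inner.GetValue(i);
    public virtual string GetString(int i) => Inner.GetString(i);

    // Indexers and GetValues go through GetValue so overrides apply to them too.
    public object this[int i] => GetValue(i);
    public object this[string name] => GetValue(GetOrdinal(name));
    public int GetValues(object[] values)
    {
        int count = Math.Min(values.Length, FieldCount);
        for (int i = 0; i < count; i++) values[i] = GetValue(i);
        return count;
    }

    // Plain pass-through members.
    public int FieldCount => Inner.FieldCount;
    public int Depth => Inner.Depth;
    public bool IsClosed => Inner.IsClosed;
    public int RecordsAffected => Inner.RecordsAffected;
    public void Close() => Inner.Close();
    public void Dispose() => Inner.Dispose();
    public DataTable? GetSchemaTable() => Inner.GetSchemaTable();
    public bool NextResult() => Inner.NextResult();
    public bool GetBoolean(int i) => Inner.GetBoolean(i);
    public byte GetByte(int i) => Inner.GetByte(i);
    public long GetBytes(int i, long fieldOffset, byte[]? buffer, int bufferOffset, int length) =>
        Inner.GetBytes(i, fieldOffset, buffer, bufferOffset, length);
    public char GetChar(int i) => Inner.GetChar(i);
    public long GetChars(int i, long fieldOffset, char[]? buffer, int bufferOffset, int length) =>
        Inner.GetChars(i, fieldOffset, buffer, bufferOffset, length);
    public IDataReader GetData(int i) => Inner.GetData(i);
    public string GetDataTypeName(int i) => Inner.GetDataTypeName(i);
    public DateTime GetDateTime(int i) => Inner.GetDateTime(i);
    public decimal GetDecimal(int i) => Inner.GetDecimal(i);
    public double GetDouble(int i) => Inner.GetDouble(i);
    public Type GetFieldType(int i) => Inner.GetFieldType(i);
    public float GetFloat(int i) => Inner.GetFloat(i);
    public Guid GetGuid(int i) => Inner.GetGuid(i);
    public short GetInt16(int i) => Inner.GetInt16(i);
    public int GetInt32(int i) => Inner.GetInt32(i);
    public long GetInt64(int i) => Inner.GetInt64(i);
    public string GetName(int i) => Inner.GetName(i);
    public int GetOrdinal(string name) => Inner.GetOrdinal(name);
    public bool IsDBNull(int i) => Inner.IsDBNull(i);
}

// Hypothetical transformer: trims leading and trailing whitespace from string values.
public sealed class TrimStringsTransformer : IDataTransformer
{
    public string TransformerName => "TrimStrings";

    public IDataReader Transform(IDataReader source) => new TrimmingReader(source);

    // No columns are added, dropped, or reordered, so ordinals map one-to-one.
    public int MapOrdinal(int transformedOrdinal, IDataReader source) => transformedOrdinal;

    private sealed class TrimmingReader : DelegatingDataReader
    {
        public TrimmingReader(IDataReader inner) : base(inner) { }

        public override object GetValue(int i)
        {
            object value = Inner.GetValue(i);
            return value is string s ? s.Trim() : value;
        }

        public override string GetString(int i) => Inner.GetString(i).Trim();
    }
}
```

Only `Read`, `GetValue`, and `GetString` are virtual in this sketch; a transformer that filters rows would override `Read`, and one that changes other typed getters would need those made virtual as well.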

### IImportDestination

Consumes the last reader in the chain and writes its rows to storage, returning a `DestinationResult` with the row count, batch count, and elapsed time.

```csharp
using System.Data;
using System.Threading;
using System.Threading.Tasks;

public interface IImportDestination
{
    Task<DestinationResult> WriteAsync(IDataReader source, CancellationToken cancellationToken = default);

    string DestinationName { get; }
}
```
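To show what the destination contract asks of an implementation, here is a sketch of a hypothetical `BatchingDestination` that reads rows into fixed-size batches, hands each batch to a caller-supplied callback, and reports the totals in a `DestinationResult`. The class name, constructor, and callback shape are assumptions for illustration; the record and interface are restated so the block compiles on its own.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Restated from this page so the sketch compiles on its own.
public record DestinationResult(long RowsProcessed, int BatchCount, TimeSpan Elapsed);

public interface IImportDestination
{
    Task<DestinationResult> WriteAsync(IDataReader source, CancellationToken cancellationToken = default);
    string DestinationName { get; }
}

// Hypothetical destination: buffers rows into batches and passes each batch to a callback.
public sealed class BatchingDestination : IImportDestination
{
    private readonly int _batchSize;
    private readonly Func<IReadOnlyList<object[]>, CancellationToken, Task> _writeBatch;

    public BatchingDestination(string name, int batchSize,
        Func<IReadOnlyList<object[]>, CancellationToken, Task> writeBatch)
    {
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));
        DestinationName = name;
        _batchSize = batchSize;
        _writeBatch = writeBatch;
    }

    public string DestinationName { get; }

    public async Task<DestinationResult> WriteAsync(IDataReader source, CancellationToken cancellationToken = default)
    {
        var stopwatch = Stopwatch.StartNew();
        var batch = new List<object[]>(_batchSize);
        long rows = 0;
        int batches = 0;

        while (source.Read())
        {
            cancellationToken.ThrowIfCancellationRequested();
            var values = new object[source.FieldCount];
            source.GetValues(values);
            batch.Add(values);
            rows++;

            if (batch.Count == _batchSize)
            {
                await _writeBatch(batch, cancellationToken);
                batches++;
                batch = new List<object[]>(_batchSize); // fresh list, so the callback may keep the old one
            }
        }

        // Flush the final, partially filled batch.
        if (batch.Count > 0)
        {
            await _writeBatch(batch, cancellationToken);
            batches++;
        }

        return new DestinationResult(rows, batches, stopwatch.Elapsed);
    }
}
```

A SQL Server destination would more likely pass the reader straight to `SqlBulkCopy.WriteToServerAsync(IDataReader, CancellationToken)` from Microsoft.Data.SqlClient and let `SqlBulkCopy.BatchSize` control batching, which avoids the per-row `object[]` copies made here.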

### IScriptRunner

Executes a SQL script before (pre-script) or after (post-script) the data transfer, for example to disable indexes before a bulk load and rebuild them afterwards.

```csharp
using System.Threading;
using System.Threading.Tasks;

public interface IScriptRunner
{
    Task ExecuteAsync(CancellationToken cancellationToken = default);

    string ScriptName { get; }
}
```
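A script runner can be sketched over the provider-neutral ADO.NET base classes in `System.Data.Common`. `SqlScriptRunner` and its constructor are hypothetical; `DbConnection.OpenAsync`, `DbConnection.CreateCommand`, and `DbCommand.ExecuteNonQueryAsync` are the standard ADO.NET calls it relies on. The interface is restated so the block compiles on its own.

```csharp
#nullable enable
using System;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;

// Contract restated from above so this sketch compiles on its own.
public interface IScriptRunner
{
    Task ExecuteAsync(CancellationToken cancellationToken = default);
    string ScriptName { get; }
}

// Hypothetical runner that executes one SQL batch through any ADO.NET provider.
public sealed class SqlScriptRunner : IScriptRunner
{
    private readonly string _sql;
    private readonly Func<DbConnection> _connectionFactory;

    public SqlScriptRunner(string scriptName, string sql, Func<DbConnection> connectionFactory)
    {
        ScriptName = scriptName;
        _sql = sql;
        _connectionFactory = connectionFactory;
    }

    public string ScriptName { get; }

    public async Task ExecuteAsync(CancellationToken cancellationToken = default)
    {
        // A fresh connection per script; both objects are disposed when the method exits.
        await using var connection = _connectionFactory();
        await connection.OpenAsync(cancellationToken);

        await using var command = connection.CreateCommand();
        command.CommandText = _sql;
        // Index rebuilds can outlast the default timeout; with SqlClient, 0 means no limit.
        command.CommandTimeout = 0;
        await command.ExecuteNonQueryAsync(cancellationToken);
    }
}
```

With Microsoft.Data.SqlClient, the factory could be `() => new SqlConnection(connectionString)` and the SQL something like `ALTER INDEX IX_Example ON dbo.ExampleCache DISABLE;` (index and table names are placeholders). Two T-SQL caveats apply: `GO` is a batch separator understood by tools such as sqlcmd and SSMS rather than by SQL Server, so multi-batch scripts must be split before execution; and disabling a table's clustered index makes its data inaccessible, so pre-scripts usually disable only nonclustered indexes.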

## Pipeline Execution

The `EtlPipeline` class orchestrates execution and records a `StepResult` with the elapsed time of each step. The excerpt below shows the success path:

```csharp
public async Task<PipelineResult> ExecuteAsync(CancellationToken cancellationToken = default)
{
    var totalStopwatch = Stopwatch.StartNew(); // System.Diagnostics
    var steps = new List<StepResult>();
    long totalRows = 0;

    // 1. Run pre-scripts (e.g., disable indexes)
    foreach (var script in _preScripts)
    {
        var stepResult = await RunScriptAsync(script, cancellationToken);
        steps.Add(stepResult);
    }

    // 2. Open source; `await using` disposes it once the transfer completes
    await using (_source)
    {
        var reader = await _source.ReadDataAsync(cancellationToken);

        // 3. Apply transformers; each one wraps the reader returned by the previous one
        foreach (var transformer in _transformers)
        {
            reader = transformer.Transform(reader);
        }

        // 4. Write to destination and record its row count and timing
        var destResult = await _destination.WriteAsync(reader, cancellationToken);
        totalRows = destResult.RowsProcessed;
        // The "Destination" StepType label is illustrative
        steps.Add(new StepResult(_destination.DestinationName, "Destination",
            destResult.RowsProcessed, destResult.Elapsed));
    }

    // 5. Run post-scripts (e.g., rebuild indexes)
    foreach (var script in _postScripts)
    {
        var stepResult = await RunScriptAsync(script, cancellationToken);
        steps.Add(stepResult);
    }

    return PipelineResult.Succeeded(totalRows, totalStopwatch.Elapsed, steps);
}
```
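`ExecuteAsync` calls a `RunScriptAsync` helper that is not shown on this page. One plausible shape, assuming it simply times the script and records a `StepResult`, is sketched below as a static method. `ScriptSteps`, `DelegateScriptRunner`, and the `"Script"` step type are hypothetical; the record and interface are restated so the block compiles on its own.

```csharp
#nullable enable
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Types restated from this page so the sketch compiles on its own.
public record StepResult(string StepName, string StepType, long RowsAffected, TimeSpan Elapsed);

public interface IScriptRunner
{
    Task ExecuteAsync(CancellationToken cancellationToken = default);
    string ScriptName { get; }
}

// Hypothetical runner backed by a delegate; handy for tests.
public sealed class DelegateScriptRunner : IScriptRunner
{
    private readonly Func<CancellationToken, Task> _body;

    public DelegateScriptRunner(string scriptName, Func<CancellationToken, Task> body)
    {
        ScriptName = scriptName;
        _body = body;
    }

    public string ScriptName { get; }

    public Task ExecuteAsync(CancellationToken cancellationToken = default) => _body(cancellationToken);
}

public static class ScriptSteps
{
    // Times one script and wraps the outcome in a StepResult; exceptions propagate to the caller.
    public static async Task<StepResult> RunScriptAsync(IScriptRunner script, CancellationToken cancellationToken)
    {
        var stopwatch = Stopwatch.StartNew();
        await script.ExecuteAsync(cancellationToken);
        stopwatch.Stop();

        // IScriptRunner reports no row count, so RowsAffected is 0; the "Script" label is assumed.
        return new StepResult(script.ScriptName, "Script", 0, stopwatch.Elapsed);
    }
}
```

Letting exceptions propagate keeps the helper simple and leaves the decision about producing a failed `PipelineResult` to the orchestrator.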

## Result Model

### PipelineResult

```csharp
using System;
using System.Collections.Generic;

public record PipelineResult(
    bool Success,
    long TotalRows,
    TimeSpan Elapsed,
    IReadOnlyList<StepResult> Steps,
    Exception? Error = null);
```
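`ExecuteAsync` returns `PipelineResult.Succeeded(...)`, a factory that the record declaration above does not show. A minimal sketch, assuming the factories only fill in `Success` and `Error`, follows; the `Failed` counterpart and its parameter order are hypothetical. `StepResult` is restated so the block compiles on its own.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

public record StepResult(string StepName, string StepType, long RowsAffected, TimeSpan Elapsed);

public record PipelineResult(
    bool Success,
    long TotalRows,
    TimeSpan Elapsed,
    IReadOnlyList<StepResult> Steps,
    Exception? Error = null)
{
    // Success case, matching the call at the end of ExecuteAsync.
    public static PipelineResult Succeeded(long totalRows, TimeSpan elapsed, IReadOnlyList<StepResult> steps) =>
        new(true, totalRows, elapsed, steps);

    // Hypothetical failure case: keeps the steps that finished before the error.
    public static PipelineResult Failed(Exception error, long totalRows, TimeSpan elapsed, IReadOnlyList<StepResult> steps) =>
        new(false, totalRows, elapsed, steps, error);
}
```

Keeping the completed steps on failure lets a caller see, for example, that pre-scripts ran even though the transfer itself failed.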

### StepResult

```csharp
using System;

public record StepResult(
    string StepName,
    string StepType,
    long RowsAffected,
    TimeSpan Elapsed);
```
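Because every step carries a name, type, row count, and duration, a run can be summarized in a log-friendly form. The hypothetical `StepReport` helper below formats one line per step and appends throughput when rows were affected; the line format is an illustration, not the pipeline's actual log output. `StepResult` is restated so the block compiles on its own.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

public record StepResult(string StepName, string StepType, long RowsAffected, TimeSpan Elapsed);

public static class StepReport
{
    // One line per step, formatted with the invariant culture so output is stable across machines.
    public static string FormatStep(StepResult step)
    {
        string line = FormattableString.Invariant(
            $"[{step.StepType}] {step.StepName}: {step.RowsAffected} rows in {step.Elapsed.TotalSeconds:F2} s");

        // Throughput is only meaningful when rows moved and time elapsed.
        if (step.RowsAffected > 0 && step.Elapsed > TimeSpan.Zero)
        {
            double rowsPerSecond = step.RowsAffected / step.Elapsed.TotalSeconds;
            line += FormattableString.Invariant($" ({rowsPerSecond:F0} rows/s)");
        }

        return line;
    }

    public static string Format(IEnumerable<StepResult> steps) =>
        string.Join(Environment.NewLine, steps.Select(FormatStep));
}
```

For example, a destination step `new StepResult("LoadOrders", "Destination", 1200, TimeSpan.FromMilliseconds(1500))` formats as `[Destination] LoadOrders: 1200 rows in 1.50 s (800 rows/s)`.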

### DestinationResult

```csharp
using System;

public record DestinationResult(
    long RowsProcessed,
    int BatchCount,
    TimeSpan Elapsed);
```

## Related Documentation

- [Sources](./Sources.md) - Writing custom data sources
- [Transformers](./Transformers.md) - Writing custom transformers
- [Destinations](./Destinations.md) - Writing destinations and scripts
- [Configuration](./Configuration.md) - Pipeline builder and DI setup
- [Troubleshooting](./Troubleshooting.md) - Debugging and performance