# ETL Pipeline
The ETL pipeline streams data from enterprise sources (JDE, CMS) through a chain of transformations into SQL Server cache tables. It supports batched writes, pre- and post-scripts for index management, and records timing and row counts for each step.
## Architecture
```
┌───────────────┐    ┌──────────────────┐    ┌────────────────────┐
│ IImportSource │───▶│ IDataTransformer │───▶│ IImportDestination │
└───────────────┘    │   (chain of N)   │    └────────────────────┘
                     └──────────────────┘
        ▲                                               │
        │         ┌──────────────┐                      │
        └─────────│ Pre-Scripts  │                      ▼
                  └──────────────┘               ┌──────────────┐
                                                 │ Post-Scripts │
                                                 └──────────────┘
```
**Execution flow:**
1. Run pre-scripts (e.g., disable indexes)
2. Open the source and obtain an `IDataReader`
3. Apply the transformer chain (each transformer wraps the previous reader)
4. Write to the destination (bulk copy or merge)
5. Run post-scripts (e.g., rebuild indexes)
## Core Contracts
### IImportSource
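Provides data to the pipeline. `ReadDataAsync` returns an `IDataReader` that streams rows. Sources implement `IAsyncDisposable`; the pipeline disposes the source after the destination has finished writing.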
```csharp
public interface IImportSource : IAsyncDisposable
{
    Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default);
    string SourceName { get; }
}
```
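For illustration, the following sketch implements `IImportSource` over an in-memory `DataTable`, which can be handy for unit-testing transformers and destinations without a JDE or CMS connection. `DataTableSource` is a hypothetical name, not a class in this codebase; it relies only on `DataTable.CreateDataReader()` from `System.Data`.

```csharp
#nullable enable
using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;

public interface IImportSource : IAsyncDisposable
{
    Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default);
    string SourceName { get; }
}

// Hypothetical source for tests and demos: streams rows from an in-memory DataTable.
public sealed class DataTableSource : IImportSource
{
    private readonly DataTable _table;
    private IDataReader? _reader;

    public DataTableSource(string name, DataTable table)
    {
        SourceName = name;
        _table = table;
    }

    public string SourceName { get; }

    public Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();
        // CreateDataReader returns a forward-only DataTableReader over the table's rows.
        var reader = _table.CreateDataReader();
        _reader = reader;
        return Task.FromResult<IDataReader>(reader);
    }

    // The source owns the reader it handed out and closes it on disposal.
    public ValueTask DisposeAsync()
    {
        _reader?.Dispose();
        return ValueTask.CompletedTask;
    }
}
```

Because the source owns the reader it hands out, disposing the source also closes the reader, which fits the pipeline's `await using (_source)` block.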
### IDataTransformer
Modifies data in flight by wrapping the incoming reader in a decorator. Transformers run in order, so each one receives the reader returned by the previous one.
```csharp
public interface IDataTransformer
{
    IDataReader Transform(IDataReader source);
    string TransformerName { get; }
    int MapOrdinal(int transformedOrdinal, IDataReader source);
}
```
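The sketch below shows the decorator pattern with a hypothetical column-projection transformer. It assumes that `MapOrdinal` translates a column ordinal of the transformed reader back to the matching ordinal in `source`. That reading is inferred from the parameter names and is not confirmed by this page. `SelectColumnsTransformer` and `ProjectingReader` are illustrative names.

```csharp
#nullable enable
using System;
using System.Data;

public interface IDataTransformer
{
    IDataReader Transform(IDataReader source);
    string TransformerName { get; }
    int MapOrdinal(int transformedOrdinal, IDataReader source);
}

// Hypothetical transformer: exposes only the named source columns, in the order given.
public sealed class SelectColumnsTransformer : IDataTransformer
{
    private readonly string[] _columns;
    public SelectColumnsTransformer(params string[] columns) => _columns = columns;
    public string TransformerName => "SelectColumns";

    // Assumed semantics: transformed ordinal -> ordinal of the same column in `source`.
    public int MapOrdinal(int transformedOrdinal, IDataReader source) =>
        source.GetOrdinal(_columns[transformedOrdinal]);

    // Wraps the source; no rows are read here, so the chain stays lazy.
    public IDataReader Transform(IDataReader source) =>
        new ProjectingReader(source, Array.ConvertAll(_columns, c => source.GetOrdinal(c)), _columns);
}

// Decorator that forwards every call to the inner reader through an ordinal map.
internal sealed class ProjectingReader : IDataReader
{
    private readonly IDataReader _inner;
    private readonly int[] _map;      // _map[transformedOrdinal] = source ordinal
    private readonly string[] _names;

    public ProjectingReader(IDataReader inner, int[] map, string[] names)
    {
        _inner = inner;
        _map = map;
        _names = names;
    }

    // Navigation and lifetime pass straight through to the inner reader.
    public bool Read() => _inner.Read();
    public bool NextResult() => _inner.NextResult();
    public void Close() => _inner.Close();
    public void Dispose() => _inner.Dispose();
    public int Depth => _inner.Depth;
    public bool IsClosed => _inner.IsClosed;
    public int RecordsAffected => _inner.RecordsAffected;
    public DataTable? GetSchemaTable() => null; // not implemented in this sketch

    // Column metadata comes from the projection.
    public int FieldCount => _map.Length;
    public string GetName(int i) => _names[i];
    public int GetOrdinal(string name)
    {
        int i = Array.IndexOf(_names, name);
        return i >= 0 ? i : throw new IndexOutOfRangeException(name);
    }

    // Value accessors translate the ordinal, then delegate.
    public object this[int i] => GetValue(i);
    public object this[string name] => GetValue(GetOrdinal(name));
    public object GetValue(int i) => _inner.GetValue(_map[i]);
    public int GetValues(object[] values)
    {
        int n = Math.Min(values.Length, FieldCount);
        for (int i = 0; i < n; i++) values[i] = GetValue(i);
        return n;
    }
    public bool IsDBNull(int i) => _inner.IsDBNull(_map[i]);
    public Type GetFieldType(int i) => _inner.GetFieldType(_map[i]);
    public string GetDataTypeName(int i) => _inner.GetDataTypeName(_map[i]);
    public bool GetBoolean(int i) => _inner.GetBoolean(_map[i]);
    public byte GetByte(int i) => _inner.GetByte(_map[i]);
    public char GetChar(int i) => _inner.GetChar(_map[i]);
    public long GetBytes(int i, long offset, byte[]? buffer, int bufferOffset, int length) =>
        _inner.GetBytes(_map[i], offset, buffer, bufferOffset, length);
    public long GetChars(int i, long offset, char[]? buffer, int bufferOffset, int length) =>
        _inner.GetChars(_map[i], offset, buffer, bufferOffset, length);
    public IDataReader GetData(int i) => _inner.GetData(_map[i]);
    public DateTime GetDateTime(int i) => _inner.GetDateTime(_map[i]);
    public decimal GetDecimal(int i) => _inner.GetDecimal(_map[i]);
    public double GetDouble(int i) => _inner.GetDouble(_map[i]);
    public float GetFloat(int i) => _inner.GetFloat(_map[i]);
    public Guid GetGuid(int i) => _inner.GetGuid(_map[i]);
    public short GetInt16(int i) => _inner.GetInt16(_map[i]);
    public int GetInt32(int i) => _inner.GetInt32(_map[i]);
    public long GetInt64(int i) => _inner.GetInt64(_map[i]);
    public string GetString(int i) => _inner.GetString(_map[i]);
}
```

Forwarding every `IDataRecord` member is verbose, but it keeps the transformer streaming. No rows are buffered, and each call reads from the inner reader's current row. The sketch returns `null` from `GetSchemaTable`, so a consumer that needs schema metadata would require a real implementation.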
### IImportDestination
Consumes the (possibly transformed) reader and writes its rows to storage. It returns a `DestinationResult` with the row count, batch count, and elapsed time.
```csharp
public interface IImportDestination
{
    Task<DestinationResult> WriteAsync(IDataReader source, CancellationToken cancellationToken = default);
    string DestinationName { get; }
}
```
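Here is a minimal destination sketch. It assumes a batch-oriented writer: rows are grouped into fixed-size batches, and each batch is passed to a caller-supplied `flush` callback. `BatchingDestination` and the callback are hypothetical. A SQL Server destination would issue a bulk copy or merge per batch instead.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public record DestinationResult(long RowsProcessed, int BatchCount, TimeSpan Elapsed);

public interface IImportDestination
{
    Task<DestinationResult> WriteAsync(IDataReader source, CancellationToken cancellationToken = default);
    string DestinationName { get; }
}

// Hypothetical destination: groups rows into fixed-size batches and hands each batch to a callback.
public sealed class BatchingDestination : IImportDestination
{
    private readonly int _batchSize;
    private readonly Func<IReadOnlyList<object[]>, CancellationToken, Task> _flush;

    public BatchingDestination(string name, int batchSize,
        Func<IReadOnlyList<object[]>, CancellationToken, Task> flush)
    {
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));
        DestinationName = name;
        _batchSize = batchSize;
        _flush = flush;
    }

    public string DestinationName { get; }

    public async Task<DestinationResult> WriteAsync(IDataReader source, CancellationToken cancellationToken = default)
    {
        var stopwatch = Stopwatch.StartNew();
        var batch = new List<object[]>(_batchSize);
        long rows = 0;
        int batches = 0;

        while (source.Read())
        {
            cancellationToken.ThrowIfCancellationRequested();
            var values = new object[source.FieldCount];
            source.GetValues(values);   // copy the current row before advancing
            batch.Add(values);
            rows++;

            if (batch.Count == _batchSize)
            {
                await _flush(batch.ToArray(), cancellationToken);   // pass a copy, then reuse the list
                batches++;
                batch.Clear();
            }
        }

        if (batch.Count > 0)   // flush the final, partial batch
        {
            await _flush(batch.ToArray(), cancellationToken);
            batches++;
        }

        return new DestinationResult(rows, batches, stopwatch.Elapsed);
    }
}
```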
### IScriptRunner
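Executes a SQL script before (pre-script) or after (post-script) the data transfer, for example to disable indexes before a load and rebuild them afterwards.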
```csharp
public interface IScriptRunner
{
    Task ExecuteAsync(CancellationToken cancellationToken = default);
    string ScriptName { get; }
}
```
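`IScriptRunner` exposes only `ExecuteAsync` and a name, so a runner can wrap any async action. The hypothetical `DelegateScriptRunner` below is one such wrapper and is convenient in tests.

```csharp
#nullable enable
using System;
using System.Threading;
using System.Threading.Tasks;

public interface IScriptRunner
{
    Task ExecuteAsync(CancellationToken cancellationToken = default);
    string ScriptName { get; }
}

// Hypothetical runner that wraps an arbitrary async action.
public sealed class DelegateScriptRunner : IScriptRunner
{
    private readonly Func<CancellationToken, Task> _action;

    public DelegateScriptRunner(string scriptName, Func<CancellationToken, Task> action)
    {
        ScriptName = scriptName;
        _action = action;
    }

    public string ScriptName { get; }

    public Task ExecuteAsync(CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();
        return _action(cancellationToken);
    }
}
```

A SQL-backed runner would typically open a `SqlConnection` and call `SqlCommand.ExecuteNonQueryAsync` with the script text (Microsoft.Data.SqlClient). That variant is omitted here because it needs a live database.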
## Pipeline Execution
The `EtlPipeline` class orchestrates execution and tracks timing for each step:
```csharp
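// Member of EtlPipeline; needs `using System.Diagnostics;` for Stopwatch.
public async Task<PipelineResult> ExecuteAsync(CancellationToken cancellationToken = default)
{
    var totalStopwatch = Stopwatch.StartNew();
    var steps = new List<StepResult>();
    long totalRows = 0;

    // 1. Run pre-scripts (e.g., disable indexes)
    foreach (var script in _preScripts)
    {
        var stepResult = await RunScriptAsync(script, cancellationToken);
        steps.Add(stepResult);
    }

    // 2. Open the source; it is disposed when this block exits
    await using (_source)
    {
        var reader = await _source.ReadDataAsync(cancellationToken);

        // 3. Apply transformers in order; each wraps the reader returned by the previous one
        foreach (var transformer in _transformers)
        {
            reader = transformer.Transform(reader);
        }

        // 4. Write to the destination and record it as a step
        //    (the "Destination" StepType label is illustrative)
        var destResult = await _destination.WriteAsync(reader, cancellationToken);
        totalRows = destResult.RowsProcessed;
        steps.Add(new StepResult(
            _destination.DestinationName, "Destination", destResult.RowsProcessed, destResult.Elapsed));
    }

    // 5. Run post-scripts (e.g., rebuild indexes)
    foreach (var script in _postScripts)
    {
        var stepResult = await RunScriptAsync(script, cancellationToken);
        steps.Add(stepResult);
    }

    return PipelineResult.Succeeded(totalRows, totalStopwatch.Elapsed, steps);
}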
```
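`ExecuteAsync` calls `RunScriptAsync`, but that helper is not shown on this page. One plausible shape is sketched below as a static helper. It assumes the helper simply times the script and wraps the outcome in a `StepResult`. The `"Script"` step type and the zero row count are also assumptions, since `IScriptRunner.ExecuteAsync` returns no row count. `PipelineSteps` and `DelayScript` are hypothetical names.

```csharp
#nullable enable
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public interface IScriptRunner
{
    Task ExecuteAsync(CancellationToken cancellationToken = default);
    string ScriptName { get; }
}

public record StepResult(string StepName, string StepType, long RowsAffected, TimeSpan Elapsed);

public static class PipelineSteps
{
    // Hypothetical stand-in for EtlPipeline.RunScriptAsync: time one script and report it as a step.
    // Exceptions from the script propagate to the caller unchanged.
    public static async Task<StepResult> RunScriptAsync(IScriptRunner script, CancellationToken cancellationToken)
    {
        var stopwatch = Stopwatch.StartNew();
        await script.ExecuteAsync(cancellationToken);
        stopwatch.Stop();
        return new StepResult(script.ScriptName, "Script", 0, stopwatch.Elapsed);
    }
}

// Demo-only runner that simulates a short script.
public sealed class DelayScript : IScriptRunner
{
    public DelayScript(string name) => ScriptName = name;
    public string ScriptName { get; }
    public Task ExecuteAsync(CancellationToken cancellationToken = default) =>
        Task.Delay(TimeSpan.FromMilliseconds(20), cancellationToken);
}
```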
## Result Model
### PipelineResult
```csharp
public record PipelineResult(
    bool Success,
    long TotalRows,
    TimeSpan Elapsed,
    IReadOnlyList<StepResult> Steps,
    Exception? Error = null);
```
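The pipeline calls `PipelineResult.Succeeded`, which is not part of the record declaration above. One way to provide it is a static factory method on the record. The sketch pairs it with a hypothetical `Failed` counterpart that fills in `Error`. Both bodies are assumptions rather than the project's actual code.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

public record StepResult(string StepName, string StepType, long RowsAffected, TimeSpan Elapsed);

public record PipelineResult(
    bool Success,
    long TotalRows,
    TimeSpan Elapsed,
    IReadOnlyList<StepResult> Steps,
    Exception? Error = null)
{
    // Assumed body for the factory that ExecuteAsync calls on success.
    public static PipelineResult Succeeded(long totalRows, TimeSpan elapsed, IReadOnlyList<StepResult> steps) =>
        new(true, totalRows, elapsed, steps);

    // Hypothetical failure factory: keeps the steps completed before the error.
    public static PipelineResult Failed(Exception error, long totalRows, TimeSpan elapsed, IReadOnlyList<StepResult> steps) =>
        new(false, totalRows, elapsed, steps, error);
}
```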
### StepResult
```csharp
public record StepResult(
    string StepName,
    string StepType,
    long RowsAffected,
    TimeSpan Elapsed);
```
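Because each `StepResult` carries its own elapsed time, it is straightforward to see where a run spends its time. The hypothetical `StepReport` helper below picks the slowest step and renders a fixed-width timing table. The column layout is an arbitrary choice.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;

public record StepResult(string StepName, string StepType, long RowsAffected, TimeSpan Elapsed);

public static class StepReport
{
    // Returns the step that took longest, or null when there are no steps.
    public static StepResult? Slowest(IEnumerable<StepResult> steps) =>
        steps.OrderByDescending(s => s.Elapsed).FirstOrDefault();

    // One line per step, formatted with the invariant culture so output is identical on every machine.
    public static string Format(IEnumerable<StepResult> steps) =>
        string.Join(Environment.NewLine, steps.Select(s => string.Format(
            CultureInfo.InvariantCulture,
            "{0,-12} {1,-30} {2,12:N0} rows {3,10:F1} ms",
            s.StepType, s.StepName, s.RowsAffected, s.Elapsed.TotalMilliseconds)));
}
```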
### DestinationResult
```csharp
public record DestinationResult(
    long RowsProcessed,
    int BatchCount,
    TimeSpan Elapsed);
```
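When tuning batch sizes, two metrics derived from a `DestinationResult` are often useful: throughput and average rows per batch. The hypothetical `DestinationStats` helper computes both and guards against division by zero.

```csharp
#nullable enable
using System;

public record DestinationResult(long RowsProcessed, int BatchCount, TimeSpan Elapsed);

public static class DestinationStats
{
    // Rows written per second; returns 0 when no elapsed time was recorded.
    public static double RowsPerSecond(DestinationResult result) =>
        result.Elapsed > TimeSpan.Zero ? result.RowsProcessed / result.Elapsed.TotalSeconds : 0;

    // Mean rows per batch; the final batch may be partial, so this can fall below the configured batch size.
    public static double AverageBatchSize(DestinationResult result) =>
        result.BatchCount > 0 ? (double)result.RowsProcessed / result.BatchCount : 0;
}
```

For example, 10,000 rows written in 4 batches over 2 seconds gives 5,000 rows per second and an average of 2,500 rows per batch.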
## Related Documentation
- [Sources](./Sources.md) - Writing custom data sources
- [Transformers](./Transformers.md) - Writing custom transformers
- [Destinations](./Destinations.md) - Writing destinations and scripts
- [Configuration](./Configuration.md) - Pipeline builder and DI setup
- [Troubleshooting](./Troubleshooting.md) - Debugging and performance