Files
jdescopingtool/DOCUMENTATION/DataSync/Overview.md
T
2026-01-03 15:33:01 -05:00

4.6 KiB

ETL Pipeline

The ETL pipeline streams data from enterprise sources (JDE, CMS) through transformations into SQL Server cache tables. It supports batched processing, pre/post scripts for index management, and detailed execution tracking.

Architecture

┌───────────────┐    ┌──────────────────┐    ┌────────────────────┐
│ IImportSource │───▶│ IDataTransformer │───▶│ IImportDestination │
└───────────────┘    │   (chain of N)   │    └────────────────────┘
                     └──────────────────┘
        ▲                                              │
        │         ┌──────────────┐                     │
        └─────────│ Pre-Scripts  │                     ▼
                  └──────────────┘              ┌──────────────┐
                                                │ Post-Scripts │
                                                └──────────────┘

Execution flow:

  1. Run pre-scripts (e.g., disable indexes)
  2. Open source and get IDataReader
  3. Apply transformer chain (each wraps the previous reader)
  4. Write to destination (bulk copy or merge)
  5. Run post-scripts (e.g., rebuild indexes)

Core Contracts

IImportSource

Provides data to the pipeline. Returns an IDataReader that streams rows.

/// <summary>
/// A pipeline input: supplies an <see cref="IDataReader"/> that streams rows into the pipeline.
/// </summary>
public interface IImportSource : IAsyncDisposable
{
    /// <summary>Name of this source.</summary>
    string SourceName { get; }

    /// <summary>Obtains the reader from which the pipeline streams rows.</summary>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <returns>An <see cref="IDataReader"/> over the source's rows.</returns>
    Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default);
}

IDataTransformer

Modifies data during transfer. Wraps the source reader in a decorator.

/// <summary>
/// Modifies data during transfer by wrapping the incoming reader in a decorator.
/// </summary>
public interface IDataTransformer
{
    /// <summary>Name of this transformer.</summary>
    string TransformerName { get; }

    /// <summary>Wraps <paramref name="source"/> and returns the decorated reader.</summary>
    /// <param name="source">The reader to wrap (the source reader or the previous transformer's output).</param>
    /// <returns>A reader exposing the transformed data.</returns>
    IDataReader Transform(IDataReader source);

    /// <summary>
    /// Maps an ordinal. NOTE(review): presumably translates a column ordinal of the transformed
    /// reader back to the matching ordinal in <paramref name="source"/> — confirm against implementations.
    /// </summary>
    /// <param name="transformedOrdinal">The ordinal to map.</param>
    /// <param name="source">The reader the ordinal is mapped against.</param>
    /// <returns>The mapped ordinal.</returns>
    int MapOrdinal(int transformedOrdinal, IDataReader source);
}

IImportDestination

Consumes data and writes to storage. Returns statistics about the operation.

/// <summary>
/// A pipeline output: consumes a reader, writes its data to storage, and reports statistics.
/// </summary>
public interface IImportDestination
{
    /// <summary>Name of this destination.</summary>
    string DestinationName { get; }

    /// <summary>Consumes <paramref name="source"/> and writes its data to storage.</summary>
    /// <param name="source">The reader to consume.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <returns>A <see cref="DestinationResult"/> with statistics about the write.</returns>
    Task<DestinationResult> WriteAsync(IDataReader source, CancellationToken cancellationToken = default);
}

IScriptRunner

Executes SQL scripts before or after data transfer.

/// <summary>
/// Executes a SQL script before or after the data transfer.
/// </summary>
public interface IScriptRunner
{
    /// <summary>Name of this script.</summary>
    string ScriptName { get; }

    /// <summary>Runs the script.</summary>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <returns>A task that completes when the script has finished.</returns>
    Task ExecuteAsync(CancellationToken cancellationToken = default);
}

Pipeline Execution

The EtlPipeline class orchestrates execution and tracks timing for each step:

/// <summary>
/// Runs the pipeline end to end: pre-scripts, then source → transformers → destination,
/// then post-scripts, recording a <see cref="StepResult"/> for every script that runs.
/// </summary>
/// <param name="cancellationToken">Token passed to every script, the source read, and the destination write.</param>
/// <returns>
/// A successful <see cref="PipelineResult"/> carrying the rows processed by the destination,
/// the total elapsed time, and the recorded steps.
/// </returns>
public async Task<PipelineResult> ExecuteAsync(CancellationToken cancellationToken = default)
{
    var steps = new List<StepResult>();
    var totalStopwatch = Stopwatch.StartNew();
    long totalRows = 0;

    // 1. Run pre-scripts
    foreach (var script in _preScripts)
    {
        var stepResult = await RunScriptAsync(script, cancellationToken);
        steps.Add(stepResult);
    }

    // 2. Open source; it is disposed as soon as the destination has consumed the reader
    await using (_source)
    {
        var reader = await _source.ReadDataAsync(cancellationToken);

        // 3. Apply transformers (each one wraps the reader produced by the previous one)
        foreach (var transformer in _transformers)
        {
            reader = transformer.Transform(reader);
        }

        // 4. Write to destination and keep its row count for the final result
        var destResult = await _destination.WriteAsync(reader, cancellationToken);
        totalRows = destResult.RowsProcessed;
    }

    // 5. Run post-scripts (recorded in the step list just like the pre-scripts)
    foreach (var script in _postScripts)
    {
        var stepResult = await RunScriptAsync(script, cancellationToken);
        steps.Add(stepResult);
    }

    return PipelineResult.Succeeded(totalRows, totalStopwatch.Elapsed, steps);
}

Result Model

PipelineResult

/// <summary>Outcome of a pipeline run.</summary>
/// <param name="Success">Whether the run succeeded.</param>
/// <param name="TotalRows">Total row count reported for the run.</param>
/// <param name="Elapsed">Total elapsed time of the run.</param>
/// <param name="Steps">Results of the individual steps recorded during the run.</param>
/// <param name="Error">Optional exception associated with the run; defaults to <c>null</c>.</param>
public record PipelineResult(
    bool Success,
    long TotalRows,
    TimeSpan Elapsed,
    IReadOnlyList<StepResult> Steps,
    Exception? Error = null);

StepResult

/// <summary>Result of a single pipeline step.</summary>
/// <param name="StepName">Name of the step.</param>
/// <param name="StepType">Type of the step (the set of values is not shown here).</param>
/// <param name="RowsAffected">Number of rows the step affected.</param>
/// <param name="Elapsed">Time the step took.</param>
public record StepResult(
    string StepName,
    string StepType,
    long RowsAffected,
    TimeSpan Elapsed);

DestinationResult

/// <summary>Statistics returned by a destination write.</summary>
/// <param name="RowsProcessed">Number of rows processed by the write.</param>
/// <param name="BatchCount">Number of batches used by the write.</param>
/// <param name="Elapsed">Time the write took.</param>
public record DestinationResult(
    long RowsProcessed,
    int BatchCount,
    TimeSpan Elapsed);