8-task plan to create comprehensive ETL pipeline documentation:
- Overview, Sources, Transformers, Destinations, Configuration, Troubleshooting
- Update ComponentMap with ETL source paths
- Final verification of links and commits
# ETL Pipeline Documentation Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Create 6 documentation files for the ETL pipeline covering architecture, extension patterns, configuration, and troubleshooting.

**Architecture:** Each task creates one markdown file in `DOCUMENTATION/DataSync/`. Code snippets come directly from source files. Verification checks that links resolve and code matches source.

**Tech Stack:** Markdown, code from `NEW/src/JdeScoping.DataSync/Etl/`
### Task 1: Create DataSync folder and Overview.md

**Files:**
- Create: `DOCUMENTATION/DataSync/Overview.md`

**Step 1: Create the DataSync directory**

```bash
mkdir -p DOCUMENTATION/DataSync
```

**Step 2: Write Overview.md with architecture and core concepts**

Create `DOCUMENTATION/DataSync/Overview.md` with:

# ETL Pipeline
The ETL pipeline streams data from enterprise sources (JDE, CMS) through transformations into SQL Server cache tables. It supports batched processing, pre/post scripts for index management, and detailed execution tracking.
## Architecture
```text
┌───────────────┐    ┌──────────────────┐    ┌────────────────────┐
│ IImportSource │───▶│ IDataTransformer │───▶│ IImportDestination │
└───────────────┘    │   (chain of N)   │    └────────────────────┘
        ▲            └──────────────────┘               │
        │   ┌──────────────┐                            │
        └───│  Pre-Scripts │                            ▼
            └──────────────┘                     ┌──────────────┐
                                                 │ Post-Scripts │
                                                 └──────────────┘
```
**Execution flow:**
1. Run pre-scripts (e.g., disable indexes)
2. Open source and get `IDataReader`
3. Apply transformer chain (each wraps the previous reader)
4. Write to destination (bulk copy or merge)
5. Run post-scripts (e.g., rebuild indexes)
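
The five steps can be traced with in-memory stand-ins. The sketch below is not the `EtlPipeline` code: the hypothetical `RunPipeline` local function models scripts as `Action`s, the source as a `Func<IDataReader>`, transformers as `Func<IDataReader, IDataReader>` wrappers, and the destination as a function that drains the reader and returns a row count. A `DataTable` stands in for the cache database, and the whole transfer is timed as one hypothetical `Transfer` step.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.Linq;

// Hypothetical stand-in for EtlPipeline.ExecuteAsync: runs the five steps in
// order and records (step, elapsed) pairs, similar to StepResult.
static (long Rows, List<(string Step, TimeSpan Elapsed)> Steps) RunPipeline(
    IEnumerable<(string Name, Action Run)> preScripts,
    Func<IDataReader> openSource,
    IEnumerable<Func<IDataReader, IDataReader>> transformers,
    Func<IDataReader, long> writeDestination,
    IEnumerable<(string Name, Action Run)> postScripts)
{
    var steps = new List<(string Step, TimeSpan Elapsed)>();
    void Timed(string name, Action action)
    {
        var sw = Stopwatch.StartNew();
        action();
        steps.Add((name, sw.Elapsed));
    }

    foreach (var (name, run) in preScripts) Timed(name, run);           // 1. pre-scripts

    long rows = 0;
    Timed("Transfer", () =>
    {
        using IDataReader source = openSource();                         // 2. open source
        IDataReader reader = source;
        foreach (var transform in transformers) reader = transform(reader); // 3. wrap in order
        rows = writeDestination(reader);                                 // 4. write
    });

    foreach (var (name, run) in postScripts) Timed(name, run);          // 5. post-scripts
    return (rows, steps);
}

var table = new DataTable();
table.Columns.Add("OrderNumber", typeof(int));
foreach (var n in new[] { 1, 2, 3 }) table.Rows.Add(n);

var log = new List<string>();
var result = RunPipeline(
    new[] { ("DisableIndexes", (Action)(() => log.Add("pre"))) },
    () => table.CreateDataReader(),
    new Func<IDataReader, IDataReader>[] { r => { log.Add("transform"); return r; } },
    reader => { long count = 0; while (reader.Read()) count++; log.Add("write"); return count; },
    new[] { ("RebuildIndexes", (Action)(() => log.Add("post"))) });

Console.WriteLine($"{result.Rows} rows; steps: {string.Join(", ", result.Steps.Select(x => x.Step))}");
```

Because each transformer receives the reader returned by the previous one, the last transformer in the list is the outermost wrapper, and the destination only ever sees that outermost reader.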
## Core Contracts
### IImportSource
Provides data to the pipeline. Returns an `IDataReader` that streams rows.
```csharp
public interface IImportSource : IAsyncDisposable
{
    Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default);
    string SourceName { get; }
}
```

### IDataTransformer

Modifies data during transfer by wrapping the source reader in a decorator.

```csharp
public interface IDataTransformer
{
    IDataReader Transform(IDataReader source);
    string TransformerName { get; }
    int MapOrdinal(int transformedOrdinal, IDataReader source);
}
```

### IImportDestination

Consumes data, writes it to storage, and returns statistics about the operation.

```csharp
public interface IImportDestination
{
    Task<DestinationResult> WriteAsync(IDataReader source, CancellationToken cancellationToken = default);
    string DestinationName { get; }
}
```

### IScriptRunner

Executes SQL scripts before or after data transfer.

```csharp
public interface IScriptRunner
{
    Task ExecuteAsync(CancellationToken cancellationToken = default);
    string ScriptName { get; }
}
```
## Pipeline Execution

The `EtlPipeline` class orchestrates execution and tracks timing for each step (excerpt; logging and error handling omitted):

```csharp
public async Task<PipelineResult> ExecuteAsync(CancellationToken cancellationToken = default)
{
    var totalStopwatch = Stopwatch.StartNew();
    var steps = new List<StepResult>();
    long totalRows = 0;

    // 1. Run pre-scripts
    foreach (var script in _preScripts)
    {
        var stepResult = await RunScriptAsync(script, cancellationToken);
        steps.Add(stepResult);
    }

    // 2. Open source (disposed once the destination has consumed the reader)
    await using (_source)
    {
        var reader = await _source.ReadDataAsync(cancellationToken);

        // 3. Apply transformers; each wraps the reader produced by the previous one
        foreach (var transformer in _transformers)
        {
            reader = transformer.Transform(reader);
        }

        // 4. Write to destination
        var destResult = await _destination.WriteAsync(reader, cancellationToken);
        totalRows = destResult.RowsProcessed;
    }

    // 5. Run post-scripts
    foreach (var script in _postScripts)
    {
        var stepResult = await RunScriptAsync(script, cancellationToken);
        steps.Add(stepResult);
    }

    return PipelineResult.Succeeded(totalRows, totalStopwatch.Elapsed, steps);
}
```
## Result Model

### PipelineResult

```csharp
public record PipelineResult(
    bool Success,
    long TotalRows,
    TimeSpan Elapsed,
    IReadOnlyList<StepResult> Steps,
    Exception? Error = null);
```

### StepResult

```csharp
public record StepResult(
    string StepName,
    string StepType,
    long RowsAffected,
    TimeSpan Elapsed);
```

### DestinationResult

```csharp
public record DestinationResult(
    long RowsProcessed,
    int BatchCount,
    TimeSpan Elapsed);
```
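
When tuning a run, dividing each step's `RowsAffected` by its `Elapsed` gives a throughput figure and makes the slowest step obvious. This sketch uses tuples shaped like `StepResult` instead of the record itself. The step names, step types and timings are invented for illustration, and `RowsPerSecond` is a hypothetical helper.

```csharp
using System;
using System.Linq;

// Tuples shaped like StepResult(StepName, StepType, RowsAffected, Elapsed).
// All values are illustrative, not measurements.
var steps = new[]
{
    (StepName: "DisableIndexes:dbo.WorkOrder", StepType: "PreScript", RowsAffected: 0L, Elapsed: TimeSpan.FromSeconds(2)),
    (StepName: "BulkImport:WorkOrder", StepType: "Destination", RowsAffected: 1_000_000L, Elapsed: TimeSpan.FromSeconds(20)),
    (StepName: "RebuildIndexes:dbo.WorkOrder", StepType: "PostScript", RowsAffected: 0L, Elapsed: TimeSpan.FromSeconds(45)),
};

// Rows per second, guarding against zero-length steps.
static double RowsPerSecond(long rows, TimeSpan elapsed) =>
    elapsed.TotalSeconds > 0 ? rows / elapsed.TotalSeconds : 0;

var slowest = steps.OrderByDescending(x => x.Elapsed).First();
var total = TimeSpan.FromTicks(steps.Sum(x => x.Elapsed.Ticks));

foreach (var step in steps)
    Console.WriteLine($"{step.StepType,-12} {step.StepName,-30} {step.Elapsed.TotalSeconds,6:F1}s " +
                      $"{RowsPerSecond(step.RowsAffected, step.Elapsed),10:N0} rows/s");
Console.WriteLine($"Slowest: {slowest.StepName} ({slowest.Elapsed.TotalSeconds:F0}s of {total.TotalSeconds:F0}s)");
```

In a profile like this one, the index rebuild rather than the bulk copy dominates, which is the signal to revisit the pre/post script choice rather than the batch size.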
## Related Documentation

- [Sources](Sources.md) - Writing custom data sources
- [Transformers](Transformers.md) - Writing custom transformers
- [Destinations](Destinations.md) - Writing destinations and scripts
- [Configuration](Configuration.md) - Pipeline builder and DI setup
- [Troubleshooting](Troubleshooting.md) - Debugging and performance
**Step 3: Verify the file was created**

```bash
ls -la DOCUMENTATION/DataSync/
```

Expected: `Overview.md` exists

**Step 4: Commit**

```bash
git add DOCUMENTATION/DataSync/Overview.md
git commit -m "docs: add ETL pipeline overview documentation"
```
### Task 2: Create Sources.md

**Files:**
- Create: `DOCUMENTATION/DataSync/Sources.md`

**Step 1: Write Sources.md with interface and DbQuerySource walkthrough**

Create `DOCUMENTATION/DataSync/Sources.md` with:

# Data Sources
Sources provide data to the ETL pipeline by implementing `IImportSource`. They return an `IDataReader` that streams rows to transformers and destinations.
## Interface Contract
```csharp
public interface IImportSource : IAsyncDisposable
{
    Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default);
    string SourceName { get; }
}
```

Key requirements:

- Implement `IAsyncDisposable` for connection cleanup
- Return a live `IDataReader` (not buffered) for memory efficiency
- `SourceName` is used in logging and `StepResult` tracking
## DbQuerySource Implementation

`DbQuerySource` executes a SQL query against the local cache database:

```csharp
public class DbQuerySource : IImportSource
{
    private readonly IDbConnectionFactory _connectionFactory;
    private readonly string _sql;
    private readonly object? _parameters;
    private readonly int _commandTimeout;
    private SqlConnection? _connection;
    private SqlCommand? _command;

    public string SourceName { get; }

    public DbQuerySource(
        IDbConnectionFactory connectionFactory,
        string sql,
        string? name = null,
        object? parameters = null,
        int commandTimeout = 3600)
    {
        _connectionFactory = connectionFactory;
        _sql = sql;
        _parameters = parameters;
        _commandTimeout = commandTimeout;
        SourceName = $"DbQuery:{name ?? "Query"}";
    }
```
### Reading data

The connection opens in `ReadDataAsync` and stays open until disposal:

```csharp
    public async Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
    {
        _connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken);
        _command = _connection.CreateCommand();
        _command.CommandText = _sql;
        _command.CommandTimeout = _commandTimeout;
        AddParameters(_command, _parameters);
        return await _command.ExecuteReaderAsync(cancellationToken);
    }
```
### Parameter handling

Parameters are added from an anonymous object using reflection:

```csharp
    private static void AddParameters(SqlCommand command, object? parameters)
    {
        if (parameters == null) return;

        var properties = parameters.GetType().GetProperties();
        foreach (var prop in properties)
        {
            var value = prop.GetValue(parameters) ?? DBNull.Value;
            command.Parameters.AddWithValue($"@{prop.Name}", value);
        }
    }
```
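
The reflection step can be checked without a database. This sketch mirrors `AddParameters` but collects `(name, value)` pairs instead of calling `SqlCommand.Parameters.AddWithValue`, so it runs on the base class library alone. `ToParameterPairs` and the sample values are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Same property walk as AddParameters: each public property becomes "@Name",
// and a null value becomes DBNull.Value (ADO.NET's representation of SQL NULL).
static List<(string Name, object Value)> ToParameterPairs(object? parameters)
{
    var collected = new List<(string Name, object Value)>();
    if (parameters == null) return collected;

    foreach (var prop in parameters.GetType().GetProperties())
    {
        var value = prop.GetValue(parameters) ?? DBNull.Value;
        collected.Add(($"@{prop.Name}", value));
    }
    return collected;
}

var pairs = ToParameterPairs(new { Branch = "M30", MinDate = 124001, Status = (string?)null });
foreach (var (name, value) in pairs)
    Console.WriteLine($"{name} = {(value is DBNull ? "NULL" : value)}");
```

One caveat of `AddWithValue` is that the SQL type is inferred from the .NET value, so a `string` is sent as `nvarchar`; comparing it against a `varchar` column can force an implicit conversion on the server.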
### Resource cleanup

Both the command and the connection are disposed asynchronously:

```csharp
    public async ValueTask DisposeAsync()
    {
        if (_command != null)
        {
            await _command.DisposeAsync();
            _command = null;
        }

        if (_connection != null)
        {
            await _connection.DisposeAsync();
            _connection = null;
        }
    }
}
```
## Key Patterns

### Keep sources stateless until `ReadDataAsync`

Don't open connections or execute queries in the constructor. The source should be configurable without side effects until `ReadDataAsync` is called.

### Streaming, not buffering

Return a live `IDataReader` rather than loading all data into memory. This allows processing millions of rows without memory pressure.

### Use `SourceName` for diagnostics

Format: `"DbQuery:{table}"` or `"File:{filename}"`. This appears in logs and `StepResult.StepName`.

### Future source types

The interface supports additional source types not yet implemented:
- File-based sources - CSV, Excel files
- API sources - REST endpoints returning paged data
- Oracle/Sybase sources - Direct queries against JDE or CMS
Each would implement the same interface with different connection and reader implementations.
## Related Documentation

- [Overview](Overview.md) - Pipeline architecture
- [Transformers](Transformers.md) - Processing source data
- [Configuration](Configuration.md) - Connection factory setup
**Step 2: Verify the file was created**

```bash
ls -la DOCUMENTATION/DataSync/Sources.md
```

Expected: File exists

**Step 3: Commit**

```bash
git add DOCUMENTATION/DataSync/Sources.md
git commit -m "docs: add ETL sources documentation"
```
### Task 3: Create Transformers.md

**Files:**
- Create: `DOCUMENTATION/DataSync/Transformers.md`

**Step 1: Write Transformers.md with interface, base class, and three transformer walkthroughs**

Create `DOCUMENTATION/DataSync/Transformers.md` with:

# Data Transformers
Transformers modify data as it flows through the pipeline. They wrap the source `IDataReader` in a decorator, allowing column renaming, dropping, type conversion, and computed columns.
## Interface Contract
```csharp
public interface IDataTransformer
{
    IDataReader Transform(IDataReader source);
    string TransformerName { get; }
    int MapOrdinal(int transformedOrdinal, IDataReader source);
}
```

Key methods:

- `Transform()` - Wraps the source reader and returns a new reader with the modifications applied
- `TransformerName` - Used in logging and `StepResult` tracking
- `MapOrdinal()` - Maps transformed ordinals to source ordinals. Returns `-1` for computed columns.
## DataTransformerBase

The base class provides default implementations and handles the decorator pattern:

```csharp
public abstract class DataTransformerBase : IDataTransformer
{
    public abstract string TransformerName { get; }

    public IDataReader Transform(IDataReader source)
    {
        ArgumentNullException.ThrowIfNull(source);
        OnInitialize(source);
        return new TransformingDataReader(source, this);
    }

    protected virtual void OnInitialize(IDataReader source) { }
```
### Default pass-through methods

Override only what you need to change:

```csharp
    public virtual int GetFieldCount(IDataReader source) => source.FieldCount;
    public virtual string GetName(int ordinal, IDataReader source) => source.GetName(ordinal);
    public virtual Type GetFieldType(int ordinal, IDataReader source) => source.GetFieldType(ordinal);
    public virtual object GetValue(int ordinal, IDataReader source) => source.GetValue(ordinal);
    public virtual int GetOrdinal(string name, IDataReader source) => source.GetOrdinal(name);
    public virtual bool IsDBNull(int ordinal, IDataReader source) => source.IsDBNull(ordinal);
    public virtual int MapOrdinal(int transformedOrdinal, IDataReader source) => transformedOrdinal;
```
### Binary method handling

Computed columns (where `MapOrdinal` returns `-1`) throw `NotSupportedException`:

```csharp
    public virtual long GetBytes(int ordinal, long fieldOffset, byte[]? buffer,
        int bufferOffset, int length, IDataReader source)
    {
        var sourceOrdinal = MapOrdinal(ordinal, source);
        if (sourceOrdinal < 0)
            throw new NotSupportedException(
                $"GetBytes not supported for computed column at ordinal {ordinal}.");
        return source.GetBytes(sourceOrdinal, fieldOffset, buffer, bufferOffset, length);
    }
```
## ColumnRenameTransformer

Renames columns without changing values or order:

```csharp
public class ColumnRenameTransformer : DataTransformerBase
{
    private readonly Dictionary<string, string> _renames;
    private string[]? _outputNames;
    private Dictionary<string, int>? _nameToOrdinal;

    public override string TransformerName => $"RenameColumns:{_renames.Count}";

    public ColumnRenameTransformer(params (string OldName, string NewName)[] renames)
    {
        _renames = renames.ToDictionary(
            r => r.OldName, r => r.NewName, StringComparer.OrdinalIgnoreCase);
    }
```
### Collision detection

The transformer validates that renames don't create duplicate column names:

```csharp
    protected override void OnInitialize(IDataReader source)
    {
        _outputNames = new string[source.FieldCount];
        _nameToOrdinal = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);

        for (int i = 0; i < source.FieldCount; i++)
        {
            var originalName = source.GetName(i);
            var outputName = _renames.TryGetValue(originalName, out var newName)
                ? newName : originalName;

            if (_nameToOrdinal.TryGetValue(outputName, out var existingOrdinal))
            {
                throw new InvalidOperationException(
                    $"Column name collision: '{originalName}' → '{outputName}' conflicts with " +
                    $"'{source.GetName(existingOrdinal)}' (already at ordinal {existingOrdinal}).");
            }

            _outputNames[i] = outputName;
            _nameToOrdinal[outputName] = i;
        }
    }
```
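
The collision rule can be exercised on plain arrays of column names. `ResolveRenames` is a hypothetical helper that repeats the `OnInitialize` loop above without an `IDataReader`; `DOCO` and `MCU` are just sample JDE-style column names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Repeats ColumnRenameTransformer.OnInitialize on an array of source column names.
static string[] ResolveRenames(string[] sourceNames, params (string OldName, string NewName)[] renames)
{
    var renameMap = renames.ToDictionary(r => r.OldName, r => r.NewName, StringComparer.OrdinalIgnoreCase);
    var output = new string[sourceNames.Length];
    var seen = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);

    for (int i = 0; i < sourceNames.Length; i++)
    {
        var outputName = renameMap.TryGetValue(sourceNames[i], out var newName) ? newName : sourceNames[i];
        if (seen.TryGetValue(outputName, out var existing))
            throw new InvalidOperationException(
                $"Column name collision: '{sourceNames[i]}' → '{outputName}' conflicts with '{sourceNames[existing]}'.");
        output[i] = outputName;
        seen[outputName] = i;
    }
    return output;
}

// A plain rename succeeds, and so does a swap, because only output names are compared.
Console.WriteLine(string.Join(", ", ResolveRenames(new[] { "DOCO", "MCU" }, ("DOCO", "OrderNumber"))));
Console.WriteLine(string.Join(", ", ResolveRenames(new[] { "A", "B" }, ("A", "B"), ("B", "A"))));

// Renaming onto a name that another column already produces is rejected (case-insensitively).
try
{
    ResolveRenames(new[] { "DOCO", "OrderNumber" }, ("DOCO", "ordernumber"));
}
catch (InvalidOperationException ex)
{
    Console.WriteLine(ex.Message);
}
```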
## ColumnDropTransformer

Removes the specified columns from the output:

```csharp
public class ColumnDropTransformer : DataTransformerBase
{
    private readonly HashSet<string> _columnsToDrop;
    private int[]? _ordinalMap;
    private Dictionary<string, int>? _nameToOrdinal;

    public override string TransformerName => $"DropColumns:{string.Join(",", _columnsToDrop)}";

    public ColumnDropTransformer(params string[] columnsToDrop)
    {
        _columnsToDrop = new HashSet<string>(columnsToDrop, StringComparer.OrdinalIgnoreCase);
    }
```
### Ordinal mapping

Builds a map from output ordinals to source ordinals, excluding dropped columns:

```csharp
    protected override void OnInitialize(IDataReader source)
    {
        var ordinalList = new List<int>();
        _nameToOrdinal = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);

        for (int i = 0; i < source.FieldCount; i++)
        {
            var name = source.GetName(i);
            if (!_columnsToDrop.Contains(name))
            {
                _nameToOrdinal[name] = ordinalList.Count;
                ordinalList.Add(i);
            }
        }

        _ordinalMap = ordinalList.ToArray();
    }

    public override int MapOrdinal(int transformedOrdinal, IDataReader source)
        => _ordinalMap![transformedOrdinal];
```
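
For example, dropping `B` and `D` from a five-column source leaves three output columns that map back to source ordinals 0, 2 and 4. `BuildDropMap` is a hypothetical helper that repeats the `OnInitialize` loop on a name array:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Repeats ColumnDropTransformer.OnInitialize: index = output ordinal, value = source ordinal.
static int[] BuildDropMap(string[] sourceNames, params string[] columnsToDrop)
{
    var drop = new HashSet<string>(columnsToDrop, StringComparer.OrdinalIgnoreCase);
    var ordinals = new List<int>();
    for (int i = 0; i < sourceNames.Length; i++)
    {
        if (!drop.Contains(sourceNames[i])) ordinals.Add(i);
    }
    return ordinals.ToArray();
}

var names = new[] { "A", "B", "C", "D", "E" };
var map = BuildDropMap(names, "b", "D");
for (int output = 0; output < map.Length; output++)
    Console.WriteLine($"output {output} ({names[map[output]]}) <- source {map[output]}");
```

A name that is not present in the source is ignored silently, because the loop only tests membership. A typo in a drop list therefore leaves the column in place instead of failing in `OnInitialize`.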
## JdeDateTransformer

Combines a JDE Julian date column (CYYDDD) and a time column (HHMMSS) into a single `DateTime`:

```csharp
public class JdeDateTransformer : DataTransformerBase
{
    public static readonly DateTime DefaultInvalidDateSentinel = new(1900, 1, 1);

    private readonly string _dateColumn;
    private readonly string _timeColumn;
    private readonly string _outputColumn;
    private readonly DateTime _invalidDateSentinel;
```
### Computed column handling

The output `DateTime` column has no direct source ordinal, so `MapOrdinal` returns `-1`:

```csharp
    public override int MapOrdinal(int transformedOrdinal, IDataReader source)
    {
        var sourceOrdinal = _ordinalMap![transformedOrdinal];
        return sourceOrdinal == _dateOrdinal ? -1 : sourceOrdinal;
    }

    public override string GetDataTypeName(int ordinal, IDataReader source)
    {
        var sourceOrdinal = _ordinalMap![ordinal];
        return sourceOrdinal == _dateOrdinal ? "datetime" : source.GetDataTypeName(sourceOrdinal);
    }
```
### Date parsing with validation

Invalid dates return a configurable sentinel value (default: 1900-01-01):

```csharp
    public static DateTime ParseJdeDateTime(decimal julianDate, decimal time, DateTime sentinel)
    {
        var dateInt = (int)julianDate;
        if (dateInt <= 0) return sentinel;

        var century = dateInt / 100000;
        var year = (dateInt / 1000) % 100;
        var dayOfYear = dateInt % 1000;

        if (century < 0 || century > 1) return sentinel;
        if (year < 0 || year > 99) return sentinel;
        if (dayOfYear < 1 || dayOfYear > 366) return sentinel;

        var fullYear = (century == 0 ? 1900 : 2000) + year;
        var daysInYear = DateTime.IsLeapYear(fullYear) ? 366 : 365;
        if (dayOfYear > daysInYear) return sentinel;

        var date = new DateTime(fullYear, 1, 1).AddDays(dayOfYear - 1);

        // Parse time (HHMMSS format)
        var timeInt = (int)time;
        var hours = timeInt / 10000;
        var minutes = (timeInt / 100) % 100;
        var seconds = timeInt % 100;

        if (hours < 0 || hours > 23) return sentinel;
        if (minutes < 0 || minutes > 59) return sentinel;
        if (seconds < 0 || seconds > 59) return sentinel;

        return date.AddHours(hours).AddMinutes(minutes).AddSeconds(seconds);
    }
```
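
A worked example of the CYYDDD layout: `124032` splits into century digit 1 (the 2000s), year 24 and day 32, which is 1 February 2024; time `143015` is 14:30:15. The sketch below checks that with a condensed copy of `ParseJdeDateTime` and adds a hypothetical inverse, `ToJde`, which is handy for building test fixtures. The inverse assumes years 1900-2099, the only range the century digit covers here.

```csharp
using System;
using System.Globalization;

// Condensed copy of JdeDateTransformer.ParseJdeDateTime (same rules, merged checks).
static DateTime ParseJdeDateTime(decimal julianDate, decimal time, DateTime sentinel)
{
    var dateInt = (int)julianDate;                  // CYYDDD
    if (dateInt <= 0) return sentinel;
    var century = dateInt / 100000;                 // 0 = 1900s, 1 = 2000s
    var year = (dateInt / 1000) % 100;
    var dayOfYear = dateInt % 1000;
    if (century > 1 || dayOfYear < 1) return sentinel;

    var fullYear = (century == 0 ? 1900 : 2000) + year;
    if (dayOfYear > (DateTime.IsLeapYear(fullYear) ? 366 : 365)) return sentinel;

    var timeInt = (int)time;                        // HHMMSS
    int hours = timeInt / 10000, minutes = (timeInt / 100) % 100, seconds = timeInt % 100;
    if (hours is < 0 or > 23 || minutes is < 0 or > 59 || seconds is < 0 or > 59) return sentinel;

    return new DateTime(fullYear, 1, 1).AddDays(dayOfYear - 1)
        .AddHours(hours).AddMinutes(minutes).AddSeconds(seconds);
}

// Hypothetical inverse for building test fixtures (valid for 1900-2099 only).
static (int Date, int Time) ToJde(DateTime value) =>
    ((value.Year - 1900) * 1000 + value.DayOfYear,
     value.Hour * 10000 + value.Minute * 100 + value.Second);

var invalid = new DateTime(1900, 1, 1);
Console.WriteLine(ParseJdeDateTime(124032m, 143015m, invalid)
    .ToString("yyyy-MM-dd HH:mm:ss", CultureInfo.InvariantCulture)); // 2024-02-01 14:30:15
Console.WriteLine(ToJde(new DateTime(2024, 2, 29, 23, 59, 59)));     // (124060, 235959)
```

Note that an out-of-range time also yields the sentinel, even when the date part is valid.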
## Transformer Chaining

Transformers compose by wrapping each other. The pipeline applies them in order:

```csharp
foreach (var transformer in _transformers)
{
    reader = transformer.Transform(reader);
}
```

Each transformer sees the output of the previous one. Ordinal mappings accumulate through the chain.
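
When transformers are chained, an output ordinal of the last transformer reaches the source by applying each transformer's map in reverse order. This sketch represents each map as an array (the shape `ColumnDropTransformer` builds), uses `-1` for computed columns as `MapOrdinal` does, and introduces a hypothetical `ResolveThroughChain` helper:

```csharp
using System;

// Each map sends an output ordinal of one transformer to an ordinal of its input;
// -1 marks a computed column with no upstream source column.
static int ResolveThroughChain(int ordinal, params int[][] mapsInChainOrder)
{
    // Walk from the last transformer in the chain back toward the source.
    for (int i = mapsInChainOrder.Length - 1; i >= 0; i--)
    {
        ordinal = mapsInChainOrder[i][ordinal];
        if (ordinal < 0) return -1;
    }
    return ordinal;
}

// Source columns: A B C D E
int[] dropB = { 0, 2, 3, 4 };   // first transformer drops B            -> A C D E
int[] dropD = { 0, 1, 3 };      // second drops D (its input ordinal 2) -> A C E

for (int o = 0; o < dropD.Length; o++)
    Console.WriteLine($"final ordinal {o} -> source ordinal {ResolveThroughChain(o, dropB, dropD)}");
```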
## Validation in `OnInitialize`

Perform all validation in `OnInitialize()` to fail fast before processing data:
- Check that required columns exist
- Validate rename mappings don't create collisions
- Build ordinal maps for efficient lookup during row processing
## Related Documentation

- [Overview](Overview.md) - Pipeline architecture
- [Sources](Sources.md) - Data sources that feed transformers
- [Destinations](Destinations.md) - Where transformed data goes
**Step 2: Verify the file was created**

```bash
ls -la DOCUMENTATION/DataSync/Transformers.md
```

Expected: File exists

**Step 3: Commit**

```bash
git add DOCUMENTATION/DataSync/Transformers.md
git commit -m "docs: add ETL transformers documentation"
```
### Task 4: Create Destinations.md

**Files:**
- Create: `DOCUMENTATION/DataSync/Destinations.md`

**Step 1: Write Destinations.md with interface, both destinations, and script patterns**

Create `DOCUMENTATION/DataSync/Destinations.md` with:

# Destinations and Scripts
Destinations consume data from the pipeline and write it to storage. Scripts run SQL operations before or after data transfer, commonly for index management.
## IImportDestination Contract
```csharp
public interface IImportDestination
{
    Task<DestinationResult> WriteAsync(IDataReader source, CancellationToken cancellationToken = default);
    string DestinationName { get; }
}
```

Key requirements:

- Consume the entire `IDataReader` in `WriteAsync`
- Return a `DestinationResult` with row count, batch count, and elapsed time
- `DestinationName` is used in logging and `StepResult` tracking
## DbBulkImportDestination

Full table refresh using `TRUNCATE` followed by a bulk copy:

```csharp
public class DbBulkImportDestination : IImportDestination
{
    private const int DefaultBatchSize = 10000;
    private const int DefaultCommandTimeoutSeconds = 600;

    public DbBulkImportDestination(
        IDbConnectionFactory connectionFactory,
        string tableName,
        int batchSize = 0,
        int commandTimeoutSeconds = 0)
    {
        // (other field assignments omitted in this excerpt)
        // Zero or negative values fall back to the defaults.
        _batchSize = batchSize > 0 ? batchSize : DefaultBatchSize;
        _commandTimeoutSeconds = commandTimeoutSeconds > 0
            ? commandTimeoutSeconds : DefaultCommandTimeoutSeconds;
    }
```
### Column mapping

Queries the destination schema and maps only matching columns; extra source columns are ignored:

```csharp
var destColumns = await GetDestinationColumnsAsync(connection, cancellationToken);

using var bulkCopy = new SqlBulkCopy(connection)
{
    DestinationTableName = qualifiedName,
    BatchSize = _batchSize,
    BulkCopyTimeout = _commandTimeoutSeconds,
    EnableStreaming = true
};

for (int i = 0; i < source.FieldCount; i++)
{
    var columnName = source.GetName(i);
    if (destColumns.Contains(columnName))
    {
        bulkCopy.ColumnMappings.Add(columnName, columnName);
    }
}

if (bulkCopy.ColumnMappings.Count == 0)
    throw new InvalidOperationException(
        $"No columns from source exist in destination table '{_tableName}'.");
```
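
The mapping rule amounts to a case-insensitive intersection that keeps the source column order. In the sketch below a hard-coded list replaces the `INFORMATION_SCHEMA` query so it runs offline; `MatchColumns` and the column names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Keeps source columns (in source order) that also exist in the destination,
// comparing names case-insensitively like the HashSet from GetDestinationColumnsAsync.
static List<string> MatchColumns(IEnumerable<string> sourceColumns,
    IEnumerable<string> destinationColumns, string tableName)
{
    var dest = destinationColumns.ToHashSet(StringComparer.OrdinalIgnoreCase);
    var kept = sourceColumns.Where(c => dest.Contains(c)).ToList();
    if (kept.Count == 0)
        throw new InvalidOperationException(
            $"No columns from source exist in destination table '{tableName}'.");
    return kept;
}

var mapped = MatchColumns(
    new[] { "OrderNumber", "Status", "LoadBatchId" },   // LoadBatchId has no destination column
    new[] { "OrderNumber", "Status", "StartDate" },     // StartDate is not supplied by the source
    "WorkOrder");
Console.WriteLine(string.Join(", ", mapped));           // OrderNumber, Status
```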
### Destination column discovery

Uses `INFORMATION_SCHEMA.COLUMNS` with schema support:

```csharp
private async Task<HashSet<string>> GetDestinationColumnsAsync(
    SqlConnection connection, CancellationToken ct)
{
    var (schema, table) = CommonScripts.ParseTableName(_tableName);
    var sql = @"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS
                WHERE TABLE_NAME = @tableName AND TABLE_SCHEMA = @schemaName";

    var columns = await connection.QueryAsync<string>(
        new CommandDefinition(sql, new { tableName = table, schemaName = schema },
            commandTimeout: _commandTimeoutSeconds, cancellationToken: ct));

    return columns.ToHashSet(StringComparer.OrdinalIgnoreCase);
}
```
## DbBulkMergeDestination

Incremental updates using a bulk copy into a temp table followed by `MERGE`:

```csharp
public class DbBulkMergeDestination : IImportDestination
{
    public DbBulkMergeDestination(
        IDbConnectionFactory connectionFactory,
        string tableName,
        string[] matchColumns,
        string[]? updateColumns = null,
        int batchSize = 0,
        int commandTimeoutSeconds = 0)
    {
        if (matchColumns.Length == 0)
            throw new ArgumentException("At least one match column is required.");
    }
```
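### Batch processing

Creates a temp table, bulk copies rows in batches, and merges each batch into the target:

```csharp
var tempTableName = $"#ETL_{_tableName.Replace(".", "_").Replace("[", "").Replace("]", "")}";
await CreateTempTableAsync(connection, tempTableName, cancellationToken);

while (source.Read())
{
    // Buffer the current row into the `batch` DataTable (elided)
    if (batch.Rows.Count >= _batchSize)
    {
        await ProcessBatchAsync(connection, batch, tempTableName, mergeSql, destColumns, cancellationToken);
        totalRows += batch.Rows.Count;
        batch.Clear();
    }
}

// Flush the final partial batch (fewer than _batchSize rows)
if (batch.Rows.Count > 0)
{
    await ProcessBatchAsync(connection, batch, tempTableName, mergeSql, destColumns, cancellationToken);
    totalRows += batch.Rows.Count;
}
```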
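
The final flush matters whenever the row count is not a multiple of `_batchSize`: with 25 rows and a batch size of 10, the loop processes two full batches, and the last 5 rows are written only by the flush. This sketch counts batch sizes over an in-memory `DataTableReader`; `BatchSizes` is hypothetical, and its list of sizes stands in for the temp-table and `MERGE` round trips.

```csharp
using System;
using System.Collections.Generic;
using System.Data;

// Mirrors the read loop: buffer rows, process at batchSize, then flush the remainder.
static List<int> BatchSizes(IDataReader source, int batchSize)
{
    var processed = new List<int>();
    int buffered = 0;
    while (source.Read())
    {
        buffered++;                          // stands in for adding the row to `batch`
        if (buffered >= batchSize)
        {
            processed.Add(buffered);         // stands in for ProcessBatchAsync(...)
            buffered = 0;                    // batch.Clear()
        }
    }
    if (buffered > 0) processed.Add(buffered); // final partial batch
    return processed;
}

var table = new DataTable();
table.Columns.Add("Id", typeof(int));
for (int i = 0; i < 25; i++) table.Rows.Add(i);

Console.WriteLine(string.Join(", ", BatchSizes(table.CreateDataReader(), 10))); // 10, 10, 5
```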
### MERGE SQL generation

Generates a `MERGE` statement with configurable match and update columns:

```csharp
private string BuildMergeSql(string tempTableName,
    IReadOnlyList<string> allColumns, IReadOnlyList<string> updateColumns)
{
    var qualifiedName = CommonScripts.FormatQualifiedTableName(_tableName);
    var sb = new StringBuilder();

    sb.AppendLine($"MERGE INTO {qualifiedName} AS target");
    sb.AppendLine($"USING {tempTableName} AS source");
    sb.Append("ON ");
    sb.AppendLine(string.Join(" AND ",
        _matchColumns.Select(c => $"target.[{c}] = source.[{c}]")));

    if (updateColumns.Count > 0)
    {
        sb.AppendLine("WHEN MATCHED THEN UPDATE SET");
        sb.AppendLine(string.Join(", ",
            updateColumns.Select(c => $"target.[{c}] = source.[{c}]")));
    }

    sb.AppendLine("WHEN NOT MATCHED THEN INSERT");
    sb.AppendLine($"({string.Join(", ", allColumns.Select(c => $"[{c}]"))})");
    sb.AppendLine($"VALUES ({string.Join(", ", allColumns.Select(c => $"source.[{c}]"))});");

    return sb.ToString();
}
```
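
For a single match column, the generated statement is easy to check by eye. The sketch below is a condensed copy of `BuildMergeSql` that takes an already-qualified table name. It assumes the documented `updateColumns` default (all non-match columns) is computed as in the hypothetical `DefaultUpdateColumns` helper; the real default logic is not part of this excerpt.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

// Assumed default: every column that is not a match column gets updated.
static List<string> DefaultUpdateColumns(IEnumerable<string> all, string[] match) =>
    all.Where(c => !match.Contains(c, StringComparer.OrdinalIgnoreCase)).ToList();

// Condensed copy of BuildMergeSql with the table name already qualified.
static string BuildMergeSql(string qualifiedName, string tempTableName, string[] match,
    IReadOnlyList<string> allColumns, IReadOnlyList<string> updateColumns)
{
    var sb = new StringBuilder();
    sb.AppendLine($"MERGE INTO {qualifiedName} AS target");
    sb.AppendLine($"USING {tempTableName} AS source");
    sb.Append("ON ");
    sb.AppendLine(string.Join(" AND ", match.Select(c => $"target.[{c}] = source.[{c}]")));
    if (updateColumns.Count > 0)
    {
        sb.AppendLine("WHEN MATCHED THEN UPDATE SET");
        sb.AppendLine(string.Join(", ", updateColumns.Select(c => $"target.[{c}] = source.[{c}]")));
    }
    sb.AppendLine("WHEN NOT MATCHED THEN INSERT");
    sb.AppendLine($"({string.Join(", ", allColumns.Select(c => $"[{c}]"))})");
    sb.AppendLine($"VALUES ({string.Join(", ", allColumns.Select(c => $"source.[{c}]"))});");
    return sb.ToString();
}

var cols = new[] { "OrderNumber", "Status", "StartDate" };
var keys = new[] { "OrderNumber" };
Console.Write(BuildMergeSql("[dbo].[WorkOrder]", "#ETL_WorkOrder", keys, cols, DefaultUpdateColumns(cols, keys)));
```

When every column is a match column, the update list is empty, the `WHEN MATCHED` clause is omitted, and the statement only inserts new rows.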
## Schema-Qualified Table Names

Both destinations support schema-qualified names via `CommonScripts`:

```csharp
public static (string Schema, string Table) ParseTableName(string tableName)
{
    var cleaned = tableName.Replace("[", "").Replace("]", "");
    var parts = cleaned.Split('.', 2);
    return parts.Length == 2 ? (parts[0], parts[1]) : ("dbo", parts[0]);
}

public static string FormatQualifiedTableName(string tableName)
{
    var (schema, table) = ParseTableName(tableName);
    return $"[{schema}].[{table}]";
}
```

Supported formats: `"Table"`, `"dbo.Table"`, `"[dbo].[Table]"`
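
The supported formats all normalize to the same bracketed name. A quick check using copies of the two helpers:

```csharp
using System;

// Copies of CommonScripts.ParseTableName and FormatQualifiedTableName.
static (string Schema, string Table) ParseTableName(string tableName)
{
    var cleaned = tableName.Replace("[", "").Replace("]", "");
    var parts = cleaned.Split('.', 2);
    return parts.Length == 2 ? (parts[0], parts[1]) : ("dbo", parts[0]);
}

static string FormatQualifiedTableName(string tableName)
{
    var (schema, table) = ParseTableName(tableName);
    return $"[{schema}].[{table}]";
}

foreach (var name in new[] { "WorkOrder", "dbo.WorkOrder", "[dbo].[WorkOrder]", "staging.WorkOrder" })
    Console.WriteLine($"{name,-20} -> {FormatQualifiedTableName(name)}");
```

Because `Split('.', 2)` keeps at most two parts, a three-part name such as `LotFinder.dbo.WorkOrder` is read as schema `LotFinder` and table `dbo.WorkOrder`, so database-qualified names are not supported.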
## Script Patterns

### IScriptRunner Contract

```csharp
public interface IScriptRunner
{
    Task ExecuteAsync(CancellationToken cancellationToken = default);
    string ScriptName { get; }
}
```
### SqlScriptRunner Implementation

```csharp
public class SqlScriptRunner : IScriptRunner
{
    public SqlScriptRunner(
        IDbConnectionFactory connectionFactory,
        string sql,
        string? name = null,
        object? parameters = null,
        int timeoutSeconds = 3600)
    {
        // (remaining field assignments omitted in this excerpt)
        ScriptName = name ?? "SqlScript";
    }

    public async Task ExecuteAsync(CancellationToken cancellationToken = default)
    {
        // A fresh connection per script, disposed when the script completes
        await using var connection = await _connectionFactory
            .CreateLotFinderConnectionAsync(cancellationToken);

        await connection.ExecuteAsync(
            new CommandDefinition(_sql, _parameters,
                commandTimeout: _timeoutSeconds, cancellationToken: cancellationToken));
    }
}
```
### Common Scripts

`CommonScripts` provides factory methods for index management. `DisableIndexes` only touches enabled nonclustered indexes (`i.type = 2`), because disabling a clustered index makes the table's data inaccessible until the index is rebuilt:

```csharp
public static IScriptRunner DisableIndexes(
    IDbConnectionFactory factory, string tableName, int timeoutSeconds = 300)
{
    var (schema, table) = ParseTableName(tableName);
    var sql = @"
DECLARE @sql NVARCHAR(MAX) = '';
DECLARE @fullTableName NVARCHAR(256) = QUOTENAME(@schemaName) + '.' + QUOTENAME(@tableName);
SELECT @sql = @sql + 'ALTER INDEX ' + QUOTENAME(i.name) + ' ON ' + @fullTableName + ' DISABLE;'
FROM sys.indexes i
INNER JOIN sys.tables t ON i.object_id = t.object_id
INNER JOIN sys.schemas s ON t.schema_id = s.schema_id
WHERE t.name = @tableName AND s.name = @schemaName
  AND i.type = 2 AND i.is_disabled = 0;
IF LEN(@sql) > 0 EXEC sp_executesql @sql;";

    return new SqlScriptRunner(factory, sql, $"DisableIndexes:{schema}.{table}",
        parameters: new { tableName = table, schemaName = schema },
        timeoutSeconds: timeoutSeconds);
}

public static IScriptRunner RebuildIndexes(
    IDbConnectionFactory factory, string tableName, int timeoutSeconds = 3600)
{
    // Body elided: similar pattern with ALTER INDEX ALL ... REBUILD
}

public static IScriptRunner UpdateStatistics(
    IDbConnectionFactory factory, string tableName, int timeoutSeconds = 600)
{
    // Body elided: UPDATE STATISTICS with QUOTENAME protection
}
```
### SQL injection protection

All dynamic SQL uses `QUOTENAME()` for identifier escaping:

```sql
DECLARE @fullTableName NVARCHAR(256) = QUOTENAME(@schemaName) + '.' + QUOTENAME(@tableName);
```
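
`QUOTENAME` wraps an identifier in brackets and doubles every `]` inside it, so a crafted name cannot close the identifier early and append its own SQL. The C# function below is a hypothetical re-creation of the default (`[]`) behaviour, for illustration only. It also models the fact that `QUOTENAME` returns `NULL` for inputs longer than 128 characters.

```csharp
using System;

// Hypothetical C# re-creation of T-SQL QUOTENAME(name) with the default [ ] delimiters:
// wrap in brackets and double every ']' in the input.
static string? QuoteName(string name) =>
    name.Length > 128 ? null                        // QUOTENAME returns NULL past sysname length
                      : "[" + name.Replace("]", "]]") + "]";

Console.WriteLine(QuoteName("WorkOrder"));                  // [WorkOrder]
Console.WriteLine(QuoteName("x]; DROP TABLE WorkOrder;--")); // [x]]; DROP TABLE WorkOrder;--]
```

Because the schema and table names reach `DisableIndexes` as parameters (`@schemaName`, `@tableName`) and are quoted inside T-SQL, the C# caller never concatenates raw identifiers into the dynamic statement.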
### When to use scripts

Use pre/post scripts for large bulk loads where index overhead matters:

```csharp
var pipeline = new EtlPipelineBuilder()
    .WithName("LargeTableSync")
    .WithSource(source)
    .WithPreScript(CommonScripts.DisableIndexes(factory, "WorkOrder"))
    .WithDestination(new DbBulkImportDestination(factory, "WorkOrder"))
    .WithPostScript(CommonScripts.RebuildIndexes(factory, "WorkOrder"))
    .WithPostScript(CommonScripts.UpdateStatistics(factory, "WorkOrder"))
    .Build();
```
## Related Documentation

- [Overview](Overview.md) - Pipeline architecture
- [Transformers](Transformers.md) - Data transformation
- [Configuration](Configuration.md) - Timeout and batch size options
- [Troubleshooting](Troubleshooting.md) - Performance tuning
**Step 2: Verify the file was created**

```bash
ls -la DOCUMENTATION/DataSync/Destinations.md
```

Expected: File exists

**Step 3: Commit**

```bash
git add DOCUMENTATION/DataSync/Destinations.md
git commit -m "docs: add ETL destinations and scripts documentation"
```
### Task 5: Create Configuration.md

**Files:**
- Create: `DOCUMENTATION/DataSync/Configuration.md`

**Step 1: Write Configuration.md with builder API, connection setup, and DI registration**

Create `DOCUMENTATION/DataSync/Configuration.md` with:

# Configuration
This document covers pipeline builder configuration, connection factory setup, and dependency injection registration.
## Pipeline Builder API
`EtlPipelineBuilder` uses a fluent API to construct pipelines:
```csharp
var pipeline = new EtlPipelineBuilder()
    .WithName("WorkOrderSync")
    .WithSource(new DbQuerySource(factory, "SELECT * FROM Source.WorkOrders", "WorkOrders"))
    .WithTransformer(new JdeDateTransformer("STRDJ", "TRDJ", "StartDate"))
    .WithTransformer(new ColumnDropTransformer("STRDJ", "TRDJ"))
    .WithPreScript(CommonScripts.DisableIndexes(factory, "WorkOrder"))
    .WithDestination(new DbBulkMergeDestination(factory, "WorkOrder", new[] { "OrderNumber" }))
    .WithPostScript(CommonScripts.RebuildIndexes(factory, "WorkOrder"))
    .WithLogger(logger)
    .Build();
```

### Builder Methods

| Method | Required | Description |
|---|---|---|
| `WithName(string)` | No | Pipeline name for logging. Default: `"Unnamed"` |
| `WithSource(IImportSource)` | Yes | Data source. `Build()` throws if not set |
| `WithTransformer(IDataTransformer)` | No | Adds a transformer. Can be called multiple times (chained) |
| `WithDestination(IImportDestination)` | Yes | Data destination. `Build()` throws if not set |
| `WithPreScript(IScriptRunner)` | No | Script to run before data transfer. Can be called multiple times |
| `WithPostScript(IScriptRunner)` | No | Script to run after data transfer. Can be called multiple times |
| `WithCommandTimeout(TimeSpan)` | No | Default timeout. Range: 0-24 hours. Default: 600s |
| `WithLogger(ILogger<EtlPipeline>)` | No | Logger for pipeline events. Default: `NullLogger` |
### `WithCommandTimeout` Validation

```csharp
public EtlPipelineBuilder WithCommandTimeout(TimeSpan timeout)
{
    if (timeout < TimeSpan.Zero || timeout > TimeSpan.FromHours(24))
        throw new ArgumentOutOfRangeException(nameof(timeout),
            "Timeout must be between 0 and 24 hours.");

    _defaultCommandTimeoutSeconds = (int)timeout.TotalSeconds;
    return this;
}
```
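
Two details of this validation are easy to miss: both bounds are inclusive, and `(int)timeout.TotalSeconds` truncates fractional seconds, so `TimeSpan.FromMilliseconds(1500)` becomes 1 second. A standalone copy of the check, under the hypothetical name `ToTimeoutSeconds`:

```csharp
using System;

// Standalone copy of the WithCommandTimeout range check and conversion.
static int ToTimeoutSeconds(TimeSpan timeout)
{
    if (timeout < TimeSpan.Zero || timeout > TimeSpan.FromHours(24))
        throw new ArgumentOutOfRangeException(nameof(timeout),
            "Timeout must be between 0 and 24 hours.");
    return (int)timeout.TotalSeconds; // the cast truncates fractional seconds
}

Console.WriteLine(ToTimeoutSeconds(TimeSpan.FromMinutes(30)));        // 1800
Console.WriteLine(ToTimeoutSeconds(TimeSpan.FromMilliseconds(1500))); // 1
Console.WriteLine(ToTimeoutSeconds(TimeSpan.FromHours(24)));          // 86400

try
{
    ToTimeoutSeconds(TimeSpan.FromHours(25));
}
catch (ArgumentOutOfRangeException ex)
{
    Console.WriteLine(ex.ParamName);                                   // timeout
}
```

If the resulting value is passed straight to `SqlCommand.CommandTimeout`, keep in mind that ADO.NET treats 0 as "no time limit" rather than "fail immediately".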
### Build Validation

```csharp
public EtlPipeline Build()
{
    if (_source == null)
        throw new InvalidOperationException(
            "Source is required. Call WithSource() before Build().");

    if (_destination == null)
        throw new InvalidOperationException(
            "Destination is required. Call WithDestination() before Build().");

    return new EtlPipeline(_name, _source, _transformers, _destination,
        _preScripts, _postScripts, _logger ?? NullLogger<EtlPipeline>.Instance);
}
```
## Component Configuration

### DbQuerySource Options

| Parameter | Default | Description |
|---|---|---|
| `connectionFactory` | Required | Factory for database connections |
| `sql` | Required | SQL query to execute |
| `name` | `"Query"` | Name for logging (appears as `DbQuery:{name}`) |
| `parameters` | `null` | Anonymous object for query parameters |
| `commandTimeout` | `3600` | Query timeout in seconds |
### DbBulkImportDestination Options

| Parameter | Default | Description |
|---|---|---|
| `connectionFactory` | Required | Factory for database connections |
| `tableName` | Required | Destination table (supports schema: `dbo.Table`) |
| `batchSize` | `10000` | Rows per batch sent to the server (`SqlBulkCopy.BatchSize`); 0 or less uses the default |
| `commandTimeoutSeconds` | `600` | Timeout in seconds for `TRUNCATE` and bulk copy; 0 or less uses the default |
### DbBulkMergeDestination Options

| Parameter | Default | Description |
|---|---|---|
| `connectionFactory` | Required | Factory for database connections |
| `tableName` | Required | Destination table (supports schema: `dbo.Table`) |
| `matchColumns` | Required | Key columns for `MERGE` matching |
| `updateColumns` | All non-match columns | Columns to update on match |
| `batchSize` | `10000` | Rows per batch |
| `commandTimeoutSeconds` | `600` | Timeout in seconds for bulk copy and `MERGE` |
### Script Timeout Defaults

| Script | Default Timeout |
|---|---|
| `DisableIndexes` | 300s (5 min) |
| `RebuildIndexes` | 3600s (1 hour) |
| `UpdateStatistics` | 600s (10 min) |
| `SqlScriptRunner` | 3600s (1 hour) |
## Connection Factory Setup

The pipeline uses `IDbConnectionFactory` for database connections. Register it with your connection strings:

```csharp
services.AddSingleton<IDbConnectionFactory>(sp =>
{
    var configuration = sp.GetRequiredService<IConfiguration>();
    return new DbConnectionFactory(
        configuration.GetConnectionString("LotFinder"),
        configuration.GetConnectionString("JDE"),
        configuration.GetConnectionString("CMS"));
});
```
### Connection string examples

```json
{
  "ConnectionStrings": {
    "LotFinder": "Server=localhost,1434;Database=LotFinder;User Id=scopingapp;Password=...;TrustServerCertificate=true",
    "JDE": "Data Source=jde-oracle;User Id=...;Password=...",
    "CMS": "Data Source=cms-sybase;User Id=...;Password=..."
  }
}
```
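## Dependency Injection Registration

### Basic registration

```csharp
services.AddEtlPipeline();
```

This registers `EtlPipelineBuilder` as transient, so each resolution gets a fresh builder. A hosted service is itself a singleton, though, so a builder injected into its constructor (as in the `DataSyncService` example below) is resolved once and reused for the service's lifetime.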
### Extension method implementation

```csharp
public static class EtlServiceCollectionExtensions
{
    public static IServiceCollection AddEtlPipeline(this IServiceCollection services)
    {
        services.AddTransient<EtlPipelineBuilder>();
        return services;
    }
}
```
### Full registration example

```csharp
public static IServiceCollection AddDataSync(this IServiceCollection services)
{
    // Connection factory (singleton - manages connection pooling)
    services.AddSingleton<IDbConnectionFactory, DbConnectionFactory>();

    // ETL pipeline builder (transient - fresh instance per use)
    services.AddEtlPipeline();

    // Background service for scheduled syncs
    services.AddHostedService<DataSyncService>();

    return services;
}
```
### Using the builder in a service

```csharp
public class DataSyncService : BackgroundService
{
    private readonly EtlPipelineBuilder _pipelineBuilder;
    private readonly IDbConnectionFactory _connectionFactory;
    private readonly ILogger<EtlPipeline> _pipelineLogger;

    public DataSyncService(
        EtlPipelineBuilder pipelineBuilder,
        IDbConnectionFactory connectionFactory,
        ILogger<EtlPipeline> pipelineLogger)
    {
        _pipelineBuilder = pipelineBuilder;
        _connectionFactory = connectionFactory;
        _pipelineLogger = pipelineLogger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var pipeline = _pipelineBuilder
            .WithName("WorkOrderSync")
            .WithSource(new DbQuerySource(_connectionFactory, "SELECT * FROM JDE.WorkOrders"))
            .WithDestination(new DbBulkImportDestination(_connectionFactory, "WorkOrder"))
            .WithLogger(_pipelineLogger)
            .Build();

        // Runs once at startup; result reports Success, TotalRows and per-step timings in Steps
        var result = await pipeline.ExecuteAsync(stoppingToken);
    }
}
```
## Configuration Summary

| Component | Option | Default | Valid Range |
|---|---|---|---|
| `EtlPipelineBuilder` | `WithCommandTimeout` | 600s | 0-24 hours |
| `DbQuerySource` | `commandTimeout` | 3600s | > 0 |
| `DbBulkImportDestination` | `batchSize` | 10000 | > 0 |
| `DbBulkImportDestination` | `commandTimeoutSeconds` | 600s | > 0 |
| `DbBulkMergeDestination` | `batchSize` | 10000 | > 0 |
| `DbBulkMergeDestination` | `commandTimeoutSeconds` | 600s | > 0 |
## Related Documentation

- [Overview](Overview.md) - Pipeline architecture
- [Destinations](Destinations.md) - Destination-specific options
- [Troubleshooting](Troubleshooting.md) - Timeout and batch size tuning
**Step 2: Verify the file was created**

```bash
ls -la DOCUMENTATION/DataSync/Configuration.md
```

Expected: File exists

**Step 3: Commit**

```bash
git add DOCUMENTATION/DataSync/Configuration.md
git commit -m "docs: add ETL configuration documentation"
```
### Task 6: Create Troubleshooting.md

**Files:**
- Create: `DOCUMENTATION/DataSync/Troubleshooting.md`

**Step 1: Write Troubleshooting.md with error catalog, debugging patterns, and performance tuning**

Create `DOCUMENTATION/DataSync/Troubleshooting.md` with:

# Troubleshooting
This document covers common errors, debugging patterns, and performance tuning for the ETL pipeline.
## Common Errors
### Column mapping errors
| Error | Cause | Resolution |
|-------|-------|------------|
| "No columns from source exist in destination table" | Source column names don't match destination | Make sure source column names (or query aliases) match the destination table's column names; matching is case-insensitive |
| "Column name collision" | Transformer creates duplicate column names | Review rename mappings; ensure no two columns map to the same output name |
| "Column '{name}' not found or was dropped" | Accessing a column that was dropped | Check transformer chain order; don't access dropped columns in later transformers |
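Both mapping errors come down to comparing column names. The sketch below is hypothetical: `ApplyRenames` and `MapToDestination` are not library types, and the column names are illustrative. It assumes that collision checks and destination matching compare names case-insensitively. It also assumes that source columns missing from the destination are skipped rather than rejected.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sketch of the checks behind the column mapping errors; not the pipeline's code.
static List<string> ApplyRenames(IEnumerable<string> columns, IReadOnlyDictionary<string, string> renames)
{
    var output = new List<string>();
    var seen = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
    foreach (var column in columns)
    {
        var name = renames.TryGetValue(column, out var renamed) ? renamed : column;
        // Two input columns that end up with the same output name collide.
        if (!seen.Add(name))
        {
            throw new InvalidOperationException($"Column name collision: '{name}'");
        }
        output.Add(name);
    }
    return output;
}

static List<string> MapToDestination(IEnumerable<string> sourceColumns, IEnumerable<string> destinationColumns)
{
    var destination = new HashSet<string>(destinationColumns, StringComparer.OrdinalIgnoreCase);
    // Source columns the destination lacks are skipped, not treated as errors.
    var mapped = sourceColumns.Where(column => destination.Contains(column)).ToList();
    if (mapped.Count == 0)
    {
        throw new InvalidOperationException("No columns from source exist in destination table");
    }
    return mapped;
}

var renamedColumns = ApplyRenames(
    new[] { "ORDER_NO", "STATUS_CODE", "QTY_ORDERED" },
    new Dictionary<string, string> { ["ORDER_NO"] = "OrderNumber", ["STATUS_CODE"] = "Status" });
var mappedColumns = MapToDestination(renamedColumns, new[] { "ORDERNUMBER", "status", "StartDate" });
Console.WriteLine(string.Join(", ", mappedColumns)); // OrderNumber, Status
```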
### Computed column errors
| Error | Cause | Resolution |
|-------|-------|------------|
| "GetBytes not supported for computed column at ordinal N" | Binary access on transformed column | Use `GetValue()` instead; computed columns (like JDE dates) don't support binary access |
| "GetChars not supported for computed column at ordinal N" | Same as above | Use `GetValue()` or `GetString()` |
| "GetData not supported for computed column at ordinal N" | Same as above | Computed columns can't return nested readers |
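When downstream code needs typed values from a transformed reader, it can read through `GetValue()` and avoid the unsupported accessors. `ReadValue<T>` below is a hypothetical helper, not a library API. The in-memory `DataTable` reader stands in for a pipeline reader.

```csharp
using System;
using System.Data;

// Hypothetical helper: read via GetValue() (supported for computed columns)
// and convert DBNull to null instead of calling GetBytes/GetChars/GetData.
static T? ReadValue<T>(IDataRecord record, string column) where T : struct
{
    object value = record.GetValue(record.GetOrdinal(column));
    return value is DBNull ? (T?)null : (T)value;
}

// In-memory stand-in for a transformed reader.
var table = new DataTable();
table.Columns.Add("OrderNumber", typeof(int));
table.Columns.Add("StartDate", typeof(DateTime));
table.Rows.Add(1001, new DateTime(2024, 1, 15));
table.Rows.Add(1002, DBNull.Value);

using (IDataReader reader = table.CreateDataReader())
{
    while (reader.Read())
    {
        DateTime? start = ReadValue<DateTime>(reader, "StartDate");
        Console.WriteLine($"{reader.GetInt32(0)}: {start?.ToString("yyyy-MM-dd") ?? "no date"}");
    }
}
```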
### Timeout errors
| Error | Cause | Resolution |
|-------|-------|------------|
| `SqlException: Timeout expired` during bulk copy | Large dataset, slow network | Increase `commandTimeoutSeconds` on destination |
| `SqlException: Timeout expired` during MERGE | Many rows to match | Increase timeout; consider smaller batches |
| `SqlException: Timeout expired` during script | Index rebuild on large table | Increase script `timeoutSeconds` (default 3600s for rebuild) |
### Validation errors
| Error | Cause | Resolution |
|-------|-------|------------|
| "Source is required. Call WithSource() before Build()" | Missing source in pipeline | Add `.WithSource()` to builder chain |
| "Destination is required. Call WithDestination() before Build()" | Missing destination in pipeline | Add `.WithDestination()` to builder chain |
| "At least one match column is required" | Empty matchColumns array | Provide key columns for MERGE matching |
| "Timeout must be between 0 and 24 hours" | Invalid timeout value | Use `TimeSpan` between 0 and 24 hours |
## Debugging Patterns
### Inspecting pipeline results
Check `PipelineResult` after execution to understand what happened:
```csharp
var result = await pipeline.ExecuteAsync(cancellationToken);
if (!result.Success)
{
    logger.LogError(result.Error, "Pipeline failed after {Rows} rows in {Elapsed}",
        result.TotalRows, result.Elapsed);
    // The last recorded step shows where execution stopped
    var lastStep = result.Steps.LastOrDefault();
    if (lastStep != null)
    {
        logger.LogError("Last recorded step: {Step} ({Type})",
            lastStep.StepName, lastStep.StepType);
    }
}
```
### Tracking step-by-step progress
Each step records timing and row counts:
```csharp
foreach (var step in result.Steps)
{
    logger.LogInformation("Step {Name} ({Type}): {Rows} rows in {Elapsed}ms",
        step.StepName,
        step.StepType,
        step.RowsAffected,
        step.Elapsed.TotalMilliseconds);
}
```
### Enabling detailed logging
Inject a logger into the pipeline for execution-level logging:
```csharp
var pipeline = new EtlPipelineBuilder()
    .WithName("DebugPipeline")
    .WithSource(source)
    .WithDestination(destination)
    .WithLogger(loggerFactory.CreateLogger<EtlPipeline>())
    .Build();
```
Pipeline logs include:

- `Information`: Pipeline start/complete with row counts
- `Debug`: Individual script execution
- `Error`: Failure with exception and last step
### Identifying the failure point
When a pipeline fails, `PipelineResult.Steps` contains all completed steps:
```csharp
if (!result.Success)
{
    // Steps completed before failure
    var completedSteps = result.Steps.Select(s => s.StepName);
    logger.LogError("Completed steps: {Steps}", string.Join(" → ", completedSteps));
    // The exception contains the root cause
    logger.LogError(result.Error, "Root cause");
}
```
## Performance Tuning
### Batch size optimization
Default batch size is 10,000 rows. Adjust based on row width:
| Row Size | Recommended Batch Size |
|---|---|
| Narrow (< 20 columns) | 10,000 - 50,000 |
| Medium (20-50 columns) | 5,000 - 10,000 |
| Wide (> 50 columns) | 1,000 - 5,000 |
```csharp
// Large batch for narrow rows
var lookupDestination = new DbBulkImportDestination(factory, "LookupTable", batchSize: 50000);
// Small batch for wide rows
var detailDestination = new DbBulkMergeDestination(factory, "DetailTable", matchColumns, batchSize: 2000);
```
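If batch sizes are chosen in code, the row-width table can be encoded as a lookup. This is only a sketch. `RecommendedBatchSize` is hypothetical and uses C# 9 relational patterns. It returns the table's range rather than picking a single value.

```csharp
using System;

// Hypothetical helper encoding the row-width table; not part of the ETL library.
// Returns the recommended (Min, Max) batch size for a given column count.
static (int Min, int Max) RecommendedBatchSize(int columnCount) => columnCount switch
{
    <= 0 => throw new ArgumentOutOfRangeException(nameof(columnCount)),
    < 20 => (10_000, 50_000),  // Narrow (< 20 columns)
    <= 50 => (5_000, 10_000),  // Medium (20-50 columns)
    _ => (1_000, 5_000),       // Wide (> 50 columns)
};

var (min, max) = RecommendedBatchSize(35);
Console.WriteLine($"35 columns: {min}-{max} rows per batch"); // 35 columns: 5000-10000 rows per batch
```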
### Index management for bulk loads
Disable indexes before large imports, rebuild after:
```csharp
var pipeline = new EtlPipelineBuilder()
    .WithName("FullTableRefresh")
    .WithPreScript(CommonScripts.DisableIndexes(factory, "LargeTable"))
    .WithSource(source)
    .WithDestination(new DbBulkImportDestination(factory, "LargeTable"))
    .WithPostScript(CommonScripts.RebuildIndexes(factory, "LargeTable"))
    .WithPostScript(CommonScripts.UpdateStatistics(factory, "LargeTable"))
    .Build();
```
When to use:
- Full table refreshes (TRUNCATE + import)
- Tables with 3+ non-clustered indexes
- Imports of 100,000+ rows
When to skip:
- Incremental merges with few rows
- Tables with only a clustered index
- Frequent small updates
### Timeout sizing guidelines
| Operation | Rows | Suggested Timeout |
|---|---|---|
| Bulk import | < 100K | 600s (default) |
| Bulk import | 100K - 1M | 1800s (30 min) |
| Bulk import | > 1M | 3600s (1 hour) |
| Bulk merge | < 50K | 600s (default) |
| Bulk merge | 50K - 500K | 1800s (30 min) |
| Index rebuild | Any | 3600s (default) |
```csharp
// Large table with extended timeout
var historicalDestination = new DbBulkMergeDestination(factory, "HistoricalData",
    matchColumns: new[] { "RecordId" },
    commandTimeoutSeconds: 1800);
```
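The timeout table can be encoded the same way. The operation names are placeholders for this sketch, and `SuggestedTimeoutSeconds` is hypothetical. It returns `null` for bulk merges above 500K rows because the table gives no guidance there.

```csharp
using System;

// Hypothetical helper encoding the timeout table; operation names are made up for this sketch.
// Returns null where the table gives no guidance (bulk merges above 500K rows).
static int? SuggestedTimeoutSeconds(string operation, long rows) => operation switch
{
    "bulk-import" when rows < 100_000 => 600,
    "bulk-import" when rows <= 1_000_000 => 1800,
    "bulk-import" => 3600,
    "bulk-merge" when rows < 50_000 => 600,
    "bulk-merge" when rows <= 500_000 => 1800,
    "bulk-merge" => (int?)null,
    "index-rebuild" => 3600,
    _ => throw new ArgumentException($"Unknown operation '{operation}'", nameof(operation)),
};

Console.WriteLine(SuggestedTimeoutSeconds("bulk-import", 250_000)); // 1800
Console.WriteLine(SuggestedTimeoutSeconds("bulk-merge", 750_000)?.ToString() ?? "no guidance");
```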
### Reducing network and memory usage
Select only needed columns in the source query:
```csharp
// Good - select only needed columns
var source = new DbQuerySource(factory,
    "SELECT OrderNumber, Status, StartDate FROM JDE.WorkOrders");
// Avoid - selecting all columns wastes bandwidth
var wideSource = new DbQuerySource(factory,
    "SELECT * FROM JDE.WorkOrders");
```
Extra columns in the source are ignored by the destination column mapping, but they still consume network bandwidth and memory.
### Monitoring baseline performance
Track `PipelineResult.Elapsed` over time to detect degradation:
```csharp
var result = await pipeline.ExecuteAsync(ct);
metrics.RecordPipeline(
    pipelineName: pipeline.PipelineName,
    success: result.Success,
    rows: result.TotalRows,
    durationMs: result.Elapsed.TotalMilliseconds);

// Alert if duration exceeds baseline by 50%
if (result.Elapsed > baselineDuration * 1.5)
{
    logger.LogWarning("Pipeline {Name} took {Elapsed} (baseline: {Baseline})",
        pipeline.PipelineName, result.Elapsed, baselineDuration);
}
```
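The check above compares against `baselineDuration`, but that value has to be computed from somewhere. One option, assumed here and not taken from the pipeline, is the median of previous `PipelineResult.Elapsed` values, so a single unusually slow or fast run does not skew it. `MedianDuration` and `ExceedsBaseline` are hypothetical helpers.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helpers: median of earlier run durations, plus the 50% rule above.
static TimeSpan MedianDuration(IReadOnlyCollection<TimeSpan> history)
{
    if (history.Count == 0)
    {
        throw new ArgumentException("At least one previous run is required", nameof(history));
    }
    var sorted = history.OrderBy(d => d).ToArray();
    int mid = sorted.Length / 2;
    // Odd count: middle value; even count: average of the two middle values.
    return sorted.Length % 2 == 1
        ? sorted[mid]
        : TimeSpan.FromTicks((sorted[mid - 1].Ticks + sorted[mid].Ticks) / 2);
}

static bool ExceedsBaseline(TimeSpan elapsed, TimeSpan baseline) =>
    elapsed > TimeSpan.FromTicks((long)(baseline.Ticks * 1.5));

var previousRuns = new[]
{
    TimeSpan.FromMinutes(10), TimeSpan.FromMinutes(12),
    TimeSpan.FromMinutes(11), TimeSpan.FromMinutes(30),
};
var baselineFromHistory = MedianDuration(previousRuns);
Console.WriteLine(baselineFromHistory);                                            // 00:11:30
Console.WriteLine(ExceedsBaseline(TimeSpan.FromMinutes(20), baselineFromHistory)); // True
```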
### Step-level performance analysis
Identify slow steps using `StepResult.Elapsed`:
```csharp
var slowSteps = result.Steps
    .Where(s => s.Elapsed > TimeSpan.FromSeconds(30))
    .OrderByDescending(s => s.Elapsed);

foreach (var step in slowSteps)
{
    logger.LogWarning("Slow step: {Name} took {Elapsed}",
        step.StepName, step.Elapsed);
}
```
## Related Documentation

- [Overview](./Overview.md) - Pipeline architecture
- [Configuration](./Configuration.md) - Timeout and batch size options
- [Destinations](./Destinations.md) - Script patterns for index management
**Step 2: Verify the file was created**

```bash
ls -la DOCUMENTATION/DataSync/Troubleshooting.md
```

Expected: File exists

**Step 3: Commit**

```bash
git add DOCUMENTATION/DataSync/Troubleshooting.md
git commit -m "docs: add ETL troubleshooting documentation"
```
### Task 7: Update ComponentMap.md

**Files:**
- Modify: `DOCUMENTATION/Instructions/ComponentMap.md`

**Step 1: Read the current ComponentMap.md**

Read `DOCUMENTATION/Instructions/ComponentMap.md` to find where to add the ETL source mapping.
**Step 2: Add ETL source paths to the DataSync section**

Add the new ETL source paths to the DataSync section:

```markdown
### DataSync/
Documents data synchronization from enterprise systems.

**Source paths (Legacy):**
- `OLD/DataModel/Process/JDE*.cs` - JDE Oracle queries
- `OLD/DataModel/Process/CMS*.cs` - CMS Sybase queries
- `OLD/WorkerService/Process/UpdateProcessor.cs` - Sync orchestration
- `OLD/WorkerService/dsconfig/*.json` - Data source configs

**Source paths (New):**
- `NEW/src/JdeScoping.DataSync/Etl/` - ETL pipeline framework
- `NEW/src/JdeScoping.DataSync/Etl/Contracts/` - Core interfaces
- `NEW/src/JdeScoping.DataSync/Etl/Pipeline/` - Pipeline and builder
- `NEW/src/JdeScoping.DataSync/Etl/Sources/` - Data sources
- `NEW/src/JdeScoping.DataSync/Etl/Transformers/` - Data transformers
- `NEW/src/JdeScoping.DataSync/Etl/Destinations/` - Bulk copy/merge destinations
- `NEW/src/JdeScoping.DataSync/Etl/Scripts/` - SQL script runners
- `NEW/src/JdeScoping.DataSync/Etl/Results/` - Execution result types

**Typical files:**
- `Overview.md` - ETL pipeline architecture
- `Sources.md` - Writing custom data sources
- `Transformers.md` - Writing custom transformers
- `Destinations.md` - Bulk destinations and scripts
- `Configuration.md` - Pipeline builder and DI setup
- `Troubleshooting.md` - Debugging and performance
- `JDE.md` - JD Edwards (Oracle) integration
- `CMS.md` - CMS (Sybase) integration
- `Scheduling.md` - Mass/daily/hourly sync schedules
```

**Step 3: Verify the edit is correct**
Read the modified section to confirm the changes.
**Step 4: Commit**

```bash
git add DOCUMENTATION/Instructions/ComponentMap.md
git commit -m "docs: add ETL source paths to ComponentMap"
```
### Task 8: Final verification

**Step 1: List all created documentation files**

```bash
ls -la DOCUMENTATION/DataSync/
```

Expected files:

```
Configuration.md
Destinations.md
Overview.md
Sources.md
Transformers.md
Troubleshooting.md
```
**Step 2: Verify all internal links resolve**

Check that all cross-references between docs point to existing files. List every relative link:

```bash
grep -h "\[.*\](\./" DOCUMENTATION/DataSync/*.md | sort -u
```

Confirm that each linked target exists in `DOCUMENTATION/DataSync/`.
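The grep output still has to be checked by hand. As an alternative, a short C# sketch can report missing targets directly. `FindMissingLinkTargets` is hypothetical. Like the grep pattern, it only checks `./` links, and it ignores `#anchor` fragments.

```csharp
using System;
using System.IO;
using System.Linq;
using System.Text.RegularExpressions;

// Sketch: list relative links such as [Overview](./Overview.md) whose target file is missing.
static string[] FindMissingLinkTargets(string directory)
{
    // Captures the file part of "](./File.md)" or "](./File.md#anchor)".
    var link = new Regex(@"\]\(\./([^)#]+)(?:#[^)]*)?\)");
    return Directory.GetFiles(directory, "*.md")
        .SelectMany(file => link.Matches(File.ReadAllText(file)).Cast<Match>())
        .Select(match => match.Groups[1].Value)
        .Distinct()
        .Where(target => !File.Exists(Path.Combine(directory, target)))
        .OrderBy(target => target, StringComparer.Ordinal)
        .ToArray();
}

const string docsDirectory = "DOCUMENTATION/DataSync";
if (Directory.Exists(docsDirectory))
{
    var missing = FindMissingLinkTargets(docsDirectory);
    Console.WriteLine(missing.Length == 0 ? "All links resolve" : "Missing: " + string.Join(", ", missing));
}
```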
**Step 3: Check git log for all commits**

```bash
git log --oneline -8
```

Expected: The 7 most recent commits are the documentation commits (6 docs + 1 ComponentMap update).
**Step 4: Final status check**

```bash
git status
```

Expected: Clean working tree