# ETL Pipeline Documentation Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Create 6 documentation files for the ETL pipeline covering architecture, extension patterns, configuration, and troubleshooting.

**Architecture:** Each task creates one markdown file in `DOCUMENTATION/DataSync/`. Code snippets come directly from source files. Verification checks that links resolve and code matches source.

**Tech Stack:** Markdown, code from `NEW/src/JdeScoping.DataSync/Etl/`

---

### Task 1: Create DataSync folder and Overview.md

**Files:**
- Create: `DOCUMENTATION/DataSync/Overview.md`

**Step 1: Create the DataSync directory**

```bash
mkdir -p DOCUMENTATION/DataSync
```

**Step 2: Write Overview.md with architecture and core concepts**

Create `DOCUMENTATION/DataSync/Overview.md` with:

```markdown
# ETL Pipeline

The ETL pipeline streams data from enterprise sources (JDE, CMS) through transformations into SQL Server cache tables. It supports batched processing, pre/post scripts for index management, and detailed execution tracking.

## Architecture

```
┌───────────────┐    ┌──────────────────┐    ┌────────────────────┐
│ IImportSource │───▶│ IDataTransformer │───▶│ IImportDestination │
└───────────────┘    │   (chain of N)   │    └────────────────────┘
                     └──────────────────┘
        ▲                                              │
        │            ┌──────────────┐                  │
        └────────────│ Pre-Scripts  │                  ▼
                     └──────────────┘           ┌──────────────┐
                                                │ Post-Scripts │
                                                └──────────────┘
```

**Execution flow:**
1. Run pre-scripts (e.g., disable indexes)
2. Open source and get `IDataReader`
3. Apply transformer chain (each wraps the previous reader)
4. Write to destination (bulk copy or merge)
5. Run post-scripts (e.g., rebuild indexes)

## Core Contracts

### IImportSource

Provides data to the pipeline. Returns an `IDataReader` that streams rows.

```csharp
public interface IImportSource : IAsyncDisposable
{
    Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default);
    string SourceName { get; }
}
```

### IDataTransformer

Modifies data during transfer. Wraps the source reader in a decorator.

```csharp
public interface IDataTransformer
{
    IDataReader Transform(IDataReader source);
    string TransformerName { get; }
    int MapOrdinal(int transformedOrdinal, IDataReader source);
}
```

### IImportDestination

Consumes data and writes to storage. Returns statistics about the operation.

```csharp
public interface IImportDestination
{
    Task<DestinationResult> WriteAsync(IDataReader source, CancellationToken cancellationToken = default);
    string DestinationName { get; }
}
```

### IScriptRunner

Executes SQL scripts before or after data transfer.

```csharp
public interface IScriptRunner
{
    Task ExecuteAsync(CancellationToken cancellationToken = default);
    string ScriptName { get; }
}
```

## Pipeline Execution

The `EtlPipeline` class orchestrates execution and tracks timing for each step:

```csharp
public async Task<PipelineResult> ExecuteAsync(CancellationToken cancellationToken = default)
{
    // 1. Run pre-scripts
    foreach (var script in _preScripts)
    {
        var stepResult = await RunScriptAsync(script, cancellationToken);
        steps.Add(stepResult);
    }

    // 2. Open source
    await using (_source)
    {
        var reader = await _source.ReadDataAsync(cancellationToken);

        // 3. Apply transformers
        foreach (var transformer in _transformers)
        {
            reader = transformer.Transform(reader);
        }

        // 4. Write to destination
        var destResult = await _destination.WriteAsync(reader, cancellationToken);
    }

    // 5. Run post-scripts
    foreach (var script in _postScripts)
    {
        var stepResult = await RunScriptAsync(script, cancellationToken);
    }

    return PipelineResult.Succeeded(totalRows, totalStopwatch.Elapsed, steps);
}
```

## Result Model

### PipelineResult

```csharp
public record PipelineResult(
    bool Success,
    long TotalRows,
    TimeSpan Elapsed,
    IReadOnlyList<StepResult> Steps,
    Exception? Error = null);
```

### StepResult

```csharp
public record StepResult(
    string StepName,
    string StepType,
    long RowsAffected,
    TimeSpan Elapsed);
```

### DestinationResult

```csharp
public record DestinationResult(
    long RowsProcessed,
    int BatchCount,
    TimeSpan Elapsed);
```

## Related Documentation

- [Sources](./Sources.md) - Writing custom data sources
- [Transformers](./Transformers.md) - Writing custom transformers
- [Destinations](./Destinations.md) - Writing destinations and scripts
- [Configuration](./Configuration.md) - Pipeline builder and DI setup
- [Troubleshooting](./Troubleshooting.md) - Debugging and performance
```

**Step 3: Verify the file was created**

```bash
ls -la DOCUMENTATION/DataSync/
```

Expected: `Overview.md` exists

**Step 4: Commit**

```bash
git add DOCUMENTATION/DataSync/Overview.md
git commit -m "docs: add ETL pipeline overview documentation"
```

---

### Task 2: Create Sources.md

**Files:**
- Create: `DOCUMENTATION/DataSync/Sources.md`

**Step 1: Write Sources.md with interface and DbQuerySource walkthrough**

Create `DOCUMENTATION/DataSync/Sources.md` with:

```markdown
# Data Sources

Sources provide data to the ETL pipeline by implementing `IImportSource`. They return an `IDataReader` that streams rows to transformers and destinations.

## Interface Contract

```csharp
public interface IImportSource : IAsyncDisposable
{
    Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default);
    string SourceName { get; }
}
```

**Key requirements:**
- Implement `IAsyncDisposable` for connection cleanup
- Return a live `IDataReader` (not buffered) for memory efficiency
- `SourceName` is used in logging and `StepResult` tracking

## DbQuerySource Implementation

`DbQuerySource` executes a SQL query against the local cache database:

```csharp
public class DbQuerySource : IImportSource
{
    private readonly IDbConnectionFactory _connectionFactory;
    private readonly string _sql;
    private readonly object? _parameters;
    private readonly int _commandTimeout;
    private SqlConnection? _connection;
    private SqlCommand? _command;

    public string SourceName { get; }

    public DbQuerySource(
        IDbConnectionFactory connectionFactory,
        string sql,
        string? name = null,
        object? parameters = null,
        int commandTimeout = 3600)
    {
        _connectionFactory = connectionFactory;
        _sql = sql;
        _parameters = parameters;
        _commandTimeout = commandTimeout;
        SourceName = $"DbQuery:{name ?? "Query"}";
    }
```

### Reading data

The connection opens in `ReadDataAsync` and stays open until disposal:

```csharp
    public async Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
    {
        _connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken);
        _command = _connection.CreateCommand();
        _command.CommandText = _sql;
        _command.CommandTimeout = _commandTimeout;
        AddParameters(_command, _parameters);
        return await _command.ExecuteReaderAsync(cancellationToken);
    }
```

### Parameter handling

Parameters are added from an anonymous object using reflection:

```csharp
    private static void AddParameters(SqlCommand command, object? parameters)
    {
        if (parameters == null) return;

        var properties = parameters.GetType().GetProperties();
        foreach (var prop in properties)
        {
            var value = prop.GetValue(parameters) ?? DBNull.Value;
            command.Parameters.AddWithValue($"@{prop.Name}", value);
        }
    }
```

### Resource cleanup

Both the command and connection are disposed asynchronously:

```csharp
    public async ValueTask DisposeAsync()
    {
        if (_command != null)
        {
            await _command.DisposeAsync();
            _command = null;
        }

        if (_connection != null)
        {
            await _connection.DisposeAsync();
            _connection = null;
        }
    }
}
```

## Key Patterns

### Keep sources stateless until ReadDataAsync

Don't open connections or execute queries in the constructor. The source should be configurable without side effects until `ReadDataAsync` is called.

### Streaming, not buffering

Return a live `IDataReader` rather than loading all data into memory. This allows processing millions of rows without memory pressure.

### Use SourceName for diagnostics

Format: `"DbQuery:{table}"` or `"File:{filename}"`. This appears in logs and `StepResult.StepName`.

## Future source types

The interface supports additional source types not yet implemented:

- **File-based sources** - CSV, Excel files
- **API sources** - REST endpoints returning paged data
- **Oracle/Sybase sources** - Direct queries against JDE or CMS

Each would implement the same interface with different connection and reader implementations.

## Related Documentation

- [Overview](./Overview.md) - Pipeline architecture
- [Transformers](./Transformers.md) - Processing source data
- [Configuration](./Configuration.md) - Connection factory setup
```

**Step 2: Verify the file was created**

```bash
ls -la DOCUMENTATION/DataSync/Sources.md
```

Expected: File exists

**Step 3: Commit**

```bash
git add DOCUMENTATION/DataSync/Sources.md
git commit -m "docs: add ETL sources documentation"
```

---

### Task 3: Create Transformers.md

**Files:**
- Create: `DOCUMENTATION/DataSync/Transformers.md`

**Step 1: Write Transformers.md with interface, base class, and three transformer walkthroughs**

Create `DOCUMENTATION/DataSync/Transformers.md` with:

```markdown
# Data Transformers

Transformers modify data as it flows through the pipeline. They wrap the source `IDataReader` in a decorator, allowing column renaming, dropping, type conversion, and computed columns.

## Interface Contract

```csharp
public interface IDataTransformer
{
    IDataReader Transform(IDataReader source);
    string TransformerName { get; }
    int MapOrdinal(int transformedOrdinal, IDataReader source);
}
```

**Key methods:**
- `Transform()` - Wraps the source reader, returns a new reader with modifications
- `TransformerName` - Used in logging and `StepResult` tracking
- `MapOrdinal()` - Maps transformed ordinals to source ordinals. Returns `-1` for computed columns.
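
The `MapOrdinal` contract can be illustrated with a minimal sketch of how a caller might detect a computed column before attempting binary access. `AppendedColumnTransformer` and `OrdinalChecks.IsComputed` are hypothetical names invented for this example and are not part of the ETL source; `Transform` is left as a pass-through only to keep the sketch short.

```csharp
using System;
using System.Data;

// Build a two-column in-memory reader to stand in for a real source.
var table = new DataTable();
table.Columns.Add("OrderNumber", typeof(int));
table.Columns.Add("Status", typeof(string));
using IDataReader source = table.CreateDataReader();

var transformer = new AppendedColumnTransformer();
Console.WriteLine(OrdinalChecks.IsComputed(transformer, source, 1)); // False: maps to source ordinal 1
Console.WriteLine(OrdinalChecks.IsComputed(transformer, source, 2)); // True: appended computed column

// Contract repeated from above so the sketch compiles on its own.
public interface IDataTransformer
{
    IDataReader Transform(IDataReader source);
    string TransformerName { get; }
    int MapOrdinal(int transformedOrdinal, IDataReader source);
}

// Hypothetical transformer that exposes one computed column after the source columns.
public sealed class AppendedColumnTransformer : IDataTransformer
{
    public string TransformerName => "AppendedColumn";

    // Assumption: a real implementation would return a decorating reader here.
    public IDataReader Transform(IDataReader source) => source;

    // Source columns map to themselves; the appended column has no source ordinal.
    public int MapOrdinal(int transformedOrdinal, IDataReader source)
        => transformedOrdinal < source.FieldCount ? transformedOrdinal : -1;
}

// Hypothetical helper: a negative mapping marks a computed column.
public static class OrdinalChecks
{
    public static bool IsComputed(IDataTransformer transformer, IDataReader source, int ordinal)
        => transformer.MapOrdinal(ordinal, source) < 0;
}
```

A check like this lets calling code fall back to `GetValue()` for computed columns instead of hitting `NotSupportedException` from the binary accessors.
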
## DataTransformerBase

The base class provides default implementations and handles the decorator pattern:

```csharp
public abstract class DataTransformerBase : IDataTransformer
{
    public abstract string TransformerName { get; }

    public IDataReader Transform(IDataReader source)
    {
        ArgumentNullException.ThrowIfNull(source);
        OnInitialize(source);
        return new TransformingDataReader(source, this);
    }

    protected virtual void OnInitialize(IDataReader source) { }
```

### Default pass-through methods

Override only what you need to change:

```csharp
    public virtual int GetFieldCount(IDataReader source) => source.FieldCount;
    public virtual string GetName(int ordinal, IDataReader source) => source.GetName(ordinal);
    public virtual Type GetFieldType(int ordinal, IDataReader source) => source.GetFieldType(ordinal);
    public virtual object GetValue(int ordinal, IDataReader source) => source.GetValue(ordinal);
    public virtual int GetOrdinal(string name, IDataReader source) => source.GetOrdinal(name);
    public virtual bool IsDBNull(int ordinal, IDataReader source) => source.IsDBNull(ordinal);
    public virtual int MapOrdinal(int transformedOrdinal, IDataReader source) => transformedOrdinal;
```

### Binary method handling

Computed columns (where `MapOrdinal` returns `-1`) throw `NotSupportedException`:

```csharp
    public virtual long GetBytes(int ordinal, long fieldOffset, byte[]? buffer,
        int bufferOffset, int length, IDataReader source)
    {
        var sourceOrdinal = MapOrdinal(ordinal, source);
        if (sourceOrdinal < 0)
            throw new NotSupportedException(
                $"GetBytes not supported for computed column at ordinal {ordinal}.");
        return source.GetBytes(sourceOrdinal, fieldOffset, buffer, bufferOffset, length);
    }
```

## ColumnRenameTransformer

Renames columns without changing values or order:

```csharp
public class ColumnRenameTransformer : DataTransformerBase
{
    private readonly Dictionary<string, string> _renames;
    private string[]? _outputNames;
    private Dictionary<string, int>? _nameToOrdinal;

    public override string TransformerName => $"RenameColumns:{_renames.Count}";

    public ColumnRenameTransformer(params (string OldName, string NewName)[] renames)
    {
        _renames = renames.ToDictionary(
            r => r.OldName,
            r => r.NewName,
            StringComparer.OrdinalIgnoreCase);
    }
```

### Collision detection

The transformer validates that renames don't create duplicate column names:

```csharp
    protected override void OnInitialize(IDataReader source)
    {
        _outputNames = new string[source.FieldCount];
        _nameToOrdinal = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);

        for (int i = 0; i < source.FieldCount; i++)
        {
            var originalName = source.GetName(i);
            var outputName = _renames.TryGetValue(originalName, out var newName)
                ? newName
                : originalName;

            if (_nameToOrdinal.TryGetValue(outputName, out var existingOrdinal))
            {
                throw new InvalidOperationException(
                    $"Column name collision: '{originalName}' → '{outputName}' conflicts with " +
                    $"'{source.GetName(existingOrdinal)}' (already at ordinal {existingOrdinal}).");
            }

            _outputNames[i] = outputName;
            _nameToOrdinal[outputName] = i;
        }
    }
```

## ColumnDropTransformer

Removes specified columns from the output:

```csharp
public class ColumnDropTransformer : DataTransformerBase
{
    private readonly HashSet<string> _columnsToDrop;
    private int[]? _ordinalMap;
    private Dictionary<string, int>? _nameToOrdinal;

    public override string TransformerName => $"DropColumns:{string.Join(",", _columnsToDrop)}";

    public ColumnDropTransformer(params string[] columnsToDrop)
    {
        _columnsToDrop = new HashSet<string>(columnsToDrop, StringComparer.OrdinalIgnoreCase);
    }
```

### Ordinal mapping

Builds a map from output ordinals to source ordinals, excluding dropped columns:

```csharp
    protected override void OnInitialize(IDataReader source)
    {
        var ordinalList = new List<int>();
        _nameToOrdinal = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);

        for (int i = 0; i < source.FieldCount; i++)
        {
            var name = source.GetName(i);
            if (!_columnsToDrop.Contains(name))
            {
                _nameToOrdinal[name] = ordinalList.Count;
                ordinalList.Add(i);
            }
        }

        _ordinalMap = ordinalList.ToArray();
    }

    public override int MapOrdinal(int transformedOrdinal, IDataReader source)
        => _ordinalMap![transformedOrdinal];
```

## JdeDateTransformer

Combines JDE Julian date (CYYDDD) and time (HHMMSS) columns into a single `DateTime`:

```csharp
public class JdeDateTransformer : DataTransformerBase
{
    public static readonly DateTime DefaultInvalidDateSentinel = new(1900, 1, 1);

    private readonly string _dateColumn;
    private readonly string _timeColumn;
    private readonly string _outputColumn;
    private readonly DateTime _invalidDateSentinel;
```

### Computed column handling

The output `DateTime` column has no direct source ordinal, so `MapOrdinal` returns `-1`:

```csharp
    public override int MapOrdinal(int transformedOrdinal, IDataReader source)
    {
        var sourceOrdinal = _ordinalMap![transformedOrdinal];
        return sourceOrdinal == _dateOrdinal ? -1 : sourceOrdinal;
    }

    public override string GetDataTypeName(int ordinal, IDataReader source)
    {
        var sourceOrdinal = _ordinalMap![ordinal];
        return sourceOrdinal == _dateOrdinal
            ? "datetime"
            : source.GetDataTypeName(sourceOrdinal);
    }
```

### Date parsing with validation

Invalid dates return a configurable sentinel value (default: 1900-01-01):

```csharp
    public static DateTime ParseJdeDateTime(decimal julianDate, decimal time, DateTime sentinel)
    {
        var dateInt = (int)julianDate;
        if (dateInt <= 0) return sentinel;

        var century = dateInt / 100000;
        var year = (dateInt / 1000) % 100;
        var dayOfYear = dateInt % 1000;

        if (century < 0 || century > 1) return sentinel;
        if (year < 0 || year > 99) return sentinel;
        if (dayOfYear < 1 || dayOfYear > 366) return sentinel;

        var fullYear = (century == 0 ? 1900 : 2000) + year;
        var daysInYear = DateTime.IsLeapYear(fullYear) ? 366 : 365;
        if (dayOfYear > daysInYear) return sentinel;

        var date = new DateTime(fullYear, 1, 1).AddDays(dayOfYear - 1);

        // Parse time (HHMMSS format)
        var timeInt = (int)time;
        var hours = timeInt / 10000;
        var minutes = (timeInt / 100) % 100;
        var seconds = timeInt % 100;

        if (hours < 0 || hours > 23) return sentinel;
        if (minutes < 0 || minutes > 59) return sentinel;
        if (seconds < 0 || seconds > 59) return sentinel;

        return date.AddHours(hours).AddMinutes(minutes).AddSeconds(seconds);
    }
```

## Transformer Chaining

Transformers compose by wrapping each other. The pipeline applies them in order:

```csharp
foreach (var transformer in _transformers)
{
    reader = transformer.Transform(reader);
}
```

Each transformer sees the output of the previous one. Ordinal mappings accumulate through the chain.
## Validation in OnInitialize

Perform all validation in `OnInitialize()` to fail fast before processing data:

- Check that required columns exist
- Validate rename mappings don't create collisions
- Build ordinal maps for efficient lookup during row processing

## Related Documentation

- [Overview](./Overview.md) - Pipeline architecture
- [Sources](./Sources.md) - Data sources that feed transformers
- [Destinations](./Destinations.md) - Where transformed data goes
```

**Step 2: Verify the file was created**

```bash
ls -la DOCUMENTATION/DataSync/Transformers.md
```

Expected: File exists

**Step 3: Commit**

```bash
git add DOCUMENTATION/DataSync/Transformers.md
git commit -m "docs: add ETL transformers documentation"
```

---

### Task 4: Create Destinations.md

**Files:**
- Create: `DOCUMENTATION/DataSync/Destinations.md`

**Step 1: Write Destinations.md with interface, both destinations, and script patterns**

Create `DOCUMENTATION/DataSync/Destinations.md` with:

```markdown
# Destinations and Scripts

Destinations consume data from the pipeline and write it to storage. Scripts run SQL operations before or after data transfer, commonly for index management.

## IImportDestination Contract

```csharp
public interface IImportDestination
{
    Task<DestinationResult> WriteAsync(IDataReader source, CancellationToken cancellationToken = default);
    string DestinationName { get; }
}
```

**Key requirements:**
- Consume the entire `IDataReader` in `WriteAsync`
- Return `DestinationResult` with row count, batch count, and elapsed time
- `DestinationName` is used in logging and `StepResult` tracking

## DbBulkImportDestination

Full table refresh using TRUNCATE + bulk copy:

```csharp
public class DbBulkImportDestination : IImportDestination
{
    private const int DefaultBatchSize = 10000;
    private const int DefaultCommandTimeoutSeconds = 600;

    public DbBulkImportDestination(
        IDbConnectionFactory connectionFactory,
        string tableName,
        int batchSize = 0,
        int commandTimeoutSeconds = 0)
    {
        _batchSize = batchSize > 0 ? batchSize : DefaultBatchSize;
        _commandTimeoutSeconds = commandTimeoutSeconds > 0
            ? commandTimeoutSeconds
            : DefaultCommandTimeoutSeconds;
    }
```

### Column mapping

Queries destination schema and maps only matching columns. Extra source columns are ignored:

```csharp
    var destColumns = await GetDestinationColumnsAsync(connection, cancellationToken);

    using var bulkCopy = new SqlBulkCopy(connection)
    {
        DestinationTableName = qualifiedName,
        BatchSize = _batchSize,
        BulkCopyTimeout = _commandTimeoutSeconds,
        EnableStreaming = true
    };

    for (int i = 0; i < source.FieldCount; i++)
    {
        var columnName = source.GetName(i);
        if (destColumns.Contains(columnName))
        {
            bulkCopy.ColumnMappings.Add(columnName, columnName);
        }
    }

    if (bulkCopy.ColumnMappings.Count == 0)
        throw new InvalidOperationException(
            $"No columns from source exist in destination table '{_tableName}'.");
```

### Destination column discovery

Uses `INFORMATION_SCHEMA.COLUMNS` with schema support:

```csharp
    private async Task<HashSet<string>> GetDestinationColumnsAsync(
        SqlConnection connection, CancellationToken ct)
    {
        var (schema, table) = CommonScripts.ParseTableName(_tableName);
        var sql = @"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS
                    WHERE TABLE_NAME = @tableName AND TABLE_SCHEMA = @schemaName";

        var columns = await connection.QueryAsync<string>(
            new CommandDefinition(sql, new { tableName = table, schemaName = schema },
                commandTimeout: _commandTimeoutSeconds, cancellationToken: ct));

        return columns.ToHashSet(StringComparer.OrdinalIgnoreCase);
    }
```

## DbBulkMergeDestination

Incremental updates using bulk copy to temp table + MERGE:

```csharp
public class DbBulkMergeDestination : IImportDestination
{
    public DbBulkMergeDestination(
        IDbConnectionFactory connectionFactory,
        string tableName,
        string[] matchColumns,
        string[]? updateColumns = null,
        int batchSize = 0,
        int commandTimeoutSeconds = 0)
    {
        if (matchColumns.Length == 0)
            throw new ArgumentException("At least one match column is required.");
    }
```

### Batch processing

Creates a temp table, bulk copies in batches, then merges each batch:

```csharp
    var tempTableName = $"#ETL_{_tableName.Replace(".", "_").Replace("[", "").Replace("]", "")}";
    await CreateTempTableAsync(connection, tempTableName, cancellationToken);

    while (source.Read())
    {
        // Buffer rows into DataTable
        if (batch.Rows.Count >= _batchSize)
        {
            await ProcessBatchAsync(connection, batch, tempTableName, mergeSql, destColumns, ct);
            totalRows += batch.Rows.Count;
            batch.Clear();
        }
    }
```

### MERGE SQL generation

Generates MERGE statement with configurable match and update columns:

```csharp
    private string BuildMergeSql(string tempTableName, IReadOnlyList<string> allColumns,
        IReadOnlyList<string> updateColumns)
    {
        var qualifiedName = CommonScripts.FormatQualifiedTableName(_tableName);
        var sb = new StringBuilder();

        sb.AppendLine($"MERGE INTO {qualifiedName} AS target");
        sb.AppendLine($"USING {tempTableName} AS source");
        sb.Append("ON ");
        sb.AppendLine(string.Join(" AND ",
            _matchColumns.Select(c => $"target.[{c}] = source.[{c}]")));

        if (updateColumns.Count > 0)
        {
            sb.AppendLine("WHEN MATCHED THEN UPDATE SET");
            sb.AppendLine(string.Join(", ",
                updateColumns.Select(c => $"target.[{c}] = source.[{c}]")));
        }

        sb.AppendLine("WHEN NOT MATCHED THEN INSERT");
        sb.AppendLine($"({string.Join(", ", allColumns.Select(c => $"[{c}]"))})");
        sb.AppendLine($"VALUES ({string.Join(", ", allColumns.Select(c => $"source.[{c}]"))});");

        return sb.ToString();
    }
```

## Schema-Qualified Table Names

Both destinations support schema-qualified names via `CommonScripts`:

```csharp
public static (string Schema, string Table) ParseTableName(string tableName)
{
    var cleaned = tableName.Replace("[", "").Replace("]", "");
    var parts = cleaned.Split('.', 2);
    return parts.Length == 2 ? (parts[0], parts[1]) : ("dbo", parts[0]);
}

public static string FormatQualifiedTableName(string tableName)
{
    var (schema, table) = ParseTableName(tableName);
    return $"[{schema}].[{table}]";
}
```

Supported formats: `"Table"`, `"dbo.Table"`, `"[dbo].[Table]"`

## Script Patterns

### IScriptRunner Contract

```csharp
public interface IScriptRunner
{
    Task ExecuteAsync(CancellationToken cancellationToken = default);
    string ScriptName { get; }
}
```

### SqlScriptRunner Implementation

```csharp
public class SqlScriptRunner : IScriptRunner
{
    public SqlScriptRunner(
        IDbConnectionFactory connectionFactory,
        string sql,
        string? name = null,
        object? parameters = null,
        int timeoutSeconds = 3600)
    {
        ScriptName = name ?? "SqlScript";
    }

    public async Task ExecuteAsync(CancellationToken cancellationToken = default)
    {
        await using var connection = await _connectionFactory
            .CreateLotFinderConnectionAsync(cancellationToken);

        await connection.ExecuteAsync(
            new CommandDefinition(_sql, _parameters,
                commandTimeout: _timeoutSeconds, cancellationToken: cancellationToken));
    }
}
```

### Common Scripts

`CommonScripts` provides factory methods for index management:

```csharp
public static IScriptRunner DisableIndexes(
    IDbConnectionFactory factory, string tableName, int timeoutSeconds = 300)
{
    var (schema, table) = ParseTableName(tableName);
    var sql = @"
        DECLARE @sql NVARCHAR(MAX) = '';
        DECLARE @fullTableName NVARCHAR(256) = QUOTENAME(@schemaName) + '.' + QUOTENAME(@tableName);

        SELECT @sql = @sql + 'ALTER INDEX ' + QUOTENAME(i.name) + ' ON ' + @fullTableName + ' DISABLE;'
        FROM sys.indexes i
        INNER JOIN sys.tables t ON i.object_id = t.object_id
        INNER JOIN sys.schemas s ON t.schema_id = s.schema_id
        WHERE t.name = @tableName AND s.name = @schemaName
          AND i.type = 2 AND i.is_disabled = 0;

        IF LEN(@sql) > 0 EXEC sp_executesql @sql;";

    return new SqlScriptRunner(factory, sql, $"DisableIndexes:{schema}.{table}",
        parameters: new { tableName = table, schemaName = schema },
        timeoutSeconds: timeoutSeconds);
}

public static IScriptRunner RebuildIndexes(
    IDbConnectionFactory factory, string tableName, int timeoutSeconds = 3600)
{
    // Similar pattern with ALTER INDEX ALL ... REBUILD
}

public static IScriptRunner UpdateStatistics(
    IDbConnectionFactory factory, string tableName, int timeoutSeconds = 600)
{
    // UPDATE STATISTICS with QUOTENAME protection
}
```

### SQL injection protection

All dynamic SQL uses `QUOTENAME()` for identifier escaping:

```sql
DECLARE @fullTableName NVARCHAR(256) = QUOTENAME(@schemaName) + '.' + QUOTENAME(@tableName);
```

### When to use scripts

Use pre/post scripts for large bulk loads where index overhead matters:

```csharp
var pipeline = new EtlPipelineBuilder()
    .WithName("LargeTableSync")
    .WithSource(source)
    .WithPreScript(CommonScripts.DisableIndexes(factory, "WorkOrder"))
    .WithDestination(new DbBulkImportDestination(factory, "WorkOrder"))
    .WithPostScript(CommonScripts.RebuildIndexes(factory, "WorkOrder"))
    .WithPostScript(CommonScripts.UpdateStatistics(factory, "WorkOrder"))
    .Build();
```

## Related Documentation

- [Overview](./Overview.md) - Pipeline architecture
- [Transformers](./Transformers.md) - Data transformation
- [Configuration](./Configuration.md) - Timeout and batch size options
- [Troubleshooting](./Troubleshooting.md) - Performance tuning
```

**Step 2: Verify the file was created**

```bash
ls -la DOCUMENTATION/DataSync/Destinations.md
```

Expected: File exists

**Step 3: Commit**

```bash
git add DOCUMENTATION/DataSync/Destinations.md
git commit -m "docs: add ETL destinations and scripts documentation"
```

---

### Task 5: Create Configuration.md

**Files:**
- Create: `DOCUMENTATION/DataSync/Configuration.md`

**Step 1: Write Configuration.md with builder API, connection setup, and DI registration**

Create `DOCUMENTATION/DataSync/Configuration.md` with:

```markdown
# Configuration

This document covers pipeline builder configuration, connection factory setup, and dependency injection registration.

## Pipeline Builder API

`EtlPipelineBuilder` uses a fluent API to construct pipelines:

```csharp
var pipeline = new EtlPipelineBuilder()
    .WithName("WorkOrderSync")
    .WithSource(new DbQuerySource(factory, "SELECT * FROM Source.WorkOrders", "WorkOrders"))
    .WithTransformer(new JdeDateTransformer("STRDJ", "TRDJ", "StartDate"))
    .WithTransformer(new ColumnDropTransformer("STRDJ", "TRDJ"))
    .WithPreScript(CommonScripts.DisableIndexes(factory, "WorkOrder"))
    .WithDestination(new DbBulkMergeDestination(factory, "WorkOrder", new[] { "OrderNumber" }))
    .WithPostScript(CommonScripts.RebuildIndexes(factory, "WorkOrder"))
    .WithLogger(logger)
    .Build();
```

### Builder Methods

| Method | Required | Description |
|--------|----------|-------------|
| `WithName(string)` | No | Pipeline name for logging. Default: "Unnamed" |
| `WithSource(IImportSource)` | **Yes** | Data source. Throws if not set before `Build()` |
| `WithTransformer(IDataTransformer)` | No | Add transformer. Can be called multiple times (chained) |
| `WithDestination(IImportDestination)` | **Yes** | Data destination. Throws if not set before `Build()` |
| `WithPreScript(IScriptRunner)` | No | Script to run before data transfer. Can be called multiple times |
| `WithPostScript(IScriptRunner)` | No | Script to run after data transfer. Can be called multiple times |
| `WithCommandTimeout(TimeSpan)` | No | Default timeout. Range: 0-24 hours. Default: 600s |
| `WithLogger(ILogger)` | No | Logger for pipeline events. Default: NullLogger |

### WithCommandTimeout Validation

```csharp
public EtlPipelineBuilder WithCommandTimeout(TimeSpan timeout)
{
    if (timeout < TimeSpan.Zero || timeout > TimeSpan.FromHours(24))
        throw new ArgumentOutOfRangeException(nameof(timeout),
            "Timeout must be between 0 and 24 hours.");

    _defaultCommandTimeoutSeconds = (int)timeout.TotalSeconds;
    return this;
}
```

### Build Validation

```csharp
public EtlPipeline Build()
{
    if (_source == null)
        throw new InvalidOperationException(
            "Source is required. Call WithSource() before Build().");

    if (_destination == null)
        throw new InvalidOperationException(
            "Destination is required. Call WithDestination() before Build().");

    return new EtlPipeline(_name, _source, _transformers, _destination,
        _preScripts, _postScripts, _logger ?? NullLogger.Instance);
}
```

## Component Configuration

### DbQuerySource Options

| Parameter | Default | Description |
|-----------|---------|-------------|
| `connectionFactory` | Required | Factory for database connections |
| `sql` | Required | SQL query to execute |
| `name` | `"Query"` | Name for logging (appears as `DbQuery:{name}`) |
| `parameters` | `null` | Anonymous object for query parameters |
| `commandTimeout` | `3600` | Query timeout in seconds |

### DbBulkImportDestination Options

| Parameter | Default | Description |
|-----------|---------|-------------|
| `connectionFactory` | Required | Factory for database connections |
| `tableName` | Required | Destination table (supports schema: `dbo.Table`) |
| `batchSize` | `10000` | Rows per batch for progress tracking |
| `commandTimeoutSeconds` | `600` | Timeout for TRUNCATE and bulk copy |

### DbBulkMergeDestination Options

| Parameter | Default | Description |
|-----------|---------|-------------|
| `connectionFactory` | Required | Factory for database connections |
| `tableName` | Required | Destination table (supports schema: `dbo.Table`) |
| `matchColumns` | Required | Key columns for MERGE matching |
| `updateColumns` | All non-match | Columns to update on match |
| `batchSize` | `10000` | Rows per batch |
| `commandTimeoutSeconds` | `600` | Timeout for bulk copy and MERGE |

### Script Timeout Defaults

| Script | Default Timeout |
|--------|-----------------|
| `DisableIndexes` | 300s (5 min) |
| `RebuildIndexes` | 3600s (1 hour) |
| `UpdateStatistics` | 600s (10 min) |
| `SqlScriptRunner` | 3600s (1 hour) |

## Connection Factory Setup

The pipeline uses `IDbConnectionFactory` for database connections. Register it with your connection strings:

```csharp
services.AddSingleton<IDbConnectionFactory>(sp =>
{
    var configuration = sp.GetRequiredService<IConfiguration>();
    return new DbConnectionFactory(
        configuration.GetConnectionString("LotFinder"),
        configuration.GetConnectionString("JDE"),
        configuration.GetConnectionString("CMS"));
});
```

### Connection string examples

```json
{
  "ConnectionStrings": {
    "LotFinder": "Server=localhost,1434;Database=LotFinder;User Id=scopingapp;Password=...;TrustServerCertificate=true",
    "JDE": "Data Source=jde-oracle;User Id=...;Password=...",
    "CMS": "Data Source=cms-sybase;User Id=...;Password=..."
  }
}
```

## Dependency Injection Registration

### Basic registration

```csharp
services.AddEtlPipeline();
```

This registers `EtlPipelineBuilder` as transient so each request gets a fresh builder.

### Extension method implementation

```csharp
public static class EtlServiceCollectionExtensions
{
    public static IServiceCollection AddEtlPipeline(this IServiceCollection services)
    {
        services.AddTransient<EtlPipelineBuilder>();
        return services;
    }
}
```

### Full registration example

```csharp
public static IServiceCollection AddDataSync(this IServiceCollection services)
{
    // Connection factory (singleton - manages connection pooling)
    services.AddSingleton<IDbConnectionFactory, DbConnectionFactory>();

    // ETL pipeline builder (transient - fresh instance per use)
    services.AddEtlPipeline();

    // Background service for scheduled syncs
    services.AddHostedService<DataSyncService>();

    return services;
}
```

### Using the builder in a service

```csharp
public class DataSyncService : BackgroundService
{
    private readonly EtlPipelineBuilder _pipelineBuilder;
    private readonly IDbConnectionFactory _connectionFactory;
    private readonly ILogger<EtlPipeline> _pipelineLogger;

    public DataSyncService(
        EtlPipelineBuilder pipelineBuilder,
        IDbConnectionFactory connectionFactory,
        ILogger<EtlPipeline> pipelineLogger)
    {
        _pipelineBuilder = pipelineBuilder;
        _connectionFactory = connectionFactory;
        _pipelineLogger = pipelineLogger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var pipeline = _pipelineBuilder
            .WithName("WorkOrderSync")
            .WithSource(new DbQuerySource(_connectionFactory, "SELECT * FROM JDE.WorkOrders"))
            .WithDestination(new DbBulkImportDestination(_connectionFactory, "WorkOrder"))
            .WithLogger(_pipelineLogger)
            .Build();

        var result = await pipeline.ExecuteAsync(stoppingToken);
    }
}
```

## Configuration Summary

| Component | Option | Default | Valid Range |
|-----------|--------|---------|-------------|
| `EtlPipelineBuilder` | `WithCommandTimeout` | 600s | 0-24 hours |
| `DbQuerySource` | `commandTimeout` | 3600s | > 0 |
| `DbBulkImportDestination` | `batchSize` | 10000 | > 0 |
| `DbBulkImportDestination` | `commandTimeoutSeconds` | 600s | > 0 |
| `DbBulkMergeDestination` | `batchSize` | 10000 | > 0 |
| `DbBulkMergeDestination` | `commandTimeoutSeconds` | 600s | > 0 |

## Related Documentation

- [Overview](./Overview.md) - Pipeline architecture
- [Destinations](./Destinations.md) - Destination-specific options
- [Troubleshooting](./Troubleshooting.md) - Timeout and batch size tuning
```

**Step 2: Verify the file was created**

```bash
ls -la DOCUMENTATION/DataSync/Configuration.md
```

Expected: File exists

**Step 3: Commit**

```bash
git add DOCUMENTATION/DataSync/Configuration.md
git commit -m "docs: add ETL configuration documentation"
```

---

### Task 6: Create Troubleshooting.md

**Files:**
- Create: `DOCUMENTATION/DataSync/Troubleshooting.md`

**Step 1: Write Troubleshooting.md with error catalog, debugging patterns, and performance tuning**

Create `DOCUMENTATION/DataSync/Troubleshooting.md` with:

```markdown
# Troubleshooting

This document covers common errors, debugging patterns, and performance tuning for the ETL pipeline.


## Common Errors

### Column mapping errors

| Error | Cause | Resolution |
|-------|-------|------------|
| "No columns from source exist in destination table" | Source column names don't match destination | Check that the source query's column aliases match the destination table's column names (matching is case-insensitive) |
| "Column name collision" | Transformer creates duplicate column names | Review rename mappings; ensure no two columns map to the same output name |
| "Column '{name}' not found or was dropped" | Accessing a column that was dropped | Check transformer chain order; don't access dropped columns in later transformers |

### Computed column errors

| Error | Cause | Resolution |
|-------|-------|------------|
| "GetBytes not supported for computed column at ordinal N" | Binary access on transformed column | Use `GetValue()` instead; computed columns (like JDE dates) don't support binary access |
| "GetChars not supported for computed column at ordinal N" | Same as above | Use `GetValue()` or `GetString()` |
| "GetData not supported for computed column at ordinal N" | Same as above | Computed columns can't return nested readers |

### Timeout errors

| Error | Cause | Resolution |
|-------|-------|------------|
| `SqlException: Timeout expired` during bulk copy | Large dataset, slow network | Increase `commandTimeoutSeconds` on destination |
| `SqlException: Timeout expired` during MERGE | Many rows to match | Increase timeout; consider smaller batches |
| `SqlException: Timeout expired` during script | Index rebuild on large table | Increase script `timeoutSeconds` (default 3600s for rebuild) |

### Validation errors

| Error | Cause | Resolution |
|-------|-------|------------|
| "Source is required. Call WithSource() before Build()" | Missing source in pipeline | Add `.WithSource()` to builder chain |
| "Destination is required. Call WithDestination() before Build()" | Missing destination in pipeline | Add `.WithDestination()` to builder chain |
| "At least one match column is required" | Empty matchColumns array | Provide key columns for MERGE matching |
| "Timeout must be between 0 and 24 hours" | Invalid timeout value | Use `TimeSpan` between 0 and 24 hours |

## Debugging Patterns

### Inspecting pipeline results

Check `PipelineResult` after execution to understand what happened:

```csharp
var result = await pipeline.ExecuteAsync(cancellationToken);

if (!result.Success)
{
    logger.LogError(result.Error, "Pipeline failed after {Rows} rows in {Elapsed}",
        result.TotalRows, result.Elapsed);

    // Find which step failed
    var lastStep = result.Steps.LastOrDefault();
    if (lastStep != null)
    {
        logger.LogError("Failed at step: {Step} ({Type})",
            lastStep.StepName, lastStep.StepType);
    }
}
```

### Tracking step-by-step progress

Each step records timing and row counts:

```csharp
foreach (var step in result.Steps)
{
    logger.LogInformation("Step {Name} ({Type}): {Rows} rows in {Elapsed}ms",
        step.StepName, step.StepType, step.RowsAffected, step.Elapsed.TotalMilliseconds);
}
```

### Enabling detailed logging

Inject a logger into the pipeline for execution-level logging:

```csharp
var pipeline = new EtlPipelineBuilder()
    .WithName("DebugPipeline")
    .WithSource(source)
    .WithDestination(destination)
    .WithLogger(loggerFactory.CreateLogger<EtlPipeline>())
    .Build();
```

Pipeline logs include:
- `Information`: Pipeline start/complete with row counts
- `Debug`: Individual script execution
- `Error`: Failure with exception and last step

### Identifying the failure point

When a pipeline fails, `PipelineResult.Steps` contains all completed steps:

```csharp
if (!result.Success)
{
    // Steps completed before failure
    var completedSteps = result.Steps.Select(s => s.StepName);
    logger.LogError("Completed steps: {Steps}", string.Join(" → ", completedSteps));

    // The exception contains root cause
    logger.LogError(result.Error, "Root cause");
}
```

## Performance Tuning

### Batch size optimization

Default batch size is 10,000 rows. Adjust based on row width:

| Row Width | Recommended Batch Size |
|-----------|------------------------|
| Narrow (< 20 columns) | 10,000 - 50,000 |
| Medium (20-50 columns) | 5,000 - 10,000 |
| Wide (> 50 columns) | 1,000 - 5,000 |

```csharp
// Large batch for narrow rows
new DbBulkImportDestination(factory, "LookupTable", batchSize: 50000)

// Small batch for wide rows
new DbBulkMergeDestination(factory, "DetailTable", matchColumns, batchSize: 2000)
```

### Index management for bulk loads

Disable indexes before large imports, rebuild after:

```csharp
var pipeline = new EtlPipelineBuilder()
    .WithName("FullTableRefresh")
    .WithPreScript(CommonScripts.DisableIndexes(factory, "LargeTable"))
    .WithSource(source)
    .WithDestination(new DbBulkImportDestination(factory, "LargeTable"))
    .WithPostScript(CommonScripts.RebuildIndexes(factory, "LargeTable"))
    .WithPostScript(CommonScripts.UpdateStatistics(factory, "LargeTable"))
    .Build();
```

**When to use:**
- Full table refreshes (TRUNCATE + import)
- Tables with 3+ non-clustered indexes
- Import of 100,000+ rows

**When to skip:**
- Incremental merges with few rows
- Tables with only a clustered index
- Frequent small updates

### Timeout sizing guidelines

| Operation | Rows | Suggested Timeout |
|-----------|------|-------------------|
| Bulk import | < 100K | 600s (default) |
| Bulk import | 100K - 1M | 1800s (30 min) |
| Bulk import | > 1M | 3600s (1 hour) |
| Bulk merge | < 50K | 600s (default) |
| Bulk merge | 50K - 500K | 1800s (30 min) |
| Index rebuild | Any | 3600s (default) |

```csharp
// Large table with extended timeout
new DbBulkMergeDestination(factory, "HistoricalData",
    matchColumns: new[] { "RecordId" },
    commandTimeoutSeconds: 1800)
```

### Reducing network and memory usage

Select only needed columns in the source query:

```csharp
// Good - select only needed columns
var source = new DbQuerySource(factory, "SELECT OrderNumber, Status, StartDate FROM JDE.WorkOrders");

// Avoid - selecting all columns wastes bandwidth
var source = new DbQuerySource(factory, "SELECT * FROM JDE.WorkOrders");
```

Extra columns in the source are ignored by the destination column mapping, but they still consume network bandwidth and memory.

### Monitoring baseline performance

Track `PipelineResult.Elapsed` over time to detect degradation:

```csharp
var result = await pipeline.ExecuteAsync(ct);

metrics.RecordPipeline(
    pipelineName: pipeline.PipelineName,
    success: result.Success,
    rows: result.TotalRows,
    durationMs: result.Elapsed.TotalMilliseconds);

// Alert if duration exceeds baseline by 50%
if (result.Elapsed > baselineDuration * 1.5)
{
    logger.LogWarning("Pipeline {Name} took {Elapsed} (baseline: {Baseline})",
        pipeline.PipelineName, result.Elapsed, baselineDuration);
}
```

### Step-level performance analysis

Identify slow steps using `StepResult.Elapsed`:

```csharp
var slowSteps = result.Steps
    .Where(s => s.Elapsed > TimeSpan.FromSeconds(30))
    .OrderByDescending(s => s.Elapsed);

foreach (var step in slowSteps)
{
    logger.LogWarning("Slow step: {Name} took {Elapsed}", step.StepName, step.Elapsed);
}
```

## Related Documentation

- [Overview](./Overview.md) - Pipeline architecture
- [Configuration](./Configuration.md) - Timeout and batch size options
- [Destinations](./Destinations.md) - Script patterns for index management
```

**Step 2: Verify the file was created**

```bash
ls -la DOCUMENTATION/DataSync/Troubleshooting.md
```

Expected: File exists

**Step 3: Commit**

```bash
git add DOCUMENTATION/DataSync/Troubleshooting.md
git commit -m "docs: add ETL troubleshooting documentation"
```

---

### Task 7: Update ComponentMap.md

**Files:**
- Modify: `DOCUMENTATION/Instructions/ComponentMap.md`

**Step 1: Read the current ComponentMap.md**

Read `DOCUMENTATION/Instructions/ComponentMap.md` to find where to add the ETL source mapping.
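
If the file is long, the section can be located from the shell before editing. The following is a minimal sketch, assuming the existing file already has a `### DataSync/` heading at the same level as its sibling sections; `show_section` is a hypothetical helper name, not something in the repository.

```bash
# Print one "### <heading>" section of a Markdown file, with line numbers.
# Assumption: sections in ComponentMap.md are delimited by "### " headings.
show_section() {
  file="$1"
  heading="$2"
  awk -v h="$heading" '
    # Every level-3 heading switches printing on or off.
    /^### / { printing = ($0 == "### " h) }
    printing { printf "%d: %s\n", NR, $0 }
  ' "$file"
}

# Usage:
#   show_section DOCUMENTATION/Instructions/ComponentMap.md "DataSync/"
```

The line numbers in the output indicate where the new source paths from Step 2 belong. If nothing is printed, the heading differs from the assumed form and the file should be read manually.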

**Step 2: Add ETL source paths to the DataSync section**

Add the new ETL source paths to the DataSync section:

```markdown
### DataSync/

Documents data synchronization from enterprise systems.

**Source paths (Legacy):**
- `OLD/DataModel/Process/JDE*.cs` - JDE Oracle queries
- `OLD/DataModel/Process/CMS*.cs` - CMS Sybase queries
- `OLD/WorkerService/Process/UpdateProcessor.cs` - Sync orchestration
- `OLD/WorkerService/dsconfig/*.json` - Data source configs

**Source paths (New):**
- `NEW/src/JdeScoping.DataSync/Etl/` - ETL pipeline framework
- `NEW/src/JdeScoping.DataSync/Etl/Contracts/` - Core interfaces
- `NEW/src/JdeScoping.DataSync/Etl/Pipeline/` - Pipeline and builder
- `NEW/src/JdeScoping.DataSync/Etl/Sources/` - Data sources
- `NEW/src/JdeScoping.DataSync/Etl/Transformers/` - Data transformers
- `NEW/src/JdeScoping.DataSync/Etl/Destinations/` - Bulk copy/merge destinations
- `NEW/src/JdeScoping.DataSync/Etl/Scripts/` - SQL script runners
- `NEW/src/JdeScoping.DataSync/Etl/Results/` - Execution result types

**Typical files:**
- `Overview.md` - ETL pipeline architecture
- `Sources.md` - Writing custom data sources
- `Transformers.md` - Writing custom transformers
- `Destinations.md` - Bulk destinations and scripts
- `Configuration.md` - Pipeline builder and DI setup
- `Troubleshooting.md` - Debugging and performance
- `JDE.md` - JD Edwards (Oracle) integration
- `CMS.md` - CMS (Sybase) integration
- `Scheduling.md` - Mass/daily/hourly sync schedules
```

**Step 3: Verify the edit is correct**

Read the modified section to confirm the changes.
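
The manual read in Step 3 can be backed by a quick mechanical check that every new path actually appears in the file. This is a sketch only; `check_paths_listed` is a hypothetical helper name, and it tests for literal text, not for correct placement within the section.

```bash
# Report each expected path that does not appear literally in the file.
# Returns non-zero if any path is missing.
check_paths_listed() {
  file="$1"
  shift
  missing=""
  for path in "$@"; do
    # -F: fixed-string match, -q: quiet, --: end of options
    if ! grep -qF -- "$path" "$file"; then
      echo "NOT LISTED: $path"
      missing=1
    fi
  done
  [ -z "$missing" ]
}

# Usage (subfolder paths from Step 2):
#   check_paths_listed DOCUMENTATION/Instructions/ComponentMap.md \
#     'NEW/src/JdeScoping.DataSync/Etl/Contracts/' \
#     'NEW/src/JdeScoping.DataSync/Etl/Pipeline/' \
#     'NEW/src/JdeScoping.DataSync/Etl/Results/'
```

Because `NEW/src/JdeScoping.DataSync/Etl/` is a prefix of every other path, checking it this way proves little; the subfolder paths are the meaningful arguments.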

**Step 4: Commit**

```bash
git add DOCUMENTATION/Instructions/ComponentMap.md
git commit -m "docs: add ETL source paths to ComponentMap"
```

---

### Task 8: Final verification

**Step 1: List all created documentation files**

```bash
ls -1 DOCUMENTATION/DataSync/
```

Expected output:

```
Configuration.md
Destinations.md
Overview.md
Sources.md
Transformers.md
Troubleshooting.md
```

**Step 2: Verify all internal links resolve**

List every cross-reference between the docs and check that each one points to an existing file:

```bash
grep -h "\[.*\](\./" DOCUMENTATION/DataSync/*.md | sort -u
```

All referenced files should exist.

**Step 3: Check git log for all commits**

```bash
git log --oneline -8
```

Expected: the 7 most recent commits are the documentation commits (6 docs + 1 ComponentMap update)

**Step 4: Final status check**

```bash
git status
```

Expected: Clean working tree
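
The `grep` in Step 2 only lists the links; confirming that each target exists is still a manual comparison. The sketch below automates that comparison under two assumptions: links use the `](./File.md)` form shown in the Related Documentation sections, and file names contain no spaces. `check_doc_links` is a hypothetical helper name.

```bash
# Report relative Markdown links (./Target) whose target file is missing.
# Returns non-zero if any link is broken.
check_doc_links() {
  dir="$1"
  broken=""
  for file in "$dir"/*.md; do
    [ -e "$file" ] || continue
    # Extract each "](./Target" occurrence, then strip the "](./" prefix.
    for target in $(grep -o '](\./[^)#]*' "$file" | sed 's|^](\./||' | sort -u); do
      if [ ! -e "$dir/$target" ]; then
        echo "BROKEN: $(basename "$file") -> $target"
        broken=1
      fi
    done
  done
  [ -z "$broken" ]
}

# Usage:
#   check_doc_links DOCUMENTATION/DataSync && echo "All links resolve"
```

Anchors after `#` are dropped before the existence check, so the function verifies files only, not headings within them.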