diff --git a/PLANS/2026-01-03-etl-documentation-implementation.md b/PLANS/2026-01-03-etl-documentation-implementation.md new file mode 100644 index 0000000..33c8676 --- /dev/null +++ b/PLANS/2026-01-03-etl-documentation-implementation.md @@ -0,0 +1,1579 @@ +# ETL Pipeline Documentation Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** Create 6 documentation files for the ETL pipeline covering architecture, extension patterns, configuration, and troubleshooting. + +**Architecture:** Each task creates one markdown file in `DOCUMENTATION/DataSync/`. Code snippets come directly from source files. Verification checks that links resolve and code matches source. + +**Tech Stack:** Markdown, code from `NEW/src/JdeScoping.DataSync/Etl/` + +--- + +### Task 1: Create DataSync folder and Overview.md + +**Files:** +- Create: `DOCUMENTATION/DataSync/Overview.md` + +**Step 1: Create the DataSync directory** + +```bash +mkdir -p DOCUMENTATION/DataSync +``` + +**Step 2: Write Overview.md with architecture and core concepts** + +Create `DOCUMENTATION/DataSync/Overview.md` with: + +```markdown +# ETL Pipeline + +The ETL pipeline streams data from enterprise sources (JDE, CMS) through transformations into SQL Server cache tables. It supports batched processing, pre/post scripts for index management, and detailed execution tracking. + +## Architecture + +``` +┌─────────────┐ ┌──────────────────┐ ┌─────────────────┐ +│ IImportSource│───▶│ IDataTransformer │───▶│IImportDestination│ +└─────────────┘ │ (chain of N) │ └─────────────────┘ + └──────────────────┘ + ▲ │ + │ ┌──────────────┐ │ + └─────────│ Pre-Scripts │ ▼ + └──────────────┘ ┌──────────────┐ + │ Post-Scripts │ + └──────────────┘ +``` + +**Execution flow:** +1. Run pre-scripts (e.g., disable indexes) +2. Open source and get `IDataReader` +3. Apply transformer chain (each wraps the previous reader) +4. 
Write to destination (bulk copy or merge) +5. Run post-scripts (e.g., rebuild indexes) + +## Core Contracts + +### IImportSource + +Provides data to the pipeline. Returns an `IDataReader` that streams rows. + +```csharp +public interface IImportSource : IAsyncDisposable +{ + Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default); + string SourceName { get; } +} +``` + +### IDataTransformer + +Modifies data during transfer. Wraps the source reader in a decorator. + +```csharp +public interface IDataTransformer +{ + IDataReader Transform(IDataReader source); + string TransformerName { get; } + int MapOrdinal(int transformedOrdinal, IDataReader source); +} +``` + +### IImportDestination + +Consumes data and writes to storage. Returns statistics about the operation. + +```csharp +public interface IImportDestination +{ + Task<DestinationResult> WriteAsync(IDataReader source, CancellationToken cancellationToken = default); + string DestinationName { get; } +} +``` + +### IScriptRunner + +Executes SQL scripts before or after data transfer. + +```csharp +public interface IScriptRunner +{ + Task ExecuteAsync(CancellationToken cancellationToken = default); + string ScriptName { get; } +} +``` + +## Pipeline Execution + +The `EtlPipeline` class orchestrates execution and tracks timing for each step: + +```csharp +public async Task<PipelineResult> ExecuteAsync(CancellationToken cancellationToken = default) +{ + // 1. Run pre-scripts + foreach (var script in _preScripts) + { + var stepResult = await RunScriptAsync(script, cancellationToken); + steps.Add(stepResult); + } + + // 2. Open source + await using (_source) + { + var reader = await _source.ReadDataAsync(cancellationToken); + + // 3. Apply transformers + foreach (var transformer in _transformers) + { + reader = transformer.Transform(reader); + } + + // 4. Write to destination + var destResult = await _destination.WriteAsync(reader, cancellationToken); + } + + // 5. 
Run post-scripts + foreach (var script in _postScripts) + { + var stepResult = await RunScriptAsync(script, cancellationToken); + } + + return PipelineResult.Succeeded(totalRows, totalStopwatch.Elapsed, steps); +} +``` + +## Result Model + +### PipelineResult + +```csharp +public record PipelineResult( + bool Success, + long TotalRows, + TimeSpan Elapsed, + IReadOnlyList<StepResult> Steps, + Exception? Error = null); +``` + +### StepResult + +```csharp +public record StepResult( + string StepName, + string StepType, + long RowsAffected, + TimeSpan Elapsed); +``` + +### DestinationResult + +```csharp +public record DestinationResult( + long RowsProcessed, + int BatchCount, + TimeSpan Elapsed); +``` + +## Related Documentation + +- [Sources](./Sources.md) - Writing custom data sources +- [Transformers](./Transformers.md) - Writing custom transformers +- [Destinations](./Destinations.md) - Writing destinations and scripts +- [Configuration](./Configuration.md) - Pipeline builder and DI setup +- [Troubleshooting](./Troubleshooting.md) - Debugging and performance +``` + +**Step 3: Verify the file was created** + +```bash +ls -la DOCUMENTATION/DataSync/ +``` +Expected: `Overview.md` exists + +**Step 4: Commit** + +```bash +git add DOCUMENTATION/DataSync/Overview.md +git commit -m "docs: add ETL pipeline overview documentation" +``` + +--- + +### Task 2: Create Sources.md + +**Files:** +- Create: `DOCUMENTATION/DataSync/Sources.md` + +**Step 1: Write Sources.md with interface and DbQuerySource walkthrough** + +Create `DOCUMENTATION/DataSync/Sources.md` with: + +```markdown +# Data Sources + +Sources provide data to the ETL pipeline by implementing `IImportSource`. They return an `IDataReader` that streams rows to transformers and destinations. 
+ +## Interface Contract + +```csharp +public interface IImportSource : IAsyncDisposable +{ + Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default); + string SourceName { get; } +} +``` + +**Key requirements:** +- Implement `IAsyncDisposable` for connection cleanup +- Return a live `IDataReader` (not buffered) for memory efficiency +- `SourceName` is used in logging and `StepResult` tracking + +## DbQuerySource Implementation + +`DbQuerySource` executes a SQL query against the local cache database: + +```csharp +public class DbQuerySource : IImportSource +{ + private readonly IDbConnectionFactory _connectionFactory; + private readonly string _sql; + private readonly object? _parameters; + private readonly int _commandTimeout; + private SqlConnection? _connection; + private SqlCommand? _command; + + public string SourceName { get; } + + public DbQuerySource( + IDbConnectionFactory connectionFactory, + string sql, + string? name = null, + object? parameters = null, + int commandTimeout = 3600) + { + _connectionFactory = connectionFactory; + _sql = sql; + _parameters = parameters; + _commandTimeout = commandTimeout; + SourceName = $"DbQuery:{name ?? "Query"}"; + } +``` + +### Reading data + +The connection opens in `ReadDataAsync` and stays open until disposal: + +```csharp + public async Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default) + { + _connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken); + _command = _connection.CreateCommand(); + _command.CommandText = _sql; + _command.CommandTimeout = _commandTimeout; + AddParameters(_command, _parameters); + return await _command.ExecuteReaderAsync(cancellationToken); + } +``` + +### Parameter handling + +Parameters are added from an anonymous object using reflection: + +```csharp + private static void AddParameters(SqlCommand command, object? 
parameters) + { + if (parameters == null) return; + + var properties = parameters.GetType().GetProperties(); + foreach (var prop in properties) + { + var value = prop.GetValue(parameters) ?? DBNull.Value; + command.Parameters.AddWithValue($"@{prop.Name}", value); + } + } +``` + +### Resource cleanup + +Both the command and connection are disposed asynchronously: + +```csharp + public async ValueTask DisposeAsync() + { + if (_command != null) + { + await _command.DisposeAsync(); + _command = null; + } + if (_connection != null) + { + await _connection.DisposeAsync(); + _connection = null; + } + } +} +``` + +## Key Patterns + +### Keep sources stateless until ReadDataAsync + +Don't open connections or execute queries in the constructor. The source should be configurable without side effects until `ReadDataAsync` is called. + +### Streaming, not buffering + +Return a live `IDataReader` rather than loading all data into memory. This allows processing millions of rows without memory pressure. + +### Use SourceName for diagnostics + +Format: `"DbQuery:{table}"` or `"File:{filename}"`. This appears in logs and `StepResult.StepName`. + +## Future source types + +The interface supports additional source types not yet implemented: + +- **File-based sources** - CSV, Excel files +- **API sources** - REST endpoints returning paged data +- **Oracle/Sybase sources** - Direct queries against JDE or CMS + +Each would implement the same interface with different connection and reader implementations. 
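As a rough illustration of that last point, the sketch below implements the same contract over an in-memory `DataTable`. `InMemorySource` and its `Memory:` name prefix are hypothetical and not part of the codebase; the interface is redeclared locally, assuming `ReadDataAsync` returns `Task<IDataReader>`, so the snippet compiles on its own.

```csharp
using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;

// Local copy of the contract so the sketch is self-contained.
// Assumption: ReadDataAsync returns Task<IDataReader>.
public interface IImportSource : IAsyncDisposable
{
    Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default);
    string SourceName { get; }
}

// Hypothetical source backed by an in-memory DataTable.
public sealed class InMemorySource : IImportSource
{
    private readonly DataTable _table;
    private IDataReader? _reader;

    public string SourceName { get; }

    public InMemorySource(DataTable table, string? name = null)
    {
        // No side effects here: the reader is only created in ReadDataAsync.
        _table = table ?? throw new ArgumentNullException(nameof(table));
        SourceName = $"Memory:{name ?? table.TableName}";
    }

    public Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();
        IDataReader reader = _table.CreateDataReader();
        _reader = reader; // kept so DisposeAsync can release it
        return Task.FromResult(reader);
    }

    public ValueTask DisposeAsync()
    {
        _reader?.Dispose();
        _reader = null;
        return ValueTask.CompletedTask;
    }
}
```

Because the reader is created only in `ReadDataAsync`, constructing the source has no side effects, which matches the stateless-until-read pattern described above. A test could swap something like this in for `DbQuerySource` to exercise transformers without a database.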
+ +## Related Documentation + +- [Overview](./Overview.md) - Pipeline architecture +- [Transformers](./Transformers.md) - Processing source data +- [Configuration](./Configuration.md) - Connection factory setup +``` + +**Step 2: Verify the file was created** + +```bash +ls -la DOCUMENTATION/DataSync/Sources.md +``` +Expected: File exists + +**Step 3: Commit** + +```bash +git add DOCUMENTATION/DataSync/Sources.md +git commit -m "docs: add ETL sources documentation" +``` + +--- + +### Task 3: Create Transformers.md + +**Files:** +- Create: `DOCUMENTATION/DataSync/Transformers.md` + +**Step 1: Write Transformers.md with interface, base class, and three transformer walkthroughs** + +Create `DOCUMENTATION/DataSync/Transformers.md` with: + +```markdown +# Data Transformers + +Transformers modify data as it flows through the pipeline. They wrap the source `IDataReader` in a decorator, allowing column renaming, dropping, type conversion, and computed columns. + +## Interface Contract + +```csharp +public interface IDataTransformer +{ + IDataReader Transform(IDataReader source); + string TransformerName { get; } + int MapOrdinal(int transformedOrdinal, IDataReader source); +} +``` + +**Key methods:** +- `Transform()` - Wraps the source reader, returns a new reader with modifications +- `TransformerName` - Used in logging and `StepResult` tracking +- `MapOrdinal()` - Maps transformed ordinals to source ordinals. Returns `-1` for computed columns. 
+ +## DataTransformerBase + +The base class provides default implementations and handles the decorator pattern: + +```csharp +public abstract class DataTransformerBase : IDataTransformer +{ + public abstract string TransformerName { get; } + + public IDataReader Transform(IDataReader source) + { + ArgumentNullException.ThrowIfNull(source); + OnInitialize(source); + return new TransformingDataReader(source, this); + } + + protected virtual void OnInitialize(IDataReader source) { } +``` + +### Default pass-through methods + +Override only what you need to change: + +```csharp + public virtual int GetFieldCount(IDataReader source) => source.FieldCount; + public virtual string GetName(int ordinal, IDataReader source) => source.GetName(ordinal); + public virtual Type GetFieldType(int ordinal, IDataReader source) => source.GetFieldType(ordinal); + public virtual object GetValue(int ordinal, IDataReader source) => source.GetValue(ordinal); + public virtual int GetOrdinal(string name, IDataReader source) => source.GetOrdinal(name); + public virtual bool IsDBNull(int ordinal, IDataReader source) => source.IsDBNull(ordinal); + public virtual int MapOrdinal(int transformedOrdinal, IDataReader source) => transformedOrdinal; +``` + +### Binary method handling + +Binary accessors throw `NotSupportedException` for computed columns (where `MapOrdinal` returns `-1`): + +```csharp + public virtual long GetBytes(int ordinal, long fieldOffset, byte[]? buffer, + int bufferOffset, int length, IDataReader source) + { + var sourceOrdinal = MapOrdinal(ordinal, source); + if (sourceOrdinal < 0) + throw new NotSupportedException( + $"GetBytes not supported for computed column at ordinal {ordinal}."); + return source.GetBytes(sourceOrdinal, fieldOffset, buffer, bufferOffset, length); + } +``` + +## ColumnRenameTransformer + +Renames columns without changing values or order: + +```csharp +public class ColumnRenameTransformer : DataTransformerBase +{ + private readonly Dictionary<string, string> _renames; + private string[]? 
_outputNames; + private Dictionary<string, int>? _nameToOrdinal; + + public override string TransformerName => $"RenameColumns:{_renames.Count}"; + + public ColumnRenameTransformer(params (string OldName, string NewName)[] renames) + { + _renames = renames.ToDictionary( + r => r.OldName, r => r.NewName, StringComparer.OrdinalIgnoreCase); + } +``` + +### Collision detection + +The transformer validates that renames don't create duplicate column names: + +```csharp + protected override void OnInitialize(IDataReader source) + { + _outputNames = new string[source.FieldCount]; + _nameToOrdinal = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase); + + for (int i = 0; i < source.FieldCount; i++) + { + var originalName = source.GetName(i); + var outputName = _renames.TryGetValue(originalName, out var newName) + ? newName : originalName; + + if (_nameToOrdinal.TryGetValue(outputName, out var existingOrdinal)) + { + throw new InvalidOperationException( + $"Column name collision: '{originalName}' → '{outputName}' conflicts with " + + $"'{source.GetName(existingOrdinal)}' (already at ordinal {existingOrdinal})."); + } + + _outputNames[i] = outputName; + _nameToOrdinal[outputName] = i; + } + } +``` + +## ColumnDropTransformer + +Removes specified columns from the output: + +```csharp +public class ColumnDropTransformer : DataTransformerBase +{ + private readonly HashSet<string> _columnsToDrop; + private int[]? _ordinalMap; + private Dictionary<string, int>? 
_nameToOrdinal; + + public override string TransformerName => $"DropColumns:{string.Join(",", _columnsToDrop)}"; + + public ColumnDropTransformer(params string[] columnsToDrop) + { + _columnsToDrop = new HashSet<string>(columnsToDrop, StringComparer.OrdinalIgnoreCase); + } +``` + +### Ordinal mapping + +Builds a map from output ordinals to source ordinals, excluding dropped columns: + +```csharp + protected override void OnInitialize(IDataReader source) + { + var ordinalList = new List<int>(); + _nameToOrdinal = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase); + + for (int i = 0; i < source.FieldCount; i++) + { + var name = source.GetName(i); + if (!_columnsToDrop.Contains(name)) + { + _nameToOrdinal[name] = ordinalList.Count; + ordinalList.Add(i); + } + } + _ordinalMap = ordinalList.ToArray(); + } + + public override int MapOrdinal(int transformedOrdinal, IDataReader source) + => _ordinalMap![transformedOrdinal]; +``` + +## JdeDateTransformer + +Combines JDE Julian date (CYYDDD) and time (HHMMSS) columns into a single `DateTime`: + +```csharp +public class JdeDateTransformer : DataTransformerBase +{ + public static readonly DateTime DefaultInvalidDateSentinel = new(1900, 1, 1); + + private readonly string _dateColumn; + private readonly string _timeColumn; + private readonly string _outputColumn; + private readonly DateTime _invalidDateSentinel; +``` + +### Computed column handling + +The output `DateTime` column has no direct source ordinal, so `MapOrdinal` returns `-1`: + +```csharp + public override int MapOrdinal(int transformedOrdinal, IDataReader source) + { + var sourceOrdinal = _ordinalMap![transformedOrdinal]; + return sourceOrdinal == _dateOrdinal ? -1 : sourceOrdinal; + } + + public override string GetDataTypeName(int ordinal, IDataReader source) + { + var sourceOrdinal = _ordinalMap![ordinal]; + return sourceOrdinal == _dateOrdinal ? 
"datetime" : source.GetDataTypeName(sourceOrdinal); + } +``` + +### Date parsing with validation + +Invalid dates return a configurable sentinel value (default: 1900-01-01): + +```csharp + public static DateTime ParseJdeDateTime(decimal julianDate, decimal time, DateTime sentinel) + { + var dateInt = (int)julianDate; + if (dateInt <= 0) return sentinel; + + var century = dateInt / 100000; + var year = (dateInt / 1000) % 100; + var dayOfYear = dateInt % 1000; + + if (century < 0 || century > 1) return sentinel; + if (year < 0 || year > 99) return sentinel; + if (dayOfYear < 1 || dayOfYear > 366) return sentinel; + + var fullYear = (century == 0 ? 1900 : 2000) + year; + var daysInYear = DateTime.IsLeapYear(fullYear) ? 366 : 365; + if (dayOfYear > daysInYear) return sentinel; + + var date = new DateTime(fullYear, 1, 1).AddDays(dayOfYear - 1); + + // Parse time (HHMMSS format) + var timeInt = (int)time; + var hours = timeInt / 10000; + var minutes = (timeInt / 100) % 100; + var seconds = timeInt % 100; + + if (hours < 0 || hours > 23) return sentinel; + if (minutes < 0 || minutes > 59) return sentinel; + if (seconds < 0 || seconds > 59) return sentinel; + + return date.AddHours(hours).AddMinutes(minutes).AddSeconds(seconds); + } +``` + +## Transformer Chaining + +Transformers compose by wrapping each other. The pipeline applies them in order: + +```csharp +foreach (var transformer in _transformers) +{ + reader = transformer.Transform(reader); +} +``` + +Each transformer sees the output of the previous one. Ordinal mappings accumulate through the chain. 
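One way to picture how ordinal mappings accumulate: treat each transformer as an array from its output ordinals to its input ordinals, and resolve a final ordinal by walking the chain backwards. This is a simplified model under that assumption, not the actual `TransformingDataReader` logic; `OrdinalChainSketch` and `ResolveToSource` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

public static class OrdinalChainSketch
{
    // Each int[] maps one stage's output ordinals to its input ordinals;
    // -1 marks a computed column with no upstream counterpart.
    // `stages` is ordered as the pipeline applies them (first transformer first).
    public static int ResolveToSource(int outputOrdinal, IReadOnlyList<int[]> stages)
    {
        var ordinal = outputOrdinal;

        // The last transformer produced the final layout, so start there.
        for (int i = stages.Count - 1; i >= 0; i--)
        {
            ordinal = stages[i][ordinal];
            if (ordinal < 0)
                return -1; // computed somewhere in the chain
        }

        return ordinal;
    }
}
```

With a first stage map of `[0, 2, 3]` (source column 1 dropped) and a second stage map of `[1, 2]` (its column 0 dropped), final ordinal 0 resolves to source ordinal 2. If the first stage were `[0, -1, 3]` instead, the same lookup would resolve to `-1`, because that column was computed upstream.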
+ +## Validation in OnInitialize + +Perform all validation in `OnInitialize()` to fail fast before processing data: + +- Check that required columns exist +- Validate rename mappings don't create collisions +- Build ordinal maps for efficient lookup during row processing + +## Related Documentation + +- [Overview](./Overview.md) - Pipeline architecture +- [Sources](./Sources.md) - Data sources that feed transformers +- [Destinations](./Destinations.md) - Where transformed data goes +``` + +**Step 2: Verify the file was created** + +```bash +ls -la DOCUMENTATION/DataSync/Transformers.md +``` +Expected: File exists + +**Step 3: Commit** + +```bash +git add DOCUMENTATION/DataSync/Transformers.md +git commit -m "docs: add ETL transformers documentation" +``` + +--- + +### Task 4: Create Destinations.md + +**Files:** +- Create: `DOCUMENTATION/DataSync/Destinations.md` + +**Step 1: Write Destinations.md with interface, both destinations, and script patterns** + +Create `DOCUMENTATION/DataSync/Destinations.md` with: + +```markdown +# Destinations and Scripts + +Destinations consume data from the pipeline and write it to storage. Scripts run SQL operations before or after data transfer, commonly for index management. 
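As a concrete picture of the destination role, the sketch below drains a reader and reports statistics without writing anywhere. `CountingDestination` is hypothetical and not part of the codebase; the interface and result record are redeclared locally, assuming `WriteAsync` returns `Task<DestinationResult>`, so the snippet stands alone.

```csharp
using System;
using System.Data;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Local copies of the shapes used by the pipeline (assumed signatures).
public record DestinationResult(long RowsProcessed, int BatchCount, TimeSpan Elapsed);

public interface IImportDestination
{
    Task<DestinationResult> WriteAsync(IDataReader source, CancellationToken cancellationToken = default);
    string DestinationName { get; }
}

// Hypothetical destination that discards rows and only counts them.
public sealed class CountingDestination : IImportDestination
{
    private readonly int _batchSize;

    public string DestinationName => "Counting";

    public CountingDestination(int batchSize = 10000)
    {
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));
        _batchSize = batchSize;
    }

    public Task<DestinationResult> WriteAsync(IDataReader source, CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(source);
        var stopwatch = Stopwatch.StartNew();
        long rows = 0;

        // Consume the whole reader, as a real destination would.
        while (source.Read())
        {
            cancellationToken.ThrowIfCancellationRequested();
            rows++;
        }

        // Ceiling division: a partial final batch still counts as a batch.
        var batches = (int)((rows + _batchSize - 1) / _batchSize);
        return Task.FromResult(new DestinationResult(rows, batches, stopwatch.Elapsed));
    }
}
```

A stand-in like this can be handy for measuring source and transformer throughput in isolation, since it reads every row but performs no database writes.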
+ +## IImportDestination Contract + +```csharp +public interface IImportDestination +{ + Task<DestinationResult> WriteAsync(IDataReader source, CancellationToken cancellationToken = default); + string DestinationName { get; } +} +``` + +**Key requirements:** +- Consume the entire `IDataReader` in `WriteAsync` +- Return `DestinationResult` with row count, batch count, and elapsed time +- `DestinationName` is used in logging and `StepResult` tracking + +## DbBulkImportDestination + +Full table refresh using TRUNCATE + bulk copy: + +```csharp +public class DbBulkImportDestination : IImportDestination +{ + private const int DefaultBatchSize = 10000; + private const int DefaultCommandTimeoutSeconds = 600; + + public DbBulkImportDestination( + IDbConnectionFactory connectionFactory, + string tableName, + int batchSize = 0, + int commandTimeoutSeconds = 0) + { + _batchSize = batchSize > 0 ? batchSize : DefaultBatchSize; + _commandTimeoutSeconds = commandTimeoutSeconds > 0 + ? commandTimeoutSeconds : DefaultCommandTimeoutSeconds; + } +``` + +### Column mapping + +Queries destination schema and maps only matching columns. 
Extra source columns are ignored: + +```csharp + var destColumns = await GetDestinationColumnsAsync(connection, cancellationToken); + + using var bulkCopy = new SqlBulkCopy(connection) + { + DestinationTableName = qualifiedName, + BatchSize = _batchSize, + BulkCopyTimeout = _commandTimeoutSeconds, + EnableStreaming = true + }; + + for (int i = 0; i < source.FieldCount; i++) + { + var columnName = source.GetName(i); + if (destColumns.Contains(columnName)) + { + bulkCopy.ColumnMappings.Add(columnName, columnName); + } + } + + if (bulkCopy.ColumnMappings.Count == 0) + throw new InvalidOperationException( + $"No columns from source exist in destination table '{_tableName}'."); +``` + +### Destination column discovery + +Uses `INFORMATION_SCHEMA.COLUMNS` with schema support: + +```csharp + private async Task<HashSet<string>> GetDestinationColumnsAsync( + SqlConnection connection, CancellationToken ct) + { + var (schema, table) = CommonScripts.ParseTableName(_tableName); + var sql = @"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_NAME = @tableName AND TABLE_SCHEMA = @schemaName"; + var columns = await connection.QueryAsync<string>( + new CommandDefinition(sql, new { tableName = table, schemaName = schema }, + commandTimeout: _commandTimeoutSeconds, cancellationToken: ct)); + return columns.ToHashSet(StringComparer.OrdinalIgnoreCase); + } +``` + +## DbBulkMergeDestination + +Incremental updates using bulk copy to temp table + MERGE: + +```csharp +public class DbBulkMergeDestination : IImportDestination +{ + public DbBulkMergeDestination( + IDbConnectionFactory connectionFactory, + string tableName, + string[] matchColumns, + string[]? 
updateColumns = null, + int batchSize = 0, + int commandTimeoutSeconds = 0) + { + if (matchColumns.Length == 0) + throw new ArgumentException("At least one match column is required."); + } +``` + +### Batch processing + +Creates a temp table, bulk copies in batches, then merges each batch: + +```csharp + var tempTableName = $"#ETL_{_tableName.Replace(".", "_").Replace("[", "").Replace("]", "")}"; + await CreateTempTableAsync(connection, tempTableName, cancellationToken); + + while (source.Read()) + { + // Buffer rows into DataTable + if (batch.Rows.Count >= _batchSize) + { + await ProcessBatchAsync(connection, batch, tempTableName, mergeSql, destColumns, ct); + totalRows += batch.Rows.Count; + batch.Clear(); + } + } +``` + +### MERGE SQL generation + +Generates a MERGE statement with configurable match and update columns: + +```csharp + private string BuildMergeSql(string tempTableName, + IReadOnlyList<string> allColumns, IReadOnlyList<string> updateColumns) + { + var qualifiedName = CommonScripts.FormatQualifiedTableName(_tableName); + var sb = new StringBuilder(); + sb.AppendLine($"MERGE INTO {qualifiedName} AS target"); + sb.AppendLine($"USING {tempTableName} AS source"); + sb.Append("ON "); + sb.AppendLine(string.Join(" AND ", + _matchColumns.Select(c => $"target.[{c}] = source.[{c}]"))); + + if (updateColumns.Count > 0) + { + sb.AppendLine("WHEN MATCHED THEN UPDATE SET"); + sb.AppendLine(string.Join(", ", + updateColumns.Select(c => $"target.[{c}] = source.[{c}]"))); + } + + sb.AppendLine("WHEN NOT MATCHED THEN INSERT"); + sb.AppendLine($"({string.Join(", ", allColumns.Select(c => $"[{c}]"))})"); + sb.AppendLine($"VALUES ({string.Join(", ", allColumns.Select(c => $"source.[{c}]"))});"); + + return sb.ToString(); + } +``` + +## Schema-Qualified Table Names + +Both destinations support schema-qualified names via `CommonScripts`: + +```csharp +public static (string Schema, string Table) ParseTableName(string tableName) +{ + var cleaned = tableName.Replace("[", "").Replace("]", 
""); + var parts = cleaned.Split('.', 2); + return parts.Length == 2 ? (parts[0], parts[1]) : ("dbo", parts[0]); +} + +public static string FormatQualifiedTableName(string tableName) +{ + var (schema, table) = ParseTableName(tableName); + return $"[{schema}].[{table}]"; +} +``` + +Supported formats: `"Table"`, `"dbo.Table"`, `"[dbo].[Table]"` + +## Script Patterns + +### IScriptRunner Contract + +```csharp +public interface IScriptRunner +{ + Task ExecuteAsync(CancellationToken cancellationToken = default); + string ScriptName { get; } +} +``` + +### SqlScriptRunner Implementation + +```csharp +public class SqlScriptRunner : IScriptRunner +{ + public SqlScriptRunner( + IDbConnectionFactory connectionFactory, + string sql, + string? name = null, + object? parameters = null, + int timeoutSeconds = 3600) + { + ScriptName = name ?? "SqlScript"; + } + + public async Task ExecuteAsync(CancellationToken cancellationToken = default) + { + await using var connection = await _connectionFactory + .CreateLotFinderConnectionAsync(cancellationToken); + await connection.ExecuteAsync( + new CommandDefinition(_sql, _parameters, + commandTimeout: _timeoutSeconds, cancellationToken: cancellationToken)); + } +} +``` + +### Common Scripts + +`CommonScripts` provides factory methods for index management: + +```csharp +public static IScriptRunner DisableIndexes( + IDbConnectionFactory factory, string tableName, int timeoutSeconds = 300) +{ + var (schema, table) = ParseTableName(tableName); + var sql = @" +DECLARE @sql NVARCHAR(MAX) = ''; +DECLARE @fullTableName NVARCHAR(256) = QUOTENAME(@schemaName) + '.' 
+ QUOTENAME(@tableName); + +SELECT @sql = @sql + 'ALTER INDEX ' + QUOTENAME(i.name) + ' ON ' + @fullTableName + ' DISABLE;' +FROM sys.indexes i +INNER JOIN sys.tables t ON i.object_id = t.object_id +INNER JOIN sys.schemas s ON t.schema_id = s.schema_id +WHERE t.name = @tableName AND s.name = @schemaName + AND i.type = 2 AND i.is_disabled = 0; + +IF LEN(@sql) > 0 EXEC sp_executesql @sql;"; + + return new SqlScriptRunner(factory, sql, $"DisableIndexes:{schema}.{table}", + parameters: new { tableName = table, schemaName = schema }, + timeoutSeconds: timeoutSeconds); +} + +public static IScriptRunner RebuildIndexes( + IDbConnectionFactory factory, string tableName, int timeoutSeconds = 3600) +{ + // Similar pattern with ALTER INDEX ALL ... REBUILD +} + +public static IScriptRunner UpdateStatistics( + IDbConnectionFactory factory, string tableName, int timeoutSeconds = 600) +{ + // UPDATE STATISTICS with QUOTENAME protection +} +``` + +### SQL injection protection + +All dynamic SQL uses `QUOTENAME()` for identifier escaping: + +```sql +DECLARE @fullTableName NVARCHAR(256) = QUOTENAME(@schemaName) + '.' 
+ QUOTENAME(@tableName); +``` + +### When to use scripts + +Use pre/post scripts for large bulk loads where index overhead matters: + +```csharp +var pipeline = new EtlPipelineBuilder() + .WithName("LargeTableSync") + .WithSource(source) + .WithPreScript(CommonScripts.DisableIndexes(factory, "WorkOrder")) + .WithDestination(new DbBulkImportDestination(factory, "WorkOrder")) + .WithPostScript(CommonScripts.RebuildIndexes(factory, "WorkOrder")) + .WithPostScript(CommonScripts.UpdateStatistics(factory, "WorkOrder")) + .Build(); +``` + +## Related Documentation + +- [Overview](./Overview.md) - Pipeline architecture +- [Transformers](./Transformers.md) - Data transformation +- [Configuration](./Configuration.md) - Timeout and batch size options +- [Troubleshooting](./Troubleshooting.md) - Performance tuning +``` + +**Step 2: Verify the file was created** + +```bash +ls -la DOCUMENTATION/DataSync/Destinations.md +``` +Expected: File exists + +**Step 3: Commit** + +```bash +git add DOCUMENTATION/DataSync/Destinations.md +git commit -m "docs: add ETL destinations and scripts documentation" +``` + +--- + +### Task 5: Create Configuration.md + +**Files:** +- Create: `DOCUMENTATION/DataSync/Configuration.md` + +**Step 1: Write Configuration.md with builder API, connection setup, and DI registration** + +Create `DOCUMENTATION/DataSync/Configuration.md` with: + +```markdown +# Configuration + +This document covers pipeline builder configuration, connection factory setup, and dependency injection registration. 
+ +## Pipeline Builder API + +`EtlPipelineBuilder` uses a fluent API to construct pipelines: + +```csharp +var pipeline = new EtlPipelineBuilder() + .WithName("WorkOrderSync") + .WithSource(new DbQuerySource(factory, "SELECT * FROM Source.WorkOrders", "WorkOrders")) + .WithTransformer(new JdeDateTransformer("STRDJ", "TRDJ", "StartDate")) + .WithTransformer(new ColumnDropTransformer("STRDJ", "TRDJ")) + .WithPreScript(CommonScripts.DisableIndexes(factory, "WorkOrder")) + .WithDestination(new DbBulkMergeDestination(factory, "WorkOrder", new[] { "OrderNumber" })) + .WithPostScript(CommonScripts.RebuildIndexes(factory, "WorkOrder")) + .WithLogger(logger) + .Build(); +``` + +### Builder Methods + +| Method | Required | Description | +|--------|----------|-------------| +| `WithName(string)` | No | Pipeline name for logging. Default: "Unnamed" | +| `WithSource(IImportSource)` | **Yes** | Data source. Throws if not set before `Build()` | +| `WithTransformer(IDataTransformer)` | No | Add transformer. Can be called multiple times (chained) | +| `WithDestination(IImportDestination)` | **Yes** | Data destination. Throws if not set before `Build()` | +| `WithPreScript(IScriptRunner)` | No | Script to run before data transfer. Can be called multiple times | +| `WithPostScript(IScriptRunner)` | No | Script to run after data transfer. Can be called multiple times | +| `WithCommandTimeout(TimeSpan)` | No | Default timeout. Range: 0-24 hours. Default: 600s | +| `WithLogger(ILogger)` | No | Logger for pipeline events. 
Default: NullLogger | + +### WithCommandTimeout Validation + +```csharp +public EtlPipelineBuilder WithCommandTimeout(TimeSpan timeout) +{ + if (timeout < TimeSpan.Zero || timeout > TimeSpan.FromHours(24)) + throw new ArgumentOutOfRangeException(nameof(timeout), + "Timeout must be between 0 and 24 hours."); + _defaultCommandTimeoutSeconds = (int)timeout.TotalSeconds; + return this; +} +``` + +### Build Validation + +```csharp +public EtlPipeline Build() +{ + if (_source == null) + throw new InvalidOperationException( + "Source is required. Call WithSource() before Build()."); + if (_destination == null) + throw new InvalidOperationException( + "Destination is required. Call WithDestination() before Build()."); + + return new EtlPipeline(_name, _source, _transformers, _destination, + _preScripts, _postScripts, _logger ?? NullLogger.Instance); +} +``` + +## Component Configuration + +### DbQuerySource Options + +| Parameter | Default | Description | +|-----------|---------|-------------| +| `connectionFactory` | Required | Factory for database connections | +| `sql` | Required | SQL query to execute | +| `name` | `"Query"` | Name for logging (appears as `DbQuery:{name}`) | +| `parameters` | `null` | Anonymous object for query parameters | +| `commandTimeout` | `3600` | Query timeout in seconds | + +### DbBulkImportDestination Options + +| Parameter | Default | Description | +|-----------|---------|-------------| +| `connectionFactory` | Required | Factory for database connections | +| `tableName` | Required | Destination table (supports schema: `dbo.Table`) | +| `batchSize` | `10000` | Rows per batch for progress tracking | +| `commandTimeoutSeconds` | `600` | Timeout for TRUNCATE and bulk copy | + +### DbBulkMergeDestination Options + +| Parameter | Default | Description | +|-----------|---------|-------------| +| `connectionFactory` | Required | Factory for database connections | +| `tableName` | Required | Destination table (supports schema: `dbo.Table`) | +| 
`matchColumns` | Required | Key columns for MERGE matching | +| `updateColumns` | All non-match | Columns to update on match | +| `batchSize` | `10000` | Rows per batch | +| `commandTimeoutSeconds` | `600` | Timeout for bulk copy and MERGE | + +### Script Timeout Defaults + +| Script | Default Timeout | +|--------|-----------------| +| `DisableIndexes` | 300s (5 min) | +| `RebuildIndexes` | 3600s (1 hour) | +| `UpdateStatistics` | 600s (10 min) | +| `SqlScriptRunner` | 3600s (1 hour) | + +## Connection Factory Setup + +The pipeline uses `IDbConnectionFactory` for database connections. Register it with your connection strings: + +```csharp +services.AddSingleton<IDbConnectionFactory>(sp => +{ + var configuration = sp.GetRequiredService<IConfiguration>(); + return new DbConnectionFactory( + configuration.GetConnectionString("LotFinder"), + configuration.GetConnectionString("JDE"), + configuration.GetConnectionString("CMS")); +}); +``` + +### Connection string examples + +```json +{ + "ConnectionStrings": { + "LotFinder": "Server=localhost,1434;Database=LotFinder;User Id=scopingapp;Password=...;TrustServerCertificate=true", + "JDE": "Data Source=jde-oracle;User Id=...;Password=...", + "CMS": "Data Source=cms-sybase;User Id=...;Password=..." + } +} +``` + +## Dependency Injection Registration + +### Basic registration + +```csharp +services.AddEtlPipeline(); +``` + +This registers `EtlPipelineBuilder` as transient so each request gets a fresh builder. 
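To see why a fresh builder per use matters, consider a fluent builder that accumulates its configuration in fields, as `EtlPipelineBuilder` appears to do with `_transformers` and the script lists. The sketch below uses a hypothetical `MiniBuilder` stand-in (strings instead of real transformers) and does not reproduce the real builder's behavior.

```csharp
using System.Collections.Generic;

// Hypothetical stand-in for a fluent builder that stores state in fields.
public sealed class MiniBuilder
{
    private readonly List<string> _transformers = new();

    public MiniBuilder WithTransformer(string name)
    {
        _transformers.Add(name); // state persists on this instance
        return this;
    }

    // Returns a snapshot of everything configured so far.
    public IReadOnlyList<string> Build() => _transformers.ToArray();
}
```

Reusing one `MiniBuilder` for two pipelines means the second `Build()` also contains the first pipeline's transformer, while two separate instances stay independent. Under the assumption that the real builder behaves similarly, a transient registration avoids that kind of leakage between pipelines.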
+ +### Extension method implementation + +```csharp +public static class EtlServiceCollectionExtensions +{ + public static IServiceCollection AddEtlPipeline(this IServiceCollection services) + { + services.AddTransient<EtlPipelineBuilder>(); + return services; + } +} +``` + +### Full registration example + +```csharp +public static IServiceCollection AddDataSync(this IServiceCollection services) +{ + // Connection factory (singleton - manages connection pooling) + services.AddSingleton<IDbConnectionFactory, DbConnectionFactory>(); + + // ETL pipeline builder (transient - fresh instance per use) + services.AddEtlPipeline(); + + // Background service for scheduled syncs + services.AddHostedService<DataSyncService>(); + + return services; +} +``` + +### Using the builder in a service + +```csharp +public class DataSyncService : BackgroundService +{ + private readonly EtlPipelineBuilder _pipelineBuilder; + private readonly IDbConnectionFactory _connectionFactory; + private readonly ILogger<EtlPipeline> _pipelineLogger; + + public DataSyncService( + EtlPipelineBuilder pipelineBuilder, + IDbConnectionFactory connectionFactory, + ILogger<EtlPipeline> pipelineLogger) + { + _pipelineBuilder = pipelineBuilder; + _connectionFactory = connectionFactory; + _pipelineLogger = pipelineLogger; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + var pipeline = _pipelineBuilder + .WithName("WorkOrderSync") + .WithSource(new DbQuerySource(_connectionFactory, "SELECT * FROM JDE.WorkOrders")) + .WithDestination(new DbBulkImportDestination(_connectionFactory, "WorkOrder")) + .WithLogger(_pipelineLogger) + .Build(); + + var result = await pipeline.ExecuteAsync(stoppingToken); + } +} +``` + +## Configuration Summary + +| Component | Option | Default | Valid Range | +|-----------|--------|---------|-------------| +| `EtlPipelineBuilder` | `WithCommandTimeout` | 600s | 0-24 hours | +| `DbQuerySource` | `commandTimeout` | 3600s | > 0 | +| `DbBulkImportDestination` | `batchSize` | 10000 | > 0 | +| `DbBulkImportDestination` | `commandTimeoutSeconds` | 600s | > 
0 | +| `DbBulkMergeDestination` | `batchSize` | 10000 | > 0 | +| `DbBulkMergeDestination` | `commandTimeoutSeconds` | 600s | > 0 | + +## Related Documentation + +- [Overview](./Overview.md) - Pipeline architecture +- [Destinations](./Destinations.md) - Destination-specific options +- [Troubleshooting](./Troubleshooting.md) - Timeout and batch size tuning +``` + +**Step 2: Verify the file was created** + +```bash +ls -la DOCUMENTATION/DataSync/Configuration.md +``` +Expected: File exists + +**Step 3: Commit** + +```bash +git add DOCUMENTATION/DataSync/Configuration.md +git commit -m "docs: add ETL configuration documentation" +``` + +--- + +### Task 6: Create Troubleshooting.md + +**Files:** +- Create: `DOCUMENTATION/DataSync/Troubleshooting.md` + +**Step 1: Write Troubleshooting.md with error catalog, debugging patterns, and performance tuning** + +Create `DOCUMENTATION/DataSync/Troubleshooting.md` with: + +```markdown +# Troubleshooting + +This document covers common errors, debugging patterns, and performance tuning for the ETL pipeline. 
+ +## Common Errors + +### Column mapping errors + +| Error | Cause | Resolution | +|-------|-------|------------| +| "No columns from source exist in destination table" | Source column names don't match destination | Check source query column aliases match destination table columns exactly (case-insensitive) | +| "Column name collision" | Transformer creates duplicate column names | Review rename mappings; ensure no two columns map to the same output name | +| "Column '{name}' not found or was dropped" | Accessing a column that was dropped | Check transformer chain order; don't access dropped columns in later transformers | + +### Computed column errors + +| Error | Cause | Resolution | +|-------|-------|------------| +| "GetBytes not supported for computed column at ordinal N" | Binary access on transformed column | Use `GetValue()` instead; computed columns (like JDE dates) don't support binary access | +| "GetChars not supported for computed column at ordinal N" | Same as above | Use `GetValue()` or `GetString()` | +| "GetData not supported for computed column at ordinal N" | Same as above | Computed columns can't return nested readers | + +### Timeout errors + +| Error | Cause | Resolution | +|-------|-------|------------| +| `SqlException: Timeout expired` during bulk copy | Large dataset, slow network | Increase `commandTimeoutSeconds` on destination | +| `SqlException: Timeout expired` during MERGE | Many rows to match | Increase timeout; consider smaller batches | +| `SqlException: Timeout expired` during script | Index rebuild on large table | Increase script `timeoutSeconds` (default 3600s for rebuild) | + +### Validation errors + +| Error | Cause | Resolution | +|-------|-------|------------| +| "Source is required. Call WithSource() before Build()" | Missing source in pipeline | Add `.WithSource()` to builder chain | +| "Destination is required. 
Call WithDestination() before Build()" | Missing destination in pipeline | Add `.WithDestination()` to builder chain | +| "At least one match column is required" | Empty matchColumns array | Provide key columns for MERGE matching | +| "Timeout must be between 0 and 24 hours" | Invalid timeout value | Use `TimeSpan` between 0 and 24 hours | + +## Debugging Patterns + +### Inspecting pipeline results + +Check `PipelineResult` after execution to understand what happened: + +```csharp +var result = await pipeline.ExecuteAsync(cancellationToken); + +if (!result.Success) +{ + logger.LogError(result.Error, "Pipeline failed after {Rows} rows in {Elapsed}", + result.TotalRows, result.Elapsed); + + // Find which step failed + var lastStep = result.Steps.LastOrDefault(); + if (lastStep != null) + { + logger.LogError("Failed at step: {Step} ({Type})", + lastStep.StepName, lastStep.StepType); + } +} +``` + +### Tracking step-by-step progress + +Each step records timing and row counts: + +```csharp +foreach (var step in result.Steps) +{ + logger.LogInformation("Step {Name} ({Type}): {Rows} rows in {Elapsed}ms", + step.StepName, + step.StepType, + step.RowsAffected, + step.Elapsed.TotalMilliseconds); +} +``` + +### Enabling detailed logging + +Inject a logger into the pipeline for execution-level logging: + +```csharp +var pipeline = new EtlPipelineBuilder() + .WithName("DebugPipeline") + .WithSource(source) + .WithDestination(destination) + .WithLogger(loggerFactory.CreateLogger<EtlPipeline>()) + .Build(); +``` + +Pipeline logs include: +- `Information`: Pipeline start/complete with row counts +- `Debug`: Individual script execution +- `Error`: Failure with exception and last step + +### Identifying the failure point + +When a pipeline fails, `PipelineResult.Steps` contains all completed steps: + +```csharp +if (!result.Success) +{ + // Steps completed before failure + var completedSteps = result.Steps.Select(s => s.StepName); + logger.LogError("Completed steps: {Steps}", string.Join(" → ", 
completedSteps)); + + // The exception contains root cause + logger.LogError(result.Error, "Root cause"); +} +``` + +## Performance Tuning + +### Batch size optimization + +Default batch size is 10,000 rows. Adjust based on row width: + +| Row Size | Recommended Batch Size | +|----------|------------------------| +| Narrow (< 20 columns) | 10,000 - 50,000 | +| Medium (20-50 columns) | 5,000 - 10,000 | +| Wide (> 50 columns) | 1,000 - 5,000 | + +```csharp +// Large batch for narrow rows +new DbBulkImportDestination(factory, "LookupTable", batchSize: 50000) + +// Small batch for wide rows +new DbBulkMergeDestination(factory, "DetailTable", matchColumns, batchSize: 2000) +``` + +### Index management for bulk loads + +Disable indexes before large imports, rebuild after: + +```csharp +var pipeline = new EtlPipelineBuilder() + .WithName("FullTableRefresh") + .WithPreScript(CommonScripts.DisableIndexes(factory, "LargeTable")) + .WithSource(source) + .WithDestination(new DbBulkImportDestination(factory, "LargeTable")) + .WithPostScript(CommonScripts.RebuildIndexes(factory, "LargeTable")) + .WithPostScript(CommonScripts.UpdateStatistics(factory, "LargeTable")) + .Build(); +``` + +**When to use:** +- Full table refreshes (TRUNCATE + import) +- Tables with 3+ non-clustered indexes +- Import of 100,000+ rows + +**When to skip:** +- Incremental merges with few rows +- Tables with only a clustered index +- Frequent small updates + +### Timeout sizing guidelines + +| Operation | Rows | Suggested Timeout | +|-----------|------|-------------------| +| Bulk import | < 100K | 600s (default) | +| Bulk import | 100K - 1M | 1800s (30 min) | +| Bulk import | > 1M | 3600s (1 hour) | +| Bulk merge | < 50K | 600s (default) | +| Bulk merge | 50K - 500K | 1800s (30 min) | +| Index rebuild | Any | 3600s (default) | + +```csharp +// Large table with extended timeout +new DbBulkMergeDestination(factory, "HistoricalData", + matchColumns: new[] { "RecordId" }, + commandTimeoutSeconds: 1800) +``` + 
+### Reducing network and memory usage + +Select only needed columns in the source query: + +```csharp +// Good - select only needed columns +var source = new DbQuerySource(factory, + "SELECT OrderNumber, Status, StartDate FROM JDE.WorkOrders"); + +// Avoid - selecting all columns wastes bandwidth +var source = new DbQuerySource(factory, + "SELECT * FROM JDE.WorkOrders"); +``` + +Extra columns in the source are ignored by the destination column mapping, but they still consume network bandwidth and memory. + +### Monitoring baseline performance + +Track `PipelineResult.Elapsed` over time to detect degradation: + +```csharp +var result = await pipeline.ExecuteAsync(ct); + +metrics.RecordPipeline( + pipelineName: pipeline.PipelineName, + success: result.Success, + rows: result.TotalRows, + durationMs: result.Elapsed.TotalMilliseconds); + +// Alert if duration exceeds baseline by 50% +if (result.Elapsed > baselineDuration * 1.5) +{ + logger.LogWarning("Pipeline {Name} took {Elapsed} (baseline: {Baseline})", + pipeline.PipelineName, result.Elapsed, baselineDuration); +} +``` + +### Step-level performance analysis + +Identify slow steps using `StepResult.Elapsed`: + +```csharp +var slowSteps = result.Steps + .Where(s => s.Elapsed > TimeSpan.FromSeconds(30)) + .OrderByDescending(s => s.Elapsed); + +foreach (var step in slowSteps) +{ + logger.LogWarning("Slow step: {Name} took {Elapsed}", + step.StepName, step.Elapsed); +} +``` + +## Related Documentation + +- [Overview](./Overview.md) - Pipeline architecture +- [Configuration](./Configuration.md) - Timeout and batch size options +- [Destinations](./Destinations.md) - Script patterns for index management +``` + +**Step 2: Verify the file was created** + +```bash +ls -la DOCUMENTATION/DataSync/Troubleshooting.md +``` +Expected: File exists + +**Step 3: Commit** + +```bash +git add DOCUMENTATION/DataSync/Troubleshooting.md +git commit -m "docs: add ETL troubleshooting documentation" +``` + +--- + +### Task 7: Update 
ComponentMap.md + +**Files:** +- Modify: `DOCUMENTATION/Instructions/ComponentMap.md` + +**Step 1: Read the current ComponentMap.md** + +Read `DOCUMENTATION/Instructions/ComponentMap.md` to find where to add the ETL source mapping. + +**Step 2: Add ETL source paths to the DataSync section** + +Add the new ETL source paths to the DataSync section: + +```markdown +### DataSync/ + +Documents data synchronization from enterprise systems. + +**Source paths (Legacy):** +- `OLD/DataModel/Process/JDE*.cs` - JDE Oracle queries +- `OLD/DataModel/Process/CMS*.cs` - CMS Sybase queries +- `OLD/WorkerService/Process/UpdateProcessor.cs` - Sync orchestration +- `OLD/WorkerService/dsconfig/*.json` - Data source configs + +**Source paths (New):** +- `NEW/src/JdeScoping.DataSync/Etl/` - ETL pipeline framework +- `NEW/src/JdeScoping.DataSync/Etl/Contracts/` - Core interfaces +- `NEW/src/JdeScoping.DataSync/Etl/Pipeline/` - Pipeline and builder +- `NEW/src/JdeScoping.DataSync/Etl/Sources/` - Data sources +- `NEW/src/JdeScoping.DataSync/Etl/Transformers/` - Data transformers +- `NEW/src/JdeScoping.DataSync/Etl/Destinations/` - Bulk copy/merge destinations +- `NEW/src/JdeScoping.DataSync/Etl/Scripts/` - SQL script runners +- `NEW/src/JdeScoping.DataSync/Etl/Results/` - Execution result types + +**Typical files:** +- `Overview.md` - ETL pipeline architecture +- `Sources.md` - Writing custom data sources +- `Transformers.md` - Writing custom transformers +- `Destinations.md` - Bulk destinations and scripts +- `Configuration.md` - Pipeline builder and DI setup +- `Troubleshooting.md` - Debugging and performance +- `JDE.md` - JD Edwards (Oracle) integration +- `CMS.md` - CMS (Sybase) integration +- `Scheduling.md` - Mass/daily/hourly sync schedules +``` + +**Step 3: Verify the edit is correct** + +Read the modified section to confirm the changes. 
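Reading the section back confirms the wording, but it does not confirm that the listed paths exist. As an optional extra check, the sketch below pulls every backticked `NEW/...` path out of a component map and reports any that are missing on disk. The function name `check_component_paths` and its two arguments are hypothetical helpers for this plan, not part of the repository.

```bash
# Sketch (hypothetical helper): report backticked NEW/... paths in a
# component map that do not exist under the given repository root.
check_component_paths() {
  cp_map="$1"    # path to ComponentMap.md
  cp_root="$2"   # repository root that the listed paths are relative to
  cp_status=0
  # Extract `NEW/...` entries, skipping any that contain glob characters.
  for cp_path in $(grep -o '`NEW/[^`*]*`' "$cp_map" | tr -d '`' | sort -u); do
    if [ ! -e "$cp_root/$cp_path" ]; then
      echo "MISSING: $cp_path"
      cp_status=1
    fi
  done
  return "$cp_status"
}
```

Assuming the plan is run from the repository root, `check_component_paths DOCUMENTATION/Instructions/ComponentMap.md .` should print nothing and return 0 when every listed path exists.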
+ +**Step 4: Commit** + +```bash +git add DOCUMENTATION/Instructions/ComponentMap.md +git commit -m "docs: add ETL source paths to ComponentMap" +``` + +--- + +### Task 8: Final verification + +**Step 1: List all created documentation files** + +```bash +ls -la DOCUMENTATION/DataSync/ +``` + +Expected output: +``` +Configuration.md +Destinations.md +Overview.md +Sources.md +Transformers.md +Troubleshooting.md +``` + +**Step 2: Verify all internal links resolve** + +Check that all cross-references between docs point to existing files: + +```bash +grep -h "\[.*\](\./" DOCUMENTATION/DataSync/*.md | sort -u +``` + +All referenced files should exist. + +**Step 3: Check git log for all commits** + +```bash +git log --oneline -8 +``` + +Expected: 7 commits for the documentation (6 docs + 1 ComponentMap update) + +**Step 4: Final status check** + +```bash +git status +``` + +Expected: Clean working tree
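
The `grep` in Step 2 only lists the cross-references; it does not test whether each target exists. A minimal sketch of an automated check follows, assuming all links use the `./File.md` form seen in the Related Documentation sections. The function name `check_links` is a hypothetical helper, not something defined elsewhere in this plan.

```bash
# Sketch (hypothetical helper): verify that every relative ./X.md link in
# the Markdown files of a directory points at a file in that directory.
check_links() {
  cl_dir="$1"
  cl_status=0
  for cl_file in "$cl_dir"/*.md; do
    [ -e "$cl_file" ] || continue   # glob did not match anything
    # Pull out each "](./Target" fragment and strip the "](./" prefix.
    for cl_target in $(grep -o '](\./[^)#]*' "$cl_file" | sed 's|^](\./||' | sort -u); do
      if [ ! -f "$cl_dir/$cl_target" ]; then
        echo "BROKEN: $cl_file -> $cl_target"
        cl_status=1
      fi
    done
  done
  return "$cl_status"
}
```

Running `check_links DOCUMENTATION/DataSync` should print nothing and return 0 when every link resolves; each broken link is reported on its own line. Link targets containing spaces are not handled by this sketch.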