# ETL Pipeline Phase 2 Design **Date:** 2026-01-03 **Status:** Reviewed (Codex MCP) **Purpose:** Extend the ETL pipeline with feature improvements: column mapping, schema-qualified tables, configurable timeouts, date validation, ordinal mapping fixes, and collision detection. ## Problem Statement Phase 1 implemented the core ETL pipeline. Phase 2 addresses limitations identified during review: **Feature Gaps:** 1. Bulk merge relies on ordinal matching - source column order must match destination 2. Schema-qualified table names (`dbo.Table`) not fully supported in scripts 3. Bulk merge timeouts not configurable 4. JDE date parsing fails on invalid dates **Correctness Issues:** 5. `TransformingDataReader` bypasses transformer mappings for GetBytes/GetChars/GetData 6. Column rename collisions not validated ## Design Decisions | Issue | Approach | |-------|----------| | Column mapping for bulk ops | Explicit `SqlBulkCopy.ColumnMappings`, intersect with destination schema | | Schema-qualified tables | Parse tableName for schema, default `"dbo"` | | Configurable timeouts | Single `commandTimeoutSeconds` for all SQL operations | | Invalid JDE dates | Return sentinel date (`1900-01-01` default) | | TransformingDataReader bypasses | `MapOrdinal` on interface + base class overrides for computed columns | | Column rename collisions | Throw on collision during init (including pre-existing duplicates) | | Transformer disposal | Do not dispose transformers (single-use documented) | ## Detailed Design ### 1. Bulk Copy Column Mapping **Problem:** `DbBulkMergeDestination` and `DbBulkImportDestination` rely on ordinal matching between source and destination. Transformers that rename or reorder columns break bulk copy. Extra columns in source cause errors. **Solution:** Query destination schema and map only columns that exist in both source and destination. ```csharp // Query destination columns once private async Task> GetDestinationColumnsAsync( SqlConnection connection, string tableName, string schemaName, CancellationToken ct) { var sql = @"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = @tableName AND TABLE_SCHEMA = @schemaName"; var columns = await connection.QueryAsync(sql, new { tableName, schemaName }); return columns.ToHashSet(StringComparer.OrdinalIgnoreCase); } // In ProcessBatchAsync var destColumns = await GetDestinationColumnsAsync(connection, _tableName, _schemaName, ct); using var bulkCopy = new SqlBulkCopy(connection) { DestinationTableName = tempTableName, BatchSize = batch.Rows.Count, BulkCopyTimeout = _commandTimeoutSeconds }; // Map only columns that exist in both source and destination for (int i = 0; i < source.FieldCount; i++) { var columnName = source.GetName(i); if (destColumns.Contains(columnName)) { bulkCopy.ColumnMappings.Add(columnName, columnName); } } ``` **Benefits:** 1. Decouples ordinal position from column identity 2. Extra source columns are silently ignored (e.g., computed columns for logging) 3. Missing destination columns cause clear error from SqlBulkCopy Both `DbBulkImportDestination` and `DbBulkMergeDestination` will use this pattern. ### 2. Schema-Qualified Table Names **Problem:** `CommonScripts` methods incorrectly handle `dbo.WorkOrder` format - both the bracketing and the `sys.tables` lookup fail. **Solution:** Parse tableName to extract schema if present, use `QUOTENAME()` for safe dynamic SQL. ```csharp public static class CommonScripts { /// /// Parses a table name, extracting schema if present. /// Supports: "Table", "dbo.Table", "[dbo].[Table]" /// private static (string Schema, string Table) ParseTableName(string tableName) { // Remove brackets for parsing var cleaned = tableName.Replace("[", "").Replace("]", ""); var parts = cleaned.Split('.', 2); return parts.Length == 2 ? (parts[0], parts[1]) : ("dbo", parts[0]); } public static IScriptRunner DisableIndexes( IDbConnectionFactory factory, string tableName, int timeoutSeconds = 300) { var (schema, table) = ParseTableName(tableName); // Use QUOTENAME() for safe dynamic SQL - prevents injection var sql = @" DECLARE @sql NVARCHAR(MAX) = ''; DECLARE @fullTableName NVARCHAR(256) = QUOTENAME(@schemaName) + '.' + QUOTENAME(@tableName); SELECT @sql = @sql + 'ALTER INDEX ' + QUOTENAME(i.name) + ' ON ' + @fullTableName + ' DISABLE;' + CHAR(13) FROM sys.indexes i INNER JOIN sys.tables t ON i.object_id = t.object_id INNER JOIN sys.schemas s ON t.schema_id = s.schema_id WHERE t.name = @tableName AND s.name = @schemaName AND i.type = 2 AND i.is_disabled = 0; IF LEN(@sql) > 0 EXEC sp_executesql @sql;"; return new SqlScriptRunner(factory, sql, $"DisableIndexes:{schema}.{table}", parameters: new { tableName = table, schemaName = schema }, timeoutSeconds: timeoutSeconds); } // Same pattern for RebuildIndexes, UpdateStatistics } ``` **Changes:** - Parse tableName to extract schema (supports `"Table"`, `"dbo.Table"`, `"[dbo].[Table]"`) - Use `QUOTENAME()` in dynamic SQL (prevents injection, handles special characters) - Join `sys.schemas` to properly filter by schema - `SqlScriptRunner` needs to accept parameters (new capability) ### 3. Configurable Timeouts **Problem:** `SqlBulkCopy` uses 30-second default timeout, insufficient for large tables. MERGE and other SQL commands also need timeout configuration. **Solution:** Single `commandTimeoutSeconds` parameter applied to all SQL operations (bulk copy, MERGE, TRUNCATE, etc.). **Destination changes:** ```csharp public class DbBulkMergeDestination : IImportDestination { private const int DefaultCommandTimeoutSeconds = 600; // 10 minutes private readonly int _commandTimeoutSeconds; public DbBulkMergeDestination( IDbConnectionFactory connectionFactory, string tableName, string[] matchColumns, string[]? updateColumns = null, int batchSize = 0, int commandTimeoutSeconds = 0) // 0 = use default (600) { _commandTimeoutSeconds = commandTimeoutSeconds > 0 ? commandTimeoutSeconds : DefaultCommandTimeoutSeconds; } } // Usage in WriteAsync: using var bulkCopy = new SqlBulkCopy(connection) { BulkCopyTimeout = _commandTimeoutSeconds }; await using var cmd = connection.CreateCommand(); cmd.CommandTimeout = _commandTimeoutSeconds; // For MERGE, TRUNCATE, etc. ``` **Pipeline builder changes:** ```csharp public class EtlPipelineBuilder { private int _defaultCommandTimeoutSeconds = 600; public EtlPipelineBuilder WithCommandTimeout(TimeSpan timeout) { if (timeout < TimeSpan.Zero || timeout > TimeSpan.FromHours(24)) throw new ArgumentOutOfRangeException(nameof(timeout), "Timeout must be between 0 and 24 hours."); _defaultCommandTimeoutSeconds = (int)timeout.TotalSeconds; return this; } } ``` **Key points:** - Single timeout applies to all SQL operations (bulk copy, MERGE, TRUNCATE, CREATE TABLE) - Constructor parameter overrides pipeline default - Validation: throws `ArgumentOutOfRangeException` for negative or > 24 hours - Default: 10 minutes (600 seconds) ### 4. JDE Date Sentinel Handling **Problem:** `JdeDateTransformer.ParseJdeDateTime` assumes valid CYYDDD format. Invalid values produce incorrect dates or exceptions. **Solution:** Validate date components and return configurable sentinel on failure. Default sentinel is `1900-01-01` (compatible with SQL `datetime` and `datetime2`). ```csharp public class JdeDateTransformer : DataTransformerBase { /// /// Default sentinel for invalid dates. 1900-01-01 is compatible with /// both SQL datetime (min 1753-01-01) and datetime2. /// public static readonly DateTime DefaultInvalidDateSentinel = new(1900, 1, 1); private readonly DateTime _invalidDateSentinel; public JdeDateTransformer( string dateColumn, string timeColumn, string outputColumn, DateTime? invalidDateSentinel = null) // null = 1900-01-01 { _invalidDateSentinel = invalidDateSentinel ?? DefaultInvalidDateSentinel; } public static DateTime ParseJdeDateTime(decimal julianDate, decimal time, DateTime sentinel) { var dateInt = (int)julianDate; if (dateInt <= 0) return sentinel; var century = dateInt / 100000; var year = (dateInt / 1000) % 100; var dayOfYear = dateInt % 1000; // Validate components if (century < 0 || century > 1) return sentinel; if (year < 0 || year > 99) return sentinel; if (dayOfYear < 1 || dayOfYear > 366) return sentinel; var fullYear = (century == 0 ? 1900 : 2000) + year; // Validate day of year for the specific year var daysInYear = DateTime.IsLeapYear(fullYear) ? 366 : 365; if (dayOfYear > daysInYear) return sentinel; var date = new DateTime(fullYear, 1, 1).AddDays(dayOfYear - 1); // Validate time components var timeInt = (int)time; var hours = timeInt / 10000; var minutes = (timeInt / 100) % 100; var seconds = timeInt % 100; if (hours < 0 || hours > 23) return sentinel; if (minutes < 0 || minutes > 59) return sentinel; if (seconds < 0 || seconds > 59) return sentinel; return date.AddHours(hours).AddMinutes(minutes).AddSeconds(seconds); } } ``` **Key points:** - Default sentinel `1900-01-01` is compatible with SQL `datetime` (min 1753-01-01) - Validates century (0-1), year (0-99), day of year (1-366), and time components - Returns sentinel for any invalid component rather than throwing ### 5. TransformingDataReader Ordinal Mapping **Problem:** `GetBytes`, `GetChars`, and `GetData` bypass transformer ordinal mapping. For computed columns (like JDE date output), these methods would return source data instead of transformed data. **Solution:** Add `MapOrdinal` to `IDataTransformer` interface. Override `GetBytes`/`GetChars`/`GetData`/`GetDataTypeName` in `DataTransformerBase` to handle computed columns. **IDataTransformer interface changes:** ```csharp public interface IDataTransformer { IDataReader Transform(IDataReader source); string TransformerName { get; } /// /// Maps a transformed ordinal to the source ordinal. /// Returns -1 for computed columns that have no source ordinal. /// int MapOrdinal(int transformedOrdinal, IDataReader source); } ``` **DataTransformerBase changes:** ```csharp public abstract class DataTransformerBase : IDataTransformer { /// /// Maps a transformed ordinal to the source ordinal. /// Returns -1 for computed columns. Default returns ordinal unchanged. /// public virtual int MapOrdinal(int transformedOrdinal, IDataReader source) => transformedOrdinal; /// /// Gets bytes for the specified ordinal. Throws for computed columns. /// public virtual long GetBytes(int ordinal, long fieldOffset, byte[]? buffer, int bufferOffset, int length, IDataReader source) { var sourceOrdinal = MapOrdinal(ordinal, source); if (sourceOrdinal < 0) throw new NotSupportedException( $"GetBytes not supported for computed column at ordinal {ordinal}."); return source.GetBytes(sourceOrdinal, fieldOffset, buffer, bufferOffset, length); } /// /// Gets chars for the specified ordinal. Throws for computed columns. /// public virtual long GetChars(int ordinal, long fieldOffset, char[]? buffer, int bufferOffset, int length, IDataReader source) { var sourceOrdinal = MapOrdinal(ordinal, source); if (sourceOrdinal < 0) throw new NotSupportedException( $"GetChars not supported for computed column at ordinal {ordinal}."); return source.GetChars(sourceOrdinal, fieldOffset, buffer, bufferOffset, length); } // Similar for GetData, GetDataTypeName } ``` **JdeDateTransformer override:** ```csharp public override int MapOrdinal(int transformedOrdinal, IDataReader source) { var sourceOrdinal = _ordinalMap![transformedOrdinal]; // The output DateTime column maps to the date source ordinal, // but GetBytes/GetChars make no sense for it return sourceOrdinal == _dateOrdinal ? -1 : sourceOrdinal; } public override string GetDataTypeName(int ordinal, IDataReader source) { var sourceOrdinal = _ordinalMap![ordinal]; return sourceOrdinal == _dateOrdinal ? "datetime" : source.GetDataTypeName(sourceOrdinal); } ``` **TransformingDataReader delegates to transformer:** ```csharp public long GetBytes(int i, long fieldOffset, byte[]? buffer, int bufferoffset, int length) => _transformer.GetBytes(i, fieldOffset, buffer, bufferoffset, length, _source); public string GetDataTypeName(int i) => _transformer.GetDataTypeName(i, _source); ``` ### 6. Transformer Lifecycle **Problem:** Should transformers be disposed by the reader? **Decision:** Do not dispose transformers. Document single-use pattern. **Rationale:** - Current transformers are stateless - no resources to dispose - Disposing would break if transformer is reused across readers - Adding `IDisposable` now would require all custom transformers to implement it - YAGNI - we can add disposal later if a stateful transformer is needed **Documentation requirement:** ```csharp /// /// Base class for data transformers. /// /// /// Transformers are designed for single-use per reader. If a transformer needs /// to hold disposable resources, it should implement IDisposable directly and /// be disposed by the caller after pipeline completion. /// public abstract class DataTransformerBase : IDataTransformer { // ... } ``` **TransformingDataReader** continues to only dispose the source reader: ```csharp public void Dispose() => _source.Dispose(); ``` ### 7. Column Rename Collision Validation **Problem:** `ColumnRenameTransformer` doesn't detect when renames create duplicate column names. **Solution:** Validate in `OnInitialize()` and throw with clear error message. ```csharp public class ColumnRenameTransformer : DataTransformerBase { protected override void OnInitialize(IDataReader source) { _outputNames = new string[source.FieldCount]; _nameToOrdinal = new Dictionary(StringComparer.OrdinalIgnoreCase); for (int i = 0; i < source.FieldCount; i++) { var originalName = source.GetName(i); var outputName = _renames.TryGetValue(originalName, out var newName) ? newName : originalName; // Collision detection (catches both rename collisions and pre-existing duplicates) if (_nameToOrdinal.ContainsKey(outputName)) { var existingOrdinal = _nameToOrdinal[outputName]; var existingOriginal = source.GetName(existingOrdinal); throw new InvalidOperationException( $"Column name collision: '{originalName}' → '{outputName}' conflicts with " + $"'{existingOriginal}' (already at ordinal {existingOrdinal}). " + $"Each output column name must be unique."); } _outputNames[i] = outputName; _nameToOrdinal[outputName] = i; } } } ``` **Behavior:** - Throws on rename collisions (e.g., `A→B` when `B` exists) - Also throws on pre-existing duplicate column names in source (schema requirement) - Error message includes: which columns collide, at which ordinals - Fails fast during pipeline setup, before any data is processed ## Files to Modify | File | Changes | |------|---------| | `IDataTransformer.cs` | Add `MapOrdinal()` method | | `DataTransformerBase.cs` | Add `MapOrdinal()`, `GetBytes()`, `GetChars()`, `GetData()`, `GetDataTypeName()` | | `TransformingDataReader.cs` | Delegate binary methods to transformer | | `JdeDateTransformer.cs` | Add sentinel parameter, validation, `MapOrdinal()` override, `GetDataTypeName()` | | `ColumnRenameTransformer.cs` | Add collision detection | | `ColumnDropTransformer.cs` | Add `MapOrdinal()` override | | `DbBulkMergeDestination.cs` | Add column mappings, schema parsing, timeout parameter | | `DbBulkImportDestination.cs` | Add column mappings, timeout parameter | | `CommonScripts.cs` | Add `ParseTableName()`, use `QUOTENAME()`, parameterized SQL | | `SqlScriptRunner.cs` | Add parameters support | | `EtlPipelineBuilder.cs` | Add `WithCommandTimeout()` with validation | ## Breaking Changes 1. **IDataTransformer interface** - New `MapOrdinal()` method required (custom transformers must implement) 2. **JdeDateTransformer constructor** - New `invalidDateSentinel` parameter (has default `1900-01-01`) 3. **DbBulkMergeDestination/DbBulkImportDestination constructors** - New `commandTimeoutSeconds` parameter (has default) **Behavioral changes:** - `ColumnRenameTransformer` now throws on pre-existing duplicate column names (previously silent) - Bulk copy now only maps columns that exist in destination (extra source columns ignored)