From 7ae2cd4882fb791c09662a64ee2c993338b25145 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 3 Jan 2026 10:21:02 -0500 Subject: [PATCH] docs: add ETL Pipeline Phase 2 design document Design addresses 7 issues identified in Phase 1 review: - Bulk copy column mapping (by name, intersect with destination) - Schema-qualified table names (parse tableName, QUOTENAME) - Configurable timeouts (single commandTimeoutSeconds) - JDE date sentinel (1900-01-01 default for invalid dates) - TransformingDataReader ordinal mapping (MapOrdinal on interface) - Transformer lifecycle (document single-use, no auto-dispose) - Column rename collision validation (fail-fast on duplicates) Reviewed by Codex MCP with all issues addressed. --- .../2026-01-03-etl-pipeline-phase2-design.md | 467 ++++++++++++++++++ 1 file changed, 467 insertions(+) create mode 100644 PLANS/2026-01-03-etl-pipeline-phase2-design.md diff --git a/PLANS/2026-01-03-etl-pipeline-phase2-design.md b/PLANS/2026-01-03-etl-pipeline-phase2-design.md new file mode 100644 index 0000000..ad33b92 --- /dev/null +++ b/PLANS/2026-01-03-etl-pipeline-phase2-design.md @@ -0,0 +1,467 @@ +# ETL Pipeline Phase 2 Design + +**Date:** 2026-01-03 +**Status:** Reviewed (Codex MCP) +**Purpose:** Extend the ETL pipeline with feature improvements: column mapping, schema-qualified tables, configurable timeouts, date validation, ordinal mapping fixes, and collision detection. + +## Problem Statement + +Phase 1 implemented the core ETL pipeline. Phase 2 addresses limitations identified during review: + +**Feature Gaps:** +1. Bulk merge relies on ordinal matching - source column order must match destination +2. Schema-qualified table names (`dbo.Table`) not fully supported in scripts +3. Bulk merge timeouts not configurable +4. JDE date parsing fails on invalid dates + +**Correctness Issues:** +5. `TransformingDataReader` bypasses transformer mappings for GetBytes/GetChars/GetData +6. 
Column rename collisions not validated + +## Design Decisions + +| Issue | Approach | +|-------|----------| +| Column mapping for bulk ops | Explicit `SqlBulkCopy.ColumnMappings`, intersect with destination schema | +| Schema-qualified tables | Parse tableName for schema, default `"dbo"` | +| Configurable timeouts | Single `commandTimeoutSeconds` for all SQL operations | +| Invalid JDE dates | Return sentinel date (`1900-01-01` default) | +| TransformingDataReader bypasses | `MapOrdinal` on interface + base class overrides for computed columns | +| Column rename collisions | Throw on collision during init (including pre-existing duplicates) | +| Transformer disposal | Do not dispose transformers (single-use documented) | + +## Detailed Design + +### 1. Bulk Copy Column Mapping + +**Problem:** `DbBulkMergeDestination` and `DbBulkImportDestination` rely on ordinal matching between source and destination. Transformers that rename or reorder columns break bulk copy. Extra columns in source cause errors. + +**Solution:** Query destination schema and map only columns that exist in both source and destination. 
+ +```csharp +// Query destination columns once +private async Task<HashSet<string>> GetDestinationColumnsAsync( + SqlConnection connection, string tableName, string schemaName, CancellationToken ct) +{ + var sql = @"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_NAME = @tableName AND TABLE_SCHEMA = @schemaName"; + var columns = await connection.QueryAsync<string>(sql, new { tableName, schemaName }); + return columns.ToHashSet(StringComparer.OrdinalIgnoreCase); +} + +// In ProcessBatchAsync +var destColumns = await GetDestinationColumnsAsync(connection, _tableName, _schemaName, ct); + +using var bulkCopy = new SqlBulkCopy(connection) +{ + DestinationTableName = tempTableName, + BatchSize = batch.Rows.Count, + BulkCopyTimeout = _commandTimeoutSeconds +}; + +// Map only columns that exist in both source and destination +for (int i = 0; i < source.FieldCount; i++) +{ + var columnName = source.GetName(i); + if (destColumns.Contains(columnName)) + { + bulkCopy.ColumnMappings.Add(columnName, columnName); + } +} +``` + +**Benefits:** +1. Decouples ordinal position from column identity +2. Extra source columns are silently ignored (e.g., computed columns for logging) +3. Destination columns with no matching source column are left unmapped, so a required column (NOT NULL, no default) still fails with a clear SqlBulkCopy error instead of receiving misaligned data + +Both `DbBulkImportDestination` and `DbBulkMergeDestination` will use this pattern. + +### 2. Schema-Qualified Table Names + +**Problem:** `CommonScripts` methods incorrectly handle `dbo.WorkOrder` format - both the bracketing and the `sys.tables` lookup fail. + +**Solution:** Parse tableName to extract schema if present, use `QUOTENAME()` for safe dynamic SQL. + +```csharp +public static class CommonScripts +{ + /// + /// Parses a table name, extracting schema if present. 
+ /// Supports: "Table", "dbo.Table", "[dbo].[Table]" + /// + private static (string Schema, string Table) ParseTableName(string tableName) + { + // Remove brackets for parsing + var cleaned = tableName.Replace("[", "").Replace("]", ""); + var parts = cleaned.Split('.', 2); + return parts.Length == 2 + ? (parts[0], parts[1]) + : ("dbo", parts[0]); + } + + public static IScriptRunner DisableIndexes( + IDbConnectionFactory factory, + string tableName, + int timeoutSeconds = 300) + { + var (schema, table) = ParseTableName(tableName); + + // Use QUOTENAME() for safe dynamic SQL - prevents injection + var sql = @" +DECLARE @sql NVARCHAR(MAX) = ''; +DECLARE @fullTableName NVARCHAR(256) = QUOTENAME(@schemaName) + '.' + QUOTENAME(@tableName); + +SELECT @sql = @sql + 'ALTER INDEX ' + QUOTENAME(i.name) + ' ON ' + @fullTableName + ' DISABLE;' + CHAR(13) +FROM sys.indexes i +INNER JOIN sys.tables t ON i.object_id = t.object_id +INNER JOIN sys.schemas s ON t.schema_id = s.schema_id +WHERE t.name = @tableName + AND s.name = @schemaName + AND i.type = 2 + AND i.is_disabled = 0; + +IF LEN(@sql) > 0 EXEC sp_executesql @sql;"; + + return new SqlScriptRunner(factory, sql, $"DisableIndexes:{schema}.{table}", + parameters: new { tableName = table, schemaName = schema }, + timeoutSeconds: timeoutSeconds); + } + + // Same pattern for RebuildIndexes, UpdateStatistics +} +``` + +**Changes:** +- Parse tableName to extract schema (supports `"Table"`, `"dbo.Table"`, `"[dbo].[Table]"`) +- Use `QUOTENAME()` in dynamic SQL (prevents injection, handles special characters) +- Join `sys.schemas` to properly filter by schema +- `SqlScriptRunner` needs to accept parameters (new capability) + +### 3. Configurable Timeouts + +**Problem:** `SqlBulkCopy` uses 30-second default timeout, insufficient for large tables. MERGE and other SQL commands also need timeout configuration. + +**Solution:** Single `commandTimeoutSeconds` parameter applied to all SQL operations (bulk copy, MERGE, TRUNCATE, etc.). 
+ +**Destination changes:** + +```csharp +public class DbBulkMergeDestination : IImportDestination +{ + private const int DefaultCommandTimeoutSeconds = 600; // 10 minutes + private readonly int _commandTimeoutSeconds; + + public DbBulkMergeDestination( + IDbConnectionFactory connectionFactory, + string tableName, + string[] matchColumns, + string[]? updateColumns = null, + int batchSize = 0, + int commandTimeoutSeconds = 0) // 0 = use default (600) + { + _commandTimeoutSeconds = commandTimeoutSeconds > 0 + ? commandTimeoutSeconds + : DefaultCommandTimeoutSeconds; + } +} + +// Usage in WriteAsync: +using var bulkCopy = new SqlBulkCopy(connection) +{ + BulkCopyTimeout = _commandTimeoutSeconds +}; + +await using var cmd = connection.CreateCommand(); +cmd.CommandTimeout = _commandTimeoutSeconds; // For MERGE, TRUNCATE, etc. +``` + +**Pipeline builder changes:** + +```csharp +public class EtlPipelineBuilder +{ + private int _defaultCommandTimeoutSeconds = 600; + + public EtlPipelineBuilder WithCommandTimeout(TimeSpan timeout) + { + if (timeout < TimeSpan.Zero || timeout > TimeSpan.FromHours(24)) + throw new ArgumentOutOfRangeException(nameof(timeout), + "Timeout must be between 0 and 24 hours."); + _defaultCommandTimeoutSeconds = (int)timeout.TotalSeconds; + return this; + } +} +``` + +**Key points:** +- Single timeout applies to all SQL operations (bulk copy, MERGE, TRUNCATE, CREATE TABLE) +- A positive constructor `commandTimeoutSeconds` overrides the pipeline default; `0` (the parameter default) means "use the default" +- Validation: `WithCommandTimeout` throws `ArgumentOutOfRangeException` for negative timeouts or timeouts over 24 hours +- Default: 10 minutes (600 seconds) + +### 4. JDE Date Sentinel Handling + +**Problem:** `JdeDateTransformer.ParseJdeDateTime` assumes valid CYYDDD format. Invalid values produce incorrect dates or exceptions. + +**Solution:** Validate date components and return configurable sentinel on failure. Default sentinel is `1900-01-01` (compatible with SQL `datetime` and `datetime2`). 
+ +```csharp +public class JdeDateTransformer : DataTransformerBase +{ + /// + /// Default sentinel for invalid dates. 1900-01-01 is compatible with + /// both SQL datetime (min 1753-01-01) and datetime2. + /// + public static readonly DateTime DefaultInvalidDateSentinel = new(1900, 1, 1); + + private readonly DateTime _invalidDateSentinel; + + public JdeDateTransformer( + string dateColumn, + string timeColumn, + string outputColumn, + DateTime? invalidDateSentinel = null) // null = 1900-01-01 + { + _invalidDateSentinel = invalidDateSentinel ?? DefaultInvalidDateSentinel; + } + + public static DateTime ParseJdeDateTime(decimal julianDate, decimal time, DateTime sentinel) + { + var dateInt = (int)julianDate; + if (dateInt <= 0) return sentinel; + + var century = dateInt / 100000; + var year = (dateInt / 1000) % 100; + var dayOfYear = dateInt % 1000; + + // Validate components + if (century < 0 || century > 1) return sentinel; + if (year < 0 || year > 99) return sentinel; + if (dayOfYear < 1 || dayOfYear > 366) return sentinel; + + var fullYear = (century == 0 ? 1900 : 2000) + year; + + // Validate day of year for the specific year + var daysInYear = DateTime.IsLeapYear(fullYear) ? 366 : 365; + if (dayOfYear > daysInYear) return sentinel; + + var date = new DateTime(fullYear, 1, 1).AddDays(dayOfYear - 1); + + // Validate time components + var timeInt = (int)time; + var hours = timeInt / 10000; + var minutes = (timeInt / 100) % 100; + var seconds = timeInt % 100; + + if (hours < 0 || hours > 23) return sentinel; + if (minutes < 0 || minutes > 59) return sentinel; + if (seconds < 0 || seconds > 59) return sentinel; + + return date.AddHours(hours).AddMinutes(minutes).AddSeconds(seconds); + } +} +``` + +**Key points:** +- Default sentinel `1900-01-01` is compatible with SQL `datetime` (min 1753-01-01) +- Validates century (0-1), year (0-99), day of year (1-366), and time components +- Returns sentinel for any invalid component rather than throwing + +### 5. 
TransformingDataReader Ordinal Mapping + +**Problem:** `GetBytes`, `GetChars`, and `GetData` bypass transformer ordinal mapping. For computed columns (like JDE date output), these methods would return source data instead of transformed data. + +**Solution:** Add `MapOrdinal` to `IDataTransformer` interface. Override `GetBytes`/`GetChars`/`GetData`/`GetDataTypeName` in `DataTransformerBase` to handle computed columns. + +**IDataTransformer interface changes:** + +```csharp +public interface IDataTransformer +{ + IDataReader Transform(IDataReader source); + string TransformerName { get; } + + /// + /// Maps a transformed ordinal to the source ordinal. + /// Returns -1 for computed columns that have no source ordinal. + /// + int MapOrdinal(int transformedOrdinal, IDataReader source); +} +``` + +**DataTransformerBase changes:** + +```csharp +public abstract class DataTransformerBase : IDataTransformer +{ + /// + /// Maps a transformed ordinal to the source ordinal. + /// Returns -1 for computed columns. Default returns ordinal unchanged. + /// + public virtual int MapOrdinal(int transformedOrdinal, IDataReader source) + => transformedOrdinal; + + /// + /// Gets bytes for the specified ordinal. Throws for computed columns. + /// + public virtual long GetBytes(int ordinal, long fieldOffset, byte[]? buffer, + int bufferOffset, int length, IDataReader source) + { + var sourceOrdinal = MapOrdinal(ordinal, source); + if (sourceOrdinal < 0) + throw new NotSupportedException( + $"GetBytes not supported for computed column at ordinal {ordinal}."); + return source.GetBytes(sourceOrdinal, fieldOffset, buffer, bufferOffset, length); + } + + /// + /// Gets chars for the specified ordinal. Throws for computed columns. + /// + public virtual long GetChars(int ordinal, long fieldOffset, char[]? 
buffer, + int bufferOffset, int length, IDataReader source) + { + var sourceOrdinal = MapOrdinal(ordinal, source); + if (sourceOrdinal < 0) + throw new NotSupportedException( + $"GetChars not supported for computed column at ordinal {ordinal}."); + return source.GetChars(sourceOrdinal, fieldOffset, buffer, bufferOffset, length); + } + + // Similar for GetData, GetDataTypeName +} +``` + +**JdeDateTransformer override:** + +```csharp +public override int MapOrdinal(int transformedOrdinal, IDataReader source) +{ + var sourceOrdinal = _ordinalMap![transformedOrdinal]; + // The output DateTime column maps to the date source ordinal, + // but GetBytes/GetChars make no sense for it + return sourceOrdinal == _dateOrdinal ? -1 : sourceOrdinal; +} + +public override string GetDataTypeName(int ordinal, IDataReader source) +{ + var sourceOrdinal = _ordinalMap![ordinal]; + return sourceOrdinal == _dateOrdinal ? "datetime" : source.GetDataTypeName(sourceOrdinal); +} +``` + +**TransformingDataReader delegates to transformer:** + +```csharp +public long GetBytes(int i, long fieldOffset, byte[]? buffer, int bufferoffset, int length) + => _transformer.GetBytes(i, fieldOffset, buffer, bufferoffset, length, _source); + +public string GetDataTypeName(int i) + => _transformer.GetDataTypeName(i, _source); +``` + +### 6. Transformer Lifecycle + +**Problem:** Should transformers be disposed by the reader? + +**Decision:** Do not dispose transformers. Document single-use pattern. + +**Rationale:** +- Current transformers hold no disposable resources (only per-reader mapping state), so there is nothing to dispose +- Disposing would break if transformer is reused across readers +- Adding `IDisposable` now would require all custom transformers to implement it +- YAGNI - we can add disposal later if a transformer that owns disposable resources is needed + +**Documentation requirement:** + +```csharp +/// +/// Base class for data transformers. +/// +/// +/// Transformers are designed for single-use per reader. 
If a transformer needs +/// to hold disposable resources, it should implement IDisposable directly and +/// be disposed by the caller after pipeline completion. +/// +public abstract class DataTransformerBase : IDataTransformer +{ + // ... +} +``` + +**TransformingDataReader** continues to only dispose the source reader: + +```csharp +public void Dispose() => _source.Dispose(); +``` + +### 7. Column Rename Collision Validation + +**Problem:** `ColumnRenameTransformer` doesn't detect when renames create duplicate column names. + +**Solution:** Validate in `OnInitialize()` and throw with clear error message. + +```csharp +public class ColumnRenameTransformer : DataTransformerBase +{ + protected override void OnInitialize(IDataReader source) + { + _outputNames = new string[source.FieldCount]; + _nameToOrdinal = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase); + + for (int i = 0; i < source.FieldCount; i++) + { + var originalName = source.GetName(i); + var outputName = _renames.TryGetValue(originalName, out var newName) + ? newName + : originalName; + + // Collision detection (catches both rename collisions and pre-existing duplicates) + if (_nameToOrdinal.ContainsKey(outputName)) + { + var existingOrdinal = _nameToOrdinal[outputName]; + var existingOriginal = source.GetName(existingOrdinal); + throw new InvalidOperationException( + $"Column name collision: '{originalName}' → '{outputName}' conflicts with " + + $"'{existingOriginal}' (already at ordinal {existingOrdinal}). 
" + + $"Each output column name must be unique."); + } + + _outputNames[i] = outputName; + _nameToOrdinal[outputName] = i; + } + } +} +``` + +**Behavior:** +- Throws on rename collisions (e.g., `A→B` when `B` exists) +- Also throws on pre-existing duplicate column names in source (output column names must be unique) +- Error message includes: the colliding column names and the ordinal of the column that already holds the name +- Fails fast during pipeline setup, before any data is processed + +## Files to Modify + +| File | Changes | +|------|---------| +| `IDataTransformer.cs` | Add `MapOrdinal()` method | +| `DataTransformerBase.cs` | Add `MapOrdinal()`, `GetBytes()`, `GetChars()`, `GetData()`, `GetDataTypeName()` | +| `TransformingDataReader.cs` | Delegate `GetBytes()`, `GetChars()`, `GetData()`, `GetDataTypeName()` to transformer | +| `JdeDateTransformer.cs` | Add sentinel parameter, validation, `MapOrdinal()` override, `GetDataTypeName()` | +| `ColumnRenameTransformer.cs` | Add collision detection | +| `ColumnDropTransformer.cs` | Add `MapOrdinal()` override | +| `DbBulkMergeDestination.cs` | Add column mappings, schema parsing, timeout parameter | +| `DbBulkImportDestination.cs` | Add column mappings, timeout parameter | +| `CommonScripts.cs` | Add `ParseTableName()`, use `QUOTENAME()`, parameterized SQL | +| `SqlScriptRunner.cs` | Add parameters support | +| `EtlPipelineBuilder.cs` | Add `WithCommandTimeout()` with validation | + +## Breaking Changes + +1. **IDataTransformer interface** - New `MapOrdinal()` method required (custom transformers that implement the interface directly must add it; subclasses of `DataTransformerBase` inherit the default) +2. **JdeDateTransformer constructor** - New optional `invalidDateSentinel` parameter (defaults to `1900-01-01`; source-compatible, but existing callers must be recompiled) +3. **DbBulkMergeDestination/DbBulkImportDestination constructors** - New optional `commandTimeoutSeconds` parameter (defaults to 600 seconds; source-compatible, but existing callers must be recompiled) + +**Behavioral changes:** +- `ColumnRenameTransformer` now throws on pre-existing duplicate column names (previously silent) +- Bulk copy now only maps columns that exist in destination (extra source columns ignored)