Files
2026-01-03 15:36:22 -05:00

8.7 KiB

Data Transformers

Transformers modify data as it flows through the pipeline. They wrap the source IDataReader in a decorator, allowing column renaming, dropping, type conversion, and computed columns.

Interface Contract

/// <summary>
/// A pipeline stage that wraps an <see cref="IDataReader"/> and exposes a modified view of its columns.
/// </summary>
public interface IDataTransformer
{
    /// <summary>Wraps <paramref name="source"/> and returns a new reader with the modifications applied.</summary>
    IDataReader Transform(IDataReader source);
    /// <summary>Name of this transformer; used in logging and StepResult tracking.</summary>
    string TransformerName { get; }
    /// <summary>
    /// Maps an ordinal of the transformed reader to the matching ordinal of <paramref name="source"/>.
    /// Returns -1 for computed columns, which have no source ordinal.
    /// </summary>
    int MapOrdinal(int transformedOrdinal, IDataReader source);
}

Key methods:

  • Transform() - Wraps the source reader, returns a new reader with modifications
  • TransformerName - Used in logging and StepResult tracking
  • MapOrdinal() - Maps transformed ordinals to source ordinals. Returns -1 for computed columns.

DataTransformerBase

The base class provides default implementations and handles the decorator pattern:

public abstract class DataTransformerBase : IDataTransformer
{
    /// <summary>Name identifying the concrete transformer; each subclass must supply it.</summary>
    public abstract string TransformerName { get; }

    /// <summary>
    /// Initializes this transformer against <paramref name="source"/> and returns a
    /// <c>TransformingDataReader</c> that decorates it.
    /// </summary>
    /// <param name="source">The reader to decorate.</param>
    /// <returns>A reader that routes its calls through this transformer.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public IDataReader Transform(IDataReader source)
    {
        ArgumentNullException.ThrowIfNull(source);

        // Run the subclass hook first so it can validate and build its maps
        // before the decorating reader exists.
        OnInitialize(source);

        var decorated = new TransformingDataReader(source, this);
        return decorated;
    }

    /// <summary>
    /// Hook invoked by <see cref="Transform"/> after the null check and before the decorating reader is created.
    /// The default implementation does nothing; subclasses override it to validate and build lookup state.
    /// </summary>
    protected virtual void OnInitialize(IDataReader source) { }

Default pass-through methods

Override only what you need to change:

    /// <summary>Returns the number of columns; by default the source's <c>FieldCount</c>.</summary>
    public virtual int GetFieldCount(IDataReader source) => source.FieldCount;
    /// <summary>Returns the column name at <paramref name="ordinal"/>; by default the source's name for the same ordinal.</summary>
    public virtual string GetName(int ordinal, IDataReader source) => source.GetName(ordinal);
    /// <summary>Returns the column type at <paramref name="ordinal"/>; by default the source's type for the same ordinal.</summary>
    public virtual Type GetFieldType(int ordinal, IDataReader source) => source.GetFieldType(ordinal);
    /// <summary>Returns the value at <paramref name="ordinal"/>; by default the source's value for the same ordinal.</summary>
    public virtual object GetValue(int ordinal, IDataReader source) => source.GetValue(ordinal);
    /// <summary>Returns the ordinal of the column called <paramref name="name"/>; by default delegates to the source.</summary>
    public virtual int GetOrdinal(string name, IDataReader source) => source.GetOrdinal(name);
    /// <summary>Returns whether the value at <paramref name="ordinal"/> is DBNull; by default delegates to the source.</summary>
    public virtual bool IsDBNull(int ordinal, IDataReader source) => source.IsDBNull(ordinal);
    /// <summary>Maps a transformed ordinal to a source ordinal; by default the two are identical.</summary>
    public virtual int MapOrdinal(int transformedOrdinal, IDataReader source) => transformedOrdinal;

Binary method handling

Binary accessors such as GetBytes read from the source column, so calling them on a computed column (where MapOrdinal returns -1) throws NotSupportedException:

    /// <summary>
    /// Reads bytes from the source column that backs the transformed column at <paramref name="ordinal"/>.
    /// </summary>
    /// <returns>The value returned by the source reader's <c>GetBytes</c>.</returns>
    /// <exception cref="NotSupportedException">
    /// <see cref="MapOrdinal"/> returned a negative ordinal, i.e. the column has no source column.
    /// </exception>
    public virtual long GetBytes(int ordinal, long fieldOffset, byte[]? buffer,
        int bufferOffset, int length, IDataReader source)
    {
        var mapped = MapOrdinal(ordinal, source);
        if (mapped >= 0)
        {
            return source.GetBytes(mapped, fieldOffset, buffer, bufferOffset, length);
        }

        // A negative mapping marks a computed column: there is nothing to read from.
        throw new NotSupportedException(
            $"GetBytes not supported for computed column at ordinal {ordinal}.");
    }

ColumnRenameTransformer

Renames columns without changing values or order:

public class ColumnRenameTransformer : DataTransformerBase
{
    // Old column name -> new column name; keys compare case-insensitively (set in the constructor).
    private readonly Dictionary<string, string> _renames;
    // Output column name per ordinal; populated by OnInitialize.
    private string[]? _outputNames;
    // Output column name -> ordinal, case-insensitive; populated by OnInitialize.
    private Dictionary<string, int>? _nameToOrdinal;

    /// <summary>Transformer name; includes the number of configured renames.</summary>
    public override string TransformerName => $"RenameColumns:{_renames.Count}";

    /// <summary>
    /// Creates a transformer that renames each <c>OldName</c> column to its <c>NewName</c>.
    /// Old names are matched case-insensitively.
    /// </summary>
    /// <param name="renames">The (old name, new name) pairs to apply.</param>
    public ColumnRenameTransformer(params (string OldName, string NewName)[] renames)
    {
        _renames = renames.ToDictionary(
            pair => pair.OldName,
            pair => pair.NewName,
            StringComparer.OrdinalIgnoreCase);
    }

Collision detection

The transformer validates that renames don't create duplicate column names:

    /// <summary>
    /// Computes the output name of every source column and builds the name-to-ordinal lookup.
    /// </summary>
    /// <param name="source">The reader whose columns are being renamed.</param>
    /// <exception cref="InvalidOperationException">
    /// Two columns end up with the same output name (compared case-insensitively).
    /// </exception>
    protected override void OnInitialize(IDataReader source)
    {
        var names = new string[source.FieldCount];
        var lookup = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);
        _outputNames = names;
        _nameToOrdinal = lookup;

        for (int ordinal = 0; ordinal < source.FieldCount; ordinal++)
        {
            var sourceName = source.GetName(ordinal);

            // Columns without a configured rename keep their source name.
            if (!_renames.TryGetValue(sourceName, out var finalName))
            {
                finalName = sourceName;
            }

            if (lookup.TryGetValue(finalName, out var takenBy))
            {
                throw new InvalidOperationException(
                    $"Column name collision: '{sourceName}' → '{finalName}' conflicts with " +
                    $"'{source.GetName(takenBy)}' (already at ordinal {takenBy}).");
            }

            names[ordinal] = finalName;
            lookup[finalName] = ordinal;
        }
    }

ColumnDropTransformer

Removes specified columns from the output:

public class ColumnDropTransformer : DataTransformerBase
{
    // Names of the columns to remove; compared case-insensitively (set in the constructor).
    private readonly HashSet<string> _columnsToDrop;
    // Output ordinal -> source ordinal for the columns that are kept; populated by OnInitialize.
    private int[]? _ordinalMap;
    // Kept column name -> output ordinal, case-insensitive; populated by OnInitialize.
    private Dictionary<string, int>? _nameToOrdinal;

    /// <summary>Transformer name; lists the dropped column names separated by commas.</summary>
    public override string TransformerName => $"DropColumns:{string.Join(",", _columnsToDrop)}";

    /// <summary>
    /// Creates a transformer that removes the named columns; names are matched case-insensitively.
    /// </summary>
    /// <param name="columnsToDrop">Names of the columns to exclude from the output.</param>
    public ColumnDropTransformer(params string[] columnsToDrop)
        => _columnsToDrop = new HashSet<string>(columnsToDrop, StringComparer.OrdinalIgnoreCase);

Ordinal mapping

Builds a map from output ordinals to source ordinals, excluding dropped columns:

    /// <summary>
    /// Builds the output-to-source ordinal map and the name lookup, skipping every dropped column.
    /// </summary>
    /// <param name="source">The reader whose columns are being filtered.</param>
    protected override void OnInitialize(IDataReader source)
    {
        var keptOrdinals = new List<int>();
        var lookup = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);
        _nameToOrdinal = lookup;

        for (int sourceOrdinal = 0; sourceOrdinal < source.FieldCount; sourceOrdinal++)
        {
            var columnName = source.GetName(sourceOrdinal);
            if (_columnsToDrop.Contains(columnName))
            {
                continue;
            }

            // The next free output ordinal equals the number of columns kept so far.
            lookup[columnName] = keptOrdinals.Count;
            keptOrdinals.Add(sourceOrdinal);
        }

        _ordinalMap = keptOrdinals.ToArray();
    }

    /// <summary>Returns the source ordinal behind the output column at <paramref name="transformedOrdinal"/>.</summary>
    public override int MapOrdinal(int transformedOrdinal, IDataReader source)
    {
        var map = _ordinalMap!;
        return map[transformedOrdinal];
    }

JdeDateTransformer

Combines JDE Julian date (CYYDDD) and time (HHMMSS) columns into a single DateTime:

public class JdeDateTransformer : DataTransformerBase
{
    /// <summary>Default sentinel for invalid dates: 1900-01-01.</summary>
    public static readonly DateTime DefaultInvalidDateSentinel = new(1900, 1, 1);

    // NOTE(review): presumably the names of the source date (CYYDDD) and time (HHMMSS) columns
    // and of the combined output column; their use is not visible in this excerpt, so confirm.
    private readonly string _dateColumn;
    private readonly string _timeColumn;
    private readonly string _outputColumn;
    // NOTE(review): presumably the configured value returned for invalid dates; confirm against the constructor.
    private readonly DateTime _invalidDateSentinel;

Computed column handling

The output DateTime column has no direct source ordinal, so MapOrdinal returns -1:

    /// <summary>
    /// Maps an output ordinal to its source ordinal, or -1 when it maps to the date ordinal,
    /// because that output column is computed and has no direct source column.
    /// </summary>
    public override int MapOrdinal(int transformedOrdinal, IDataReader source)
    {
        var mapped = _ordinalMap![transformedOrdinal];
        if (mapped == _dateOrdinal)
        {
            return -1;
        }

        return mapped;
    }

    /// <summary>
    /// Returns "datetime" for the computed date column and the source's data type name for every other column.
    /// </summary>
    public override string GetDataTypeName(int ordinal, IDataReader source)
    {
        var mapped = _ordinalMap![ordinal];
        if (mapped == _dateOrdinal)
        {
            return "datetime";
        }

        return source.GetDataTypeName(mapped);
    }

Date parsing with validation

Invalid dates or times return a configurable sentinel value (default: 1900-01-01):

    /// <summary>
    /// Converts a JDE Julian date (CYYDDD) and time (HHMMSS) into a single <see cref="DateTime"/>.
    /// </summary>
    /// <param name="julianDate">
    /// Date as CYYDDD: C is 0 for 19xx or 1 for 20xx, YY is the two-digit year and DDD the day of the year.
    /// Any fractional part is ignored.
    /// </param>
    /// <param name="time">Time of day as HHMMSS. Any fractional part is ignored.</param>
    /// <param name="sentinel">Value returned when either input is not a valid date or time.</param>
    /// <returns>The combined date and time, or <paramref name="sentinel"/> for invalid input.</returns>
    public static DateTime ParseJdeDateTime(decimal julianDate, decimal time, DateTime sentinel)
    {
        // Range-check while still in decimal: an explicit decimal-to-int conversion throws
        // OverflowException for out-of-range values, and bad data must yield the sentinel instead.
        var wholeDate = decimal.Truncate(julianDate);
        if (wholeDate <= 0m || wholeDate >= 200000m) return sentinel; // century digit must be 0 or 1

        var wholeTime = decimal.Truncate(time);
        if (wholeTime < 0m || wholeTime >= 240000m) return sentinel; // hours must be 00-23

        var dateInt = (int)wholeDate;
        var century = dateInt / 100000;
        var year = (dateInt / 1000) % 100;
        var dayOfYear = dateInt % 1000;

        var fullYear = (century == 0 ? 1900 : 2000) + year;
        var daysInYear = DateTime.IsLeapYear(fullYear) ? 366 : 365;
        if (dayOfYear < 1 || dayOfYear > daysInYear) return sentinel;

        // Parse time (HHMMSS format)
        var timeInt = (int)wholeTime;
        var hours = timeInt / 10000;
        var minutes = (timeInt / 100) % 100;
        var seconds = timeInt % 100;

        if (minutes > 59 || seconds > 59) return sentinel;

        return new DateTime(fullYear, 1, 1)
            .AddDays(dayOfYear - 1)
            .AddHours(hours)
            .AddMinutes(minutes)
            .AddSeconds(seconds);
    }

Transformer Chaining

Transformers compose by wrapping each other. The pipeline applies them in order:

// Each stage wraps the reader produced by the previous stage, so the last
// transformer in the list ends up as the outermost decorator.
foreach (var stage in _transformers)
{
    var wrapped = stage.Transform(reader);
    reader = wrapped;
}

Each transformer sees the output of the previous one. Ordinal mappings accumulate through the chain.

Validation in OnInitialize

Perform all validation in OnInitialize() to fail fast before processing data:

  • Check that required columns exist
  • Validate rename mappings don't create collisions
  • Build ordinal maps for efficient lookup during row processing