docs: add ETL transformers documentation
This commit is contained in:
@@ -0,0 +1,260 @@
|
||||
# Data Transformers
|
||||
|
||||
Transformers modify data as it flows through the pipeline. They wrap the source `IDataReader` in a decorator, allowing column renaming, dropping, type conversion, and computed columns.
|
||||
|
||||
## Interface Contract
|
||||
|
||||
```csharp
|
||||
public interface IDataTransformer
|
||||
{
|
||||
IDataReader Transform(IDataReader source);
|
||||
string TransformerName { get; }
|
||||
int MapOrdinal(int transformedOrdinal, IDataReader source);
|
||||
}
|
||||
```
|
||||
|
||||
**Key methods:**
|
||||
- `Transform()` - Wraps the source reader, returns a new reader with modifications
|
||||
- `TransformerName` - Used in logging and `StepResult` tracking
|
||||
- `MapOrdinal()` - Maps transformed ordinals to source ordinals. Returns `-1` for computed columns.
|
||||
|
||||
## DataTransformerBase
|
||||
|
||||
The base class provides default implementations and handles the decorator pattern:
|
||||
|
||||
```csharp
|
||||
public abstract class DataTransformerBase : IDataTransformer
|
||||
{
|
||||
public abstract string TransformerName { get; }
|
||||
|
||||
public IDataReader Transform(IDataReader source)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(source);
|
||||
OnInitialize(source);
|
||||
return new TransformingDataReader(source, this);
|
||||
}
|
||||
|
||||
protected virtual void OnInitialize(IDataReader source) { }
|
||||
```
|
||||
|
||||
### Default pass-through methods
|
||||
|
||||
Override only what you need to change:
|
||||
|
||||
```csharp
|
||||
public virtual int GetFieldCount(IDataReader source) => source.FieldCount;
|
||||
public virtual string GetName(int ordinal, IDataReader source) => source.GetName(ordinal);
|
||||
public virtual Type GetFieldType(int ordinal, IDataReader source) => source.GetFieldType(ordinal);
|
||||
public virtual object GetValue(int ordinal, IDataReader source) => source.GetValue(ordinal);
|
||||
public virtual int GetOrdinal(string name, IDataReader source) => source.GetOrdinal(name);
|
||||
public virtual bool IsDBNull(int ordinal, IDataReader source) => source.IsDBNull(ordinal);
|
||||
public virtual int MapOrdinal(int transformedOrdinal, IDataReader source) => transformedOrdinal;
|
||||
```
|
||||
|
||||
### Binary method handling
|
||||
|
||||
Computed columns (where `MapOrdinal` returns `-1`) throw `NotSupportedException`:
|
||||
|
||||
```csharp
|
||||
public virtual long GetBytes(int ordinal, long fieldOffset, byte[]? buffer,
|
||||
int bufferOffset, int length, IDataReader source)
|
||||
{
|
||||
var sourceOrdinal = MapOrdinal(ordinal, source);
|
||||
if (sourceOrdinal < 0)
|
||||
throw new NotSupportedException(
|
||||
$"GetBytes not supported for computed column at ordinal {ordinal}.");
|
||||
return source.GetBytes(sourceOrdinal, fieldOffset, buffer, bufferOffset, length);
|
||||
}
|
||||
```
|
||||
|
||||
## ColumnRenameTransformer
|
||||
|
||||
Renames columns without changing values or order:
|
||||
|
||||
```csharp
|
||||
public class ColumnRenameTransformer : DataTransformerBase
|
||||
{
|
||||
private readonly Dictionary<string, string> _renames;
|
||||
private string[]? _outputNames;
|
||||
private Dictionary<string, int>? _nameToOrdinal;
|
||||
|
||||
public override string TransformerName => $"RenameColumns:{_renames.Count}";
|
||||
|
||||
public ColumnRenameTransformer(params (string OldName, string NewName)[] renames)
|
||||
{
|
||||
_renames = renames.ToDictionary(
|
||||
r => r.OldName, r => r.NewName, StringComparer.OrdinalIgnoreCase);
|
||||
}
|
||||
```
|
||||
|
||||
### Collision detection
|
||||
|
||||
The transformer validates that renames don't create duplicate column names:
|
||||
|
||||
```csharp
|
||||
protected override void OnInitialize(IDataReader source)
|
||||
{
|
||||
_outputNames = new string[source.FieldCount];
|
||||
_nameToOrdinal = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);
|
||||
|
||||
for (int i = 0; i < source.FieldCount; i++)
|
||||
{
|
||||
var originalName = source.GetName(i);
|
||||
var outputName = _renames.TryGetValue(originalName, out var newName)
|
||||
? newName : originalName;
|
||||
|
||||
if (_nameToOrdinal.TryGetValue(outputName, out var existingOrdinal))
|
||||
{
|
||||
throw new InvalidOperationException(
|
||||
$"Column name collision: '{originalName}' → '{outputName}' conflicts with " +
|
||||
$"'{source.GetName(existingOrdinal)}' (already at ordinal {existingOrdinal}).");
|
||||
}
|
||||
|
||||
_outputNames[i] = outputName;
|
||||
_nameToOrdinal[outputName] = i;
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## ColumnDropTransformer
|
||||
|
||||
Removes specified columns from the output:
|
||||
|
||||
```csharp
|
||||
public class ColumnDropTransformer : DataTransformerBase
|
||||
{
|
||||
private readonly HashSet<string> _columnsToDrop;
|
||||
private int[]? _ordinalMap;
|
||||
private Dictionary<string, int>? _nameToOrdinal;
|
||||
|
||||
public override string TransformerName => $"DropColumns:{string.Join(",", _columnsToDrop)}";
|
||||
|
||||
public ColumnDropTransformer(params string[] columnsToDrop)
|
||||
{
|
||||
_columnsToDrop = new HashSet<string>(columnsToDrop, StringComparer.OrdinalIgnoreCase);
|
||||
}
|
||||
```
|
||||
|
||||
### Ordinal mapping
|
||||
|
||||
Builds a map from output ordinals to source ordinals, excluding dropped columns:
|
||||
|
||||
```csharp
|
||||
protected override void OnInitialize(IDataReader source)
|
||||
{
|
||||
var ordinalList = new List<int>();
|
||||
_nameToOrdinal = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);
|
||||
|
||||
for (int i = 0; i < source.FieldCount; i++)
|
||||
{
|
||||
var name = source.GetName(i);
|
||||
if (!_columnsToDrop.Contains(name))
|
||||
{
|
||||
_nameToOrdinal[name] = ordinalList.Count;
|
||||
ordinalList.Add(i);
|
||||
}
|
||||
}
|
||||
_ordinalMap = ordinalList.ToArray();
|
||||
}
|
||||
|
||||
public override int MapOrdinal(int transformedOrdinal, IDataReader source)
|
||||
=> _ordinalMap![transformedOrdinal];
|
||||
```
|
||||
|
||||
## JdeDateTransformer
|
||||
|
||||
Combines JDE Julian date (CYYDDD) and time (HHMMSS) columns into a single `DateTime`:
|
||||
|
||||
```csharp
|
||||
public class JdeDateTransformer : DataTransformerBase
|
||||
{
|
||||
public static readonly DateTime DefaultInvalidDateSentinel = new(1900, 1, 1);
|
||||
|
||||
private readonly string _dateColumn;
|
||||
private readonly string _timeColumn;
|
||||
private readonly string _outputColumn;
|
||||
private readonly DateTime _invalidDateSentinel;
|
||||
```
|
||||
|
||||
### Computed column handling
|
||||
|
||||
The output `DateTime` column has no direct source ordinal, so `MapOrdinal` returns `-1`:
|
||||
|
||||
```csharp
|
||||
public override int MapOrdinal(int transformedOrdinal, IDataReader source)
|
||||
{
|
||||
var sourceOrdinal = _ordinalMap![transformedOrdinal];
|
||||
return sourceOrdinal == _dateOrdinal ? -1 : sourceOrdinal;
|
||||
}
|
||||
|
||||
public override string GetDataTypeName(int ordinal, IDataReader source)
|
||||
{
|
||||
var sourceOrdinal = _ordinalMap![ordinal];
|
||||
return sourceOrdinal == _dateOrdinal ? "datetime" : source.GetDataTypeName(sourceOrdinal);
|
||||
}
|
||||
```
|
||||
|
||||
### Date parsing with validation
|
||||
|
||||
Invalid dates return a configurable sentinel value (default: 1900-01-01):
|
||||
|
||||
```csharp
|
||||
public static DateTime ParseJdeDateTime(decimal julianDate, decimal time, DateTime sentinel)
|
||||
{
|
||||
var dateInt = (int)julianDate;
|
||||
if (dateInt <= 0) return sentinel;
|
||||
|
||||
var century = dateInt / 100000;
|
||||
var year = (dateInt / 1000) % 100;
|
||||
var dayOfYear = dateInt % 1000;
|
||||
|
||||
if (century < 0 || century > 1) return sentinel;
|
||||
if (year < 0 || year > 99) return sentinel;
|
||||
if (dayOfYear < 1 || dayOfYear > 366) return sentinel;
|
||||
|
||||
var fullYear = (century == 0 ? 1900 : 2000) + year;
|
||||
var daysInYear = DateTime.IsLeapYear(fullYear) ? 366 : 365;
|
||||
if (dayOfYear > daysInYear) return sentinel;
|
||||
|
||||
var date = new DateTime(fullYear, 1, 1).AddDays(dayOfYear - 1);
|
||||
|
||||
// Parse time (HHMMSS format)
|
||||
var timeInt = (int)time;
|
||||
var hours = timeInt / 10000;
|
||||
var minutes = (timeInt / 100) % 100;
|
||||
var seconds = timeInt % 100;
|
||||
|
||||
if (hours < 0 || hours > 23) return sentinel;
|
||||
if (minutes < 0 || minutes > 59) return sentinel;
|
||||
if (seconds < 0 || seconds > 59) return sentinel;
|
||||
|
||||
return date.AddHours(hours).AddMinutes(minutes).AddSeconds(seconds);
|
||||
}
|
||||
```
|
||||
|
||||
## Transformer Chaining
|
||||
|
||||
Transformers compose by wrapping each other. The pipeline applies them in order:
|
||||
|
||||
```csharp
|
||||
foreach (var transformer in _transformers)
|
||||
{
|
||||
reader = transformer.Transform(reader);
|
||||
}
|
||||
```
|
||||
|
||||
Each transformer sees the output of the previous one. Ordinal mappings accumulate through the chain.
|
||||
|
||||
## Validation in OnInitialize
|
||||
|
||||
Perform all validation in `OnInitialize()` to fail fast before processing data:
|
||||
|
||||
- Check that required columns exist
|
||||
- Validate rename mappings don't create collisions
|
||||
- Build ordinal maps for efficient lookup during row processing
|
||||
|
||||
## Related Documentation
|
||||
|
||||
- [Overview](./Overview.md) - Pipeline architecture
|
||||
- [Sources](./Sources.md) - Data sources that feed transformers
|
||||
- [Destinations](./Destinations.md) - Where transformed data goes
|
||||
Reference in New Issue
Block a user