261 lines
8.7 KiB
Markdown
261 lines
8.7 KiB
Markdown
# Data Transformers
|
|
|
|
Transformers modify data as it flows through the pipeline. They wrap the source `IDataReader` in a decorator, allowing column renaming, dropping, type conversion, and computed columns.
|
|
|
|
## Interface Contract
|
|
|
|
```csharp
|
|
public interface IDataTransformer
|
|
{
|
|
IDataReader Transform(IDataReader source);
|
|
string TransformerName { get; }
|
|
int MapOrdinal(int transformedOrdinal, IDataReader source);
|
|
}
|
|
```
|
|
|
|
**Key methods:**
|
|
- `Transform()` - Wraps the source reader, returns a new reader with modifications
|
|
- `TransformerName` - Used in logging and `StepResult` tracking
|
|
- `MapOrdinal()` - Maps transformed ordinals to source ordinals. Returns `-1` for computed columns.
|
|
|
|
## DataTransformerBase
|
|
|
|
The base class provides default implementations and handles the decorator pattern:
|
|
|
|
```csharp
|
|
public abstract class DataTransformerBase : IDataTransformer
|
|
{
|
|
public abstract string TransformerName { get; }
|
|
|
|
public IDataReader Transform(IDataReader source)
|
|
{
|
|
ArgumentNullException.ThrowIfNull(source);
|
|
OnInitialize(source);
|
|
return new TransformingDataReader(source, this);
|
|
}
|
|
|
|
protected virtual void OnInitialize(IDataReader source) { }
|
|
```
|
|
|
|
### Default pass-through methods
|
|
|
|
Override only what you need to change:
|
|
|
|
```csharp
|
|
public virtual int GetFieldCount(IDataReader source) => source.FieldCount;
|
|
public virtual string GetName(int ordinal, IDataReader source) => source.GetName(ordinal);
|
|
public virtual Type GetFieldType(int ordinal, IDataReader source) => source.GetFieldType(ordinal);
|
|
public virtual object GetValue(int ordinal, IDataReader source) => source.GetValue(ordinal);
|
|
public virtual int GetOrdinal(string name, IDataReader source) => source.GetOrdinal(name);
|
|
public virtual bool IsDBNull(int ordinal, IDataReader source) => source.IsDBNull(ordinal);
|
|
public virtual int MapOrdinal(int transformedOrdinal, IDataReader source) => transformedOrdinal;
|
|
```
|
|
|
|
### Binary method handling
|
|
|
|
Computed columns (where `MapOrdinal` returns `-1`) throw `NotSupportedException`:
|
|
|
|
```csharp
|
|
public virtual long GetBytes(int ordinal, long fieldOffset, byte[]? buffer,
|
|
int bufferOffset, int length, IDataReader source)
|
|
{
|
|
var sourceOrdinal = MapOrdinal(ordinal, source);
|
|
if (sourceOrdinal < 0)
|
|
throw new NotSupportedException(
|
|
$"GetBytes not supported for computed column at ordinal {ordinal}.");
|
|
return source.GetBytes(sourceOrdinal, fieldOffset, buffer, bufferOffset, length);
|
|
}
|
|
```
|
|
|
|
## ColumnRenameTransformer
|
|
|
|
Renames columns without changing values or order:
|
|
|
|
```csharp
|
|
public class ColumnRenameTransformer : DataTransformerBase
|
|
{
|
|
private readonly Dictionary<string, string> _renames;
|
|
private string[]? _outputNames;
|
|
private Dictionary<string, int>? _nameToOrdinal;
|
|
|
|
public override string TransformerName => $"RenameColumns:{_renames.Count}";
|
|
|
|
public ColumnRenameTransformer(params (string OldName, string NewName)[] renames)
|
|
{
|
|
_renames = renames.ToDictionary(
|
|
r => r.OldName, r => r.NewName, StringComparer.OrdinalIgnoreCase);
|
|
}
|
|
```
|
|
|
|
### Collision detection
|
|
|
|
The transformer validates that renames don't create duplicate column names:
|
|
|
|
```csharp
|
|
protected override void OnInitialize(IDataReader source)
|
|
{
|
|
_outputNames = new string[source.FieldCount];
|
|
_nameToOrdinal = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);
|
|
|
|
for (int i = 0; i < source.FieldCount; i++)
|
|
{
|
|
var originalName = source.GetName(i);
|
|
var outputName = _renames.TryGetValue(originalName, out var newName)
|
|
? newName : originalName;
|
|
|
|
if (_nameToOrdinal.TryGetValue(outputName, out var existingOrdinal))
|
|
{
|
|
throw new InvalidOperationException(
|
|
$"Column name collision: '{originalName}' → '{outputName}' conflicts with " +
|
|
$"'{source.GetName(existingOrdinal)}' (already at ordinal {existingOrdinal}).");
|
|
}
|
|
|
|
_outputNames[i] = outputName;
|
|
_nameToOrdinal[outputName] = i;
|
|
}
|
|
}
|
|
```
|
|
|
|
## ColumnDropTransformer
|
|
|
|
Removes specified columns from the output:
|
|
|
|
```csharp
|
|
public class ColumnDropTransformer : DataTransformerBase
|
|
{
|
|
private readonly HashSet<string> _columnsToDrop;
|
|
private int[]? _ordinalMap;
|
|
private Dictionary<string, int>? _nameToOrdinal;
|
|
|
|
public override string TransformerName => $"DropColumns:{string.Join(",", _columnsToDrop)}";
|
|
|
|
public ColumnDropTransformer(params string[] columnsToDrop)
|
|
{
|
|
_columnsToDrop = new HashSet<string>(columnsToDrop, StringComparer.OrdinalIgnoreCase);
|
|
}
|
|
```
|
|
|
|
### Ordinal mapping
|
|
|
|
Builds a map from output ordinals to source ordinals, excluding dropped columns:
|
|
|
|
```csharp
|
|
protected override void OnInitialize(IDataReader source)
|
|
{
|
|
var ordinalList = new List<int>();
|
|
_nameToOrdinal = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);
|
|
|
|
for (int i = 0; i < source.FieldCount; i++)
|
|
{
|
|
var name = source.GetName(i);
|
|
if (!_columnsToDrop.Contains(name))
|
|
{
|
|
_nameToOrdinal[name] = ordinalList.Count;
|
|
ordinalList.Add(i);
|
|
}
|
|
}
|
|
_ordinalMap = ordinalList.ToArray();
|
|
}
|
|
|
|
public override int MapOrdinal(int transformedOrdinal, IDataReader source)
|
|
=> _ordinalMap![transformedOrdinal];
|
|
```
|
|
|
|
## JdeDateTransformer
|
|
|
|
Combines JDE Julian date (CYYDDD) and time (HHMMSS) columns into a single `DateTime`:
|
|
|
|
```csharp
|
|
public class JdeDateTransformer : DataTransformerBase
|
|
{
|
|
public static readonly DateTime DefaultInvalidDateSentinel = new(1900, 1, 1);
|
|
|
|
private readonly string _dateColumn;
|
|
private readonly string _timeColumn;
|
|
private readonly string _outputColumn;
|
|
private readonly DateTime _invalidDateSentinel;
|
|
```
|
|
|
|
### Computed column handling
|
|
|
|
The output `DateTime` column has no direct source ordinal, so `MapOrdinal` returns `-1`:
|
|
|
|
```csharp
|
|
public override int MapOrdinal(int transformedOrdinal, IDataReader source)
|
|
{
|
|
var sourceOrdinal = _ordinalMap![transformedOrdinal];
|
|
return sourceOrdinal == _dateOrdinal ? -1 : sourceOrdinal;
|
|
}
|
|
|
|
public override string GetDataTypeName(int ordinal, IDataReader source)
|
|
{
|
|
var sourceOrdinal = _ordinalMap![ordinal];
|
|
return sourceOrdinal == _dateOrdinal ? "datetime" : source.GetDataTypeName(sourceOrdinal);
|
|
}
|
|
```
|
|
|
|
### Date parsing with validation
|
|
|
|
Invalid dates return a configurable sentinel value (default: 1900-01-01):
|
|
|
|
```csharp
|
|
public static DateTime ParseJdeDateTime(decimal julianDate, decimal time, DateTime sentinel)
|
|
{
|
|
var dateInt = (int)julianDate;
|
|
if (dateInt <= 0) return sentinel;
|
|
|
|
var century = dateInt / 100000;
|
|
var year = (dateInt / 1000) % 100;
|
|
var dayOfYear = dateInt % 1000;
|
|
|
|
if (century < 0 || century > 1) return sentinel;
|
|
if (year < 0 || year > 99) return sentinel;
|
|
if (dayOfYear < 1 || dayOfYear > 366) return sentinel;
|
|
|
|
var fullYear = (century == 0 ? 1900 : 2000) + year;
|
|
var daysInYear = DateTime.IsLeapYear(fullYear) ? 366 : 365;
|
|
if (dayOfYear > daysInYear) return sentinel;
|
|
|
|
var date = new DateTime(fullYear, 1, 1).AddDays(dayOfYear - 1);
|
|
|
|
// Parse time (HHMMSS format)
|
|
var timeInt = (int)time;
|
|
var hours = timeInt / 10000;
|
|
var minutes = (timeInt / 100) % 100;
|
|
var seconds = timeInt % 100;
|
|
|
|
if (hours < 0 || hours > 23) return sentinel;
|
|
if (minutes < 0 || minutes > 59) return sentinel;
|
|
if (seconds < 0 || seconds > 59) return sentinel;
|
|
|
|
return date.AddHours(hours).AddMinutes(minutes).AddSeconds(seconds);
|
|
}
|
|
```
|
|
|
|
## Transformer Chaining
|
|
|
|
Transformers compose by wrapping each other. The pipeline applies them in order:
|
|
|
|
```csharp
|
|
foreach (var transformer in _transformers)
|
|
{
|
|
reader = transformer.Transform(reader);
|
|
}
|
|
```
|
|
|
|
Each transformer sees the output of the previous one. Ordinal mappings accumulate through the chain.
|
|
|
|
## Validation in OnInitialize
|
|
|
|
Perform all validation in `OnInitialize()` to fail fast before processing data:
|
|
|
|
- Check that required columns exist
|
|
- Validate rename mappings don't create collisions
|
|
- Build ordinal maps for efficient lookup during row processing
|
|
|
|
## Related Documentation
|
|
|
|
- [Overview](./Overview.md) - Pipeline architecture
|
|
- [Sources](./Sources.md) - Data sources that feed transformers
|
|
- [Destinations](./Destinations.md) - Where transformed data goes
|