feat(etl): implement TransformingDataReader and DataTransformerBase

Add core transformer infrastructure for the ETL pipeline:
- DataTransformerBase: abstract base class with virtual methods for
  field count, names, types, values, ordinals, and null checking
- TransformingDataReader: IDataReader wrapper that delegates to
  transformer, enabling on-the-fly data transformations
This commit is contained in:
Joseph Doherty
2026-01-03 09:08:17 -05:00
parent c644b578ba
commit 6e7bcadf68
3 changed files with 370 additions and 0 deletions
@@ -0,0 +1,65 @@
using System.Data;
using JdeScoping.DataSync.Etl.Contracts;
namespace JdeScoping.DataSync.Etl.Transformers;
/// <summary>
/// Base class for data transformers that modify data during the ETL process.
/// Derived classes can override specific methods to customize transformation behavior.
/// </summary>
public abstract class DataTransformerBase : IDataTransformer
{
/// <inheritdoc />
public abstract string TransformerName { get; }
/// <inheritdoc />
public IDataReader Transform(IDataReader source)
{
ArgumentNullException.ThrowIfNull(source);
OnInitialize(source);
return new TransformingDataReader(source, this);
}
/// <summary>
/// Called when the transformer is initialized with a source reader.
/// Override to perform initialization logic.
/// </summary>
/// <param name="source">The source data reader.</param>
protected virtual void OnInitialize(IDataReader source) { }
/// <summary>
/// Gets the field count from the source reader.
/// Override to add or remove fields.
/// </summary>
public virtual int GetFieldCount(IDataReader source) => source.FieldCount;
/// <summary>
/// Gets the name of a field at the specified ordinal.
/// Override to rename fields.
/// </summary>
public virtual string GetName(int ordinal, IDataReader source) => source.GetName(ordinal);
/// <summary>
/// Gets the type of a field at the specified ordinal.
/// Override to change field types.
/// </summary>
public virtual Type GetFieldType(int ordinal, IDataReader source) => source.GetFieldType(ordinal);
/// <summary>
/// Gets the value of a field at the specified ordinal.
/// Override to transform values.
/// </summary>
public virtual object GetValue(int ordinal, IDataReader source) => source.GetValue(ordinal);
/// <summary>
/// Gets the ordinal of a field by name.
/// Override to support renamed fields.
/// </summary>
public virtual int GetOrdinal(string name, IDataReader source) => source.GetOrdinal(name);
/// <summary>
/// Checks if a field value is DBNull.
/// Override to handle null transformations.
/// </summary>
public virtual bool IsDBNull(int ordinal, IDataReader source) => source.IsDBNull(ordinal);
}
@@ -0,0 +1,73 @@
using System.Data;
namespace JdeScoping.DataSync.Etl.Transformers;
/// <summary>
/// An IDataReader wrapper that delegates field access to a DataTransformerBase,
/// allowing transformations to be applied during data reading.
/// </summary>
internal sealed class TransformingDataReader : IDataReader
{
private readonly IDataReader _source;
private readonly DataTransformerBase _transformer;
public TransformingDataReader(IDataReader source, DataTransformerBase transformer)
{
_source = source ?? throw new ArgumentNullException(nameof(source));
_transformer = transformer ?? throw new ArgumentNullException(nameof(transformer));
}
// Properties and methods delegated to transformer
public int FieldCount => _transformer.GetFieldCount(_source);
public string GetName(int i) => _transformer.GetName(i, _source);
public Type GetFieldType(int i) => _transformer.GetFieldType(i, _source);
public int GetOrdinal(string name) => _transformer.GetOrdinal(name, _source);
public object GetValue(int i) => _transformer.GetValue(i, _source);
public bool IsDBNull(int i) => _transformer.IsDBNull(i, _source);
public object this[int i] => GetValue(i);
public object this[string name] => GetValue(GetOrdinal(name));
// Row navigation - delegated directly to source
public bool Read() => _source.Read();
public bool NextResult() => _source.NextResult();
public int Depth => _source.Depth;
public bool IsClosed => _source.IsClosed;
public int RecordsAffected => _source.RecordsAffected;
public void Close() => _source.Close();
public void Dispose() => _source.Dispose();
// Typed accessors - use GetValue for transformation support
public bool GetBoolean(int i) => (bool)GetValue(i);
public byte GetByte(int i) => (byte)GetValue(i);
public char GetChar(int i) => (char)GetValue(i);
public DateTime GetDateTime(int i) => (DateTime)GetValue(i);
public decimal GetDecimal(int i) => (decimal)GetValue(i);
public double GetDouble(int i) => (double)GetValue(i);
public float GetFloat(int i) => (float)GetValue(i);
public Guid GetGuid(int i) => (Guid)GetValue(i);
public short GetInt16(int i) => (short)GetValue(i);
public int GetInt32(int i) => (int)GetValue(i);
public long GetInt64(int i) => (long)GetValue(i);
public string GetString(int i) => (string)GetValue(i);
// Schema and bulk data access - delegated to source
public string GetDataTypeName(int i) => _source.GetDataTypeName(i);
public int GetValues(object[] values)
{
var count = Math.Min(values.Length, FieldCount);
for (int i = 0; i < count; i++)
values[i] = GetValue(i);
return count;
}
public long GetBytes(int i, long fieldOffset, byte[]? buffer, int bufferoffset, int length)
=> _source.GetBytes(i, fieldOffset, buffer, bufferoffset, length);
public long GetChars(int i, long fieldoffset, char[]? buffer, int bufferoffset, int length)
=> _source.GetChars(i, fieldoffset, buffer, bufferoffset, length);
public IDataReader GetData(int i) => _source.GetData(i);
public DataTable? GetSchemaTable() => _source.GetSchemaTable();
}
@@ -0,0 +1,232 @@
using System.Data;
using JdeScoping.DataSync.Etl.Transformers;
using NSubstitute;
namespace JdeScoping.DataSync.Tests.Etl.Transformers;
public class TransformingDataReaderTests
{
[Fact]
public void Read_DelegatesToSourceReader()
{
var source = Substitute.For<IDataReader>();
source.Read().Returns(true, true, false);
var transformer = new PassThroughTransformer();
var reader = transformer.Transform(source);
Assert.True(reader.Read());
Assert.True(reader.Read());
Assert.False(reader.Read());
}
[Fact]
public void FieldCount_DelegatesToTransformer()
{
var source = Substitute.For<IDataReader>();
source.FieldCount.Returns(5);
var transformer = new PassThroughTransformer();
var reader = transformer.Transform(source);
Assert.Equal(5, reader.FieldCount);
}
[Fact]
public void GetName_DelegatesToTransformer()
{
var source = Substitute.For<IDataReader>();
source.GetName(0).Returns("OriginalName");
var transformer = new PassThroughTransformer();
var reader = transformer.Transform(source);
Assert.Equal("OriginalName", reader.GetName(0));
}
[Fact]
public void GetValue_DelegatesToTransformer()
{
var source = Substitute.For<IDataReader>();
source.GetValue(0).Returns("TestValue");
var transformer = new PassThroughTransformer();
var reader = transformer.Transform(source);
Assert.Equal("TestValue", reader.GetValue(0));
}
[Fact]
public void GetOrdinal_DelegatesToTransformer()
{
var source = Substitute.For<IDataReader>();
source.GetOrdinal("TestField").Returns(3);
var transformer = new PassThroughTransformer();
var reader = transformer.Transform(source);
Assert.Equal(3, reader.GetOrdinal("TestField"));
}
[Fact]
public void GetFieldType_DelegatesToTransformer()
{
var source = Substitute.For<IDataReader>();
source.GetFieldType(0).Returns(typeof(string));
var transformer = new PassThroughTransformer();
var reader = transformer.Transform(source);
Assert.Equal(typeof(string), reader.GetFieldType(0));
}
[Fact]
public void IsDBNull_DelegatesToTransformer()
{
var source = Substitute.For<IDataReader>();
source.IsDBNull(0).Returns(true);
var transformer = new PassThroughTransformer();
var reader = transformer.Transform(source);
Assert.True(reader.IsDBNull(0));
}
[Fact]
public void Indexer_ByOrdinal_UsesGetValue()
{
var source = Substitute.For<IDataReader>();
source.GetValue(1).Returns("IndexedValue");
var transformer = new PassThroughTransformer();
var reader = transformer.Transform(source);
Assert.Equal("IndexedValue", reader[1]);
}
[Fact]
public void Indexer_ByName_UsesGetOrdinalAndGetValue()
{
var source = Substitute.For<IDataReader>();
source.GetOrdinal("TestColumn").Returns(2);
source.GetValue(2).Returns("NamedValue");
var transformer = new PassThroughTransformer();
var reader = transformer.Transform(source);
Assert.Equal("NamedValue", reader["TestColumn"]);
}
[Fact]
public void GetValues_PopulatesArrayWithTransformedValues()
{
var source = Substitute.For<IDataReader>();
source.FieldCount.Returns(3);
source.GetValue(0).Returns("Value0");
source.GetValue(1).Returns("Value1");
source.GetValue(2).Returns("Value2");
var transformer = new PassThroughTransformer();
var reader = transformer.Transform(source);
var values = new object[3];
var count = reader.GetValues(values);
Assert.Equal(3, count);
Assert.Equal("Value0", values[0]);
Assert.Equal("Value1", values[1]);
Assert.Equal("Value2", values[2]);
}
[Fact]
public void Dispose_DisposesSourceReader()
{
var source = Substitute.For<IDataReader>();
var transformer = new PassThroughTransformer();
var reader = transformer.Transform(source);
reader.Dispose();
source.Received(1).Dispose();
}
[Fact]
public void Close_ClosesSourceReader()
{
var source = Substitute.For<IDataReader>();
var transformer = new PassThroughTransformer();
var reader = transformer.Transform(source);
reader.Close();
source.Received(1).Close();
}
[Fact]
public void NextResult_DelegatesToSourceReader()
{
var source = Substitute.For<IDataReader>();
source.NextResult().Returns(true);
var transformer = new PassThroughTransformer();
var reader = transformer.Transform(source);
Assert.True(reader.NextResult());
}
[Fact]
public void Depth_DelegatesToSourceReader()
{
var source = Substitute.For<IDataReader>();
source.Depth.Returns(2);
var transformer = new PassThroughTransformer();
var reader = transformer.Transform(source);
Assert.Equal(2, reader.Depth);
}
[Fact]
public void IsClosed_DelegatesToSourceReader()
{
var source = Substitute.For<IDataReader>();
source.IsClosed.Returns(true);
var transformer = new PassThroughTransformer();
var reader = transformer.Transform(source);
Assert.True(reader.IsClosed);
}
[Fact]
public void RecordsAffected_DelegatesToSourceReader()
{
var source = Substitute.For<IDataReader>();
source.RecordsAffected.Returns(42);
var transformer = new PassThroughTransformer();
var reader = transformer.Transform(source);
Assert.Equal(42, reader.RecordsAffected);
}
[Fact]
public void Transform_ThrowsOnNullSource()
{
var transformer = new PassThroughTransformer();
Assert.Throws<ArgumentNullException>(() => transformer.Transform(null!));
}
[Fact]
public void TransformerName_ReturnsExpectedName()
{
var transformer = new PassThroughTransformer();
Assert.Equal("PassThrough", transformer.TransformerName);
}
private class PassThroughTransformer : DataTransformerBase
{
public override string TransformerName => "PassThrough";
}
}