From 6e7bcadf68469f6f1fabc0837ad1ff6fecbc62e4 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 3 Jan 2026 09:08:17 -0500 Subject: [PATCH] feat(etl): implement TransformingDataReader and DataTransformerBase Add core transformer infrastructure for the ETL pipeline: - DataTransformerBase: abstract base class with virtual methods for field count, names, types, values, ordinals, and null checking - TransformingDataReader: IDataReader wrapper that delegates to transformer, enabling on-the-fly data transformations --- .../Etl/Transformers/DataTransformerBase.cs | 65 +++++ .../Transformers/TransformingDataReader.cs | 73 ++++++ .../TransformingDataReaderTests.cs | 232 ++++++++++++++++++ 3 files changed, 370 insertions(+) create mode 100644 NEW/src/JdeScoping.DataSync/Etl/Transformers/DataTransformerBase.cs create mode 100644 NEW/src/JdeScoping.DataSync/Etl/Transformers/TransformingDataReader.cs create mode 100644 NEW/tests/JdeScoping.DataSync.Tests/Etl/Transformers/TransformingDataReaderTests.cs diff --git a/NEW/src/JdeScoping.DataSync/Etl/Transformers/DataTransformerBase.cs b/NEW/src/JdeScoping.DataSync/Etl/Transformers/DataTransformerBase.cs new file mode 100644 index 0000000..386a88b --- /dev/null +++ b/NEW/src/JdeScoping.DataSync/Etl/Transformers/DataTransformerBase.cs @@ -0,0 +1,65 @@ +using System.Data; +using JdeScoping.DataSync.Etl.Contracts; + +namespace JdeScoping.DataSync.Etl.Transformers; + +/// +/// Base class for data transformers that modify data during the ETL process. +/// Derived classes can override specific methods to customize transformation behavior. +/// +public abstract class DataTransformerBase : IDataTransformer +{ + /// + public abstract string TransformerName { get; } + + /// + public IDataReader Transform(IDataReader source) + { + ArgumentNullException.ThrowIfNull(source); + OnInitialize(source); + return new TransformingDataReader(source, this); + } + + /// + /// Called when the transformer is initialized with a source reader. + /// Override to perform initialization logic. + /// + /// The source data reader. + protected virtual void OnInitialize(IDataReader source) { } + + /// + /// Gets the field count from the source reader. + /// Override to add or remove fields. + /// + public virtual int GetFieldCount(IDataReader source) => source.FieldCount; + + /// + /// Gets the name of a field at the specified ordinal. + /// Override to rename fields. + /// + public virtual string GetName(int ordinal, IDataReader source) => source.GetName(ordinal); + + /// + /// Gets the type of a field at the specified ordinal. + /// Override to change field types. + /// + public virtual Type GetFieldType(int ordinal, IDataReader source) => source.GetFieldType(ordinal); + + /// + /// Gets the value of a field at the specified ordinal. + /// Override to transform values. + /// + public virtual object GetValue(int ordinal, IDataReader source) => source.GetValue(ordinal); + + /// + /// Gets the ordinal of a field by name. + /// Override to support renamed fields. + /// + public virtual int GetOrdinal(string name, IDataReader source) => source.GetOrdinal(name); + + /// + /// Checks if a field value is DBNull. + /// Override to handle null transformations. + /// + public virtual bool IsDBNull(int ordinal, IDataReader source) => source.IsDBNull(ordinal); +} diff --git a/NEW/src/JdeScoping.DataSync/Etl/Transformers/TransformingDataReader.cs b/NEW/src/JdeScoping.DataSync/Etl/Transformers/TransformingDataReader.cs new file mode 100644 index 0000000..04770fd --- /dev/null +++ b/NEW/src/JdeScoping.DataSync/Etl/Transformers/TransformingDataReader.cs @@ -0,0 +1,73 @@ +using System.Data; + +namespace JdeScoping.DataSync.Etl.Transformers; + +/// +/// An IDataReader wrapper that delegates field access to a DataTransformerBase, +/// allowing transformations to be applied during data reading. +/// +internal sealed class TransformingDataReader : IDataReader +{ + private readonly IDataReader _source; + private readonly DataTransformerBase _transformer; + + public TransformingDataReader(IDataReader source, DataTransformerBase transformer) + { + _source = source ?? throw new ArgumentNullException(nameof(source)); + _transformer = transformer ?? throw new ArgumentNullException(nameof(transformer)); + } + + // Properties and methods delegated to transformer + public int FieldCount => _transformer.GetFieldCount(_source); + public string GetName(int i) => _transformer.GetName(i, _source); + public Type GetFieldType(int i) => _transformer.GetFieldType(i, _source); + public int GetOrdinal(string name) => _transformer.GetOrdinal(name, _source); + public object GetValue(int i) => _transformer.GetValue(i, _source); + public bool IsDBNull(int i) => _transformer.IsDBNull(i, _source); + public object this[int i] => GetValue(i); + public object this[string name] => GetValue(GetOrdinal(name)); + + // Row navigation - delegated directly to source + public bool Read() => _source.Read(); + public bool NextResult() => _source.NextResult(); + public int Depth => _source.Depth; + public bool IsClosed => _source.IsClosed; + public int RecordsAffected => _source.RecordsAffected; + public void Close() => _source.Close(); + public void Dispose() => _source.Dispose(); + + // Typed accessors - use GetValue for transformation support + public bool GetBoolean(int i) => (bool)GetValue(i); + public byte GetByte(int i) => (byte)GetValue(i); + public char GetChar(int i) => (char)GetValue(i); + public DateTime GetDateTime(int i) => (DateTime)GetValue(i); + public decimal GetDecimal(int i) => (decimal)GetValue(i); + public double GetDouble(int i) => (double)GetValue(i); + public float GetFloat(int i) => (float)GetValue(i); + public Guid GetGuid(int i) => (Guid)GetValue(i); + public short GetInt16(int i) => (short)GetValue(i); + public int GetInt32(int i) => (int)GetValue(i); + public long GetInt64(int i) => (long)GetValue(i); + public string GetString(int i) => (string)GetValue(i); + + // Schema and bulk data access - delegated to source + public string GetDataTypeName(int i) => _source.GetDataTypeName(i); + + public int GetValues(object[] values) + { + var count = Math.Min(values.Length, FieldCount); + for (int i = 0; i < count; i++) + values[i] = GetValue(i); + return count; + } + + public long GetBytes(int i, long fieldOffset, byte[]? buffer, int bufferoffset, int length) + => _source.GetBytes(i, fieldOffset, buffer, bufferoffset, length); + + public long GetChars(int i, long fieldoffset, char[]? buffer, int bufferoffset, int length) + => _source.GetChars(i, fieldoffset, buffer, bufferoffset, length); + + public IDataReader GetData(int i) => _source.GetData(i); + + public DataTable? GetSchemaTable() => _source.GetSchemaTable(); +} diff --git a/NEW/tests/JdeScoping.DataSync.Tests/Etl/Transformers/TransformingDataReaderTests.cs b/NEW/tests/JdeScoping.DataSync.Tests/Etl/Transformers/TransformingDataReaderTests.cs new file mode 100644 index 0000000..4b9ad52 --- /dev/null +++ b/NEW/tests/JdeScoping.DataSync.Tests/Etl/Transformers/TransformingDataReaderTests.cs @@ -0,0 +1,232 @@ +using System.Data; +using JdeScoping.DataSync.Etl.Transformers; +using NSubstitute; + +namespace JdeScoping.DataSync.Tests.Etl.Transformers; + +public class TransformingDataReaderTests +{ + [Fact] + public void Read_DelegatesToSourceReader() + { + var source = Substitute.For(); + source.Read().Returns(true, true, false); + var transformer = new PassThroughTransformer(); + + var reader = transformer.Transform(source); + + Assert.True(reader.Read()); + Assert.True(reader.Read()); + Assert.False(reader.Read()); + } + + [Fact] + public void FieldCount_DelegatesToTransformer() + { + var source = Substitute.For(); + source.FieldCount.Returns(5); + var transformer = new PassThroughTransformer(); + + var reader = transformer.Transform(source); + + Assert.Equal(5, reader.FieldCount); + } + + [Fact] + public void GetName_DelegatesToTransformer() + { + var source = Substitute.For(); + source.GetName(0).Returns("OriginalName"); + var transformer = new PassThroughTransformer(); + + var reader = transformer.Transform(source); + + Assert.Equal("OriginalName", reader.GetName(0)); + } + + [Fact] + public void GetValue_DelegatesToTransformer() + { + var source = Substitute.For(); + source.GetValue(0).Returns("TestValue"); + var transformer = new PassThroughTransformer(); + + var reader = transformer.Transform(source); + + Assert.Equal("TestValue", reader.GetValue(0)); + } + + [Fact] + public void GetOrdinal_DelegatesToTransformer() + { + var source = Substitute.For(); + source.GetOrdinal("TestField").Returns(3); + var transformer = new PassThroughTransformer(); + + var reader = transformer.Transform(source); + + Assert.Equal(3, reader.GetOrdinal("TestField")); + } + + [Fact] + public void GetFieldType_DelegatesToTransformer() + { + var source = Substitute.For(); + source.GetFieldType(0).Returns(typeof(string)); + var transformer = new PassThroughTransformer(); + + var reader = transformer.Transform(source); + + Assert.Equal(typeof(string), reader.GetFieldType(0)); + } + + [Fact] + public void IsDBNull_DelegatesToTransformer() + { + var source = Substitute.For(); + source.IsDBNull(0).Returns(true); + var transformer = new PassThroughTransformer(); + + var reader = transformer.Transform(source); + + Assert.True(reader.IsDBNull(0)); + } + + [Fact] + public void Indexer_ByOrdinal_UsesGetValue() + { + var source = Substitute.For(); + source.GetValue(1).Returns("IndexedValue"); + var transformer = new PassThroughTransformer(); + + var reader = transformer.Transform(source); + + Assert.Equal("IndexedValue", reader[1]); + } + + [Fact] + public void Indexer_ByName_UsesGetOrdinalAndGetValue() + { + var source = Substitute.For(); + source.GetOrdinal("TestColumn").Returns(2); + source.GetValue(2).Returns("NamedValue"); + var transformer = new PassThroughTransformer(); + + var reader = transformer.Transform(source); + + Assert.Equal("NamedValue", reader["TestColumn"]); + } + + [Fact] + public void GetValues_PopulatesArrayWithTransformedValues() + { + var source = Substitute.For(); + source.FieldCount.Returns(3); + source.GetValue(0).Returns("Value0"); + source.GetValue(1).Returns("Value1"); + source.GetValue(2).Returns("Value2"); + var transformer = new PassThroughTransformer(); + + var reader = transformer.Transform(source); + var values = new object[3]; + var count = reader.GetValues(values); + + Assert.Equal(3, count); + Assert.Equal("Value0", values[0]); + Assert.Equal("Value1", values[1]); + Assert.Equal("Value2", values[2]); + } + + [Fact] + public void Dispose_DisposesSourceReader() + { + var source = Substitute.For(); + var transformer = new PassThroughTransformer(); + + var reader = transformer.Transform(source); + reader.Dispose(); + + source.Received(1).Dispose(); + } + + [Fact] + public void Close_ClosesSourceReader() + { + var source = Substitute.For(); + var transformer = new PassThroughTransformer(); + + var reader = transformer.Transform(source); + reader.Close(); + + source.Received(1).Close(); + } + + [Fact] + public void NextResult_DelegatesToSourceReader() + { + var source = Substitute.For(); + source.NextResult().Returns(true); + var transformer = new PassThroughTransformer(); + + var reader = transformer.Transform(source); + + Assert.True(reader.NextResult()); + } + + [Fact] + public void Depth_DelegatesToSourceReader() + { + var source = Substitute.For(); + source.Depth.Returns(2); + var transformer = new PassThroughTransformer(); + + var reader = transformer.Transform(source); + + Assert.Equal(2, reader.Depth); + } + + [Fact] + public void IsClosed_DelegatesToSourceReader() + { + var source = Substitute.For(); + source.IsClosed.Returns(true); + var transformer = new PassThroughTransformer(); + + var reader = transformer.Transform(source); + + Assert.True(reader.IsClosed); + } + + [Fact] + public void RecordsAffected_DelegatesToSourceReader() + { + var source = Substitute.For(); + source.RecordsAffected.Returns(42); + var transformer = new PassThroughTransformer(); + + var reader = transformer.Transform(source); + + Assert.Equal(42, reader.RecordsAffected); + } + + [Fact] + public void Transform_ThrowsOnNullSource() + { + var transformer = new PassThroughTransformer(); + + Assert.Throws(() => transformer.Transform(null!)); + } + + [Fact] + public void TransformerName_ReturnsExpectedName() + { + var transformer = new PassThroughTransformer(); + + Assert.Equal("PassThrough", transformer.TransformerName); + } + + private class PassThroughTransformer : DataTransformerBase + { + public override string TransformerName => "PassThrough"; + } +}