feat(etl): add binary method overrides to DataTransformerBase

Add virtual methods to DataTransformerBase for GetBytes, GetChars,
GetData, and GetDataTypeName that properly handle computed columns
by throwing NotSupportedException when MapOrdinal returns -1.

Update TransformingDataReader to delegate these methods to the
transformer instead of directly to the source reader.
This commit is contained in:
Joseph Doherty
2026-01-03 10:33:13 -05:00
parent f5468d019f
commit 506ba5c61d
3 changed files with 213 additions and 5 deletions
@@ -71,4 +71,58 @@ public abstract class DataTransformerBase : IDataTransformer
/// <param name="source">The source data reader.</param>
/// <returns>The corresponding source ordinal, or -1 for computed columns.</returns>
public virtual int MapOrdinal(int transformedOrdinal, IDataReader source) => transformedOrdinal;
/// <summary>
/// Gets bytes from a field at the specified ordinal.
/// Throws NotSupportedException for computed columns (where MapOrdinal returns -1).
/// </summary>
public virtual long GetBytes(int ordinal, long fieldOffset, byte[]? buffer,
int bufferOffset, int length, IDataReader source)
{
var sourceOrdinal = MapOrdinal(ordinal, source);
if (sourceOrdinal < 0)
throw new NotSupportedException(
$"GetBytes not supported for computed column at ordinal {ordinal}.");
return source.GetBytes(sourceOrdinal, fieldOffset, buffer, bufferOffset, length);
}
/// <summary>
/// Gets characters from a field at the specified ordinal.
/// Throws NotSupportedException for computed columns (where MapOrdinal returns -1).
/// </summary>
public virtual long GetChars(int ordinal, long fieldOffset, char[]? buffer,
int bufferOffset, int length, IDataReader source)
{
var sourceOrdinal = MapOrdinal(ordinal, source);
if (sourceOrdinal < 0)
throw new NotSupportedException(
$"GetChars not supported for computed column at ordinal {ordinal}.");
return source.GetChars(sourceOrdinal, fieldOffset, buffer, bufferOffset, length);
}
/// <summary>
/// Gets nested data reader for a field at the specified ordinal.
/// Throws NotSupportedException for computed columns (where MapOrdinal returns -1).
/// </summary>
public virtual IDataReader GetData(int ordinal, IDataReader source)
{
var sourceOrdinal = MapOrdinal(ordinal, source);
if (sourceOrdinal < 0)
throw new NotSupportedException(
$"GetData not supported for computed column at ordinal {ordinal}.");
return source.GetData(sourceOrdinal);
}
/// <summary>
/// Gets the data type name for a field at the specified ordinal.
/// Throws NotSupportedException for computed columns (where MapOrdinal returns -1).
/// </summary>
public virtual string GetDataTypeName(int ordinal, IDataReader source)
{
var sourceOrdinal = MapOrdinal(ordinal, source);
if (sourceOrdinal < 0)
throw new NotSupportedException(
$"GetDataTypeName not supported for computed column at ordinal {ordinal}.");
return source.GetDataTypeName(sourceOrdinal);
}
}
@@ -50,8 +50,8 @@ internal sealed class TransformingDataReader : IDataReader
public long GetInt64(int i) => (long)GetValue(i);
public string GetString(int i) => (string)GetValue(i);
// Schema and bulk data access - delegated to source
public string GetDataTypeName(int i) => _source.GetDataTypeName(i);
// Schema and bulk data access - delegated to transformer
public string GetDataTypeName(int i) => _transformer.GetDataTypeName(i, _source);
public int GetValues(object[] values)
{
@@ -62,12 +62,12 @@ internal sealed class TransformingDataReader : IDataReader
}
public long GetBytes(int i, long fieldOffset, byte[]? buffer, int bufferoffset, int length)
=> _source.GetBytes(i, fieldOffset, buffer, bufferoffset, length);
=> _transformer.GetBytes(i, fieldOffset, buffer, bufferoffset, length, _source);
public long GetChars(int i, long fieldoffset, char[]? buffer, int bufferoffset, int length)
=> _source.GetChars(i, fieldoffset, buffer, bufferoffset, length);
=> _transformer.GetChars(i, fieldoffset, buffer, bufferoffset, length, _source);
public IDataReader GetData(int i) => _source.GetData(i);
public IDataReader GetData(int i) => _transformer.GetData(i, _source);
public DataTable? GetSchemaTable() => _source.GetSchemaTable();
}
@@ -225,8 +225,162 @@ public class TransformingDataReaderTests
Assert.Equal("PassThrough", transformer.TransformerName);
}
[Fact]
public void GetBytes_ComputedColumn_ThrowsNotSupportedException()
{
// Arrange - transformer that returns -1 for ordinal 0 (computed)
var source = Substitute.For<IDataReader>();
source.FieldCount.Returns(2);
source.GetName(0).Returns("Computed");
source.GetName(1).Returns("B");
var transformer = new ComputedColumnTransformer();
var reader = transformer.Transform(source);
// Act & Assert
Assert.Throws<NotSupportedException>(() =>
reader.GetBytes(0, 0, null, 0, 0));
}
[Fact]
public void GetChars_ComputedColumn_ThrowsNotSupportedException()
{
// Arrange - transformer that returns -1 for ordinal 0 (computed)
var source = Substitute.For<IDataReader>();
source.FieldCount.Returns(2);
source.GetName(0).Returns("Computed");
source.GetName(1).Returns("B");
var transformer = new ComputedColumnTransformer();
var reader = transformer.Transform(source);
// Act & Assert
Assert.Throws<NotSupportedException>(() =>
reader.GetChars(0, 0, null, 0, 0));
}
[Fact]
public void GetData_ComputedColumn_ThrowsNotSupportedException()
{
// Arrange - transformer that returns -1 for ordinal 0 (computed)
var source = Substitute.For<IDataReader>();
source.FieldCount.Returns(2);
source.GetName(0).Returns("Computed");
source.GetName(1).Returns("B");
var transformer = new ComputedColumnTransformer();
var reader = transformer.Transform(source);
// Act & Assert
Assert.Throws<NotSupportedException>(() =>
reader.GetData(0));
}
[Fact]
public void GetDataTypeName_ComputedColumn_ThrowsNotSupportedException()
{
// Arrange - transformer that returns -1 for ordinal 0 (computed)
var source = Substitute.For<IDataReader>();
source.FieldCount.Returns(2);
source.GetName(0).Returns("Computed");
source.GetName(1).Returns("B");
var transformer = new ComputedColumnTransformer();
var reader = transformer.Transform(source);
// Act & Assert
Assert.Throws<NotSupportedException>(() =>
reader.GetDataTypeName(0));
}
[Fact]
public void GetBytes_NonComputedColumn_DelegatesToSource()
{
// Arrange
var source = Substitute.For<IDataReader>();
source.FieldCount.Returns(2);
source.GetName(0).Returns("A");
source.GetName(1).Returns("B");
var expectedBytes = new byte[] { 1, 2, 3 };
source.GetBytes(1, 0, Arg.Any<byte[]?>(), 0, 3).Returns(3L);
var transformer = new PassThroughTransformer();
var reader = transformer.Transform(source);
// Act
var buffer = new byte[3];
var result = reader.GetBytes(1, 0, buffer, 0, 3);
// Assert
Assert.Equal(3L, result);
source.Received(1).GetBytes(1, 0, buffer, 0, 3);
}
[Fact]
public void GetChars_NonComputedColumn_DelegatesToSource()
{
// Arrange
var source = Substitute.For<IDataReader>();
source.FieldCount.Returns(2);
source.GetName(0).Returns("A");
source.GetName(1).Returns("B");
source.GetChars(1, 0, Arg.Any<char[]?>(), 0, 3).Returns(3L);
var transformer = new PassThroughTransformer();
var reader = transformer.Transform(source);
// Act
var buffer = new char[3];
var result = reader.GetChars(1, 0, buffer, 0, 3);
// Assert
Assert.Equal(3L, result);
source.Received(1).GetChars(1, 0, buffer, 0, 3);
}
[Fact]
public void GetData_NonComputedColumn_DelegatesToSource()
{
// Arrange
var source = Substitute.For<IDataReader>();
source.FieldCount.Returns(2);
source.GetName(0).Returns("A");
source.GetName(1).Returns("B");
var nestedReader = Substitute.For<IDataReader>();
source.GetData(1).Returns(nestedReader);
var transformer = new PassThroughTransformer();
var reader = transformer.Transform(source);
// Act
var result = reader.GetData(1);
// Assert
Assert.Same(nestedReader, result);
source.Received(1).GetData(1);
}
[Fact]
public void GetDataTypeName_NonComputedColumn_DelegatesToSource()
{
// Arrange
var source = Substitute.For<IDataReader>();
source.FieldCount.Returns(2);
source.GetName(0).Returns("A");
source.GetName(1).Returns("B");
source.GetDataTypeName(1).Returns("nvarchar");
var transformer = new PassThroughTransformer();
var reader = transformer.Transform(source);
// Act
var result = reader.GetDataTypeName(1);
// Assert
Assert.Equal("nvarchar", result);
source.Received(1).GetDataTypeName(1);
}
private class PassThroughTransformer : DataTransformerBase
{
public override string TransformerName => "PassThrough";
}
private class ComputedColumnTransformer : DataTransformerBase
{
public override string TransformerName => "Computed";
public override int MapOrdinal(int ordinal, IDataReader source) => ordinal == 0 ? -1 : ordinal;
}
}