feat(etl): implement DbQuerySource for database queries

Adds DbQuerySource, an IImportSource implementation that executes SQL
queries against the LotFinder database. Supports parameterized queries
using anonymous objects and configurable command timeouts.
This commit is contained in:
Joseph Doherty
2026-01-03 09:18:58 -05:00
parent 74c3f37446
commit 8594baf11d
2 changed files with 131 additions and 0 deletions
@@ -0,0 +1,87 @@
using System.Data;
using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Etl.Contracts;
using Microsoft.Data.SqlClient;
namespace JdeScoping.DataSync.Etl.Sources;
/// <summary>
/// An import source that executes a SQL query against the LotFinder database.
/// </summary>
public class DbQuerySource : IImportSource
{
private readonly IDbConnectionFactory _connectionFactory;
private readonly string _sql;
private readonly object? _parameters;
private readonly int _commandTimeout;
private SqlConnection? _connection;
private SqlCommand? _command;
/// <inheritdoc />
public string SourceName { get; }
/// <summary>
/// Creates a new database query source.
/// </summary>
/// <param name="connectionFactory">Factory for creating database connections.</param>
/// <param name="sql">The SQL query to execute.</param>
/// <param name="name">Optional name for this source (used in logging).</param>
/// <param name="parameters">Optional anonymous object containing query parameters.</param>
/// <param name="commandTimeout">Command timeout in seconds (default: 3600).</param>
public DbQuerySource(
IDbConnectionFactory connectionFactory,
string sql,
string? name = null,
object? parameters = null,
int commandTimeout = 3600)
{
ArgumentNullException.ThrowIfNull(connectionFactory);
ArgumentException.ThrowIfNullOrWhiteSpace(sql);
_connectionFactory = connectionFactory;
_sql = sql;
_parameters = parameters;
_commandTimeout = commandTimeout;
SourceName = $"DbQuery:{name ?? "Query"}";
}
/// <inheritdoc />
public async Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
{
_connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken);
_command = _connection.CreateCommand();
_command.CommandText = _sql;
_command.CommandTimeout = _commandTimeout;
AddParameters(_command, _parameters);
return await _command.ExecuteReaderAsync(cancellationToken);
}
private static void AddParameters(SqlCommand command, object? parameters)
{
if (parameters == null) return;
var properties = parameters.GetType().GetProperties();
foreach (var prop in properties)
{
var value = prop.GetValue(parameters) ?? DBNull.Value;
command.Parameters.AddWithValue($"@{prop.Name}", value);
}
}
/// <inheritdoc />
public async ValueTask DisposeAsync()
{
if (_command != null)
{
await _command.DisposeAsync();
_command = null;
}
if (_connection != null)
{
await _connection.DisposeAsync();
_connection = null;
}
}
}
@@ -0,0 +1,44 @@
using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Etl.Sources;
using NSubstitute;
namespace JdeScoping.DataSync.Tests.Etl.Sources;
public class DbQuerySourceTests
{
[Fact]
public void Constructor_SetsSourceName()
{
var factory = Substitute.For<IDbConnectionFactory>();
var source = new DbQuerySource(factory, "SELECT 1", "TestSource");
Assert.Equal("DbQuery:TestSource", source.SourceName);
}
[Fact]
public void Constructor_NullName_UsesDefault()
{
var factory = Substitute.For<IDbConnectionFactory>();
var source = new DbQuerySource(factory, "SELECT 1");
Assert.Equal("DbQuery:Query", source.SourceName);
}
[Fact]
public void Constructor_NullFactory_ThrowsArgumentNullException()
{
Assert.Throws<ArgumentNullException>(() => new DbQuerySource(null!, "SELECT 1"));
}
[Fact]
public void Constructor_NullSql_ThrowsArgumentNullException()
{
var factory = Substitute.For<IDbConnectionFactory>();
Assert.Throws<ArgumentNullException>(() => new DbQuerySource(factory, null!));
}
[Fact]
public void Constructor_EmptySql_ThrowsArgumentException()
{
var factory = Substitute.For<IDbConnectionFactory>();
Assert.Throws<ArgumentException>(() => new DbQuerySource(factory, ""));
}
}