From 8594baf11d406ce9ec62575ac95fb34d29285ee2 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 3 Jan 2026 09:18:58 -0500 Subject: [PATCH] feat(etl): implement DbQuerySource for database queries Adds DbQuerySource, an IImportSource implementation that executes SQL queries against the LotFinder database. Supports parameterized queries using anonymous objects and configurable command timeouts. --- .../Etl/Sources/DbQuerySource.cs | 87 +++++++++++++++++++ .../Etl/Sources/DbQuerySourceTests.cs | 44 ++++++++++ 2 files changed, 131 insertions(+) create mode 100644 NEW/src/JdeScoping.DataSync/Etl/Sources/DbQuerySource.cs create mode 100644 NEW/tests/JdeScoping.DataSync.Tests/Etl/Sources/DbQuerySourceTests.cs diff --git a/NEW/src/JdeScoping.DataSync/Etl/Sources/DbQuerySource.cs b/NEW/src/JdeScoping.DataSync/Etl/Sources/DbQuerySource.cs new file mode 100644 index 0000000..f3dff54 --- /dev/null +++ b/NEW/src/JdeScoping.DataSync/Etl/Sources/DbQuerySource.cs @@ -0,0 +1,87 @@ +using System.Data; +using JdeScoping.DataAccess.Interfaces; +using JdeScoping.DataSync.Etl.Contracts; +using Microsoft.Data.SqlClient; + +namespace JdeScoping.DataSync.Etl.Sources; + +/// +/// An import source that executes a SQL query against the LotFinder database. +/// +public class DbQuerySource : IImportSource +{ + private readonly IDbConnectionFactory _connectionFactory; + private readonly string _sql; + private readonly object? _parameters; + private readonly int _commandTimeout; + + private SqlConnection? _connection; + private SqlCommand? _command; + + /// + public string SourceName { get; } + + /// + /// Creates a new database query source. + /// + /// Factory for creating database connections. + /// The SQL query to execute. + /// Optional name for this source (used in logging). + /// Optional anonymous object containing query parameters. + /// Command timeout in seconds (default: 3600). + public DbQuerySource( + IDbConnectionFactory connectionFactory, + string sql, + string? name = null, + object? parameters = null, + int commandTimeout = 3600) + { + ArgumentNullException.ThrowIfNull(connectionFactory); + ArgumentException.ThrowIfNullOrWhiteSpace(sql); + + _connectionFactory = connectionFactory; + _sql = sql; + _parameters = parameters; + _commandTimeout = commandTimeout; + SourceName = $"DbQuery:{name ?? "Query"}"; + } + + /// + public async Task ReadDataAsync(CancellationToken cancellationToken = default) + { + _connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken); + _command = _connection.CreateCommand(); + _command.CommandText = _sql; + _command.CommandTimeout = _commandTimeout; + AddParameters(_command, _parameters); + return await _command.ExecuteReaderAsync(cancellationToken); + } + + private static void AddParameters(SqlCommand command, object? parameters) + { + if (parameters == null) return; + + var properties = parameters.GetType().GetProperties(); + foreach (var prop in properties) + { + var value = prop.GetValue(parameters) ?? DBNull.Value; + command.Parameters.AddWithValue($"@{prop.Name}", value); + } + } + + /// + public async ValueTask DisposeAsync() + { + if (_command != null) + { + await _command.DisposeAsync(); + _command = null; + } + + if (_connection != null) + { + await _connection.DisposeAsync(); + _connection = null; + } + } +} diff --git a/NEW/tests/JdeScoping.DataSync.Tests/Etl/Sources/DbQuerySourceTests.cs b/NEW/tests/JdeScoping.DataSync.Tests/Etl/Sources/DbQuerySourceTests.cs new file mode 100644 index 0000000..7cab88d --- /dev/null +++ b/NEW/tests/JdeScoping.DataSync.Tests/Etl/Sources/DbQuerySourceTests.cs @@ -0,0 +1,44 @@ +using JdeScoping.DataAccess.Interfaces; +using JdeScoping.DataSync.Etl.Sources; +using NSubstitute; + +namespace JdeScoping.DataSync.Tests.Etl.Sources; + +public class DbQuerySourceTests +{ + [Fact] + public void Constructor_SetsSourceName() + { + var factory = Substitute.For(); + var source = new DbQuerySource(factory, "SELECT 1", "TestSource"); + Assert.Equal("DbQuery:TestSource", source.SourceName); + } + + [Fact] + public void Constructor_NullName_UsesDefault() + { + var factory = Substitute.For(); + var source = new DbQuerySource(factory, "SELECT 1"); + Assert.Equal("DbQuery:Query", source.SourceName); + } + + [Fact] + public void Constructor_NullFactory_ThrowsArgumentNullException() + { + Assert.Throws(() => new DbQuerySource(null!, "SELECT 1")); + } + + [Fact] + public void Constructor_NullSql_ThrowsArgumentNullException() + { + var factory = Substitute.For(); + Assert.Throws(() => new DbQuerySource(factory, null!)); + } + + [Fact] + public void Constructor_EmptySql_ThrowsArgumentException() + { + var factory = Substitute.For(); + Assert.Throws(() => new DbQuerySource(factory, "")); + } +}