feat(datasync): add generic DbQuerySource for JDE/CMS/LotFinder

Extend DbQuerySource to support multiple connection types:
- Add connectionType parameter ("jde", "cms", "lotfinder")
- Use appropriate IDbConnectionFactory method for each type
- Support Dictionary<string, object> parameters
- Use DbConnection/DbCommand for cross-database compatibility
This commit is contained in:
Joseph Doherty
2026-01-06 13:30:00 -05:00
parent 1f7fd9f0f2
commit eb85ab6f34
2 changed files with 102 additions and 38 deletions
@@ -1,22 +1,28 @@
using System.Data;
using System.Data.Common;
using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Etl.Contracts;
using Microsoft.Data.SqlClient;
namespace JdeScoping.DataSync.Etl.Sources;
/// <summary>
/// An import source that executes a SQL query against the LotFinder database.
/// An import source that executes a SQL query against JDE, CMS, or LotFinder databases.
/// </summary>
public class DbQuerySource : IImportSource
{
private static readonly HashSet<string> ValidConnectionTypes = new(StringComparer.OrdinalIgnoreCase)
{
"jde", "cms", "lotfinder"
};
private readonly IDbConnectionFactory _connectionFactory;
private readonly string _sql;
private readonly object? _parameters;
private readonly string _connectionType;
private readonly string _query;
private readonly Dictionary<string, object> _parameters;
private readonly int _commandTimeout;
private SqlConnection? _connection;
private SqlCommand? _command;
private DbConnection? _connection;
private DbCommand? _command;
/// <inheritdoc />
public string SourceName { get; }
@@ -25,47 +31,65 @@ public class DbQuerySource : IImportSource
/// Creates a new database query source.
/// </summary>
/// <param name="connectionFactory">Factory for creating database connections.</param>
/// <param name="sql">The SQL query to execute.</param>
/// <param name="name">Optional name for this source (used in logging).</param>
/// <param name="parameters">Optional anonymous object containing query parameters.</param>
/// <param name="connectionType">The connection type: "jde", "cms", or "lotfinder".</param>
/// <param name="query">The SQL query to execute.</param>
/// <param name="parameters">Optional dictionary of query parameters.</param>
/// <param name="commandTimeout">Command timeout in seconds (default: 3600).</param>
public DbQuerySource(
IDbConnectionFactory connectionFactory,
string sql,
string? name = null,
object? parameters = null,
string connectionType,
string query,
Dictionary<string, object>? parameters = null,
int commandTimeout = 3600)
{
ArgumentNullException.ThrowIfNull(connectionFactory);
ArgumentException.ThrowIfNullOrWhiteSpace(sql);
ArgumentNullException.ThrowIfNull(connectionType);
ArgumentNullException.ThrowIfNull(query);
if (!ValidConnectionTypes.Contains(connectionType))
{
throw new ArgumentException($"Unknown connection type: {connectionType}. Valid types are: jde, cms, lotfinder.", nameof(connectionType));
}
_connectionFactory = connectionFactory;
_sql = sql;
_parameters = parameters;
_connectionType = connectionType.ToLowerInvariant();
_query = query;
_parameters = parameters ?? new Dictionary<string, object>();
_commandTimeout = commandTimeout;
SourceName = $"DbQuery:{name ?? "Query"}";
SourceName = $"DbQuery:{_connectionType}";
}
/// <inheritdoc />
public async Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
{
_connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken);
_connection = await CreateConnectionAsync(cancellationToken);
_command = _connection.CreateCommand();
_command.CommandText = _sql;
_command.CommandText = _query;
_command.CommandTimeout = _commandTimeout;
AddParameters(_command, _parameters);
AddParameters(_command);
return await _command.ExecuteReaderAsync(cancellationToken);
}
private static void AddParameters(SqlCommand command, object? parameters)
private async Task<DbConnection> CreateConnectionAsync(CancellationToken cancellationToken)
{
if (parameters == null) return;
var properties = parameters.GetType().GetProperties();
foreach (var prop in properties)
return _connectionType switch
{
var value = prop.GetValue(parameters) ?? DBNull.Value;
command.Parameters.AddWithValue($"@{prop.Name}", value);
"jde" => await _connectionFactory.CreateJdeConnectionAsync(cancellationToken),
"cms" => await _connectionFactory.CreateCmsConnectionAsync(cancellationToken),
"lotfinder" => await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken),
_ => throw new InvalidOperationException($"Unknown connection type: {_connectionType}")
};
}
private void AddParameters(DbCommand command)
{
foreach (var (name, value) in _parameters)
{
var param = command.CreateParameter();
param.ParameterName = name.StartsWith('@') ? name : $"@{name}";
param.Value = value ?? DBNull.Value;
command.Parameters.Add(param);
}
}
@@ -1,44 +1,84 @@
using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Etl.Sources;
using NSubstitute;
using Shouldly;
namespace JdeScoping.DataSync.Tests.Etl.Sources;
public class DbQuerySourceTests
{
[Fact]
public void Constructor_SetsSourceName()
[Theory]
[InlineData("jde")]
[InlineData("cms")]
[InlineData("lotfinder")]
public void Constructor_ValidConnectionType_Succeeds(string connectionType)
{
var factory = Substitute.For<IDbConnectionFactory>();
var source = new DbQuerySource(factory, "SELECT 1", "TestSource");
Assert.Equal("DbQuery:TestSource", source.SourceName);
var source = new DbQuerySource(factory, connectionType, "SELECT 1");
source.SourceName.ShouldBe($"DbQuery:{connectionType}");
}
[Theory]
[InlineData("JDE")]
[InlineData("CMS")]
[InlineData("LotFinder")]
[InlineData("LOTFINDER")]
public void Constructor_ConnectionType_IsCaseInsensitive(string connectionType)
{
var factory = Substitute.For<IDbConnectionFactory>();
var source = new DbQuerySource(factory, connectionType, "SELECT 1");
source.SourceName.ShouldBe($"DbQuery:{connectionType.ToLowerInvariant()}");
}
[Fact]
public void Constructor_NullName_UsesDefault()
public void Constructor_InvalidConnectionType_Throws()
{
var factory = Substitute.For<IDbConnectionFactory>();
var source = new DbQuerySource(factory, "SELECT 1");
Assert.Equal("DbQuery:Query", source.SourceName);
Should.Throw<ArgumentException>(() =>
new DbQuerySource(factory, "invalid", "SELECT 1"));
}
[Fact]
public void Constructor_NullConnectionType_Throws()
{
var factory = Substitute.For<IDbConnectionFactory>();
Should.Throw<ArgumentNullException>(() =>
new DbQuerySource(factory, null!, "SELECT 1"));
}
[Fact]
public void Constructor_NullQuery_Throws()
{
var factory = Substitute.For<IDbConnectionFactory>();
Should.Throw<ArgumentNullException>(() =>
new DbQuerySource(factory, "jde", null!));
}
[Fact]
public void Constructor_NullFactory_ThrowsArgumentNullException()
{
Assert.Throws<ArgumentNullException>(() => new DbQuerySource(null!, "SELECT 1"));
Assert.Throws<ArgumentNullException>(() => new DbQuerySource(null!, "jde", "SELECT 1"));
}
[Fact]
public void Constructor_NullSql_ThrowsArgumentNullException()
public void Constructor_WithParameters_Succeeds()
{
var factory = Substitute.For<IDbConnectionFactory>();
Assert.Throws<ArgumentNullException>(() => new DbQuerySource(factory, null!));
var parameters = new Dictionary<string, object>
{
{ "MinDate", DateTime.Now },
{ "Status", 1 }
};
var source = new DbQuerySource(factory, "lotfinder", "SELECT * FROM T WHERE Date > @MinDate AND Status = @Status", parameters);
source.SourceName.ShouldBe("DbQuery:lotfinder");
}
[Fact]
public void Constructor_EmptySql_ThrowsArgumentException()
public void Constructor_WithCustomTimeout_Succeeds()
{
var factory = Substitute.For<IDbConnectionFactory>();
Assert.Throws<ArgumentException>(() => new DbQuerySource(factory, ""));
var source = new DbQuerySource(factory, "jde", "SELECT 1", commandTimeout: 7200);
source.SourceName.ShouldBe("DbQuery:jde");
}
}