diff --git a/NEW/src/JdeScoping.DataSync/Etl/Sources/DbQuerySource.cs b/NEW/src/JdeScoping.DataSync/Etl/Sources/DbQuerySource.cs
index f3dff54..eca0ea5 100644
--- a/NEW/src/JdeScoping.DataSync/Etl/Sources/DbQuerySource.cs
+++ b/NEW/src/JdeScoping.DataSync/Etl/Sources/DbQuerySource.cs
@@ -1,22 +1,28 @@
using System.Data;
+using System.Data.Common;
using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Etl.Contracts;
-using Microsoft.Data.SqlClient;
namespace JdeScoping.DataSync.Etl.Sources;
///
-/// An import source that executes a SQL query against the LotFinder database.
+/// An import source that executes a SQL query against JDE, CMS, or LotFinder databases.
///
public class DbQuerySource : IImportSource
{
+ private static readonly HashSet ValidConnectionTypes = new(StringComparer.OrdinalIgnoreCase)
+ {
+ "jde", "cms", "lotfinder"
+ };
+
private readonly IDbConnectionFactory _connectionFactory;
- private readonly string _sql;
- private readonly object? _parameters;
+ private readonly string _connectionType;
+ private readonly string _query;
+ private readonly Dictionary _parameters;
private readonly int _commandTimeout;
- private SqlConnection? _connection;
- private SqlCommand? _command;
+ private DbConnection? _connection;
+ private DbCommand? _command;
///
public string SourceName { get; }
@@ -25,47 +31,65 @@ public class DbQuerySource : IImportSource
/// Creates a new database query source.
///
/// Factory for creating database connections.
- /// The SQL query to execute.
- /// Optional name for this source (used in logging).
- /// Optional anonymous object containing query parameters.
+ /// The connection type: "jde", "cms", or "lotfinder".
+ /// The SQL query to execute.
+ /// Optional dictionary of query parameters.
/// Command timeout in seconds (default: 3600).
public DbQuerySource(
IDbConnectionFactory connectionFactory,
- string sql,
- string? name = null,
- object? parameters = null,
+ string connectionType,
+ string query,
+ Dictionary? parameters = null,
int commandTimeout = 3600)
{
ArgumentNullException.ThrowIfNull(connectionFactory);
- ArgumentException.ThrowIfNullOrWhiteSpace(sql);
+ ArgumentNullException.ThrowIfNull(connectionType);
+ ArgumentNullException.ThrowIfNull(query);
+
+ if (!ValidConnectionTypes.Contains(connectionType))
+ {
+ throw new ArgumentException($"Unknown connection type: {connectionType}. Valid types are: jde, cms, lotfinder.", nameof(connectionType));
+ }
_connectionFactory = connectionFactory;
- _sql = sql;
- _parameters = parameters;
+ _connectionType = connectionType.ToLowerInvariant();
+ _query = query;
+ _parameters = parameters ?? new Dictionary();
_commandTimeout = commandTimeout;
- SourceName = $"DbQuery:{name ?? "Query"}";
+ SourceName = $"DbQuery:{_connectionType}";
}
///
public async Task ReadDataAsync(CancellationToken cancellationToken = default)
{
- _connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken);
+ _connection = await CreateConnectionAsync(cancellationToken);
_command = _connection.CreateCommand();
- _command.CommandText = _sql;
+ _command.CommandText = _query;
_command.CommandTimeout = _commandTimeout;
- AddParameters(_command, _parameters);
+ AddParameters(_command);
+
return await _command.ExecuteReaderAsync(cancellationToken);
}
- private static void AddParameters(SqlCommand command, object? parameters)
+ private async Task CreateConnectionAsync(CancellationToken cancellationToken)
{
- if (parameters == null) return;
-
- var properties = parameters.GetType().GetProperties();
- foreach (var prop in properties)
+ return _connectionType switch
{
- var value = prop.GetValue(parameters) ?? DBNull.Value;
- command.Parameters.AddWithValue($"@{prop.Name}", value);
+ "jde" => await _connectionFactory.CreateJdeConnectionAsync(cancellationToken),
+ "cms" => await _connectionFactory.CreateCmsConnectionAsync(cancellationToken),
+ "lotfinder" => await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken),
+ _ => throw new InvalidOperationException($"Unknown connection type: {_connectionType}")
+ };
+ }
+
+ private void AddParameters(DbCommand command)
+ {
+ foreach (var (name, value) in _parameters)
+ {
+ var param = command.CreateParameter();
+ param.ParameterName = name.StartsWith('@') ? name : $"@{name}";
+ param.Value = value ?? DBNull.Value;
+ command.Parameters.Add(param);
}
}
diff --git a/NEW/tests/JdeScoping.DataSync.Tests/Etl/Sources/DbQuerySourceTests.cs b/NEW/tests/JdeScoping.DataSync.Tests/Etl/Sources/DbQuerySourceTests.cs
index 7cab88d..5c07403 100644
--- a/NEW/tests/JdeScoping.DataSync.Tests/Etl/Sources/DbQuerySourceTests.cs
+++ b/NEW/tests/JdeScoping.DataSync.Tests/Etl/Sources/DbQuerySourceTests.cs
@@ -1,44 +1,84 @@
using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Etl.Sources;
using NSubstitute;
+using Shouldly;
namespace JdeScoping.DataSync.Tests.Etl.Sources;
public class DbQuerySourceTests
{
- [Fact]
- public void Constructor_SetsSourceName()
+ [Theory]
+ [InlineData("jde")]
+ [InlineData("cms")]
+ [InlineData("lotfinder")]
+ public void Constructor_ValidConnectionType_Succeeds(string connectionType)
{
var factory = Substitute.For();
- var source = new DbQuerySource(factory, "SELECT 1", "TestSource");
- Assert.Equal("DbQuery:TestSource", source.SourceName);
+ var source = new DbQuerySource(factory, connectionType, "SELECT 1");
+ source.SourceName.ShouldBe($"DbQuery:{connectionType}");
+ }
+
+ [Theory]
+ [InlineData("JDE")]
+ [InlineData("CMS")]
+ [InlineData("LotFinder")]
+ [InlineData("LOTFINDER")]
+ public void Constructor_ConnectionType_IsCaseInsensitive(string connectionType)
+ {
+ var factory = Substitute.For();
+ var source = new DbQuerySource(factory, connectionType, "SELECT 1");
+ source.SourceName.ShouldBe($"DbQuery:{connectionType.ToLowerInvariant()}");
}
[Fact]
- public void Constructor_NullName_UsesDefault()
+ public void Constructor_InvalidConnectionType_Throws()
{
var factory = Substitute.For();
- var source = new DbQuerySource(factory, "SELECT 1");
- Assert.Equal("DbQuery:Query", source.SourceName);
+ Should.Throw(() =>
+ new DbQuerySource(factory, "invalid", "SELECT 1"));
+ }
+
+ [Fact]
+ public void Constructor_NullConnectionType_Throws()
+ {
+ var factory = Substitute.For();
+ Should.Throw(() =>
+ new DbQuerySource(factory, null!, "SELECT 1"));
+ }
+
+ [Fact]
+ public void Constructor_NullQuery_Throws()
+ {
+ var factory = Substitute.For();
+ Should.Throw(() =>
+ new DbQuerySource(factory, "jde", null!));
}
[Fact]
public void Constructor_NullFactory_ThrowsArgumentNullException()
{
- Assert.Throws(() => new DbQuerySource(null!, "SELECT 1"));
+ Assert.Throws(() => new DbQuerySource(null!, "jde", "SELECT 1"));
}
[Fact]
- public void Constructor_NullSql_ThrowsArgumentNullException()
+ public void Constructor_WithParameters_Succeeds()
{
var factory = Substitute.For();
- Assert.Throws(() => new DbQuerySource(factory, null!));
+ var parameters = new Dictionary
+ {
+ { "MinDate", DateTime.Now },
+ { "Status", 1 }
+ };
+
+ var source = new DbQuerySource(factory, "lotfinder", "SELECT * FROM T WHERE Date > @MinDate AND Status = @Status", parameters);
+ source.SourceName.ShouldBe("DbQuery:lotfinder");
}
[Fact]
- public void Constructor_EmptySql_ThrowsArgumentException()
+ public void Constructor_WithCustomTimeout_Succeeds()
{
var factory = Substitute.For();
- Assert.Throws(() => new DbQuerySource(factory, ""));
+ var source = new DbQuerySource(factory, "jde", "SELECT 1", commandTimeout: 7200);
+ source.SourceName.ShouldBe("DbQuery:jde");
}
}