refactor: address code review findings across all projects
Apply comprehensive fixes from code reviews including: - Extract shared utilities (SqlFormatHelper, CellValueConverter, DbDestinationBase) - Add interface abstractions (IAuthenticationService, IDatabaseMigrator, IMisQueryBuilder) - Implement SecureStore for encrypted secrets storage - Fix error handling with proper HTTP status codes and logging - Optimize double enumeration in DevEtlRegistry - Add DataSync.Dev README for developer onboarding - Extract filter panel base classes to reduce duplication - Update code review docs to mark all issues as fixed
This commit is contained in:
@@ -1,6 +1,5 @@
|
||||
using System.Data;
|
||||
using System.Diagnostics;
|
||||
using Dapper;
|
||||
using JdeScoping.DataAccess.Interfaces;
|
||||
using JdeScoping.DataSync.Etl.Contracts;
|
||||
using JdeScoping.DataSync.Etl.Results;
|
||||
@@ -13,7 +12,7 @@ namespace JdeScoping.DataSync.Etl.Destinations;
|
||||
/// Imports data into a SQL Server table using bulk copy operations.
|
||||
/// Performs a full table refresh by truncating the table before loading.
|
||||
/// </summary>
|
||||
public class DbBulkImportDestination : IImportDestination
|
||||
public class DbBulkImportDestination : DbDestinationBase, IImportDestination
|
||||
{
|
||||
private const int DefaultBatchSize = 100000;
|
||||
private const int DefaultCommandTimeoutSeconds = 600;
|
||||
@@ -22,9 +21,7 @@ public class DbBulkImportDestination : IImportDestination
|
||||
public const int InfiniteTimeout = 0;
|
||||
|
||||
private readonly IDbConnectionFactory _connectionFactory;
|
||||
private readonly string _tableName;
|
||||
private readonly int _batchSize;
|
||||
private readonly int _commandTimeoutSeconds;
|
||||
|
||||
/// <inheritdoc />
|
||||
public string DestinationName => $"BulkImport:{_tableName}";
|
||||
@@ -41,14 +38,13 @@ public class DbBulkImportDestination : IImportDestination
|
||||
string tableName,
|
||||
int batchSize = 0,
|
||||
int commandTimeoutSeconds = 0)
|
||||
: base(tableName, commandTimeoutSeconds > 0 ? commandTimeoutSeconds : DefaultCommandTimeoutSeconds)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(connectionFactory);
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(tableName);
|
||||
|
||||
_connectionFactory = connectionFactory;
|
||||
_tableName = tableName;
|
||||
_batchSize = batchSize > 0 ? batchSize : DefaultBatchSize;
|
||||
_commandTimeoutSeconds = commandTimeoutSeconds > 0 ? commandTimeoutSeconds : DefaultCommandTimeoutSeconds;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
@@ -121,17 +117,4 @@ public class DbBulkImportDestination : IImportDestination
|
||||
stopwatch.Stop();
|
||||
return new DestinationResult(totalRows, batchCount, stopwatch.Elapsed);
|
||||
}
|
||||
|
||||
private async Task<HashSet<string>> GetDestinationColumnsAsync(
|
||||
SqlConnection connection,
|
||||
CancellationToken ct)
|
||||
{
|
||||
var (schema, table) = CommonScripts.ParseTableName(_tableName);
|
||||
var sql = @"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS
|
||||
WHERE TABLE_NAME = @tableName AND TABLE_SCHEMA = @schemaName";
|
||||
var columns = await connection.QueryAsync<string>(
|
||||
new CommandDefinition(sql, new { tableName = table, schemaName = schema },
|
||||
commandTimeout: _commandTimeoutSeconds, cancellationToken: ct));
|
||||
return columns.ToHashSet(StringComparer.OrdinalIgnoreCase);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
using System.Data;
|
||||
using System.Diagnostics;
|
||||
using System.Text;
|
||||
using Dapper;
|
||||
using JdeScoping.DataAccess.Interfaces;
|
||||
using JdeScoping.DataSync.Etl.Contracts;
|
||||
using JdeScoping.DataSync.Etl.Results;
|
||||
using JdeScoping.DataSync.Etl.Scripts;
|
||||
using Microsoft.Data.SqlClient;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace JdeScoping.DataSync.Etl.Destinations;
|
||||
|
||||
@@ -15,19 +15,18 @@ namespace JdeScoping.DataSync.Etl.Destinations;
|
||||
/// This approach supports incremental updates by matching on key columns and updating
|
||||
/// existing rows or inserting new ones.
|
||||
/// </summary>
|
||||
public class DbBulkMergeDestination : IImportDestination
|
||||
public class DbBulkMergeDestination : DbDestinationBase, IImportDestination
|
||||
{
|
||||
private const int DefaultBatchSize = 10000;
|
||||
private const int DefaultCommandTimeoutSeconds = 600;
|
||||
|
||||
private readonly IDbConnectionFactory _connectionFactory;
|
||||
private readonly string _tableName;
|
||||
private readonly ILogger<DbBulkMergeDestination>? _logger;
|
||||
private readonly string[] _matchColumns;
|
||||
private readonly string[]? _updateColumns;
|
||||
private readonly string[]? _excludeFromUpdate;
|
||||
private readonly string? _updateCondition;
|
||||
private readonly int _batchSize;
|
||||
private readonly int _commandTimeoutSeconds;
|
||||
|
||||
/// <inheritdoc />
|
||||
public string DestinationName => $"BulkMerge:{_tableName}";
|
||||
@@ -43,6 +42,7 @@ public class DbBulkMergeDestination : IImportDestination
|
||||
/// <param name="updateCondition">Optional SQL condition to add to the WHEN MATCHED clause (e.g., "source.LastUpdate > target.LastUpdate").</param>
|
||||
/// <param name="batchSize">Number of rows per batch. 0 uses the default (10000).</param>
|
||||
/// <param name="commandTimeoutSeconds">Command timeout in seconds. 0 uses the default (600).</param>
|
||||
/// <param name="logger">Optional logger for diagnostic output.</param>
|
||||
public DbBulkMergeDestination(
|
||||
IDbConnectionFactory connectionFactory,
|
||||
string tableName,
|
||||
@@ -51,7 +51,9 @@ public class DbBulkMergeDestination : IImportDestination
|
||||
string[]? excludeFromUpdate = null,
|
||||
string? updateCondition = null,
|
||||
int batchSize = 0,
|
||||
int commandTimeoutSeconds = 0)
|
||||
int commandTimeoutSeconds = 0,
|
||||
ILogger<DbBulkMergeDestination>? logger = null)
|
||||
: base(tableName, commandTimeoutSeconds > 0 ? commandTimeoutSeconds : DefaultCommandTimeoutSeconds)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(connectionFactory);
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(tableName);
|
||||
@@ -60,13 +62,12 @@ public class DbBulkMergeDestination : IImportDestination
|
||||
throw new ArgumentException("At least one match column is required.", nameof(matchColumns));
|
||||
|
||||
_connectionFactory = connectionFactory;
|
||||
_tableName = tableName;
|
||||
_logger = logger;
|
||||
_matchColumns = matchColumns;
|
||||
_updateColumns = updateColumns;
|
||||
_excludeFromUpdate = excludeFromUpdate;
|
||||
_updateCondition = updateCondition;
|
||||
_batchSize = batchSize > 0 ? batchSize : DefaultBatchSize;
|
||||
_commandTimeoutSeconds = commandTimeoutSeconds > 0 ? commandTimeoutSeconds : DefaultCommandTimeoutSeconds;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
@@ -177,9 +178,9 @@ public class DbBulkMergeDestination : IImportDestination
|
||||
cmd.CommandTimeout = _commandTimeoutSeconds;
|
||||
await cmd.ExecuteNonQueryAsync();
|
||||
}
|
||||
catch
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Ignore cleanup errors
|
||||
_logger?.LogDebug(ex, "Failed to drop temporary table {TempTableName} during cleanup", tempTableName);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -260,17 +261,4 @@ public class DbBulkMergeDestination : IImportDestination
|
||||
table.Columns.Add(source.GetName(i), baseType);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<HashSet<string>> GetDestinationColumnsAsync(
|
||||
SqlConnection connection,
|
||||
CancellationToken ct)
|
||||
{
|
||||
var (schema, table) = CommonScripts.ParseTableName(_tableName);
|
||||
var sql = @"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS
|
||||
WHERE TABLE_NAME = @tableName AND TABLE_SCHEMA = @schemaName";
|
||||
var columns = await connection.QueryAsync<string>(
|
||||
new CommandDefinition(sql, new { tableName = table, schemaName = schema },
|
||||
commandTimeout: _commandTimeoutSeconds, cancellationToken: ct));
|
||||
return columns.ToHashSet(StringComparer.OrdinalIgnoreCase);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,52 @@
|
||||
using Dapper;
|
||||
using JdeScoping.DataSync.Etl.Scripts;
|
||||
using Microsoft.Data.SqlClient;
|
||||
|
||||
namespace JdeScoping.DataSync.Etl.Destinations;
|
||||
|
||||
/// <summary>
|
||||
/// Base class for SQL Server database destinations providing common functionality
|
||||
/// for table metadata operations.
|
||||
/// </summary>
|
||||
public abstract class DbDestinationBase
|
||||
{
|
||||
/// <summary>
|
||||
/// The name of the destination table.
|
||||
/// </summary>
|
||||
protected readonly string _tableName;
|
||||
|
||||
/// <summary>
|
||||
/// The command timeout in seconds for database operations.
|
||||
/// </summary>
|
||||
protected readonly int _commandTimeoutSeconds;
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new instance of the DbDestinationBase class.
|
||||
/// </summary>
|
||||
/// <param name="tableName">The name of the destination table.</param>
|
||||
/// <param name="commandTimeoutSeconds">The command timeout in seconds.</param>
|
||||
protected DbDestinationBase(string tableName, int commandTimeoutSeconds)
|
||||
{
|
||||
_tableName = tableName;
|
||||
_commandTimeoutSeconds = commandTimeoutSeconds;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Retrieves the column names from the destination table.
|
||||
/// </summary>
|
||||
/// <param name="connection">The SQL Server connection to use.</param>
|
||||
/// <param name="ct">Cancellation token.</param>
|
||||
/// <returns>A HashSet containing the column names (case-insensitive).</returns>
|
||||
protected async Task<HashSet<string>> GetDestinationColumnsAsync(
|
||||
SqlConnection connection,
|
||||
CancellationToken ct)
|
||||
{
|
||||
var (schema, table) = CommonScripts.ParseTableName(_tableName);
|
||||
var sql = @"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS
|
||||
WHERE TABLE_NAME = @tableName AND TABLE_SCHEMA = @schemaName";
|
||||
var columns = await connection.QueryAsync<string>(
|
||||
new CommandDefinition(sql, new { tableName = table, schemaName = schema },
|
||||
commandTimeout: _commandTimeoutSeconds, cancellationToken: ct));
|
||||
return columns.ToHashSet(StringComparer.OrdinalIgnoreCase);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user