refactor(datasync): delete old ETL source files

This commit is contained in:
Joseph Doherty
2026-01-06 14:11:13 -05:00
parent 6074424524
commit 34daf6a83b
35 changed files with 0 additions and 2118 deletions
@@ -1,29 +0,0 @@
using JdeScoping.Core.Models.Inventory;
using JdeScoping.Core.Models.Organization;
using JdeScoping.Core.Models.Quality;
using JdeScoping.Core.Models.WorkOrders;
namespace JdeScoping.DataSync;
/// <summary>
/// Registry of types that support bulk copy operations.
/// The source generator analyzes this list to generate IDataReader implementations.
/// </summary>
public static class BulkCopyTypeRegistry
{
/// <summary>
/// Types that support bulk copy. Add new types here to generate converters.
/// </summary>
public static readonly Type[] Types =
[
typeof(WorkOrder),
typeof(Lot),
typeof(LotUsage),
typeof(Item),
typeof(WorkCenter),
typeof(ProfitCenter),
typeof(JdeUser),
typeof(Branch),
typeof(MisData),
];
}
@@ -1,24 +0,0 @@
using System.Linq.Expressions;
using JdeScoping.Core.Models.Organization;
using JdeScoping.DataSync.Contracts;
namespace JdeScoping.DataSync.Configuration.MergeConfigurations;
/// <summary>
/// Merge configuration for Branch entities.
/// </summary>
public sealed class BranchMergeConfiguration : IMergeConfiguration<Branch>
{
public string TableName => "Branch";
public Expression<Func<Branch, object>> MatchOn =>
x => x.Code;
public Expression<Func<Branch, object>>? UpdateColumns =>
x => x.Description;
public Expression<Func<Branch, Branch, bool>>? UpdateWhen =>
(src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt;
public Expression<Func<Branch, object>>? InsertColumns => null;
}
@@ -1,30 +0,0 @@
using System.Linq.Expressions;
using JdeScoping.Core.Models.Inventory;
using JdeScoping.DataSync.Contracts;
namespace JdeScoping.DataSync.Configuration.MergeConfigurations;
/// <summary>
/// Merge configuration for Item entities.
/// </summary>
public sealed class ItemMergeConfiguration : IMergeConfiguration<Item>
{
public string TableName => "Item";
public Expression<Func<Item, object>> MatchOn =>
x => x.ShortItemNumber;
public Expression<Func<Item, object>>? UpdateColumns =>
x => new
{
x.ItemNumber,
x.Description,
x.PlanningFamily,
x.StockingType
};
public Expression<Func<Item, Item, bool>>? UpdateWhen =>
(src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt;
public Expression<Func<Item, object>>? InsertColumns => null;
}
@@ -1,24 +0,0 @@
using System.Linq.Expressions;
using JdeScoping.Core.Models.Organization;
using JdeScoping.DataSync.Contracts;
namespace JdeScoping.DataSync.Configuration.MergeConfigurations;
/// <summary>
/// Merge configuration for JdeUser entities.
/// </summary>
public sealed class JdeUserMergeConfiguration : IMergeConfiguration<JdeUser>
{
public string TableName => "JdeUser";
public Expression<Func<JdeUser, object>> MatchOn =>
x => x.AddressNumber;
public Expression<Func<JdeUser, object>>? UpdateColumns =>
x => new { x.UserId, x.FullName };
public Expression<Func<JdeUser, JdeUser, bool>>? UpdateWhen =>
(src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt;
public Expression<Func<JdeUser, object>>? InsertColumns => null;
}
@@ -1,33 +0,0 @@
using System.Linq.Expressions;
using JdeScoping.Core.Models.Inventory;
using JdeScoping.DataSync.Contracts;
namespace JdeScoping.DataSync.Configuration.MergeConfigurations;
/// <summary>
/// Merge configuration for Lot entities.
/// </summary>
public sealed class LotMergeConfiguration : IMergeConfiguration<Lot>
{
public string TableName => "Lot";
public Expression<Func<Lot, object>> MatchOn =>
x => new { x.LotNumber, x.BranchCode };
public Expression<Func<Lot, object>>? UpdateColumns =>
x => new
{
x.ShortItemNumber,
x.ItemNumber,
x.SupplierCode,
x.StatusCode,
x.Memo1,
x.Memo2,
x.Memo3
};
public Expression<Func<Lot, Lot, bool>>? UpdateWhen =>
(src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt;
public Expression<Func<Lot, object>>? InsertColumns => null;
}
@@ -1,31 +0,0 @@
using System.Linq.Expressions;
using JdeScoping.Core.Models.Inventory;
using JdeScoping.DataSync.Contracts;
namespace JdeScoping.DataSync.Configuration.MergeConfigurations;
/// <summary>
/// Merge configuration for LotUsage entities.
/// </summary>
public sealed class LotUsageMergeConfiguration : IMergeConfiguration<LotUsage>
{
public string TableName => "LotUsage";
public Expression<Func<LotUsage, object>> MatchOn =>
x => x.UniqueId;
public Expression<Func<LotUsage, object>>? UpdateColumns =>
x => new
{
x.WorkOrderNumber,
x.LotNumber,
x.BranchCode,
x.ShortItemNumber,
x.Quantity
};
public Expression<Func<LotUsage, LotUsage, bool>>? UpdateWhen =>
(src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt;
public Expression<Func<LotUsage, object>>? InsertColumns => null;
}
@@ -1,34 +0,0 @@
using System.Linq.Expressions;
using JdeScoping.Core.Models.Quality;
using JdeScoping.DataSync.Contracts;
namespace JdeScoping.DataSync.Configuration.MergeConfigurations;
/// <summary>
/// Merge configuration for MisData entities.
/// </summary>
public sealed class MisDataMergeConfiguration : IMergeConfiguration<MisData>
{
public string TableName => "MisData";
public Expression<Func<MisData, object>> MatchOn =>
x => new { x.ItemNumber, x.BranchCode, x.SequenceNumber, x.MisNumber, x.CharNumber };
public Expression<Func<MisData, object>>? UpdateColumns =>
x => new
{
x.RevId,
x.TestDescription,
x.SamplingType,
x.SamplingValue,
x.ToolsGauges,
x.WorkInstructions,
x.Status,
x.ReleaseDate
};
// MisData doesn't have LastUpdateDt, so always update on match
public Expression<Func<MisData, MisData, bool>>? UpdateWhen => null;
public Expression<Func<MisData, object>>? InsertColumns => null;
}
@@ -1,24 +0,0 @@
using System.Linq.Expressions;
using JdeScoping.Core.Models.Organization;
using JdeScoping.DataSync.Contracts;
namespace JdeScoping.DataSync.Configuration.MergeConfigurations;
/// <summary>
/// Merge configuration for ProfitCenter entities.
/// </summary>
public sealed class ProfitCenterMergeConfiguration : IMergeConfiguration<ProfitCenter>
{
public string TableName => "ProfitCenter";
public Expression<Func<ProfitCenter, object>> MatchOn =>
x => x.Code;
public Expression<Func<ProfitCenter, object>>? UpdateColumns =>
x => x.Description;
public Expression<Func<ProfitCenter, ProfitCenter, bool>>? UpdateWhen =>
(src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt;
public Expression<Func<ProfitCenter, object>>? InsertColumns => null;
}
@@ -1,24 +0,0 @@
using System.Linq.Expressions;
using JdeScoping.Core.Models.Organization;
using JdeScoping.DataSync.Contracts;
namespace JdeScoping.DataSync.Configuration.MergeConfigurations;
/// <summary>
/// Merge configuration for WorkCenter entities.
/// </summary>
public sealed class WorkCenterMergeConfiguration : IMergeConfiguration<WorkCenter>
{
public string TableName => "WorkCenter";
public Expression<Func<WorkCenter, object>> MatchOn =>
x => x.Code;
public Expression<Func<WorkCenter, object>>? UpdateColumns =>
x => x.Description;
public Expression<Func<WorkCenter, WorkCenter, bool>>? UpdateWhen =>
(src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt;
public Expression<Func<WorkCenter, object>>? InsertColumns => null;
}
@@ -1,38 +0,0 @@
using System.Linq.Expressions;
using JdeScoping.Core.Models.WorkOrders;
using JdeScoping.DataSync.Contracts;
namespace JdeScoping.DataSync.Configuration.MergeConfigurations;
/// <summary>
/// Merge configuration for WorkOrder entities.
/// </summary>
public sealed class WorkOrderMergeConfiguration : IMergeConfiguration<WorkOrder>
{
public string TableName => "WorkOrder";
public Expression<Func<WorkOrder, object>> MatchOn =>
x => new { x.WorkOrderNumber, x.BranchCode };
public Expression<Func<WorkOrder, object>>? UpdateColumns =>
x => new
{
x.LotNumber,
x.ItemNumber,
x.ShortItemNumber,
x.ParentWorkOrderNumber,
x.OrderQuantity,
x.HeldQuantity,
x.ShippedQuantity,
x.StatusCode,
x.StatusCodeUpdateDt,
x.IssueDate,
x.StartDate,
x.RoutingType
};
public Expression<Func<WorkOrder, WorkOrder, bool>>? UpdateWhen =>
(src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt;
public Expression<Func<WorkOrder, object>>? InsertColumns => null; // All columns
}
@@ -1,55 +0,0 @@
using System.Linq.Expressions;
using JdeScoping.DataSync.Models;
namespace JdeScoping.DataSync.Contracts;
/// <summary>
/// Helper for performing bulk merge operations from async enumerable sources to SQL Server tables.
/// </summary>
public interface IBulkMergeHelper
{
/// <summary>
/// Merges data from an async enumerable source into a destination table.
/// </summary>
/// <typeparam name="T">The entity type.</typeparam>
/// <param name="data">The source data to merge.</param>
/// <param name="destinationTable">The destination SQL table name.</param>
/// <param name="matchOn">Expression defining the columns to match on (primary key).</param>
/// <param name="updateColumns">Expression defining which columns to update on match. Null means all non-PK columns.</param>
/// <param name="updateWhen">Optional condition for when to perform updates. If null, always updates on match.</param>
/// <param name="insertColumns">Expression defining which columns to insert. Null means all columns.</param>
/// <param name="tempTableName">Optional temp table name. Defaults to #TEMP_{destinationTable}.</param>
/// <param name="batchSize">Number of rows per batch. 0 means process all at once.</param>
/// <param name="validateBeforeCopy">If true, validates data against schema before bulk copy.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Result containing row counts and timing information.</returns>
Task<MergeResult> MergeAsync<T>(
IAsyncEnumerable<T> data,
string destinationTable,
Expression<Func<T, object>> matchOn,
Expression<Func<T, object>>? updateColumns = null,
Expression<Func<T, T, bool>>? updateWhen = null,
Expression<Func<T, object>>? insertColumns = null,
string? tempTableName = null,
int batchSize = 0,
bool validateBeforeCopy = false,
CancellationToken cancellationToken = default) where T : class;
/// <summary>
/// Performs a mass insert (full table refresh) with optional index management.
/// Truncates table, disables non-clustered indexes, bulk copies data, rebuilds indexes.
/// </summary>
/// <typeparam name="T">The entity type.</typeparam>
/// <param name="data">The source data to insert.</param>
/// <param name="destinationTable">The destination SQL table name.</param>
/// <param name="rebuildIndexes">If true, rebuilds indexes after insert. If false, just re-enables them.</param>
/// <param name="batchSize">Number of rows per bulk copy batch. 0 = default (10000).</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Result containing row count and timing.</returns>
Task<MassInsertResult> MassInsertAsync<T>(
IAsyncEnumerable<T> data,
string destinationTable,
bool rebuildIndexes = true,
int batchSize = 0,
CancellationToken cancellationToken = default) where T : class;
}
@@ -1,23 +0,0 @@
namespace JdeScoping.DataSync.Contracts;
/// <summary>
/// Interface for fetching data from source systems (JDE/CMS).
/// </summary>
/// <typeparam name="TEntity">The entity type being fetched.</typeparam>
public interface IDataFetcher<TEntity> where TEntity : class
{
/// <summary>
/// Fetches entities from source system as an async stream.
/// </summary>
/// <param name="minimumDt">For incremental fetches, only return records modified after this time. Null for full fetch.</param>
/// <param name="cancellationToken">Cancellation token for graceful shutdown.</param>
/// <returns>Async enumerable of entities, streamed from source.</returns>
IAsyncEnumerable<TEntity> FetchAsync(
DateTime? minimumDt,
CancellationToken cancellationToken = default);
/// <summary>
/// Gets the entity type name for logging purposes.
/// </summary>
string EntityTypeName => typeof(TEntity).Name;
}
@@ -1,26 +0,0 @@
using System.Data;
namespace JdeScoping.DataSync.Contracts;
/// <summary>
/// Factory for creating IDataReader instances from IAsyncEnumerable sources.
/// Implementations are typically source-generated.
/// </summary>
public interface IDataReaderFactory
{
/// <summary>
/// Creates an IDataReader that wraps the async enumerable source for use with SqlBulkCopy.
/// </summary>
/// <typeparam name="T">The entity type.</typeparam>
/// <param name="source">The async enumerable data source.</param>
/// <returns>An IDataReader that can be passed to SqlBulkCopy.</returns>
/// <exception cref="NotSupportedException">Thrown when no converter exists for type T.</exception>
IDataReader CreateReader<T>(IAsyncEnumerable<T> source) where T : class;
/// <summary>
/// Gets the column names for a given entity type in ordinal order.
/// </summary>
/// <typeparam name="T">The entity type.</typeparam>
/// <returns>List of column names matching the IDataReader column ordinals.</returns>
IReadOnlyList<string> GetColumnNames<T>() where T : class;
}
@@ -1,38 +0,0 @@
using System.Linq.Expressions;
namespace JdeScoping.DataSync.Contracts;
/// <summary>
/// Defines merge configuration for an entity type.
/// </summary>
/// <typeparam name="T">The entity type.</typeparam>
public interface IMergeConfiguration<T> where T : class
{
/// <summary>
/// Gets the destination table name in SQL Server.
/// </summary>
string TableName { get; }
/// <summary>
/// Gets the expression defining columns to match on (primary key).
/// </summary>
Expression<Func<T, object>> MatchOn { get; }
/// <summary>
/// Gets the expression defining columns to update when matched.
/// Null means all non-PK columns.
/// </summary>
Expression<Func<T, object>>? UpdateColumns { get; }
/// <summary>
/// Gets the condition for when to perform updates.
/// Null means always update on match.
/// </summary>
Expression<Func<T, T, bool>>? UpdateWhen { get; }
/// <summary>
/// Gets the expression defining columns to insert.
/// Null means all columns.
/// </summary>
Expression<Func<T, object>>? InsertColumns { get; }
}
@@ -1,22 +0,0 @@
namespace JdeScoping.DataSync.Contracts;
/// <summary>
/// Registry for looking up merge configurations by entity type.
/// </summary>
public interface IMergeConfigurationRegistry
{
/// <summary>
/// Gets the merge configuration for the specified entity type.
/// </summary>
/// <typeparam name="T">The entity type.</typeparam>
/// <returns>The merge configuration.</returns>
/// <exception cref="InvalidOperationException">Thrown if no configuration is registered.</exception>
IMergeConfiguration<T> GetConfiguration<T>() where T : class;
/// <summary>
/// Checks if a merge configuration exists for the specified entity type.
/// </summary>
/// <typeparam name="T">The entity type.</typeparam>
/// <returns>True if configuration exists.</returns>
bool HasConfiguration<T>() where T : class;
}
@@ -1,15 +0,0 @@
namespace JdeScoping.DataSync.Contracts;
/// <summary>
/// Interface for post-processing operations after data sync.
/// </summary>
public interface IPostProcessor
{
/// <summary>
/// Executes post-processing logic after data has been synced to the table.
/// </summary>
/// <param name="tableName">The table that was synced.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>A task representing the asynchronous operation.</returns>
Task ProcessAsync(string tableName, CancellationToken cancellationToken = default);
}
@@ -1,36 +0,0 @@
using System.Data;
using JdeScoping.DataSync.Exceptions;
using JdeScoping.DataSync.Models;
namespace JdeScoping.DataSync.Contracts;
/// <summary>
/// Validates data against database table schema.
/// </summary>
public interface ISchemaValidator
{
/// <summary>
/// Gets the schema for a table.
/// </summary>
/// <param name="connection">Database connection.</param>
/// <param name="tableName">Table name.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>List of column schemas.</returns>
Task<IReadOnlyList<ColumnSchema>> GetTableSchemaAsync(
IDbConnection connection,
string tableName,
CancellationToken cancellationToken = default);
/// <summary>
/// Validates a batch of data against the table schema.
/// </summary>
/// <typeparam name="T">Entity type.</typeparam>
/// <param name="data">Data to validate.</param>
/// <param name="schema">Column schemas to validate against.</param>
/// <param name="maxErrors">Maximum number of errors to collect before stopping (0 = unlimited).</param>
/// <returns>List of validation errors.</returns>
IReadOnlyList<ValidationError> ValidateBatch<T>(
IReadOnlyList<T> data,
IReadOnlyList<ColumnSchema> schema,
int maxErrors = 100) where T : class;
}
@@ -1,80 +0,0 @@
namespace JdeScoping.DataSync.Exceptions;
/// <summary>
/// Exception thrown when a bulk merge operation fails.
/// </summary>
public class BulkMergeException : Exception
{
/// <summary>
/// The destination table name.
/// </summary>
public string TableName { get; init; } = string.Empty;
/// <summary>
/// The batch number that failed (1-based).
/// </summary>
public int BatchNumber { get; init; }
/// <summary>
/// Number of rows in the failed batch.
/// </summary>
public int RowsInBatch { get; init; }
/// <summary>
/// The SQL statement that was being executed when the error occurred.
/// </summary>
public string? SqlStatement { get; init; }
public BulkMergeException()
{
}
public BulkMergeException(string message) : base(message)
{
}
public BulkMergeException(string message, Exception innerException) : base(message, innerException)
{
}
}
/// <summary>
/// Exception thrown when validation fails before bulk copy.
/// </summary>
public class BulkMergeValidationException : BulkMergeException
{
/// <summary>
/// The validation errors that were found.
/// </summary>
public IReadOnlyList<ValidationError> Errors { get; init; } = [];
public BulkMergeValidationException()
{
}
public BulkMergeValidationException(string message) : base(message)
{
}
public BulkMergeValidationException(string message, IReadOnlyList<ValidationError> errors) : base(message)
{
Errors = errors;
}
public BulkMergeValidationException(string message, Exception innerException) : base(message, innerException)
{
}
}
/// <summary>
/// Represents a validation error for a specific row and column.
/// </summary>
/// <param name="RowIndex">Zero-based row index in the batch.</param>
/// <param name="ColumnName">The column name that failed validation.</param>
/// <param name="Value">The value that failed validation.</param>
/// <param name="Message">Human-readable error message.</param>
public record ValidationError(
int RowIndex,
string ColumnName,
object? Value,
string Message);
@@ -1,41 +0,0 @@
using System.Runtime.CompilerServices;
using JdeScoping.Core.Interfaces;
using JdeScoping.Core.Models;
using JdeScoping.Core.Models.Quality;
using JdeScoping.DataSync.Contracts;
using Microsoft.Extensions.Logging;
namespace JdeScoping.DataSync.Fetchers.Cms;
/// <summary>
/// Fetches MIS data from CMS (Sybase/Oracle).
/// </summary>
public class CmsMisDataFetcher : IDataFetcher<MisData>
{
private readonly ICmsDataSource _cmsDataSource;
private readonly ILogger<CmsMisDataFetcher> _logger;
/// <summary>
/// Initializes a new instance of the <see cref="CmsMisDataFetcher"/> class.
/// </summary>
public CmsMisDataFetcher(
ICmsDataSource cmsDataSource,
ILogger<CmsMisDataFetcher> logger)
{
_cmsDataSource = cmsDataSource ?? throw new ArgumentNullException(nameof(cmsDataSource));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <inheritdoc/>
public async IAsyncEnumerable<MisData> FetchAsync(
DateTime? minimumDt,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
_logger.LogDebug("Fetching MIS data from CMS, minimumDT={MinDT}", minimumDt?.ToString("o") ?? "null");
await foreach (var misData in _cmsDataSource.GetMisDataAsync(minimumDt, cancellationToken))
{
yield return misData;
}
}
}
@@ -1,41 +0,0 @@
using System.Runtime.CompilerServices;
using JdeScoping.Core.Interfaces;
using JdeScoping.Core.Models;
using JdeScoping.Core.Models.Organization;
using JdeScoping.DataSync.Contracts;
using Microsoft.Extensions.Logging;
namespace JdeScoping.DataSync.Fetchers.Jde;
/// <summary>
/// Fetches branch data from JDE.
/// </summary>
public class JdeBranchFetcher : IDataFetcher<Branch>
{
private readonly IJdeDataSource _jdeDataSource;
private readonly ILogger<JdeBranchFetcher> _logger;
/// <summary>
/// Initializes a new instance of the <see cref="JdeBranchFetcher"/> class.
/// </summary>
public JdeBranchFetcher(
IJdeDataSource jdeDataSource,
ILogger<JdeBranchFetcher> logger)
{
_jdeDataSource = jdeDataSource ?? throw new ArgumentNullException(nameof(jdeDataSource));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <inheritdoc/>
public async IAsyncEnumerable<Branch> FetchAsync(
DateTime? minimumDt,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
_logger.LogDebug("Fetching branches from JDE, minimumDT={MinDT}", minimumDt?.ToString("o") ?? "null");
await foreach (var branch in _jdeDataSource.GetBranchesAsync(minimumDt, cancellationToken))
{
yield return branch;
}
}
}
@@ -1,41 +0,0 @@
using System.Runtime.CompilerServices;
using JdeScoping.Core.Interfaces;
using JdeScoping.Core.Models;
using JdeScoping.Core.Models.Inventory;
using JdeScoping.DataSync.Contracts;
using Microsoft.Extensions.Logging;
namespace JdeScoping.DataSync.Fetchers.Jde;
/// <summary>
/// Fetches item master data from JDE.
/// </summary>
public class JdeItemFetcher : IDataFetcher<Item>
{
private readonly IJdeDataSource _jdeDataSource;
private readonly ILogger<JdeItemFetcher> _logger;
/// <summary>
/// Initializes a new instance of the <see cref="JdeItemFetcher"/> class.
/// </summary>
public JdeItemFetcher(
IJdeDataSource jdeDataSource,
ILogger<JdeItemFetcher> logger)
{
_jdeDataSource = jdeDataSource ?? throw new ArgumentNullException(nameof(jdeDataSource));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <inheritdoc/>
public async IAsyncEnumerable<Item> FetchAsync(
DateTime? minimumDt,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
_logger.LogDebug("Fetching items from JDE, minimumDT={MinDT}", minimumDt?.ToString("o") ?? "null");
await foreach (var item in _jdeDataSource.GetItemsAsync(minimumDt, cancellationToken))
{
yield return item;
}
}
}
@@ -1,41 +0,0 @@
using System.Runtime.CompilerServices;
using JdeScoping.Core.Interfaces;
using JdeScoping.Core.Models;
using JdeScoping.Core.Models.Inventory;
using JdeScoping.DataSync.Contracts;
using Microsoft.Extensions.Logging;
namespace JdeScoping.DataSync.Fetchers.Jde;
/// <summary>
/// Fetches lot master data from JDE.
/// </summary>
public class JdeLotFetcher : IDataFetcher<Lot>
{
private readonly IJdeDataSource _jdeDataSource;
private readonly ILogger<JdeLotFetcher> _logger;
/// <summary>
/// Initializes a new instance of the <see cref="JdeLotFetcher"/> class.
/// </summary>
public JdeLotFetcher(
IJdeDataSource jdeDataSource,
ILogger<JdeLotFetcher> logger)
{
_jdeDataSource = jdeDataSource ?? throw new ArgumentNullException(nameof(jdeDataSource));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <inheritdoc/>
public async IAsyncEnumerable<Lot> FetchAsync(
DateTime? minimumDt,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
_logger.LogDebug("Fetching lots from JDE, minimumDT={MinDT}", minimumDt?.ToString("o") ?? "null");
await foreach (var lot in _jdeDataSource.GetLotsAsync(minimumDt, cancellationToken))
{
yield return lot;
}
}
}
@@ -1,41 +0,0 @@
using System.Runtime.CompilerServices;
using JdeScoping.Core.Interfaces;
using JdeScoping.Core.Models;
using JdeScoping.Core.Models.Inventory;
using JdeScoping.DataSync.Contracts;
using Microsoft.Extensions.Logging;
namespace JdeScoping.DataSync.Fetchers.Jde;
/// <summary>
/// Fetches lot usage (cardex) data from JDE.
/// </summary>
public class JdeLotUsageFetcher : IDataFetcher<LotUsage>
{
private readonly IJdeDataSource _jdeDataSource;
private readonly ILogger<JdeLotUsageFetcher> _logger;
/// <summary>
/// Initializes a new instance of the <see cref="JdeLotUsageFetcher"/> class.
/// </summary>
public JdeLotUsageFetcher(
IJdeDataSource jdeDataSource,
ILogger<JdeLotUsageFetcher> logger)
{
_jdeDataSource = jdeDataSource ?? throw new ArgumentNullException(nameof(jdeDataSource));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <inheritdoc/>
public async IAsyncEnumerable<LotUsage> FetchAsync(
DateTime? minimumDt,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
_logger.LogDebug("Fetching lot usages from JDE, minimumDT={MinDT}", minimumDt?.ToString("o") ?? "null");
await foreach (var lotUsage in _jdeDataSource.GetLotUsagesAsync(minimumDt, cancellationToken))
{
yield return lotUsage;
}
}
}
@@ -1,41 +0,0 @@
using System.Runtime.CompilerServices;
using JdeScoping.Core.Interfaces;
using JdeScoping.Core.Models;
using JdeScoping.Core.Models.Organization;
using JdeScoping.DataSync.Contracts;
using Microsoft.Extensions.Logging;
namespace JdeScoping.DataSync.Fetchers.Jde;
/// <summary>
/// Fetches profit center data from JDE.
/// </summary>
public class JdeProfitCenterFetcher : IDataFetcher<ProfitCenter>
{
private readonly IJdeDataSource _jdeDataSource;
private readonly ILogger<JdeProfitCenterFetcher> _logger;
/// <summary>
/// Initializes a new instance of the <see cref="JdeProfitCenterFetcher"/> class.
/// </summary>
public JdeProfitCenterFetcher(
IJdeDataSource jdeDataSource,
ILogger<JdeProfitCenterFetcher> logger)
{
_jdeDataSource = jdeDataSource ?? throw new ArgumentNullException(nameof(jdeDataSource));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <inheritdoc/>
public async IAsyncEnumerable<ProfitCenter> FetchAsync(
DateTime? minimumDt,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
_logger.LogDebug("Fetching profit centers from JDE, minimumDT={MinDT}", minimumDt?.ToString("o") ?? "null");
await foreach (var profitCenter in _jdeDataSource.GetProfitCentersAsync(minimumDt, cancellationToken))
{
yield return profitCenter;
}
}
}
@@ -1,41 +0,0 @@
using System.Runtime.CompilerServices;
using JdeScoping.Core.Interfaces;
using JdeScoping.Core.Models;
using JdeScoping.Core.Models.Organization;
using JdeScoping.DataSync.Contracts;
using Microsoft.Extensions.Logging;
namespace JdeScoping.DataSync.Fetchers.Jde;
/// <summary>
/// Fetches JDE user data from JDE.
/// </summary>
public class JdeUserFetcher : IDataFetcher<JdeUser>
{
private readonly IJdeDataSource _jdeDataSource;
private readonly ILogger<JdeUserFetcher> _logger;
/// <summary>
/// Initializes a new instance of the <see cref="JdeUserFetcher"/> class.
/// </summary>
public JdeUserFetcher(
IJdeDataSource jdeDataSource,
ILogger<JdeUserFetcher> logger)
{
_jdeDataSource = jdeDataSource ?? throw new ArgumentNullException(nameof(jdeDataSource));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <inheritdoc/>
public async IAsyncEnumerable<JdeUser> FetchAsync(
DateTime? minimumDt,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
_logger.LogDebug("Fetching JDE users, minimumDT={MinDT}", minimumDt?.ToString("o") ?? "null");
await foreach (var user in _jdeDataSource.GetUsersAsync(minimumDt, cancellationToken))
{
yield return user;
}
}
}
@@ -1,41 +0,0 @@
using System.Runtime.CompilerServices;
using JdeScoping.Core.Interfaces;
using JdeScoping.Core.Models;
using JdeScoping.Core.Models.Organization;
using JdeScoping.DataSync.Contracts;
using Microsoft.Extensions.Logging;
namespace JdeScoping.DataSync.Fetchers.Jde;
/// <summary>
/// Fetches work center data from JDE.
/// </summary>
public class JdeWorkCenterFetcher : IDataFetcher<WorkCenter>
{
private readonly IJdeDataSource _jdeDataSource;
private readonly ILogger<JdeWorkCenterFetcher> _logger;
/// <summary>
/// Initializes a new instance of the <see cref="JdeWorkCenterFetcher"/> class.
/// </summary>
public JdeWorkCenterFetcher(
IJdeDataSource jdeDataSource,
ILogger<JdeWorkCenterFetcher> logger)
{
_jdeDataSource = jdeDataSource ?? throw new ArgumentNullException(nameof(jdeDataSource));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <inheritdoc/>
public async IAsyncEnumerable<WorkCenter> FetchAsync(
DateTime? minimumDt,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
_logger.LogDebug("Fetching work centers from JDE, minimumDT={MinDT}", minimumDt?.ToString("o") ?? "null");
await foreach (var workCenter in _jdeDataSource.GetWorkCentersAsync(minimumDt, cancellationToken))
{
yield return workCenter;
}
}
}
@@ -1,41 +0,0 @@
using System.Runtime.CompilerServices;
using JdeScoping.Core.Interfaces;
using JdeScoping.Core.Models;
using JdeScoping.Core.Models.WorkOrders;
using JdeScoping.DataSync.Contracts;
using Microsoft.Extensions.Logging;
namespace JdeScoping.DataSync.Fetchers.Jde;
/// <summary>
/// Fetches work order data from JDE.
/// </summary>
public class JdeWorkOrderFetcher : IDataFetcher<WorkOrder>
{
private readonly IJdeDataSource _jdeDataSource;
private readonly ILogger<JdeWorkOrderFetcher> _logger;
/// <summary>
/// Initializes a new instance of the <see cref="JdeWorkOrderFetcher"/> class.
/// </summary>
public JdeWorkOrderFetcher(
IJdeDataSource jdeDataSource,
ILogger<JdeWorkOrderFetcher> logger)
{
_jdeDataSource = jdeDataSource ?? throw new ArgumentNullException(nameof(jdeDataSource));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <inheritdoc/>
public async IAsyncEnumerable<WorkOrder> FetchAsync(
DateTime? minimumDt,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
_logger.LogDebug("Fetching work orders from JDE, minimumDT={MinDT}", minimumDt?.ToString("o") ?? "null");
await foreach (var workOrder in _jdeDataSource.GetWorkOrdersAsync(minimumDt, cancellationToken))
{
yield return workOrder;
}
}
}
@@ -1,20 +0,0 @@
namespace JdeScoping.DataSync.Models;
/// <summary>
/// Represents the schema of a database column.
/// </summary>
/// <param name="Name">Column name.</param>
/// <param name="DataType">SQL Server data type name (e.g., "nvarchar", "int", "decimal").</param>
/// <param name="MaxLength">Maximum length for string columns (-1 for MAX).</param>
/// <param name="Precision">Precision for decimal columns.</param>
/// <param name="Scale">Scale for decimal columns.</param>
/// <param name="IsNullable">Whether the column allows null values.</param>
/// <param name="OrdinalPosition">Column ordinal position (1-based).</param>
public record ColumnSchema(
string Name,
string DataType,
int? MaxLength,
int? Precision,
int? Scale,
bool IsNullable,
int OrdinalPosition);
@@ -1,33 +0,0 @@
namespace JdeScoping.DataSync.Models;
/// <summary>
/// Result of a bulk merge operation.
/// </summary>
/// <param name="TotalRowsProcessed">Total number of rows processed from source.</param>
/// <param name="RowsInserted">Number of rows inserted (new records).</param>
/// <param name="RowsUpdated">Number of rows updated (existing records).</param>
/// <param name="BatchCount">Number of batches processed.</param>
/// <param name="Elapsed">Total elapsed time for the operation.</param>
public record MergeResult(
int TotalRowsProcessed,
int RowsInserted,
int RowsUpdated,
int BatchCount,
TimeSpan Elapsed)
{
/// <summary>
/// Total rows affected (inserted + updated).
/// </summary>
public int TotalRowsAffected => RowsInserted + RowsUpdated;
}
/// <summary>
/// Result of a mass insert operation.
/// </summary>
/// <param name="TotalRowsInserted">Total rows inserted.</param>
/// <param name="Elapsed">Total elapsed time.</param>
/// <param name="IndexesRebuilt">Whether indexes were rebuilt (vs just re-enabled).</param>
public record MassInsertResult(
long TotalRowsInserted,
TimeSpan Elapsed,
bool IndexesRebuilt);
@@ -1,432 +0,0 @@
using System.Diagnostics;
using System.Linq.Expressions;
using Dapper;
using JdeScoping.DataAccess.Interfaces;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Exceptions;
using JdeScoping.DataSync.Models;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Logging;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Implements bulk merge operations using temp tables and SQL MERGE statements.
/// </summary>
internal sealed class BulkMergeHelper : IBulkMergeHelper
{
private const int DefaultBatchSize = 10000;
private readonly IDbConnectionFactory _connectionFactory;
private readonly IDataReaderFactory _dataReaderFactory;
private readonly ISchemaValidator _schemaValidator;
private readonly ILogger<BulkMergeHelper> _logger;
public BulkMergeHelper(
IDbConnectionFactory connectionFactory,
IDataReaderFactory dataReaderFactory,
ISchemaValidator schemaValidator,
ILogger<BulkMergeHelper> logger)
{
_connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
_dataReaderFactory = dataReaderFactory ?? throw new ArgumentNullException(nameof(dataReaderFactory));
_schemaValidator = schemaValidator ?? throw new ArgumentNullException(nameof(schemaValidator));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <inheritdoc />
public async Task<MergeResult> MergeAsync<T>(
IAsyncEnumerable<T> data,
string destinationTable,
Expression<Func<T, object>> matchOn,
Expression<Func<T, object>>? updateColumns = null,
Expression<Func<T, T, bool>>? updateWhen = null,
Expression<Func<T, object>>? insertColumns = null,
string? tempTableName = null,
int batchSize = 0,
bool validateBeforeCopy = false,
CancellationToken cancellationToken = default) where T : class
{
ArgumentNullException.ThrowIfNull(data);
ArgumentException.ThrowIfNullOrWhiteSpace(destinationTable);
ArgumentNullException.ThrowIfNull(matchOn);
var stopwatch = Stopwatch.StartNew();
var effectiveBatchSize = batchSize > 0 ? batchSize : DefaultBatchSize;
var effectiveTempTable = tempTableName ?? $"#TEMP_{destinationTable.Replace(".", "_").Replace("[", "").Replace("]", "")}";
// Parse expressions to get column names
var matchColumnNames = ExpressionParser.GetColumnNames(matchOn);
var allColumns = _dataReaderFactory.GetColumnNames<T>();
// Default update columns = all columns except match columns
var updateColumnNames = updateColumns != null
? ExpressionParser.GetColumnNames(updateColumns)
: allColumns.Where(c => !matchColumnNames.Contains(c, StringComparer.OrdinalIgnoreCase)).ToList();
// Default insert columns = all columns
var insertColumnNames = insertColumns != null
? ExpressionParser.GetColumnNames(insertColumns)
: allColumns;
// Build the updateWhen SQL clause if provided
var updateWhenSql = ExpressionParser.BuildUpdateWhenSql(updateWhen);
_logger.LogDebug(
"Starting bulk merge to {DestinationTable}. BatchSize={BatchSize}, MatchColumns={MatchColumns}, TempTable={TempTable}",
destinationTable, effectiveBatchSize, string.Join(",", matchColumnNames), effectiveTempTable);
await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken);
// Get schema for validation if needed
IReadOnlyList<ColumnSchema>? schema = null;
if (validateBeforeCopy)
{
schema = await _schemaValidator.GetTableSchemaAsync(connection, destinationTable, cancellationToken);
}
int totalRows = 0;
int totalInserted = 0;
int totalUpdated = 0;
int batchCount = 0;
bool tempTableCreated = false;
try
{
// Create temp table with same schema as destination
await CreateTempTableAsync(connection, effectiveTempTable, destinationTable, cancellationToken);
tempTableCreated = true;
// Build MERGE SQL once
var mergeSql = MergeSqlBuilder.BuildMergeSimple(
destinationTable,
effectiveTempTable,
matchColumnNames,
updateColumnNames,
updateWhenSql,
insertColumnNames);
// Process data in batches
var batch = new List<T>(effectiveBatchSize);
await foreach (var item in data.WithCancellation(cancellationToken))
{
batch.Add(item);
if (batch.Count >= effectiveBatchSize)
{
batchCount++;
var (inserted, updated) = await ProcessBatchAsync(
connection, batch, effectiveTempTable, destinationTable,
mergeSql, schema, batchCount, cancellationToken);
totalRows += batch.Count;
totalInserted += inserted;
totalUpdated += updated;
batch.Clear();
}
}
// Process remaining items
if (batch.Count > 0)
{
batchCount++;
var (inserted, updated) = await ProcessBatchAsync(
connection, batch, effectiveTempTable, destinationTable,
mergeSql, schema, batchCount, cancellationToken);
totalRows += batch.Count;
totalInserted += inserted;
totalUpdated += updated;
}
stopwatch.Stop();
_logger.LogInformation(
"Bulk merge to {DestinationTable} completed. TotalRows={TotalRows}, Inserted={Inserted}, Updated={Updated}, Batches={Batches}, Elapsed={Elapsed}ms",
destinationTable, totalRows, totalInserted, totalUpdated, batchCount, stopwatch.ElapsedMilliseconds);
return new MergeResult(totalRows, totalInserted, totalUpdated, batchCount, stopwatch.Elapsed);
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
_logger.LogError(ex, "Bulk merge to {DestinationTable} failed after {Batches} batches and {TotalRows} rows",
destinationTable, batchCount, totalRows);
throw;
}
finally
{
// Always try to drop the temp table
if (tempTableCreated)
{
try
{
await DropTempTableAsync(connection, effectiveTempTable, CancellationToken.None);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to drop temp table {TempTable}", effectiveTempTable);
}
}
}
}
private async Task<(int Inserted, int Updated)> ProcessBatchAsync<T>(
SqlConnection connection,
IReadOnlyList<T> batch,
string tempTableName,
string destinationTable,
string mergeSql,
IReadOnlyList<ColumnSchema>? schema,
int batchNumber,
CancellationToken cancellationToken) where T : class
{
try
{
// Validate if schema available
if (schema != null)
{
var errors = _schemaValidator.ValidateBatch(batch, schema);
if (errors.Count > 0)
{
throw new BulkMergeValidationException(
$"Validation failed for batch {batchNumber} with {errors.Count} errors.",
errors)
{
TableName = destinationTable,
BatchNumber = batchNumber,
RowsInBatch = batch.Count
};
}
}
// Bulk copy to temp table
await BulkCopyAsync(connection, batch, tempTableName, cancellationToken);
// Execute MERGE
var rowsAffected = await connection.ExecuteAsync(
new CommandDefinition(mergeSql, cancellationToken: cancellationToken));
// Truncate temp table for next batch
await connection.ExecuteAsync(
new CommandDefinition(
MergeSqlBuilder.BuildTruncateTempTable(tempTableName),
cancellationToken: cancellationToken));
_logger.LogDebug(
"Batch {BatchNumber} completed. Rows={RowCount}, Affected={RowsAffected}",
batchNumber, batch.Count, rowsAffected);
// Note: The simple MERGE doesn't track insert vs update counts
// We return affected rows as inserts for now - could enhance MERGE to track this
return (rowsAffected, 0);
}
catch (SqlException ex)
{
throw new BulkMergeException(
$"SQL error during batch {batchNumber}: {ex.Message}", ex)
{
TableName = destinationTable,
BatchNumber = batchNumber,
RowsInBatch = batch.Count,
SqlStatement = mergeSql
};
}
}
private async Task BulkCopyAsync<T>(
SqlConnection connection,
IReadOnlyList<T> batch,
string tempTableName,
CancellationToken cancellationToken) where T : class
{
using var bulkCopy = new SqlBulkCopy(connection)
{
DestinationTableName = tempTableName,
BatchSize = batch.Count
};
// Create reader from the batch list
using var reader = _dataReaderFactory.CreateReader(batch.ToAsyncEnumerable());
await bulkCopy.WriteToServerAsync(reader, cancellationToken);
}
private static async Task CreateTempTableAsync(
SqlConnection connection,
string tempTableName,
string sourceTableName,
CancellationToken cancellationToken)
{
var sql = MergeSqlBuilder.BuildCreateTempTable(tempTableName, sourceTableName);
await connection.ExecuteAsync(new CommandDefinition(sql, cancellationToken: cancellationToken));
}
private static async Task DropTempTableAsync(
SqlConnection connection,
string tempTableName,
CancellationToken cancellationToken)
{
var sql = MergeSqlBuilder.BuildDropTempTable(tempTableName);
await connection.ExecuteAsync(new CommandDefinition(sql, cancellationToken: cancellationToken));
}
/// <inheritdoc />
public async Task<MassInsertResult> MassInsertAsync<T>(
IAsyncEnumerable<T> data,
string destinationTable,
bool rebuildIndexes = true,
int batchSize = 0,
CancellationToken cancellationToken = default) where T : class
{
ArgumentNullException.ThrowIfNull(data);
ArgumentException.ThrowIfNullOrWhiteSpace(destinationTable);
var stopwatch = Stopwatch.StartNew();
var effectiveBatchSize = batchSize > 0 ? batchSize : DefaultBatchSize;
_logger.LogInformation(
"Starting mass insert to {DestinationTable}. BatchSize={BatchSize}, RebuildIndexes={RebuildIndexes}",
destinationTable, effectiveBatchSize, rebuildIndexes);
await using var connection = await _connectionFactory.CreateLotFinderConnectionAsync(cancellationToken);
long totalRows = 0;
try
{
// Step 1: Disable non-clustered indexes
await DisableNonClusteredIndexesAsync(connection, destinationTable, cancellationToken);
// Step 2: Truncate the table
await connection.ExecuteAsync(
new CommandDefinition($"TRUNCATE TABLE [{destinationTable}]", cancellationToken: cancellationToken));
_logger.LogDebug("Truncated table {DestinationTable}", destinationTable);
// Step 3: Bulk copy data in batches
var batch = new List<T>(effectiveBatchSize);
await foreach (var item in data.WithCancellation(cancellationToken))
{
batch.Add(item);
if (batch.Count >= effectiveBatchSize)
{
await BulkCopyDirectAsync(connection, batch, destinationTable, cancellationToken);
totalRows += batch.Count;
_logger.LogDebug("Inserted batch of {Count} rows to {Table}", batch.Count, destinationTable);
batch.Clear();
}
}
// Insert remaining rows
if (batch.Count > 0)
{
await BulkCopyDirectAsync(connection, batch, destinationTable, cancellationToken);
totalRows += batch.Count;
_logger.LogDebug("Inserted final batch of {Count} rows to {Table}", batch.Count, destinationTable);
}
// Step 4: Rebuild or re-enable indexes
if (rebuildIndexes)
{
await RebuildIndexesAsync(connection, destinationTable, cancellationToken);
}
else
{
await EnableNonClusteredIndexesAsync(connection, destinationTable, cancellationToken);
}
stopwatch.Stop();
_logger.LogInformation(
"Mass insert to {DestinationTable} completed. TotalRows={TotalRows}, Elapsed={Elapsed}ms",
destinationTable, totalRows, stopwatch.ElapsedMilliseconds);
return new MassInsertResult(totalRows, stopwatch.Elapsed, rebuildIndexes);
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
_logger.LogError(ex, "Mass insert to {DestinationTable} failed after {TotalRows} rows",
destinationTable, totalRows);
// Try to re-enable indexes even on failure
try
{
await EnableNonClusteredIndexesAsync(connection, destinationTable, CancellationToken.None);
}
catch (Exception indexEx)
{
_logger.LogWarning(indexEx, "Failed to re-enable indexes on {Table} after error", destinationTable);
}
throw;
}
}
private async Task BulkCopyDirectAsync<T>(
SqlConnection connection,
IReadOnlyList<T> batch,
string destinationTable,
CancellationToken cancellationToken) where T : class
{
using var bulkCopy = new SqlBulkCopy(connection)
{
DestinationTableName = $"[{destinationTable}]",
BatchSize = batch.Count,
BulkCopyTimeout = 3600
};
using var reader = _dataReaderFactory.CreateReader(batch.ToAsyncEnumerable());
await bulkCopy.WriteToServerAsync(reader, cancellationToken);
}
private static async Task DisableNonClusteredIndexesAsync(
SqlConnection connection,
string tableName,
CancellationToken cancellationToken)
{
var sql = $@"
DECLARE @sql NVARCHAR(MAX) = '';
SELECT @sql = @sql + 'ALTER INDEX [' + i.name + '] ON [{tableName}] DISABLE;' + CHAR(13)
FROM sys.indexes i
INNER JOIN sys.tables t ON i.object_id = t.object_id
WHERE t.name = @tableName
AND i.type = 2
AND i.is_disabled = 0;
EXEC sp_executesql @sql;";
await connection.ExecuteAsync(
new CommandDefinition(sql, new { tableName }, commandTimeout: 300, cancellationToken: cancellationToken));
}
private static async Task EnableNonClusteredIndexesAsync(
SqlConnection connection,
string tableName,
CancellationToken cancellationToken)
{
var sql = $@"
DECLARE @sql NVARCHAR(MAX) = '';
SELECT @sql = @sql + 'ALTER INDEX [' + i.name + '] ON [{tableName}] REBUILD;' + CHAR(13)
FROM sys.indexes i
INNER JOIN sys.tables t ON i.object_id = t.object_id
WHERE t.name = @tableName
AND i.type = 2
AND i.is_disabled = 1;
EXEC sp_executesql @sql;";
await connection.ExecuteAsync(
new CommandDefinition(sql, new { tableName }, commandTimeout: 3600, cancellationToken: cancellationToken));
}
private static async Task RebuildIndexesAsync(
SqlConnection connection,
string tableName,
CancellationToken cancellationToken)
{
var sql = $"ALTER INDEX ALL ON [{tableName}] REBUILD WITH (FILLFACTOR = 95)";
await connection.ExecuteAsync(
new CommandDefinition(sql, commandTimeout: 3600, cancellationToken: cancellationToken));
}
}
@@ -1,192 +0,0 @@
using System.Linq.Expressions;
using System.Reflection;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Parses LINQ expressions to extract column names and build SQL clauses.
/// </summary>
internal static class ExpressionParser
{
/// <summary>
/// Extracts column names from an expression like: x => new { x.A, x.B } or x => x.Id
/// </summary>
public static IReadOnlyList<string> GetColumnNames<T>(Expression<Func<T, object>> expression)
{
ArgumentNullException.ThrowIfNull(expression);
var body = expression.Body;
// Handle conversion (boxing for value types)
if (body is UnaryExpression unary && unary.NodeType == ExpressionType.Convert)
{
body = unary.Operand;
}
return body switch
{
// Single property: x => x.Id
MemberExpression member => [GetPropertyName(member)],
// Anonymous type: x => new { x.A, x.B }
NewExpression newExpr => ExtractFromNewExpression(newExpr),
// Member init (shouldn't typically happen but handle it)
MemberInitExpression memberInit => ExtractFromNewExpression(memberInit.NewExpression),
_ => throw new ArgumentException(
$"Unsupported expression type: {body.NodeType}. " +
"Use either a single property (x => x.Id) or an anonymous type (x => new {{ x.A, x.B }}).",
nameof(expression))
};
}
/// <summary>
/// Builds a SQL condition clause from an updateWhen expression.
/// Example: (src, tgt) => src.LastUpdateDt > tgt.LastUpdateDt
/// Returns: "source.[LastUpdateDt] > target.[LastUpdateDt]"
/// </summary>
public static string? BuildUpdateWhenSql<T>(
Expression<Func<T, T, bool>>? expression,
string sourceAlias = "source",
string targetAlias = "target")
{
if (expression == null)
return null;
var sourceParam = expression.Parameters[0].Name!;
var targetParam = expression.Parameters[1].Name!;
return TranslateExpression(expression.Body, sourceParam, sourceAlias, targetParam, targetAlias);
}
private static IReadOnlyList<string> ExtractFromNewExpression(NewExpression newExpr)
{
if (newExpr.Arguments.Count == 0)
{
throw new ArgumentException("Anonymous type expression must contain at least one property.");
}
var columns = new List<string>();
foreach (var arg in newExpr.Arguments)
{
if (arg is MemberExpression member)
{
columns.Add(GetPropertyName(member));
}
else
{
throw new ArgumentException(
$"Each property in the anonymous type must be a direct property access. " +
$"Found: {arg.NodeType}");
}
}
return columns;
}
private static string GetPropertyName(MemberExpression member)
{
// Ensure it's a property access on the parameter
if (member.Expression is not ParameterExpression)
{
throw new ArgumentException(
$"Only direct property access is supported. " +
$"Nested properties like x.Parent.Name are not allowed. Found: {member}");
}
if (member.Member is not PropertyInfo prop)
{
throw new ArgumentException(
$"Expression must access a property, not a field. Found: {member.Member.MemberType}");
}
return prop.Name;
}
private static string TranslateExpression(
Expression expr,
string sourceParam, string sourceAlias,
string targetParam, string targetAlias)
{
return expr switch
{
BinaryExpression binary => TranslateBinaryExpression(
binary, sourceParam, sourceAlias, targetParam, targetAlias),
MemberExpression member => TranslateMemberExpression(
member, sourceParam, sourceAlias, targetParam, targetAlias),
ConstantExpression constant => TranslateConstant(constant),
UnaryExpression unary when unary.NodeType == ExpressionType.Not =>
$"NOT ({TranslateExpression(unary.Operand, sourceParam, sourceAlias, targetParam, targetAlias)})",
UnaryExpression unary when unary.NodeType == ExpressionType.Convert =>
TranslateExpression(unary.Operand, sourceParam, sourceAlias, targetParam, targetAlias),
_ => throw new NotSupportedException($"Expression type {expr.NodeType} is not supported in updateWhen clause.")
};
}
private static string TranslateBinaryExpression(
BinaryExpression binary,
string sourceParam, string sourceAlias,
string targetParam, string targetAlias)
{
var left = TranslateExpression(binary.Left, sourceParam, sourceAlias, targetParam, targetAlias);
var right = TranslateExpression(binary.Right, sourceParam, sourceAlias, targetParam, targetAlias);
var op = binary.NodeType switch
{
ExpressionType.Equal => "=",
ExpressionType.NotEqual => "<>",
ExpressionType.GreaterThan => ">",
ExpressionType.GreaterThanOrEqual => ">=",
ExpressionType.LessThan => "<",
ExpressionType.LessThanOrEqual => "<=",
ExpressionType.AndAlso => "AND",
ExpressionType.OrElse => "OR",
_ => throw new NotSupportedException($"Binary operator {binary.NodeType} is not supported.")
};
// Wrap AND/OR with parentheses for clarity
if (binary.NodeType is ExpressionType.AndAlso or ExpressionType.OrElse)
{
return $"({left} {op} {right})";
}
return $"{left} {op} {right}";
}
private static string TranslateMemberExpression(
MemberExpression member,
string sourceParam, string sourceAlias,
string targetParam, string targetAlias)
{
if (member.Expression is not ParameterExpression param)
{
throw new NotSupportedException(
"Only direct property access on source or target parameters is supported.");
}
var alias = param.Name == sourceParam ? sourceAlias : targetAlias;
var propName = member.Member.Name;
return $"{alias}.[{propName}]";
}
private static string TranslateConstant(ConstantExpression constant)
{
if (constant.Value == null)
return "NULL";
return constant.Value switch
{
string s => $"'{s.Replace("'", "''")}'",
bool b => b ? "1" : "0",
DateTime dt => $"'{dt:yyyy-MM-dd HH:mm:ss.fff}'",
_ => constant.Value.ToString() ?? "NULL"
};
}
}
@@ -1,29 +0,0 @@
using JdeScoping.DataSync.Contracts;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Registry implementation that resolves configurations from DI.
/// </summary>
internal sealed class MergeConfigurationRegistry : IMergeConfigurationRegistry
{
private readonly IServiceProvider _serviceProvider;
public MergeConfigurationRegistry(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
}
public IMergeConfiguration<T> GetConfiguration<T>() where T : class
{
var config = _serviceProvider.GetService(typeof(IMergeConfiguration<T>)) as IMergeConfiguration<T>;
return config ?? throw new InvalidOperationException(
$"No merge configuration registered for {typeof(T).Name}. " +
$"Register IMergeConfiguration<{typeof(T).Name}> in ServiceCollectionExtensions.");
}
public bool HasConfiguration<T>() where T : class
{
return _serviceProvider.GetService(typeof(IMergeConfiguration<T>)) != null;
}
}
@@ -1,199 +0,0 @@
using System.Text;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Builds SQL statements for bulk merge operations.
/// </summary>
internal static class MergeSqlBuilder
{
/// <summary>
/// Builds a SQL statement to create a temp table with the same schema as the source table.
/// </summary>
public static string BuildCreateTempTable(string tempTableName, string sourceTableName)
{
ArgumentException.ThrowIfNullOrWhiteSpace(tempTableName);
ArgumentException.ThrowIfNullOrWhiteSpace(sourceTableName);
return $"SELECT TOP 0 * INTO [{tempTableName}] FROM [{sourceTableName}]";
}
/// <summary>
/// Builds a SQL MERGE statement.
/// </summary>
/// <param name="destinationTable">Target table name.</param>
/// <param name="tempTableName">Source temp table name.</param>
/// <param name="matchColumns">Columns to match on (primary key).</param>
/// <param name="updateColumns">Columns to update on match.</param>
/// <param name="updateWhenClause">Optional SQL condition for when to update. Example: "source.[LastUpdateDt] > target.[LastUpdateDt]"</param>
/// <param name="insertColumns">Columns to insert for new records.</param>
/// <returns>The MERGE SQL statement with output counts.</returns>
public static string BuildMerge(
string destinationTable,
string tempTableName,
IReadOnlyList<string> matchColumns,
IReadOnlyList<string> updateColumns,
string? updateWhenClause,
IReadOnlyList<string> insertColumns)
{
ArgumentException.ThrowIfNullOrWhiteSpace(destinationTable);
ArgumentException.ThrowIfNullOrWhiteSpace(tempTableName);
ArgumentNullException.ThrowIfNull(matchColumns);
ArgumentNullException.ThrowIfNull(insertColumns);
if (matchColumns.Count == 0)
throw new ArgumentException("At least one match column is required.", nameof(matchColumns));
if (insertColumns.Count == 0)
throw new ArgumentException("At least one insert column is required.", nameof(insertColumns));
var sb = new StringBuilder();
// Declare output variable to track insert vs update
sb.AppendLine("DECLARE @InsertCount INT = 0, @UpdateCount INT = 0;");
sb.AppendLine();
// MERGE statement
sb.AppendLine($"MERGE INTO [{destinationTable}] AS target");
sb.AppendLine($"USING [{tempTableName}] AS source");
// ON clause
sb.Append("ON ");
for (int i = 0; i < matchColumns.Count; i++)
{
if (i > 0)
sb.Append(" AND ");
sb.Append($"target.[{matchColumns[i]}] = source.[{matchColumns[i]}]");
}
sb.AppendLine();
// WHEN MATCHED (update)
if (updateColumns != null && updateColumns.Count > 0)
{
sb.Append("WHEN MATCHED");
if (!string.IsNullOrWhiteSpace(updateWhenClause))
{
sb.Append($" AND {updateWhenClause}");
}
sb.AppendLine(" THEN");
sb.Append(" UPDATE SET ");
for (int i = 0; i < updateColumns.Count; i++)
{
if (i > 0)
sb.Append(", ");
sb.Append($"target.[{updateColumns[i]}] = source.[{updateColumns[i]}]");
}
sb.AppendLine();
}
// WHEN NOT MATCHED (insert)
sb.AppendLine("WHEN NOT MATCHED THEN");
sb.Append(" INSERT (");
sb.Append(string.Join(", ", insertColumns.Select(c => $"[{c}]")));
sb.AppendLine(")");
sb.Append(" VALUES (");
sb.Append(string.Join(", ", insertColumns.Select(c => $"source.[{c}]")));
sb.AppendLine(")");
// OUTPUT clause to count inserts and updates
sb.AppendLine("OUTPUT $action INTO @ActionOutput;");
sb.AppendLine();
// Calculate counts
sb.AppendLine("SELECT @InsertCount = COUNT(*) FROM @ActionOutput WHERE ActionType = 'INSERT';");
sb.AppendLine("SELECT @UpdateCount = COUNT(*) FROM @ActionOutput WHERE ActionType = 'UPDATE';");
sb.AppendLine("SELECT @InsertCount AS InsertCount, @UpdateCount AS UpdateCount;");
return sb.ToString();
}
/// <summary>
/// Builds a simpler MERGE statement that returns just the affected row count.
/// </summary>
public static string BuildMergeSimple(
string destinationTable,
string tempTableName,
IReadOnlyList<string> matchColumns,
IReadOnlyList<string> updateColumns,
string? updateWhenClause,
IReadOnlyList<string> insertColumns)
{
ArgumentException.ThrowIfNullOrWhiteSpace(destinationTable);
ArgumentException.ThrowIfNullOrWhiteSpace(tempTableName);
ArgumentNullException.ThrowIfNull(matchColumns);
ArgumentNullException.ThrowIfNull(insertColumns);
if (matchColumns.Count == 0)
throw new ArgumentException("At least one match column is required.", nameof(matchColumns));
if (insertColumns.Count == 0)
throw new ArgumentException("At least one insert column is required.", nameof(insertColumns));
var sb = new StringBuilder();
// MERGE statement
sb.AppendLine($"MERGE INTO [{destinationTable}] AS target");
sb.AppendLine($"USING [{tempTableName}] AS source");
// ON clause
sb.Append("ON ");
for (int i = 0; i < matchColumns.Count; i++)
{
if (i > 0)
sb.Append(" AND ");
sb.Append($"target.[{matchColumns[i]}] = source.[{matchColumns[i]}]");
}
sb.AppendLine();
// WHEN MATCHED (update)
if (updateColumns != null && updateColumns.Count > 0)
{
sb.Append("WHEN MATCHED");
if (!string.IsNullOrWhiteSpace(updateWhenClause))
{
sb.Append($" AND {updateWhenClause}");
}
sb.AppendLine(" THEN");
sb.Append(" UPDATE SET ");
for (int i = 0; i < updateColumns.Count; i++)
{
if (i > 0)
sb.Append(", ");
sb.Append($"target.[{updateColumns[i]}] = source.[{updateColumns[i]}]");
}
sb.AppendLine();
}
// WHEN NOT MATCHED (insert)
sb.AppendLine("WHEN NOT MATCHED THEN");
sb.Append(" INSERT (");
sb.Append(string.Join(", ", insertColumns.Select(c => $"[{c}]")));
sb.AppendLine(")");
sb.Append(" VALUES (");
sb.Append(string.Join(", ", insertColumns.Select(c => $"source.[{c}]")));
sb.Append(")");
sb.AppendLine(";");
return sb.ToString();
}
/// <summary>
/// Builds a SQL statement to truncate a temp table.
/// </summary>
public static string BuildTruncateTempTable(string tempTableName)
{
ArgumentException.ThrowIfNullOrWhiteSpace(tempTableName);
return $"TRUNCATE TABLE [{tempTableName}]";
}
/// <summary>
/// Builds a SQL statement to drop a temp table.
/// </summary>
public static string BuildDropTempTable(string tempTableName)
{
ArgumentException.ThrowIfNullOrWhiteSpace(tempTableName);
return $"IF OBJECT_ID('tempdb..{tempTableName}') IS NOT NULL DROP TABLE [{tempTableName}]";
}
}
@@ -1,35 +0,0 @@
using JdeScoping.Core.Interfaces;
using JdeScoping.DataSync.Contracts;
using Microsoft.Extensions.Logging;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Post-processor for MIS data to set obsolete dates.
/// </summary>
public class MisDataPostProcessor : IPostProcessor
{
private readonly ILotFinderRepository _repository;
private readonly ILogger<MisDataPostProcessor> _logger;
/// <summary>
/// Initializes a new instance of the <see cref="MisDataPostProcessor"/> class.
/// </summary>
public MisDataPostProcessor(
ILotFinderRepository repository,
ILogger<MisDataPostProcessor> logger)
{
_repository = repository ?? throw new ArgumentNullException(nameof(repository));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <inheritdoc/>
public async Task ProcessAsync(string tableName, CancellationToken cancellationToken = default)
{
_logger.LogDebug("Running MIS data post-processing for {Table}", tableName);
await _repository.PostProcessMisDataAsync(cancellationToken);
_logger.LogDebug("MIS data post-processing completed for {Table}", tableName);
}
}
@@ -1,223 +0,0 @@
using System.Data;
using System.Reflection;
using Dapper;
using JdeScoping.DataSync.Contracts;
using JdeScoping.DataSync.Exceptions;
using JdeScoping.DataSync.Models;
namespace JdeScoping.DataSync.Services;
/// <summary>
/// Validates data against database table schema.
/// </summary>
internal sealed class SchemaValidator : ISchemaValidator
{
/// <inheritdoc />
public async Task<IReadOnlyList<ColumnSchema>> GetTableSchemaAsync(
IDbConnection connection,
string tableName,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(connection);
ArgumentException.ThrowIfNullOrWhiteSpace(tableName);
// Parse schema and table name
var (schemaName, table) = ParseTableName(tableName);
const string sql = """
SELECT
COLUMN_NAME AS Name,
DATA_TYPE AS DataType,
CHARACTER_MAXIMUM_LENGTH AS MaxLength,
NUMERIC_PRECISION AS [Precision],
NUMERIC_SCALE AS Scale,
CASE WHEN IS_NULLABLE = 'YES' THEN 1 ELSE 0 END AS IsNullable,
ORDINAL_POSITION AS OrdinalPosition
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = @Schema AND TABLE_NAME = @Table
ORDER BY ORDINAL_POSITION
""";
var columns = await connection.QueryAsync<ColumnSchema>(
new CommandDefinition(
sql,
new { Schema = schemaName, Table = table },
cancellationToken: cancellationToken));
return columns.ToList();
}
/// <inheritdoc />
public IReadOnlyList<ValidationError> ValidateBatch<T>(
IReadOnlyList<T> data,
IReadOnlyList<ColumnSchema> schema,
int maxErrors = 100) where T : class
{
ArgumentNullException.ThrowIfNull(data);
ArgumentNullException.ThrowIfNull(schema);
if (data.Count == 0 || schema.Count == 0)
return [];
var errors = new List<ValidationError>();
var schemaLookup = schema.ToDictionary(s => s.Name, StringComparer.OrdinalIgnoreCase);
var properties = typeof(T).GetProperties(BindingFlags.Public | BindingFlags.Instance)
.ToDictionary(p => p.Name, StringComparer.OrdinalIgnoreCase);
for (int rowIndex = 0; rowIndex < data.Count; rowIndex++)
{
var row = data[rowIndex];
if (row == null) continue;
foreach (var column in schema)
{
if (!properties.TryGetValue(column.Name, out var property))
continue;
var value = property.GetValue(row);
var error = ValidateValue(rowIndex, column, value);
if (error != null)
{
errors.Add(error);
if (maxErrors > 0 && errors.Count >= maxErrors)
return errors;
}
}
}
return errors;
}
private static ValidationError? ValidateValue(int rowIndex, ColumnSchema column, object? value)
{
// Check nullability
if (value == null || (value is string s && string.IsNullOrEmpty(s)))
{
if (!column.IsNullable && !IsIdentityOrComputed(column))
{
return new ValidationError(
rowIndex,
column.Name,
value,
$"Column '{column.Name}' does not allow null values.");
}
return null;
}
// Check string length
if (column.MaxLength.HasValue && column.MaxLength.Value > 0)
{
var stringValue = value as string ?? value.ToString();
if (stringValue != null && stringValue.Length > column.MaxLength.Value)
{
return new ValidationError(
rowIndex,
column.Name,
value,
$"Value for '{column.Name}' exceeds maximum length of {column.MaxLength}. Actual length: {stringValue.Length}.");
}
}
// Check decimal precision/scale
if (column.Precision.HasValue && column.Scale.HasValue && IsDecimalType(column.DataType))
{
if (value is decimal decimalValue)
{
var error = ValidateDecimal(rowIndex, column, decimalValue);
if (error != null)
return error;
}
else if (value is double || value is float)
{
// Convert to decimal for validation
try
{
var decVal = Convert.ToDecimal(value);
var error = ValidateDecimal(rowIndex, column, decVal);
if (error != null)
return error;
}
catch
{
return new ValidationError(
rowIndex,
column.Name,
value,
$"Value for '{column.Name}' cannot be converted to decimal.");
}
}
}
return null;
}
private static ValidationError? ValidateDecimal(int rowIndex, ColumnSchema column, decimal value)
{
// Calculate actual precision and scale
var sqlString = value.ToString(System.Globalization.CultureInfo.InvariantCulture);
var parts = sqlString.TrimStart('-').Split('.');
var integerDigits = parts[0].TrimStart('0').Length;
var decimalDigits = parts.Length > 1 ? parts[1].TrimEnd('0').Length : 0;
// Handle zero case
if (integerDigits == 0 && decimalDigits == 0)
integerDigits = 1;
var totalDigits = integerDigits + decimalDigits;
var maxTotalDigits = column.Precision!.Value;
var maxDecimalDigits = column.Scale!.Value;
var maxIntegerDigits = maxTotalDigits - maxDecimalDigits;
if (integerDigits > maxIntegerDigits)
{
return new ValidationError(
rowIndex,
column.Name,
value,
$"Value {value} for '{column.Name}' exceeds maximum integer digits. " +
$"Maximum: {maxIntegerDigits}, Actual: {integerDigits}.");
}
// Note: We don't validate decimal digits as SQL Server will truncate/round them
return null;
}
private static bool IsDecimalType(string dataType)
{
return dataType.Equals("decimal", StringComparison.OrdinalIgnoreCase) ||
dataType.Equals("numeric", StringComparison.OrdinalIgnoreCase) ||
dataType.Equals("money", StringComparison.OrdinalIgnoreCase) ||
dataType.Equals("smallmoney", StringComparison.OrdinalIgnoreCase);
}
private static bool IsIdentityOrComputed(ColumnSchema column)
{
// Identity columns and computed columns can be null in the source data
// This is a simple heuristic - actual identity check would require additional metadata
return column.Name.Equals("Id", StringComparison.OrdinalIgnoreCase) ||
column.Name.EndsWith("Id", StringComparison.OrdinalIgnoreCase);
}
private static (string Schema, string Table) ParseTableName(string tableName)
{
// Remove brackets if present
var cleaned = tableName.Replace("[", "").Replace("]", "");
// Check for temp table
if (cleaned.StartsWith('#'))
{
return ("dbo", cleaned);
}
// Split by dot
var parts = cleaned.Split('.');
return parts.Length switch
{
1 => ("dbo", parts[0]),
2 => (parts[0], parts[1]),
_ => throw new ArgumentException($"Invalid table name format: {tableName}")
};
}
}